Duck DB + Python 코드를 이용해서 대용량 데이터를 처리하는 방법을 소개합니다.
DuckDB란?
DuckDB는 임베디드 분석 데이터베이스로, 대규모 데이터 처리를 로컬 환경에서도 빠르고 효율적으로 수행할 수 있도록 설계된 시스템입니다. SQLite와 비슷하게 작동하지만, 주로 데이터 분석 워크로드에 최적화되어 있습니다. 이 시스템은 파이썬, R 등 다양한 언어와 통합되며, OLAP(Online Analytical Processing) 쿼리에 특화되어 있어 데이터 사이언스와 분석에 적합합니다.
<관련 포스팅> [DuckDB] what is DuckDB
사전준비
1) 테스트용 대용량 데이터 생성
다음 코드를 이용하여 테스트용 대용량 CSV을 생성합니다.
이 코드는 실행하면 숫자값만 포함된 컬럼이 100개인 1천만 행의 데이터를 만들고, 약 3.6GB 의 크기의 CSV 파일로 저장합니다.
<CSV 파일 포맷>
v1 ~ v99 의 100개 컬럼 : 각 컬럼에는 0~1000사이의 숫자가 랜덤으로 기록되어 있음.
import math
import pandas
import os
def generate_data(rows, cols):
return pd.DataFrame(np.random.randint(0, 1000, size=(rows, cols)), columns=[f"v{num}" for num in range(cols)])
def generate_csv(output, rows=10, cols=1, chunksize=1000000):
if rows > chunksize:
# Pandas 메모리 이슈로 인해서 분할 처리함.
partitions = math.ceil(rows / chunksize)
mod_rows = (rows % chunksize)
for n in range(partitions):
header = not os.path.exists(output)
partition_rows = mod_rows if n == (partitions-1) and mod_rows>0 else chunksize
print(f"partition={n} / generation rows={partition_rows}")
df = generate_data(partition_rows, cols)
df.to_csv(output, mode="a", index=False, header=header)
else:
header = not os.path.exists(output)
df = generate_dumidata(rows, cols)
df.to_csv(output, index=False, header=header)
ROWS = 10**7 # 생성할 행수
COLS = 100 # 생성할 데이터의 컬럼수.
CHUNKSIZE = 10**5 # 분할처리할 행수.
OUTPUT_FILE = "./largedata.csv" # 출력한 CSV파일
if os.path.exists(OUTPUT_FILE):
os.remove(OUTPUT_FILE)
print(f"`{OUTPUT_FILE}` removed.")
generate_csv(OUTPUT_FILE, ROWS, COLS, CHUNKSIZE)
file_size = os.path.getsize(OUTPUT_FILE) / 1024 / 1024 / 1024
print( f"maked data : {OUTPUT_FILE} ({round(file_size,1)} GB)" )
2) duckdb 설치
duckdb를 모듈을 설치합니다.
pip install duckdb
데이터 분석
1) 데이터(csv) 불러오기
사전 준비 과정에서 생성한 python duckdb.sql 함수를 이용해서 duckdb SQL 문을 실행합니다.
SELECT * FROM read_csv() 함수를 이용해서 csv 파일을 불러옵니다.
pandas에서는 dataframe에서 큰 사이즈의 csv파일을 불러올 때 chucksize 지정해서 불러옵니다. 이렇게 하더라도 읽는데 소요되는 시간이나 메모리 문제가 발생할 수 있습니다. 하지만, duckdb에서는 큰 파일을 불러오는 방법이 아주 간편합니다.
import duckdb
from time import time
start = time()
output_file = "./largedata.csv"
duck_df = duckdb.sql(f"SELECT * FROM read_csv('{output_file}')")
end = time()
print(f"durations : {end-start} s")
해당 csv파일 일반적인 형식으로 별도에 옵션을 지정하지 않았습니다. 구분자나 특정 컬럼만 선택해서 불러오는 등의 특별한 케이스라면 Duckdb API문서를 참고해주시기 바랍니다.
duckdb 의 read_csv 함수에 대해서 문서를 참고 바랍니다.
SELECT *
FROM read_csv( 'flights.csv',
delim = '|',
header = true,
columns = { 'FlightDate': 'DATE',
'UniqueCarrier': 'VARCHAR',
'OriginCityName': 'VARCHAR',
'DestCityName': 'VARCHAR'
}
);
`read_csv` function Parameters
Name | Description | Type | Default |
all_varchar | Option to skip type detection for CSV parsing and assume all columns to be of type VARCHAR. This option is only supported by the read_csv function. | BOOL | false |
allow_quoted_nulls | Option to allow the conversion of quoted values to NULL values | BOOL | true |
auto_detect | Enables auto detection of CSV parameters. | BOOL | true |
auto_type_candidates | This option allows you to specify the types that the sniffer will use when detecting CSV column types. The VARCHAR type is always included in the detected types (as a fallback option). See example. | TYPE[] | default types |
columns | A struct that specifies the column names and column types contained within the CSV file (e.g., {'col1': 'INTEGER', 'col2': 'VARCHAR'}). Using this option implies that auto detection is not used. | STRUCT | (empty) |
compression | The compression type for the file. By default this will be detected automatically from the file extension (e.g., t.csv.gz will use gzip, t.csv will use none). Options are none, gzip, zstd. | VARCHAR | auto |
dateformat | Specifies the date format to use when parsing dates. See Date Format. | VARCHAR | (empty) |
decimal_separator | The decimal separator of numbers. | VARCHAR | . |
delimiter | Specifies the delimiter character that separates columns within each row (line) of the file. Alias for sep. This option is only available in the COPY statement. | VARCHAR | , |
delim | Specifies the delimiter character that separates columns within each row (line) of the file. Alias for sep. | VARCHAR | , |
escape | Specifies the string that should appear before a data character sequence that matches the quote value. | VARCHAR | " |
filename | Whether or not an extra filename column should be included in the result. | BOOL | false |
force_not_null | Do not match the specified columns' values against the NULL string. In the default case where the NULL string is empty, this means that empty values will be read as zero-length strings rather than NULLs. | VARCHAR[] | [] |
header | Specifies that the file contains a header line with the names of each column in the file. | BOOL | false |
hive_partitioning | Whether or not to interpret the path as a Hive partitioned path. | BOOL | false |
ignore_errors | Option to ignore any parsing errors encountered – and instead ignore rows with errors. | BOOL | false |
max_line_size | The maximum line size in bytes. | BIGINT | 2097152 |
names | The column names as a list, see example. | VARCHAR[] | (empty) |
new_line | Set the new line character(s) in the file. Options are '\r','\n', or '\r\n'. Note that the CSV parser only distinguishes between single-character and double-character line delimiters. Therefore, it does not differentiate between '\r' and '\n'. | VARCHAR | (empty) |
normalize_names | Boolean value that specifies whether or not column names should be normalized, removing any non-alphanumeric characters from them. | BOOL | false |
null_padding | If this option is enabled, when a row lacks columns, it will pad the remaining columns on the right with null values. | BOOL | false |
nullstr | Specifies the string that represents a NULL value or (since v0.10.2) a list of strings that represent a NULL value. | VARCHAR or VARCHAR[] | (empty) |
parallel | Whether or not the parallel CSV reader is used. | BOOL | true |
quote | Specifies the quoting string to be used when a data value is quoted. | VARCHAR | " |
sample_size | The number of sample rows for auto detection of parameters. | BIGINT | 20480 |
sep | Specifies the delimiter character that separates columns within each row (line) of the file. Alias for delim. | VARCHAR | , |
skip | The number of lines at the top of the file to skip. | BIGINT | 0 |
timestampformat | Specifies the date format to use when parsing timestamps. See Date Format. | VARCHAR | (empty) |
types or dtypes | The column types as either a list (by position) or a struct (by name). Example here. | VARCHAR[] or STRUCT | (empty) |
union_by_name | Whether the columns of multiple schemas should be unified by name, rather than by position. Note that using this option increases memory consumption. | BOOL | false |
Read multiple files
FlightDate|OriginCityName|DestCityName
1988-01-01|New York, NY|Los Angeles, CA
1988-01-02|New York, NY|Los Angeles, CA
FlightDate|UniqueCarrier|OriginCityName|DestCityName
1988-01-03|AA|New York, NY|Los Angeles, CA
SELECT * FROM read_csv(['flights3.csv', 'flights4.csv'], union_by_name = true);
Other case
SELECT *
FROM 'dir/*.csv';
SELECT *
FROM '*/*/*.csv';
SELECT *
FROM glob('*');
2) 집계 쿼리 실행
duckdb.query() 함수를 이용하면 duckdb dataframe, pandas datafrarme, polars dataframe과 통합할 수 있습니다.
duckdb python 모듈은 duckdb.sql("select * from read_csv(...)") 함수에서 반환된 duckdb dataframe(`duck_df`)를 FROM절에 그대로 사용하여 데이터를 조회할 수 있습니다.
다음 코드는 v1 값이 0~500 인 행의 v2의 평균값의 구합니다.
start = time()
groupby_duck_df = duckdb.query("SELECT v1, avg(v2) v2 FROM duck_df WHERE v1 BETWEEN 0 AND 500 GROUP BY v1 ")
end = time()
print(f"durations : {end-start}s")
groupby_duck_df.show()
집계 결과를 pandas dataframe으로 변환합니다.
summary_df = groupby_duck_df.df()
pandas dataframe plot함수를 이용해 v2의 평균값의 분포를 확인합니다.
start = time()
summary_df["v2"].plot.hist()
end = time()
print(f"duration : {end-start}s")
<테스트 결과 >
위 테스트를 Kaggle에서 진행되었습니다.
3.7gb 파일을 불러오는 것도 0.2초만 되고,
groupby 연산도 0.2초만에 결과를 확인할 수 있었습니다.
duckdb dataframe을 pandas dataframe변환하거나, 데이터 표시할 때는 2.5초 가량이 시간이 소요되었습니다.
pandas datafame에서 할수 없는 것들이 가능했습니다.
'Data Science > Python' 카테고리의 다른 글
shell command within python (0) | 2024.06.21 |
---|---|
Pandas의 한계를 극복한 5가지 라이브러리: Dask, Vaex, Modin, Cudf, Polars (0) | 2023.09.21 |
[Image Analysis] Completely Black or White Image Detection (0) | 2023.07.27 |
[Python] Transfer Pandas Dataframe to MYSQL database with SSH (0) | 2023.02.22 |
[python] Chrome web-driver options : speed up page loading. (0) | 2023.01.31 |
최근댓글