728x90

Duck DB + Python 코드를 이용해서 대용량 데이터를 처리하는 방법을 소개합니다.

DuckDB란?

DuckDB는 임베디드 분석 데이터베이스로, 대규모 데이터 처리를 로컬 환경에서도 빠르고 효율적으로 수행할 수 있도록 설계된 시스템입니다. SQLite와 비슷하게 작동하지만, 주로 데이터 분석 워크로드에 최적화되어 있습니다. 이 시스템은 파이썬, R 등 다양한 언어와 통합되며, OLAP(Online Analytical Processing) 쿼리에 특화되어 있어 데이터 사이언스와 분석에 적합합니다.

<관련 포스팅> [DuckDB] what is DuckDB

(source : duckdb blog)

 

 

사전준비

1) 테스트용 대용량 데이터 생성

다음 코드를 이용하여 테스트용 대용량  CSV을 생성합니다.

 

이 코드는 실행하면 숫자값만 포함된 컬럼이 100개인 1천만 행의 데이터를 만들고, 약 3.6GB 의 크기의 CSV 파일로 저장합니다.

 

<CSV 파일 포맷>

v1 ~ v99 의 100개 컬럼 : 각 컬럼에는 0~1000사이의 숫자가 랜덤으로 기록되어 있음.

import math
import pandas
import os 

def generate_data(rows, cols):
    return pd.DataFrame(np.random.randint(0, 1000, size=(rows, cols)), columns=[f"v{num}" for num in range(cols)])

def generate_csv(output, rows=10, cols=1, chunksize=1000000):
    if rows > chunksize:
        # Pandas 메모리 이슈로 인해서 분할 처리함. 
        partitions = math.ceil(rows / chunksize)
        mod_rows = (rows % chunksize)
        for n in range(partitions):
            header = not os.path.exists(output)
            partition_rows = mod_rows if n == (partitions-1) and mod_rows>0 else chunksize
            print(f"partition={n} / generation rows={partition_rows}")
            df = generate_data(partition_rows, cols)
            df.to_csv(output, mode="a", index=False, header=header)    
    else:
        header = not os.path.exists(output)
        df = generate_dumidata(rows, cols)
        df.to_csv(output, index=False, header=header)

ROWS = 10**7        # 생성할 행수
COLS = 100          # 생성할 데이터의 컬럼수.
CHUNKSIZE = 10**5   # 분할처리할 행수.
OUTPUT_FILE = "./largedata.csv"  # 출력한 CSV파일
if os.path.exists(OUTPUT_FILE):
    os.remove(OUTPUT_FILE)
    print(f"`{OUTPUT_FILE}` removed.")
    
generate_csv(OUTPUT_FILE, ROWS, COLS, CHUNKSIZE)

file_size = os.path.getsize(OUTPUT_FILE) / 1024 / 1024 / 1024
print( f"maked data : {OUTPUT_FILE} ({round(file_size,1)} GB)" )

 

2) duckdb 설치

duckdb를 모듈을 설치합니다.

pip install duckdb

 

데이터 분석 

1) 데이터(csv) 불러오기 

사전 준비 과정에서 생성한 python duckdb.sql 함수를 이용해서 duckdb SQL 문을 실행합니다.
SELECT * FROM read_csv() 함수를 이용해서 csv 파일을 불러옵니다.

pandas에서는 dataframe에서 큰 사이즈의 csv파일을 불러올 때 chucksize 지정해서 불러옵니다. 이렇게 하더라도 읽는데 소요되는 시간이나 메모리 문제가 발생할 수 있습니다. 하지만, duckdb에서는 큰 파일을 불러오는 방법이 아주 간편합니다. 

 

import duckdb 
from time import time

start = time()
output_file = "./largedata.csv"
duck_df = duckdb.sql(f"SELECT * FROM read_csv('{output_file}')")
end = time()

print(f"durations : {end-start} s")

해당 csv파일 일반적인 형식으로 별도에 옵션을 지정하지 않았습니다. 구분자나 특정 컬럼만 선택해서 불러오는 등의 특별한 케이스라면 Duckdb API문서를 참고해주시기 바랍니다. 

더보기

duckdb 의 read_csv 함수에 대해서 문서를 참고 바랍니다.

CSV Import – DuckDB

SELECT * 
FROM read_csv( 'flights.csv', 
		delim = '|', 
                header = true, 
                columns = { 'FlightDate': 'DATE',
                            'UniqueCarrier': 'VARCHAR', 
                            'OriginCityName': 'VARCHAR', 
                            'DestCityName': 'VARCHAR' 
                          }
                );

 

`read_csv` function Parameters
Name Description Type Default
all_varchar Option to skip type detection for CSV parsing and assume all columns to be of type VARCHAR. This option is only supported by the read_csv function. BOOL false
allow_quoted_nulls Option to allow the conversion of quoted values to NULL values BOOL true
auto_detect Enables auto detection of CSV parameters. BOOL true
auto_type_candidates This option allows you to specify the types that the sniffer will use when detecting CSV column types. The VARCHAR type is always included in the detected types (as a fallback option). See example. TYPE[] default types
columns A struct that specifies the column names and column types contained within the CSV file (e.g., {'col1': 'INTEGER', 'col2': 'VARCHAR'}). Using this option implies that auto detection is not used. STRUCT (empty)
compression The compression type for the file. By default this will be detected automatically from the file extension (e.g., t.csv.gz will use gzip, t.csv will use none). Options are none, gzip, zstd. VARCHAR auto
dateformat Specifies the date format to use when parsing dates. See Date Format. VARCHAR (empty)
decimal_separator The decimal separator of numbers. VARCHAR .
delimiter Specifies the delimiter character that separates columns within each row (line) of the file. Alias for sep. This option is only available in the COPY statement. VARCHAR ,
delim Specifies the delimiter character that separates columns within each row (line) of the file. Alias for sep. VARCHAR ,
escape Specifies the string that should appear before a data character sequence that matches the quote value. VARCHAR "
filename Whether or not an extra filename column should be included in the result. BOOL false
force_not_null Do not match the specified columns' values against the NULL string. In the default case where the NULL string is empty, this means that empty values will be read as zero-length strings rather than NULLs. VARCHAR[] []
header Specifies that the file contains a header line with the names of each column in the file. BOOL false
hive_partitioning Whether or not to interpret the path as a Hive partitioned path. BOOL false
ignore_errors Option to ignore any parsing errors encountered – and instead ignore rows with errors. BOOL false
max_line_size The maximum line size in bytes. BIGINT 2097152
names The column names as a list, see example. VARCHAR[] (empty)
new_line Set the new line character(s) in the file. Options are '\r','\n', or '\r\n'. Note that the CSV parser only distinguishes between single-character and double-character line delimiters. Therefore, it does not differentiate between '\r' and '\n'. VARCHAR (empty)
normalize_names Boolean value that specifies whether or not column names should be normalized, removing any non-alphanumeric characters from them. BOOL false
null_padding If this option is enabled, when a row lacks columns, it will pad the remaining columns on the right with null values. BOOL false
nullstr Specifies the string that represents a NULL value or (since v0.10.2) a list of strings that represent a NULL value. VARCHAR or VARCHAR[] (empty)
parallel Whether or not the parallel CSV reader is used. BOOL true
quote Specifies the quoting string to be used when a data value is quoted. VARCHAR "
sample_size The number of sample rows for auto detection of parameters. BIGINT 20480
sep Specifies the delimiter character that separates columns within each row (line) of the file. Alias for delim. VARCHAR ,
skip The number of lines at the top of the file to skip. BIGINT 0
timestampformat Specifies the date format to use when parsing timestamps. See Date Format. VARCHAR (empty)
types or dtypes The column types as either a list (by position) or a struct (by name). Example here. VARCHAR[] or STRUCT (empty)
union_by_name Whether the columns of multiple schemas should be unified by name, rather than by position. Note that using this option increases memory consumption. BOOL false

Read multiple files 

flights3.csv:

FlightDate|OriginCityName|DestCityName
1988-01-01|New York, NY|Los Angeles, CA
1988-01-02|New York, NY|Los Angeles, CA

flights4.csv:

FlightDate|UniqueCarrier|OriginCityName|DestCityName
1988-01-03|AA|New York, NY|Los Angeles, CA

 

SELECT * FROM read_csv(['flights3.csv', 'flights4.csv'], union_by_name = true);

 

 

Other case

SELECT *
FROM 'dir/*.csv';
SELECT *
FROM '*/*/*.csv';
SELECT *
FROM glob('*');

2) 집계 쿼리 실행

duckdb.query() 함수를 이용하면 duckdb dataframe, pandas datafrarme, polars dataframe과 통합할 수 있습니다. 

duckdb python 모듈은 duckdb.sql("select * from read_csv(...)") 함수에서 반환된  duckdb dataframe(`duck_df`)를 FROM절에 그대로 사용하여 데이터를 조회할 수 있습니다. 

 

다음 코드는 v1 값이 0~500 인 행의 v2의 평균값의 구합니다. 

start = time()
groupby_duck_df = duckdb.query("SELECT v1, avg(v2) v2 FROM duck_df WHERE v1 BETWEEN 0 AND 500 GROUP BY v1 ")
end = time()
print(f"durations : {end-start}s")
groupby_duck_df.show()

 

집계 결과를 pandas dataframe으로 변환합니다. 

summary_df = groupby_duck_df.df()

 

pandas dataframe plot함수를 이용해 v2의 평균값의 분포를 확인합니다. 

start = time()
summary_df["v2"].plot.hist()
end = time()
print(f"duration : {end-start}s")

duckdb 대용량 처리 - v2 분포 확인.

<테스트 결과 >

위 테스트를 Kaggle에서 진행되었습니다. 

3.7gb 파일을 불러오는 것도 0.2초만 되고,

groupby 연산도 0.2초만에 결과를 확인할 수 있었습니다. 

duckdb dataframe을 pandas dataframe변환하거나, 데이터 표시할 때는 2.5초 가량이 시간이 소요되었습니다. 

pandas datafame에서 할수 없는 것들이 가능했습니다. 

 

 

728x90
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기
반응형