728x90
큰 데이터를 데이터프레임에서 가공 / 조작할 때 속도 개선하고자,
multiprocessing 이용하는 방법을 소개합니다.
1. Import modules
from multiprocessing import Pool, freeze_support, cpu_count
import pandas as pd
import numpy as np
import time
2. Mutiprocessing Apply function
np.array_split()함수로 데이터프레임을 지정한 수(프로세스수)만큼 분리한다.
Pool.map()함수를 이용해서 각 프로세스마다 분할한 데이터 프레임을 인자로 입력하여 인자로 지정한 func 을 실행한다.
여기서 출력된 데이터프레임들을 pd.concat함수로 하나로 합친다.
def dataframe_mp_apply( dataframe, func, partition_count=cpu_count() ):
# 데이터프레임을 지정한 수로 분할한다.
dataframe_split = np.array_split( dataframe, partition_count )
with Pool(processes=partition_count) as mp_pool:
df = pd.concat(mp_pool.map(func, dataframe_split))
return df
3. Customizing apply fuction
df = dataframe_mp_apply(random_points, user_func, 4) : random_points 데이터프레임을 4개로 분할하여 user_func함수를 실행한다.
def user_func( df ): 4개의 프로세스에 각각 할달된 데이터프레임이 user_func을 호출하게 된다.
def user_func( df ):
'''
User Implementation Function
'''
df["total"] = df.a + df.b
return df
def dataframe_multiprocess_apply_test(datasize=1000000):
''' Customizing Multiprocess apply()
'''
start = time.time()
random_points = pd.DataFrame( data={"a":np.random.random_sample(size=datasize, ), "b":np.random.random_sample(size=datasize, )} )
df = dataframe_mp_apply(random_points, user_func, 4)
durations = time.time() - start #Duration times
return durations
if (__name__=="__main__"):
freeze_support()
durtion = dataframe_multiprocess_apply_test(1000000)
print("Duration:", durtion)
Full Code
from multiprocessing import Pool, freeze_support, cpu_count
import pandas as pd
import numpy as np
import time
def dataframe_mp_apply( dataframe, func, partition_count=cpu_count() ):
'''
Arguments :
dataframe : 처리할 데이터 프레임.
func : 처리할 사용자 함수.
partition_count : 분할 처리할 갯수.
'''
# 데이터프레임을 지정한 수로 분할한다.
dataframe_split = np.array_split( dataframe, partition_count )
with Pool(processes=partition_count) as mp_pool:
# 각 프로세스마다 분할한 데이터 프레임을 인자로 입력하여 func 을 실행하고,
# 출력된 데이터프레임을 concat함수로 하나로 합친다.
df = pd.concat(mp_pool.map(func, dataframe_split))
return df
def user_func( df ):
'''
User Implementation Function
'''
df["total"] = df.a + df.b
return df
def dataframe_multiprocess_apply_test(datasize=1000000):
''' Customizing Multiprocess apply()
'''
start = time.time()
random_points = pd.DataFrame( data={"a":np.random.random_sample(size=datasize, ), "b":np.random.random_sample(size=datasize, )} )
df = dataframe_mp_apply(random_points, user_func, 4)
durations = time.time() - start #Duration times
return durations
def dataframe_normal_apply_test(datasize=1000000):
''' Normal apply()
'''
start = time.time()
random_points = pd.DataFrame( data={"a":np.random.random_sample(size=datasize, ), "b":np.random.random_sample(size=datasize, )} )
random_points["total"] = random_points.apply(lambda df: df["a"] + df["b"], axis=1)
#print(random_points)
durations = time.time() - start #Duration times
return durations
#dist = math.hypot(x2-x1, y2-y1)
if (__name__=="__main__"):
freeze_support()
testcases = [1000, 10000, 100000, 1000000, 10000000, 100000000]
mp_results=[]
normal_results=[]
print("*"*60)
print("dataframe_multiprocess_apply_test")
mp_results =list(map(dataframe_multiprocess_apply_test, testcases))
print("Durations :", mp_results)
print("*"*60)
print("dataframe_normal_apply_test")
#duration = dataframe_normal_apply_test()
normal_results =list(map(dataframe_normal_apply_test, testcases))
print("Durations :", normal_results)
Output:
************************************************************
dataframe_multiprocess_apply_test
Durations : [0.9009976387023926, 0.9015190601348877, 0.8859987258911133, 0.9010288715362549, 1.7544605731964111, 12.08374309539795]
************************************************************
dataframe_normal_apply_test
Durations : [0.019000768661499023, 0.13899922370910645, 1.0485196113586426, 9.680866003036499, 97.33059167861938, 1039.6141567230225]
위 코드 기준 + 필자의 컴퓨터 사양에서는 100만 건부터 mutiprocess가 효율을 발휘하는 것을 확인할 수 있었습니다.
연산하는 코드의 복잡도와 데이터량에 따라 효율 측정한 후에 적용해볼 필요가 있습니다.
Data Size | Mutiprocess apply() | Normal apply() |
1000
|
0.90 | 0.02 |
10000
|
0.90 | 0.14 |
100000
|
0.88 | 1.04 |
1000000
|
0.90 | 9.68 |
10000000
|
1.75 | 97.33 |
100000000
|
12.08 | 1039.61 |
다른 대안으로는 multiprocesspandas 모듈을 이용하는 방법이 있습니다.
pip install multiprocesspandas
from multiprocesspandas import applyparallel
def func(x):
return x.mean()
df.apply_parallel(func, num_processes=30, axis=1)
728x90
'Data Science > Python' 카테고리의 다른 글
[python] ML Model Parameter Optimization : GridSearchCV (0) | 2022.09.17 |
---|---|
[colab] Kaggle Dataset Load in Colab (0) | 2022.09.08 |
[python] 그래프에서 두 좌표의 거리 계산 (0) | 2022.09.05 |
[python] Deepcopy a list (array) (0) | 2022.09.01 |
[python] Accelerate Requests Using asyncio (0) | 2022.09.01 |
최근댓글