728x90

큰 데이터를 데이터프레임에서 가공 / 조작할 때 속도 개선하고자,

multiprocessing 이용하는 방법을 소개합니다.

1. Import modules

 

from multiprocessing import Pool, freeze_support, cpu_count
import pandas as pd
import numpy as np
import time

2. Mutiprocessing Apply function 

np.array_split()함수로 데이터프레임을 지정한 수(프로세스수)만큼 분리한다.

Pool.map()함수를 이용해서 각 프로세스마다 분할한 데이터 프레임을 인자로 입력하여 인자로 지정한 func 을 실행한다.

여기서 출력된 데이터프레임들을 pd.concat함수로 하나로 합친다.

def dataframe_mp_apply( dataframe, func, partition_count=cpu_count() ):
    # 데이터프레임을 지정한 수로 분할한다.
    dataframe_split = np.array_split( dataframe, partition_count )

    with Pool(processes=partition_count) as mp_pool:
        df = pd.concat(mp_pool.map(func, dataframe_split))
    return df

3. Customizing apply fuction

df = dataframe_mp_apply(random_points, user_func, 4) : random_points 데이터프레임을 4개로 분할하여 user_func함수를 실행한다. 

def user_func( df ): 4개의 프로세스에 각각 할달된 데이터프레임이 user_func을 호출하게 된다. 

def user_func( df ):
    '''
    User Implementation Function 
    '''
    df["total"] = df.a + df.b
    return df
    
def dataframe_multiprocess_apply_test(datasize=1000000):
    ''' Customizing Multiprocess apply()
    '''
    start = time.time()
    random_points = pd.DataFrame( data={"a":np.random.random_sample(size=datasize, ), "b":np.random.random_sample(size=datasize, )} )
    df = dataframe_mp_apply(random_points, user_func, 4)
    durations = time.time() - start #Duration times
    return durations

if (__name__=="__main__"):
    freeze_support()
	durtion = dataframe_multiprocess_apply_test(1000000)
    print("Duration:", durtion)

 

 

Full Code 

from multiprocessing import Pool, freeze_support, cpu_count
import pandas as pd
import numpy as np
import time

def dataframe_mp_apply( dataframe, func, partition_count=cpu_count() ):
    '''

        Arguments :
            dataframe : 처리할 데이터 프레임.
            func : 처리할 사용자 함수.
            partition_count : 분할 처리할 갯수.
    '''
    # 데이터프레임을 지정한 수로 분할한다.
    dataframe_split = np.array_split( dataframe, partition_count )

    with Pool(processes=partition_count) as mp_pool:
        # 각 프로세스마다 분할한 데이터 프레임을 인자로 입력하여 func 을 실행하고, 
        # 출력된 데이터프레임을 concat함수로 하나로 합친다. 
        df = pd.concat(mp_pool.map(func, dataframe_split))
    return df

def user_func( df ):
    '''
    User Implementation Function 
    '''
    df["total"] = df.a + df.b
    return df 
    
def dataframe_multiprocess_apply_test(datasize=1000000):
    ''' Customizing Multiprocess apply()
    '''
    start = time.time()
    random_points = pd.DataFrame( data={"a":np.random.random_sample(size=datasize, ), "b":np.random.random_sample(size=datasize, )} )
    df = dataframe_mp_apply(random_points, user_func, 4)
    durations = time.time() - start #Duration times
    return durations

def dataframe_normal_apply_test(datasize=1000000):
    ''' Normal apply()
    '''
    start = time.time()
    random_points = pd.DataFrame( data={"a":np.random.random_sample(size=datasize, ), "b":np.random.random_sample(size=datasize, )} )
    random_points["total"] = random_points.apply(lambda df:  df["a"] + df["b"], axis=1)
    #print(random_points)
    durations = time.time() - start #Duration times
    return durations

#dist = math.hypot(x2-x1, y2-y1)

if (__name__=="__main__"):
    freeze_support()
    testcases = [1000, 10000, 100000, 1000000, 10000000, 100000000]
    mp_results=[]
    normal_results=[]

    print("*"*60)
    print("dataframe_multiprocess_apply_test")
    mp_results =list(map(dataframe_multiprocess_apply_test, testcases))
    print("Durations :", mp_results)  

    print("*"*60)
    print("dataframe_normal_apply_test")
    #duration = dataframe_normal_apply_test()
    normal_results =list(map(dataframe_normal_apply_test, testcases))
    print("Durations :", normal_results)

Output: 

************************************************************
dataframe_multiprocess_apply_test
Durations : [0.9009976387023926, 0.9015190601348877, 0.8859987258911133, 0.9010288715362549, 1.7544605731964111, 12.08374309539795]
************************************************************
dataframe_normal_apply_test
Durations : [0.019000768661499023, 0.13899922370910645, 1.0485196113586426, 9.680866003036499, 97.33059167861938, 1039.6141567230225]

 

위 코드 기준 + 필자의 컴퓨터 사양에서는 100만 건부터 mutiprocess가 효율을 발휘하는 것을 확인할 수 있었습니다. 

연산하는 코드의 복잡도와 데이터량에 따라 효율 측정한 후에 적용해볼 필요가 있습니다. 

Data Size Mutiprocess apply() Normal apply()
1000
0.90 0.02
10000
0.90 0.14
100000
0.88 1.04
1000000
0.90 9.68
10000000
1.75 97.33
100000000
12.08 1039.61

 

다른 대안으로는 multiprocesspandas 모듈을 이용하는 방법이 있습니다. 

pip install multiprocesspandas

 

from multiprocesspandas import applyparallel

def func(x):
	return x.mean()

df.apply_parallel(func, num_processes=30, axis=1)

 

source : https://unsplash.com/photos/UxDU0Gg5pqQ

 

728x90
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기
반응형