
Overview

This post covers reading raw data (CSV) collected from the web, transforming it, and loading it into a database.

The data is assumed to exist as a CSV of stock data from the Screener menu of finviz.com, a site that provides stock information. This is a hands-on ETL (Extract-Transform-Load) example that processes the CSV file with Python code and loads it into a database (MySQL).

In particular, it introduces a technique in which the data is transformed not with Python's Pandas, but by loading it into a table and transforming it with SQL.

finviz.com Screener (source: finviz.com)

This is the raw data used in the example. Download it and copy it to a suitable location.

Sample data:

fv_screener_sampledata.csv (0.61 MB)

The following code checks the sample data in a Jupyter Notebook.

import pandas as pd

# Path to the CSV file.
targetfile = "D:\\pyproject\\data\\fv_screener_sampledata.csv"

df_rawdata = pd.read_csv(targetfile, index_col=False)

# Drop the first column (not used).
df_rawdata.drop([df_rawdata.columns[0]], axis=1, inplace=True)

# Replace NaN values with 0.
df_rawdata.fillna(0, inplace=True)
df_rawdata.head()
fv_screener_sampledata.csv load example

 

Data processing flow

fv_screener_sampledata.csv
    --> superstock.finviz_rawdata
            --> superstock.stockfactor --> superstock.tags
            --> superstock.stockfactor_history

 

 

Now let's walk through the data processing steps and the implementation code.

<Data processing steps>

1. Create databases.

2. Install Python mysql-connector & import.

3. Define db(mysql) connect function.

4. Define sql execute function.

5. Define migration sql.

6. Define data process.

7. Execute.

1. Create databases

Run the following script to create the databases.

create_database.sql (0.01 MB)
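If you prefer to do this from Python instead of a SQL client, a minimal hedged sketch of just the schema creation follows; the attached script remains the authoritative source, since it also defines the tables and the FN_ShortValueToNumber() function. The credentials are the ones from the connection config in step 3.

import mysql.connector as msql

# Connect without selecting a default schema, since none exists yet.
conn = msql.connect(host="localhost", user="yourusername", password="yourpass")
cursor = conn.cursor()
cursor.execute("CREATE DATABASE IF NOT EXISTS superstock")
cursor.execute("CREATE DATABASE IF NOT EXISTS common")
conn.close()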

  • common.FN_ShortValueToNumber() : converts abbreviated notation such as "1K" into the numeric value 1000; this function is used during the data transformation (see the sketch below).
  • superstock.finviz_rawdata : table holding the first-pass load of the CSV file
  • superstock.stockfactor : table holding the fully transformed CSV data; it always keeps the most recent stock information.
  • superstock.stockfactor_history : table holding the history of the fully transformed CSV data.
  • symbole : unused
  • tags : table defining the tag values of specific columns, e.g. "Technology", "Healthcare", ... for the Sector column.

Hands-on Python ETL - database structure
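The real FN_ShortValueToNumber() is defined in the attached create_database.sql. Purely for illustration, here is a hedged Python equivalent of the conversion logic; the K/M/B suffix set and the percent-sign handling are my assumptions, not the attachment's exact behavior.

def short_value_to_number(text):
    """Convert abbreviated values such as '1K' or '2.5M' to plain numbers."""
    units = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}
    s = str(text).strip().rstrip("%")            # assumption: treat '7.5%' as 7.5
    if s and s[-1].upper() in units:
        return float(s[:-1]) * units[s[-1].upper()]
    try:
        return float(s)
    except ValueError:
        return 0.0                               # mirror the NaN-to-0 cleansing

print(short_value_to_number("1K"))   # 1000.0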

2. Install Python mysql-connector & import

pip install mysql-connector-python
import os
import sys
import pandas as pd
import mysql.connector as msql
from mysql.connector import Error

 

3. Define db(mysql) connect function 

# DB Connection config.
conn_params_dic = {
    "host"      : "localhost",
    "database"  : "superstock",
    "user"      : "yourusername",
    "password"  : "yourpass"
}

# Connects to the MySQL DB and returns a Connection object.
def connect(conn_params_dic):
    conn = None
    try:
        print('Connecting to MySQL...........')
        conn = msql.connect(**conn_params_dic)
        print("Connected successfully..................")
        
    except Error as err:
        print("Error while connecting to MySQL", err)      
        # set the connection to 'None' in case of error
        conn = None
    return conn
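A quick way to sanity-check the settings (assumes a MySQL server is reachable with the config above):

conn = connect(conn_params_dic)
if conn is not None:
    conn.close()   # credentials work; release the connection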

 

4. Define sql execute function 

In particular, this step shows how to INSERT many rows efficiently with the cursor.executemany() function.

# Function that INSERTs a dataframe with cursor.executemany()
def execute_many(conn, datafrm, table):
    
    # Create a list of tuples from the dataframe values
    tpls = [tuple(x) for x in datafrm.to_numpy()]
    
    # Comma-separated, backquoted dataframe column names
    cols = '`,`'.join(list(datafrm.columns))
    cols = "`"+cols+"`"
    
    # One %s placeholder per column
    params = "%s" + ",%s"*(len(datafrm.columns)-1)

    # SQL statement to execute
    sql = "INSERT INTO %s(%s) VALUES(%s)" % (table, cols, params)
    print("SQL:", sql)
    cursor = conn.cursor()
    try:
        cursor.executemany(sql, tpls)
        conn.commit()
        print("Data inserted using execute_many() successfully...")
    except Error as e:
        print("Error while inserting to MySQL", e)
    finally:
        cursor.close()

# Function that executes SQL with cursor.execute()
# Note: with multi=True, mysql-connector returns an iterator of result sets
# that must be consumed; this example only calls it with multi=False.
def execute(conn, sql, multistmt=False):
    cursor = conn.cursor()
    print("SQL:", sql)
    try:
        cursor.execute(sql, multi=multistmt)
        conn.commit()
        print("execute() successfully...")
    except Error as e:
        print("Error while executing to MySQL", e)
    finally:
        cursor.close()
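To make the placeholder construction concrete, here is what execute_many() generates for a hypothetical three-column dataframe and a hypothetical table named demo (the values are made up):

import pandas as pd

df = pd.DataFrame({"Ticker": ["AAPL"], "Price": [190.0], "Volume": [1000]})
cols = "`" + "`,`".join(df.columns) + "`"
params = "%s" + ",%s" * (len(df.columns) - 1)
print("INSERT INTO %s(%s) VALUES(%s)" % ("demo", cols, params))
# INSERT INTO demo(`Ticker`,`Price`,`Volume`) VALUES(%s,%s,%s)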

 

5. Define migration sql

sql_dict={
    "stockfactor_bulkinsert" : """INSERT INTO `stockfactor`
        SELECT	CAST(`idt` AS DATE)					AS base_date
            ,		A.`No.`				AS `No`
            ,		CAST(A.`Ticker` 	AS VARCHAR(10))	AS `Ticker`
            ,		CAST(A.`Company`	AS VARCHAR(300))	AS `Company`
            ,		CAST(A.`Sector`	AS VARCHAR(100))	AS `Sector`
            ,		CAST(A.`Industry`	AS VARCHAR(100))	AS `Industry`
            ,		CAST(A.`Country`	AS VARCHAR(50))	AS `Country`
            ,		common.FN_ShortValueToNumber(A.`Market Cap`)			AS `Market_Cap`
            ,		A.`P/E`				AS `PE`
            ,		A.`Fwd P/E`			AS `Fwd_PE`
            ,		A.`PEG`				AS `PEG`
            ,		A.`P/S`				AS `PS`
            ,		A.`P/B`				AS `PB`
            ,		A.`P/C`				AS `PC`
            ,		A.`P/FCF`			AS `PFCF`
            ,		common.FN_ShortValueToNumber(A.`Dividend`)			AS `Dividend`
            ,		common.FN_ShortValueToNumber(A.`Payout Ratio`)		AS `Payout_Ratio`
            ,		A.`EPS`				AS `EPS`
            ,		common.FN_ShortValueToNumber(A.`EPS this Y`)			AS `EPS_this_Y`
            ,		common.FN_ShortValueToNumber(A.`EPS next Y`)			AS `EPS_next_Y`
            ,		common.FN_ShortValueToNumber(A.`EPS past 5Y`)		AS `EPS_past_5Y`
            ,		common.FN_ShortValueToNumber(A.`EPS next 5Y`)		AS `EPS_next_5Y`
            ,		common.FN_ShortValueToNumber(A.`Sales past 5Y`)		AS `Sales_past_5Y`
            ,		common.FN_ShortValueToNumber(A.`EPS Q/Q`)				AS `EPS_QoQ`
            ,		common.FN_ShortValueToNumber(A.`Sales Q/Q`)			AS `Sales_QoQ`
            ,		common.FN_ShortValueToNumber(A.`Outstanding`)		AS `Outstanding`
            ,		common.FN_ShortValueToNumber(A.`Float`)				AS `Float`
            ,		common.FN_ShortValueToNumber(A.`Insider Own`)		AS `Insider_Own`
            ,		common.FN_ShortValueToNumber(A.`Insider Trans`)		AS `Insider_Trans`
            ,		common.FN_ShortValueToNumber(A.`Inst Own`)			AS `Inst_Own`
            ,		common.FN_ShortValueToNumber(A.`Inst Trans`)			AS `Inst_Trans`
            ,		common.FN_ShortValueToNumber(A.`Float Short`)		AS `Float_Short`
            ,		A.`Short Ratio`			AS `Short_Ratio`
            ,		common.FN_ShortValueToNumber(A.`ROA`)			AS `ROA`
            ,		common.FN_ShortValueToNumber(A.`ROE`)			AS `ROE`
            ,		common.FN_ShortValueToNumber(A.`ROI`)			AS `ROI`
            ,		A.`Curr R`			AS `Curr_R`
            ,		A.`Quick R`			AS `Quick_R`
            ,		A.`LTDebt/Eq`			AS `LTDebtPerEq`
            ,		A.`Debt/Eq`			AS `DebtPerEq`
            ,		common.FN_ShortValueToNumber(A.`Gross M`)			AS `Gross_M`
            ,		common.FN_ShortValueToNumber(A.`Oper M`)			AS `Oper_M`
            ,		common.FN_ShortValueToNumber(A.`Profit M`)			AS `Profit_M`
            ,		common.FN_ShortValueToNumber(A.`Perf Week`)			AS `Perf_Week`
            ,		common.FN_ShortValueToNumber(A.`Perf Month`)			AS `Perf_Month`
            ,		common.FN_ShortValueToNumber(A.`Perf Quart`)			AS `Perf_Quart`
            ,		common.FN_ShortValueToNumber(A.`Perf Half`)			AS `Perf_Half`
            ,		common.FN_ShortValueToNumber(A.`Perf Year`)			AS `Perf_Year`
            ,		common.FN_ShortValueToNumber(A.`Perf YTD`)			AS `Perf_YTD`
            ,		A.`Beta`			AS `Beta`
            ,		A.`ATR`			AS `ATR`
            ,		common.FN_ShortValueToNumber(A.`Volatility W`)	AS `Volatility_W`
            ,		common.FN_ShortValueToNumber(A.`Volatility M`)	AS `Volatility_M`
            ,		common.FN_ShortValueToNumber(A.`SMA20`)			AS `SMA20`
            ,		common.FN_ShortValueToNumber(A.`SMA50`)			AS `SMA50`
            ,		common.FN_ShortValueToNumber(A.`SMA200`)			AS `SMA200`
            ,		common.FN_ShortValueToNumber(A.`50D High`)		AS `50D_High`
            ,		common.FN_ShortValueToNumber(A.`50D Low`)			AS `50D_Low`
            ,		common.FN_ShortValueToNumber(A.`52W High`)		AS `52W_High`
            ,		common.FN_ShortValueToNumber(A.`52W Low`)			AS `52W_Low`
            ,		A.`RSI`			AS `RSI`
            ,		common.FN_ShortValueToNumber(A.`from Open`)			AS `from_Open`
            ,		common.FN_ShortValueToNumber(A.`Gap`)			AS `Gap`
            ,		A.`Recom`			AS `Recom`
            ,		common.FN_ShortValueToNumber(A.`Avg Volume`)			AS `Avg_Volume`
            ,		A.`Rel Volume`			AS `Rel_Volume`
            ,		A.`Price`			AS `Price`
            ,		common.FN_ShortValueToNumber(A.`Change`)			AS `Change`
            ,		A.`Volume`			AS `Volume`
            ,		CAST(A.`Earnings` AS VARCHAR(30))		AS `Earnings`
            ,		A.`Target Price`	AS `Target_Price`
            ,		CAST(A.`IPO Date` AS VARCHAR(30))		AS `IPO_Date`
            ,		CAST(`idt` AS DATETIME)					AS idt
        FROM	finviz_rawdata		AS A
    """
,   "stockfactor_truncate" : " TRUNCATE TABLE `stockfactor` "   
,   "stockfactor_history_recentlydate_delete" : " DELETE FROM stockfactor_history WHERE base_date = (SELECT base_date FROM `stockfactor` LIMIT 1) "
,   "stockfactor_history_bulkinsert" : " INSERT INTO stockfactor_history SELECT * FROM stockfactor "
,   "tags_create_view" : """
        CREATE OR REPLACE VIEW `vw_config_etl__tag_key_seed`
        AS 
        SELECT 	'Sector' 	AS tag_group
            ,		1000			AS key_seed
        UNION ALL 
        SELECT 	'Industry' 	AS tag_group
            ,		2000			AS key_seed
        UNION ALL
        SELECT 	'Country'	AS tag_group
            ,		3000			AS key_seed
        """
,   "tags_create_sector_tags" : """
        CREATE OR REPLACE TEMPORARY TABLE `tmp_sector_tags`
        AS 
        SELECT 	base_date		AS base_date
            ,		ROW_NUMBER() OVER(ORDER BY Sector) + key_seed	AS tag_key
            , 		'Sector'			AS tag_group
            ,		Sector			AS tag
            ,		0					AS parent_tag_key
        FROM 		stockfactor A
                    JOIN 
                    vw_config_etl__tag_key_seed B
                    ON( 'Sector'	= B.tag_group )
        GROUP BY base_date, Sector	
    """
,   "tags_create_industry_tags" : """
        CREATE OR REPLACE TEMPORARY TABLE `tmp_industry_tags`
        AS 
        SELECT 	A.base_date
                ,	A.tag_key
                ,	A.tag_group
                ,	A.tag
                ,	C.tag_key AS parent_tag_key
        FROM 		(
                        SELECT 	base_date		AS base_date
                            ,		ROW_NUMBER() OVER(ORDER BY Industry) + key_seed	AS tag_key
                            , 		'Industry'		AS tag_group
                            ,		Industry			AS tag
                            ,		Sector			AS parent_tag
                        FROM 		stockfactor A
                                    JOIN 
                                    vw_config_etl__tag_key_seed B
                                    ON( 'Industry'	= B.tag_group )
                        GROUP BY base_date, Industry, Sector
                    ) A
                    JOIN 
                    tmp_sector_tags C
                    ON(A.parent_tag = C.tag)
    """
,   "tags_create_country_tags" : """
        CREATE OR REPLACE TEMPORARY TABLE `tmp_country_tags`
        AS 
        SELECT 	base_date		AS base_date
            ,		ROW_NUMBER() OVER(ORDER BY Country) + key_seed	AS tag_key
            , 		'Country'		AS tag_group
            ,		Country			AS tag
            ,		0					AS parent_tag_key
        FROM 		stockfactor A
                    JOIN 
                    vw_config_etl__tag_key_seed B
                    ON( 'Country'	= B.tag_group )
        GROUP BY base_date, Country	
    """

,   "tags_truncate" : """
        TRUNCATE TABLE `tags`
        """
    
,   "tags_make_tags" : """
        INSERT INTO `tags`
        SELECT * FROM `tmp_sector_tags`
        UNION ALL
        SELECT * FROM `tmp_industry_tags`
        UNION ALL
        SELECT * FROM `tmp_country_tags`
        """
}
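A note on the tag keys: the seeds in vw_config_etl__tag_key_seed namespace each group, so Sector tags are numbered from 1001, Industry tags from 2001, Country tags from 3001, and each Industry tag stores its Sector's tag_key as parent_tag_key. A small pandas sketch of the ROW_NUMBER() + key_seed numbering (the sector names are example values):

import pandas as pd

sectors = pd.DataFrame({"Sector": ["Technology", "Energy", "Healthcare"]})
sectors = sectors.drop_duplicates().sort_values("Sector").reset_index(drop=True)
# Equivalent of ROW_NUMBER() OVER(ORDER BY Sector) + key_seed (1000 for Sector)
sectors["tag_key"] = sectors.index + 1 + 1000
print(sectors)
# Expected output (roughly):
#        Sector  tag_key
# 0      Energy     1001
# 1  Healthcare     1002
# 2  Technology     1003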

 

6. Define data process.

( CSV Load, Import, Cleansing... )

# Function that loads the CSV file into a dataframe.
def etl_task00__load_rawdata(csvfile):
    ret = None
    try:
        # Load the data from the local CSV file
        targetfile = csvfile
        df_rawdata = pd.read_csv(targetfile, index_col=False)
        
        # Drop the first column (not used)
        df_rawdata.drop([df_rawdata.columns[0]], axis=1, inplace=True)
        
        # Replace NaN values with 0
        df_rawdata.fillna(0, inplace=True)
        ret = df_rawdata
    except Exception as e:
        print(f"etl_task00__load_rawdata():Error while file-loading. (file={csvfile})", e)
        ret = None
    finally:
        return ret
        
# Function that imports the CSV dataframe into the finviz_rawdata table
def etl_task01__import_rawdata(dataframe):
    success = False
    conn = None
    try:
        print("*"*50)
        print("task : etl_task01__import_rawdata")
        conn = connect(conn_params_dic)
        if(conn!=None):
            dest_table = "finviz_rawdata"
            execute(conn=conn, sql=f"truncate table {dest_table}")
            execute_many(conn, dataframe, dest_table)
            success = True
        else:
            print("etl_task01__import_rawdata():Connection fail!")
    except Error as e:
        print("etl_task01__import_rawdata():Error while inserting to MySQL", e)
        success = False
    finally:
        if(conn!=None):
            conn.close()
            print("etl_task01__import_rawdata():db connection closed.")
        print("*"*50)
        return success

# Function that builds the data of the stockfactor table
def etl_task02__make_stockfactor():
    success = False
    conn = None
    try:
        print("*"*50)
        print("task : etl_task02__make_stockfactor")
        conn = connect(conn_params_dic)
        if(conn!=None):
            execute(conn, sql_dict["stockfactor_truncate"])
            execute(conn, sql_dict["stockfactor_bulkinsert"])
            success = True
        else:
            print("etl_task02__make_stockfactor():Connection fail!")
    except Error as e:
        print("etl_task02__make_stockfactor():Error while inserting to MySQL", e)
        success = False
    finally:
        if(conn!=None):
            conn.close()
            print("etl_task02__make_stockfactor():db connection closed.")
        print("*"*50)
        return success
        
# Function that builds the data of the stockfactor_history table
def etl_task03__make_stockfactor_history():
    success = False
    conn = None
    try:
        print("*"*50)
        print("task : etl_task03__make_stockfactor_history")
        conn = connect(conn_params_dic)
        if(conn!=None):
            execute(conn, sql_dict["stockfactor_history_recentlydate_delete"])
            execute(conn, sql_dict["stockfactor_history_bulkinsert"])
            success = True
        else:
            print("etl_task03__make_stockfactor_history():Connection fail!")
    except Error as e:
        print("etl_task03__make_stockfactor_history():Error while inserting to MySQL", e)
        success = False
    finally:
        if(conn!=None):
            conn.close()
            print("etl_task03__make_stockfactor_history():db connection closed.")
        print("*"*50)
        return success

# Function that builds the data of the tags table
def etl_task04__make_tags():
    success = False
    conn = None
    try:
        print("*"*50)
        print("task : etl_task04__make_tags")
        conn = connect(conn_params_dic)
        if(conn!=None):
            # Queries that use temporary tables must run in the same session (connection).
            cursor = conn.cursor()
            cursor.execute(sql_dict["tags_create_view"])
            cursor.execute(sql_dict["tags_create_sector_tags"])
            
            cursor.execute(sql_dict["tags_create_industry_tags"])
            cursor.execute(sql_dict["tags_create_country_tags"])
            
            cursor.execute(sql_dict["tags_truncate"])
            cursor.execute(sql_dict["tags_make_tags"])
            conn.commit()
            success = True
        else:
            print("etl_task04__make_tags():Connection fail!")
    except Error as e:
        print("etl_task04__make_tags():Error while inserting to MySQL", e)
        success = False
    finally:
        if(conn!=None):
            conn.close()
            print("etl_task04__make_tags():db connection closed.")
        print("*"*50)
        return success
        
# Function that checks the results of the data processing
def etl_task05__check_table():
    success = False
    conn = None
    try:
        print("*"*50)
        print("task : etl_task05__check_table")
        conn = connect(conn_params_dic)
        if(conn!=None):
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM `tags`")
            result = cursor.fetchall()
            print(f"`tags` table rowcount : {result[0][0]}")
            
            cursor.execute("SELECT COUNT(*) FROM `stockfactor`")
            result = cursor.fetchall()
            print(f"`stockfactor` table rowcount : {result[0][0]}")
            success = True
        else:
            print("etl_task05__check_table():Connection fail!")
    except Error as e:
        print("etl_task05__check_table():Error while querying MySQL", e)
        success = False
    finally:
        if(conn!=None):
            conn.close()
            print("etl_task05__check_table():db connection closed.")
        print("*"*50)
        return success

#etl main function:
#defines the data processing flow.
def etl_main(csvfile):
    dataframe = etl_task00__load_rawdata(csvfile)
    if dataframe is None:
        print(f"etl_task00__load_rawdata() fail!(file={csvfile})")
        return 1
    
    result = etl_task01__import_rawdata(dataframe)
    if (result==False):
        print("etl_task01__import_rawdata() fail!")
        return 1
    
    result = etl_task02__make_stockfactor()
    if (result==False):
        print("etl_task02__make_stockfactor() fail!")
        return 2
    
    result = etl_task03__make_stockfactor_history()
    if (result==False):
        print("etl_task03__make_stockfactor_history() fail!")
        return 3
    
    result = etl_task04__make_tags()
    if (result==False):
        print("etl_task04__make_tags() fail!")
        return 4
    
    result = etl_task05__check_table()
    if (result==False):
        print("etl_task05__check_table() fail!")
        return 5
    
    return 0

 

7. Execute

#main entry function
def main():
    #Set the path to the CSV file downloaded earlier.
    rawdatafile = "D:\\pyproject\\data\\fv_screener_sampledata.csv"
    
    #Run the data processing.
    etl_main(rawdatafile)

main()

 

<Additional comments>

If the CSV file is large, I recommend not loading it whole with Pandas; read it in chunks or use a library built for larger-than-memory data.
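For example, Pandas itself can stream the file in chunks and feed each chunk to execute_many() from step 4. A minimal sketch, reusing connect() from step 3; the 10,000-row chunk size is an arbitrary choice:

targetfile = "D:\\pyproject\\data\\fv_screener_sampledata.csv"
conn = connect(conn_params_dic)
if conn is not None:
    # Stream the CSV in chunks instead of loading it whole into memory
    for chunk in pd.read_csv(targetfile, index_col=False, chunksize=10000):
        chunk.drop([chunk.columns[0]], axis=1, inplace=True)
        chunk.fillna(0, inplace=True)
        execute_many(conn, chunk, "finviz_rawdata")
    conn.close()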

 

 


 

<Related Python posts>

[Data Scientist/Python] - [PYTHON] Useful regular expression collection
