Overview
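This post covers reading RAW-DATA (CSV) collected from the web, transforming it, and loading it into a database.
It is a hands-on ETL (Extract-Transform-Load) example. It assumes that the stock data offered by the Screener menu of finviz.com, a stock information site, is already available as a CSV file, and it uses Python code to process that file and load it into a MySQL database.
In particular, it introduces a technique where the data is not transformed with Python's Pandas but is first loaded into a table and then transformed with SQL.
Below is the RAW-DATA used in the example. Download it and copy it to a suitable location.
Sample data:
The following code inspects the sample data in a Jupyter Notebook.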
import os
import sys
import pandas as pd
# Specify the location of the CSV file.
targetfile = "D:\\pyproject\\data\\fv_screener_sampledata.csv"
sourceData = pd.read_csv(targetfile, index_col=False)
sourceData.head()
df_rawdata = sourceData
df_rawdata.head()
# Drop the first column (axis=1 drops a column, not a row).
df_rawdata.drop([df_rawdata.columns[0]], axis=1, inplace=True)
# Replace NaN values with 0.
df_rawdata.fillna(0, inplace=True)
df_rawdata
Data processing flow
fv_screener_sampledata.csv
  --> superstock.finviz_rawdata
    --> superstock.stockfactor --> superstock.tags
                               --> superstock.stockfactor_history
Now let's walk through the data processing steps and their implementation code.
<Data processing steps>
1. Create databases.
2. Install Python Mysql-connector & Import.
3. Define db(mysql) connect function.
4. Define sql execute function.
5. Define migration sql.
6. Define data process.
7. Execute.
1. Create databases
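Run the following query to create the database. It creates these objects:
- common.FN_ShortValueToNumber() : a function that converts shorthand notation such as "1K" into the number 1000; it is used while transforming the data.
- superstock.finviz_rawdata : a table that holds the CSV data after the first processing pass.
- superstock.stockfactor : a table that holds the fully processed CSV data; it keeps only the most recent stock information.
- superstock.stockfactor_history : a table that keeps the history of the fully processed CSV data.
- symbole : not used.
- tags : a table that defines the tag values of specific columns (e.g., the Sector column has "Technology", "Healthcare", ...).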
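The body of common.FN_ShortValueToNumber() is not shown in the post, so the following is only a hypothetical Python sketch of the conversion it is described as doing ("1K" becomes 1000). The suffix table (K, M, B, T), dropping a trailing "%", removing thousands separators, and treating "-" or an empty cell as "no value" are all assumptions, not the author's SQL logic.

```python
from decimal import Decimal, InvalidOperation

# Hypothetical Python counterpart of common.FN_ShortValueToNumber().
# The SQL function body is not shown, so the suffix table and the handling
# of "%", "," and "-" below are assumptions, not the author's implementation.
SUFFIX_MULTIPLIERS = {
    "K": Decimal(10) ** 3,   # thousand
    "M": Decimal(10) ** 6,   # million
    "B": Decimal(10) ** 9,   # billion
    "T": Decimal(10) ** 12,  # trillion
}


def short_value_to_number(value):
    """Convert shorthand such as '1K' or '2.5M' to a Decimal; return None if it cannot be parsed."""
    if value is None:
        return None
    text = str(value).strip().replace(",", "")
    # Assumption: a trailing '%' is dropped and the number kept as-is ('12.5%' -> 12.5).
    if text.endswith("%"):
        text = text[:-1]
    # Assumption: '-' or an empty cell means "no value".
    if text in ("", "-"):
        return None
    multiplier = Decimal(1)
    suffix = text[-1].upper()
    if suffix in SUFFIX_MULTIPLIERS:
        multiplier = SUFFIX_MULTIPLIERS[suffix]
        text = text[:-1]
    try:
        number = Decimal(text) * multiplier
    except InvalidOperation:
        return None
    # Treat a parsed 'nan' as missing as well.
    return None if number.is_nan() else number


print(short_value_to_number("1K"))  # → 1000
```

Decimal is used instead of float so that values such as "1.2B" keep their exact digits after scaling.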
2. Install python mysql-connector & import
pip install mysql-connector-python
import os
import sys
import pandas as pd
import mysql.connector as msql
from mysql.connector import Error
3. Define db(mysql) connect function
# DB connection config.
conn_params_dic = {
    "host": "localhost",
    "database": "superstock",
    "user": "yourusername",
    "password": "yourpass"
}
# Connects to the MySQL DB and returns a Connection object (None on failure).
def connect(conn_params_dic):
    conn = None
    try:
        print('Connecting to the MySQL...........')
        conn = msql.connect(**conn_params_dic)
        print("Connected successfully..................")
    except Error as err:
        print("Error while connecting to MySQL", err)
        # set the connection to 'None' in case of error
        conn = None
    return conn
4. Define sql execute function
In particular, this step shows how to INSERT many rows efficiently with cursor.executemany().
# INSERT a DataFrame with cursor.executemany()
def execute_many(conn, datafrm, table):
    # Create a list of tuples from the DataFrame values
    tpls = [tuple(x) for x in datafrm.to_numpy()]
    # DataFrame columns as a comma-separated, backtick-quoted list
    cols = "`" + "`,`".join(datafrm.columns) + "`"
    # One %s placeholder per column of this DataFrame (not the global df_rawdata)
    params = ",".join(["%s"] * len(datafrm.columns))
    # SQL query to execute
    sql = "INSERT INTO %s(%s) VALUES(%s)" % (table, cols, params)
    print("SQL:", sql)
    cursor = conn.cursor()
    try:
        cursor.executemany(sql, tpls)
        conn.commit()
        print("Data inserted using execute_many() successfully...")
    except Error as e:
        conn.rollback()
        print("Error while inserting to MySQL", e)
        # Re-raise so the calling task can report the failure
        raise
    finally:
        cursor.close()
# Execute SQL with cursor.execute()
def execute(conn, sql, multistmt=False):
    cursor = conn.cursor()
    print("SQL:", sql)
    try:
        cursor.execute(sql, multi=multistmt)
        conn.commit()
        print("execute() succeeded...")
    except Error as e:
        conn.rollback()
        print("Error while executing on MySQL", e)
        # Re-raise so the calling task can report the failure
        raise
    finally:
        cursor.close()
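To see what execute_many() hands to cursor.executemany() without a database, here is a small stdlib-only sketch. The helper names build_insert_sql() and rows_to_tuples() are hypothetical, a list of dicts stands in for the DataFrame, and the rows are made up.

```python
# Hypothetical helpers (not from the original post) showing what execute_many()
# assembles: one backtick-quoted column list, one %s placeholder per column,
# and one tuple per row in the same column order.


def build_insert_sql(table, columns):
    """Return a parameterized INSERT statement for mysql-connector-style %s placeholders."""
    # Backticks let names such as `No.` or `P/E` be used as identifiers.
    # (Names that contain a backtick would need escaping; not handled here.)
    quoted_cols = ",".join(f"`{col}`" for col in columns)
    placeholders = ",".join(["%s"] * len(columns))
    return f"INSERT INTO `{table}` ({quoted_cols}) VALUES ({placeholders})"


def rows_to_tuples(rows, columns):
    """Turn a list of dicts into tuples ordered like `columns`, the shape executemany() expects."""
    return [tuple(row[col] for col in columns) for row in rows]


columns = ["No.", "Ticker", "P/E"]
rows = [
    {"No.": 1, "Ticker": "AAA", "P/E": 10.5},
    {"No.": 2, "Ticker": "BBB", "P/E": 0},
]
sql = build_insert_sql("finviz_rawdata", columns)
params = rows_to_tuples(rows, columns)
print(sql)
# → INSERT INTO `finviz_rawdata` (`No.`,`Ticker`,`P/E`) VALUES (%s,%s,%s)
print(params)
# → [(1, 'AAA', 10.5), (2, 'BBB', 0)]
```

With a live connection, the pair would be passed as cursor.executemany(sql, params) followed by conn.commit(). The placeholder count must match the tuple length, which is why the placeholders should be derived from the same column list as the tuples.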
5. Define migration sql
sql_dict={
"stockfactor_bulkinsert" : """INSERT INTO `stockfactor`
SELECT CAST(`idt` AS DATE) AS base_date
, A.`No.` AS `No`
, CAST(A.`Ticker` AS CHAR(10)) AS `Ticker`
, CAST(A.`Company` AS CHAR(300)) AS `Company`
, CAST(A.`Sector` AS CHAR(100)) AS `Sector`
, CAST(A.`Industry` AS CHAR(100)) AS `Industry`
, CAST(A.`Country` AS CHAR(50)) AS `Country`
, common.FN_ShortValueToNumber(A.`Market Cap`) AS `Market_Cap`
, A.`P/E` AS `PE`
, A.`Fwd P/E` AS `Fwd_PE`
, A.`PEG` AS `PEG`
, A.`P/S` AS `PS`
, A.`P/B` AS `PB`
, A.`P/C` AS `PC`
, A.`P/FCF` AS `PFCF`
, common.FN_ShortValueToNumber(A.`Dividend`) AS `Dividend`
, common.FN_ShortValueToNumber(A.`Payout Ratio`) AS `Payout_Ratio`
, A.`EPS` AS `EPS`
, common.FN_ShortValueToNumber(A.`EPS this Y`) AS `EPS_this_Y`
, common.FN_ShortValueToNumber(A.`EPS next Y`) AS `EPS_next_Y`
, common.FN_ShortValueToNumber(A.`EPS past 5Y`) AS `EPS_past_5Y`
, common.FN_ShortValueToNumber(A.`EPS next 5Y`) AS `EPS_next_5Y`
, common.FN_ShortValueToNumber(A.`Sales past 5Y`) AS `Sales_past_5Y`
, common.FN_ShortValueToNumber(A.`EPS Q/Q`) AS `EPS_QoQ`
, common.FN_ShortValueToNumber(A.`Sales Q/Q`) AS `Sales_QoQ`
, common.FN_ShortValueToNumber(A.`Outstanding`) AS `Outstanding`
, common.FN_ShortValueToNumber(A.`Float`) AS `Float`
, common.FN_ShortValueToNumber(A.`Insider Own`) AS `Insider_Own`
, common.FN_ShortValueToNumber(A.`Insider Trans`) AS `Insider_Trans`
, common.FN_ShortValueToNumber(A.`Inst Own`) AS `Inst_Own`
, common.FN_ShortValueToNumber(A.`Inst Trans`) AS `Inst_Trans`
, common.FN_ShortValueToNumber(A.`Float Short`) AS `Float_Short`
, A.`Short Ratio` AS `Short_Ratio`
, common.FN_ShortValueToNumber(A.`ROA`) AS `ROA`
, common.FN_ShortValueToNumber(A.`ROE`) AS `ROE`
, common.FN_ShortValueToNumber(A.`ROI`) AS `ROI`
, A.`Curr R` AS `Curr_R`
, A.`Quick R` AS `Quick_R`
, A.`LTDebt/Eq` AS `LTDebtPerEq`
, A.`Debt/Eq` AS `DebtPerEq`
, common.FN_ShortValueToNumber(A.`Gross M`) AS `Gross_M`
, common.FN_ShortValueToNumber(A.`Oper M`) AS `Oper_M`
, common.FN_ShortValueToNumber(A.`Profit M`) AS `Profit_M`
, common.FN_ShortValueToNumber(A.`Perf Week`) AS `Perf_Week`
, common.FN_ShortValueToNumber(A.`Perf Month`) AS `Perf_Month`
, common.FN_ShortValueToNumber(A.`Perf Quart`) AS `Perf_Quart`
, common.FN_ShortValueToNumber(A.`Perf Half`) AS `Perf_Half`
, common.FN_ShortValueToNumber(A.`Perf Year`) AS `Perf_Year`
, common.FN_ShortValueToNumber(A.`Perf YTD`) AS `Perf_YTD`
, A.`Beta` AS `Beta`
, A.`ATR` AS `ATR`
, common.FN_ShortValueToNumber(A.`Volatility W`) AS `Volatility_W`
, common.FN_ShortValueToNumber(A.`Volatility M`) AS `Volatility_M`
, common.FN_ShortValueToNumber(A.`SMA20`) AS `SMA20`
, common.FN_ShortValueToNumber(A.`SMA50`) AS `SMA50`
, common.FN_ShortValueToNumber(A.`SMA200`) AS `SMA200`
, common.FN_ShortValueToNumber(A.`50D High`) AS `50D_High`
, common.FN_ShortValueToNumber(A.`50D Low`) AS `50D_Low`
, common.FN_ShortValueToNumber(A.`52W High`) AS `52W_High`
, common.FN_ShortValueToNumber(A.`52W Low`) AS `52W_Low`
, A.`RSI` AS `RSI`
, common.FN_ShortValueToNumber(A.`from Open`) AS `from_Open`
, common.FN_ShortValueToNumber(A.`Gap`) AS `Gap`
, A.`Recom` AS `Recom`
, common.FN_ShortValueToNumber(A.`Avg Volume`) AS `Avg_Volume`
, A.`Rel Volume` AS `Rel_Volume`
, A.`Price` AS `Price`
, common.FN_ShortValueToNumber(A.`Change`) AS `Change`
, A.`Volume` AS `Volume`
, CAST(A.`Earnings` AS CHAR(30)) AS `Earnings`
, A.`Target Price` AS `Target_Price`
, CAST(A.`IPO Date` AS CHAR(30)) AS `IPO_Date`
, CAST(`idt` AS DATETIME) AS idt
FROM finviz_rawdata AS A
"""
, "stockfactor_truncate" : " TRUNCATE TABLE `stockfactor` "
, "stockfactor_history_recentlydate_delete" : " DELETE FROM stockfactor_history WHERE base_date = (SELECT base_date FROM `stockfactor` LIMIT 1) "
, "stockfactor_history_bulkinsert" : " INSERT INTO stockfactor_history SELECT * FROM stockfactor "
, "tags_create_view" : """
CREATE OR REPLACE VIEW `vw_config_etl__tag_key_seed`
AS
SELECT 'Sector' AS tag_group
, 1000 AS key_seed
UNION ALL
SELECT 'Industry' AS tag_group
, 2000 AS key_seed
UNION ALL
SELECT 'Country' AS tag_group
, 3000 AS key_seed
"""
, "tags_create_sector_tags" : """
CREATE TEMPORARY TABLE `tmp_sector_tags`
AS
SELECT base_date AS base_date
, ROW_NUMBER() OVER(ORDER BY Sector) + key_seed AS tag_key
, 'Sector' AS tag_group
, Sector AS tag
, 0 AS parent_tag_key
FROM stockfactor A
JOIN
vw_config_etl__tag_key_seed B
ON( 'Sector' = B.tag_group )
GROUP BY base_date, Sector, key_seed
"""
, "tags_create_industry_tags" : """
CREATE TEMPORARY TABLE `tmp_industry_tags`
AS
SELECT A.base_date
, A.tag_key
, A.tag_group
, A.tag
, C.tag_key AS parent_tag_key
FROM (
SELECT base_date AS base_date
, ROW_NUMBER() OVER(ORDER BY Industry) + key_seed AS tag_key
, 'Industry' AS tag_group
, Industry AS tag
, Sector AS parent_tag
FROM stockfactor A
JOIN
vw_config_etl__tag_key_seed B
ON( 'Industry' = B.tag_group )
GROUP BY base_date, Industry, Sector, key_seed
) A
JOIN
tmp_sector_tags C
ON(A.parent_tag = C.tag)
"""
, "tags_create_country_tags" : """
CREATE TEMPORARY TABLE `tmp_country_tags`
AS
SELECT base_date AS base_date
, ROW_NUMBER() OVER(ORDER BY Country) + key_seed AS tag_key
, 'Country' AS tag_group
, Country AS tag
, 0 AS parent_tag_key
FROM stockfactor A
JOIN
vw_config_etl__tag_key_seed B
ON( 'Country' = B.tag_group )
GROUP BY base_date, Country, key_seed
"""
, "tags_truncate" : """
TRUNCATE TABLE `tags`
"""
, "tags_make_tags" : """
INSERT INTO `tags`
SELECT * FROM `tmp_sector_tags`
UNION ALL
SELECT * FROM `tmp_industry_tags`
UNION ALL
SELECT * FROM `tmp_country_tags`
"""
}
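The tag queries number each distinct value with ROW_NUMBER() and add the per-group seed from vw_config_etl__tag_key_seed, so Sector keys start at 1001, Industry keys at 2001, and Country keys at 3001. The sketch below reproduces the Sector query in an in-memory SQLite database (SQLite 3.25+ supports window functions) purely for illustration; SQLite stands in for MySQL here and the rows are made up.

```python
import sqlite3

# Illustrative only: an in-memory SQLite database (3.25+ for window functions)
# stands in for MySQL, and the rows below are made up.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE stockfactor (base_date TEXT, Ticker TEXT, Sector TEXT);
INSERT INTO stockfactor VALUES
    ('2022-02-16', 'AAA', 'Technology'),
    ('2022-02-16', 'BBB', 'Healthcare'),
    ('2022-02-16', 'CCC', 'Technology');
CREATE VIEW vw_config_etl__tag_key_seed AS
    SELECT 'Sector' AS tag_group, 1000 AS key_seed
    UNION ALL
    SELECT 'Industry', 2000
    UNION ALL
    SELECT 'Country', 3000;
""")

# Same shape as tags_create_sector_tags: one row per distinct Sector,
# numbered alphabetically and offset by the Sector seed (1000).
sector_tags = conn.execute("""
SELECT A.base_date,
       ROW_NUMBER() OVER (ORDER BY A.Sector) + B.key_seed AS tag_key,
       'Sector' AS tag_group,
       A.Sector AS tag,
       0 AS parent_tag_key
FROM stockfactor A
JOIN vw_config_etl__tag_key_seed B ON B.tag_group = 'Sector'
GROUP BY A.base_date, A.Sector, B.key_seed
ORDER BY tag_key
""").fetchall()
conn.close()

for row in sector_tags:
    print(row)
# → ('2022-02-16', 1001, 'Sector', 'Healthcare', 0)
# → ('2022-02-16', 1002, 'Sector', 'Technology', 0)
```

The window function runs after GROUP BY, so each distinct Sector gets exactly one number. Listing key_seed in GROUP BY keeps the query valid when only full GROUP BY is enforced and does not change the result, because the seed is constant within a tag group.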
6. Define data process.
( CSV Load, Import, Cleansing... )
# Load the CSV file into a DataFrame.
def etl_task00__load_rawdata(csvfile):
    try:
        # Load the CSV file (pd.read_csv accepts a local path or a URL)
        df_rawdata = pd.read_csv(csvfile, index_col=False)
        # Drop the first column
        df_rawdata.drop([df_rawdata.columns[0]], axis=1, inplace=True)
        # Replace NaN values with 0
        df_rawdata.fillna(0, inplace=True)
        return df_rawdata
    except Exception as e:
        # File and parsing errors come from the OS/pandas, not mysql.connector.Error
        print(f"etl_task00__load_rawdata():Error while file-loading. (file={csvfile})", e)
        return None
# Import the DataFrame into the finviz_rawdata table.
def etl_task01__import_rawdata(dataframe):
    conn = None
    success = False
    try:
        print("*"*50)
        print("task : etl_task01__import_rawdata")
        conn = connect(conn_params_dic)
        if conn is not None:
            dest_table = "finviz_rawdata"
            execute(conn=conn, sql=f"truncate table {dest_table}")
            execute_many(conn, dataframe, dest_table)
            success = True
        else:
            print("etl_task01__import_rawdata():Connection fail!")
    except Error as e:
        print("etl_task01__import_rawdata():Error while inserting to MySQL", e)
        success = False
    finally:
        if conn is not None:
            conn.close()
            print("etl_task01__import_rawdata():db connection closed.")
        print("*"*50)
    return success
# Build the data of the stockfactor table.
def etl_task02__make_stockfactor():
    conn = None
    success = False
    try:
        print("*"*50)
        print("task : etl_task02__make_stockfactor")
        conn = connect(conn_params_dic)
        if conn is not None:
            execute(conn, sql_dict["stockfactor_truncate"])
            execute(conn, sql_dict["stockfactor_bulkinsert"])
            success = True
        else:
            print("etl_task02__make_stockfactor():Connection fail!")
    except Error as e:
        print("etl_task02__make_stockfactor():Error while inserting to MySQL", e)
        success = False
    finally:
        if conn is not None:
            conn.close()
            print("etl_task02__make_stockfactor():db connection closed.")
        print("*"*50)
    return success
# Build the data of the stockfactor_history table.
def etl_task03__make_stockfactor_history():
    conn = None
    success = False
    try:
        print("*"*50)
        print("task : etl_task03__make_stockfactor_history")
        conn = connect(conn_params_dic)
        if conn is not None:
            execute(conn, sql_dict["stockfactor_history_recentlydate_delete"])
            execute(conn, sql_dict["stockfactor_history_bulkinsert"])
            success = True
        else:
            print("etl_task03__make_stockfactor_history():Connection fail!")
    except Error as e:
        print("etl_task03__make_stockfactor_history():Error while inserting to MySQL", e)
        success = False
    finally:
        if conn is not None:
            conn.close()
            print("etl_task03__make_stockfactor_history():db connection closed.")
        print("*"*50)
    return success
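etl_task03__make_stockfactor_history() first deletes the history rows for the base_date currently in stockfactor and then re-inserts them, so running the job twice on the same day does not duplicate rows. The sketch below shows that delete-then-insert pattern with in-memory SQLite standing in for MySQL; the table layouts are trimmed and the rows are made up. It relies on stockfactor holding a single base_date, which the truncate-then-insert in etl_task02 ensures.

```python
import sqlite3

# Illustrative only: sqlite3 stands in for MySQL, tables are trimmed,
# and the rows are made up.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE stockfactor (base_date TEXT, Ticker TEXT, Price REAL);
CREATE TABLE stockfactor_history (base_date TEXT, Ticker TEXT, Price REAL);
INSERT INTO stockfactor VALUES ('2022-02-16', 'AAA', 10.0), ('2022-02-16', 'BBB', 20.0);
""")


def refresh_history(conn):
    # Same two statements as stockfactor_history_recentlydate_delete and
    # stockfactor_history_bulkinsert: drop rows already loaded for the
    # current base_date, then copy the current snapshot in again.
    conn.execute(
        "DELETE FROM stockfactor_history "
        "WHERE base_date = (SELECT base_date FROM stockfactor LIMIT 1)"
    )
    conn.execute("INSERT INTO stockfactor_history SELECT * FROM stockfactor")
    conn.commit()


# Running the step twice for the same day leaves one copy of each row.
refresh_history(conn)
refresh_history(conn)
count = conn.execute("SELECT COUNT(*) FROM stockfactor_history").fetchone()[0]
conn.close()
print(count)  # → 2
```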
# Build the data of the tags table.
def etl_task04__make_tags():
    conn = None
    success = False
    try:
        print("*"*50)
        print("task : etl_task04__make_tags")
        conn = connect(conn_params_dic)
        if conn is not None:
            # Queries that use temporary tables must run in the same session (connection).
            cursor = conn.cursor()
            cursor.execute(sql_dict["tags_create_view"])
            cursor.execute(sql_dict["tags_create_sector_tags"])
            cursor.execute(sql_dict["tags_create_industry_tags"])
            cursor.execute(sql_dict["tags_create_country_tags"])
            cursor.execute(sql_dict["tags_truncate"])
            cursor.execute(sql_dict["tags_make_tags"])
            conn.commit()
            cursor.close()
            success = True
        else:
            print("etl_task04__make_tags():Connection fail!")
    except Error as e:
        print("etl_task04__make_tags():Error while inserting to MySQL", e)
        success = False
    finally:
        if conn is not None:
            conn.close()
            print("etl_task04__make_tags():db connection closed.")
        print("*"*50)
    return success
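The comment in etl_task04__make_tags() about running the temporary-table queries in one session matters because a temporary table is visible only to the connection that created it. The sketch below demonstrates the same rule with SQLite TEMP tables, used here only as a stand-in for MySQL TEMPORARY tables: two connections open the same database file, and only the creating connection can see the table.

```python
import os
import sqlite3
import tempfile

# Illustrative only: SQLite TEMP tables, like MySQL TEMPORARY tables, are
# private to the connection (session) that created them.
with tempfile.TemporaryDirectory() as tmpdir:
    db_path = os.path.join(tmpdir, "demo.db")
    session1 = sqlite3.connect(db_path)
    session2 = sqlite3.connect(db_path)

    session1.execute("CREATE TEMP TABLE tmp_sector_tags (tag_key INTEGER, tag TEXT)")
    session1.execute("INSERT INTO tmp_sector_tags VALUES (1001, 'Healthcare')")

    # Same session: the temporary table is readable.
    rows_same_session = session1.execute("SELECT * FROM tmp_sector_tags").fetchall()

    # Different session: the temporary table does not exist.
    try:
        session2.execute("SELECT * FROM tmp_sector_tags")
        visible_elsewhere = True
    except sqlite3.OperationalError:
        visible_elsewhere = False

    session1.close()
    session2.close()

print(rows_same_session, visible_elsewhere)  # → [(1001, 'Healthcare')] False
```

This is also why the tag step cannot reuse the execute() helper, which would work, but only as long as every call shares the one connection opened in etl_task04__make_tags().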
# Check the results of the data processing.
def etl_task05__check_table():
    conn = None
    success = False
    try:
        print("*"*50)
        print("task : etl_task05__check_table")
        conn = connect(conn_params_dic)
        if conn is not None:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM `tags`")
            result = cursor.fetchall()
            print(f"`tags` table rowcount : {result[0][0]}")
            cursor.execute("SELECT COUNT(*) FROM `stockfactor`")
            result = cursor.fetchall()
            print(f"`stockfactor` table rowcount : {result[0][0]}")
            cursor.close()
            success = True
        else:
            print("etl_task05__check_table():Connection fail!")
    except Error as e:
        print("etl_task05__check_table():Error while querying MySQL", e)
        success = False
    finally:
        if conn is not None:
            conn.close()
            print("etl_task05__check_table():db connection closed.")
        print("*"*50)
    return success
# ETL main function
# Defines the data processing flow; returns 0 on success and a non-zero code on failure.
def etl_main(csvfile):
    dataframe = etl_task00__load_rawdata(csvfile)
    # Use "is None": "== None" on a DataFrame compares element-wise.
    if dataframe is None:
        print(f"etl_task00__load_rawdata() fail!(file={csvfile})")
        return 1
    result = etl_task01__import_rawdata(dataframe)
    if not result:
        print("etl_task01__import_rawdata() fail!")
        return 1
    result = etl_task02__make_stockfactor()
    if not result:
        print("etl_task02__make_stockfactor() fail!")
        return 2
    result = etl_task03__make_stockfactor_history()
    if not result:
        print("etl_task03__make_stockfactor_history() fail!")
        return 3
    result = etl_task04__make_tags()
    if not result:
        print("etl_task04__make_tags() fail!")
        return 4
    result = etl_task05__check_table()
    if not result:
        print("etl_task05__check_table() fail!")
        return 5
    return 0
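etl_main() runs the tasks in a fixed order and stops at the first failure, returning that step's number. As a design alternative, the same control flow can be driven by a list of steps; the sketch below is a hypothetical variant (run_pipeline() and make_step() are illustrative names, not part of the post) and uses stand-in steps so it runs without a database.

```python
# Hypothetical table-driven variant of etl_main(): each step is a
# (name, callable) pair whose callable returns True on success.


def run_pipeline(steps):
    """Run steps in order; return 0 if all succeed, else the 1-based number of the failed step."""
    for number, (name, step) in enumerate(steps, start=1):
        if not step():
            print(f"{name}() fail!")
            return number  # later steps are skipped, as in etl_main()
    return 0


# Usage with stand-in steps (the real ones would wrap etl_task01..05).
calls = []


def make_step(name, ok):
    def step():
        calls.append(name)
        return ok
    return step


code = run_pipeline([
    ("step_a", make_step("step_a", True)),
    ("step_b", make_step("step_b", False)),
    ("step_c", make_step("step_c", True)),
])
print(code, calls)  # → 2 ['step_a', 'step_b']
```

With the real tasks, the list would hold entries such as ("etl_task01__import_rawdata", lambda: etl_task01__import_rawdata(dataframe)) followed by etl_task02__make_stockfactor through etl_task05__check_table, which yields the same return codes 1 to 5 as etl_main().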
7. Execute
# main function
def main():
    # Path of the CSV file downloaded above.
    rawdatafile = "D:\\pyproject\\data\\fv_screener_sampledata.csv"
    # Run the data processing.
    return etl_main(rawdatafile)
main()
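<Additional comments>
If the CSV file is large, a library other than Pandas is recommended, because pd.read_csv() reads the whole file into memory by default.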
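One possible non-Pandas option, sketched here as an assumption rather than the author's recommendation, is the standard-library csv module, which can stream the file in fixed-size batches so that only one batch is in memory at a time. Each batch has the list-of-tuples shape that cursor.executemany() expects. The function name read_csv_in_batches() and the sample rows are hypothetical.

```python
import csv
import io
from itertools import islice


def read_csv_in_batches(text_stream, batch_size=1000):
    """Return (header, batches): batches yields lists of at most batch_size row tuples."""
    reader = csv.reader(text_stream)
    header = next(reader)

    def batches():
        while True:
            # islice pulls only the next batch_size rows from the reader
            batch = [tuple(row) for row in islice(reader, batch_size)]
            if not batch:
                return
            yield batch

    return header, batches()


# Made-up sample; a real run would pass open(path, newline="", encoding="utf-8").
sample = io.StringIO("No.,Ticker,Market Cap\n1,AAA,1.5B\n2,BBB,300M\n3,CCC,\n")
header, batches = read_csv_in_batches(sample, batch_size=2)
print(header)
# → ['No.', 'Ticker', 'Market Cap']
for batch in batches:
    print(batch)
# → [('1', 'AAA', '1.5B'), ('2', 'BBB', '300M')]
# → [('3', 'CCC', '')]
```

Values stay as strings and empty cells stay empty (there is no equivalent of the drop-first-column and fillna(0) cleansing here), which fits the post's approach of leaving conversions such as FN_ShortValueToNumber() to SQL; any cleansing the raw table relies on would still have to be applied per batch.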