본문 바로가기

주식 데이터 모델링

딥러닝으로 주식 예측하는 논문 구현 [Stock Market Prediction on High-Frequency Data Using Generative Adversarial Nets] - 5. 구현(2) - 데이터 전처리

 이 논문에서는 다양한 기술적 지표 들을 사용하고, 저는 그 지표들을 직접 계산한다고 말씀 드렸습니다. 물론 HTS 등을 이용하면 훨씬 쉽게 추출 할 수 있겠으나... 파이썬을 통해 계산해 보았습니다.

 논문에서 사용한 기술적 지표들을 위와 같습니다. 

1. 시가

2. 고가

3. 저가

4. 거래량

5. 턴오버

6. Bias(?)

7. 볼린져 밴드

8. DMI

9. 지수 이동 평균

10. 스토캐스틱 인덱스

11. 이동평균

12. MACD

13. RSI

 먼저 말씀 드릴 것은, 저도 처음 보는 지표들이 몇개 있었고 그 중 6번째 지표인 Bias는 무엇인지 정확히 알 수가 없어서 사용하지 않았습니다. 그럼 나머지 지표가 12 종류입니다. 각각의 지표를 설명하기에는, 이미 양질의 자료들이 너무 많기 때문에 생략했습니다. 

 

import os
import re

import requests
import numpy as np
import pandas as pd
import FinanceDataReader as fdr

from bs4 import BeautifulSoup

def cal_num_stock(url) :
    #상장 주식수 크롤링
    r = requests.get(url)
    soup = BeautifulSoup(r.text, 'lxml')
    items = soup.find_all('table', {"summary" : "시가총액 정보"})
    items = items[0].find_all("td")
    nums = re.findall("\d+", str(items[2]))
    num_stock = 0
    digits = len(nums) - 1
    for num in nums :
            num_stock += int(num) * 1000 ** digits
            digits -= 1

    return num_stock

def cal_bb(stock, w=20, k=2) :
    x = pd.Series(stock)
    mbb = x.rolling(w, min_periods=1).mean()
    ubb = mbb + k * x.rolling(w, min_periods=1).std()
    lbb = mbb - k * x.rolling(w, min_periods=1).std()

    return mbb, ubb, lbb

def cal_dmi(data, n=14, n_ADX=14) :
    #https://github.com/Crypto-toolbox/pandas-technical-indicators/blob/master/technical_indicators.py : ADX
    i = 0
    UpI = []
    DoI = []
    while i + 1 <= data.index[-1] :
        UpMove = data.loc[i + 1, "High"] - data.loc[i, "High"]
        DoMove = data.loc[i, "Low"] - data.loc[i+1, "Low"]
        if UpMove > DoMove and UpMove > 0 :
            UpD = UpMove
        else :
            UpD = 0
        UpI.append(UpD)
        if DoMove > UpMove and DoMove > 0 :
            DoD = DoMove
        else :
            DoD = 0
        DoI.append(DoD)
        i = i + 1

    i = 0
    TR_l = [0]
    while i < data.index[-1]:
        TR = max(data.loc[i + 1, 'High'], data.loc[i, 'Close']) - min(data.loc[i + 1, 'Low'], data.loc[i, 'Close'])
        TR_l.append(TR)
        i = i + 1
    TR_s = pd.Series(TR_l)
    ATR = pd.Series(TR_s.ewm(span=n, min_periods=1).mean())
    UpI = pd.Series(UpI)
    DoI = pd.Series(DoI)
    PosDI = pd.Series(UpI.ewm(span=n, min_periods=1).mean() / ATR)
    NegDI = pd.Series(DoI.ewm(span=n, min_periods=1).mean() / ATR)
    ADX = pd.Series((abs(PosDI - NegDI) / (PosDI + NegDI)).ewm(span=n_ADX, min_periods=1).mean(),
                    name='ADX_' + str(n) + '_' + str(n_ADX))

    return ADX

def cal_ema_macd(data, n_fast=12, n_slow=26, n_signal=9) : 
    #https://wikidocs.net/3397
    data["EMAFast"] = data["Close"].ewm(span=n_fast, min_periods=1).mean()
    data["EMASlow"] = data["Close"].ewm(span=n_slow, min_periods=1).mean()
    data["MACD"] = data["EMAFast"] - data["EMASlow"]
    data["MACDSignal"] = data["MACD"].ewm(span=n_signal, min_periods=1).mean()
    data["MACDDiff"] = data["MACD"] - data["MACDSignal"]
    
    return data

def cal_rsi(data, N=14) :
    #https://wikidocs.net/3399
    U = np.where(data.diff(1) > 0, data.diff(1), 0)
    D = np.where(data.diff(1) < 0, data.diff(1) * (-1), 0)

    AU = pd.DataFrame(U).rolling(window=N, min_periods=1).mean()
    AD = pd.DataFrame(U).rolling(window=N, min_periods=1).mean()
    RSI = AU.div(AD+AU) * 100

	num_nan = np.sum(np.isnan(np.array(RSI[2:])))
    
    return RSI, num_nan

def cal_mv(data, N) :
    mv_n = data.rolling(window=N, min_periods=1).mean()

    return mv_n

def cal_kdjsi(data, N=15, M=5, T=3) :
    L = data["Low"].rolling(window=N, min_periods=1).min()
    H = data["High"].rolling(window=N, min_periods=1).max()

    k = ((data["Close"] - L) / (H - L)) * 100
    d = k.ewm(span=M).mean()
    j = d.ewm(span=T).mean()

    return k,d,j


path_dir = './data'
file_list = os.listdir(path_dir)

if not os.path.exists("./pre_data") :
    os.makedirs("./pre_data")

for item in file_list :

    if item.find("from") is not -1 :
        data = np.loadtxt(path_dir + '/' + item, delimiter = ',')
        data = pd.DataFrame(data)
        data.columns = ['Open', 'High', "Low", "Close", "Volumn", "Adj"]
        data = data[["Close", "Open", "High", "Low", "Volumn"]]
        
        #Turnover
        code = item[:6]
        url = "https://finance.naver.com/item/main.nhn?code={}".format(code)
        num_stock = cal_num_stock(url)
        data["Turnover"] = data["Volumn"] / num_stock

        #Bias --- ??
        #Bolinger bands  
        data["MBB"], data["HBB"], data["LBB"] = cal_bb(data["Close"])

        #moving averages
        data["MV_5"] = cal_mv(data["Close"], 5)
        data["MV_15"] = cal_mv(data["Close"], 15)
        data["MV_60"] = cal_mv(data["Close"], 60)

        #Exponential moving averages, MACD
        data = cal_ema_macd(data)

        #import pdb; pdb.set_trace()

        #Stochastic index (stochastic oscillator)
        data["KDJ_K"], data["KDJ_D"], data["KDJ_J"] = cal_kdjsi(data)

        #Directional movement index
        data["DMI"] = cal_dmi(data)

        #Relative strength index
        data["RSI"], num_nan = cal_rsi(data["Close"])
        if num_nan > 1 :
            print("{} has nan".format(item))
            continue
        np.save("./pre_data/{}.npy".format(item[:-4]), data.to_numpy())

 각 지표에 대한 이해를 조금 하셨다면 코드 자체는 어렵지 않습니다. 크롤링을 할때 대부분 column 6개를 가져오게 되고, 그것의 순서를 변경 시킨다음에 각각의 지표들을 전부 계산해서 새로 저장해두었습니다. 

 12 종류의 지표들을 사용했지만 이동 평균, 볼린져 밴드 등 하나의 지표를 여러 값으로 판단하는 게 유의미한 경우가 있어서, 그런 경우에는 한 종류의 지표당 여러 개의 변수를 사용합니다. 

 

 사실 어려운 부분은 아니지만, 코드가 꽤나 길어서 따로 글을 썼습니다. 다음 부분에서는 Rolling Segmentation을 나름대로 구현한 글을 작성하겠습니다.