이 논문에서는 다양한 기술적 지표 들을 사용하고, 저는 그 지표들을 직접 계산한다고 말씀 드렸습니다. 물론 HTS 등을 이용하면 훨씬 쉽게 추출 할 수 있겠으나... 파이썬을 통해 계산해 보았습니다.
논문에서 사용한 기술적 지표들을 위와 같습니다.
1. 시가
2. 고가
3. 저가
4. 거래량
5. 턴오버
6. Bias(?)
7. 볼린져 밴드
8. DMI
9. 지수 이동 평균
10. 스토캐스틱 인덱스
11. 이동평균
12. MACD
13. RSI
먼저 말씀 드릴 것은, 저도 처음 보는 지표들이 몇개 있었고 그 중 6번째 지표인 Bias는 무엇인지 정확히 알 수가 없어서 사용하지 않았습니다. 그럼 나머지 지표가 12 종류입니다. 각각의 지표를 설명하기에는, 이미 양질의 자료들이 너무 많기 때문에 생략했습니다.
import os
import re
import requests
import numpy as np
import pandas as pd
import FinanceDataReader as fdr
from bs4 import BeautifulSoup
def cal_num_stock(url) :
#상장 주식수 크롤링
r = requests.get(url)
soup = BeautifulSoup(r.text, 'lxml')
items = soup.find_all('table', {"summary" : "시가총액 정보"})
items = items[0].find_all("td")
nums = re.findall("\d+", str(items[2]))
num_stock = 0
digits = len(nums) - 1
for num in nums :
num_stock += int(num) * 1000 ** digits
digits -= 1
return num_stock
def cal_bb(stock, w=20, k=2) :
x = pd.Series(stock)
mbb = x.rolling(w, min_periods=1).mean()
ubb = mbb + k * x.rolling(w, min_periods=1).std()
lbb = mbb - k * x.rolling(w, min_periods=1).std()
return mbb, ubb, lbb
def cal_dmi(data, n=14, n_ADX=14) :
#https://github.com/Crypto-toolbox/pandas-technical-indicators/blob/master/technical_indicators.py : ADX
i = 0
UpI = []
DoI = []
while i + 1 <= data.index[-1] :
UpMove = data.loc[i + 1, "High"] - data.loc[i, "High"]
DoMove = data.loc[i, "Low"] - data.loc[i+1, "Low"]
if UpMove > DoMove and UpMove > 0 :
UpD = UpMove
else :
UpD = 0
UpI.append(UpD)
if DoMove > UpMove and DoMove > 0 :
DoD = DoMove
else :
DoD = 0
DoI.append(DoD)
i = i + 1
i = 0
TR_l = [0]
while i < data.index[-1]:
TR = max(data.loc[i + 1, 'High'], data.loc[i, 'Close']) - min(data.loc[i + 1, 'Low'], data.loc[i, 'Close'])
TR_l.append(TR)
i = i + 1
TR_s = pd.Series(TR_l)
ATR = pd.Series(TR_s.ewm(span=n, min_periods=1).mean())
UpI = pd.Series(UpI)
DoI = pd.Series(DoI)
PosDI = pd.Series(UpI.ewm(span=n, min_periods=1).mean() / ATR)
NegDI = pd.Series(DoI.ewm(span=n, min_periods=1).mean() / ATR)
ADX = pd.Series((abs(PosDI - NegDI) / (PosDI + NegDI)).ewm(span=n_ADX, min_periods=1).mean(),
name='ADX_' + str(n) + '_' + str(n_ADX))
return ADX
def cal_ema_macd(data, n_fast=12, n_slow=26, n_signal=9) :
#https://wikidocs.net/3397
data["EMAFast"] = data["Close"].ewm(span=n_fast, min_periods=1).mean()
data["EMASlow"] = data["Close"].ewm(span=n_slow, min_periods=1).mean()
data["MACD"] = data["EMAFast"] - data["EMASlow"]
data["MACDSignal"] = data["MACD"].ewm(span=n_signal, min_periods=1).mean()
data["MACDDiff"] = data["MACD"] - data["MACDSignal"]
return data
def cal_rsi(data, N=14) :
#https://wikidocs.net/3399
U = np.where(data.diff(1) > 0, data.diff(1), 0)
D = np.where(data.diff(1) < 0, data.diff(1) * (-1), 0)
AU = pd.DataFrame(U).rolling(window=N, min_periods=1).mean()
AD = pd.DataFrame(U).rolling(window=N, min_periods=1).mean()
RSI = AU.div(AD+AU) * 100
num_nan = np.sum(np.isnan(np.array(RSI[2:])))
return RSI, num_nan
def cal_mv(data, N) :
mv_n = data.rolling(window=N, min_periods=1).mean()
return mv_n
def cal_kdjsi(data, N=15, M=5, T=3) :
L = data["Low"].rolling(window=N, min_periods=1).min()
H = data["High"].rolling(window=N, min_periods=1).max()
k = ((data["Close"] - L) / (H - L)) * 100
d = k.ewm(span=M).mean()
j = d.ewm(span=T).mean()
return k,d,j
path_dir = './data'
file_list = os.listdir(path_dir)
if not os.path.exists("./pre_data") :
os.makedirs("./pre_data")
for item in file_list :
if item.find("from") is not -1 :
data = np.loadtxt(path_dir + '/' + item, delimiter = ',')
data = pd.DataFrame(data)
data.columns = ['Open', 'High', "Low", "Close", "Volumn", "Adj"]
data = data[["Close", "Open", "High", "Low", "Volumn"]]
#Turnover
code = item[:6]
url = "https://finance.naver.com/item/main.nhn?code={}".format(code)
num_stock = cal_num_stock(url)
data["Turnover"] = data["Volumn"] / num_stock
#Bias --- ??
#Bolinger bands
data["MBB"], data["HBB"], data["LBB"] = cal_bb(data["Close"])
#moving averages
data["MV_5"] = cal_mv(data["Close"], 5)
data["MV_15"] = cal_mv(data["Close"], 15)
data["MV_60"] = cal_mv(data["Close"], 60)
#Exponential moving averages, MACD
data = cal_ema_macd(data)
#import pdb; pdb.set_trace()
#Stochastic index (stochastic oscillator)
data["KDJ_K"], data["KDJ_D"], data["KDJ_J"] = cal_kdjsi(data)
#Directional movement index
data["DMI"] = cal_dmi(data)
#Relative strength index
data["RSI"], num_nan = cal_rsi(data["Close"])
if num_nan > 1 :
print("{} has nan".format(item))
continue
np.save("./pre_data/{}.npy".format(item[:-4]), data.to_numpy())
각 지표에 대한 이해를 조금 하셨다면 코드 자체는 어렵지 않습니다. 크롤링을 할때 대부분 column 6개를 가져오게 되고, 그것의 순서를 변경 시킨다음에 각각의 지표들을 전부 계산해서 새로 저장해두었습니다.
12 종류의 지표들을 사용했지만 이동 평균, 볼린져 밴드 등 하나의 지표를 여러 값으로 판단하는 게 유의미한 경우가 있어서, 그런 경우에는 한 종류의 지표당 여러 개의 변수를 사용합니다.
사실 어려운 부분은 아니지만, 코드가 꽤나 길어서 따로 글을 썼습니다. 다음 부분에서는 Rolling Segmentation을 나름대로 구현한 글을 작성하겠습니다.