Hello. The other day I had arranged to go to a public bath with my juniors, but one of them cancelled with "I'm sorry, I missed the last train with a girl ...", so I channelled my anger into work and finished this article.
In the previous article, Forecasting the Nikkei 225 with Pytorch ~ Intermission ~, I introduced the paper
Using Deep Learning Neural Networks and Candlestick Chart Representation to Predict Stock Market.
I have now built the dataset for it, written the code, and trained a model, so I am keeping this as a memo.
The dataset used here looks like a price chart, but it is a little unusual. (For details, please see the paper!)
Image created with reference to the paper
As shown, each image is 48x48 pixels, and rising and falling candles are drawn in green and red. The code is below.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
import glob
import sys
import datetime

import numpy as np
import pandas as pd
import openpyxl
# Candlestick drawing
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

# Location of the files on Google Drive
path = "/content/drive/My Drive/Colab/pytorch_pre/"
# Make modules stored in that folder importable
sys.path.append("/content/drive/My Drive/Colab/pytorch_pre")
import mpl_finance
These are the library imports. Since I am using Google Colaboratory, path points to where the files are stored on Drive. The one unusual library is probably mpl_finance. It can be downloaded from GitHub and is handy for drawing candlesticks: mpl_finance
future_num = 1
fname = path + "data/nikkei_heikin_x3.csv"
flist = glob.glob(fname)
for file in flist:
    dt = pd.read_csv(file, header=0, encoding="utf-8_sig", index_col='Datetime')
    dt = dt.sort_values('Datetime')
    future_price = dt.iloc[future_num:-18]['closing price'].values
    curr_price = dt.iloc[:-18-future_num]['closing price'].values
    # The label compares the close future_num days later with the current close
    y_data_tmp = future_price / curr_price
    # Prepare arrays for the labels (Y_data has one extra row for the header)
    y_data = np.zeros_like(y_data_tmp)
    y_data_nam = np.zeros(len(y_data_tmp)+1, dtype=object)
    Y_data = np.zeros((len(y_data_tmp)+1, 2), dtype=object)
    Y_columns = ["label","ImageName"]
    # Label is 1 if the price future_num days later is the same or higher
    for i in range(len(y_data_tmp)):
        Y_data[i,0] = 0
        if y_data_tmp[i] >= 1.0:
            y_data[i] = 1
            Y_data[i,0] = 1
    # Count the positive labels
    count = 0
    for i in y_data:
        if i == 1:
            count += 1
    # Rows 1.. hold the image file names; row 0 becomes the CSV header
    for i in range(1,len(y_data_tmp)+1):
        y_data_nam[i] = "{:04}.png".format(i-1)
        Y_data[i,1] = y_data_nam[i]
    Y_data[0,0] = "label"
    Y_data[0,1] = "ImageName"
    dn = pd.DataFrame(Y_data)
    dn.columns = Y_columns
    np.savetxt(path+"debug/y_data.csv", dn, delimiter=",", fmt='%s')  # Write to csv file
This part does the labeling. The data is read, sorted by date, and binarized: 1 if the closing price is at or above the previous day's close, 0 if it is lower. So that the label can be looked up from the image name later, the file name (ImageName) and the label are written side by side in each row of the CSV, like a table.
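To make it easier to check which label ends up next to which file name, here is a minimal sketch that mimics the labeling loop with plain lists. The function name build_label_rows and the toy prices are my own assumptions for illustration; only the slicing offsets (future_num, the 18-row tail) and the file name format come from the listing above.

```python
def build_label_rows(closes, future_num=1, tail=18):
    """Mimic the labeling loop above with plain lists (hypothetical helper).

    Returns the (label, image_name) rows in CSV order, without the header.
    """
    n = len(closes)
    future = closes[future_num:n - tail]           # dt.iloc[future_num:-18]
    current = closes[:n - tail - future_num]       # dt.iloc[:-18-future_num]
    ratios = [f / c for f, c in zip(future, current)]
    labels = [1 if r >= 1.0 else 0 for r in ratios]
    labels.append(0)  # the label array has one extra row, initialised to 0
    rows = []
    # Row 0 is overwritten by the header, so file names start at row 1
    for i in range(1, len(ratios) + 1):
        rows.append((labels[i], "{:04}.png".format(i - 1)))
    return rows


# Toy closes that alternate 100, 101, 100, 101, ... over 25 days
closes = [100 + (i % 2) for i in range(25)]
rows = build_label_rows(closes)
for label, name in rows[:3]:
    print(name, label)
```

Under this reading, the row for 0000.png carries the comparison of day 2 against day 1 (counting from 0), because the header takes row 0 and the file names start one row later. If the intent is to pair each image with the move on the day after its last candle, the offsets would need adjusting, so it is worth checking against your own data.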
img_create = 1
if img_create == 1:
    seq_len = 20
    # Keep Datetime as a regular column so it can be converted for plotting
    df = pd.read_csv(fname, parse_dates=['Datetime'])
    df = df.sort_values('Datetime')
    df = df.rename(columns={'Open price':'Open','High price':'High','Low price':'Low','closing price':'Close','Volume':'Volume'})
    df['Datetime'] = df['Datetime'].map(mdates.date2num)
    for i in range(0, len(df)):
        # One window of seq_len - 1 consecutive rows per image
        c = df.iloc[i:i + int(seq_len) - 1, :]
        if len(c) == int(seq_len-1):
            # Date,Open,High,Low,Adj Close,Volume
            ohlc = zip(c['Datetime'], c['Open'], c['High'],
                       c['Low'], c['Close'], c['Volume'])
            my_dpi = 96
            fig = plt.figure(figsize=(48 / my_dpi, 48 / my_dpi), dpi=my_dpi)
            ax1 = plt.subplot2grid((1, 1), (0, 0))
            # candlestick2_ohlc(ax1, c['Open'],c['High'],c['Low'],c['Close'], width=0.4, colorup='#77d879', colordown='#db3f3f')
            mpl_finance.candlestick_ohlc(ax1, ohlc, width=0.4,
                                         colorup='#77d879', colordown='#db3f3f')
            # pngfile = 'datasets/{}_{}_{}.png'.format(
            #     i, seq_len, fname[11:-4])
            pngfile = 'datasets/{:04}.png'.format(i)
            fig.savefig(path+pngfile, pad_inches=0, transparent=False)
            plt.close(fig)  # release the figure so open plots do not pile up
    print("Converting OHLC to candlestick finished.")
This is almost the same as the paper's code. I could not get the volume to display; please let me know if you know how. My numerical data is not in the same format as the paper's, so I adjust it to match. This is the data I used.
The column names are renamed to match the expected format. That completes the image dataset.
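As a quick check on how many images the loop produces and which rows each one covers, here is a small sketch of the sliding window. The helper name window_rows is an assumption for illustration; the window width of seq_len - 1 rows is taken from the slice df.iloc[i:i + seq_len - 1] in the listing.

```python
def window_rows(n_rows, seq_len=20):
    """Map image index -> (first_row, last_row), both inclusive.

    Hypothetical helper: a window is kept only when it holds the full
    seq_len - 1 rows, as in the image loop above.
    """
    width = seq_len - 1
    return {i: (i, i + width - 1)
            for i in range(n_rows)
            if i + width <= n_rows}


windows = window_rows(25)
print(len(windows))   # number of images for 25 rows of data
print(windows[0])     # rows drawn in 0000.png
print(windows[6])     # rows drawn in the last image
```

With seq_len = 20 each image holds 19 candles rather than 20, and n rows of data give n - 18 images.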
The model used for training is ResNet. The paper reports SOTA results with its own model architecture, but ResNet also gave good enough results, so I use it here.
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image
from sklearn.model_selection import train_test_split

class MyDataSet(Dataset):
    def __init__(self, csv_path, root_dir):
        self.train_df = pd.read_csv(csv_path)
        self.root_dir = root_dir
        self.images = os.listdir(self.root_dir)
        self.transform = transforms.Compose([transforms.ToTensor()])
        self.y_columns = ["label","ImageName"]

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        # Load the image
        # print(self.images)
        image_name = self.images[idx]
        image = Image.open(os.path.join(self.root_dir, image_name))
        image = image.convert('RGB')  # PyTorch 0.4 or later
        # label (0 or 1), looked up by file name
        self.train_df.columns = self.y_columns
        label = self.train_df.query('ImageName=="'+image_name+'"')['label'].iloc[0]
        return self.transform(image), int(label)
def conv3x3(in_channels, out_channels, stride=1, groups=1, dilation=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride,
                     padding=dilation, groups=groups, bias=True,
                     dilation=dilation)

def conv1x1(in_channels, out_channels, stride=1):
    return nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=True)
class BasicBlock(nn.Module):
    # Implementation of Basic Building Block
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        identity_x = x  # hold input for shortcut connection
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample is not None:
            identity_x = self.downsample(x)
        out += identity_x  # shortcut connection
        return self.relu(out)
class ResidualLayer(nn.Module):
    def __init__(self, num_blocks, in_channels, out_channels, block=BasicBlock):
        super(ResidualLayer, self).__init__()
        downsample = None
        if in_channels != out_channels:
            # Match the channel count on the shortcut path
            downsample = nn.Sequential(
                conv1x1(in_channels, out_channels),
                nn.BatchNorm2d(out_channels))
        self.first_block = block(in_channels, out_channels, downsample=downsample)
        self.blocks = nn.ModuleList(block(out_channels, out_channels) for _ in range(num_blocks))

    def forward(self, x):
        out = self.first_block(x)
        for block in self.blocks:
            out = block(out)
        return out
class ResNet18(nn.Module):
    def __init__(self, num_classes):
        super(ResNet18, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = ResidualLayer(2, in_channels=64, out_channels=64)
        self.layer2 = ResidualLayer(2, in_channels=64, out_channels=128)
        self.layer3 = ResidualLayer(
            2, in_channels=128, out_channels=256)
        self.layer4 = ResidualLayer(
            2, in_channels=256, out_channels=512)
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.maxpool(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out
class Trainer:
    def __init__(self, model, optimizer, criterion):
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = model.to(self.device)
        self.optimizer = optimizer
        self.criterion = criterion

    def epoch_train(self, train_loader):
        self.model.train()
        epoch_loss = 0
        correct = 0
        total = 0
        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs = inputs.to(self.device)
            targets = targets.to(self.device)
            # Standard update step: clear gradients, backpropagate, update weights
            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.criterion(outputs, targets)
            loss.backward()
            self.optimizer.step()
            epoch_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += targets.size(0)
            correct += predicted.eq(targets.data).sum().item()
        epoch_loss /= len(train_loader)
        acc = 100 * correct / total
        return epoch_loss, acc

    def epoch_valid(self, valid_loader):
        self.model.eval()
        epoch_loss = 0
        correct = 0
        total = 0
        # No gradients are needed for evaluation
        with torch.no_grad():
            for batch_idx, (inputs, targets) in enumerate(valid_loader):
                inputs = inputs.to(self.device)
                targets = targets.to(self.device)
                outputs = self.model(inputs)
                loss = self.criterion(outputs, targets)
                epoch_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += targets.size(0)
                correct += predicted.eq(targets.data).sum().item()
        epoch_loss /= len(valid_loader)
        acc = 100 * correct / total
        return epoch_loss, acc

    def params(self):
        return self.model.state_dict()
This part defines the ResNet, the dataset class, and the trainer. MyDataSet lists the image files and looks up each image's label in the CSV by file name.
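To see what a 48x48 input turns into inside the network above, here is a rough sketch that traces the spatial size layer by layer. It assumes the usual output-size formula for convolution and pooling with dilation 1 and floor rounding; the helper names conv_out and trace_sizes are made up for this example.

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Output length along one axis (dilation 1, floor rounding)."""
    return (size + 2 * padding - kernel) // stride + 1


def trace_sizes(size=48):
    """Follow the height/width through the layers defined above."""
    sizes = [("input", size)]
    size = conv_out(size, kernel=7, stride=2, padding=3)
    sizes.append(("conv1", size))
    size = conv_out(size, kernel=3, stride=2, padding=1)
    sizes.append(("maxpool", size))
    # The residual layers above use 3x3 convolutions with stride 1 and padding 1
    for name in ("layer1", "layer2", "layer3", "layer4"):
        size = conv_out(size, kernel=3, stride=1, padding=1)
        sizes.append((name, size))
    sizes.append(("avg_pool", 1))  # adaptive pooling to 1x1
    return sizes


for name, size in trace_sizes(48):
    print(f"{name}: {size}x{size}")
```

Because ResidualLayer never passes a stride to its blocks, the feature map stays at 12x12 through all four layers and only the channel count grows. Each ResidualLayer(2, ...) also builds three BasicBlocks (first_block plus two more), which is one more per stage than the usual ResNet-18 layout.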
if __name__ == '__main__':
    model = ResNet18(2)
    epoch_n = 100
    # The arguments after model.parameters() were cut off in the listing,
    # so Adam's defaults are used here
    optimizer = optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()
    trainer = Trainer(model, optimizer, criterion)
    # Not applied below (MyDataSet uses its own transform);
    # any entries after Resize were cut off in the listing
    transform = transforms.Compose([
        transforms.Resize((50, 50)),
    ])
    tmp_data = MyDataSet(path+'debug/y_data.csv', path+'datasets/')
    # Passing a single dataset returns two splits, so two outputs are enough
    dtrain, dtest = train_test_split(tmp_data, test_size=0.2)
    # x_train, y_train, x_test, y_test = train_test_split(tmp_data)
    # Any arguments after shuffle were cut off in the listing
    train_loader = torch.utils.data.DataLoader(dtrain, batch_size=43, shuffle=True)
    valid_loader = torch.utils.data.DataLoader(dtest, batch_size=43, shuffle=True)
    best_acc = -1
    for epoch in range(1, 1 + epoch_n):
        train_loss, train_acc = trainer.epoch_train(train_loader)
        valid_loss, valid_acc = trainer.epoch_valid(valid_loader)
        if valid_acc > best_acc:
            best_acc = valid_acc
            best_params = trainer.params()
            # Save right away so the file holds the best epoch's weights
            torch.save(best_params, path + 'models/resnet.pth')
        print(f'EPOCH: {epoch} / {epoch_n}')
        print(f'TRAIN LOSS: {train_loss:.3f}, TRAIN ACC: {train_acc:.3f}')
        print(f'VALID LOSS: {valid_loss:.3f}, VALID ACC: {valid_acc:.3f}')
This is the training part. I think it is almost the same as a normal ResNet training script. Here are the training results.
EPOCH: 1 / 100
TRAIN LOSS: 0.700, TRAIN ACC: 62.114
VALID LOSS: 1.145, VALID ACC: 52.442
EPOCH: 2 / 100
TRAIN LOSS: 0.502, TRAIN ACC: 76.391
VALID LOSS: 0.476, VALID ACC: 78.023
EPOCH: 3 / 100
TRAIN LOSS: 0.426, TRAIN ACC: 81.248
VALID LOSS: 0.467, VALID ACC: 77.907
EPOCH: 4 / 100
TRAIN LOSS: 0.368, TRAIN ACC: 83.633
VALID LOSS: 0.357, VALID ACC: 85.000
EPOCH: 5 / 100
TRAIN LOSS: 0.324, TRAIN ACC: 86.488
VALID LOSS: 0.648, VALID ACC: 76.395
EPOCH: 6 / 100
TRAIN LOSS: 0.305, TRAIN ACC: 87.018
VALID LOSS: 0.365, VALID ACC: 84.884
EPOCH: 7 / 100
TRAIN LOSS: 0.277, TRAIN ACC: 88.284
VALID LOSS: 0.480, VALID ACC: 79.884
EPOCH: 92 / 100
TRAIN LOSS: 0.006, TRAIN ACC: 99.853
VALID LOSS: 0.874, VALID ACC: 85.349
EPOCH: 93 / 100
TRAIN LOSS: 0.025, TRAIN ACC: 99.058
VALID LOSS: 0.960, VALID ACC: 78.953
EPOCH: 94 / 100
TRAIN LOSS: 0.028, TRAIN ACC: 99.058
VALID LOSS: 0.992, VALID ACC: 85.698
EPOCH: 95 / 100
TRAIN LOSS: 0.021, TRAIN ACC: 99.264
VALID LOSS: 0.744, VALID ACC: 84.884
EPOCH: 96 / 100
TRAIN LOSS: 0.007, TRAIN ACC: 99.735
VALID LOSS: 1.000, VALID ACC: 85.349
EPOCH: 97 / 100
TRAIN LOSS: 0.008, TRAIN ACC: 99.735
VALID LOSS: 0.834, VALID ACC: 86.279
EPOCH: 98 / 100
TRAIN LOSS: 0.020, TRAIN ACC: 99.470
VALID LOSS: 0.739, VALID ACC: 85.930
EPOCH: 99 / 100
TRAIN LOSS: 0.005, TRAIN ACC: 99.853
VALID LOSS: 0.846, VALID ACC: 86.395
EPOCH: 100 / 100
TRAIN LOSS: 0.016, TRAIN ACC: 99.382
VALID LOSS: 1.104, VALID ACC: 83.953
Accuracy on the training data reached nearly 100%, and accuracy on the test data reached about 87%. Compared with the roughly 60% I got from the LSTM last time, that is much better.
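For reference, here is a small sketch that picks out the best epochs from the log excerpt above. Only the epochs actually shown (1 to 7 and 92 to 100) are included, so the result says nothing about the epochs in between; the function names are made up for this example.

```python
# (epoch, valid_loss, valid_acc) copied from the log excerpt above
log_excerpt = [
    (1, 1.145, 52.442), (2, 0.476, 78.023), (3, 0.467, 77.907),
    (4, 0.357, 85.000), (5, 0.648, 76.395), (6, 0.365, 84.884),
    (7, 0.480, 79.884), (92, 0.874, 85.349), (93, 0.960, 78.953),
    (94, 0.992, 85.698), (95, 0.744, 84.884), (96, 1.000, 85.349),
    (97, 0.834, 86.279), (98, 0.739, 85.930), (99, 0.846, 86.395),
    (100, 1.104, 83.953),
]


def best_by_accuracy(rows):
    """Same rule as the training loop: keep the epoch with the highest accuracy."""
    best = None
    for row in rows:
        if best is None or row[2] > best[2]:
            best = row
    return best


def best_by_loss(rows):
    """Alternative rule: keep the epoch with the lowest validation loss."""
    return min(rows, key=lambda row: row[1])


print("best accuracy:", best_by_accuracy(log_excerpt))
print("lowest loss:  ", best_by_loss(log_excerpt))
```

Among the epochs shown, the highest validation accuracy comes late in training while the lowest validation loss comes early, which is the kind of gap that often goes with overfitting. Which criterion to checkpoint on is a design choice.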
It turns out that, for simply calling up or down, using images improves the accuracy. I don't know whether it would be profitable in actual use, but ... Since this was only a replication of the paper, I am thinking of incorporating deep metric learning and predicting rises of 3%.
By the way, out of about 4300 data points, only 70 rose by 3% or more from the previous day. That is under 2% of the total (70 / 4300 is roughly 1.6%), which makes it a heavily imbalanced dataset.
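To put a number on the imbalance, here is a minimal sketch that takes the two counts quoted above at face value and computes the positive share plus inverse-frequency class weights. The weighting rule (total / (number of classes * class count)) is one common heuristic, not something taken from the paper, and the function name class_weights is made up for this example.

```python
def class_weights(counts):
    """Inverse-frequency weights: total / (n_classes * count) per class."""
    total = sum(counts)
    n_classes = len(counts)
    return [total / (n_classes * c) for c in counts]


total_days = 4300      # approximate figure quoted above
up_3pct_days = 70      # days with a rise of 3% or more, as quoted above
counts = [total_days - up_3pct_days, up_3pct_days]

positive_share = up_3pct_days / total_days
weights = class_weights(counts)

print(f"positive share: {positive_share:.3%}")
print(f"class weights:  {weights[0]:.3f} (other), {weights[1]:.3f} (3% rise)")
```

Weights like these could be passed to nn.CrossEntropyLoss through its weight argument, though whether that helps on this data is untested here.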
Next time I would like to do something about this.
