"[PyTorch Neural Network Implementation Handbook](https://www.amazon.co.jp/PyTorch%E3%83%8B%E3%83%A5%E3%83%BC%E3%83%A9%E3%83" % AB% E3% 83% 8D% E3% 83% 83% E3% 83% 88% E3% 83% AF% E3% 83% BC% E3% 82% AF% E5% AE% 9F% E8% A3% 85 % E3% 83% 8F% E3% 83% B3% E3% 83% 89% E3% 83% 96% E3% 83% 83% E3% 82% AF-Python% E3% 83% A9% E3% 82% A4 % E3% 83% 96% E3% 83% A9% E3% 83% AA% E5% AE% 9A% E7% 95% AA% E3% 82% BB% E3% 83% AC% E3% 82% AF% E3 % 82% B7% E3% 83% A7% E3% 83% B3-% E5% AE% AE% E6% 9C% AC-% E5% 9C% AD% E4% B8% 80% E9% 83% 8E / dp / 4798055476) ”, Chapter 5 RNN has been read, so I tried to analyze the stock price.
It is the memorandum.
From the stock prices (opening, high, low, and closing prices) of Toyota Motor Corporation (7203) over the past 20 years, I predicted whether the next day's return (next day's closing price minus next day's opening price) would be between 2% and 3.5% (a binary classification problem).
The 2-3.5% range was chosen 1) to secure a minimum return and 2) to ignore large stock price moves caused by fundamental factors such as news. Also, when I previously analyzed the daily return (closing price minus opening price) of the TOPIX500, returns of 2-3.5% occurred on around 5% of days, which seemed just right as a prediction target (a rough sketch of this tabulation follows the table below).
return range (%) | share of days (%) |
---|---|
~ -3.5 | 4.5 |
-3.5 ~ -2.0 | 7.4 |
-2.0 ~ -0.5 | 22.6 |
-0.5 ~ 0.5 | 34.5 |
0.5 ~ 2.0 | 19.5 |
2.0 ~ 3.5 | 6.5 |
3.5 ~ | 5.0 |
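For reference, this kind of distribution can be tabulated directly from an OHLC CSV with pandas. The following is only a rough sketch of that bucketing (the file name is hypothetical; the column names follow the CSV format used later in this post), not the actual TOPIX500 analysis.

```python
import pandas as pd

# Sketch: tabulate the share of days whose intraday return falls in each bucket.
# 'prices.csv' is a hypothetical OHLC file with the same column names as below.
df = pd.read_csv('prices.csv', encoding='sjis')
ret = (df['closing price'] - df['Open price']) / df['Open price'] * 100  # intraday return in %

bins = [-float('inf'), -3.5, -2.0, -0.5, 0.5, 2.0, 3.5, float('inf')]
dist = pd.cut(ret, bins=bins).value_counts(normalize=True).sort_index() * 100
print(dist.round(1))  # share of days (%) per return range
```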
Result: using the past 75 days of data as explanatory variables, the accuracy was 97.42% (this looks suspiciously high, so it needs verification).
For the implementation, I referred to the book mentioned above and this site: https://stackabuse.com/time-series-prediction-using-lstm-with-pytorch-in-python/
First, import the required libraries.
```python
import torch
import torch.nn as nn
import torch.optim as optim
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
```
Since the analysis data is on Google Drive, mount the drive with the following code so that it can be accessed.
```python
from google.colab import drive
drive.mount('/content/drive')
```
Check whether CUDA is available and specify the device.
```python
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device
```
This time, we will use the stock price data of Toyota Motor Corporation (7203) (data for 27 years since 1983).
```python
df_init = pd.read_csv('/content/drive/My Drive/XXXXXXXXXX/7203.csv', encoding='sjis')
df_init.head()
```
| | Stock code | date | Open price | High price | Low price | closing price |
|---|---|---|---|---|---|---|
0 | 7203 | 30320 | 747.911341 | 754.710535 | 741.112147 | 741.112147 |
1 | 7203 | 30321 | 747.911341 | 747.911341 | 720.714565 | 734.312953 |
2 | 7203 | 30322 | 720.714565 | 727.513759 | 707.116177 | 713.915371 |
3 | 7203 | 30323 | 727.513759 | 734.312953 | 713.915371 | 727.513759 |
4 | 7203 | 30324 | 727.513759 | 727.513759 | 720.714565 | 727.513759 |
Having too many variables 1) increases the computation time and 2) raises the risk of overfitting, so this time I limit the explanatory variables to the opening price, high price, low price, and closing price.
```python
df = pd.DataFrame()
df['open'] = df_init['Open price']
df['high'] = df_init['High price']
df['low'] = df_init['Low price']
df['close'] = df_init['closing price']

# Calculate the next-day return (closing price - opening price) and set the flag to 1
# when it falls between 2% and 3.5%.
df['return'] = (df_init['closing price'].shift() - df_init['Open price'].shift()) / df_init['Open price'].shift()
df['return'] = ((df['return'] >= 0.02) & (df['return'] <= 0.035)).astype(int)

print(len(df))
print(sum(df['return']))
df.head()
```
Create the data for the time-series analysis. This time, the data of the past 75 days (≈ 3 months) is used as the explanatory variable.
```python
window = 75

def create_inout_sequences(in_data, in_label, window):
    out_seq = []
    out_label = []
    length = len(in_data)
    for i in range(window, length):
        # slice of the most recent window+1 rows, normalized by the latest closing price
        tmp_data = in_data[i-window:i+1] / in_data[i, 3]
        tmp_label = [in_label[i]]
        out_seq.append(torch.Tensor(tmp_data))
        out_label.append(torch.Tensor(tmp_label).type(torch.long))
    return out_seq, out_label

out_seq, out_label = create_inout_sequences(df.iloc[:, :4].values, df.iloc[:, 4].values, window)
```
Print the data to check that it looks as intended.
```python
print(len(out_seq))
print(out_seq[0])
print(out_label[0])
```
'''output
8660
tensor([[1.0577, 1.0673, 1.0481, 1.0481],
[1.0577, 1.0577, 1.0192, 1.0385],
[1.0192, 1.0288, 1.0000, 1.0096],
[1.0288, 1.0385, 1.0096, 1.0288],
[1.0288, 1.0288, 1.0192, 1.0288],
~~ Omitted ~~
[1.0288, 1.0385, 1.0288, 1.0385],
[1.0288, 1.0385, 1.0192, 1.0192],
[1.0192, 1.0288, 1.0000, 1.0000],
[1.0096, 1.0192, 1.0000, 1.0192],
[1.0192, 1.0288, 1.0000, 1.0000]])
tensor([0])
'''
Split the data into training, validation, and test sets. The exact sizes of the splits are arbitrary. A gap of 100 samples (> 75 days) is left between the splits so that their windows do not overlap (sequence k covers raw rows k to k+75, so for example sequences 4999 and 5100 share no rows).
```python
x_train = out_seq[:5000]
x_valid = out_seq[5100:6000]
x_test = out_seq[6100:]
y_train = out_label[:5000]
y_valid = out_label[5100:6000]
y_test = out_label[6100:]
```
Build a model consisting of input → LSTM → fully connected layer. Since this is binary classification, the output dimension is 2. As this is just a trial, there is no deep meaning behind the batch size or the hidden layer size.
```python
input_size = 4
batch_size = 32
hidden_layer_size = 50
output_size = 2

class LstmClassifier(nn.Module):
    def __init__(self, input_size, hidden_layer_size, output_size, batch_size):
        super().__init__()
        self.batch_size = batch_size
        self.hidden_layer_size = hidden_layer_size
        # nn.LSTM defaults to batch_first=False, so set batch_first=True
        self.lstm = nn.LSTM(input_size, hidden_layer_size, batch_first=True)
        self.fc = nn.Linear(hidden_layer_size, output_size)
        # defined but not applied in forward; CrossEntropyLoss applies softmax internally
        self.softmax = nn.Softmax(dim=1)
        # Initial hidden state and cell state
        self.hidden_cell = (torch.zeros(1, self.batch_size, self.hidden_layer_size).to(device),
                            torch.zeros(1, self.batch_size, self.hidden_layer_size).to(device))

    def forward(self, input_seq):
        x = input_seq
        # Propagate through the LSTM
        lstm_out, self.hidden_cell = self.lstm(x, self.hidden_cell)
        # Apply the fully connected layer to the final hidden state
        out = self.fc(self.hidden_cell[0])
        out = out[-1]
        return out

model = LstmClassifier(input_size, hidden_layer_size, output_size, batch_size)
model = model.to(device)
model
```
'''output
LstmClassifier(
(lstm): LSTM(4, 50, batch_first=True)
(fc): Linear(in_features=50, out_features=2, bias=True)
(softmax): Softmax(dim=1)
)
'''
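As a quick sanity check, a dummy batch can be pushed through a throwaway instance of the classifier to confirm the (batch, sequence length, features) input convention of `batch_first=True` and the (batch, 2) output. This is only a rough sketch with random data (the `check_model` / `dummy` names are introduced here for illustration), not part of the original experiment.

```python
# Rough shape check with random data (sketch); uses a separate instance so that
# model.hidden_cell of the real model is not touched before training.
check_model = LstmClassifier(input_size, hidden_layer_size, output_size, batch_size).to(device)
dummy = torch.randn(batch_size, window + 1, input_size).to(device)  # (batch, seq_len=76, features)
with torch.no_grad():
    print(check_model(dummy).shape)  # expected: torch.Size([32, 2])
```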
Use cross entropy as the loss function and Adam as the optimizer.
```python
criterion = nn.CrossEntropyLoss()
optimiser = optim.Adam(model.parameters())
```
For now, let's run the training with the number of epochs set to 100.
The hidden state is detached at each iteration; since an RNN involves a large amount of computation, this appears to be done to discard intermediate results that are no longer needed, reducing memory usage and avoiding backpropagation through graphs that have already been freed ([reference](https://discuss.pytorch.org/t/runtimeerror-trying-to-backward-through-the-graph-a-second-time-but-the-buffers-have-already-been-freed-specify-retain-graph-true-when-calling-backward-the-first-time/6795/3)).
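As a minimal illustration of this point (a toy sketch with dummy tensors, separate from the actual training loop below): if the hidden state returned by the LSTM is carried over to the next iteration without `detach()`, the second `backward()` tries to traverse the already-freed graph of the previous iteration and raises the RuntimeError mentioned in the link; detaching the state first avoids this.

```python
import torch
import torch.nn as nn

# Toy example: carry the hidden state across iterations, detaching it each time.
lstm = nn.LSTM(4, 50, batch_first=True)
h = (torch.zeros(1, 32, 50), torch.zeros(1, 32, 50))
for _ in range(2):
    x = torch.randn(32, 75, 4)           # dummy batch: (batch, seq_len, features)
    h = tuple(s.detach() for s in h)     # cut the graph of the previous iteration
    out, h = lstm(x, h)
    out.sum().backward()                 # works; without detach() the 2nd pass fails
```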
```python
num_epochs = 100

train_loss_list = []
train_acc_list = []
val_loss_list = []
val_acc_list = []

# Stop backpropagation from reaching into previous iterations
def detach(states):
    return [state.detach() for state in states]

# Stack the individual sample tensors of one mini-batch into a single tensor
def cat_Tensor(data, i_batch, batch_size):
    for i, idx in enumerate(range(i_batch*batch_size, (i_batch+1)*batch_size)):
        # Add a batch dimension
        tmp = torch.unsqueeze(data[idx], 0)
        if i == 0:
            output = tmp
        else:
            output = torch.cat((output, tmp), 0)
    return output

for i_epoch in range(num_epochs):
    train_loss = 0
    train_acc = 0
    val_loss = 0
    val_acc = 0

    # train
    model.train()
    n_batch = len(x_train)//batch_size
    for i_batch in range(n_batch):
        seq = cat_Tensor(x_train, i_batch, batch_size)
        labels = cat_Tensor(y_train, i_batch, batch_size)
        labels = torch.squeeze(labels, 1)
        seq = seq.to(device)
        labels = labels.to(device)
        # Reset the gradients
        optimiser.zero_grad()
        # Detach the hidden state so backprop does not go past this batch (error countermeasure)
        model.hidden_cell = detach(model.hidden_cell)
        # Forward propagation
        outputs = model(seq)
        # Compute the loss
        loss = criterion(outputs, labels)
        # Accumulate loss and accuracy
        train_loss += loss.item()
        train_acc += (outputs.max(1)[1] == labels).sum().item()
        # Backpropagation
        loss.backward()
        # Update the weights
        optimiser.step()
    avg_train_loss = train_loss / n_batch
    avg_train_acc = train_acc / (n_batch*batch_size)

    # validation
    model.eval()
    with torch.no_grad():
        n_batch = len(x_valid)//batch_size
        for i_batch in range(n_batch):
            seq = cat_Tensor(x_valid, i_batch, batch_size)
            labels = cat_Tensor(y_valid, i_batch, batch_size)
            labels = torch.squeeze(labels, 1)
            seq = seq.to(device)
            labels = labels.to(device)
            # Forward propagation
            outputs = model(seq)
            loss = criterion(outputs, labels)
            # Accumulate loss and accuracy
            val_loss += loss.item()
            val_acc += (outputs.max(1)[1] == labels).sum().item()
    avg_val_loss = val_loss / n_batch
    avg_val_acc = val_acc / (n_batch*batch_size)

    print('Epoch [{}/{}], Loss: {loss:.4f}, val_loss: {val_loss:.4f}, Acc:{acc:.4f}, val_acc: {val_acc:.4f}'
          .format(i_epoch+1, num_epochs, loss=avg_train_loss, val_loss=avg_val_loss,
                  acc=avg_train_acc, val_acc=avg_val_acc))

    train_loss_list.append(avg_train_loss)
    train_acc_list.append(avg_train_acc)
    val_loss_list.append(avg_val_loss)
    val_acc_list.append(avg_val_acc)
```
'''output
Epoch [1/100], Loss: 0.1198, val_loss: 0.0632, Acc:0.9439, val_acc: 0.9743
Epoch [2/100], Loss: 0.1147, val_loss: 0.0609, Acc:0.9397, val_acc: 0.9743
Epoch [3/100], Loss: 0.1119, val_loss: 0.0590, Acc:0.9403, val_acc: 0.9743
Epoch [4/100], Loss: 0.1096, val_loss: 0.0569, Acc:0.9407, val_acc: 0.9743
Epoch [5/100], Loss: 0.1069, val_loss: 0.0557, Acc:0.9417, val_acc: 0.9754
Epoch [6/100], Loss: 0.1046, val_loss: 0.0544, Acc:0.9437, val_acc: 0.9754
Epoch [7/100], Loss: 0.1032, val_loss: 0.0525, Acc:0.9455, val_acc: 0.9799
Epoch [8/100], Loss: 0.1023, val_loss: 0.0507, Acc:0.9459, val_acc: 0.9799
Epoch [9/100], Loss: 0.1012, val_loss: 0.0500, Acc:0.9457, val_acc: 0.9788
Epoch [10/100], Loss: 0.0998, val_loss: 0.0486, Acc:0.9469, val_acc: 0.9799
~~ Omitted ~~
Epoch [95/100], Loss: 0.0669, val_loss: 0.0420, Acc:0.9688, val_acc: 0.9888
Epoch [96/100], Loss: 0.0665, val_loss: 0.0419, Acc:0.9692, val_acc: 0.9888
Epoch [97/100], Loss: 0.0662, val_loss: 0.0419, Acc:0.9698, val_acc: 0.9888
Epoch [98/100], Loss: 0.0659, val_loss: 0.0419, Acc:0.9702, val_acc: 0.9888
Epoch [99/100], Loss: 0.0656, val_loss: 0.0419, Acc:0.9704, val_acc: 0.9888
Epoch [100/100], Loss: 0.0652, val_loss: 0.0417, Acc:0.9708, val_acc: 0.9888
'''
Let's visualize whether training progressed properly.
```python
import matplotlib.pyplot as plt
%matplotlib inline

plt.figure()
plt.plot(range(num_epochs), train_loss_list, color='blue', linestyle='-', label='train_loss')
plt.plot(range(num_epochs), val_loss_list, color='green', linestyle='--', label='val_loss')
plt.legend()
plt.xlabel('epoch')
plt.ylabel('loss')
plt.title('Training and validation loss')
plt.grid()

plt.figure()
plt.plot(range(num_epochs), train_acc_list, color='blue', linestyle='-', label='train_acc')
plt.plot(range(num_epochs), val_acc_list, color='green', linestyle='--', label='val_acc')
plt.legend()
plt.xlabel('epoch')
plt.ylabel('acc')
plt.title('Training and validation accuracy')
plt.grid()
```
Finally, let's make predictions on the test data, which was not used for either training or validation.
```python
model.eval()
with torch.no_grad():
    total = 0
    test_acc = 0
    n_batch = len(x_test)//batch_size
    for i_batch in range(n_batch):
        seq = cat_Tensor(x_test, i_batch, batch_size)
        labels = cat_Tensor(y_test, i_batch, batch_size)
        labels = torch.squeeze(labels, 1)
        seq = seq.to(device)
        labels = labels.to(device)
        outputs = model(seq)
        test_acc += (outputs.max(1)[1] == labels).sum().item()
        total += labels.size(0)

print('accuracy: {} %'.format(100 * test_acc / total))
```
'''output
accuracy: 97.421875 %
'''
The accuracy was 97.42%, so the model appears to make highly accurate predictions. However, the number feels too good to be true, so I would like to verify it later.
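One thing to keep in mind for that verification: the 2-3.5% class makes up only around 5% of days, so a model that always predicts "no" already scores well above 90% accuracy. A rough sketch of a follow-up check (assuming the `model`, `cat_Tensor`, `x_test` / `y_test`, and `batch_size` defined above, plus scikit-learn) would be to look at the confusion matrix and the precision/recall of the minority class instead of plain accuracy:

```python
from sklearn.metrics import classification_report, confusion_matrix

# Sketch: collect predictions over the test batches and inspect per-class metrics
# rather than plain accuracy, since the positive class is rare.
all_preds, all_labels = [], []
model.eval()
with torch.no_grad():
    for i_batch in range(len(x_test) // batch_size):
        seq = cat_Tensor(x_test, i_batch, batch_size).to(device)
        labels = torch.squeeze(cat_Tensor(y_test, i_batch, batch_size), 1)
        preds = model(seq).max(1)[1].cpu()
        all_preds += preds.tolist()
        all_labels += labels.tolist()

print(confusion_matrix(all_labels, all_preds))
print(classification_report(all_labels, all_preds, digits=3))
```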