Stock Price Forecasting Using LSTM_1

I finished reading Chapter 5 (RNN) of "[PyTorch Neural Network Implementation Handbook](https://www.amazon.co.jp/PyTorch%E3%83%8B%E3%83%A5%E3%83%BC%E3%83%A9%E3%83%AB%E3%83%8D%E3%83%83%E3%83%88%E3%83%AF%E3%83%BC%E3%82%AF%E5%AE%9F%E8%A3%85%E3%83%8F%E3%83%B3%E3%83%89%E3%83%96%E3%83%83%E3%82%AF-Python%E3%83%A9%E3%82%A4%E3%83%96%E3%83%A9%E3%83%AA%E5%AE%9A%E7%95%AA%E3%82%BB%E3%83%AC%E3%82%AF%E3%82%B7%E3%83%A7%E3%83%B3-%E5%AE%AE%E6%9C%AC-%E5%9C%AD%E4%B8%80%E9%83%8E/dp/4798055476)", so I tried applying it to stock price analysis.

This article is a memo of that attempt.

Introduction

Using the stock prices (open, high, low, close) of Toyota Motor Corporation (7203) for the past 20 years, I predicted whether the next day's return, (next-day close - next-day open) / next-day open, would fall between 2% and 3.5% (a binary classification problem).

The 2-3.5% range was chosen 1) to secure a minimum return and 2) to exclude price moves driven by fundamental factors such as news. Also, when I previously analyzed the daily returns (close - open) of the TOPIX500 stocks, returns of 2-3.5% accounted for around 5% of all days, which seemed about right as a prediction target.

Return range (%)    Share of days (%)
      ~ -3.5         4.5
-3.5  ~ -2.0         7.4
-2.0  ~ -0.5        22.6
-0.5  ~  0.5        34.5
 0.5  ~  2.0        19.5
 2.0  ~  3.5         6.5
 3.5  ~              5.0
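
A distribution table like the one above could be produced along the following lines. This is a minimal sketch on synthetic data, not the code used for the TOPIX500 analysis: the random open and close prices, the seed, and all variable names are assumptions made for illustration.

```python
import numpy as np
import pandas as pd

# Synthetic prices (assumption: a stand-in for real open/close data).
rng = np.random.default_rng(0)
open_price = pd.Series(1000 + rng.normal(0, 10, size=5000))
close_price = open_price * (1 + rng.normal(0, 0.015, size=5000))

# Daily return in percent: (close - open) / open.
daily_return = (close_price - open_price) / open_price * 100

# Same bin edges as the table; the two outer bins are open-ended.
edges = [-np.inf, -3.5, -2.0, -0.5, 0.5, 2.0, 3.5, np.inf]
binned = pd.cut(daily_return, bins=edges)

# Share of days falling into each bin, in percent, kept in bin order.
share = binned.value_counts(sort=False, normalize=True) * 100
print(share.round(1))
```

The shares always add up to 100, which is a quick sanity check for a table of this kind.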

Result

Using the data for the past 75 days as explanatory variables, the accuracy was 97.42% (← suspicious, so verification is required).

For the implementation, I referred to the book mentioned at the beginning and to the following site: https://stackabuse.com/time-series-prediction-using-lstm-with-pytorch-in-python/

Preparation

First, import the required libraries.

import torch
import torch.nn as nn
import torch.optim as optim

%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

Since the data to be analyzed is stored on Google Drive, mount the drive with the following code so that it can be accessed.

from google.colab import drive
drive.mount('/content/drive')

Check whether CUDA is available and set the device accordingly.

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

Data reading

This time, we will use the stock price data of Toyota Motor Corporation (7203) (data for 27 years since 1983).

df_init = pd.read_csv('/content/drive/My Drive/XXXXXXXXXX/7203.csv', encoding='sjis')
df_init.head()
   Stock code   date  Open price  High price   Low price  closing price
0        7203  30320  747.911341  754.710535  741.112147     741.112147
1        7203  30321  747.911341  747.911341  720.714565     734.312953
2        7203  30322  720.714565  727.513759  707.116177     713.915371
3        7203  30323  727.513759  734.312953  713.915371     727.513759
4        7203  30324  727.513759  727.513759  720.714565     727.513759

Using too many variables 1) lengthens the computation time and 2) increases the risk of overfitting, so this time the inputs are restricted to four variables: the opening, high, low, and closing prices.

df = pd.DataFrame()
df['open'] = df_init['Open price']
df['high'] = df_init['High price']
df['low'] = df_init['Low price']
df['close'] = df_init['closing price']
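#Compute the daily return (close - open) / open and set the flag to 1 when it is between 2% and 3.5%.
#Note: shift() with the default periods=1 moves values down one row, so row i receives the return of day i-1;
#labeling row i with the next day's return would require shift(-1) instead.
df['return'] = (df_init['closing price'].shift() - df_init['Open price'].shift())/df_init['Open price'].shift()
df['return'] = ((df['return']>=0.02) & (df['return']<=0.035)).astype(int)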
print(len(df))
print(sum(df['return']))
df.head()
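
Because the direction of `shift()` decides which day's return ends up in which row, it is worth checking on a toy example. The sketch below uses a made-up five-row DataFrame (the prices are illustrative, not Toyota data): `shift()` with the default `periods=1` pulls the previous row's value down, while `shift(-1)` pulls the next row's value up.

```python
import pandas as pd

# Toy prices, purely illustrative.
toy = pd.DataFrame({
    'open':  [100.0, 100.0, 100.0, 100.0, 100.0],
    'close': [101.0, 102.0, 103.0, 104.0, 105.0],
})
# Same-day return: (close - open) / open.
toy['ret'] = (toy['close'] - toy['open']) / toy['open']

toy['ret_prev_day'] = toy['ret'].shift()    # row i holds day i-1's return
toy['ret_next_day'] = toy['ret'].shift(-1)  # row i holds day i+1's return
print(toy)
```

With the next-day variant the last row has no label (NaN) and would have to be dropped before training.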

Create the data for time series analysis. This time, the data for the past 75 days (≈ 3 months) is used as the explanatory variables.

window = 75

def create_inout_sequences(in_data, in_label, window):
    out_seq = []
    out_label = []
    length = len(in_data)
    for i in range(window, length):
        #Rows i-window .. i (window+1 rows, current day included), scaled by the current day's close (column 3)
        tmp_data = in_data[i-window:i+1] / in_data[i,3]
        tmp_label = [in_label[i]]
        out_seq.append(torch.Tensor(tmp_data))
        #CrossEntropyLoss expects class indices of type long
        out_label.append(torch.Tensor(tmp_label).type(torch.long))
    return out_seq, out_label

out_seq, out_label = create_inout_sequences(df.iloc[:,:4].values, df.iloc[:,4].values, window)

Print the data and check that it looks as intended.

print(len(out_seq))
print(out_seq[0])
print(out_label[0])

'''output
8660
tensor([[1.0577, 1.0673, 1.0481, 1.0481],
        [1.0577, 1.0577, 1.0192, 1.0385],
        [1.0192, 1.0288, 1.0000, 1.0096],
        [1.0288, 1.0385, 1.0096, 1.0288],
        [1.0288, 1.0288, 1.0192, 1.0288],
~~ Omitted ~~
        [1.0288, 1.0385, 1.0288, 1.0385],
        [1.0288, 1.0385, 1.0192, 1.0192],
        [1.0192, 1.0288, 1.0000, 1.0000],
        [1.0096, 1.0192, 1.0000, 1.0192],
        [1.0192, 1.0288, 1.0000, 1.0000]])
tensor([0])
'''
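
To double-check what each sample contains, the windowing can be reproduced on a small array. This is a sketch with a hypothetical helper `make_windows` and made-up data; it mirrors the slicing used above, where `in_data[i-window:i+1]` yields `window + 1` rows (the current day included) and every value is divided by the current day's close (column 3).

```python
import numpy as np

def make_windows(values, labels, window):
    """Return (samples, targets) built with the same slicing as above."""
    samples, targets = [], []
    for i in range(window, len(values)):
        # Rows i-window .. i inclusive, scaled by the close of row i.
        samples.append(values[i - window:i + 1] / values[i, 3])
        targets.append(labels[i])
    return np.stack(samples), np.array(targets)

# Made-up OHLC-like data: 10 rows, 4 columns, strictly positive.
values = np.arange(1, 41, dtype=float).reshape(10, 4)
labels = np.arange(10) % 2
samples, targets = make_windows(values, labels, window=3)

print(samples.shape)      # 10 - 3 samples, each with window + 1 rows of 4 columns
print(samples[0, -1, 3])  # close of the last row, i.e. the scaling base
```

Each sample therefore ends on the day whose label it carries, and the last close in every sample equals 1 after scaling, as in the output above.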

Split the data into training, validation, and test sets. The size of each set was chosen arbitrarily. A gap of 100 days (> 75 days) is left between the sets so that their input windows do not overlap.

x_train = out_seq[:5000]
x_valid = out_seq[5100:6000]
x_test = out_seq[6100:]
y_train = out_label[:5000]
y_valid = out_label[5100:6000]
y_test = out_label[6100:]
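
Whether the 100-sample gap really prevents overlap can be checked with a little index arithmetic. The sketch below assumes, as in `create_inout_sequences` above, that sample `k` covers raw rows `k` to `k + window`; the helper name `raw_row_span` is hypothetical.

```python
window = 75

def raw_row_span(first_sample, last_sample, window):
    """Raw row range (inclusive) covered by samples first_sample..last_sample."""
    return first_sample, last_sample + window

n_samples = 8660  # length of out_seq reported above
train = raw_row_span(0, 4999, window)
valid = raw_row_span(5100, 5999, window)
test = raw_row_span(6100, n_samples - 1, window)

print(train, valid, test)
# Each split must end before the next one starts.
print(train[1] < valid[0], valid[1] < test[0])
```

Under that assumption the training windows end at raw row 5074 and the validation windows start at row 5100, so the sets share no rows.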

Modeling

Build a model consisting of input → LSTM → fully connected layer. Since this is binary classification, the output dimension is 2. Since this is only a trial, the batch size and the hidden layer size have no particular significance.

input_size=4
batch_size = 32
hidden_layer_size=50
output_size=2

class LstmClassifier(nn.Module):
    def __init__(self, input_size, hidden_layer_size, output_size, batch_size):
        super().__init__()
        self.batch_size = batch_size
        self.hidden_layer_size = hidden_layer_size
        #nn.LSTM defaults to batch_first=False, so set batch_first=True
        self.lstm = nn.LSTM(input_size, hidden_layer_size, batch_first=True)
        self.fc = nn.Linear(hidden_layer_size, output_size)
        #Not applied in forward(): nn.CrossEntropyLoss works on raw logits
        self.softmax = nn.Softmax(dim=1)
        #Set initial hidden state and cell state
        self.hidden_cell = (torch.zeros(1, self.batch_size, self.hidden_layer_size).to(device),
                            torch.zeros(1, self.batch_size, self.hidden_layer_size).to(device))

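    def forward(self, input_seq):
        x = input_seq
        #Propagate through the LSTM, carrying the hidden and cell states over from the previous call
        lstm_out, self.hidden_cell = self.lstm(x, self.hidden_cell)
        #Feed the final hidden state into the fully connected layer: (1, batch, hidden) -> (1, batch, output)
        out = self.fc(self.hidden_cell[0])
        #Drop the layer dimension: (batch, output)
        out = out[-1]
        return out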


model = LstmClassifier(input_size, hidden_layer_size, output_size, batch_size)
model = model.to(device)
model

'''output
LstmClassifier(
  (lstm): LSTM(4, 50, batch_first=True)
  (fc): Linear(in_features=50, out_features=2, bias=True)
  (softmax): Softmax(dim=1)
)
'''
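
The tensor shapes flowing through such a model can be traced in isolation. The following is a minimal sketch on random input, not the trained model: the sequence length of 76 is an assumption taken from the `window + 1` rows per sample, and the layer sizes repeat the values above.

```python
import torch
import torch.nn as nn

batch_size, seq_len, input_size, hidden_size = 32, 76, 4, 50

lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
fc = nn.Linear(hidden_size, 2)

x = torch.randn(batch_size, seq_len, input_size)  # (batch, time, features)
h0 = torch.zeros(1, batch_size, hidden_size)       # (layers, batch, hidden)
c0 = torch.zeros(1, batch_size, hidden_size)

lstm_out, (h_n, c_n) = lstm(x, (h0, c0))
logits = fc(h_n)[-1]  # final hidden state of the last layer -> class scores

print(lstm_out.shape)  # (batch, time, hidden)
print(h_n.shape)       # (layers, batch, hidden)
print(logits.shape)    # (batch, classes)
```

For a single-layer, unidirectional LSTM the final hidden state `h_n[-1]` is the same as the last time step of `lstm_out`, so either could be passed to the fully connected layer.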

Use cross entropy as the loss function and Adam as the optimizer.

criterion = nn.CrossEntropyLoss()
optimiser = optim.Adam(model.parameters())
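
`nn.CrossEntropyLoss` takes the raw outputs of the fully connected layer (logits) and applies log-softmax internally, which is why the model's `forward` can return the logits directly. A small sketch on random tensors (the shapes and seed are arbitrary choices for illustration) shows the equivalence with the explicit two-step form:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
logits = torch.randn(8, 2)          # raw model outputs, no softmax applied
labels = torch.randint(0, 2, (8,))  # class indices (dtype long)

ce = nn.CrossEntropyLoss()(logits, labels)
# Equivalent two-step form: log-softmax followed by negative log-likelihood.
nll = F.nll_loss(F.log_softmax(logits, dim=1), labels)

print(torch.allclose(ce, nll))
```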

Training

For the time being, let's train for 100 epochs.

The hidden state is detached from the computation graph before every batch. Because the model carries the hidden state over from the previous batch, backpropagation would otherwise try to go through the previous batch's graph, whose buffers have already been freed, and raise an error. Since an RNN is computationally heavy, detaching also seems to reduce memory usage by discarding intermediate results that are no longer needed ([Reference](https://discuss.pytorch.org/t/runtimeerror-trying-to-backward-through-the-graph-a-second-time-but-the-buffers-have-already-been-freed-specify-retain-graph-true-when-calling-backward-the-first-time/6795/3)).
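
The effect of detaching can be reproduced in a few lines. This is a minimal sketch with a tiny LSTM and random input (all sizes and the helper name `run_two_batches` are assumptions for illustration): the state is carried over two batches, once with and once without `detach()`.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
lstm = nn.LSTM(4, 8, batch_first=True)

def run_two_batches(use_detach):
    """Carry the LSTM state over two batches; return True if both backward passes succeed."""
    state = (torch.zeros(1, 2, 8), torch.zeros(1, 2, 8))
    try:
        for _ in range(2):
            if use_detach:
                # Cut the link to the previous batch's graph.
                state = tuple(s.detach() for s in state)
            out, state = lstm(torch.randn(2, 5, 4), state)
            out.sum().backward()
        return True
    except RuntimeError:
        return False

print(run_two_batches(use_detach=False))
print(run_two_batches(use_detach=True))
```

Without `detach()` the second `backward()` reaches into the first batch's graph and fails; with it, each batch is backpropagated on its own while the state values are still passed along.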

num_epochs = 100
train_loss_list = []
train_acc_list = []
val_loss_list = []
val_acc_list = []

#Detach the states from the computation graph so that backpropagation stops here
def detach(states):
    return tuple(state.detach() for state in states)

#Stack the samples of one mini-batch into a single tensor with a new leading batch dimension
def cat_Tensor(data, i_batch, batch_size):
    batch = data[i_batch*batch_size:(i_batch+1)*batch_size]
    return torch.stack(batch, 0)

for i_epoch in range(num_epochs):

    train_loss = 0
    train_acc = 0
    val_loss = 0
    val_acc = 0

    #train
    model.train()

    n_batch = len(x_train)//batch_size
    for i_batch in range(n_batch):
        seq = cat_Tensor(x_train, i_batch, batch_size)
        labels = cat_Tensor(y_train, i_batch, batch_size)
        labels = torch.squeeze(labels, 1)

        seq = seq.to(device)
        labels = labels.to(device)
        
        #Reset gradients
        optimiser.zero_grad()
        #Detach the carried-over hidden state so that backward() stops at the batch boundary (avoids the RuntimeError)
        model.hidden_cell = detach(model.hidden_cell)
        #Forward propagation
        outputs = model(seq)
        #Compute the loss
        loss = criterion(outputs, labels)
        #Accumulate the loss and the number of correct predictions
        train_loss += loss.item()
        train_acc += (outputs.max(1)[1] == labels).sum().item()
        #Backpropagation
        loss.backward()
        #Weight update
        optimiser.step()

    avg_train_loss = train_loss / n_batch
    avg_train_acc = train_acc / (n_batch*batch_size)

    #val
    model.eval()
    with torch.no_grad():
        n_batch = len(x_valid)//batch_size
        for i_batch in range(n_batch):
            seq = cat_Tensor(x_valid, i_batch, batch_size)
            labels = cat_Tensor(y_valid, i_batch, batch_size)
            labels = torch.squeeze(labels, 1)

            seq = seq.to(device)
            labels = labels.to(device)

            #Forward propagation
            outputs = model(seq)
            loss = criterion(outputs, labels)
            #Accumulate the loss and the number of correct predictions
            val_loss += loss.item()
            val_acc += (outputs.max(1)[1] == labels).sum().item()

    avg_val_loss = val_loss / n_batch
    avg_val_acc = val_acc / (n_batch*batch_size)
    
    print ('Epoch [{}/{}], Loss: {loss:.4f}, val_loss: {val_loss:.4f}, Acc:{acc:.4f}, val_acc: {val_acc:.4f}' 
        .format(i_epoch+1, num_epochs, loss=avg_train_loss, val_loss=avg_val_loss, 
                acc=avg_train_acc, val_acc=avg_val_acc))
    
    train_loss_list.append(avg_train_loss)
    train_acc_list.append(avg_train_acc)
    val_loss_list.append(avg_val_loss)
    val_acc_list.append(avg_val_acc)


'''output
Epoch [1/100], Loss: 0.1198, val_loss: 0.0632, Acc:0.9439, val_acc: 0.9743
Epoch [2/100], Loss: 0.1147, val_loss: 0.0609, Acc:0.9397, val_acc: 0.9743
Epoch [3/100], Loss: 0.1119, val_loss: 0.0590, Acc:0.9403, val_acc: 0.9743
Epoch [4/100], Loss: 0.1096, val_loss: 0.0569, Acc:0.9407, val_acc: 0.9743
Epoch [5/100], Loss: 0.1069, val_loss: 0.0557, Acc:0.9417, val_acc: 0.9754
Epoch [6/100], Loss: 0.1046, val_loss: 0.0544, Acc:0.9437, val_acc: 0.9754
Epoch [7/100], Loss: 0.1032, val_loss: 0.0525, Acc:0.9455, val_acc: 0.9799
Epoch [8/100], Loss: 0.1023, val_loss: 0.0507, Acc:0.9459, val_acc: 0.9799
Epoch [9/100], Loss: 0.1012, val_loss: 0.0500, Acc:0.9457, val_acc: 0.9788
Epoch [10/100], Loss: 0.0998, val_loss: 0.0486, Acc:0.9469, val_acc: 0.9799
~~ Omitted ~~
Epoch [95/100], Loss: 0.0669, val_loss: 0.0420, Acc:0.9688, val_acc: 0.9888
Epoch [96/100], Loss: 0.0665, val_loss: 0.0419, Acc:0.9692, val_acc: 0.9888
Epoch [97/100], Loss: 0.0662, val_loss: 0.0419, Acc:0.9698, val_acc: 0.9888
Epoch [98/100], Loss: 0.0659, val_loss: 0.0419, Acc:0.9702, val_acc: 0.9888
Epoch [99/100], Loss: 0.0656, val_loss: 0.0419, Acc:0.9704, val_acc: 0.9888
Epoch [100/100], Loss: 0.0652, val_loss: 0.0417, Acc:0.9708, val_acc: 0.9888
'''
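
As an alternative to slicing the lists by hand, the batching could be delegated to `torch.utils.data.DataLoader`. This is not what the code above does; it is a sketch on made-up tensors standing in for `x_train` and `y_train`. Because the model keeps a hidden state with a fixed batch size, the incomplete final batch has to be dropped, just as `len(x_train)//batch_size` does in the loop above.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Made-up stand-ins for x_train / y_train: 100 samples of shape (76, 4).
x_list = [torch.randn(76, 4) for _ in range(100)]
y_list = [torch.tensor([i % 2]) for i in range(100)]

dataset = TensorDataset(torch.stack(x_list), torch.cat(y_list))
# shuffle=False keeps the chronological order; drop_last=True discards the
# final incomplete batch so every batch has exactly batch_size samples.
loader = DataLoader(dataset, batch_size=32, shuffle=False, drop_last=True)

for seq, labels in loader:
    print(seq.shape, labels.shape)
```

With 100 samples and a batch size of 32, three full batches are produced and the remaining four samples are skipped.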

Let's plot the loss and accuracy curves to check whether training is progressing properly.

import matplotlib.pyplot as plt
%matplotlib inline

plt.figure()
plt.plot(range(num_epochs), train_loss_list, color='blue', linestyle='-', label='train_loss')
plt.plot(range(num_epochs), val_loss_list, color='green', linestyle='--', label='val_loss')
plt.legend()
plt.xlabel('epoch')
plt.ylabel('loss')
plt.title('Training and validation loss')
plt.grid()

plt.figure()
plt.plot(range(num_epochs), train_acc_list, color='blue', linestyle='-', label='train_acc')
plt.plot(range(num_epochs), val_acc_list, color='green', linestyle='--', label='val_acc')
plt.legend()
plt.xlabel('epoch')
plt.ylabel('acc')
plt.title('Training and validation accuracy')
plt.grid()

(Figures: "Training and validation loss" and "Training and validation accuracy")

Inference

I will try to make predictions using data that was not used for training or validation.

model.eval()
with torch.no_grad():
    total = 0
    test_acc = 0
    
    n_batch = len(x_test)//batch_size
    for i_batch in range(n_batch):
        seq = cat_Tensor(x_test, i_batch, batch_size)
        labels = cat_Tensor(y_test, i_batch, batch_size)
        labels = torch.squeeze(labels, 1)
        
        seq = seq.to(device)
        labels = labels.to(device)

        outputs = model(seq)
        test_acc += (outputs.max(1)[1] == labels).sum().item()
        total += labels.size(0)
    print('accuracy: {} %'.format(100 * test_acc / total)) 

'''output
accuracy: 97.421875 %
'''

The accuracy was 97.42%, so on the face of it the predictions are highly accurate. However, this figure seems too high, so I would like to verify it later.
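
One way to start that verification is to compare the accuracy with a trivial baseline, because the positive class is rare. The test set holds 8660 - 6100 = 2560 samples, so 97.421875% corresponds to 2494 correct and 66 wrong predictions. The sketch below uses made-up labels (66 positives out of 2560 is an assumption chosen only to match that count, not a measured figure) to show that a model that always predicts 0 would reach the same accuracy while never finding a positive day. The helper `binary_metrics` is hypothetical.

```python
def binary_metrics(y_true, y_pred):
    """Accuracy, precision and recall for the positive class (label 1)."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    accuracy = correct / len(y_true)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return accuracy, precision, recall

# Made-up labels: 66 positives among 2560 samples (assumption).
y_true = [1] * 66 + [0] * 2494
always_zero = [0] * 2560

acc, prec, rec = binary_metrics(y_true, always_zero)
print('accuracy: {} %'.format(100 * acc))
print('precision: {}, recall: {}'.format(prec, rec))
```

Reporting precision and recall for the positive class alongside accuracy, and comparing against this kind of baseline, would show whether the model has learned anything beyond the class imbalance.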
