The purpose of this article is to extract the weights from an LSTM model trained in Keras and to reimplement the feedforward part in numpy. Mostly, I just wanted to play with Keras.
For the data, I reused the setup from the article "Sin wave prediction using RNN in the deep learning library Keras" as is. That article in turn seems to be based on "I made an RNN learn sin waves and predict them", so I am grateful to both authors.
import pandas as pd
import math
import numpy as np
np.random.seed(0)
#Random number coefficient
random_factor = 0.05
#Number of steps per cycle
steps_per_cycle = 80
#Number of cycles to generate
number_of_cycles = 50
df = pd.DataFrame(np.arange(steps_per_cycle * number_of_cycles + 1), columns=["t"])
df["sin_t"] = df.t.apply(lambda x: math.sin(x * (2 * math.pi / steps_per_cycle)+ np.random.uniform(-1.0, +1.0) * random_factor))
def _load_data(data, n_prev=100):
    """
    data should be pd.DataFrame()
    """
    docX, docY = [], []
    for i in range(len(data) - n_prev):
        docX.append(data.iloc[i:i+n_prev].values)  # .as_matrix() is deprecated in recent pandas; use .values
        docY.append(data.iloc[i+n_prev].values)
    alsX = np.array(docX)
    alsY = np.array(docY)
    return alsX, alsY
def train_test_split(df, test_size=0.1, n_prev=100):
    """
    This just splits data to training and testing parts
    """
    ntrn = int(round(len(df) * (1 - test_size)))
    X_train, y_train = _load_data(df.iloc[0:ntrn], n_prev)
    X_test, y_test = _load_data(df.iloc[ntrn:], n_prev)
    return (X_train, y_train), (X_test, y_test)
length_of_sequences = 5
(X_train, y_train), (X_test, y_test) = train_test_split(df[["sin_t"]], n_prev=length_of_sequences)
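Just as a quick check of the data shapes (a minimal sketch; the exact sample counts depend on the parameters above):

```python
# X is (samples, timesteps, features), y is (samples, features)
print("X_train:", X_train.shape, " y_train:", y_train.shape)
print("X_test :", X_test.shape, " y_test :", y_test.shape)
```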
For the LSTM formulation itself, the Keras documentation (and the reference it links to) is a good starting point. The model Keras implements corresponds to the one in the "Our model" section there, without the $ V_o $ term.
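For reference, these are the feedforward equations that the numpy code later in this article follows (my notation, using the row-vector convention of the code; $\sigma$ is the sigmoid and $\odot$ is the element-wise product):

$$
\begin{aligned}
i_t &= \sigma(x_t W_i + h_{t-1} U_i + b_i) \\
f_t &= \sigma(x_t W_f + h_{t-1} U_f + b_f) \\
\tilde{C}_t &= \tanh(x_t W_c + h_{t-1} U_c + b_c) \\
C_t &= i_t \odot \tilde{C}_t + f_t \odot C_{t-1} \\
o_t &= \sigma(x_t W_o + h_{t-1} U_o + b_o) \\
h_t &= o_t \odot \tanh(C_t) \\
y_t &= h_t W_{out} + b_{out}
\end{aligned}
$$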
So, let's make a model as follows and then make a prediction.
from keras.models import Sequential
from keras.layers import Dense, Activation, LSTM
in_out_neurons = 1
h_num = 100
model = Sequential()
model.add(LSTM(h_num, activation="tanh", recurrent_activation="sigmoid", batch_input_shape=(None, length_of_sequences, in_out_neurons), return_sequences=False))
model.add(Dense(in_out_neurons))
model.add(Activation("linear"))
model.compile(loss="mean_squared_error", optimizer="rmsprop")
model.fit(X_train, y_train, batch_size=600, epochs=15, validation_split=0.05)
y_hat_keras = model.predict(X_test)
Next, get the parameters of the model trained above. This can be done with the `get_weights()` method. The method exists on both the `keras.models.Sequential` object and the individual `keras.layers` objects, so you can retrieve the weights either with `model.get_weights()` or with `model.layers[0].get_weights()`.
With `model.get_weights()` alone it is not obvious which parameter is which, so for the time being I recommend checking `model.layers` and the like first, and then retrieving the weights with `model.get_weights()`.
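For example, a minimal sketch for seeing which layer owns which weight arrays (layer names and ordering may differ between Keras versions):

```python
# List each layer together with the shapes of its weight arrays
for layer in model.layers:
    print(layer.name, [w.shape for w in layer.get_weights()])
```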
weights = model.get_weights()
W, U, b, W_out, b_out = weights
print("W.shape : ", W.shape)
print("U.shape : ", U.shape)
print("b.shape : ", b.shape)
print("W_out.shape : ", W_out.shape)
print("b_out.shape : ", b_out.shape)
Wi = W[:,0:h_num]
Wf = W[:,h_num:2*h_num]
Wc = W[:,2*h_num:3*h_num]
Wo = W[:,3*h_num:]
print("Wi : ",Wi.shape)
print("Wf : ",Wf.shape)
print("Wc : ",Wc.shape)
print("Wo : ",Wo.shape)
Ui = U[:,0:h_num]
Uf = U[:,h_num:2*h_num]
Uc = U[:,2*h_num:3*h_num]
Uo = U[:,3*h_num:]
print("Ui : ",Ui.shape)
print("Uf : ",Uf.shape)
print("Uc : ",Uc.shape)
print("Uo : ",Uo.shape)
bi = b[0:h_num]
bf = b[h_num:2*h_num]
bc = b[2*h_num:3*h_num]
bo = b[3*h_num:]
print("bi : ",bi.shape)
print("bf : ",bf.shape)
print("bc : ",bc.shape)
print("bo : ",bo.shape)
Looking at the Keras source code, you can see that the weights are stored in a single array in the order input gate, forget gate, memory cell, output gate, which is why they are sliced as above.
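Equivalently, the same slicing can be written with `np.split`; a minimal sketch (just an alternative to the manual indexing above):

```python
# Split the kernel, recurrent kernel, and bias into the four gates (i, f, c, o)
Wi, Wf, Wc, Wo = np.split(W, 4, axis=1)
Ui, Uf, Uc, Uo = np.split(U, 4, axis=1)
bi, bf, bc, bo = np.split(b, 4)
```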
The formulas in the original reference do not state the dimensions of the matrices, which made them a little hard to read, so I added comments with the shapes. (I feel this has made the code rather harder to read.)
def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

x = X_test
n = x.shape[0]
# initial states
ht_1 = np.zeros(n*h_num).reshape(n, h_num)  # corresponds to h_{t-1}
Ct_1 = np.zeros(n*h_num).reshape(n, h_num)  # corresponds to C_{t-1}
ht_list = []
for t in np.arange(x.shape[1]):
    xt = np.array(x[:, t, :])
    # it : input gate at step t
    it = sigmoid(np.dot(xt, Wi) + np.dot(ht_1, Ui) + bi)
    # it : (390, 100)
    # xt : (390, 1), Wi : (1, 100)
    # ht_1 : (390, 100), Ui : (100, 100)
    # bi : (100,)
    # Ct_tilda : candidate memory cell at step t
    Ct_tilda = np.tanh(np.dot(xt, Wc) + np.dot(ht_1, Uc) + bc)
    # Ct_tilda : (390, 100)
    # xt : (390, 1), Wc : (1, 100)
    # ht_1 : (390, 100), Uc : (100, 100)
    # bc : (100,)
    # ft : forget gate at step t
    ft = sigmoid(np.dot(xt, Wf) + np.dot(ht_1, Uf) + bf)
    # ft : (390, 100)
    # xt : (390, 1), Wf : (1, 100)
    # ht_1 : (390, 100), Uf : (100, 100)
    # bf : (100,)
    # Ct : memory cell at step t
    Ct = it * Ct_tilda + ft * Ct_1
    # ot : output gate at step t
    ot = sigmoid(np.dot(xt, Wo) + np.dot(ht_1, Uo) + bo)
    # ht : hidden state at step t
    ht = ot * np.tanh(Ct)
    ht_list.append(ht)
    ht_1 = ht
    Ct_1 = Ct
my_y_hat = np.dot(ht, W_out) + b_out
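To check that this feedforward matches Keras, a minimal sketch comparing the two predictions (the tolerance here is arbitrary):

```python
# Compare the numpy implementation against Keras' own prediction
print("max abs diff:", np.abs(my_y_hat - y_hat_keras).max())
print("allclose    :", np.allclose(my_y_hat, y_hat_keras, atol=1e-4))
```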
Looking at the Keras documentation, I couldn't really tell the difference between the `activation` and `recurrent_activation` arguments, but looking at the source code, the activation applied to the memory-cell-related quantities (Ct, Ct_tilda) is `activation`, and the activation applied to each gate (it, ft, ot) is `recurrent_activation`. In other words, in my feedforward code the parts that use tanh correspond to `activation`, and the parts that use sigmoid correspond to `recurrent_activation`.
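One caveat: some older Keras versions default `recurrent_activation` to `hard_sigmoid` rather than `sigmoid` (the model above passes `sigmoid` explicitly, so it does not matter here). If your layer was built with `hard_sigmoid`, the numpy side would need a matching function; a minimal sketch, assuming the piecewise-linear definition used by those versions:

```python
def hard_sigmoid(x):
    # Assumption: older Keras defines hard_sigmoid as clip(0.2*x + 0.5, 0, 1);
    # check your version before swapping this in for `sigmoid` above
    return np.clip(0.2 * x + 0.5, 0.0, 1.0)
```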
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(y_test,label="true", color="blue")
ax.plot(y_hat_keras, label="keras y_hat", color="red")
ax.plot(my_y_hat, label="my y_hat", linestyle="dashed", color="green")
ax.legend(loc="upper right")
ax.set_ylabel("y")
ax.set_xlabel("x")
fig.savefig("./predict1.png ")
Doesn't that look pretty good?
import matplotlib.cm as cm
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(y_test, color="red")
for i in np.arange(len(ht_list)):
    y_hat_iter = np.dot(ht_list[i], W_out) + b_out
    ax.plot(y_hat_iter, color=cm.cool(i/len(ht_list)))
ax.set_ylabel("y")
ax.set_xlabel("x")
fig.savefig("./predict2.png")
plt.close("all")
The curves shift from light blue to purple as t increases, and you can see that with larger t the prediction approaches the true values.
Next I would like to try non-stationary signals as well. Since the Keras source code is itself Python, it is easy to read, and reading it is quite educational.