I took another look at the hidden state of Keras' stateful LSTM (RNN). Please note that this may not be entirely accurate: it is based on information I gathered from the web and implemented in my own way.
Also, please see the article I wrote earlier about the difference between stateless and stateful LSTMs.
・ Differences between Keras stateless LSTM and stateful LSTM
The original goal is to implement Burn-in for [R2D2](https://qiita.com/pocokhc/items/3b64d747a2f36da559c3#%E3%82%B9%E3%83%86%E3%83%BC%E3%83%AC%E3%82%B9lstm), and this article is part of the trial and error toward it. What I want to do is save and restore hidden_states, so I checked how hidden_states change when model.predict is run.
The dataset itself is not important, so the following dataset is used as-is. Reference: Keras: Ex-Tutorials: Understanding Stateful LSTM Recurrent Neural Networks
The setup is as follows.
Number of samples: 24, batch_size: 6
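For reference, here is a minimal sketch of how that dataset can be built (it mirrors the full code listed at the end of this article): the 27-character string "ABC...ZA" is sliced into windows of length 3, giving 24 (input, next letter) pairs.

```python
from keras.preprocessing.sequence import TimeseriesGenerator

alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZA"
alphabet_int = [i for i in range(len(alphabet))]  # 0..26

# windows of length 3 -> 27 - 3 = 24 samples
x_raw, y_raw = TimeseriesGenerator(alphabet_int, alphabet_int, length=3)[0]
print(x_raw.shape)  # (24, 3), e.g. [0, 1, 2] ("ABC") -> target 3 ("D")
```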
Model
The model used this time is as follows.
model
c = input_ = Input(batch_shape=(batch_size,) + shape) #(batch_size,data)
c = LSTM(lstm_units, stateful=True, name="lstm")(c)
c = Dense(y_data.shape[1], activation="softmax")(c)
model = Model(input_, c)
The LSTM layer is named "lstm" so that it can be retrieved easily later. Also note that stateful LSTMs come with some restrictions, such as having to fix the batch size via batch_shape.
Reference: How to use stateful RNN?
Getting hidden_states does not seem to be provided as a Keras API, so I access the layer's states directly.
from keras import backend as K

def get_hidden_states(model):
    lstm = model.get_layer("lstm")
    # states[0] is the hidden state h, states[1] is the cell state c
    hidden_states = [K.get_value(lstm.states[0]), K.get_value(lstm.states[1])]
    return hidden_states

def set_hidden_states(model, hidden_states):
    model.get_layer("lstm").reset_states(hidden_states)
hidden_states has the following data structure: its shape is (2, batch_size, lstm_units).
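For example, the shape can be checked with the helper above (a quick sanity check; the concrete numbers assume batch_size=6 and lstm_units=16, as in the full code at the end):

```python
hs = get_hidden_states(model)
print(len(hs))       # 2 -> [h, c]
print(hs[0].shape)   # (6, 16) = (batch_size, lstm_units)
print(hs[1].shape)   # (6, 16)
```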
The training itself does not matter here, so it is done only briefly.
model.fit(x_data, y_data, epochs=2, batch_size=batch_size, verbose=0)
With a stateful model the batch size cannot be changed, so the test data must also match the batch size. The same sample (x_data[0]) is simply replicated batch_size times.
# create test data
x_test = np.asarray([x_data[0] for _ in range(batch_size)])
The predict setup is as follows.
What I want to check is whether the results differ between batch entries when the hidden_state is changed. The output therefore shows only the 0th value (the probability of "A") for each batch entry.
def print_result(result):
    for i, r in enumerate(result):
        print("{}: {}".format(i, r[0]))
Case 1: since nothing is reset, the hidden_states left over from training are used as-is. Because each batch entry carries a different hidden_state, the prediction is that all results within the batch will differ.
test1_hs stores the hidden_states at this point (they are reused later).
print("--- (1) no reset")
test1_hs = get_hidden_states(model)
print_result( model.predict(x_test, batch_size=batch_size) )
result
--- (1) no reset
0: 0.03929901123046875
1: 0.03843347728252411
2: 0.03823704645037651
3: 0.03934086859226227
4: 0.03969535231590271
5: 0.03939886391162872
As expected, each entry gives a different value.
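We can also confirm that the predict call itself advanced the states (a quick check using numpy.allclose):

```python
hs_after = get_hidden_states(model)
# predict with stateful=True updates the states, so they should no longer match test1_hs
print(np.allclose(test1_hs[0], hs_after[0]))  # expected: False
print(np.allclose(test1_hs[1], hs_after[1]))  # expected: False
```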
Case 2 is the same as Case 1, again without resetting. Since the predict call in Case 1 advanced the states, the results should differ from Case 1.
print("--- (2) no reset 2")
print_result( model.predict(x_test, batch_size=batch_size) )
result
--- (2) no reset 2
0: 0.038682691752910614
1: 0.03798734396696091
2: 0.03784516826272011
3: 0.03870406746864319
4: 0.038950104266405106
5: 0.03872624412178993
Case 3: restore the hidden_states saved in Case 1. The results should be identical to Case 1.
print("--- (3) restore hidden_state(1)")
set_hidden_states(model, test1_hs)
print_result( model.predict(x_test, batch_size=batch_size) )
result
--- (3) restore hidden_state(1)
0: 0.03929901123046875
1: 0.03843347728252411
2: 0.03823704645037651
3: 0.03934086859226227
4: 0.03969535231590271
5: 0.03939886391162872
Case 4: initialize the hidden_states with zeros. All entries should then give the same value.
print("--- (4) reset_states")
model.reset_states()
print_result( model.predict(x_test, batch_size=batch_size) )
result
--- (4) reset_states
0: 0.03676648437976837
1: 0.03676648437976837
2: 0.03676648437976837
3: 0.03676648437976837
4: 0.03676648437976837
5: 0.03676648437976837
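Incidentally, the same effect should presumably be achievable by passing explicit zero arrays through the helper defined above (an assumption on my part, not something done in the original cases):

```python
zero_states = [
    np.zeros((batch_size, lstm_units), dtype=np.float32),  # h
    np.zeros((batch_size, lstm_units), dtype=np.float32),  # c
]
set_hidden_states(model, zero_states)
print_result(model.predict(x_test, batch_size=batch_size))  # should match (4)
```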
Case 5: initialize every batch entry with the hidden_states of batch entry 0 from Case 1. Each entry should then produce the same value as entry 0 in Case 1.
The conversion of hidden_states below is a bit crude...
# case5
# unify every entry with entry 0 of the hidden_states from case 1
print("--- (5) all same hidden_states")
states0 = []
states1 = []
for i in range(len(test1_hs[0])):
    states0.append(test1_hs[0][0])
    states1.append(test1_hs[1][0])
hidden_states = [np.asarray(states0), np.asarray(states1)]
set_hidden_states(model, hidden_states)
print_result( model.predict(x_test, batch_size=batch_size) )
result
--- (5) all same hidden_states
0: 0.03929901123046875
1: 0.03929901123046875
2: 0.03929901123046875
3: 0.03929901123046875
4: 0.03929901123046875
5: 0.03929901123046875
The result was as expected. It looks like batch processing may now be possible even with stateful LSTMs...
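As an aside, the broadcast states used in Case 5 could probably be built more cleanly with np.tile (an alternative sketch, not what the code above does):

```python
hidden_states = [
    np.tile(test1_hs[0][0], (batch_size, 1)),  # h: (batch_size, lstm_units)
    np.tile(test1_hs[1][0], (batch_size, 1)),  # c: (batch_size, lstm_units)
]
set_hidden_states(model, hidden_states)
```

The full code used in this article follows.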
from keras.models import Model
from keras.layers import *
from keras.preprocessing.sequence import TimeseriesGenerator
from keras.utils import np_utils
import keras
from keras import backend as K
import numpy as np
import random
import os
import tensorflow as tf
# copy from https://qiita.com/okotaku/items/8d682a11d8f2370684c9
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
    session_conf = tf.compat.v1.ConfigProto(
        intra_op_parallelism_threads=1,
        inter_op_parallelism_threads=1
    )
    sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=session_conf)
    tf.compat.v1.keras.backend.set_session(sess)
seed_everything(42)
# define
seq_length = 3
batch_size = 6
lstm_units = 16
shape=(3,1)
# reference: http://torch.classcat.com/2018/06/26/keras-ex-tutorials-stateful-lstm/
#Define the raw dataset
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZA"
alphabet_int = [ i for i in range(len(alphabet))]
# Create a mapping from characters to integers (0-25) and the reverse mapping
char_to_int = dict((c, i) for i, c in enumerate(alphabet))
int_to_char = dict((i, c) for i, c in enumerate(alphabet))
def int_to_char_seq(seq):
    seq = seq.reshape(seq_length)
    s = ""
    for c in seq:
        c = int(c * float(len(alphabet)))
        s += int_to_char[c]
    return s
# https://keras.io/ja/preprocessing/sequence/
data = TimeseriesGenerator(alphabet_int, alphabet_int, length=seq_length)[0]
x_data = data[0]
y_data = data[1]
# normalize
x_data = x_data / float(len(alphabet))
x_data = np.reshape(x_data, (len(x_data),) + shape ) #(batch_size,len,data)
# one hot encode the output variable
y_data = np_utils.to_categorical(y_data)
# create model
c = input_ = Input(batch_shape=(batch_size,) + shape) #(batch_size,data)
c = LSTM(lstm_units, stateful=True, name="lstm")(c)
c = Dense(y_data.shape[1], activation="softmax")(c)
model = Model(input_, c)
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()
# train
model.fit(x_data, y_data, epochs=2, batch_size=batch_size, verbose=0)
def get_hidden_states(model):
    lstm = model.get_layer("lstm")
    # states[0] is the hidden state h, states[1] is the cell state c
    hidden_states = [K.get_value(lstm.states[0]), K.get_value(lstm.states[1])]
    return hidden_states

def set_hidden_states(model, hidden_states):
    model.get_layer("lstm").reset_states(hidden_states)

def print_result(result):
    # result shape: (batch_size, y_data.shape[1])
    # Only the 0th value of each row is printed to keep the output short.
    for i, r in enumerate(result):
        print("{}: {}".format(i, r[0]))
# create test data
# replicate x_data[0] batch_size times
x_test = np.asarray([x_data[0] for _ in range(batch_size)])
# case1
print("--- (1) no reset")
test1_hs = get_hidden_states(model)
print_result( model.predict(x_test, batch_size=batch_size) )
# case2
print("--- (2) no reset 2")
print_result( model.predict(x_test, batch_size=batch_size) )
# case3
print("--- (3) restore hidden_state(1)")
set_hidden_states(model, test1_hs)
print_result( model.predict(x_test, batch_size=batch_size) )
# case4
print("--- (4) reset_states")
model.reset_states()
print_result( model.predict(x_test, batch_size=batch_size) )
# case5
# unify every entry with entry 0 of the hidden_states from case 1
print("--- (5) all same hidden_states")
states0 = []
states1 = []
for i in range(len(test1_hs[0])):
    states0.append(test1_hs[0][0])
    states1.append(test1_hs[1][0])
hidden_states = [np.asarray(states0),np.asarray(states1)]
set_hidden_states(model, hidden_states)
print_result( model.predict(x_test, batch_size=batch_size) )