J'ai revisité l'état caché LSTM (RNN) avec état de Keras. Veuillez noter qu'il n'est pas exact car il collecte des informations sur le net et les met en œuvre à sa manière.
Veuillez également vous référer à l'article que j'ai écrit plus tôt concernant la différence entre le LSTM sans état et le LSTM avec état. ・ Différences entre le LSTM sans état et le LSTM avec état de Keras
L'objectif initial est [R2D2](https://qiita.com/pocokhc/items/3b64d747a2f36da559c3#%E3%82%B9%E3%83%86%E3%83%BC%E3%83%88%E3%83 % AC% E3% 82% B9lstm) implémentation de Burn-in, qui est comme une trajectoire d'essais et d'erreurs vers lui. Ce que je veux faire, c'est enregistrer et restaurer hidden_states, j'ai donc vérifié les modifications dans hidden_states lorsque j'ai exécuté model.predict.
L'ensemble de données n'est pas important, utilisez donc l'ensemble de données suivant tel quel. Référence: Keras: Ex-Tutorials: Understanding Stateful LSTM Recurrent Neural Net
L'image est la suivante.
Nombre de données à utiliser: 24 batch_size : 6
Model
Le modèle utilisé cette fois est le suivant.
model
c = input_ = Input(batch_shape=(batch_size,) + shape) #(batch_size,data)
c = LSTM(lstm_units, stateful=True, name="lstm")(c)
c = Dense(y_data.shape[1], activation="softmax")(c)
model = Model(input_, c)
Nous avons nommé la couche LSTM "lstm" pour faciliter la récupération ultérieure. De plus, il existe les restrictions suivantes lors de l'utilisation de Stateful LSTM.
Référence: Comment utiliser RNN avec état?
Obtenir hidden_states n'est probablement pas implémenté dans keras donc nous l'obtenons directement.
from keras import backend as K
def get_hidden_states(model):
lstm = model.get_layer("lstm")
hidden_states = [K.get_value(lstm.states[0]), K.get_value(lstm.states[1])]
return hidden_states
def set_hidden_states(model, hidden_states):
model.get_layer("lstm").reset_states(hidden_states)
hidden_states a la structure de données suivante. La forme est (2, batch_size, lstm_unit number).
L'apprentissage en lui-même n'a aucun sens, il est donc approprié.
model.fit(x_data, y_data, epochs=2, batch_size=batch_size, verbose=0)
Dans le cas de l'état avec état, la taille du lot ne peut pas être modifiée, les données de test sont donc également requises pour la taille du lot. Augmentez les mêmes données (x_data [0]) de la taille du lot.
# create test data
x_test = np.asarray([x_data[0] for _ in range(batch_size)])
L'image de prédire est ci-dessous.
Ce que je veux savoir, c'est si le résultat change entre les lots lorsque je change l'état hidden_state. Ainsi, la sortie ne produit que la valeur 0 (probabilité de A) pour chaque lot.
def print_result(result):
for i, r in enumerate(result):
print("{}: {}".format(i, r[0]))
Puisqu'il ne se réinitialise pas, les hidden_states utilisés dans l'apprentissage sont utilisés tels quels. Étant donné que chaque état_caché est disjoint, la prédiction est que tous les résultats du lot changeront.
test1_hs stocke hidden_states à ce stade. (Parce que je vais l'utiliser après ça)
print("--- (1) no reset")
test1_hs = get_hidden_states(model)
print_result( model.predict(x_test, batch_size=batch_size) )
result
--- (1) no reset
0: 0.03929901123046875
1: 0.03843347728252411
2: 0.03823704645037651
3: 0.03934086859226227
4: 0.03969535231590271
5: 0.03939886391162872
Comme prévu, ils sont décousus.
Identique à Case1, mais sans réinitialisation. Il doit être différent de Case1.
print("--- (2) no reset 2")
print_result( model.predict(x_test, batch_size=batch_size) )
result
--- (2) no reset 2
0: 0.038682691752910614
1: 0.03798734396696091
2: 0.03784516826272011
3: 0.03870406746864319
4: 0.038950104266405106
5: 0.03872624412178993
Restaurez les états cachés enregistrés dans Case1. Il doit avoir la même valeur que Case1.
print("--- (3) restore hidden_state(1)")
set_hidden_states(model, test1_hs)
print_result( model.predict(x_test, batch_size=batch_size) )
result
--- (3) restore hidden_state(1)
0: 0.03929901123046875
1: 0.03843347728252411
2: 0.03823704645037651
3: 0.03934086859226227
4: 0.03969535231590271
5: 0.03939886391162872
Initialisez hidden_states avec 0. Ils devraient tous avoir la même valeur.
print("--- (4) reset_states")
model.reset_states()
print_result( model.predict(x_test, batch_size=batch_size) )
result
--- (4) reset_states
0: 0.03676648437976837
1: 0.03676648437976837
2: 0.03676648437976837
3: 0.03676648437976837
4: 0.03676648437976837
5: 0.03676648437976837
Initialisez avec hidden_states dans le 0ème lot de Case1. Vous devriez obtenir la même valeur que le 0e dans Case1.
hidden_states convertit un peu de force ...
# case5
# hidden_États(1)Caché_États[0]Unifiez avec.
print("--- (5) all same hidden_states")
states0 = []
states1 = []
for i in range(len(test1_hs[0])):
states0.append(test1_hs[0][0])
states1.append(test1_hs[1][0])
hidden_states = [np.asarray(states0),np.asarray(states1)]
set_hidden_states(model, hidden_states)
print_result( model.predict(x_test, batch_size=batch_size) )
result
--- (5) all same hidden_states
0: 0.03929901123046875
1: 0.03929901123046875
2: 0.03929901123046875
3: 0.03929901123046875
4: 0.03929901123046875
5: 0.03929901123046875
Le résultat était comme prévu. Je me demande s'il est maintenant possible d'effectuer un traitement par lots même avec le LSTM avec état.
from keras.models import Model
from keras.layers import *
from keras.preprocessing.sequence import TimeseriesGenerator
from keras.utils import np_utils
import keras
from keras import backend as K
import numpy as np
import random
import os
import tensorflow as tf
# copy from https://qiita.com/okotaku/items/8d682a11d8f2370684c9
def seed_everything(seed):
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
session_conf = tf.compat.v1.ConfigProto(
intra_op_parallelism_threads=1,
inter_op_parallelism_threads=1
)
sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=session_conf)
tf.compat.v1.keras.backend.set_session(sess)
seed_everything(42)
# define
seq_length = 3
batch_size = 6
lstm_units = 16
shape=(3,1)
# reference: http://torch.classcat.com/2018/06/26/keras-ex-tutorials-stateful-lstm/
#Définir le jeu de données brutes
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZA"
alphabet_int = [ i for i in range(len(alphabet))]
#Nombre de lettres(0-25)Créez un mappage vers et vice versa.
char_to_int = dict((c, i) for i, c in enumerate(alphabet))
int_to_char = dict((i, c) for i, c in enumerate(alphabet))
def int_to_char_seq(seq):
seq = seq.reshape(seq_length)
s = ""
for c in seq:
c = int(c * float(len(alphabet)))
s += int_to_char[c]
return s
# https://keras.io/ja/preprocessing/sequence/
data = TimeseriesGenerator(alphabet_int, alphabet_int, length=seq_length)[0]
x_data = data[0]
y_data = data[1]
# normalize
x_data = x_data / float(len(alphabet))
x_data = np.reshape(x_data, (len(x_data),) + shape ) #(batch_size,len,data)
# one hot encode the output variable
y_data = np_utils.to_categorical(y_data)
# create model
c = input_ = Input(batch_shape=(batch_size,) + shape) #(batch_size,data)
c = LSTM(lstm_units, stateful=True, name="lstm")(c)
c = Dense(y_data.shape[1], activation="softmax")(c)
model = Model(input_, c)
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()
# train
model.fit(x_data, y_data, epochs=2, batch_size=batch_size, verbose=0)
def get_hidden_states(model):
lstm = model.get_layer("lstm")
hidden_states = [K.get_value(lstm.states[0]), K.get_value(lstm.states[1])]
return hidden_states
def set_hidden_states(model, hidden_states):
model.get_layer("lstm").reset_states(hidden_states)
def print_result(result):
# result_shape: (batch_size, y_data.shape[1])
#Étant donné que le montant est important, seules les 0èmes données sont affichées à titre de référence.
for i, r in enumerate(result):
print("{}: {}".format(i, r[0]))
# create test data
# x_data[0]Augmentation de la taille du lot
x_test = np.asarray([x_data[0] for _ in range(batch_size)])
# case1
print("--- (1) no reset")
test1_hs = get_hidden_states(model)
print_result( model.predict(x_test, batch_size=batch_size) )
# case2
print("--- (2) no reset 2")
print_result( model.predict(x_test, batch_size=batch_size) )
# case3
print("--- (3) restore hidden_state(1)")
set_hidden_states(model, test1_hs)
print_result( model.predict(x_test, batch_size=batch_size) )
# case4
print("--- (4) reset_states")
model.reset_states()
print_result( model.predict(x_test, batch_size=batch_size) )
# case5
# hidden_États(1)Caché_États[0]Unifiez avec.
print("--- (5) all same hidden_states")
states0 = []
states1 = []
for i in range(len(test1_hs[0])):
states0.append(test1_hs[0][0])
states1.append(test1_hs[1][0])
hidden_states = [np.asarray(states0),np.asarray(states1)]
set_hidden_states(model, hidden_states)
print_result( model.predict(x_test, batch_size=batch_size) )
Recommended Posts