Cet article présente un modèle d'apprentissage qui prédit le langage généré par RG (Reber Grammar), ERG (Embeded-) et CERG (Continuous-).
En tant que tutoriel sur la mise en œuvre de TensorFlow de LSTM, il y a un apprentissage à l'aide d'un jeu de données de langage appelé PTB (Penn Treebank), mais il est difficile à comprendre comme introduction. Par conséquent, en guise d'introduction, nous allons apprendre un modèle qui prédit le langage généré à partir des algorithmes (RG, ERG, CERG) qui génèrent automatiquement un langage composé de huit caractères.
https://www.willamette.edu/~gorr/classes/cs449/reber.html Le site de référence de est facile à comprendre. Par exemple, "BTSSXSE.BPVVR." Est l'un des RG et "BPBTSXXVVEPE." Est l'un des ERG. CERG est une langue dans laquelle le caractère terminal "." Est supprimé de ERG. Cet article prédit ces langues à huit caractères.
Le code source de github est répertorié ci-dessous. Nous avons confirmé l'opération avec Python3 et l'API TensorFlow r1.1. TensorFlow change considérablement pour chaque version, donc si la version est différente, vous ne devriez pas penser qu'il fonctionnera tel quel.
RG_prediction_model.py
#! /usr/local/bin/python
# -*- coding:utf-8 -*-
import tensorflow as tf
import numpy as np
import random
from create_RG import ERG_generator
num_of_sample_length = 10000
class RG_predict_model:
def __init__(self, data_model):
self.num_of_hidden_nodes = 60
self.chunk_size = 20
self.model_file_name = "./tmp/model.ckpt"
self.batch_size = 100
self.forget_bias = 0.8
self.learning_rate = 0.001
self.num_of_epochs = 50000
try:
#train data set
self.rggen = data_model()
self.rggen.generate(num_of_sample_length)
self.num_of_output_nodes = self.rggen.CHAR_VEC
self.num_of_input_nodes = self.rggen.CHAR_VEC
#test data set
self.test_rggen = data_model()
self.test_rggen.generate(num_of_sample_length)
except:
print("could not specify generator model")
raise
def inference(self, input_ph, istate_ph):
with tf.name_scope("inference") as scope:
weight1_var = tf.Variable(tf.truncated_normal(
[self.num_of_input_nodes, self.num_of_hidden_nodes], stddev=0.1), name="weight1")
weight2_var = tf.Variable(tf.truncated_normal(
[self.num_of_hidden_nodes, self.num_of_output_nodes], stddev=0.1), name="weight2")
bias1_var = tf.Variable(tf.truncated_normal(
[self.num_of_hidden_nodes], stddev=0.1), name="bias1")
bias2_var = tf.Variable(tf.truncated_normal(
[self.num_of_output_nodes], stddev=0.1), name="bias2")
in1 = tf.transpose(input_ph, [1, 0, 2]) #(chunk_size, batch_size, CHAR_VEC_DIM)
in2 = tf.reshape(in1, [-1, self.num_of_input_nodes]) #(chunk_size * batch_size, CHAR_VEC_DIM)
in3 = tf.matmul(in2, weight1_var) + bias1_var #(chunk_size * batch_size, num_of_hidden_nodes)
in4 = tf.split(in3, self.chunk_size, axis=0) #chunk_size * (batch_size, num_of_hidden_nodes)
cell = tf.contrib.rnn.BasicLSTMCell(
self.num_of_hidden_nodes, forget_bias=self.forget_bias, state_is_tuple=False)
outputs, states = tf.contrib.rnn.static_rnn(cell, in4, initial_state=istate_ph)
output = tf.matmul(outputs[-1], weight2_var) + bias2_var
return output
def evaluate(self, output, label):
with tf.name_scope("evaluate") as scope:
prediction = tf.nn.softmax(output)
correct_prediction = tf.equal(tf.argmax(output,1),tf.argmax(label,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
tf.summary.scalar("accuracy", accuracy)
return prediction, accuracy
def loss(self, output, label):
with tf.name_scope("loss") as scope:
loss = tf.reduce_mean(tf.losses.softmax_cross_entropy(label, output))
tf.summary.scalar("loss", loss)
return loss
def training(self, loss):
with tf.name_scope("training") as scope:
optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(loss)
return optimizer
def train(self):
input_ph = tf.placeholder(tf.float32, [None, self.chunk_size, self.num_of_input_nodes], name="input")
label_ph = tf.placeholder(tf.float32, [None, self.num_of_input_nodes], name="label")
istate_ph = tf.placeholder(tf.float32, [None, self.num_of_hidden_nodes * 2], name="istate")
prediction = self.inference(input_ph, istate_ph)
loss = self.loss(prediction, label_ph)
optimizer = self.training(loss)
evaluater = self.evaluate(prediction, label_ph)
summary = tf.summary.merge_all()
with tf.Session() as sess:
summary_writer = tf.summary.FileWriter("./tmp/RG_log", graph=sess.graph)
sess.run(tf.global_variables_initializer())
####### train ########
for epoch in range(self.num_of_epochs):
inputs, labels = self.rggen.get_batch(self.batch_size, self.chunk_size)
train_dict = {
input_ph: inputs,
label_ph: labels,
istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
}
sess.run([optimizer], feed_dict=train_dict)
if (epoch) % 100 ==0:
summary_str, train_loss, (prediction, acc) = sess.run([summary, loss, evaluater], feed_dict=train_dict)
print("train#%d, loss: %e, accuracy: %e" % (epoch, train_loss, acc))
summary_writer.add_summary(summary_str, epoch)
####### test #########
inputs, labels = self.test_rggen.get_batch(self.batch_size, self.chunk_size)
test_dict = {
input_ph: inputs,
label_ph: labels,
istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
}
prediction, acc = sess.run(evaluater, feed_dict=test_dict)
for pred, label in zip(prediction, labels):
print(np.argmax(pred) == np.argmax(label))
print(['{:.2f}'.format(n) for n in pred])
print(['{:.2f}'.format(n) for n in label])
####### save ########
print("Training has done successfully")
saver = tf.train.Saver()
saver.save(sess, self.model_file_name)
if __name__ == '__main__':
random.seed(0)
np.random.seed(0)
tf.set_random_seed(0)
rg_model = RG_predict_model(ERG_generator)
rg_model.train()
Ensuite, les détails seront expliqués dans l'ordre.
python
def __init__(self, data_model):
self.num_of_hidden_nodes = 60
self.chunk_size = 20
self.model_file_name = "./tmp/model.ckpt"
self.batch_size = 100
self.forget_bias = 0.8
self.learning_rate = 0.001
self.num_of_epochs = 50000
try:
#train data set
self.rggen = data_model()
self.rggen.generate(num_of_sample_length)
self.num_of_output_nodes = self.rggen.CHAR_VEC
self.num_of_input_nodes = self.rggen.CHAR_VEC
#test data set
self.test_rggen = data_model()
self.test_rggen.generate(num_of_sample_length)
except:
print("could not specify generator model")
raise
Puisque le vecteur d'entrée est représenté par un vecteur chaud, il est représenté par un vecteur à 8 dimensions (par exemple B = (1,0,0,0,0,0,0,0)), mais il est entièrement avant l'entrée dans la cellule LSTM. Insérez une couche connectée et augmentez la quantité d'entités à num_of_hidden_nodes = vecteur de 60 dimensions. LSTM nécessite un paramètre qui détermine le nombre d'entrées précédentes affectant la sortie, et ceci est spécifié par chunk_size. Cette fois, 20 caractères consécutifs sont saisis pour une prédiction. L'un des EG_model, ERG_model et CERG_model est entré dans l'argument data_model.
python
def inference(self, input_ph, istate_ph):
with tf.name_scope("inference") as scope:
weight1_var = tf.Variable(tf.truncated_normal(
[self.num_of_input_nodes, self.num_of_hidden_nodes], stddev=0.1), name="weight1")
weight2_var = tf.Variable(tf.truncated_normal(
[self.num_of_hidden_nodes, self.num_of_output_nodes], stddev=0.1), name="weight2")
bias1_var = tf.Variable(tf.truncated_normal(
[self.num_of_hidden_nodes], stddev=0.1), name="bias1")
bias2_var = tf.Variable(tf.truncated_normal(
[self.num_of_output_nodes], stddev=0.1), name="bias2")
in1 = tf.transpose(input_ph, [1, 0, 2]) #(chunk_size, batch_size, CHAR_VEC_DIM)
in2 = tf.reshape(in1, [-1, self.num_of_input_nodes]) #(chunk_size * batch_size, CHAR_VEC_DIM)
in3 = tf.matmul(in2, weight1_var) + bias1_var #(chunk_size * batch_size, num_of_hidden_nodes)
in4 = tf.split(in3, self.chunk_size, axis=0) #chunk_size * (batch_size, num_of_hidden_nodes)
cell = tf.contrib.rnn.BasicLSTMCell(
self.num_of_hidden_nodes, forget_bias=self.forget_bias, state_is_tuple=False)
outputs, states = tf.contrib.rnn.static_rnn(cell, in4, initial_state=istate_ph)
output = tf.matmul(outputs[-1], weight2_var) + bias2_var
return output
Les couches sont assemblées dans le flux d'entrée (8D) -> couche entièrement connectée-> (60D) -> LSTM-> (60D) -> couche entièrement connectée-> sortie (8D). in1 ~ in4 ne sont convertis que pour faciliter le calcul Wx + b. tf.contrib.rnn.static_rnn obtient la taille du tableau du deuxième argument sans autorisation, crée une cellule (premier argument) et la combine. Chaque cellule reçoit une entrée de [quantité de fonction interne = 60] x [taille de lot = 100]. Inférence globale: À l'intérieur de rnn (création de cellules pour chuk_size):
évaluer, perdre, s'entraîner sont omis.
python
def train(self):
input_ph = tf.placeholder(tf.float32, [None, self.chunk_size, self.num_of_input_nodes], name="input")
label_ph = tf.placeholder(tf.float32, [None, self.num_of_input_nodes], name="label")
istate_ph = tf.placeholder(tf.float32, [None, self.num_of_hidden_nodes * 2], name="istate")
prediction = self.inference(input_ph, istate_ph)
loss = self.loss(prediction, label_ph)
optimizer = self.training(loss)
evaluater = self.evaluate(prediction, label_ph)
summary = tf.summary.merge_all()
Définissez une instance pour l'entrée et l'étiquette de réponse correcte, une instance des informations d'état à entrer au début de LSTM et une instance de chaque sortie. summary est la sortie du journal des résultats pour faciliter le débogage avec tensorboard, etc.
python
####### train ########
for epoch in range(self.num_of_epochs):
inputs, labels = self.rggen.get_batch(self.batch_size, self.chunk_size)
train_dict = {
input_ph: inputs,
label_ph: labels,
istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
}
sess.run([optimizer], feed_dict=train_dict)
if (epoch) % 100 ==0:
summary_str, train_loss, (prediction, acc) = sess.run([summary, loss, evaluater], feed_dict=train_dict)
print("train#%d, loss: %e, accuracy: %e" % (epoch, train_loss, acc))
summary_writer.add_summary(summary_str, epoch)
Collectez les entrées dans le dictionnaire et commencez à apprendre. Ajoutez le résultat au résumé. rggen.get_batch apporte les données d'entrée spécifiées. Pour plus de détails, consultez create_RG.py dans github. Voyons voir.
python
####### test #########
inputs, labels = self.test_rggen.get_batch(self.batch_size, self.chunk_size)
test_dict = {
input_ph: inputs,
label_ph: labels,
istate_ph: np.zeros((self.batch_size, self.num_of_hidden_nodes * 2)),
}
prediction, acc = sess.run(evaluater, feed_dict=test_dict)
for pred, label in zip(prediction, labels):
print(np.argmax(pred) == np.argmax(label))
print(['{:.2f}'.format(n) for n in pred])
print(['{:.2f}'.format(n) for n in label])
De même, créez un dictionnaire d'entrée pour le test et affichez la sortie. {: .2f} est une notation pour la sortie jusqu'à la deuxième décimale. # Le libellé de réponse correct et le libellé de sortie ne sont émis que de manière à être affichés verticalement.
Site de référence:
--À propos d'ERG - https://www.willamette.edu/~gorr/classes/cs449/reber.html --À propos de l'implémentation TensorFlow de LSTM - http://qiita.com/yukiB/items/f6314d2861fc8d9b739f - http://www.madopro.net/entry/RNNLSTMLanguageModel
Recommended Posts