I'm reading the excellent book **"Deep Learning from Scratch 2"**. This post is my memo on Chapter 7. To run the code, download the complete source from GitHub and use Jupyter Notebook inside the ch07 directory.
First, let's run rnnlm_gen.py, which loads the weight file **Rnnlm.pkl** trained by train_rnnlm.py in Chapter 6 and generates sentences.
import sys
sys.path.append('..')
from rnnlm_gen import RnnlmGen
from dataset import ptb
#Read the PTB dataset
corpus, word_to_id, id_to_word = ptb.load_data('train')
vocab_size = len(word_to_id)
corpus_size = len(corpus)
model = RnnlmGen() #Model generation
model.load_params('../ch06/Rnnlm.pkl') #Load learned weights
#Set start and skip characters
start_word = 'you'
start_id = word_to_id[start_word]
skip_words = ['N', '<unk>', '$']
skip_ids = [word_to_id[w] for w in skip_words]
#Sentence generation
word_ids = model.generate(start_id, skip_ids)
txt = ' '.join([id_to_word[i] for i in word_ids])
txt = txt.replace(' <eos>', '.\n')
print(txt)
Somehow, an English-like sentence was generated. The idea is simple: after choosing the first word, predict the next word, then predict the word after that from the prediction result, and so on. The key is the RnnlmGen() that appears at model creation, so let's take a look at it.
3. class RnnlmGen
It is possible to write a sentence-generation class from scratch, but it is easier to add functionality to the class Rnnlm used for training in Chapter 6. By declaring class RnnlmGen(Rnnlm): at the top of the code below, all of the methods defined in class Rnnlm become available in class RnnlmGen. This is called **inheritance**.
class RnnlmGen(Rnnlm):
def generate(self, start_id, skip_ids=None, sample_size=100):
word_ids = [start_id]
x = start_id #Specify the word id for sentence generation
        # Repeat until word_ids reaches sample_size
        while len(word_ids) < sample_size:
            x = np.array(x).reshape(1, 1)  # Reshape to a 2D array (mini-batch support)
            score = self.predict(x)  # Get the prediction scores
            p = softmax(score.flatten())  # Normalize into a probability distribution with softmax
            # Randomly sample one id out of len(p) according to the probability distribution p
            sampled = np.random.choice(len(p), size=1, p=p)
            # Adopt the sample only if skip_ids is None or the sampled word is not in skip_ids
            if (skip_ids is None) or (sampled not in skip_ids):
                x = sampled
                word_ids.append(int(x))  # Append to word_ids
return word_ids
def get_state(self):
return self.lstm_layer.h, self.lstm_layer.c
def set_state(self, state):
self.lstm_layer.set_state(*state)
These are the methods added on top of class Rnnlm. predict(x) predicts the word that follows x, and normalizing the result with softmax yields a probability distribution p over the vocabulary. The line sampled = np.random.choice(len(p), size=1, p=p) then draws one integer between **0 and vocab_size - 1** at random, following the probability distribution p.
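To see what this sampling call does on its own, here is a minimal sketch (the distribution p below is made up for illustration):

```python
import numpy as np

# Minimal sketch: draw one index from 0 .. len(p)-1 according to the distribution p.
p = np.array([0.1, 0.2, 0.7])    # made-up probabilities; they must sum to 1
samples = [np.random.choice(len(p), size=1, p=p)[0] for _ in range(10)]
print(samples)                   # mostly 2s, since index 2 has probability 0.7
```

Because the next id is sampled probabilistically rather than taken by argmax, the generated text differs from run to run.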
By the way, if you look at class Rnnlm itself, this is what it contains:
class Rnnlm(BaseModel):
def __init__(self, vocab_size=10000, wordvec_size=100, hidden_size=100):
V, D, H = vocab_size, wordvec_size, hidden_size
rn = np.random.randn
#Weight initialization
embed_W = (rn(V, D) / 100).astype('f')
lstm_Wx = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
lstm_b = np.zeros(4 * H).astype('f')
affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
affine_b = np.zeros(V).astype('f')
#Layer generation
self.layers = [
TimeEmbedding(embed_W),
TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True),
TimeAffine(affine_W, affine_b)
]
self.loss_layer = TimeSoftmaxWithLoss()
self.lstm_layer = self.layers[1]
#List all weights and gradients
self.params, self.grads = [], []
for layer in self.layers:
self.params += layer.params
self.grads += layer.grads
def predict(self, xs):
for layer in self.layers:
xs = layer.forward(xs)
return xs
def forward(self, xs, ts):
score = self.predict(xs)
loss = self.loss_layer.forward(score, ts)
return loss
def backward(self, dout=1):
dout = self.loss_layer.backward(dout)
for layer in reversed(self.layers):
dout = layer.backward(dout)
return dout
def reset_state(self):
self.lstm_layer.reset_state()
That is the whole class. Since we inherit it this time, all of these methods are automatically available in class RnnlmGen, which is convenient.
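As a toy illustration of inheritance (the class names here are made up and unrelated to the book's code), a subclass gets its parent's methods for free:

```python
# Toy example of inheritance (illustrative names only, not from the book's code).
class Parent:
    def predict(self, x):
        return x * 2

class Child(Parent):          # Child inherits predict() from Parent
    def generate(self, x):
        return self.predict(x) + 1

print(Child().generate(3))    # 7 -- predict() is available without redefining it
```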
It is hard to judge whether the English sentence generation is really working well, so let's also try it in Japanese. This time, however, prediction is done **per character** rather than per word.
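Character-level modeling here simply means treating the text as a sequence of characters, which in Python is just iterating over the string:

```python
# Iterating over a Japanese string yields one character at a time.
text = '吾輩は猫である'
print(list(text))   # ['吾', '輩', 'は', '猫', 'で', 'あ', 'る']
```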
This time, download the **text file (with ruby)** of Natsume Soseki's "I Am a Cat" from Aozora Bunko. After downloading, unzip it and save it in ch07 under the name **wagahaiwa_nekodearu.txt**.
import sys
import re
path = './wagahaiwa_nekodearu.txt'
bindata = open(path, "rb")
lines = bindata.readlines()
for line in lines:
    text = line.decode('Shift_JIS')  # Decode each line as Shift_JIS
    text = re.split(r'\r', text)[0]  # Remove line breaks
    text = text.replace('｜', '')  # Remove ruby start markers
    text = re.sub(r'《.+?》', '', text)  # Remove ruby readings
    text = re.sub(r'［＃.+?］', '', text)  # Remove transcriber's notes
print(text)
file = open('data_neko.txt', 'a', encoding='utf-8').write(text)  # Append as UTF-8
This is the preprocessing code. When executed, it reads the text file as Shift_JIS, removes line breaks, ruby, and transcriber's notes, then converts the result to UTF-8 and saves it as **data_neko.txt**. After that, use an editor to manually delete the extra parts before and after the main text.
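To check that the ruby and note removal behaves as intended, here is a small test on a made-up line in Aozora Bunko notation (the sample string is an assumption, not taken from the actual file):

```python
import re

# Made-up sample line in Aozora Bunko notation, to test the cleanup rules.
line = '｜吾輩《わがはい》は猫である。［＃「猫」に傍点］'
line = line.replace('｜', '')            # remove ruby start markers
line = re.sub(r'《.+?》', '', line)      # remove ruby readings
line = re.sub(r'［＃.+?］', '', line)    # remove transcriber's notes
print(line)                              # -> 吾輩は猫である。
```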
Next, define a function load_data() that builds corpus, word_to_id, and id_to_word from data_neko.txt.
import numpy as np
import io
def load_data():
    # Read file_name into text as UTF-8
file_name = './data_neko.txt'
with io.open(file_name, encoding='utf-8') as f:
text = f.read().lower()
# word_to_id, id_to_word creation
word_to_id, id_to_word = {}, {}
    for word in text:  # character level: each "word" here is a single character
if word not in word_to_id:
new_id = len(word_to_id)
word_to_id[word] = new_id
id_to_word[new_id] = word
#Creating corpus
corpus = np.array([word_to_id[W] for W in text])
corpus_test = corpus[300000:] #test data
corpus = corpus[:300000] #Training data
return corpus_test, corpus, word_to_id, id_to_word
The full **corpus** built here is 318,800 characters long, so the 18,800 characters after the first 300,000 are used as **corpus_test**, and the first 300,000 characters become the training **corpus**.
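A quick sanity check of the returned values might look like this (the exact counts depend on how the downloaded file was cleaned up):

```python
# Quick sanity check (the printed numbers depend on your cleaned data_neko.txt).
corpus_test, corpus, word_to_id, id_to_word = load_data()
print(len(corpus), len(corpus_test))                # 300000 and roughly 18800
print(len(word_to_id))                              # number of distinct characters (vocabulary size)
print(''.join(id_to_word[i] for i in corpus[:20]))  # first 20 characters of the text
```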
Now, let's run the training code from ch06 on this data, using the GPU.
import sys
sys.path.append('..')
from common import config
#To run on the GPU, keep the line below enabled (requires cupy)
# ==============================================
config.GPU = True
# ==============================================
from common.optimizer import SGD
from common.trainer import RnnlmTrainer
from common.util import eval_perplexity, to_gpu
from dataset import ptb
from ch06.better_rnnlm import BetterRnnlm
#Hyperparameter settings
batch_size = 20
wordvec_size = 650
hidden_size = 650
time_size = 35
lr = 20.0
max_epoch = 40
max_grad = 0.25
dropout = 0.5
#Reading training data
corpus_test, corpus, word_to_id, id_to_word = load_data()
corpus_val = corpus_test #For simplicity, val and test are the same
if config.GPU:
corpus = to_gpu(corpus)
corpus_val = to_gpu(corpus_val)
corpus_test = to_gpu(corpus_test)
vocab_size = len(word_to_id)
xs = corpus[:-1]
ts = corpus[1:]
model = BetterRnnlm(vocab_size, wordvec_size, hidden_size, dropout)
optimizer = SGD(lr)
trainer = RnnlmTrainer(model, optimizer)
best_ppl = float('inf')
for epoch in range(max_epoch):
trainer.fit(xs, ts, max_epoch=1, batch_size=batch_size,
time_size=time_size, max_grad=max_grad)
model.reset_state()
ppl = eval_perplexity(model, corpus_val)
print('valid perplexity: ', ppl)
if best_ppl > ppl:
best_ppl = ppl
model.save_params()
else:
lr /= 4.0
optimizer.lr = lr
model.reset_state()
print('-' * 50)
#Evaluation with test data
model.reset_state()
ppl_test = eval_perplexity(model, corpus_test)
print('test perplexity: ', ppl_test)
Training finished in about 40 minutes on a Windows machine (GTX 1060). When it completes, the trained weight parameters are saved in the ch07 folder as **BetterRnnlm.pkl**.
6. class BetterRnnlmGen
Next, define class BetterRnnlmGen. It basically inherits class BetterRnnlm from Chapter 6, but the **vocab_size** of "I Am a Cat" differs from that of **PTB**, so a def __init__() is added that replaces the relevant part (this is called **overriding**).
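As a toy illustration of overriding (again, the class names are made up), redefining a method in the subclass replaces the parent's version:

```python
# Toy example of overriding __init__ (illustrative names only).
class Parent:
    def __init__(self):
        self.vocab_size = 10000   # e.g. the PTB vocabulary

class Child(Parent):
    def __init__(self):           # overrides Parent.__init__
        self.vocab_size = 3038    # e.g. the character vocabulary of the Japanese corpus

print(Child().vocab_size)         # 3038
```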
import sys
sys.path.append('..')
import numpy as np
from common.functions import softmax
from ch06.rnnlm import Rnnlm
from ch06.better_rnnlm import BetterRnnlm
from common.time_layers import *  # Import the layers required by __init__
class BetterRnnlmGen(BetterRnnlm):
def __init__(self, vocab_size=3038, wordvec_size=650,
hidden_size=650, dropout_ratio=0.5):
V, D, H = vocab_size, wordvec_size, hidden_size
rn = np.random.randn
embed_W = (rn(V, D) / 100).astype('f')
lstm_Wx1 = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
lstm_Wh1 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
lstm_b1 = np.zeros(4 * H).astype('f')
lstm_Wx2 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
lstm_Wh2 = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
lstm_b2 = np.zeros(4 * H).astype('f')
affine_b = np.zeros(V).astype('f')
self.layers = [
TimeEmbedding(embed_W),
TimeDropout(dropout_ratio),
TimeLSTM(lstm_Wx1, lstm_Wh1, lstm_b1, stateful=True),
TimeDropout(dropout_ratio),
TimeLSTM(lstm_Wx2, lstm_Wh2, lstm_b2, stateful=True),
TimeDropout(dropout_ratio),
TimeAffine(embed_W.T, affine_b) # weight tying!!
]
self.loss_layer = TimeSoftmaxWithLoss()
self.lstm_layers = [self.layers[2], self.layers[4]]
self.drop_layers = [self.layers[1], self.layers[3], self.layers[5]]
self.params, self.grads = [], []
for layer in self.layers:
self.params += layer.params
self.grads += layer.grads
def generate(self, start_id, skip_ids=None, sample_size=100):
word_ids = [start_id]
x = start_id
while len(word_ids) < sample_size:
x = np.array(x).reshape(1, 1)
score = self.predict(x).flatten()
p = softmax(score).flatten()
sampled = np.random.choice(len(p), size=1, p=p)
#sampled = np.argmax(p)
if (skip_ids is None) or (sampled not in skip_ids):
x = sampled
word_ids.append(int(x))
return word_ids
def get_state(self):
states = []
for layer in self.lstm_layers:
states.append((layer.h, layer.c))
return states
def set_state(self, states):
for layer, state in zip(self.lstm_layers, states):
layer.set_state(*state)
Finally, execute the following code that generates sentences.
import sys
sys.path.append('..')
from common.np import *
corpus_test, corpus, word_to_id, id_to_word = load_data()
vocab_size = len(word_to_id)
corpus_size = len(corpus)
model = BetterRnnlmGen()
model.load_params('./BetterRnnlm.pkl')
#Set start and skip characters
start_word = '吾'  # a single character, since this model is character-level
start_id = word_to_id[start_word]
skip_words = ['〇']
skip_ids = [word_to_id[w] for w in skip_words]
#Sentence generation(From the first word)
word_ids = model.generate(start_id, skip_ids)
txt = ''.join([id_to_word[i] for i in word_ids])
print(txt)
#Sentence generation(From the phrase)
model.reset_state() #Reset model
start_words = '吾輩は猫である。'
start_ids = [word_to_id[w] for w in start_words]  # Convert each character to its id
#Run predict on all but the last character id of the phrase (the results are discarded; this just sets the LSTM state)
for x in start_ids[:-1]:
x = np.array(x).reshape(1, 1)
model.predict(x)
word_ids = model.generate(start_ids[-1], skip_ids) #Predict from the last word id of the phrase
word_ids = start_ids[:-1] + word_ids #Concatenate phrases and prediction results
txt = ''.join([id_to_word[i] for i in word_ids]) #Convert to sentence
print('-' * 50)
print(txt)
There are two generation patterns: one continues the text from a single starting character, the other continues it from a whole phrase.
Taken as a whole the output doesn't make much sense, but looking at it phrase by phrase, parts of it read almost like real sentences.

Next, the topic changes to **seq2seq**. The following code trains a model on the addition dataset (addition.txt).
import sys
sys.path.append('..')
import numpy as np
import matplotlib.pyplot as plt
from dataset import sequence
from common.optimizer import Adam
from common.trainer import Trainer
from common.util import eval_seq2seq
from seq2seq import Seq2seq
from peeky_seq2seq import PeekySeq2seq
#Data set loading
(x_train, t_train), (x_test, t_test) = sequence.load_data('addition.txt')
char_to_id, id_to_char = sequence.get_vocab()
# Reverse input? =================================================
is_reverse = False # True
if is_reverse:
    # [::-1] reverses the order; since the data is 2D, use [:, ::-1]
x_train, x_test = x_train[:, ::-1], x_test[:, ::-1]
# ================================================================
#Hyperparameter settings
vocab_size = len(char_to_id)
wordvec_size = 16
hidden_size = 128
batch_size = 128
max_epoch = 25
max_grad = 5.0
# Normal or Peeky? ==============================================
model = Seq2seq(vocab_size, wordvec_size, hidden_size)
# model = PeekySeq2seq(vocab_size, wordvec_size, hidden_size)
# ================================================================
optimizer = Adam()
trainer = Trainer(model, optimizer)
acc_list = []
for epoch in range(max_epoch):
trainer.fit(x_train, t_train, max_epoch=1,
batch_size=batch_size, max_grad=max_grad)
correct_num = 0
for i in range(len(x_test)):
question, correct = x_test[[i]], t_test[[i]]
verbose = i < 10
correct_num += eval_seq2seq(model, question, correct,
id_to_char, verbose, is_reverse)
acc = float(correct_num) / len(x_test)
acc_list.append(acc)
print('val acc %.3f%%' % (acc * 100))
#Drawing a graph
x = np.arange(len(acc_list))
plt.plot(x, acc_list, marker='o')
plt.xlabel('epochs')
plt.ylabel('accuracy')
plt.ylim(0, 1.0)
plt.show()
This is a plain piece of training code that teaches the model addition. First, let's take a look at class Seq2seq.
9. class Seq2seq
class Seq2seq(BaseModel):
def __init__(self, vocab_size, wordvec_size, hidden_size):
V, D, H = vocab_size, wordvec_size, hidden_size
self.encoder = Encoder(V, D, H)
self.decoder = Decoder(V, D, H)
self.softmax = TimeSoftmaxWithLoss()
self.params = self.encoder.params + self.decoder.params
self.grads = self.encoder.grads + self.decoder.grads
def forward(self, xs, ts):
decoder_xs, decoder_ts = ts[:, :-1], ts[:, 1:]
h = self.encoder.forward(xs)
score = self.decoder.forward(decoder_xs, h)
loss = self.softmax.forward(score, decoder_ts)
return loss
def backward(self, dout=1):
dout = self.softmax.backward(dout)
dh = self.decoder.backward(dout)
dout = self.encoder.backward(dh)
return dout
def generate(self, xs, start_id, sample_size):
h = self.encoder.forward(xs)
sampled = self.decoder.generate(h, start_id, sample_size)
return sampled
Seq2seq simply combines the Encoder and Decoder classes, so let's look at class Encoder first.
class Encoder:
def __init__(self, vocab_size, wordvec_size, hidden_size):
V, D, H = vocab_size, wordvec_size, hidden_size
rn = np.random.randn
embed_W = (rn(V, D) / 100).astype('f')
lstm_Wx = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
lstm_b = np.zeros(4 * H).astype('f')
self.embed = TimeEmbedding(embed_W)
self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=False)
self.params = self.embed.params + self.lstm.params
self.grads = self.embed.grads + self.lstm.grads
self.hs = None
def forward(self, xs):
xs = self.embed.forward(xs)
hs = self.lstm.forward(xs)
self.hs = hs
return hs[:, -1, :]
def backward(self, dh):
dhs = np.zeros_like(self.hs)
dhs[:, -1, :] = dh
dout = self.lstm.backward(dhs)
dout = self.embed.backward(dout)
return dout
Schematically, the **Encoder** works as follows: the input sequence is fed to the LSTM step by step, and the final LSTM output **h** is passed to the Decoder.
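As a shape sketch of what Encoder.forward returns (the sizes N, T, H below are assumptions):

```python
import numpy as np

# Shape sketch (assumed sizes): the Encoder keeps only the last time step of hs.
N, T, H = 2, 7, 128                 # batch size, time steps, hidden size
hs = np.random.randn(N, T, H)       # what the TimeLSTM layer would produce
h = hs[:, -1, :]                    # what Encoder.forward actually returns
print(h.shape)                      # (2, 128) -- this h is handed to the Decoder
```

In backward, the incoming gradient dh is written back into a zero array of the same shape as hs at that last time step, which is exactly what the dhs[:, -1, :] = dh line does.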
class Decoder:
def __init__(self, vocab_size, wordvec_size, hidden_size):
V, D, H = vocab_size, wordvec_size, hidden_size
rn = np.random.randn
embed_W = (rn(V, D) / 100).astype('f')
lstm_Wx = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
lstm_b = np.zeros(4 * H).astype('f')
affine_W = (rn(H, V) / np.sqrt(H)).astype('f')
affine_b = np.zeros(V).astype('f')
self.embed = TimeEmbedding(embed_W)
self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
self.affine = TimeAffine(affine_W, affine_b)
self.params, self.grads = [], []
for layer in (self.embed, self.lstm, self.affine):
self.params += layer.params
self.grads += layer.grads
def forward(self, xs, h):
self.lstm.set_state(h)
out = self.embed.forward(xs)
out = self.lstm.forward(out)
score = self.affine.forward(out)
return score
def backward(self, dscore):
dout = self.affine.backward(dscore)
dout = self.lstm.backward(dout)
dout = self.embed.backward(dout)
dh = self.lstm.dh
return dh
def generate(self, h, start_id, sample_size):
sampled = []
sample_id = start_id
self.lstm.set_state(h)
for _ in range(sample_size):
x = np.array(sample_id).reshape((1, 1))
out = self.embed.forward(x)
out = self.lstm.forward(out)
score = self.affine.forward(out)
sample_id = np.argmax(score.flatten())
sampled.append(int(sample_id))
return sampled
The **Decoder** handles the Softmax with Loss layer differently between training and generation, so that layer is kept in the Seq2seq class rather than inside the Decoder.
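For reference, here is a tiny sketch of the one-step shift used in Seq2seq.forward (ts[:, :-1] versus ts[:, 1:]); the id values are made up:

```python
import numpy as np

# Made-up target id sequence for one sample.
ts = np.array([[6, 0, 2, 9, 9]])
decoder_xs, decoder_ts = ts[:, :-1], ts[:, 1:]
print(decoder_xs)   # [[6 0 2 9]] -- fed into the Decoder as inputs
print(decoder_ts)   # [[0 2 9 9]] -- compared against the scores by Softmax with Loss
```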
Reversing the input data speeds up training and improves the final accuracy. Run the addition code above with is_reverse = True. Just by reversing the input, the accuracy jumped from around 10% to around 50%. The idea is that the shorter the time lag between each input element and its corresponding output element, the easier learning becomes.
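The reversal itself is just a slice, as this tiny sketch shows:

```python
import numpy as np

# Reversing each row of a 2D batch with [:, ::-1], as done for x_train / x_test.
x = np.array([[1, 2, 3, 4],
              [5, 6, 7, 8]])
print(x[:, ::-1])
# [[4 3 2 1]
#  [8 7 6 5]]
```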
10. PeekyDecoder
The vector h output by the Encoder carries very important information, but it is fed to the Decoder only at the first time step. Hence the idea of feeding the information in h to the LSTM layer and the Affine layer at every time step. This technique is called **Peeky**. Let's take a look at class PeekyDecoder.
class PeekyDecoder:
def __init__(self, vocab_size, wordvec_size, hidden_size):
V, D, H = vocab_size, wordvec_size, hidden_size
rn = np.random.randn
embed_W = (rn(V, D) / 100).astype('f')
lstm_Wx = (rn(H + D, 4 * H) / np.sqrt(H + D)).astype('f')
lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
lstm_b = np.zeros(4 * H).astype('f')
affine_W = (rn(H + H, V) / np.sqrt(H + H)).astype('f')
affine_b = np.zeros(V).astype('f')
self.embed = TimeEmbedding(embed_W)
self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
self.affine = TimeAffine(affine_W, affine_b)
self.params, self.grads = [], []
for layer in (self.embed, self.lstm, self.affine):
self.params += layer.params
self.grads += layer.grads
self.cache = None
def forward(self, xs, h):
N, T = xs.shape
N, H = h.shape
self.lstm.set_state(h)
out = self.embed.forward(xs)
hs = np.repeat(h, T, axis=0).reshape(N, T, H)
out = np.concatenate((hs, out), axis=2)
out = self.lstm.forward(out)
out = np.concatenate((hs, out), axis=2)
score = self.affine.forward(out)
self.cache = H
return score
def backward(self, dscore):
H = self.cache
dout = self.affine.backward(dscore)
dout, dhs0 = dout[:, :, H:], dout[:, :, :H]
dout = self.lstm.backward(dout)
dembed, dhs1 = dout[:, :, H:], dout[:, :, :H]
self.embed.backward(dembed)
dhs = dhs0 + dhs1
dh = self.lstm.dh + np.sum(dhs, axis=1)
return dh
def generate(self, h, start_id, sample_size):
sampled = []
char_id = start_id
self.lstm.set_state(h)
H = h.shape[1]
peeky_h = h.reshape(1, 1, H)
for _ in range(sample_size):
x = np.array([char_id]).reshape((1, 1))
out = self.embed.forward(x)
out = np.concatenate((peeky_h, out), axis=2)
out = self.lstm.forward(out)
out = np.concatenate((peeky_h, out), axis=2)
score = self.affine.forward(out)
char_id = np.argmax(score.flatten())
sampled.append(char_id)
return sampled
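To see how the Peeky concatenation lines up with the weight shapes, here is a small shape sketch (the sizes N, T, H, D are assumptions):

```python
import numpy as np

# Shape sketch of the Peeky trick (assumed sizes): h is repeated across the T time
# steps and concatenated onto the embedding output along the last axis.
N, T, H, D = 2, 5, 128, 16
h = np.random.randn(N, H)                       # Encoder output
embed_out = np.random.randn(N, T, D)            # TimeEmbedding output
hs = np.repeat(h, T, axis=0).reshape(N, T, H)   # copy h to every time step
lstm_in = np.concatenate((hs, embed_out), axis=2)
print(lstm_in.shape)   # (2, 5, 144) -- matches lstm_Wx of shape (H + D, 4 * H)
```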
Now, in the model-creation part of the addition code above, switch to model = PeekySeq2seq(vocab_size, wordvec_size, hidden_size) and run it again. The effect is dramatic: the accuracy jumped to 99.1%.