[Ce site](http://www.ie110704.net/2017/08/21/attention-seq2seq%E3%81%A7%E5] pour l'auto-apprentissage personnel lors de la création d'un chatbot dans un séminaire % AF% BE% E8% A9% B1% E3% 83% A2% E3% 83% 87% E3% 83% AB% E3% 82% 92% E5% AE% 9F% E8% A3% 85% E3% 81 % 97% E3% 81% A6% E3% 81% BF% E3% 81% 9F /) a été mentionnée.
--Je veux transmettre des données dans un fichier. --Je veux lister les données passées dans le fichier. --Je souhaite confirmer l'apprentissage en sélectionnant dans la liste des données passées dans le fichier.
taiwa_model_file.py
import datetime
import numpy as np
import chainer
import chainer.functions as F
import chainer.links as L
import MeCab
import numpy as np
import re
#Récupérer les données d'un fichier
with open("test_data.txt",encoding="utf-8") as f:
s = f.read()
f.close()
#Données de liste
l = [x.strip() for x in re.split('\t|\n',s)]
l.pop(-1)
data = np.array(l).reshape(-1, 2,1).tolist()
print("-----------------------------")
print(data)
print("-----------------------------")
gpu = -1
if gpu >= 0: #numpy ou cuda.cupy
xp = chainer.cuda.cupy
chainer.cuda.get_device(gpu).use()
else:
xp = np
#Définition de la classe de transformation de données
class DataConverter:
def __init__(self, batch_col_size):
"""Initialisation de classe
Args:
batch_col_size:Taille du nombre de mots du mini-lot pendant l'apprentissage
"""
self.mecab = MeCab.Tagger() #Analyseur morphologique
self.vocab = {"<eos>":0, "<unknown>": 1} #Dictionnaire de mots
self.batch_col_size = batch_col_size
def load(self, data):
"""Pendant la formation, lisez les données de l'enseignant et convertissez-les en un tableau Numpy correspondant à la taille du mini-lot
Args:
data:Données de dialogue
"""
#Enregistrement du dictionnaire de mots
self.vocab = {"<eos>":0, "<unknown>": 1} #Initialiser le dictionnaire de mots
for d in data:
sentences = [d[0][0], d[1][0]] #Texte d'entrée, texte de réponse
for sentence in sentences:
sentence_words = self.sentence2words(sentence) #Décomposer les phrases en mots
for word in sentence_words:
if word not in self.vocab:
self.vocab[word] = len(self.vocab)
#Identification et organisation des données enseignants
queries, responses = [], []
for d in data:
query, response = d[0][0], d[1][0] #Instruction d'encodage, instruction de décodage
queries.append(self.sentence2ids(sentence=query, train=True, sentence_type="query"))
responses.append(self.sentence2ids(sentence=response, train=True, sentence_type="response"))
self.train_queries = xp.vstack(queries)
self.train_responses = xp.vstack(responses)
def sentence2words(self, sentence):
"""Renvoie la phrase sous forme de tableau de mots
Args:
sentence:Chaîne de phrase
"""
sentence_words = []
for m in self.mecab.parse(sentence).split("\n"): #Décomposer en mots par analyse morphologique
w = m.split("\t")[0].lower() #mot
if len(w) == 0 or w == "eos": #Caractères illégaux, EOS omis
continue
sentence_words.append(w)
sentence_words.append("<eos>") #Enfin enregistré en vocal<eos>Remplacer
return sentence_words
def sentence2ids(self, sentence, train=True, sentence_type="query"):
"""Convertissez des phrases en tableau Numpy d'ID de mot et retournez
Args:
sentence:Chaîne de phrase
train:Que ce soit pour apprendre
sentence_type:Pour changer la direction de compensation de taille pour l'apprentissage et pour la prise en charge des mini-lots avec réponse à la requête"query"or"response"Spécifier
Returns:
ids:Tableau Numpy d'ID de mot
"""
ids = [] #Un tableau qui est converti en un identifiant de mot et stocké
sentence_words = self.sentence2words(sentence) #Décomposer les phrases en mots
for word in sentence_words:
if word in self.vocab: #Si le mot existe dans le dictionnaire de mots, convertissez-le en ID
ids.append(self.vocab[word])
else: #Si le mot n'existe pas dans le dictionnaire de mots<unknown>Convertir en
ids.append(self.vocab["<unknown>"])
#Lors de l'apprentissage, ajustez la taille du nombre de mots et convertissez-le en Numpy pour la prise en charge des mini-lots
if train:
if sentence_type == "query": #Dans le cas d'une requête, jusqu'à ce que la taille du numéro de mot du mini-lot soit atteinte avant-Compenser pour 1
while len(ids) > self.batch_col_size: #S'il est plus grand que la taille du mot du mini-lot, coupez-le depuis le début jusqu'à ce qu'il atteigne la taille du mot du mini-lot.
ids.pop(0)
ids = xp.array([-1]*(self.batch_col_size-len(ids))+ids, dtype="int32")
elif sentence_type == "response": #Dans le cas de réponse, jusqu'à ce qu'il atteigne la taille du nombre de mots du mini-lot vers l'arrière-Compenser pour 1
while len(ids) > self.batch_col_size: #S'il est plus grand que la taille de mot du mini-lot, coupez-le de la fin jusqu'à ce qu'il atteigne la taille du mot du mini-lot.
ids.pop()
ids = xp.array(ids+[-1]*(self.batch_col_size-len(ids)), dtype="int32")
else: #Au moment de la prédiction, convertissez en Numpy tel quel
ids = xp.array([ids], dtype="int32")
return ids
def ids2words(self, ids):
"""Au moment de la prédiction, convertissez le tableau Numpy d'ID de mot en mots et renvoyez
Args:
ids:Tableau Numpy d'ID de mot
Returns:
words:Disposition des mots
"""
words = [] #Tableau pour stocker les mots
for i in ids: #Reportez-vous à l'identifiant du mot du dictionnaire de mots dans l'ordre et convertissez-le en mot
words.append(list(self.vocab.keys())[list(self.vocab.values()).index(i)])
return words
#Définition de classe de modèle
#Classe d'encodeur LSTM
class LSTMEncoder(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size):
"""Instanciation du codeur
Args:
vocab_size:Nombre de types de mots utilisés
embed_size:Taille des mots dans la représentation vectorielle
hidden_size:Taille du calque masqué
"""
super(LSTMEncoder, self).__init__(
xe = L.EmbedID(vocab_size, embed_size, ignore_label=-1),
eh = L.Linear(embed_size, 4 * hidden_size),
hh = L.Linear(hidden_size, 4 * hidden_size)
)
def __call__(self, x, c, h):
"""Calcul du codeur
Args:
x: one-mot chaud
c:Mémoire interne
h:Couche cachée
Returns:
Mémoire interne suivante, couche cachée suivante
"""
e = F.tanh(self.xe(x))
return F.lstm(c, self.eh(e) + self.hh(h))
# Attention Model +Classe de décodeur LSTM
class AttLSTMDecoder(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size):
"""Instanciation du décodeur pour le modèle Attention
Args:
vocab_size:Nombre de vocabulaire
embed_size:Taille du vecteur Word
hidden_size:Taille du calque masqué
"""
super(AttLSTMDecoder, self).__init__(
ye = L.EmbedID(vocab_size, embed_size, ignore_label=-1), #Couche pour convertir des mots en vecteurs de mots
eh = L.Linear(embed_size, 4 * hidden_size), #Un calque qui transforme un vecteur de mot en un vecteur quatre fois plus grand que le calque masqué
hh = L.Linear(hidden_size, 4 * hidden_size), #Un calque qui transforme le vecteur intermédiaire du décodeur en un vecteur quatre fois plus grand que le calque caché
fh = L.Linear(hidden_size, 4 * hidden_size), #Un calque qui transforme la moyenne pondérée du vecteur intermédiaire de l'encodeur avant en un vecteur quatre fois la taille du calque caché
bh = L.Linear(hidden_size, 4 * hidden_size), #Un calque qui transforme la moyenne pondérée du vecteur intermédiaire de l'encodeur avant en un vecteur quatre fois la taille du calque caché
he = L.Linear(hidden_size, embed_size), #Un calque qui convertit un vecteur de taille de calque masqué en la taille d'un vecteur de mot
ey = L.Linear(embed_size, vocab_size) #Couche pour convertir le vecteur de mot en vecteur de taille de vocabulaire
)
def __call__(self, y, c, h, f, b):
"""Calcul du décodeur
Args:
y:Mots à entrer dans le décodeur
c:Mémoire interne
h:Vecteur intermédiaire décodeur
f:Moyenne pondérée du codeur direct calculé par le modèle Attention
b:Moyenne pondérée du codeur inversé calculée par Attention Model
Returns:
Dictionnaire de la taille du vocabulaire, mémoire interne mise à jour, vecteur intermédiaire mis à jour
"""
e = F.tanh(self.ye(y)) #Convertir des mots en vecteurs de mots
c, h = F.lstm(c, self.eh(e) + self.hh(h) + self.fh(f) + self.bh(b)) #LSTM avec vecteur de mots, vecteur intermédiaire du décodeur, Attention du codeur avant, Attention du codeur inverse
t = self.ey(F.tanh(self.he(h))) #Convertir la sortie vectorielle intermédiaire de LSTM en vecteur de taille lexicale
return t, c, h
#Classe de modèle d'attention
class Attention(chainer.Chain):
def __init__(self, hidden_size):
"""Instanciation de l'attention
Args:
hidden_size:Taille du calque masqué
"""
super(Attention, self).__init__(
fh = L.Linear(hidden_size, hidden_size), #Une couche de couplage linéaire qui transforme le vecteur intermédiaire de l'encodeur avant en un vecteur de taille de couche caché
bh = L.Linear(hidden_size, hidden_size), #Une couche de couplage linéaire qui transforme le vecteur intermédiaire de l'encodeur inverse en un vecteur de taille de couche caché
hh = L.Linear(hidden_size, hidden_size), #Une couche de couplage linéaire qui transforme le vecteur intermédiaire du décodeur en un vecteur de taille de couche caché
hw = L.Linear(hidden_size, 1), #Couche de couplage linéaire pour convertir des vecteurs de taille de couche cachés en scalaires
)
self.hidden_size = hidden_size #Souvenez-vous de la taille du calque caché
def __call__(self, fs, bs, h):
"""Calcul de l'attention
Args:
fs:Une liste de vecteurs intermédiaires d'encodeur avant
bs:Liste des vecteurs intermédiaires du codeur inverse
h:Sortie vectorielle intermédiaire par le décodeur
Returns:
Moyenne pondérée du vecteur intermédiaire du codeur direct, moyenne pondérée du vecteur intermédiaire du codeur inverse
"""
batch_size = h.data.shape[0] #Rappelez-vous la taille du mini lot
ws = [] #Initialisation de la liste pour enregistrer les poids
sum_w = chainer.Variable(xp.zeros((batch_size, 1), dtype='float32')) #Initialisez la valeur pour calculer la valeur totale du poids
#Calcul du poids à l'aide du vecteur intermédiaire de l'encodeur et du vecteur intermédiaire du décodeur
for f, b in zip(fs, bs):
w = F.tanh(self.fh(f)+self.bh(b)+self.hh(h)) #Calcul du poids à l'aide du vecteur intermédiaire du codeur direct, du vecteur intermédiaire du codeur inverse, du vecteur intermédiaire du décodeur
w = F.exp(self.hw(w)) #Normaliser à l'aide de la fonction softmax
ws.append(w) #Enregistrez le poids calculé
sum_w += w
#Initialisation du vecteur moyen pondéré en sortie
att_f = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
att_b = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
for f, b, w in zip(fs, bs, ws):
w /= sum_w #Normalisé pour que la somme des poids soit 1.
#poids*Ajouter le vecteur intermédiaire de l'encodeur au vecteur de sortie
att_f += F.reshape(F.batch_matmul(f, w), (batch_size, self.hidden_size))
att_b += F.reshape(F.batch_matmul(b, w), (batch_size, self.hidden_size))
return att_f, att_b
#Séquence d'attention à la classe de modèle de séquence
class AttSeq2Seq(chainer.Chain):
def __init__(self, vocab_size, embed_size, hidden_size, batch_col_size):
"""Attention +Instanciation Seq2 Seq
Args:
vocab_size:Nombre de taille de vocabulaire
embed_size:Taille du vecteur Word
hidden_size:Taille du calque masqué
"""
super(AttSeq2Seq, self).__init__(
f_encoder = LSTMEncoder(vocab_size, embed_size, hidden_size), #Encodeur avant
b_encoder = LSTMEncoder(vocab_size, embed_size, hidden_size), #Encodeur inversé
attention = Attention(hidden_size), # Attention Model
decoder = AttLSTMDecoder(vocab_size, embed_size, hidden_size) # Decoder
)
self.vocab_size = vocab_size
self.embed_size = embed_size
self.hidden_size = hidden_size
self.decode_max_size = batch_col_size #Le décodage se termine lorsque EOS est sorti, nombre maximum de vocabulaire de sortie lorsqu'il n'est pas sorti
#Initialisez la liste pour stocker le vecteur intermédiaire de l'encodeur avant et le vecteur intermédiaire de l'encodeur inverse
self.fs = []
self.bs = []
def encode(self, words, batch_size):
"""Calcul du codeur
Args:
words:Une liste enregistrée de mots à utiliser dans votre saisie
batch_size:Mini taille de lot
"""
c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
#Calcul du codeur avant
for w in words:
c, h = self.f_encoder(w, c, h)
self.fs.append(h) #Enregistrer le vecteur intermédiaire calculé
#Mémoire interne, initialisation du vecteur intermédiaire
c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
#Calcul du codeur inversé
for w in reversed(words):
c, h = self.b_encoder(w, c, h)
self.bs.insert(0, h) #Enregistrer le vecteur intermédiaire calculé
#Mémoire interne, initialisation du vecteur intermédiaire
self.c = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
self.h = chainer.Variable(xp.zeros((batch_size, self.hidden_size), dtype='float32'))
def decode(self, w):
"""Calcul du décodeur
Args:
w:Mots à saisir avec Decoder
Returns:
Mot prédictif
"""
att_f, att_b = self.attention(self.fs, self.bs, self.h)
t, self.c, self.h = self.decoder(w, self.c, self.h, att_f, att_b)
return t
def reset(self):
"""Initialiser les variables d'instance
"""
#Initialisation de la liste qui enregistre le vecteur intermédiaire de l'encodeur
self.fs = []
self.bs = []
#Initialisation du gradient
self.zerograds()
def __call__(self, enc_words, dec_words=None, train=True):
"""Une fonction qui calcule la propagation vers l'avant
Args:
enc_words:Une liste de mots prononcés
dec_words:Une liste de mots dans la phrase de réponse
train:Apprentissage ou prédiction
Returns:
Perte totale calculée ou chaîne de décodage prévue
"""
enc_words = enc_words.T
if train:
dec_words = dec_words.T
batch_size = len(enc_words[0]) #Enregistrer la taille du lot
self.reset() #Réinitialiser le dégradé stocké dans le modèle
enc_words = [chainer.Variable(xp.array(row, dtype='int32')) for row in enc_words] #Changer les mots de la liste d'énoncés en type variable
self.encode(enc_words, batch_size) #Calcul d'encodage
t = chainer.Variable(xp.array([0 for _ in range(batch_size)], dtype='int32')) # <eos>Vers le décodeur
loss = chainer.Variable(xp.zeros((), dtype='float32')) #Initialisation de la perte
ys = [] #Une liste de mots générés par le décodeur
#Calcul du décodeur
if train: #Calculer la perte d'apprentissage
for w in dec_words:
y = self.decode(t) #Décoder mot par mot
t = chainer.Variable(xp.array(w, dtype='int32')) #Convertir le mot correct en type variable
loss += F.softmax_cross_entropy(y, t) #Calculez la perte en comparant le mot correct avec le mot prédit
return loss
else: #Générer une chaîne décodée pour la prédiction
for i in range(self.decode_max_size):
y = self.decode(t)
y = xp.argmax(y.data) #Puisqu'il est toujours produit avec probabilité, obtenez le mot prédit avec une probabilité élevée
ys.append(y)
t = chainer.Variable(xp.array([y], dtype='int32'))
if y == 0: #Lors de la sortie d'EOS, le décodage est terminé.
break
return ys
#Apprentissage
#constant
embed_size = 100
hidden_size = 100
batch_size = 6 #Nombre de tailles de lots pour l'apprentissage par mini-lots
batch_col_size = 15
epoch_num = 50 #Nombre d'époques
N = len(data) #Nombre de données sur les enseignants
#Lire les données des enseignants
data_converter = DataConverter(batch_col_size=batch_col_size) #Convertisseur de données
data_converter.load(data) #Lire les données des enseignants
vocab_size = len(data_converter.vocab) #Nombre de mots
#Modèle de déclaration
model = AttSeq2Seq(vocab_size=vocab_size, embed_size=embed_size, hidden_size=hidden_size, batch_col_size=batch_col_size)
opt = chainer.optimizers.Adam()
opt.setup(model)
opt.add_hook(chainer.optimizer.GradientClipping(5))
if gpu >= 0:
model.to_gpu(gpu)
model.reset()
#Apprentissage
st = datetime.datetime.now()
for epoch in range(epoch_num):
#Mini apprentissage par lots
perm = np.random.permutation(N) #Obtenez une liste aléatoire de colonnes entières
total_loss = 0
for i in range(0, N, batch_size):
enc_words = data_converter.train_queries[perm[i:i+batch_size]]
dec_words = data_converter.train_responses[perm[i:i+batch_size]]
model.reset()
loss = model(enc_words=enc_words, dec_words=dec_words, train=True)
loss.backward()
loss.unchain_backward()
total_loss += loss.data
opt.update()
if (epoch+1)%10 == 0:
ed = datetime.datetime.now()
print("epoch:\t{}\ttotal loss:\t{}\ttime:\t{}".format(epoch+1, total_loss, ed-st))
st = datetime.datetime.now()
def predict(model, query):
enc_query = data_converter.sentence2ids(query, train=False)
dec_response = model(enc_words=enc_query, train=False)
response = data_converter.ids2words(dec_response)
print(query, "=>", response)
#Sélectionnez dans une liste de données pour confirmer l'apprentissage
predict(model, str(data[0][0]))
Séparez les mots "phrase supérieure" et "phrase du milieu, phrase inférieure" dans les onglets Les données ci-dessous sont Recherche Haiku de Association Haijin / Musée de la littérature Haiku. ) De
test_data.txt
Assurez-vous de l'appeler un poulet
Le premier jour, je me demande si c'est une cacahuète
Saumon de fin d'année renversant du sel
Poème du Nouvel An de Manyoshu
Ni long ni court
Afin de faire face à la nouvelle lumière
Je me demande si c'est le premier jour que je n'ai pas encore fait
Droite Gauche Grand miroir première pratique
Enlève tes épaules et accepte le radis
Trois jours sont abattus dans les yeux de Kokaku
Shinonome no Miya Ruins Masaden Première vue
-----------------------------
[[['Avec du poulet'], ['Assurez-vous d'aller à Isuzugawa']], [['Premier jour'], ['Vert plume et paon']], [['Saumon de fin d'année'], ['Ah, répandre du sel']], [['Manyoshu'], ['Poème du nouvel an']], [['Pas longtemps'], ['Pas une question courte']], [['Première lumière'], ['Pour faire face à de nouveaux cœurs']], [['Encore'], ['Je me demande si c'est le premier jour où je n'ai rien à faire']], [['Droite gauche'], ['Première pratique du grand miroir']], [['Enlève ton épaule'], ['Acceptez le radis']], [['Trois jours'], ['Tirez dans les yeux du rôle petit angle']], [['Shinomeno'], ['La première vue du sanctuaire principal']]]
-----------------------------
epoch: 10 total loss: 53.870222091674805 time: 0:00:12.398713
epoch: 20 total loss: 30.608291625976562 time: 0:00:12.503922
epoch: 30 total loss: 13.965360641479492 time: 0:00:12.470424
epoch: 40 total loss: 6.161568880081177 time: 0:00:12.560850
epoch: 50 total loss: 2.741897940635681 time: 0:00:12.466510
['Avec du poulet'] => ['Ihe', 'Si', 'vous devez', 'Rivière Isuzu', '<eos>']
Recommended Posts