Chainer1.11.0 a été publié, et il semble qu'une fonction pour résumer la boucle d'apprentissage appelée Trainer a été ajoutée, alors essayons d'apprendre en utilisant notre propre jeu de données d'image de visage d'actrice AV.
Pour plus d'informations sur l'extraction d'images de visage et l'expansion des données, reportez-vous à Publication du savoir-faire de création d'un service de recherche d'images similaire pour les actrices audiovisuelles grâce à l'apprentissage en profondeur par Qiita --chainer. S'il vous plaît. Dans l'article d'origine, il est converti au format numpy, mais cette fois, il n'est pas converti au format numpy car l'image est lue directement à partir du répertoire pendant l'apprentissage.
On suppose que l'image du visage utilisée ici contient 1000 images pour chaque actrice, redimensionnées à une taille de 64 x 64 et divisées dans les répertoires suivants.
./root
|
|--- /actress1
| |--- image1.jpg
| |--- image2.jpg
| |--- image3.jpg
|
|--- /actress2
| .
| .
|--- /actress3
.
.
.
Tout d'abord, les données d'image du visage sont divisées en formation et vérification. Il est possible d'apprendre en divisant les données pour l'apprentissage et la vérification lors de la lecture des données au moment de l'apprentissage, mais comme il est difficile de comprendre quelles données sont utilisées dans l'apprentissage et quelles données sont utilisées pour la vérification, elles sont divisées à l'avance. Je le ferai.
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import argparse
import glob
import logging
import os
import random
import shutil
def separate_train_val(args):
if not os.path.exists(args.output_dir):
os.mkdir(args.output_dir)
if not os.path.exists(os.path.join(args.output_dir, 'train')):
os.mkdir(os.path.join(args.output_dir, 'train'))
if not os.path.exists(os.path.join(args.output_dir, 'val')):
os.mkdir(os.path.join(args.output_dir, 'val'))
directories = os.listdir(args.root)
for dir_index, dir_name in enumerate(directories):
files = glob.glob(os.path.join(args.root, dir_name, '*.jpg'))
random.shuffle(files)
if len(files) == 0: continue
for file_index, file_path in enumerate(files):
if file_index % args.val_freq != 0:
target_dir = os.path.join(args.output_dir, 'train', dir_name)
if not os.path.exists(target_dir):
os.mkdir(target_dir)
shutil.copy(file_path, target_dir)
logging.info('Copied {} => {}'.format(file_path, target_dir))
else:
target_dir = os.path.join(args.output_dir, 'val', dir_name)
if not os.path.exists(target_dir):
os.mkdir(target_dir)
shutil.copy(file_path, target_dir)
logging.info('Copied {} => {}'.format(file_path, target_dir))
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
parser = argparse.ArgumentParser(description='converter')
parser.add_argument('--root', default='.')
parser.add_argument('--output_dir', default='.')
parser.add_argument('--val_freq', type=int, default=10)
args = parser.parse_args()
separate_train_val(args)
Le répertoire divisé a la structure suivante.
./train_val_root
|
|--- /train
| |--- actress1
| | |--- image1.jpg
| | |--- image2.jpg
| | |--- image3.jpg
| |・
| |・
| |--- actress2
| |・
| |・
|
|--- /val
| |--- actress1
| |
| |--- actress2
.
.
Définit une classe qui hérite de chainer.dataset.DatasetMixin
et lit les données du répertoire spécifié. Nous avons défini une méthode (create_label_file
) qui affiche la classe (nombres de 0 à 9) et l'étiquette (nom du répertoire) utilisées pour la reconnaissance, mais cela est désagréable et ne doit pas être imité.
class DatasetFromDirectory(chainer.dataset.DatasetMixin):
def __init__(self, root='.', label_out='', dtype=np.float32, label_dtype=np.int32):
directories = os.listdir(root)
label_table = []
pairs = [] # tuple (filepath, label) list
for dir_index, dir_name in enumerate(directories):
label_table.append((dir_index, dir_name))
file_paths = glob.glob(os.path.join(root, dir_name, '*.jpg'))
for file_path in file_paths:
pairs.append((file_path, dir_index))
self._pairs = pairs
self._root = root
self._label_out = label_out
self._label_table = label_table
self._dtype = dtype
self._label_dtype = label_dtype
if label_out != '':
self.create_label_file()
def __len__(self):
return len(self._pairs)
def get_example(self, i):
path, int_label = self._pairs[i]
with Image.open(path) as f:
image = np.asarray(f, dtype=self._dtype)
image = image.transpose(2, 0, 1)
label = np.array(int_label, dtype=self._label_dtype)
return image, label
def create_label_file(self):
with open(self._label_out, "w") as f:
for (label_index, label_name) in self._label_table:
f.write('{},{}\n'.format(label_index, label_name))
Si vous regardez Official imagenet sample, vous pouvez traiter les données pendant l'entraînement en fonction de la classe d'ensemble de données créée. Vous pouvez également. En faisant légèrement pivoter l'image au hasard ou en décalant légèrement l'image pendant l'entraînement, il est moins probable qu'il apprenne exactement à partir des mêmes données, de sorte qu'une amélioration des performances de généralisation peut être attendue.
Vous apprendrez l'ensemble de données que vous avez réellement préparé. En mettant en œuvre à l'aide de Chainer Trainer, il peut être implémenté avec environ la moitié du montant du code d'origine.
class CNN(chainer.Chain):
"""
CNN (CCPCCPCP)
"""
def __init__(self, n_classes):
super(CNN, self).__init__(
conv1_1=L.Convolution2D(3, 32, 3, pad=1),
bn1_1=L.BatchNormalization(32),
conv1_2=L.Convolution2D(32, 32, 3, pad=1),
bn1_2=L.BatchNormalization(32),
conv2_1=L.Convolution2D(32, 64, 3, pad=1),
bn2_1=L.BatchNormalization(64),
conv2_2=L.Convolution2D(64, 64, 3, pad=1),
bn2_2=L.BatchNormalization(64),
conv3_1=L.Convolution2D(64, 128, 3, pad=1),
bn3_1=L.BatchNormalization(128),
fc4=L.Linear(8192, 1024),
fc5=L.Linear(1024, n_classes),
)
self.train = True
def __call__(self, x, t):
h = F.relu(self.bn1_1(self.conv1_1(x), test=not self.train))
h = F.relu(self.bn1_2(self.conv1_2(h), test=not self.train))
h = F.max_pooling_2d(h, 2, 2)
h = F.relu(self.bn2_1(self.conv2_1(h), test=not self.train))
h = F.relu(self.bn2_2(self.conv2_2(h), test=not self.train))
h = F.max_pooling_2d(h, 2, 2)
h = F.relu(self.bn3_1(self.conv3_1(h), test=not self.train))
h = F.max_pooling_2d(h, 2, 2)
h = F.dropout(F.relu(self.fc4(h)), ratio=0.3, train=self.train)
h = self.fc5(h)
loss = F.softmax_cross_entropy(h, t)
chainer.report({'loss': loss, 'accuracy': F.accuracy(h, t)}, self)
return loss
model = CNN(10)
optimizer = chainer.optimizers.Adam()
optimizer.setup(model)
mean = np.load(args.mean)
train_data = datasets.DatasetFromDirectory(args.train_root, label_out=label_file)
val_data = datasets.DatasetFromDirectory(args.val_root)
train_iter = chainer.iterators.SerialIterator(train_data, args.batch_size)
val_iter = chainer.iterators.SerialIterator(val_data, args.batch_size, repeat=False, shuffle=False)
# Set up a trainer
updater = training.StandardUpdater(train_iter, optimizer, device=args.gpu)
trainer = training.Trainer(updater, (args.n_epoch, 'epoch'), out=args.output_dir)
snapshot_interval = (args.snapshot_interval, 'iteration')
# Copy the chain with shared parameters to flip 'train' flag only in test
eval_model = model.copy()
eval_model.train = False
trainer.extend(extensions.Evaluator(val_iter, eval_model, device=args.gpu))
trainer.extend(extensions.dump_graph('main/loss'))
trainer.extend(extensions.snapshot(), trigger=snapshot_interval)
trainer.extend(extensions.snapshot_object(
model, 'model_iter_{.updater.iteration}'), trigger=snapshot_interval)
trainer.extend(extensions.snapshot_object(
optimizer, 'optimizer_iter_{.updater.iteration}'), trigger=snapshot_interval)
trainer.extend(extensions.LogReport())
trainer.extend(extensions.PrintReport(
['epoch', 'main/loss', 'validation/main/loss',
'main/accuracy', 'validation/main/accuracy']))
trainer.extend(extensions.ProgressBar(update_interval=10))
if args.resume:
if not os.path.exists(args.resume):
raise IOError('Resume file is not exists.')
logging.info('Load optimizer state from {}'.format(args.resume))
chainer.serializers.load_npz(args.resume, trainer)
trainer.run()
# Save the trained model
chainer.serializers.save_npz(os.path.join(args.output_dir, 'model_final'), model)
chainer.serializers.save_npz(os.path.join(args.output_dir, 'optimizer_final'), optimizer)
print()
logging.info('Saved the model and the optimizer')
logging.info('Training is finished!')
Puisque l'objet sauvegardé par ʻextensions.snapshot () est destiné à l'entraîneur, le
model et ʻoptimizer
à charger lors de la prédiction réelle doivent être sauvegardés séparément par ʻextensions.snapshot_object () `.
J'ai essayé d'apprendre mon propre ensemble de données en utilisant Chainer Trainer. Comme prévu, l'impression d'utiliser Trainer est qu'il est proche de Keras. Lorsque j'ai essayé d'utiliser Chainer pour la première fois, je me souviens que la lecture de chaque mini-lot prenait beaucoup de temps, j'ai donc eu le sentiment que Trainer, qui résume ces parties, est une implémentation facile à comprendre.
Cependant, dans Keras, vous pouvez utiliser flow_from_directory of ImageDataGenerator class pour lire les données du répertoire sans implémenter la classe Dataset, ce qui facilite la création. peut aussi faire.
Enfin, je crée un site qui recherche des images similaires d'actrices audiovisuelles utilisant CNN, alors jetez un œil si vous le souhaitez.
Babelink --Service de recherche d'actrice audiovisuelle similaire
Recommended Posts