Chainer 1.11.0 has been released, and it adds a feature called Trainer that abstracts the training loop, so let's try training with it on our own AV actress face image dataset.
For face image extraction and data augmentation, please refer to the article "Publishing know-how on creating a similar image search service for AV actresses through deep learning by chainer" on Qiita. In that article the images are converted to numpy format, but here they are not, because the images are read directly from the directory during training.
The face images used here are assumed to consist of 1000 images per actress, resized to 64 x 64, and organized into the following directories.
./root
|
|--- /actress1
| |--- image1.jpg
| |--- image2.jpg
| |--- image3.jpg
|
|--- /actress2
| .
| .
|--- /actress3
.
.
.
First, split the face image data into training and validation sets. You could split the data when loading it at training time, but then it is hard to tell which images were used for training and which for validation, so I split it in advance.
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import glob
import logging
import os
import random
import shutil


def separate_train_val(args):
    if not os.path.exists(args.output_dir):
        os.mkdir(args.output_dir)
    if not os.path.exists(os.path.join(args.output_dir, 'train')):
        os.mkdir(os.path.join(args.output_dir, 'train'))
    if not os.path.exists(os.path.join(args.output_dir, 'val')):
        os.mkdir(os.path.join(args.output_dir, 'val'))

    directories = os.listdir(args.root)

    for dir_index, dir_name in enumerate(directories):
        files = glob.glob(os.path.join(args.root, dir_name, '*.jpg'))
        random.shuffle(files)
        if len(files) == 0:
            continue

        for file_index, file_path in enumerate(files):
            # Every val_freq-th file goes to val, the rest go to train.
            if file_index % args.val_freq != 0:
                target_dir = os.path.join(args.output_dir, 'train', dir_name)
                if not os.path.exists(target_dir):
                    os.mkdir(target_dir)
                shutil.copy(file_path, target_dir)
                logging.info('Copied {} => {}'.format(file_path, target_dir))
            else:
                target_dir = os.path.join(args.output_dir, 'val', dir_name)
                if not os.path.exists(target_dir):
                    os.mkdir(target_dir)
                shutil.copy(file_path, target_dir)
                logging.info('Copied {} => {}'.format(file_path, target_dir))


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')

    parser = argparse.ArgumentParser(description='converter')
    parser.add_argument('--root', default='.')
    parser.add_argument('--output_dir', default='.')
    parser.add_argument('--val_freq', type=int, default=10)
    args = parser.parse_args()

    separate_train_val(args)
The divided directory has the following structure.
./train_val_root
|
|--- /train
| |--- actress1
| | |--- image1.jpg
| | |--- image2.jpg
| | |--- image3.jpg
| | .
| | .
| |--- actress2
| | .
| | .
|
|--- /val
| |--- actress1
| |
| |--- actress2
.
.
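To see what the split rule in the script amounts to, here is a minimal sketch that applies the same `file_index % val_freq` test to plain indices instead of files. The function name `split_indices` is hypothetical and only serves as an illustration; the file count of 1000 and the default `val_freq` of 10 come from the text above.

```python
def split_indices(n_files, val_freq=10):
    """Return (train, val) index lists using the same rule as the split script."""
    train, val = [], []
    for file_index in range(n_files):
        # Indices 0, val_freq, 2 * val_freq, ... go to val; the rest go to train.
        if file_index % val_freq != 0:
            train.append(file_index)
        else:
            val.append(file_index)
    return train, val


train, val = split_indices(1000, val_freq=10)
print(len(train), len(val))  # 900 100
```

So with 1000 images per actress, each class should end up with 900 training images and 100 validation images. Because the script shuffles the files first, which images land in val differs from run to run.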
Define a class that inherits from chainer.dataset.DatasetMixin and reads data from the specified directory. I also defined a method (create_label_file) that writes out the class indices (numbers from 0 to 9) and labels (directory names) used for recognition, but it is not a clean design, so please don't imitate it.
import glob
import os

import chainer
import numpy as np
from PIL import Image


class DatasetFromDirectory(chainer.dataset.DatasetMixin):

    def __init__(self, root='.', label_out='', dtype=np.float32, label_dtype=np.int32):
        # os.listdir returns entries in arbitrary order, so sort them to make
        # sure the train and val directories map each name to the same index.
        directories = sorted(os.listdir(root))
        label_table = []
        pairs = []  # list of (filepath, label) tuples
        for dir_index, dir_name in enumerate(directories):
            label_table.append((dir_index, dir_name))
            file_paths = glob.glob(os.path.join(root, dir_name, '*.jpg'))
            for file_path in file_paths:
                pairs.append((file_path, dir_index))

        self._pairs = pairs
        self._root = root
        self._label_out = label_out
        self._label_table = label_table
        self._dtype = dtype
        self._label_dtype = label_dtype

        if label_out != '':
            self.create_label_file()

    def __len__(self):
        return len(self._pairs)

    def get_example(self, i):
        path, int_label = self._pairs[i]
        with Image.open(path) as f:
            image = np.asarray(f, dtype=self._dtype)
        # Convert from (height, width, channel) to (channel, height, width).
        image = image.transpose(2, 0, 1)
        label = np.array(int_label, dtype=self._label_dtype)
        return image, label

    def create_label_file(self):
        # Write one "index,name" line per class.
        with open(self._label_out, "w") as f:
            for (label_index, label_name) in self._label_table:
                f.write('{},{}\n'.format(label_index, label_name))
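The label file written by create_label_file has one `index,name` line per class, so it can be read back when you want to turn a predicted class index into a directory name. The following is only a sketch: `load_label_table` is a hypothetical helper that is not part of the original code, and the sample contents stand in for a real label file.

```python
import io


def load_label_table(lines):
    """Parse 'index,name' lines into a {index: name} dict."""
    table = {}
    for line in lines:
        line = line.rstrip('\n')
        if not line:
            continue
        # Split on the first comma only, in case a directory name contains one.
        index, name = line.split(',', 1)
        table[int(index)] = name
    return table


# Stand-in for open(label_file): two classes, as create_label_file would write them.
sample = io.StringIO('0,actress1\n1,actress2\n')
labels = load_label_table(sample)
print(labels[1])  # actress2
```

In real use you would pass an open file object instead of the `io.StringIO` sample.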
As the official imagenet sample shows, you can also process the data during training by building on the dataset class you created. Randomly rotating or shifting the images slightly during training makes it less likely that the network sees exactly the same data twice, so better generalization can be expected.
Now let's actually train on the prepared dataset. Implemented with Chainer's Trainer, the code is about half the size of the original.
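As a rough illustration of that idea, here is a minimal numpy-only sketch of a random shift that could be applied to the image inside get_example. The function name `augment` and the parameter `max_shift` are hypothetical, the horizontal flip is my own addition, and rotation is left out to keep the sketch short. It assumes a float image in (channel, height, width) layout, as returned by the dataset class.

```python
import numpy as np


def augment(image, rng, max_shift=4):
    """Randomly flip and shift a CHW image; the output keeps the input shape."""
    # Horizontal flip with probability 0.5 (the last axis is width).
    if rng.rand() < 0.5:
        image = image[:, :, ::-1]
    # Zero-pad the borders, then crop a window of the original size
    # at a random offset, which shifts the content by up to max_shift pixels.
    _, height, width = image.shape
    padded = np.pad(image,
                    ((0, 0), (max_shift, max_shift), (max_shift, max_shift)),
                    mode='constant')
    top = rng.randint(0, 2 * max_shift + 1)
    left = rng.randint(0, 2 * max_shift + 1)
    return padded[:, top:top + height, left:left + width]


rng = np.random.RandomState(0)
image = np.ones((3, 64, 64), dtype=np.float32)
augmented = augment(image, rng)
print(augmented.shape, augmented.dtype)
```

Augmentation like this should be applied only to the training dataset, so that the validation data stays the same from epoch to epoch.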
import chainer
import chainer.functions as F
import chainer.links as L


class CNN(chainer.Chain):
    """
    CNN (CCPCCPCP)
    """

    def __init__(self, n_classes):
        super(CNN, self).__init__(
            conv1_1=L.Convolution2D(3, 32, 3, pad=1),
            bn1_1=L.BatchNormalization(32),
            conv1_2=L.Convolution2D(32, 32, 3, pad=1),
            bn1_2=L.BatchNormalization(32),

            conv2_1=L.Convolution2D(32, 64, 3, pad=1),
            bn2_1=L.BatchNormalization(64),
            conv2_2=L.Convolution2D(64, 64, 3, pad=1),
            bn2_2=L.BatchNormalization(64),

            conv3_1=L.Convolution2D(64, 128, 3, pad=1),
            bn3_1=L.BatchNormalization(128),

            # 128 channels x 8 x 8 after three 2x2 poolings of a 64 x 64 input
            fc4=L.Linear(8192, 1024),
            fc5=L.Linear(1024, n_classes),
        )
        self.train = True

    def __call__(self, x, t):
        h = F.relu(self.bn1_1(self.conv1_1(x), test=not self.train))
        h = F.relu(self.bn1_2(self.conv1_2(h), test=not self.train))
        h = F.max_pooling_2d(h, 2, 2)

        h = F.relu(self.bn2_1(self.conv2_1(h), test=not self.train))
        h = F.relu(self.bn2_2(self.conv2_2(h), test=not self.train))
        h = F.max_pooling_2d(h, 2, 2)

        h = F.relu(self.bn3_1(self.conv3_1(h), test=not self.train))
        h = F.max_pooling_2d(h, 2, 2)

        h = F.dropout(F.relu(self.fc4(h)), ratio=0.3, train=self.train)
        h = self.fc5(h)

        loss = F.softmax_cross_entropy(h, t)
        # Report loss and accuracy so that LogReport / PrintReport can pick them up.
        chainer.report({'loss': loss, 'accuracy': F.accuracy(h, t)}, self)
        return loss
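The input size of fc4 (8192) follows from the input resolution: the 3 x 3 convolutions with pad=1 keep the spatial size, and each of the three 2 x 2 poolings halves it. A small sketch of that arithmetic, assuming the 64 x 64 input described earlier (`pooled_size` is a hypothetical helper used only for this check):

```python
def pooled_size(size, n_poolings, window=2):
    """Spatial size after n_poolings non-overlapping window x window poolings."""
    for _ in range(n_poolings):
        size //= window
    return size


side = pooled_size(64, 3)        # 64 -> 32 -> 16 -> 8
fc4_inputs = 128 * side * side   # channels x height x width
print(side, fc4_inputs)  # 8 8192
```

If you change the input image size or the number of pooling layers, the first argument of fc4 has to be changed accordingly.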
import logging
import os

import chainer
import numpy as np
from chainer import training
from chainer.training import extensions

# Note: args and label_file come from argument-parsing code that is omitted here,
# and datasets is presumably the module that defines DatasetFromDirectory.

model = CNN(10)
optimizer = chainer.optimizers.Adam()
optimizer.setup(model)

mean = np.load(args.mean)  # loaded here but not used in this excerpt
train_data = datasets.DatasetFromDirectory(args.train_root, label_out=label_file)
val_data = datasets.DatasetFromDirectory(args.val_root)

train_iter = chainer.iterators.SerialIterator(train_data, args.batch_size)
val_iter = chainer.iterators.SerialIterator(val_data, args.batch_size, repeat=False, shuffle=False)

# Set up a trainer
updater = training.StandardUpdater(train_iter, optimizer, device=args.gpu)
trainer = training.Trainer(updater, (args.n_epoch, 'epoch'), out=args.output_dir)

snapshot_interval = (args.snapshot_interval, 'iteration')

# Copy the chain with shared parameters to flip 'train' flag only in test
eval_model = model.copy()
eval_model.train = False

trainer.extend(extensions.Evaluator(val_iter, eval_model, device=args.gpu))
trainer.extend(extensions.dump_graph('main/loss'))
trainer.extend(extensions.snapshot(), trigger=snapshot_interval)
trainer.extend(extensions.snapshot_object(
    model, 'model_iter_{.updater.iteration}'), trigger=snapshot_interval)
trainer.extend(extensions.snapshot_object(
    optimizer, 'optimizer_iter_{.updater.iteration}'), trigger=snapshot_interval)
trainer.extend(extensions.LogReport())
trainer.extend(extensions.PrintReport(
    ['epoch', 'main/loss', 'validation/main/loss',
     'main/accuracy', 'validation/main/accuracy']))
trainer.extend(extensions.ProgressBar(update_interval=10))

if args.resume:
    if not os.path.exists(args.resume):
        raise IOError('Resume file does not exist.')
    logging.info('Load trainer state from {}'.format(args.resume))
    chainer.serializers.load_npz(args.resume, trainer)

trainer.run()

# Save the trained model
chainer.serializers.save_npz(os.path.join(args.output_dir, 'model_final'), model)
chainer.serializers.save_npz(os.path.join(args.output_dir, 'optimizer_final'), optimizer)

print()
logging.info('Saved the model and the optimizer')
logging.info('Training is finished!')
Since the object saved by `extensions.snapshot()` is for the trainer, you need to save the model and optimizer separately with `extensions.snapshot_object()` so that they can be loaded when actually making predictions.
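The file names given to snapshot_object in the script, such as 'model_iter_{.updater.iteration}', are ordinary Python format templates. As far as I understand, Chainer fills them in by calling format with the trainer as the argument. The sketch below reproduces only that string formatting step with stand-in classes (`FakeUpdater` and `FakeTrainer` are hypothetical and have nothing to do with Chainer's real classes).

```python
class FakeUpdater(object):
    """Stand-in that only carries the iteration counter."""

    def __init__(self, iteration):
        self.iteration = iteration


class FakeTrainer(object):
    """Stand-in exposing an updater attribute, like the real trainer does."""

    def __init__(self, iteration):
        self.updater = FakeUpdater(iteration)


template = 'model_iter_{.updater.iteration}'
# '{.updater.iteration}' looks up attributes on the first format argument.
filename = template.format(FakeTrainer(1000))
print(filename)  # model_iter_1000
```

So with a snapshot interval of 1000 iterations you would expect files such as model_iter_1000 and model_iter_2000 to appear in the output directory.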
I tried training on my own dataset using Chainer's Trainer. My impression is that, as expected, it feels close to Keras. When I first used Chainer, I remember struggling with things like loading each mini-batch, so Trainer, which abstracts those parts away, feels easy to understand.
However, in Keras you can use flow_from_directory of the ImageDataGenerator class to read data from a directory without implementing a dataset class at all, so in that respect Keras is even easier.
Last but not least, I'm making a site that uses CNN to search for similar images of AV actresses, so please take a look if you like.
Babelink --Similar AV actress search service