C'est une poussée. Il y a peut-être un meilleur moyen.
Lorsque min_after_dequeue
est plus petit que la taille du fichier, tf.shuffle_batch
est biaisé. C'est parce que shuffle_batch
ne mélange que les images mises en file d'attente.
Par exemple, supposons que vous triiez 70 000 images mnist (7 000 pour chaque étiquette) par ordre croissant d'étiquettes et que vous les enregistriez dans tfrecord avec les étiquettes.
À ce stade, si min_after_dequeue
of tf.train.shuffle_batch
est défini sur 10000 et que 50000 étiquettes sont retirées, la distribution des étiquettes sera
On dirait. L'axe horizontal est l'ordre de retrait et l'axe vertical est l'étiquette obtenue. Au début (jusqu'à ce que 4000 feuilles soient retirées = 1400 jusqu'à ce que le premier enregistrement "2" soit mis en file d'attente), seuls les enregistrements "0" ou "1" sont mis en file d'attente, donc seulement 0 ou 1 apparaît. Hmm. De plus, comme l'étiquette «9» n'est incluse qu'après la 63001e feuille, elle n'est jamais apparue lorsque les 50 000 étiquettes sont sorties.
Plus directement, si vous ajoutez un numéro d'enregistrement à tfrecord et prenez ce numéro d'enregistrement sur l'axe vertical,
On dirait. Seuls les enregistrements avec des nombres faibles sont renvoyés dans la première moitié du lot, et les enregistrements avec des nombres élevés sont principalement renvoyés dans la seconde moitié du lot. Cependant, dans la seconde moitié du lot, il y a encore quelques jeunes disques avec des échappées de Dequeue mélangées et chanceuses (?).
Pour plus de détails, par exemple lors de l'utilisation de plusieurs tfrecords, voir «[[Tensorflow] Bias investigation when shuffle_batch with TFRecord file](http://ykicisk.hatenablog.com/entry/2016/12/18/ 184840) ».
Je pense qu'il existe des solutions de contournement.
min_after_dequeue
sur le nombre d'enregistrements dans le fichier ou plus, et mettez tout le contenu du fichier dans la mémoire.min_after_dequeue
sur le nombre d'enregistrements dans le fichier ou plus.Je pense qu'il n'y a que le dernier moyen d'obtenir un traitement asynchrone et une randomisation suffisamment bonne en même temps (mais faites-moi savoir s'il y a autre chose). Par conséquent, j'écrirai les mesures que j'ai prises pour cela, en utilisant le jeu de données MNIST comme exemple.
Dans ce qui suit, nous écrirons deux méthodes d'implémentation en parallèle.
La mise en œuvre de 1 fonctionne en copiant "opération commune" et "dans le cas de l'image tfrecord". La mise en œuvre de 2 fonctionne en copiant "opération commune" et "dans le cas du chemin tfrecord".
Pour essayer la méthode habituelle (enregistrer l'image dans tfrecord) et la méthode que vous voulez faire cette fois (enregistrer dans le chemin tfrecord au lieu de l'image), créez un tfrecord de données MNIST avec chaque méthode.
import os
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
MNIST_DIR = './MNIST-data'
TFRECORD_DIR = './tfrecords'
IMAGE_DIR = './images'
def make_mnist_data():
mnist_data = input_data.read_data_sets(MNIST_DIR, validation_size=0)
#Collectez toutes les données d'entraînement et les données de test
labels = np.r_[mnist_data[0].labels, mnist_data[2].labels]
images = np.r_[mnist_data[0].images, mnist_data[2].images]
#L'image est modifiée à la forme de l'image.
images = (images * 255).astype(np.uint8).reshape((-1, 28, 28))
#Triez les images par ordre de 0.
order = np.argsort(labels)
labels = labels[order] # np.repeat(np.arange(0, 10), 7000)Pareil que
images = images[order] #Images manuscrites triées par ordre croissant
indices = np.arange(len(labels), dtype=int) # 0~Index de 69999
return indices, labels, images
def int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def bytes_feature(value):
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
Normalement, tfrecord est créé comme ceci (l'index n'est pas nécessaire, mais pour le moment).
def image_to_tfexample(index, label, image):
image_string = image.tostring()
return tf.train.Example(features=tf.train.Features(feature={
'index': int64_feature(index),
'label': int64_feature(label),
'image_string': bytes_feature(image_string)
}))
indices, labels, images = make_mnist_data()
tfrecord_path = os.path.join(TFRECORD_DIR, 'mnist_image.tfrecord')
with tf.python_io.TFRecordWriter(tfrecord_path) as writer:
for index, label, image in zip(indices, labels, images):
example = image_to_tfexample(index, label, image)
writer.write(example.SerializeToString())
Séparément de la création de tfrecord, enregistrez l'image png dans IMAGE_DIR.
def path_to_tfexample(index, label, path):
path_string = path.encode('utf-8')
return tf.train.Example(features=tf.train.Features(feature={
'index': int64_feature(index),
'label': int64_feature(label),
'path_string': bytes_feature(path_string)
}))
indices, labels, images = make_mnist_data()
paths = [os.path.join(IMAGE_DIR, f'{i}.png') for i in indices]
tfrecord_path = os.path.join(TFRECORD_DIR, 'mnist_path.tfrecord')
with tf.python_io.TFRecordWriter(tfrecord_path) as writer:
for index, label, path in zip(indices, labels, paths):
example = path_to_tfexample(index, label, path)
writer.write(example.SerializeToString())
#Enregistrez l'image MNIST séparément du tfrecord
for path, image in zip(paths, images):
Image.fromarray(image).save(path)
Dans l'exemple ci-dessous, min_after_dequeue
est défini sur 10000 (la taille commune de l'image d'entrée est de 224x224x3 ou plus, donc selon la mémoire, des dizaines de milliers peuvent être la limite).
Avec cette taille de min_after_dequeue
, la distribution est biaisée comme le montre la figure au début (plutôt, les données ont été obtenues à partir de cette ʻinput_pipeline` et dessinées).
BATCH_SIZE = 20
def read_tfrecord(filename_queue):
reader = tf.TFRecordReader()
key, record_string = reader.read(filename_queue)
example = tf.parse_single_example(record_string, features={
'index': tf.FixedLenFeature([], tf.int64),
'label': tf.FixedLenFeature([], tf.int64),
'image_string': tf.FixedLenFeature([], tf.string)
})
index = tf.cast(example['index'], tf.int32)
label = tf.cast(example['label'], tf.int32)
image = tf.decode_raw(example['image_string'], tf.uint8)
image = tf.reshape(image, [28, 28, 1])
image.set_shape([28, 28, 1])
return index, label, image
def input_pipeline(filenames):
filename_queue = tf.train.string_input_producer(filenames)
index, label, image = read_tfrecord(filename_queue)
index_batch, label_batch, image_batch = tf.train.shuffle_batch(
[index, label, image],
batch_size=BATCH_SIZE,
min_after_dequeue=10000,
capacity=10000 + 3 * BATCH_SIZE,
num_threads=1,
)
return index_batch, label_batch, image_batch
tfrecord_path = os.path.join(TFRECORD_DIR, 'mnist_image.tfrecord')
index_batch, label_batch, image_batch = input_pipeline([tfrecord_path, ])
Dans l'exemple ci-dessous, le premier min_after_dequeue
est défini sur 70 000. Le chemin est juste une chaîne et je pense qu'il tiendra en mémoire sans aucun problème. Avec cette taille de min_after_dequeue
, la distribution n'est pas biaisée comme le montre la figure au début.
En revanche, la capacité du lot contenant des images est d'environ 10 000. Ceci est juste pour la version tfrecord de l'image et n'a pas besoin d'être mélangé, donc cela peut en fait être beaucoup moins (la capacité par défaut est de 32).
De plus, la forme de la sortie est souvent insérée pour qu'elle devienne «[BATCH_SIZE,]» ou «[BATCH_SIZE, 28, 28, 1]».
La raison d'avoir un lot en deux étapes est simple: si vous faites exactement la même chose que tfrecord pour une image, le traitement asynchrone s'arrêtera à la lecture du chemin. Le traitement prend du temps entre le chargement de l'image et le prétraitement, donc si cela ne fonctionne pas dans les coulisses, il n'y a presque aucun goût.
BATCH_SIZE = 20
def read_tfrecord(filename_queue):
reader = tf.TFRecordReader()
key, record_string = reader.read(filename_queue)
example = tf.parse_single_example(record_string, features={
'index': tf.FixedLenFeature([], tf.int64),
'label': tf.FixedLenFeature([], tf.int64),
'path_string': tf.FixedLenFeature([], tf.string)
})
index = tf.cast(example['index'], tf.int32)
label = tf.cast(example['label'], tf.int32)
path = example['path_string']
return index, label, path
def image_from_path(path):
png_bytes = tf.read_file(path)
image = tf.image.decode_png(png_bytes, channels=1)
image.set_shape([28, 28, 1])
return image
def input_pipeline(filenames):
filename_queue = tf.train.string_input_producer(filenames)
index, label, path = read_tfrecord(filename_queue)
index_batch, label_batch, path_batch = tf.train.shuffle_batch(
[index, label, path],
batch_size=1,
min_after_dequeue=70000,
capacity=70000 + 3 * 1,
num_threads=1
)
index_batch_flatten = tf.reshape(index_batch, [-1])
label_batch_flatten = tf.reshape(label_batch, [-1])
path_batch_flatten = tf.reshape(path_batch, [-1])
image_batch_flatten = tf.map_fn(image_from_path, path_batch_flatten, dtype=tf.uint8)
index_batch, label_batch, image_batch = tf.train.batch(
[index_batch_flatten, label_batch_flatten, image_batch_flatten],
batch_size=BATCH_SIZE,
capacity=10000 + 3 * BATCH_SIZE,
num_threads=1,
)
index_batch = tf.reshape(index_batch, [-1])
label_batch = tf.reshape(label_batch, [-1])
image_batch = tf.reshape(image_batch, [-1, 28, 28, 1])
return index_batch, label_batch, image_batch
tfrecord_path = os.path.join(TFRECORD_DIR, 'mnist_path.tfrecord')
index_batch, label_batch, image_batch = input_pipeline([tfrecord_path, ])
Vérifiez la sortie de ʻindex_batch,
label_batch, ʻimage_batch
créés par chaque méthode.
init_op = tf.local_variables_initializer()
results = {'index': [], 'label': []}
with tf.Session() as sess:
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for i in range(2500): #La taille du lot étant de 20, 50 000 feuilles
result = sess.run([index_batch, label_batch])
results['index'].append(result[0])
results['label'].append(result[1])
coord.request_stop()
coord.join(threads)
fig = plt.figure(figsize=(10, 5))
for i, key in enumerate(('index', 'label')):
ax = fig.add_subplot(1, 2, i + 1)
y = np.array(results[key]).flatten()
x = np.arange(len(y))
ax.plot(x, y, '.')
fig.show()
J'ai oublié de l'écrire, mais l'axe horizontal est l'ordre dans lequel le lot a été retiré, et l'axe vertical est le numéro d'enregistrement du lot (à gauche) ou l'étiquette de réponse correcte (à droite).
C'est bien mélangé.
En enregistrant uniquement le chemin dans tfrecord, nous avons pu créer un lot d'images entièrement mélangé de manière asynchrone tout en économisant de la mémoire. Pour des données de dizaines de millions d'unités, je pense que les mêmes mesures peuvent être prises en divisant le fichier (il faut le mélanger plutôt que de mettre l'image directement dans tfrecord).
Recommended Posts