This is a brute-force workaround; there may be a better way.
When `min_after_dequeue` is smaller than the number of records in the file, `tf.train.shuffle_batch` is biased. This is because `shuffle_batch` only shuffles records that have already been enqueued.
For example, suppose you sort 70,000 MNIST images (7,000 for each label) in ascending order of label and write them, together with their labels, to a tfrecord file.
If `min_after_dequeue` of `tf.train.shuffle_batch` is then set to 10,000 and 50,000 records are dequeued, the label distribution looks like this:

(Figure: dequeue order on the horizontal axis, obtained label on the vertical axis)

At the beginning (until roughly 4,000 records have been dequeued, which is when record 14,001, the first "2", finally gets enqueued), only "0" and "1" records are in the queue, so only 0s and 1s appear. And since label "9" does not start until record 63,001, it never appears at all among the 50,000 records dequeued.
More directly, if you also store a record number in the tfrecord and plot that record number on the vertical axis, it looks like this:

(Figure: dequeue order on the horizontal axis, record number on the vertical axis)

Only low-numbered records come back in the first half of the batches, and mostly high-numbered records in the second half. In the second half, though, a few low-numbered records that were lucky (?) enough to escape being dequeued during shuffling still show up here and there.
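To see the mechanism, here is a minimal pure-Python sketch of the queue behavior (a simplified model of my own, not TensorFlow's actual implementation), assuming the queue runner keeps the queue filled up to `capacity = min_after_dequeue + 3 * batch_size`, matching the settings used later in this post:

```python
import random

labels = [i // 7000 for i in range(70000)]  # sorted: 7,000 records per digit
min_after_dequeue = 10000
capacity = min_after_dequeue + 3 * 20

queue, dequeued, i = [], [], 0
while len(dequeued) < 50000:
    # The queue runner refills the queue up to its capacity...
    while i < len(labels) and len(queue) < capacity:
        queue.append(labels[i])
        i += 1
    # ...and shuffle_batch dequeues a uniformly random element from it.
    dequeued.append(queue.pop(random.randrange(len(queue))))

print(dequeued[:10])   # almost certainly only 0s and 1s
print(9 in dequeued)   # False: no "9" record was ever enqueued
```

Even with perfectly uniform dequeueing, a record can never come out before it has been enqueued, which is exactly the bias in the figures.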
For more details, such as the case of multiple tfrecord files, see "[[Tensorflow] Bias investigation when shuffle_batch with TFRecord file](http://ykicisk.hatenablog.com/entry/2016/12/18/184840)".
I can think of a few workarounds:

- Don't create a sorted (biased) tfrecord in the first place (see the sketch after this list).
- Set `min_after_dequeue` to the number of records in the file or more, and put the entire contents of the file in memory.
- Save paths in the tfrecord instead of images, and set `min_after_dequeue` to the number of records in the file or more.
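For reference, the first workaround is just a one-line change at tfrecord-creation time. A minimal sketch, with stand-in arrays in place of the real MNIST data:

```python
import numpy as np

labels = np.repeat(np.arange(10), 7000)              # stand-in for the sorted labels
images = np.zeros((70000, 28, 28), dtype=np.uint8)   # stand-in for the sorted images

# Shuffle the records before writing them, so the file itself is unbiased.
order = np.random.permutation(len(labels))
labels, images = labels[order], images[order]
# Writing these to the tfrecord means even a small min_after_dequeue
# no longer produces the bias shown above.
```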
I think the last option is the only way to achieve asynchronous processing and good-enough randomization at the same time (if there are other options, please let me know). So below I describe the measures I took, using the MNIST dataset as an example.
In the following, the two implementations are shown in parallel. Implementation 1 works by copying the "common operations" code plus the "image tfrecord" code; implementation 2 works by copying the "common operations" code plus the "path tfrecord" code.
Create MNIST tfrecords with both methods, to compare the usual approach (store the image itself in the tfrecord) against the one tried here (store the path instead of the image). First, the common operations:
```python
import os
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

MNIST_DIR = './MNIST-data'
TFRECORD_DIR = './tfrecords'
IMAGE_DIR = './images'

# Make sure the output directories exist before writing to them.
os.makedirs(TFRECORD_DIR, exist_ok=True)
os.makedirs(IMAGE_DIR, exist_ok=True)


def make_mnist_data():
    mnist_data = input_data.read_data_sets(MNIST_DIR, validation_size=0)

    # Concatenate the training data and the test data.
    labels = np.r_[mnist_data[0].labels, mnist_data[2].labels]
    images = np.r_[mnist_data[0].images, mnist_data[2].images]

    # Restore the flattened vectors to image shape.
    images = (images * 255).astype(np.uint8).reshape((-1, 28, 28))

    # Sort the images by label, in ascending order.
    order = np.argsort(labels)
    labels = labels[order]  # same as np.repeat(np.arange(10), 7000)
    images = images[order]  # handwritten images sorted by label
    indices = np.arange(len(labels), dtype=int)  # record numbers 0-69999

    return indices, labels, images


def int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
```
In the usual method, the tfrecord is created like this (the index is not needed for training, but it is stored anyway and used in the check at the end):
```python
def image_to_tfexample(index, label, image):
    image_string = image.tostring()
    return tf.train.Example(features=tf.train.Features(feature={
        'index': int64_feature(index),
        'label': int64_feature(label),
        'image_string': bytes_feature(image_string)
    }))


indices, labels, images = make_mnist_data()
tfrecord_path = os.path.join(TFRECORD_DIR, 'mnist_image.tfrecord')
with tf.python_io.TFRecordWriter(tfrecord_path) as writer:
    for index, label, image in zip(indices, labels, images):
        example = image_to_tfexample(index, label, image)
        writer.write(example.SerializeToString())
```
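For a quick sanity check, the file can be read back with `tf.python_io.tf_record_iterator`; since the records were written in sorted order, the first one should come back with index 0 and label 0:

```python
# Read the first serialized record back and parse it into an Example proto.
record_iter = tf.python_io.tf_record_iterator(tfrecord_path)
example = tf.train.Example()
example.ParseFromString(next(record_iter))
print(example.features.feature['index'].int64_list.value[0])  # -> 0
print(example.features.feature['label'].int64_list.value[0])  # -> 0
```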
In the path method, the PNG images are saved under IMAGE_DIR separately from the tfrecord, which stores only their paths:
```python
def path_to_tfexample(index, label, path):
    path_string = path.encode('utf-8')
    return tf.train.Example(features=tf.train.Features(feature={
        'index': int64_feature(index),
        'label': int64_feature(label),
        'path_string': bytes_feature(path_string)
    }))


indices, labels, images = make_mnist_data()
paths = [os.path.join(IMAGE_DIR, f'{i}.png') for i in indices]
tfrecord_path = os.path.join(TFRECORD_DIR, 'mnist_path.tfrecord')
with tf.python_io.TFRecordWriter(tfrecord_path) as writer:
    for index, label, path in zip(indices, labels, paths):
        example = path_to_tfexample(index, label, path)
        writer.write(example.SerializeToString())

# Save the MNIST images as PNGs, separately from the tfrecord.
for path, image in zip(paths, images):
    Image.fromarray(image).save(path)
```
In the example below (the image tfrecord), `min_after_dequeue` is set to 10,000. Common input images are 224x224x3 or larger, i.e., roughly 150 KB each, so 10,000 of them already occupy about 1.5 GB; depending on memory, tens of thousands may be the practical limit. With a `min_after_dequeue` of this size, the distribution is biased as in the figures above (in fact, the data for those figures was obtained from this `input_pipeline`).
```python
BATCH_SIZE = 20


def read_tfrecord(filename_queue):
    reader = tf.TFRecordReader()
    key, record_string = reader.read(filename_queue)
    example = tf.parse_single_example(record_string, features={
        'index': tf.FixedLenFeature([], tf.int64),
        'label': tf.FixedLenFeature([], tf.int64),
        'image_string': tf.FixedLenFeature([], tf.string)
    })
    index = tf.cast(example['index'], tf.int32)
    label = tf.cast(example['label'], tf.int32)
    image = tf.decode_raw(example['image_string'], tf.uint8)
    image = tf.reshape(image, [28, 28, 1])
    image.set_shape([28, 28, 1])
    return index, label, image


def input_pipeline(filenames):
    filename_queue = tf.train.string_input_producer(filenames)
    index, label, image = read_tfrecord(filename_queue)
    index_batch, label_batch, image_batch = tf.train.shuffle_batch(
        [index, label, image],
        batch_size=BATCH_SIZE,
        min_after_dequeue=10000,
        capacity=10000 + 3 * BATCH_SIZE,
        num_threads=1,
    )
    return index_batch, label_batch, image_batch


tfrecord_path = os.path.join(TFRECORD_DIR, 'mnist_image.tfrecord')
index_batch, label_batch, image_batch = input_pipeline([tfrecord_path, ])
```
In the example below (the path tfrecord), the first-stage `min_after_dequeue` is set to 70,000: a path is just a string, so all of them fit in memory without trouble. With a `min_after_dequeue` of this size, the distribution shows none of the bias seen in the figures above.

The second-stage batch, the one that holds the images, has a capacity of about 10,000. This value only mirrors the image-tfrecord version, and since no shuffling is needed at this stage, it could actually be much smaller (the default capacity is 32).

In addition, `reshape` calls are inserted here and there so that the output shapes come out as `[BATCH_SIZE]` and `[BATCH_SIZE, 28, 28, 1]`.

The reason for the two-tiered batching is simple: doing exactly the same as the image-tfrecord version would end the asynchronous processing at reading the paths. The time-consuming part is everything from image loading to preprocessing, and if that does not run in the background there is little benefit.
```python
BATCH_SIZE = 20


def read_tfrecord(filename_queue):
    reader = tf.TFRecordReader()
    key, record_string = reader.read(filename_queue)
    example = tf.parse_single_example(record_string, features={
        'index': tf.FixedLenFeature([], tf.int64),
        'label': tf.FixedLenFeature([], tf.int64),
        'path_string': tf.FixedLenFeature([], tf.string)
    })
    index = tf.cast(example['index'], tf.int32)
    label = tf.cast(example['label'], tf.int32)
    path = example['path_string']
    return index, label, path


def image_from_path(path):
    png_bytes = tf.read_file(path)
    image = tf.image.decode_png(png_bytes, channels=1)
    image.set_shape([28, 28, 1])
    return image


def input_pipeline(filenames):
    filename_queue = tf.train.string_input_producer(filenames)
    index, label, path = read_tfrecord(filename_queue)

    # First stage: shuffle the (index, label, path) records with a
    # min_after_dequeue that covers the entire dataset.
    index_batch, label_batch, path_batch = tf.train.shuffle_batch(
        [index, label, path],
        batch_size=1,
        min_after_dequeue=70000,
        capacity=70000 + 3 * 1,
        num_threads=1
    )
    index_batch_flatten = tf.reshape(index_batch, [-1])
    label_batch_flatten = tf.reshape(label_batch, [-1])
    path_batch_flatten = tf.reshape(path_batch, [-1])
    image_batch_flatten = tf.map_fn(image_from_path, path_batch_flatten,
                                    dtype=tf.uint8)

    # Second stage: load the images from the already-shuffled paths and
    # batch them; no further shuffling is needed here.
    index_batch, label_batch, image_batch = tf.train.batch(
        [index_batch_flatten, label_batch_flatten, image_batch_flatten],
        batch_size=BATCH_SIZE,
        capacity=10000 + 3 * BATCH_SIZE,
        num_threads=1,
    )
    index_batch = tf.reshape(index_batch, [-1])
    label_batch = tf.reshape(label_batch, [-1])
    image_batch = tf.reshape(image_batch, [-1, 28, 28, 1])
    return index_batch, label_batch, image_batch


tfrecord_path = os.path.join(TFRECORD_DIR, 'mnist_path.tfrecord')
index_batch, label_batch, image_batch = input_pipeline([tfrecord_path, ])
```
Check the output of the `index_batch`, `label_batch`, and `image_batch` created by each method.
```python
init_op = tf.local_variables_initializer()

results = {'index': [], 'label': []}
with tf.Session() as sess:
    sess.run(init_op)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for i in range(2500):  # batch size 20 x 2500 iterations = 50,000 records
        result = sess.run([index_batch, label_batch])
        results['index'].append(result[0])
        results['label'].append(result[1])
    coord.request_stop()
    coord.join(threads)

fig = plt.figure(figsize=(10, 5))
for i, key in enumerate(('index', 'label')):
    ax = fig.add_subplot(1, 2, i + 1)
    y = np.array(results[key]).flatten()
    x = np.arange(len(y))
    ax.plot(x, y, '.')
fig.show()
```
I forgot to mention: the horizontal axis is the order in which batches were dequeued, and the vertical axis is the record number (left) or the correct label (right). Everything is mixed in nicely.
By saving only the paths in the tfrecord, we could build sufficiently shuffled batches of images asynchronously while saving memory. For tens of millions of records, I think the same idea still works in combination with splitting the data across multiple files (the result should still be better mixed than putting the images directly into the tfrecords).
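For the multi-file case, the filename queue itself can add another level of shuffling. A sketch, assuming hypothetical shard files named `mnist_path_*.tfrecord`:

```python
import glob

# Hypothetical shards, e.g. mnist_path_00.tfrecord ... mnist_path_99.tfrecord.
filenames = sorted(glob.glob(os.path.join(TFRECORD_DIR, 'mnist_path_*.tfrecord')))

# string_input_producer reshuffles the file order every epoch, adding
# file-level mixing on top of the record-level shuffle_batch.
filename_queue = tf.train.string_input_producer(filenames, shuffle=True)
```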