Japan's top YouTuber is HIKAKIN. I'm a big fan myself and watch his videos every day.

HIKAKIN runs four channels: HIKAKIN, HikakinTV, [HikakinGames](https://www.youtube.com/user/HikakinGames), and HikakinBlog. It occurred to me that it would be fun to see whether the channel a video belongs to can be identified from nothing but its thumbnail image, so I implemented this with machine learning.

I use TensorFlow as the machine learning framework. This post walks through the whole flow: collecting the images, implementing a [CNN (convolutional neural network)](https://ja.wikipedia.org/wiki/畳み込みニューラルネットワーク) in TensorFlow, and actually running inference.
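At a glance, the pipeline consists of the scripts below, run in this order (plus inference.py at the end for classifying individual thumbnails):

```
$ python fetch_urls.py HikakinTV   # collect thumbnail URLs with Selenium
$ python download.py HikakinTV    # download the thumbnails
$ python split_images.py          # split images into train/test (7:3)
$ python write_csv_file.py        # write path/label CSVs
$ python train.py                 # train the CNN
$ python test.py                  # measure accuracy on the test data
```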
Python and libraries:

tool | version | use / purpose
---|---|---
Python | 3.6.1 | |
Selenium | 3.4.0 | Scraping
TensorFlow | 1.1.0 | Machine learning
NumPy | 1.12.1 | Numerical computation

Other tools:

tool | version | use
---|---|---
ChromeDriver | 2.29 | To drive Chrome from Selenium
iTerm2 | 3.0.15 | To display images in the terminal
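If you want to reproduce the environment, the Python packages can be installed roughly like this (assuming Python 3.6 is already available; ChromeDriver and iTerm2 are installed separately):

```
$ pip install selenium==3.4.0 tensorflow==1.1.0 numpy==1.12.1
```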
fetch_urls.py
```python
import os
import sys

from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait


def fetch_urls(channel):
    driver = webdriver.Chrome()
    url = os.path.join('https://www.youtube.com/user', channel, 'videos')
    driver.get(url)

    while True:
        driver.execute_script('window.scrollTo(0, document.body.scrollHeight);')

        try:
            # Wait until the "Load more" button becomes clickable.
            more = WebDriverWait(driver, 3).until(
                EC.element_to_be_clickable((By.CLASS_NAME, 'load-more-button'))
            )
        except StaleElementReferenceException:
            continue
        except TimeoutException:
            break

        more.click()

    selector = '.yt-thumb-default .yt-thumb-clip img'
    elements = driver.find_elements_by_css_selector(selector)
    src_list = [element.get_attribute('src') for element in elements]
    driver.quit()

    with open(f'urls/{channel}.txt', 'wt') as f:
        for src in src_list:
            print(src, file=f)


if __name__ == '__main__':
    fetch_urls(sys.argv[1])
```
fetch_urls.py uses Selenium to drive Google Chrome and collect the thumbnail image URLs.

Specifically, it keeps scrolling to the bottom of the page until the "Load more" button no longer appears, which makes all thumbnails load in the browser. It then reads the src attribute of every img element corresponding to a thumbnail and writes the values to a text file under the `urls` directory.
```
$ python fetch_urls.py HikakinTV
$ wc -l urls/HikakinTV.txt
2178 urls/HikakinTV.txt
$ head -n 3 urls/HikakinTV.txt
https://i.ytimg.com/vi/ieHNKaG1KfA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=tRWLF3Pa-fZrEa5XTmPeHyVORv4
https://i.ytimg.com/vi/bolTkMSMrSA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=a0_PeYpyB9RrOhb3ySd4i7nJ9P8
https://i.ytimg.com/vi/jm4cK_XPqMA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=VymexTRKLE_wQaYtSKqrph1okcA
```
download.py
```python
import os
import random
import re
import sys
import time
from urllib.request import urlretrieve


def download(channel):
    with open(f'urls/{channel}.txt', 'rt') as f:
        lines = f.readlines()

    dir = os.path.join('images', channel)
    if not os.path.exists(dir):
        os.makedirs(dir)

    for url in lines:
        # https://i.ytimg.com/vi/ieHNKaG1KfA/hqdefault.jpg
        # Use the ieHNKaG1KfA part of the URL as the image name.
        name = re.findall(r'(?<=vi/).*(?=/hqdefault)', url)[0]
        path = os.path.join(dir, f'{name}.jpg')

        if os.path.exists(path):
            print(f'{path} already exists')
            continue

        print(f'download {path}')
        urlretrieve(url, path)
        time.sleep(1 + random.randint(0, 2))


if __name__ == '__main__':
    download(sys.argv[1])
```
download.py reads the text file written by fetch_urls.py and downloads each thumbnail image with urlretrieve().

Conveniently, the downloaded thumbnails all share the same size, 196 × 110, which makes them easy to work with. :blush:
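If you want to double-check that, here's a quick one-off (assuming Pillow is installed, which is not in the tool list above):

```python
import glob

from PIL import Image

# Collect the distinct sizes of all downloaded HikakinTV thumbnails.
sizes = {Image.open(path).size for path in glob.glob('images/HikakinTV/*.jpg')}
print(sizes)  # expect {(196, 110)}
```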
```
$ python download.py HikakinTV
download images/HikakinTV/1ngTnVb9oF0.jpg
download images/HikakinTV/AGonzpJtyYU.jpg
images/HikakinTV/MvwxFi3ypNg.jpg already exists
(omitted)
$ ls -1 images/HikakinTV | wc -l
2178
$ ls -1 images/HikakinTV
-2DRamjx75o.jpg
-5Xk6i1jVhs.jpg
-9U3NOHsT1k.jpg
(omitted)
```
split_images.py
```python
import glob
import os
import shutil

import numpy as np


def clean_data():
    for dirpath, _, filenames in os.walk('data'):
        for filename in filenames:
            os.remove(os.path.join(dirpath, filename))


def split_pathnames(dirpath):
    pathnames = glob.glob(f'{dirpath}/*')
    np.random.shuffle(pathnames)

    # Reference: splitting a dataset into arbitrary ratios with NumPy (ndarray)
    # http://qiita.com/QUANON/items/e28335fa0e9f553d6ab1
    return np.split(pathnames, [int(0.7 * len(pathnames))])


def copy_images(data_dirname, class_dirname, image_pathnames):
    class_dirpath = os.path.join('data', data_dirname, class_dirname)

    if not os.path.exists(class_dirpath):
        os.makedirs(class_dirpath)

    for image_pathname in image_pathnames:
        image_filename = os.path.basename(image_pathname)
        shutil.copyfile(image_pathname,
                        os.path.join(class_dirpath, image_filename))


def split_images():
    for class_dirname in os.listdir('images'):
        image_dirpath = os.path.join('images', class_dirname)

        if not os.path.isdir(image_dirpath):
            continue

        train_pathnames, test_pathnames = split_pathnames(image_dirpath)

        copy_images('train', class_dirname, train_pathnames)
        copy_images('test', class_dirname, test_pathnames)


if __name__ == '__main__':
    clean_data()
    split_images()
```
split_images.py randomly divides the image files downloaded to the `images/<channel name>` directories into training data and test data. Specifically, each file in `images/<channel name>` is copied to either the `data/train/<channel name>` directory or the `data/test/<channel name>` directory, so that the ratio of training data to test data is 7:3.
```
images/
├ HIKAKIN/
├ HikakinBlog/
├ HikakinGames/
└ HikakinTV/

    ↓ copy so that train : test = 7 : 3

data/
├ train/
│ ├ HIKAKIN/
│ ├ HikakinBlog/
│ ├ HikakinGames/
│ └ HikakinTV/
│
└ test/
  ├ HIKAKIN/
  ├ HikakinBlog/
  ├ HikakinGames/
  └ HikakinTV/
```
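The split itself relies on np.split(): splitting at index `int(0.7 * len(...))` yields the 7:3 partition. A tiny illustration with made-up file names:

```python
import numpy as np

# Ten dummy paths; np.split at index 7 gives a 7:3 train/test partition.
pathnames = [f'images/Example/{i}.jpg' for i in range(10)]
train, test = np.split(pathnames, [int(0.7 * len(pathnames))])
print(len(train), len(test))  # => 7 3
```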
```
$ python split_images.py
$ find images -name '*.jpg' | wc -l
3652
$ find data/train -name '*.jpg' | wc -l
2555
$ find data/test -name '*.jpg' | wc -l
1097
```
config.py
```python
from enum import Enum


class Channel(Enum):
    HIKAKIN = 0
    HikakinBlog = 1
    HikakinGames = 2
    HikakinTV = 3


LOG_DIR = 'log'
```
write_csv_file.py
```python
import csv
import os

from config import Channel


def write_csv_file(dir):
    with open(os.path.join(dir, 'data.csv'), 'wt') as f:
        writer = csv.writer(f, lineterminator='\n')

        for i, channel in enumerate(Channel):
            image_dir = os.path.join(dir, channel.name)

            for filename in os.listdir(image_dir):
                writer.writerow([os.path.join(image_dir, filename), i])


if __name__ == '__main__':
    write_csv_file('data/train')
    write_csv_file('data/test')
```
write_csv_file.py writes CSV files that pair each image path with its class label. We'll load them with TensorFlow later for training and testing.
```
$ python write_csv_file.py
$ cat data/train/data.csv
data/train/HIKAKIN/-c07QNF8lmM.jpg,0
data/train/HIKAKIN/0eHE-jfRQPo.jpg,0
(omitted)
data/train/HikakinBlog/-OtqlF5BMNY.jpg,1
data/train/HikakinBlog/07XKtHfni1A.jpg,1
(omitted)
data/train/HikakinGames/-2VyYsCkPZI.jpg,2
data/train/HikakinGames/-56bZU-iqQ4.jpg,2
(omitted)
data/train/HikakinTV/-5Xk6i1jVhs.jpg,3
data/train/HikakinTV/-9U3NOHsT1k.jpg,3
(omitted)
```
```
$ cat data/test/data.csv
data/test/HIKAKIN/-c07QNF8lmM.jpg,0
data/test/HIKAKIN/0eHE-jfRQPo.jpg,0
(omitted)
data/test/HikakinBlog/2Z6GB9JjV4I.jpg,1
data/test/HikakinBlog/4eGZtFhZWIE.jpg,1
(omitted)
data/test/HikakinGames/-FpYaEmiq1M.jpg,2
data/test/HikakinGames/-HFXWY1-M8M.jpg,2
(omitted)
data/test/HikakinTV/-2DRamjx75o.jpg,3
data/test/HikakinTV/-9zt1EfKJYI.jpg,3
(omitted)
```
cnn.py
```python
import tensorflow as tf


class CNN:
    def __init__(self, image_size=48, class_count=2, color_channel_count=3):
        self.image_size = image_size
        self.class_count = class_count
        self.color_channel_count = color_channel_count

    # Function for inference.
    def inference(self, x, keep_prob, softmax=False):
        # Create a tf.Variable to hold weights.
        def weight_variable(shape):
            initial = tf.truncated_normal(shape, stddev=0.1)
            return tf.Variable(initial)

        # Create a tf.Variable to hold biases.
        def bias_variable(shape):
            initial = tf.constant(0.1, shape=shape)
            return tf.Variable(initial)

        # Perform a convolution.
        def conv2d(x, W):
            return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

        # Perform max pooling with a 2x2 window and stride 2.
        def max_pool_2x2(x):
            return tf.nn.max_pool(x,
                                  ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1],
                                  padding='SAME')

        x_image = tf.reshape(
            x,
            [-1, self.image_size, self.image_size, self.color_channel_count])

        with tf.name_scope('conv1'):
            W_conv1 = weight_variable([5, 5, self.color_channel_count, 32])
            b_conv1 = bias_variable([32])
            h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)

        with tf.name_scope('pool1'):
            h_pool1 = max_pool_2x2(h_conv1)

        with tf.name_scope('conv2'):
            W_conv2 = weight_variable([5, 5, 32, 64])
            b_conv2 = bias_variable([64])
            h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)

        with tf.name_scope('pool2'):
            h_pool2 = max_pool_2x2(h_conv2)

        with tf.name_scope('fc1'):
            W_fc1 = weight_variable(
                [int(self.image_size / 4) * int(self.image_size / 4) * 64, 1024])
            b_fc1 = bias_variable([1024])
            h_pool2_flat = tf.reshape(
                h_pool2,
                [-1, int(self.image_size / 4) * int(self.image_size / 4) * 64])
            h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
            h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

        with tf.name_scope('fc2'):
            W_fc2 = weight_variable([1024, self.class_count])
            b_fc2 = bias_variable([self.class_count])
            y = tf.matmul(h_fc1_drop, W_fc2) + b_fc2

        if softmax:
            with tf.name_scope('softmax'):
                y = tf.nn.softmax(y)

        return y

    # Loss function: the error between the inference result and the labels.
    def loss(self, y, labels):
        # Compute the cross entropy.
        # Do not pass values that already have softmax applied to the
        # logits argument of tf.nn.softmax_cross_entropy_with_logits.
        cross_entropy = tf.reduce_mean(
            tf.nn.softmax_cross_entropy_with_logits(logits=y, labels=labels))
        tf.summary.scalar('cross_entropy', cross_entropy)
        return cross_entropy

    # Function for training.
    def training(self, cross_entropy, learning_rate=1e-4):
        train_step = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)
        return train_step

    # Compute the accuracy.
    def accuracy(self, y, labels):
        correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(labels, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        tf.summary.scalar('accuracy', accuracy)
        return accuracy
```
An implementation of the CNN model. It's the heart of this project, but it is mostly the same as the [code](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/tutorials/mnist/mnist_deep.py) from the TensorFlow tutorial Deep MNIST for Experts. However, I wrapped it in a class to make it more reusable.
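As a quick illustration of the class interface (a usage sketch; the placeholder shapes are my choice, not code from this project):

```python
import tensorflow as tf

from cnn import CNN
from config import Channel

# Build the graph for 48x48 RGB thumbnails and the four channel classes.
cnn = CNN(image_size=48, class_count=len(Channel))
x = tf.placeholder(tf.float32, [None, 48, 48, 3])
labels = tf.placeholder(tf.float32, [None, len(Channel)])
keep_prob = tf.placeholder(tf.float32)

logits = cnn.inference(x, keep_prob)     # raw scores, shape [batch, 4]
loss = cnn.loss(logits, labels)          # cross entropy over the batch
train_op = cnn.training(loss)            # one Adam optimization step
accuracy = cnn.accuracy(logits, labels)  # fraction of correct predictions
```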
load_data.py
```python
import tensorflow as tf


def load_data(csvpath, batch_size, image_size, class_count,
              shuffle=False, min_after_dequeue=1000):
    queue = tf.train.string_input_producer([csvpath], shuffle=shuffle)
    reader = tf.TextLineReader()
    key, value = reader.read(queue)
    imagepath, label = tf.decode_csv(value, [['imagepath'], [0]])

    jpeg = tf.read_file(imagepath)
    image = tf.image.decode_jpeg(jpeg, channels=3)
    image = tf.image.resize_images(image, [image_size, image_size])
    # Standardize the image to zero mean.
    image = tf.image.per_image_standardization(image)

    # Convert the label value to a one-hot representation.
    label = tf.one_hot(label, depth=class_count, dtype=tf.float32)

    capacity = min_after_dequeue + batch_size * 3

    if shuffle:
        images, labels = tf.train.shuffle_batch(
            [image, label],
            batch_size=batch_size,
            num_threads=4,
            capacity=capacity,
            min_after_dequeue=min_after_dequeue)
    else:
        images, labels = tf.train.batch(
            [image, label],
            batch_size=batch_size,
            capacity=capacity)

    return images, labels
```
A function that reads images and labels from the CSV files. It will be used for both training and testing. The idea is to shuffle the training data with tf.train.shuffle_batch() during training, and to read the test data without shuffling via tf.train.batch() during testing.
train.py
```python
import os

import tensorflow as tf

from cnn import CNN
from config import Channel, LOG_DIR
from load_data import load_data

# Suppress TensorFlow warning messages.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')
flags.DEFINE_integer('step_count', 1000, 'Number of steps.')
flags.DEFINE_integer('batch_size', 50, 'Batch size.')
flags.DEFINE_float('learning_rate', 1e-4, 'Initial learning rate.')


def main():
    with tf.Graph().as_default():
        cnn = CNN(image_size=FLAGS.image_size, class_count=len(Channel))
        images, labels = load_data(
            'data/train/data.csv',
            batch_size=FLAGS.batch_size,
            image_size=FLAGS.image_size,
            class_count=len(Channel),
            shuffle=True)
        keep_prob = tf.placeholder(tf.float32)

        logits = cnn.inference(images, keep_prob)
        loss = cnn.loss(logits, labels)
        train_op = cnn.training(loss, FLAGS.learning_rate)
        accuracy = cnn.accuracy(logits, labels)

        saver = tf.train.Saver()
        init_op = tf.global_variables_initializer()

        with tf.Session() as sess:
            sess.run(init_op)
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(sess=sess, coord=coord)

            summary_op = tf.summary.merge_all()
            summary_writer = tf.summary.FileWriter(LOG_DIR, sess.graph)

            for step in range(1, FLAGS.step_count + 1):
                _, loss_value, accuracy_value = sess.run(
                    [train_op, loss, accuracy], feed_dict={keep_prob: 0.5})

                if step % 10 == 0:
                    print(f'step {step}: training accuracy {accuracy_value}')
                    summary = sess.run(summary_op, feed_dict={keep_prob: 1.0})
                    summary_writer.add_summary(summary, step)

            coord.request_stop()
            coord.join(threads)

            save_path = saver.save(sess, os.path.join(LOG_DIR, 'model.ckpt'))


if __name__ == '__main__':
    main()
```
Now we actually read the images and train the CNN. Training runs for 1,000 steps, printing the training accuracy every 10 steps. The learned parameters are saved to log/model.ckpt.
```
$ python train.py
step 10: training accuracy 0.5600000023841858
step 20: training accuracy 0.47999998927116394
step 30: training accuracy 0.7200000286102295
(omitted)
step 980: training accuracy 1.0
step 990: training accuracy 0.9800000190734863
step 1000: training accuracy 0.9800000190734863
```
Also, if you start TensorBoard in another terminal session and open http://0.0.0.0:6006 in a web browser, you can watch how values such as the accuracy and the cross entropy on the training data evolve over time.
```
$ tensorboard --logdir ./log
Starting TensorBoard b'47' at http://0.0.0.0:6006
(Press CTRL+C to quit)
```
test.py
```python
import os

import tensorflow as tf

from cnn import CNN
from config import Channel, LOG_DIR
from load_data import load_data

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')
flags.DEFINE_integer('batch_size', 1000, 'Batch size.')


def main():
    with tf.Graph().as_default():
        cnn = CNN(image_size=FLAGS.image_size, class_count=len(Channel))
        images, labels = load_data(
            'data/test/data.csv',
            batch_size=FLAGS.batch_size,
            image_size=FLAGS.image_size,
            class_count=len(Channel),
            shuffle=False)
        keep_prob = tf.placeholder(tf.float32)

        logits = cnn.inference(images, keep_prob)
        accuracy = cnn.accuracy(logits, labels)

        saver = tf.train.Saver()
        init_op = tf.global_variables_initializer()

        with tf.Session() as sess:
            sess.run(init_op)
            saver.restore(sess, os.path.join(LOG_DIR, 'model.ckpt'))
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(sess=sess, coord=coord)

            # NOTE: keep_prob is 0.5 here, as in the original; to fully
            # disable dropout at test time, 1.0 would be the usual choice.
            accuracy_value = sess.run(accuracy, feed_dict={keep_prob: 0.5})
            print(f'test accuracy: {accuracy_value}')

            coord.request_stop()
            coord.join(threads)


if __name__ == '__main__':
    main()
```
test.py measures the performance of the trained model by computing the accuracy on the test data.
```
$ find data/test -name '*.jpg' | wc -l
1097
$ python test.py --batch_size 1097
test accuracy: 0.7657247185707092
```
This time, the accuracy was about 76.6%. Guessing uniformly at random over the four classes would give 25.0% (and even always predicting the most common class, HikakinTV, would give 654/1097 ≈ 59.6% on this test set), so the model does seem to have learned something, but there is clearly room for improvement.
inference.py
```python
import numpy as np
import os
import sys
import tensorflow as tf

from cnn import CNN
from config import Channel, LOG_DIR

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')


def load_image(imagepath, image_size):
    jpeg = tf.read_file(imagepath)
    image = tf.image.decode_jpeg(jpeg, channels=3)
    image = tf.cast(image, tf.float32)
    image = tf.image.resize_images(image, [image_size, image_size])
    image = tf.image.per_image_standardization(image)

    return image


def print_results(imagepath, softmax):
    os.system(f'imgcat {imagepath}')
    max_channel_name_length = max(len(channel.name) for channel in Channel)

    for channel, value in zip(Channel, softmax):
        print(f'{channel.name.ljust(max_channel_name_length + 1)}: {value}')

    print()

    prediction = Channel(np.argmax(softmax)).name

    for channel in Channel:
        if channel.name in imagepath:
            answer = channel.name
            break

    print(f'Prediction: {prediction}, Answer: {answer}')
```
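As posted, inference.py stops at print_results() and its entry point is not shown. Here is a minimal sketch of what the missing main() could look like, assuming the script takes one or more image paths as command-line arguments and restores the checkpoint written by train.py (this is my reconstruction, not the author's original code):

```python
def main():
    with tf.Graph().as_default():
        cnn = CNN(image_size=FLAGS.image_size, class_count=len(Channel))
        keep_prob = tf.placeholder(tf.float32)

        # Build a batch from the image paths given on the command line.
        imagepaths = sys.argv[1:]
        images = tf.stack([load_image(imagepath, FLAGS.image_size)
                           for imagepath in imagepaths])
        softmax = cnn.inference(images, keep_prob, softmax=True)

        saver = tf.train.Saver()

        with tf.Session() as sess:
            # Restore the parameters learned by train.py.
            saver.restore(sess, os.path.join(LOG_DIR, 'model.ckpt'))
            softmax_values = sess.run(softmax, feed_dict={keep_prob: 1.0})

            for imagepath, values in zip(imagepaths, softmax_values):
                print_results(imagepath, values)


if __name__ == '__main__':
    main()
```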
Finally, we run inference with the trained model. We look at the output of the softmax function and take the class with the largest value as the prediction.

By the way, [imgcat](https://raw.githubusercontent.com/gnachman/iTerm2/master/tests/imgcat), distributed on the iTerm2 Images page, lets you print an image directly in the terminal. It's handy here because the input image and the inference result can be displayed together.
Let's pick some test images and run inference on them.

HikakinTV and HikakinGames are classified correctly most of the time, probably because they have plenty of data. HIKAKIN and HikakinBlog, on the other hand, are often misclassified, probably because they have little data.

I also narrowed the test data down to one channel at a time and computed the accuracy per channel (a sketch of how the per-channel CSVs can be produced follows the table):
Channel | Number of test images | Accuracy (%)
---|---|---
HIKAKIN | 50 | 20.0
HikakinBlog | 19 | 15.8
HikakinGames | 374 | 68.4
HikakinTV | 654 | 69.4
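Here is a minimal sketch of that narrowing-down step, under the assumption that one writes a filtered copy of data/test/data.csv per channel and points test.py at it (write_channel_csv is a hypothetical helper, not one of the scripts above):

```python
# Hypothetical helper: write one CSV per channel containing only that
# channel's test entries, to measure per-channel accuracy with test.py.
import csv

from config import Channel


def write_channel_csv(channel):
    with open('data/test/data.csv') as src, \
            open(f'data/test/data_{channel.name}.csv', 'w') as dst:
        writer = csv.writer(dst, lineterminator='\n')

        for path, label in csv.reader(src):
            if int(label) == channel.value:
                writer.writerow([path, label])


if __name__ == '__main__':
    for channel in Channel:
        write_channel_csv(channel)
```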
Sure enough, the channels with little data have dramatically worse accuracy.

In particular, I suspect the results can be improved considerably just by increasing the amount of training data, so I'd like to try that next.
There are countless reference materials out there, so I've carefully selected only the ones that were especially helpful (official documentation excluded).

Articles by the gods :pray: :sparkles:

Feeding data into TensorFlow was quite a pain, so I'm truly grateful for the articles written by those who came before me :pray: :sparkles: