En parlant du meilleur YouTuber du Japon, c'est vrai HIKAKIN (ci-après dénommé "HIKAKIN"). Je l'aime aussi et je regarde des vidéos tous les jours.
Hikakin est HIKAKIN, HikakinTV, [HikakinGames](https: // www. Nous exploitons quatre canaux: youtube.com/user/HikakinGames) et HikakinBlog. Ici, j'ai pensé qu'il serait intéressant de pouvoir déterminer à quel canal appartient une vidéo en se basant uniquement sur les informations appelées images miniatures, je l'ai donc implémenté en utilisant l'apprentissage automatique.
Utilisez TensorFlow comme infrastructure d'apprentissage automatique. Ensuite, à partir de la collecte d'images, utilisez TensorFlow vers [CNN (Convolutional Neural Network)](https://ja.wikipedia.org/wiki/%E7%95%B3%E3%81%BF%E8%BE%BC%E3 % 81% BF% E3% 83% 8B% E3% 83% A5% E3% 83% BC% E3% 83% A9% E3% 83% AB% E3% 83% 8D% E3% 83% 83% E3% 83 Je voudrais présenter le flux de mise en œuvre du point de mise en œuvre% 88% E3% 83% AF% E3% 83% BC% E3% 82% AF) jusqu'au point de déduire réellement.
Python
outil | version | Utilisation / but |
---|---|---|
Python | 3.6.1 | |
Selenium | 3.4.0 | Grattage |
TensorFlow | 1.1.0 | Apprentissage automatique |
NumPy | 1.12.1 | Calcul numérique |
outil | version | Utilisation |
---|---|---|
ChromeDriver | 2.29 | Pour exécuter Chrome sur Selenium |
iTerm2 | 3.0.15 | Pour afficher l'image sur le terminal |
fetch_urls.py
import os
import sys
from selenium import webdriver
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
def fetch_urls(channel):
driver = webdriver.Chrome()
url = os.path.join('https://www.youtube.com/user', channel, 'videos')
driver.get(url)
while True:
driver.execute_script('window.scrollTo(0, document.body.scrollHeight);')
try:
#Attendez que le bouton "Charger plus" soit cliquable.
more = WebDriverWait(driver, 3).until(
EC.element_to_be_clickable((By.CLASS_NAME, 'load-more-button'))
)
except StaleElementReferenceException:
continue;
except TimeoutException:
break;
more.click()
selector = '.yt-thumb-default .yt-thumb-clip img'
elements = driver.find_elements_by_css_selector(selector)
src_list = [element.get_attribute('src') for element in elements]
driver.quit()
with open(f'urls/{channel}.txt', 'wt') as f:
for src in src_list:
print(src, file=f)
if __name__ == '__main__':
fetch_urls(sys.argv[1])
Utilisez Selenium pour interagir avec Google Chrome et collecter des URL pour les images miniatures.
Plus précisément, faites d'abord défiler l'écran jusqu'à ce que le bouton «Charger plus» en bas de l'écran disparaisse. Cela affichera toutes les images miniatures sur l'écran du navigateur. Après cela, récupérez la valeur de l'attribut src de tous les éléments img correspondant à l'image miniature et écrivez-la dans le fichier texte sous le répertoire ʻurls`.
$ python fetch_urls.py HikakinTV
$ wc -l urls/HikakinTV.txt
2178 urls/HikakinTV.txt
$ head -n 3 urls/HikakinTV.txt
https://i.ytimg.com/vi/ieHNKaG1KfA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=tRWLF3Pa-fZrEa5XTmPeHyVORv4
https://i.ytimg.com/vi/bolTkMSMrSA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=a0_PeYpyB9RrOhb3ySd4i7nJ9P8
https://i.ytimg.com/vi/jm4cK_XPqMA/hqdefault.jpg?custom=true&w=196&h=110&stc=true&jpg444=true&jpgq=90&sp=67&sigh=VymexTRKLE_wQaYtSKqrph1okcA
download.rb
import os
import random
import re
import sys
import time
from urllib.request import urlretrieve
def download(channel):
with open(f'urls/{channel}.txt', 'rt') as f:
lines = f.readlines()
dir = os.path.join('images', channel)
if not os.path.exists(dir):
os.makedirs(dir)
for url in lines:
# https://i.ytimg.com/vi/ieHNKaG1KfA/hqdefault.jpg
#Utilisez la partie ieHNKaG1KfA de l'URL comme nom d'image.
name = re.findall(r'(?<=vi/).*(?=/hqdefault)', url)[0]
path = os.path.join(dir, f'{name}.jpg')
if os.path.exists(path):
print(f'{path} already exists')
continue
print(f'download {path}')
urlretrieve(url, path)
time.sleep(1 + random.randint(0, 2))
if __name__ == '__main__':
download(sys.argv[1])
Après avoir lu le fichier texte sorti par fetch_urls.py
, utilisez urlretrieve (). Téléchargez l'image miniature.
À propos, toutes les images miniatures téléchargées ont une taille unifiée à 196 x 110. Il devrait être facile à manipuler: blush
$ python download.py HikakinTV
download images/HikakinTV/1ngTnVb9oF0.jpg
download images/HikakinTV/AGonzpJtyYU.jpg
images/HikakinTV/MvwxFi3ypNg.jpg already exists
(Abréviation)
$ ls -1 images/HikakinTV | wc -l
2178
$ ls -1 images/HikakinTV
-2DRamjx75o.jpg
-5Xk6i1jVhs.jpg
-9U3NOHsT1k.jpg
(Abréviation)
split_images.py
import glob
import numpy as np
import os
import shutil
def clean_data():
for dirpath, _, filenames in os.walk('data'):
for filename in filenames:
os.remove(os.path.join(dirpath, filename))
def split_pathnames(dirpath):
pathnames = glob.glob(f'{dirpath}/*')
np.random.shuffle(pathnames)
#référence:Ensemble de données avec NumPy(ndarray)Divisez en proportions arbitraires
# http://qiita.com/QUANON/items/e28335fa0e9f553d6ab1
return np.split(pathnames, [int(0.7 * len(pathnames))])
def copy_images(data_dirname, class_dirname, image_pathnames):
class_dirpath = os.path.join('data', data_dirname, class_dirname)
if not os.path.exists(class_dirpath):
os.makedirs(class_dirpath)
for image_pathname in image_pathnames:
image_filename = os.path.basename(image_pathname)
shutil.copyfile(image_pathname,
os.path.join(class_dirpath, image_filename))
def split_images():
for class_dirname in os.listdir('images'):
image_dirpath = os.path.join('images', class_dirname)
if not os.path.isdir(image_dirpath):
continue
train_pathnames, test_pathnames = split_pathnames(image_dirpath)
copy_images('train', class_dirname, train_pathnames)
copy_images('test', class_dirname, test_pathnames)
if __name__ == '__main__':
clean_data()
split_images()
ʻImages / nom du canal Le fichier image téléchargé dans le répertoire est divisé au hasard en données d'entraînement et données de test. Plus précisément, les fichiers image dans le répertoire ʻimages / nom de canal
doivent être stockés dans le répertoire data / train / nom de canal
ou dans le répertoire données / test / nom de canal
de sorte que le rapport entre les données d'entraînement et les données de test soit de 7: 3. Copier.
images/
├ HIKAKIN/
├ HikakinBlog/
├ HikakinGames/
└ HikakinTV/
↓ train : test = 7 :Copier pour être 3
data/
├ train/
│ ├ HIKAKIN/
│ ├ HikakinBlog/
│ ├ HikakinGames/
│ └ HikakinTV/
│
└ test/
├ HIKAKIN/
├ HikakinBlog/
├ HikakinGames/
└ HikakinTV/
$ python split_images.py
$ find images -name '*.jpg' | wc -l
3652
$ find data/train -name '*.jpg' | wc -l
2555
$ find data/test -name '*.jpg' | wc -l
1097
config.py
from enum import Enum
class Channel(Enum):
HIKAKIN = 0
HikakinBlog = 1
HikakinGames = 2
HikakinTV = 3
LOG_DIR = 'log'
write_csv_file.py
import os
import csv
from config import Channel, LOG_DIR
def write_csv_file(dir):
with open(os.path.join(dir, 'data.csv'), 'wt') as f:
for i, channel in enumerate(Channel):
image_dir = os.path.join(dir, channel.name)
writer = csv.writer(f, lineterminator='\n')
for filename in os.listdir(image_dir):
writer.writerow([os.path.join(image_dir, filename), i])
if __name__ == '__main__':
write_csv_file('data/train')
write_csv_file('data/test')
Il sera utilisé plus tard dans la formation et les tests pour charger des images et des étiquettes à l'aide de TensorFlow.
$ python write_csv_file.py
$ cat data/train/data.csv
data/test/HIKAKIN/-c07QNF8lmM.jpg,0
data/test/HIKAKIN/0eHE-jfRQPo.jpg,0
(Abréviation)
data/train/HikakinBlog/-OtqlF5BMNY.jpg,1
data/train/HikakinBlog/07XKtHfni1A.jpg,1
(Abréviation)
data/train/HikakinGames/-2VyYsCkPZI.jpg,2
data/train/HikakinGames/-56bZU-iqQ4.jpg,2
(Abréviation)
data/train/HikakinTV/-5Xk6i1jVhs.jpg,3
data/train/HikakinTV/-9U3NOHsT1k.jpg,3
(Abréviation)
$ cat data/test/data.csv
data/test/HIKAKIN/-c07QNF8lmM.jpg,0
data/test/HIKAKIN/0eHE-jfRQPo.jpg,0
(Abréviation)
data/test/HikakinBlog/2Z6GB9JjV4I.jpg,1
data/test/HikakinBlog/4eGZtFhZWIE.jpg,1
(Abréviation)
data/test/HikakinGames/-FpYaEmiq1M.jpg,2
data/test/HikakinGames/-HFXWY1-M8M.jpg,2
(Abréviation)
data/test/HikakinTV/-2DRamjx75o.jpg,3
data/test/HikakinTV/-9zt1EfKJYI.jpg,3
(Abréviation)
cnn.py
import tensorflow as tf
class CNN:
def __init__(self, image_size=48, class_count=2, color_channel_count=3):
self.image_size = image_size
self.class_count = class_count
self.color_channel_count = color_channel_count
#Fonction d'inférence.
def inference(self, x, keep_prob, softmax=False):
#Tf pour stocker les poids.Créez une variable.
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev=0.1)
return tf.Variable(initial)
#Tf pour stocker le biais.Créez une variable.
def bias_variable(shape):
initial = tf.constant(0.1, shape=shape)
return tf.Variable(initial)
#Pliez-le.
def conv2d(x, W):
return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
# [2x2]La mise en commun est effectuée avec la taille et la quantité de mouvement 2.
def max_pool_2x2(x):
return tf.nn.max_pool(x,
ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1],
padding='SAME')
x_image = tf.reshape(
x,
[-1, self.image_size, self.image_size, self.color_channel_count])
with tf.name_scope('conv1'):
W_conv1 = weight_variable([5, 5, self.color_channel_count, 32])
b_conv1 = bias_variable([32])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
with tf.name_scope('pool1'):
h_pool1 = max_pool_2x2(h_conv1)
with tf.name_scope('conv2'):
W_conv2 = weight_variable([5, 5, 32, 64])
b_conv2 = bias_variable([64])
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
with tf.name_scope('pool2'):
h_pool2 = max_pool_2x2(h_conv2)
with tf.name_scope('fc1'):
W_fc1 = weight_variable(
[int(self.image_size / 4) * int(self.image_size / 4) * 64, 1024])
b_fc1 = bias_variable([1024])
h_pool2_flat = tf.reshape(
h_pool2,
[-1, int(self.image_size / 4) * int(self.image_size / 4) * 64])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)
with tf.name_scope('fc2'):
W_fc2 = weight_variable([1024, self.class_count])
b_fc2 = bias_variable([self.class_count])
y = tf.matmul(h_fc1_drop, W_fc2) + b_fc2
if softmax:
with tf.name_scope('softmax'):
y = tf.nn.softmax(y)
return y
#Fonction de perte pour calculer l'erreur entre le résultat de l'inférence et la bonne réponse.
def loss(self, y, labels):
#Calculez l'entropie croisée.
# tf.nn.softmax_cross_entropy_with_logits argument logits
#Ne donnez pas de variables auxquelles la fonction softmax est appliquée.
cross_entropy = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits(logits=y, labels=labels))
tf.summary.scalar('cross_entropy', cross_entropy)
return cross_entropy
#Fonctions d'apprentissage
def training(self, cross_entropy, learning_rate=1e-4):
train_step = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)
return train_step
#Taux de réponse correct(accuracy)Demander.
def accuracy(self, y, labels):
correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(labels, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
tf.summary.scalar('accuracy', accuracy)
return accuracy
Une implémentation du modèle CNN. C'est le cœur de ce projet, mais il s'agit surtout du tutoriel TensorFlow Deep MNIST for Experts [Code](https: // github. Identique à com / tensorflow / tensorflow / blob / master / tensorflow / examples / tutorials / mnist / mnist_deep.py). Cependant, il est classé pour augmenter la polyvalence.
load_data.py
import tensorflow as tf
def load_data(csvpath, batch_size, image_size, class_count,
shuffle=False, min_after_dequeue=1000):
queue = tf.train.string_input_producer([csvpath], shuffle=shuffle)
reader = tf.TextLineReader()
key, value = reader.read(queue)
imagepath, label = tf.decode_csv(value, [['imagepath'], [0]])
jpeg = tf.read_file(imagepath)
image = tf.image.decode_jpeg(jpeg, channels=3)
image = tf.image.resize_images(image, [image_size, image_size])
#Échelle à 0 en moyenne.
image = tf.image.per_image_standardization(image)
#Valeur d'étiquette un-Convertissez en expression chaude.
label = tf.one_hot(label, depth=class_count, dtype=tf.float32)
capacity = min_after_dequeue + batch_size * 3
if shuffle:
images, labels = tf.train.shuffle_batch(
[image, label],
batch_size=batch_size,
num_threads=4,
capacity=capacity,
min_after_dequeue=min_after_dequeue)
else:
images, labels = tf.train.batch(
[image, label],
batch_size=batch_size,
capacity=capacity)
return images, labels
Une fonction de lecture d'images et d'étiquettes à partir de CSV. Il sera utilisé plus tard dans l'apprentissage et les tests. Utilisez tf.train.shuffle_batch () pour mélanger les données de test pendant l'entraînement, sans mélanger pendant les tests Je suppose que vous utilisez tf.train.batch ().
train.py
import os
import tensorflow as tf
from cnn import CNN
from config import Channel, LOG_DIR
from load_data import load_data
#Supprimez les messages d'avertissement TensorFlow.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')
flags.DEFINE_integer('step_count', 1000, 'Number of steps.')
flags.DEFINE_integer('batch_size', 50, 'Batch size.')
flags.DEFINE_float('learning_rate', 1e-4, 'Initial learning rate.')
def main():
with tf.Graph().as_default():
cnn = CNN(image_size=FLAGS.image_size, class_count=len(Channel))
images, labels = load_data(
'data/train/data.csv',
batch_size=FLAGS.batch_size,
image_size=FLAGS.image_size,
class_count=len(Channel),
shuffle=True)
keep_prob = tf.placeholder(tf.float32)
logits = cnn.inference(images, keep_prob)
loss = cnn.loss(logits, labels)
train_op = cnn.training(loss, FLAGS.learning_rate)
accuracy = cnn.accuracy(logits, labels)
saver = tf.train.Saver()
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
summary_op = tf.summary.merge_all()
summary_writer = tf.summary.FileWriter(LOG_DIR, sess.graph)
for step in range(1, FLAGS.step_count + 1):
_, loss_value, accuracy_value = sess.run(
[train_op, loss, accuracy], feed_dict={keep_prob: 0.5})
if step % 10 == 0:
print(f'step {step}: training accuracy {accuracy_value}')
summary = sess.run(summary_op, feed_dict={keep_prob: 1.0})
summary_writer.add_summary(summary, step)
coord.request_stop()
coord.join(threads)
save_path = saver.save(sess, os.path.join(LOG_DIR, 'model.ckpt'))
if __name__ == '__main__':
main()
En fait, lisez l'image et apprenez CNN. Cette fois, 1 000 étapes d'apprentissage sont effectuées et le taux de réponse correct (précision) est émis toutes les 10 étapes. Enregistrez les paramètres appris dans log / model.ckpt
.
$ python train.py
step 10: training accuracy 0.5600000023841858
step 20: training accuracy 0.47999998927116394
step 30: training accuracy 0.7200000286102295
(Abréviation)
step 980: training accuracy 1.0
step 990: training accuracy 0.9800000190734863
step 1000: training accuracy 0.9800000190734863
De plus, si vous démarrez TensorBoard dans une autre session du terminal et accédez à http://0.0.0.0:6006 avec un navigateur Web, la transition des valeurs telles que le taux de réponse correct (exactitude) et l'entropie croisée (cross_entropy) pour les données d'entraînement est représentée graphiquement. Tu peux vérifier.
$ tensorboard --logdir ./log
Starting TensorBoard b'47' at http://0.0.0.0:6006
(Press CTRL+C to quit)
test.py
import os
import tensorflow as tf
from cnn import CNN
from config import Channel, LOG_DIR
from load_data import load_data
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')
flags.DEFINE_integer('batch_size', 1000, 'Batch size.')
def main():
with tf.Graph().as_default():
cnn = CNN(image_size=FLAGS.image_size, class_count=len(Channel))
images, labels = load_data(
'data/test/data.csv',
batch_size=FLAGS.batch_size,
image_size=FLAGS.image_size,
class_count=len(Channel),
shuffle=False)
keep_prob = tf.placeholder(tf.float32)
logits = cnn.inference(images, keep_prob)
accuracy = cnn.accuracy(logits, labels)
saver = tf.train.Saver()
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init_op)
saver.restore(sess, os.path.join(LOG_DIR, 'model.ckpt'))
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
accuracy_value = sess.run(accuracy, feed_dict={keep_prob: 0.5})
print(f'test accuracy: {accuracy_value}')
coord.request_stop()
coord.join(threads)
if __name__ == '__main__':
main()
Mesurez la précision du modèle entraîné en trouvant le taux de réponse correct (précision) pour les données de test.
$ find data/test -name '*.jpg' | wc -l
1097
$ python test.py --batch_size 1097
test accuracy: 0.7657247185707092
Cette fois, le taux de réponse correcte était d'environ 76,6%. Si vous inférez au hasard, cela devrait être un quart, ou 25,0%, donc je pense que vous pouvez apprendre correctement, mais il semble y avoir place à l'amélioration de la précision.
inference.py
import numpy as np
import os
import sys
import tensorflow as tf
from cnn import CNN
from config import Channel, LOG_DIR
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('image_size', 48, 'Image size.')
def load_image(imagepath, image_size):
jpeg = tf.read_file(imagepath)
image = tf.image.decode_jpeg(jpeg, channels=3)
image = tf.cast(image, tf.float32)
image = tf.image.resize_images(image, [image_size, image_size])
image = tf.image.per_image_standardization(image)
return image
def print_results(imagepath, softmax):
os.system(f'imgcat {imagepath}')
mex_channel_name_length = max(len(channel.name) for channel in Channel)
for channel, value in zip(Channel, softmax):
print(f'{channel.name.ljust(mex_channel_name_length + 1)}: {value}')
print()
prediction = Channel(np.argmax(softmax)).name
for channel in Channel:
if channel.name in imagepath:
answer = channel.name
break
print(f'Devine: {prediction},Bonne réponse: {answer}')
Enfin, faites des inférences à l'aide du modèle entraîné. Regardez le résultat de la fonction softmax et utilisez la classe avec la plus grande valeur comme résultat de l'inférence.
À propos, il est distribué sur les pages iTerm2 et iTerm2 Images imgcat Vous pouvez utiliser (: //raw.githubusercontent.com/gnachman/iTerm2/master/tests/imgcat) pour afficher l'image telle quelle sur le terminal. C'est pratique car l'image d'entrée et le résultat de l'inférence peuvent être combinés et émis sur le terminal.
Sélectionnons quelques données de test et déduisons-les.
HikakinTV et HikakinGames ont un pourcentage élevé de réponses correctes, probablement en raison de la grande quantité de données.
En revanche, HIKAKIN et HikakinBlog ont un faible pourcentage de réponses correctes, probablement parce que le nombre de données est petit.
J'ai réduit les données de test à un seul canal et calculé le taux de réponse correct.
Canal | Nombre de données de test | Taux de réponse correct(%) |
---|---|---|
HIKAKIN | 50 | 20.0 |
HikakinBlog | 19 | 15.8 |
HikakinGames | 374 | 68.4 |
HikakinTV | 654 | 69.4 |
Eh bien, après tout, s'il y a peu de données, le taux de réponse correct est extrêmement faible.
En particulier, je pense qu'il peut être considérablement amélioré simplement en augmentant le nombre de données d'entraînement, je vais donc l'essayer à l'avenir.
Il existe d'innombrables documents de référence, nous ne sélectionnerons donc avec soin que ceux qui ont été particulièrement utiles. De plus, les documents officiels sont exclus.
Articles des dieux: priez :: scintille:
J'étais très ennuyé par la saisie de données TensorFlow. J'apprécie beaucoup les articles de mes prédécesseurs: priez :: sparkles:
Recommended Posts