Il semble que la prochaine méthode de R2D2 appelée R2D3 ait été annoncée. J'étais curieux, alors je l'ai implémenté.
Le code créé dans cet article est ci-dessous.
R2D3 est une méthode d'apprentissage par renforcement de la série DQN. Les explications techniques jusqu'à ce point sont expliquées dans la série suivante, alors n'hésitez pas à nous contacter.
Avec la méthode d'apprentissage améliorée annoncée par Google DeepMind en septembre 2019, En gros, c'est une méthode qui combine R2D2 et DQfD.
En gros, DQfD fait un meilleur apprentissage (base DQN) en se référant au jeu (démonstration) d'une bonne personne, et En fin de compte, c'est un moyen d'apprendre de meilleures performances qu'une démonstration.
À propos, le nom non abrégé semble être le DQN distribué de relecture récurrente à partir de démonstrations (R2D3).
·référence
Ce qui précède est la vue d'ensemble de R2D3. (* Extrait du journal) Le côté droit de la figure (partie violette et bleue) est le même que R2D2, la différence est la partie rouge sur le côté gauche.
Tout d'abord, comme prémisse, la relecture de la démo est fournie avec des données de lecture de référence à l'avance.
Jusqu'à R2D2, les données de lot utilisées pour la formation étaient créées pour la taille de lot en fonction des données de relecture de l'agent. R2D3 crée ces données de lot à partir de la relecture de la démo et de la relecture de l'agent en fonction du ratio de démo.
Dans l'article, nous avons comparé 1/16, 1/32, 1/64 1/128, 1/256 avec des valeurs fixes. Il a été déclaré que 1/256 est la partie la plus précise de la tâche.
À partir de maintenant, c'est mon avis, mais dans le sens humain réel, le jeu de démonstration est utile au début, mais à mesure que vous vous y habituez, vous ne le voyez pas. Donc, dans ma mise en œuvre, j'ai implémenté le démo-ratio ici afin qu'il puisse être recuit. (Le recuit sera le même que le cas fixe si vous modifiez le paramètre)
Tout d'abord, nous devons préparer les données pour la démonstration. Je l'ai créé en me référant au code de lecture manuelle fourni par OpenAI ci-dessous.
La structure de données à enregistrer est divisée en deux, une pour l'apprentissage et une pour la lecture, et a la forme suivante.
・ Pour l'apprentissage (enregistrer pour chaque image)
Nom | Contenu |
---|---|
action | action |
observation | Statut |
reward | Récompense |
done | Que ce soit fini |
・ Pour la lecture (informations générales)
Nom | Contenu |
---|---|
episode | Numéro d'épisode |
rgb_size | Taille de l'image |
states | Tableau de chaque information de trame(Contient les informations suivantes) |
・ Pour la lecture (enregistrer pour chaque image)
Nom | Contenu |
---|---|
step | Numéro de cadre |
reward_total | Récompense totale actuelle |
info | Informations sur le cadre(gym) |
rgb | image |
(Le code sera la fonction add_memory dans env_play.py)
Lorsque vous ajoutez des données de lecture de démonstration à la mémoire, vous devez suivre les mêmes étapes que l'agent réel pour les stocker. (Référence: [Explication de l'implémentation DQN (Rainbow)](https://qiita.com/pocokhc/items/408f0f818140924ad4c4#dqnrainbow%E3%81%AE%E5%AE%9F%E8%A3%85%E8%A7 % A3% E8% AA% AC))
C'est un peu redondant, mais nous allons créer le même mécanisme séparément et l'ajouter à la mémoire. Vous trouverez ci-dessous le flux avec un pseudo code. (Parce que c'est compliqué, il n'est pas décrit dans le cas du LSTM avec état)
add_memory
def add_memory(episode_file, memory, agent):
・ Épisode_Obtenir des informations de lecture de démonstration à partir d'un fichier
#Créer des variables pour la création de données empiriques
recent_actions =Tableau du nombre d'actions à enregistrer
recent_rewards =Gamme de récompenses à économiser
recent_rewards_multistep =Pour le calcul en plusieurs étapes
recent_observations =Tableau des situations à sauver
pour l'étape dans l'épisode:
observation =Informations sur le cadre[step]["observation"]
action =Informations sur le cadre[step]["action"]
reward =Informations sur le cadre[step]["reward"]
#Ajouter un statut
recent_observations.pop(0)
recent_observations.append(observation)
#Créez une expérience
exp = (
recent_observations[:agent.input_sequence], #État précédent
recent_actions[0], #Action dans l'état précédent
recent_rewards_multistep, #Récompense
recent_observations[-agent.input_sequence:]) #État suivant
)
#Ajouter de l'expérience à la mémoire
memory.add(exp)
#Ajouter une action et une récompense
recent_actions.pop(0)
recent_actions.append(action)
recent_rewards.pop(0)
recent_rewards.append(reward)
recent_rewards_multistep =Calcul d'apprentissage en plusieurs étapes
(Le code sera la classe EpisodeSave dans env_play.py)
C'est la classe que vous jouez réellement. Il a les fonctions suivantes.
L'écran est le suivant.
Le code à exécuter ressemble à ce qui suit.
import gym
from src.env_play import EpisodeSave
def run_play():
env = gym.make("MountainCar-v0")
processor = None #S'il y en a, spécifiez-le arbitrairement
es = EpisodeSave(
env,
episode_save_dir="tmp",
processor=processor
)
es.play()
env.close()
run_play()
La liaison de clé du jeu peut être spécifiée par le processeur. Si le processeur a une méthode get_keys_to_action, il sera chargé.
get_keys_to_action
import rl
class MyProcessor(rl.core.Processor):
def get_keys_to_action(self):
return {
():0, #0 si non appuyé
(ord('d'),):1, #La clé d est 1
(ord('a'),):2, #une clé est 2
}
(Le code sera la classe EpisodeReplay dans env_play.py)
J'ai également créé un mécanisme pour lire l'épisode enregistré par EpisodeSave. Principalement pour confirmation.
from src.env_play import EpisodeReplay
def replay():
r = EpisodeReplay(episode_save_dir="tmp")
r.play()
replay()
Comme d'habitude, nous l'implémenterons à partir de la version Rainbow, qui est facile à comprendre sans traitement parallèle.
Nom | Contenu |
---|---|
demo_memory | Type de mémoire(Similaire à la mémoire de relecture) |
demo_episode_dir | Chemin du répertoire enregistré par l'épisode Save ci-dessus |
demo_ratio_initial | taux initial de démo |
demo_ratio_final | Taux de l'état final de la démo |
demo_ratio_steps | Nombre d'étapes pour atteindre le taux final |
demo_memory peut être sélectionné à partir de ReplayMemory, PERGreedyMemory, PERProportionalMemory, PERRankBaseMemory ainsi que ReplayMemory Je peux le faire.
rainbow
def __init__(self):
(réduction)
# add_démo avec fonction mémoire_Ajout de la lecture de démonstration à la mémoire
add_memory(demo_episode_dir, self.demo_memory, self)
# demo_Définir des variables pour le recuit de rapport
self.demo_ratio_initial = demo_ratio_initial
if demo_ratio_final is None:
self.demo_ratio_final = self.demo_ratio_initial
else:
self.demo_ratio_final = demo_ratio_final
self.demo_ratio_step = (self.demo_ratio_initial - self.demo_ratio_final) / demo_ratio_steps
(réduction)
rainbow
import random
def forward(self, observation):
#C'est le moment au moment de l'apprentissage
(réduction)
#Calculer le ratio du ratio de démonstration
ratio_demo = self.demo_ratio_initial - self.local_step * self.demo_ratio_step
if ratio_demo < self.demo_ratio_final:
ratio_demo = self.demo_ratio_final
#Calculez le nombre de lots en fonction du ratio
batch_replay = 0
batch_demo = 0
for _ in range(self.batch_size):
r = random.random()
if r < ratio_demo:
batch_demo += 1
continue
batch_replay += 1
#Créer un lot en fonction du ratio
indexes = []
batchs = []
weights = []
memory_types = [] #Enregistrer le type de mémoire acquis
if batch_replay > 0:
(i, b, w) = self.memory.sample(batch_replay, self.local_step)
indexes.extend(i)
batchs.extend(b)
weights.extend(w)
#0 est la relecture_memory
memory_types.extend([0 for _ in range(batch_replay)])
if batch_demo > 0:
(i, b, w) = self.demo_memory.sample(batch_demo, self.local_step)
indexes.extend(i)
batchs.extend(b)
weights.extend(w)
#1 est une démo_memory
memory_types.extend([1 for _ in range(batch_demo)])
(réduction)
for i in range(self.batch_size):
(Apprentissage)
#Mettre à jour la priorité
if memory_types[i] == 0:
# replay_mettre à jour la mémoire
self.memory.update(indexes[i], batchs[i], priority)
elif memory_types[i] == 1:
# demo_mettre à jour la mémoire
self.demo_memory.update(indexes[i], batchs[i], priority)
else:
assert False
(réduction)
Il peut être implémenté exactement de la même manière que Rainbow. Mettez-le simplement du côté de l'apprenant.
EpisodeMemory
Ceci est ma propre implémentation. J'ai eu l'idée après avoir vu la démo jouer, mais si la démo est significative, pourquoi ne pas l'obtenir en apprenant? J'ai pensé.
Plus précisément, la pièce avec la récompense totale d'épisode la plus élevée est enregistrée séparément dans la mémoire. L'idée est de mélanger cela en un lot de la même manière que cette fois.
En tant qu'image, j'ai l'impression de me souvenir de la pièce qui a réussi et de la revoir plusieurs fois.
Voici la mise en œuvre.
Créez un EpisodeMemory qui inclut ReplayMemory. Il s'agit d'une classe wrapper qui ajoute de l'expérience à ReplayMemory sur une base par épisode.
EpisodeMemory
class EpisodeMemory():
def __init__(self, memory):
self.max_reward = None
self.memory = memory
def add_episode(self, episode, total_reward):
# max_Ajouter un épisode à la mémoire lorsque la récompense est mise à jour
if self.max_reward is None:
self.max_reward = total_reward
elif self.max_reward <= total_reward: #Ajouter à la mémoire même s'il est dans la même ligne
self.max_reward = total_reward
else:
return
#Traitement supplémentaire de la mémoire réelle
for e in episode_recent:
if len(e) == 5: #Traitement en cas de priorité
self.memory.add(e, e[4])
else:
self.memory.add(e)
Nom | Contenu |
---|---|
episode_memory | Type de mémoire(Similaire à la mémoire de relecture) |
episode_ratio | ÉpisodeMémoire |
rainbow
from src.memory.EpisodeMemory import EpisodeMemory
def __init__(self):
(réduction)
#Envelopper dans la classe EpisodeMemory
self.episode_memory = EpisodeMemory(episode_memory)
self.episode_ratio = episode_ratio
(réduction)
reset_states
#Appelé au début de l'épisode
def reset_states(self):
(réduction)
#Pour sauver l'expérience d'épisode
self.episode_exp = []
self.total_reward = 0
#Pour vérifier l'état de fin
self.recent_terminal = False
forward
#Appelé avant l'exécution de l'action à chaque étape
def forward(self, observation):
(réduction)
#Si c'est fini, épisode_Ajouter à la mémoire
if self.recent_terminal:
self.episode_memory.add_episode(self.episode_exp, self.total_reward)
(réduction)
exp =(Créer des données d'expérience)
self.memory.add(exp) # replay_Ajouter de la mémoire
self.episode_exp.append(exp) # episode_Ajouter de l'expérience pour la mémoire
(réduction)
backward
#Appelé après l'exécution de l'action à chaque étape
def backward(self, reward, terminal):
(réduction)
#Calculer l'expérience totale
self.total_reward += reward
#Enregistrer l'état de sortie
self.recent_terminal = terminal
(réduction)
rainbow
import random
def forward(self, observation):
#C'est le moment au moment de l'apprentissage
(réduction)
ratio_demo =(Calcul du ratio de démonstration)
# episode_Si la mémoire a de la mémoire, mélangez-la dans un lot
if len(self.episode_memory) < self.batch_size:
ratio_epi = 0
else:
ratio_epi = self.episode_ratio
#Calculez le nombre de lots en fonction du ratio
batch_replay = 0
batch_demo = 0
batch_episode = 0
for _ in range(self.batch_size):
r = random.random()
if r < ratio_demo:
batch_demo += 1
continue
r -= ratio_demo
if r < ratio_epi:
batch_episode += 1
continue
batch_replay += 1
#Créer un lot en fonction du ratio
indexes = []
batchs = []
weights = []
memory_types = [] #Enregistrer le type de mémoire acquis
if batch_replay > 0:
(replay_Création par lots de mémoire)
if batch_demo > 0:
(demo_Création par lots de mémoire)
if batch_episode > 0:
(i, b, w) = self.episode_memory.sample(batch_episode, self.local_step)
indexes.extend(i)
batchs.extend(b)
weights.extend(w)
# episode_la mémoire est 2
memory_types.extend([2 for _ in range(batch_episode)])
(réduction)
for i in range(self.batch_size):
(Apprentissage)
#Mettre à jour la priorité
if memory_types[i] == 0:
(replay_Mettre à jour la mémoire)
elif memory_types[i] == 1:
(demo_Mettre à jour la mémoire)
elif memory_types[i] == 2:
# episode_mettre à jour la mémoire
self.episode_memory.update(indexes[i], batchs[i], priority)
else:
assert False
(réduction)
C'est presque la même chose que l'implémentation sur Rainbow. Cependant, les données d'épisode sont créées pour chaque acteur, Il est géré du côté de l'apprenant pour réduire la quantité de communication entre les processus.
Learner
class Learner():
def __init__():
(réduction)
#Créer des variables pour la gestion des épisodes pour chaque acteur en initialisant l'apprenant
self.episode_exp = [ [] for _ in range(self.actors_num)]
self.total_reward = [ 0 for _ in range(self.actors_num)]
def train(self):
(réduction)
#Acteur → Ajouter de l'expérience à l'apprenant
for _ in range(self.exp_q.qsize()):
exp = self.exp_q.get(timeout=1)
# add memory
self.memory.add(exp[0], exp[0][4])
# add episode_exp
self.total_reward[exp[1]] += exp[0][2]
self.episode_exp[exp[1]].append(exp[0])
if exp[2]: # terminal
self.episode_memory.add_episode(
self.episode_exp[exp[1]],
self.total_reward[exp[1]]
)
self.episode_exp[exp[1]] = []
self.total_reward[exp[1]] = 0
(réduction)
Actor
class Actor():
def forward(self, observation):
(réduction)
#Envoyer à l'apprenant
# actor_Transmettez également les informations d'index et de terminal
self.exp_q.put((exp, self.actor_index, self.recent_terminal))
(réduction)
MountainCar
Cette fois, je vais l'essayer avec MountainCar.
Mountain Car est un jeu dans lequel vous déplacez la voiture à gauche et à droite pour viser le drapeau en haut à droite.
La récompense est toujours -1. En bref, plus tôt vous atteignez le drapeau, plus votre score est élevé.
Si vous pensez que c'est l'apprentissage Q, c'est une tâche que vous ne pouvez pas obtenir de récompenses tant que vous n'avez pas atteint l'objectif (on ne sait pas si c'est bon ou mauvais), et c'est une tâche plutôt difficile.
Processor est un entraînement MountainCar pur fourni par le gymnase sans définition. Le journal est acquis toutes les 2000 étapes. (C'est l'opération dans Rainbow)
Les 50 000 premiers pas sont réchauffés puis 100 000 fois sont entraînés. ·résultat
Il est difficile d'apprendre tant que le jeu qui a atteint l'objectif n'est pas stocké dans la mémoire dans une certaine mesure. Les résultats commencent à sortir autour de 130 000 pas.
Les paramètres autres que la mémoire DemoReplay sont les mêmes que les résultats précédents. La démo n'a préparé qu'un seul épisode ci-dessous.
Les paramètres de la mémoire DemoReplay sont les suivants.
demo_memory = PERProportionalMemory(100_000, alpha=0.8)
demo_episode_dir = episode_save_dir
demo_ratio_initial = 1.0
demo_ratio_final = 1.0/512.0
demo_ratio_steps = warmup + 50_000
·résultat
Les résultats ont déjà commencé à apparaître autour de 70 000.
Les paramètres autres que EpisodeMemory sont les mêmes que les résultats précédents. Les paramètres de EpisodeMemory sont les suivants.
episode_memory = PERProportionalMemory(2_000, alpha=0.8),
episode_ratio = 1.0/8.0,
·résultat
Les résultats commencent à sortir autour de 80 000.
La mémoire DemoReplay et EpisodeMemory sont valides. (L'axe des x n'inclut pas le nombre d'échauffements)
C'était plus facile à mettre en œuvre que prévu. Je pense que c'est une méthode très efficace car il n'y a pas beaucoup de tâches sans jeu de démonstration lors de l'apprentissage. L'évolution de l'apprentissage par renforcement ne s'est pas encore arrêtée. J'ai hâte de voir ce qui va suivre.
Recommended Posts