Dans des méthodes telles que DQN, les Q (s, a) de chaque état étaient calculés par une politique, et l'action qui maximisait la valeur Q était sélectionnée et agie, mais cela ne pouvait gérer que des actions discrètes. D'autre part, DDPG a répondu en paramétrant la politique et en sortant l'action directement, au lieu de rechercher l'action qui maximise la valeur Q afin de correspondre à l'espace d'action continue. C'est donc une mesure décisive.
C'est un tampon de relecture familier dans l'apprentissage par renforcement profond. L'état actuel, l'action à ce moment-là, l'état suivant, la récompense immédiate et l'état terminal sont enregistrés sous la forme d'un taple.
from collections import deque, namedtuple
import random
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))
class ReplayBuffer(object):
def __init__(self, capacity=1e6):
self.capacity = capacity
self.memory = deque([], maxlen=int(capacity))
def append(self, *args):
transition = Transition(*args)
self.memory.append(transition)
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def reset(self):
self.memory.clear()
def length(self):
return len(self.memory)
def __len__(self):
return len(self.memory)
Dans DDPG, il y a Actor $ \ mu (s) $ qui sort l'action en continu à partir de l'état actuel et Critic $ Q (s, a) $ qui sort la valeur Q de l'état et de l'action actuels. L'initialisation des poids de chaque couche est conforme au papier d'origine, veuillez donc vérifier ici pour plus de détails (lien ci-dessous). Ce qui est caractéristique, c'est qu'il y a un tanh dans la couche finale de l'acteur, et lors de la réception d'une action dans Critic, il est reçu dans la deuxième couche. Si vous expérimentez avec Pendulum, vous pourrez peut-être écrire 2 dans la sortie car la plage d'action est [-2, 2].
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
def init_weight(size):
f = size[0]
v = 1. / np.sqrt(f)
return torch.tensor(np.random.uniform(low=-v, high=v, size=size), dtype=torch.float)
class ActorNetwork(nn.Module):
def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-3):
super(ActorNetwork, self).__init__()
self.fc1 = nn.Linear(num_state[0], hidden1_size)
self.fc2 = nn.Linear(hidden1_size, hidden2_size)
self.fc3 = nn.Linear(hidden2_size, num_action[0])
self.num_state = num_state
self.num_action = num_action
self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
self.fc3.weight.data.uniform_(-init_w, init_w)
def forward(self, x):
h = F.relu(self.fc1(x))
h = F.relu(self.fc2(h))
y = torch.tanh(self.fc3(h)) #Puis-je multiplier par 2?
return y
class CriticNetwork(nn.Module):
def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-4):
super(CriticNetwork, self).__init__()
self.fc1 = nn.Linear(num_state[0], hidden1_size)
self.fc2 = nn.Linear(hidden1_size+num_action[0], hidden2_size)
self.fc3 = nn.Linear(hidden2_size, 1)
self.num_state = num_state
self.num_action = num_action
self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
self.fc3.weight.data.uniform_(-init_w, init_w)
def forward(self, x, action):
h = F.relu(self.fc1(x))
h = F.relu(self.fc2(torch.cat([h, action], dim=1)))
y = self.fc3(h)
return y
Dans l'agent, lors de la sélection d'une action, l'action devient décisive telle quelle, alors ajoutez du bruit $ \ mathcal {N} $. Le bruit à ce moment est [processus Ornstein-Woolenbeck](https://ja.wikipedia.org/wiki/%E3%82%AA%E3%83%AB%E3%83%B3%E3%82%B7% E3% 83% A5% E3% 82% BF% E3% 82% A4% E3% 83% B3% EF% BC% 9D% E3% 82% A6% E3% 83% BC% E3% 83% AC% E3% 83% B3% E3% 83% 99% E3% 83% 83% E3% 82% AF% E9% 81% 8E% E7% A8% 8B). Je ne connais pas les détails. Considérez-le comme un bruit qui s'approche de la moyenne au fil du temps. Je ne sais pas.
a = \mu(s) + \mathcal{N}
Pour l'apprentissage de chaque modèle, Critic met à jour le modèle en trouvant le gradient afin de minimiser l'erreur TD, similaire à DQN. La fonction de perte est la suivante. N est la taille du lot.
L = \frac{1}{N} \sum_{i=1}^N (r_i + \gamma Q^{\prime}(s_{i+1}, \mu^{\prime}(s_{i+1})) - Q(s_i, a_i))^2
L'acteur met à jour le modèle pour maximiser la valeur Q. Notez que la perte sera négative car elle sera maximisée à ce moment. La fonction objectif est la suivante.
J = \frac{1}{N}\sum_{i=1}^N Q(s_{i}, \mu{s_i})
Dans la fonction objectif ci-dessus, celui avec un tiret est le réseau cible. Ceci est souvent utilisé pour stabiliser l'apprentissage. Dans DQN etc., ce réseau cible est mis à jour toutes les quelques époques, alors que dans DDPG, l'hyper paramètre $ \ tau (\ ll 1) $ est utilisé.
\theta \gets \tau \theta + (1 - \tau) \theta^{\prime}
Il sera mis à jour lentement, comme. Cela stabilise l'apprentissage, mais il semble que le temps d'apprentissage sera légèrement plus long.
import torch
import torch.nn.functional as F
import numpy as np
import copy
class OrnsteinUhlenbeckProcess:
def __init__(self, theta=0.15, mu=0.0, sigma=0.2, dt=1e-2, x0=None, size=1, sigma_min=None, n_steps_annealing=1000):
self.theta = theta
self.mu = mu
self.sigma = sigma
self.dt = dt
self.x0 = x0
self.size = size
self.num_steps = 0
self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size)
if sigma_min is not None:
self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
self.c = sigma
self.sigma_min = sigma_min
else:
self.m = 0
self.c = sigma
self.sigma_min = sigma
def current_sigma(self):
sigma = max(self.sigma_min, self.m * float(self.num_steps) + self.c)
return sigma
def sample(self):
x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.current_sigma() * np.sqrt(self.dt) * np.random.normal(size=self.size)
self.x_prev = x
self.num_steps += 1
return x
class DDPG:
def __init__(self, actor, critic, optimizer_actor, optimizer_critic, replay_buffer, device, gamma=0.99, tau=1e-3, epsilon=1.0, batch_size=64):
self.actor = actor
self.critic = critic
self.actor_target = copy.deepcopy(self.actor)
self.critic_target = copy.deepcopy(self.critic)
self.optimizer_actor = optimizer_actor
self.optimizer_critic = optimizer_critic
self.replay_buffer = replay_buffer
self.device = device
self.gamma = gamma
self.tau = tau
self.epsilon = epsilon
self.batch_size = batch_size
self.random_process = OrnsteinUhlenbeckProcess(size=actor.num_action[0])
self.num_state = actor.num_state
self.num_action = actor.num_action
def add_memory(self, *args):
self.replay_buffer.append(*args)
def reset_memory(self):
self.replay_buffer.reset()
def get_action(self, state, greedy=False):
state_tensor = torch.tensor(state, dtype=torch.float, device=self.device).view(-1, *self.num_state)
action = self.actor(state_tensor)
if not greedy:
action += self.epsilon*torch.tensor(self.random_process.sample(), dtype=torch.float, device=self.device)
return action.squeeze(0).detach().cpu().numpy()
def train(self):
if len(self.replay_buffer) < self.batch_size:
return None
transitions = self.replay_buffer.sample(self.batch_size)
batch = Transition(*zip(*transitions))
state_batch = torch.tensor(batch.state, device=self.device, dtype=torch.float)
action_batch = torch.tensor(batch.action, device=self.device, dtype=torch.float)
next_state_batch = torch.tensor(batch.next_state, device=self.device, dtype=torch.float)
reward_batch = torch.tensor(batch.reward, device=self.device, dtype=torch.float).unsqueeze(1)
not_done = np.array([(not done) for done in batch.done])
not_done_batch = torch.tensor(not_done, device=self.device, dtype=torch.float).unsqueeze(1)
# need to change
qvalue = self.critic(state_batch, action_batch)
next_qvalue = self.critic_target(next_state_batch, self.actor_target(next_state_batch))
target_qvalue = reward_batch + (self.gamma * next_qvalue * not_done_batch)
critic_loss = F.mse_loss(qvalue, target_qvalue)
self.optimizer_critic.zero_grad()
critic_loss.backward()
self.optimizer_critic.step()
actor_loss = -self.critic(state_batch, self.actor(state_batch)).mean()
self.optimizer_actor.zero_grad()
actor_loss.backward()
self.optimizer_actor.step()
# soft parameter update
for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)
for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)
Il n'y a rien de nouveau à ce sujet. Comme d'autres algorithmes d'apprentissage par renforcement, il reçoit l'état de l'environnement et vous donne l'impression d'agir et d'apprendre. Chaque hyper paramètre est conforme au papier d'origine. (Peut-être)
import torch
import torch.optim as optim
import gym
max_episodes = 300
memory_capacity = 1e6 #Capacité tampon
gamma = 0.99 #Taux de remise
tau = 1e-3 #Taux de mise à jour cible
epsilon = 1.0 #Si vous voulez gâcher la quantité de bruit, vous n'en avez probablement pas besoin
batch_size = 64
lr_actor = 1e-4
lr_critic = 1e-3
logger_interval = 10
weight_decay = 1e-2
env = gym.make('Pendulum-v0')
num_state = env.observation_space.shape
num_action = env.action_space.shape
max_steps = env.spec.max_episode_steps
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
actorNet = ActorNetwork(num_state, num_action).to(device)
criticNet = CriticNetwork(num_state, num_action).to(device)
optimizer_actor = optim.Adam(actorNet.parameters(), lr=lr_actor)
optimizer_critic = optim.Adam(criticNet.parameters(), lr=lr_critic, weight_decay=weight_decay)
replay_buffer = ReplayBuffer(capacity=memory_capacity)
agent = DDPG(actorNet, criticNet, optimizer_actor, optimizer_critic, replay_buffer, device, gamma, tau, epsilon, batch_size)
for episode in range(max_episodes):
observation = env.reset()
total_reward = 0
for step in range(max_steps):
action = agent.get_action(observation)
next_observation, reward, done, _ = env.step(action)
total_reward += reward
agent.add_memory(observation, action, next_observation, reward, done)
agent.train()
observation = next_observation
if done:
break
if episode % logger_interval == 0:
print("episode:{} total reward:{}".format(episode, total_reward))
for episode in range(3):
observation = env.reset()
env.render()
for step in range(max_steps):
action = agent.get_action(observation, greedy=True)
next_observation, reward, done, _ = env.step(action)
observation = next_observation
env.render()
if done:
break
env.close()
Si vous modifiez l'environnement de gym.make, vous devriez pouvoir étudier dans d'autres environnements.
C'est un graphique des récompenses cumulées et des épisodes d'apprentissage. Je pense que vous apprenez d'une bonne manière. Il a été construit correctement. Génial. (Gif n'est pas posté car je ne sais pas comment le faire)
En gros, si vous exécutez le code ci-dessus tel quel dans un fichier, vous pouvez vérifier l'opération. Comme l'implémentation était complète, certaines variables supplémentaires, etc. subsistent. Avec Pendulum-v0, vous ne pouvez apprendre qu'avec le processeur, même si vous n'êtes pas dans un environnement où le GPU est disponible. Cependant, l'apprentissage peut devenir un peu instable, veuillez donc réessayer. S'il y a une opportunité, nous mettrons en œuvre d'autres méthodes.
Continuous Control with Deep Reinforcement Learning
Recommended Posts