In methods such as DQN, a network estimates Q(s, a) for the current state and the agent acts by selecting the action that maximizes that Q value, which only works for discrete action spaces. DDPG handles continuous action spaces differently: instead of searching for the Q-maximizing action, it parameterizes the policy itself and has it output the action directly. The policy is therefore deterministic.
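As a toy illustration of that difference (dummy tensors only, not part of the algorithm itself): DQN-style selection takes an argmax over a finite set of Q values, while a DDPG-style actor simply maps the state to a continuous action.
import torch
# DQN style: one Q value per discrete action, pick the argmax.
q_values = torch.tensor([0.1, 0.7, 0.3])         # Q(s, a) for 3 discrete actions
discrete_action = torch.argmax(q_values).item()  # -> 1
# DDPG style: the actor network outputs the action itself.
state = torch.randn(1, 3)
actor = torch.nn.Linear(3, 1)                    # stand-in for mu(s)
continuous_action = torch.tanh(actor(state))     # deterministic, continuous value in (-1, 1)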
The replay buffer is the familiar one from deep reinforcement learning: the current state, the action taken there, the next state, the immediate reward, and the terminal flag are saved together as one tuple.
from collections import deque, namedtuple
import random
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))
class ReplayBuffer(object):
def __init__(self, capacity=1e6):
self.capacity = capacity
self.memory = deque([], maxlen=int(capacity))
def append(self, *args):
transition = Transition(*args)
self.memory.append(transition)
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def reset(self):
self.memory.clear()
def length(self):
return len(self.memory)
def __len__(self):
return len(self.memory)
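For reference, a minimal usage sketch of this buffer with dummy transitions (the numbers are arbitrary):
buffer = ReplayBuffer(capacity=100)
for i in range(5):
    # (state, action, next_state, reward, done) in the order of the Transition tuple
    buffer.append([float(i)], [0.0], [float(i + 1)], 1.0, False)
batch = buffer.sample(batch_size=3)                       # 3 random Transition tuples
states, actions, next_states, rewards, dones = zip(*batch)
print(len(buffer), rewards)                               # -> 5 (1.0, 1.0, 1.0)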
DDPG has an Actor $\mu(s)$, which outputs a continuous action from the current state, and a Critic $Q(s, a)$, which outputs the Q value from the current state and action. The weight initialization of each layer follows the original paper, so please check there for details (link at the end). Two points are characteristic: the Actor has a tanh on its final layer, and the Critic takes the action as an input at its second layer. If you are experimenting with Pendulum, whose action range is [-2, 2], you could also scale the Actor's output by 2.
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
def init_weight(size):
f = size[0]
v = 1. / np.sqrt(f)
return torch.tensor(np.random.uniform(low=-v, high=v, size=size), dtype=torch.float)
class ActorNetwork(nn.Module):
def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-3):
super(ActorNetwork, self).__init__()
self.fc1 = nn.Linear(num_state[0], hidden1_size)
self.fc2 = nn.Linear(hidden1_size, hidden2_size)
self.fc3 = nn.Linear(hidden2_size, num_action[0])
self.num_state = num_state
self.num_action = num_action
self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
self.fc3.weight.data.uniform_(-init_w, init_w)
def forward(self, x):
h = F.relu(self.fc1(x))
h = F.relu(self.fc2(h))
y = torch.tanh(self.fc3(h))  # tanh keeps the output in [-1, 1]; for Pendulum's [-2, 2] action range it could be scaled by 2
return y
class CriticNetwork(nn.Module):
def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-4):
super(CriticNetwork, self).__init__()
self.fc1 = nn.Linear(num_state[0], hidden1_size)
self.fc2 = nn.Linear(hidden1_size+num_action[0], hidden2_size)
self.fc3 = nn.Linear(hidden2_size, 1)
self.num_state = num_state
self.num_action = num_action
self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
self.fc3.weight.data.uniform_(-init_w, init_w)
def forward(self, x, action):
h = F.relu(self.fc1(x))
h = F.relu(self.fc2(torch.cat([h, action], dim=1)))
y = self.fc3(h)
return y
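As a rough shape check, here is a sketch assuming Pendulum-like dimensions (3-dimensional observation, 1-dimensional action; the batch size of 8 is arbitrary):
import torch
num_state, num_action = (3,), (1,)
actor = ActorNetwork(num_state, num_action)
critic = CriticNetwork(num_state, num_action)
s = torch.randn(8, 3)     # batch of 8 states
a = actor(s)              # shape (8, 1), values in (-1, 1) from the final tanh
q = critic(s, a)          # shape (8, 1), one Q value per (state, action) pair
print(a.shape, q.shape)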
When the agent selects an action, the policy output is deterministic as it is, so noise $\mathcal{N}$ is added for exploration. The noise used here comes from an [Ornstein-Uhlenbeck process](https://ja.wikipedia.org/wiki/%E3%82%AA%E3%83%AB%E3%83%B3%E3%82%B7%E3%83%A5%E3%82%BF%E3%82%A4%E3%83%B3%EF%BC%9D%E3%82%A6%E3%83%BC%E3%83%AC%E3%83%B3%E3%83%99%E3%83%83%E3%82%AF%E9%81%8E%E7%A8%8B). Without going into the details, think of it as temporally correlated noise that reverts toward its mean over time.
a = \mu(s) + \mathcal{N}
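Concretely, the discretized update implemented in the sample() method of the noise class below is (here $\mu$, $\theta$, and $\sigma$ are the noise mean, mean-reversion rate, and scale, not the policy):
x_{t+1} = x_t + \theta (\mu - x_t) \Delta t + \sigma \sqrt{\Delta t}\, \epsilon_t, \quad \epsilon_t \sim \mathcal{N}(0, 1)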
For training, the Critic is updated by gradient descent so as to minimize the TD error, just as in DQN. The loss function is as follows, where N is the batch size.
L = \frac{1}{N} \sum_{i=1}^N (r_i + \gamma Q^{\prime}(s_{i+1}, \mu^{\prime}(s_{i+1})) - Q(s_i, a_i))^2
The Actor is updated so as to maximize the Q value; note that the loss passed to the optimizer is the negative of this objective, since we want to maximize it. The objective function is as follows.
J = \frac{1}{N}\sum_{i=1}^N Q(s_{i}, \mu(s_i))
In the loss function above, the primed quantities $Q^{\prime}$ and $\mu^{\prime}$ are target networks, which are commonly used to stabilize learning. In DQN and its variants the target network is copied over every fixed number of steps, whereas DDPG uses a hyperparameter $\tau\,(\ll 1)$ and updates it as
\theta^{\prime} \gets \tau \theta + (1 - \tau) \theta^{\prime}
so that the target parameters track the learned parameters slowly. This stabilizes learning, although it seems to make training take somewhat longer.
import torch
import torch.nn.functional as F
import numpy as np
import copy
class OrnsteinUhlenbeckProcess:
def __init__(self, theta=0.15, mu=0.0, sigma=0.2, dt=1e-2, x0=None, size=1, sigma_min=None, n_steps_annealing=1000):
self.theta = theta
self.mu = mu
self.sigma = sigma
self.dt = dt
self.x0 = x0
self.size = size
self.num_steps = 0
self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size)
if sigma_min is not None:
self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
self.c = sigma
self.sigma_min = sigma_min
else:
self.m = 0
self.c = sigma
self.sigma_min = sigma
def current_sigma(self):
sigma = max(self.sigma_min, self.m * float(self.num_steps) + self.c)
return sigma
def sample(self):
x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.current_sigma() * np.sqrt(self.dt) * np.random.normal(size=self.size)
self.x_prev = x
self.num_steps += 1
return x
class DDPG:
def __init__(self, actor, critic, optimizer_actor, optimizer_critic, replay_buffer, device, gamma=0.99, tau=1e-3, epsilon=1.0, batch_size=64):
self.actor = actor
self.critic = critic
self.actor_target = copy.deepcopy(self.actor)
self.critic_target = copy.deepcopy(self.critic)
self.optimizer_actor = optimizer_actor
self.optimizer_critic = optimizer_critic
self.replay_buffer = replay_buffer
self.device = device
self.gamma = gamma
self.tau = tau
self.epsilon = epsilon
self.batch_size = batch_size
self.random_process = OrnsteinUhlenbeckProcess(size=actor.num_action[0])
self.num_state = actor.num_state
self.num_action = actor.num_action
def add_memory(self, *args):
self.replay_buffer.append(*args)
def reset_memory(self):
self.replay_buffer.reset()
def get_action(self, state, greedy=False):
state_tensor = torch.tensor(state, dtype=torch.float, device=self.device).view(-1, *self.num_state)
action = self.actor(state_tensor)
if not greedy:
action += self.epsilon*torch.tensor(self.random_process.sample(), dtype=torch.float, device=self.device)
return action.squeeze(0).detach().cpu().numpy()
def train(self):
if len(self.replay_buffer) < self.batch_size:
return None
transitions = self.replay_buffer.sample(self.batch_size)
batch = Transition(*zip(*transitions))
state_batch = torch.tensor(batch.state, device=self.device, dtype=torch.float)
action_batch = torch.tensor(batch.action, device=self.device, dtype=torch.float)
next_state_batch = torch.tensor(batch.next_state, device=self.device, dtype=torch.float)
reward_batch = torch.tensor(batch.reward, device=self.device, dtype=torch.float).unsqueeze(1)
not_done = np.array([(not done) for done in batch.done])
not_done_batch = torch.tensor(not_done, device=self.device, dtype=torch.float).unsqueeze(1)
# Critic update: minimize the TD error against targets computed with the target networks
qvalue = self.critic(state_batch, action_batch)
next_qvalue = self.critic_target(next_state_batch, self.actor_target(next_state_batch))
target_qvalue = reward_batch + (self.gamma * next_qvalue * not_done_batch)
critic_loss = F.mse_loss(qvalue, target_qvalue)
self.optimizer_critic.zero_grad()
critic_loss.backward()
self.optimizer_critic.step()
actor_loss = -self.critic(state_batch, self.actor(state_batch)).mean()
self.optimizer_actor.zero_grad()
actor_loss.backward()
self.optimizer_actor.step()
# soft parameter update
for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)
for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)
There is nothing new here: as in other reinforcement learning algorithms, the agent receives states from the environment, acts, and learns from the results. Each hyperparameter follows the original paper (probably).
import torch
import torch.optim as optim
import gym
max_episodes = 300
memory_capacity = 1e6  # replay buffer capacity
gamma = 0.99  # discount factor
tau = 1e-3  # soft update rate for the target networks
epsilon = 1.0  # scale of the exploration noise; usually there is no need to change it
batch_size = 64
lr_actor = 1e-4
lr_critic = 1e-3
logger_interval = 10
weight_decay = 1e-2
env = gym.make('Pendulum-v0')
num_state = env.observation_space.shape
num_action = env.action_space.shape
max_steps = env.spec.max_episode_steps
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
actorNet = ActorNetwork(num_state, num_action).to(device)
criticNet = CriticNetwork(num_state, num_action).to(device)
optimizer_actor = optim.Adam(actorNet.parameters(), lr=lr_actor)
optimizer_critic = optim.Adam(criticNet.parameters(), lr=lr_critic, weight_decay=weight_decay)
replay_buffer = ReplayBuffer(capacity=memory_capacity)
agent = DDPG(actorNet, criticNet, optimizer_actor, optimizer_critic, replay_buffer, device, gamma, tau, epsilon, batch_size)
for episode in range(max_episodes):
observation = env.reset()
total_reward = 0
for step in range(max_steps):
action = agent.get_action(observation)
next_observation, reward, done, _ = env.step(action)
total_reward += reward
agent.add_memory(observation, action, next_observation, reward, done)
agent.train()
observation = next_observation
if done:
break
if episode % logger_interval == 0:
print("episode:{} total reward:{}".format(episode, total_reward))
for episode in range(3):
observation = env.reset()
env.render()
for step in range(max_steps):
action = agent.get_action(observation, greedy=True)
next_observation, reward, done, _ = env.step(action)
observation = next_observation
env.render()
if done:
break
env.close()
If you change the environment passed to gym.make, the same code should train in other continuous-control environments as well.
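For example (a sketch only; the hyperparameters above may need retuning for a different task):
env = gym.make('MountainCarContinuous-v0')  # another continuous-action environment
num_state = env.observation_space.shape     # (2,)
num_action = env.action_space.shape         # (1,)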
A graph of cumulative reward versus training episode shows the agent learning well; the implementation works properly. (A GIF is not posted because I don't know how to make one.)
Basically, if you run the code above as a single file, you can verify that it works. Because it was extracted from a larger implementation, a few unused variables remain. With Pendulum-v0 you can train on CPU alone, even without access to a GPU. Learning can be somewhat unstable, so if a run goes badly, please try again. If the opportunity arises, I will implement other methods as well.
Reference: Continuous Control with Deep Reinforcement Learning (Lillicrap et al., 2015), https://arxiv.org/abs/1509.02971