Learn Pendulum-v0 with DDPG

Overview

In methods such as DQN, the Q (s, a) of each state was calculated by a policy, and the action that maximized the Q value was selected and acted, but this could only handle discrete actions. On the other hand, DDPG did not seek the action to maximize the Q value in order to correspond to the continuous action space, but responded by parameterizing the policy and outputting the action directly. Therefore, it is a decisive measure.

Implementation

Replay buffer

It is a familiar replay buffer in deep reinforcement learning. The current state, the action at that time, the next state, the immediate reward, and the terminal state are saved as one tuple.

from collections import deque, namedtuple
import random

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))

class ReplayBuffer(object):
    def __init__(self, capacity=1e6):
        self.capacity = capacity
        self.memory = deque([], maxlen=int(capacity))

    def append(self, *args):
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def reset(self):
        self.memory.clear()

    def length(self):
        return len(self.memory)

    def __len__(self):
        return len(self.memory)

model

In DDPG, there are Actor $ \ mu (s) $ which outputs the action continuously from the current state and Critic $ Q (s, a) $ which outputs the Q value from the current state and action. The initialization of the weights of each layer is in line with the original paper, so please check there for details (link below). What is characteristic is that there is a tanh in the final layer of the Actor, and when you receive an action in Critic, you receive it in the second layer. If you are experimenting with Pendulum, you may be able to write 2 in the output because the range of action is [-2, 2].

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

def init_weight(size):
    f = size[0]
    v = 1. / np.sqrt(f)
    return torch.tensor(np.random.uniform(low=-v, high=v, size=size), dtype=torch.float)

class ActorNetwork(nn.Module):
    def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-3):
        super(ActorNetwork, self).__init__()
        self.fc1 = nn.Linear(num_state[0], hidden1_size)
        self.fc2 = nn.Linear(hidden1_size, hidden2_size)
        self.fc3 = nn.Linear(hidden2_size, num_action[0])

        self.num_state = num_state
        self.num_action = num_action

        self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
        self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
        self.fc3.weight.data.uniform_(-init_w, init_w)

    def forward(self, x):
        h = F.relu(self.fc1(x))
        h = F.relu(self.fc2(h))
        y = torch.tanh(self.fc3(h))  #May I multiply by 2?
        return y

class CriticNetwork(nn.Module):
    def __init__(self, num_state, num_action, hidden1_size=400, hidden2_size=300, init_w=3e-4):
        super(CriticNetwork, self).__init__()
        self.fc1 = nn.Linear(num_state[0], hidden1_size)
        self.fc2 = nn.Linear(hidden1_size+num_action[0], hidden2_size)
        self.fc3 = nn.Linear(hidden2_size, 1)
        
        self.num_state = num_state
        self.num_action = num_action

        self.fc1.weight.data = init_weight(self.fc1.weight.data.size())
        self.fc2.weight.data = init_weight(self.fc2.weight.data.size())
        self.fc3.weight.data.uniform_(-init_w, init_w)

    def forward(self, x, action):
        h = F.relu(self.fc1(x))
        h = F.relu(self.fc2(torch.cat([h, action], dim=1)))
        y = self.fc3(h)
        return y

Agent

In the agent, when selecting an action, the action becomes decisive as it is, so add noise $ \ mathcal {N} $. The noise at this time is [Ornstein-Uhlenbeck process](https://ja.wikipedia.org/wiki/%E3%82%AA%E3%83%AB%E3%83%B3%E3%82%B7% E3% 83% A5% E3% 82% BF% E3% 82% A4% E3% 83% B3% EF% BC% 9D% E3% 82% A6% E3% 83% BC% E3% 83% AC% E3% 83% B3% E3% 83% 99% E3% 83% 83% E3% 82% AF% E9% 81% 8E% E7% A8% 8B). I don't know the details. Think of it as noise that approaches the average over time. I don't know.

a = \mu(s) + \mathcal{N}

For training each model, Critic updates the model by finding the gradient so as to minimize the TD error, similar to DQN. The loss function is as follows. N is the batch size.

L = \frac{1}{N} \sum_{i=1}^N (r_i + \gamma Q^{\prime}(s_{i+1}, \mu^{\prime}(s_{i+1})) - Q(s_i, a_i))^2

The Actor updates the model to maximize the Q value. Note that Loss will be negative because it will be maximized at this time. The objective function is as follows.

J = \frac{1}{N}\sum_{i=1}^N Q(s_{i}, \mu{s_i})

In the above objective function, the one with a dash is the target network. This is often used to stabilize learning. In DQN etc., this target network is updated every few epochs, whereas in DDPG, the hyperparameter $ \ tau (\ ll 1) $ is used.

\theta \gets \tau \theta + (1 - \tau) \theta^{\prime}

It will be updated slowly, like. This stabilizes the learning, but it seems that the learning time will be slightly longer.

import torch
import torch.nn.functional as F
import numpy as np
import copy

class OrnsteinUhlenbeckProcess:
    def __init__(self, theta=0.15, mu=0.0, sigma=0.2, dt=1e-2, x0=None, size=1, sigma_min=None, n_steps_annealing=1000):
        self.theta = theta
        self.mu = mu
        self.sigma = sigma
        self.dt = dt
        self.x0 = x0
        self.size = size
        self.num_steps = 0

        self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size)

        if sigma_min is not None:
            self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
            self.c = sigma
            self.sigma_min = sigma_min
        else:
            self.m = 0
            self.c = sigma
            self.sigma_min = sigma

    def current_sigma(self):
        sigma = max(self.sigma_min, self.m * float(self.num_steps) + self.c)
        return sigma

    def sample(self):
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.current_sigma() * np.sqrt(self.dt) * np.random.normal(size=self.size)
        self.x_prev = x
        self.num_steps += 1
        return x


class DDPG:
    def __init__(self, actor, critic, optimizer_actor, optimizer_critic, replay_buffer, device, gamma=0.99, tau=1e-3, epsilon=1.0, batch_size=64):
        self.actor = actor
        self.critic = critic
        self.actor_target = copy.deepcopy(self.actor)
        self.critic_target = copy.deepcopy(self.critic)
        self.optimizer_actor = optimizer_actor
        self.optimizer_critic = optimizer_critic
        self.replay_buffer = replay_buffer
        self.device = device
        self.gamma = gamma
        self.tau = tau
        self.epsilon = epsilon
        self.batch_size = batch_size
        self.random_process = OrnsteinUhlenbeckProcess(size=actor.num_action[0])

        self.num_state = actor.num_state
        self.num_action = actor.num_action

    def add_memory(self, *args):
        self.replay_buffer.append(*args)
    
    def reset_memory(self):
        self.replay_buffer.reset()

    def get_action(self, state, greedy=False):
        state_tensor = torch.tensor(state, dtype=torch.float, device=self.device).view(-1, *self.num_state)
        action = self.actor(state_tensor)
        if not greedy:
            action += self.epsilon*torch.tensor(self.random_process.sample(), dtype=torch.float, device=self.device)

        return action.squeeze(0).detach().cpu().numpy()

    def train(self):
        if len(self.replay_buffer) < self.batch_size:
            return None
        transitions = self.replay_buffer.sample(self.batch_size)
        batch = Transition(*zip(*transitions))

        state_batch = torch.tensor(batch.state, device=self.device, dtype=torch.float)
        action_batch = torch.tensor(batch.action, device=self.device, dtype=torch.float)
        next_state_batch = torch.tensor(batch.next_state, device=self.device, dtype=torch.float)
        reward_batch = torch.tensor(batch.reward, device=self.device, dtype=torch.float).unsqueeze(1)
        not_done = np.array([(not done) for done in batch.done])
        not_done_batch = torch.tensor(not_done, device=self.device, dtype=torch.float).unsqueeze(1)

        # need to change
        qvalue = self.critic(state_batch, action_batch)
        next_qvalue = self.critic_target(next_state_batch, self.actor_target(next_state_batch))
        target_qvalue = reward_batch + (self.gamma * next_qvalue * not_done_batch) 

        critic_loss = F.mse_loss(qvalue, target_qvalue)
        self.optimizer_critic.zero_grad()
        critic_loss.backward()
        self.optimizer_critic.step()

        actor_loss = -self.critic(state_batch, self.actor(state_batch)).mean()
        self.optimizer_actor.zero_grad()
        actor_loss.backward()
        self.optimizer_actor.step()

        # soft parameter update
        for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
            target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)
        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)

Learning

There is nothing new about this. Like other reinforcement learning algorithms, it receives state from the environment and makes you feel like you are acting and learning. Each hyperparameter is in line with the original paper. (Perhaps)

import torch
import torch.optim as optim
import gym

max_episodes = 300
memory_capacity = 1e6  #Buffer capacity
gamma = 0.99  #Discount rate
tau = 1e-3  #Target update rate
epsilon = 1.0  #If you want to mess with the amount of noise, you probably don't need it
batch_size = 64
lr_actor = 1e-4
lr_critic = 1e-3
logger_interval = 10
weight_decay = 1e-2

env = gym.make('Pendulum-v0')
num_state = env.observation_space.shape
num_action = env.action_space.shape
max_steps = env.spec.max_episode_steps

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

actorNet = ActorNetwork(num_state, num_action).to(device)
criticNet = CriticNetwork(num_state, num_action).to(device)
optimizer_actor = optim.Adam(actorNet.parameters(), lr=lr_actor)
optimizer_critic = optim.Adam(criticNet.parameters(), lr=lr_critic, weight_decay=weight_decay)
replay_buffer = ReplayBuffer(capacity=memory_capacity)
agent = DDPG(actorNet, criticNet, optimizer_actor, optimizer_critic, replay_buffer, device, gamma, tau, epsilon, batch_size)

for episode in range(max_episodes):
    observation = env.reset()
    total_reward = 0

    for step in range(max_steps):
        action = agent.get_action(observation)
        next_observation, reward, done, _ = env.step(action)
        total_reward += reward
        agent.add_memory(observation, action, next_observation, reward, done)

        agent.train()

        observation = next_observation

        if done:
            break

    if episode % logger_interval == 0:
        print("episode:{} total reward:{}".format(episode, total_reward))

for episode in range(3):
    observation = env.reset()
    env.render()
    for step in range(max_steps):
        action = agent.get_action(observation, greedy=True)
        next_observation, reward, done, _ = env.step(action)
        observation = next_observation
        env.render()

        if done:
            break

env.close()

If you change the environment of gym.make, you should be able to study in other environments.

result

A graph of cumulative rewards and learning episodes. I think you are learning in a good way. Figure_1-1.png It's been built properly. Great. (Gif is not posted because I don't know how to make it)

in conclusion

Basically, if you execute the above code as it is in one file, you can check the operation. Since the implementation was full, some extra variables etc. remain. With Pendulum-v0, you can study with only cpu even if you do not have a GPU available environment. However, learning may become unstable a little, so please try again in that case. If there is an opportunity, we will implement other methods.

References

Continuous Control with Deep Reinforcement Learning

Recommended Posts

Learn Pendulum-v0 with DDPG
Learn Python with ChemTHEATER
Learn Zundokokiyoshi with LSTM
Learn Pandas with Cheminformatics
Learn with chemoinformatics scikit-learn
Learn with Cheminformatics Matplotlib
Learn with Cheminformatics NumPy
DCGAN with TF Learn
Learn librosa with a tutorial 1
Learn elliptical orbits with Chainer
Learn new data with PaintsChainer
Learn algorithms with Go @ recursive call
Learn with Causal ML Package Meta-Learner
Learn with FizzBuzz Iterator, Generator, Decorator
Learn with PyTorch Graph Convolutional Networks
[TensorFlow 2] Learn RNN with CTC Loss
Learn search with Python # 2bit search, permutation search
Learn document categorization with spaCy CLI
Getting Started with python3 # 1 Learn Basic Knowledge
Learn to colorize monochrome images with Chainer
Learn data distributed with TensorFlow Y = 2X
Learn Python! Comparison with Java (basic function)
Learn with Splatoon nervous breakdown! Graph theory
"How to pass PATH" to learn with homebrew
Preparing to learn technical indicators with TFlearn
Learn the design pattern "Facade" with Python