I put this together from what I could research and implement on my own, so there are still parts I do not fully understand. Please point out any mistakes or inaccuracies in the article.
This time I implemented a reinforcement learning algorithm called Ape-X. Ape-X is a distributed reinforcement learning algorithm published in 2018; at the time of writing it ranked second only to R2D2. The paper is here.
Structurally, it is an algorithm that combines the following five components:

- Double Deep Q Network
- Multi-step bootstrap target
- Dueling Network
- Prioritized Experience Replay
- Distributed learning

I will explain them in order.
Double Deep Q Network (DDQN)

For DDQN, I think this site is easy to understand. DQN has the drawback of overestimating Q values because a single network is used both to select and to evaluate actions. DDQN is an improved version that uses two Q networks: a **value calculation network** and an **action decision network**. Let's look at the formulas. The target value that DQN trains its Q values toward is
Q(S_{t},A_{t})=R_{t+1}+\gamma max_{a}(Q(S_{t+1},a))
Here $R_{t+1}$ is the reward obtained, and $\gamma max_{a}(Q(S_{t+1},a))$ is the largest Q value of the next state $S_{t+1}$ multiplied by the discount rate $\gamma$.
On the other hand, DDQN uses
Q(S_{t},A_{t})=R_{t+1}+\gamma Q_{target}(S_{t+1},argmax_{a}Q(S_{t+1},a))
The second term on the right side has changed: a new Q network $Q_{target}$ appears. This is the value calculation network (in other words, $Q$ is the action decision network). First, the action decision network chooses the action for $S_{t+1}$, and then the value of that action is estimated with the value calculation network.
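As a minimal sketch (with hypothetical main_q / target_q models that expose a Keras-style predict, not the code shown later in this article), the DDQN target for a single transition could be computed like this:

```python
import numpy as np

def ddqn_target(reward, next_state, main_q, target_q, gamma=0.99):
    """Sketch: DDQN target for one transition (hypothetical helper)."""
    q_main = main_q.predict(np.array([next_state]))[0]      # action decision network
    q_target = target_q.predict(np.array([next_state]))[0]  # value calculation network
    best_action = np.argmax(q_main)                # choose the action with the main network
    return reward + gamma * q_target[best_action]  # evaluate that action with the target network
```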
Multi-step bootstrap target

This is a technique that learns from several consecutive experiences at once. Let's compare the formulas again. As mentioned above, DQN uses
Q(S_{t},A_{t})=R_{t+1}+\gamma max_{a}(Q(S_{t+1},a))
whereas the multi-step bootstrap target (looking $n$ steps ahead) uses
Q(S_{t},A_{t})=R_{t+1}+\gamma R_{t+2}+...+\gamma^{n-1}R_{t+n}+\gamma^{n} max_{a}(Q(S_{t+n},a))
(In the Ape-X paper, $n = 3$ is used.)
Combining the Double Deep Q Network and multi-step bootstrap target formulas gives
Q(S_{t},A_{t})=R_{t+1}+\gamma R_{t+2}+...+\gamma^{n-1}R_{t+n}+\gamma^{n} Q_{target}(S_{t+n},argmax_{a}Q(S_{t+n},a))
This is the formula for finding the target value used in Ape-X.
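As a concrete check, with the paper's $n = 3$ this target expands to

Q(S_{t},A_{t})=R_{t+1}+\gamma R_{t+2}+\gamma^{2} R_{t+3}+\gamma^{3} Q_{target}(S_{t+3},argmax_{a}Q(S_{t+3},a))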
Dueling Network

For Dueling Network, refer to this site. Unlike DQN and DDQN, a Dueling Network does not "determine the action directly from the state"; the idea is instead to "**evaluate what the state itself is worth, and decide the action with that taken into account**". Specifically, the network looks like the figure below.
First, the state $S_t$ is passed through some shared layers, then the network splits into a **state value** stream (which quantifies how good the state is, left) and an **Advantage** stream (a provisional action evaluation, right); each makes its own prediction and the two are combined at the end.
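Concretely, if $V(s)$ is the output of the state value stream and $A(s,a)$ that of the Advantage stream, they are usually combined as follows (this also appears to be what the Lambda layer in the network code further below computes):

Q(s,a)=V(s)+A(s,a)-\frac{1}{N_{actions}}\sum_{a'}A(s,a')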
Prioritized Experience Replay

This means assigning a priority to each experience (state, action, resulting reward, and so on) and preferentially learning from the higher-priority ones. In Ape-X the priority is determined by the **TD error**. Specifically,
P_{i}=|\delta_{i}|
It is defined like this, where $\delta_{i}$ is the TD error of experience $i$.
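As a rough sketch of the idea (not the SumTree-based implementation used later in this article), sampling in proportion to priority could look like this:

```python
import numpy as np

def sample_indices(priorities, num_samples):
    """Hypothetical sketch: draw indices with probability proportional to priority."""
    priorities = np.asarray(priorities, dtype=np.float64)
    probs = priorities / priorities.sum()  # P(i) = p_i / sum_k p_k
    return np.random.choice(len(priorities), size=num_samples, p=probs)

# experiences with a larger |TD error| are drawn more often
print(sample_indices([0.1, 2.0, 0.5, 0.01], num_samples=5))
```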
Distributed learning

In Ape-X, the work is split into a Leaner, Actors, and a Memory, each running in parallel (see the figure below).
- Actor: specializes in acting. It periodically gets the latest network from the Leaner, gathers a lot of experience with it, attaches priorities, and sends the experience to Memory.
- Memory: receives experience from the Actors and provides it to the Leaner when needed.
- Leaner: specializes in learning. It receives a sample from Memory and trains on it; during training it also updates the priorities of the sampled experiences in Memory.
The Actors are the distributed part: the idea is to increase the number of Actors so that much more experience can be generated.
Normally the ε in ε-greedy is gradually annealed down from 1, but here each Actor uses a fixed ε. The ε of $Actor_i$ is given by the following formula:
\varepsilon_i=\varepsilon^{1+\frac{i}{N-1}\alpha}
($N$ is the number of Actors, and $i$ is 0-indexed.) In the paper, $\varepsilon = 0.4$ and $\alpha = 7$ are used.
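For example, with $N = 3$ Actors and the paper's values $\varepsilon = 0.4$, $\alpha = 7$, the per-Actor values come out roughly like this:

```python
N, eps, alpha = 3, 0.4, 7  # example; the paper uses eps = 0.4, alpha = 7
for i in range(N):
    eps_i = eps ** (1 + i / (N - 1) * alpha)
    print(i, eps_i)
# i = 0 -> 0.4       (explores a lot)
# i = 1 -> ~0.016    (mostly greedy)
# i = 2 -> ~0.00066  (almost fully greedy)
```

So each Actor explores at a different, fixed rate.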
Python's multiprocessing module is used for the parallel processing. The following objects are used for communication between processes:

- exp_queue (Actor → Memory, for sending experience) (multiprocessing.Queue)
- param_queue (Leaner → Actor, for sharing the Leaner's network weights) (multiprocessing.Queue)
- actor_working (whether each Actor is running) (multiprocessing.Value)
Memory

In this implementation I put the Memory inside the Leaner: before each learning step the Leaner checks exp_queue for new experiences and stores any it finds in Memory. The priorities are handled with the SumTree from this site (roughly speaking, a SumTree is a binary tree used for prioritized sampling).
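The SumTree code itself is not reproduced in this article, so here is a minimal sketch matching the interface used below (total, add, get, update, and the write counter), based on the commonly circulated implementation; the code from the linked site may differ in details:

```python
import numpy as np

class SumTree:
    """Sketch: leaves hold priorities, internal nodes hold the sum of their children."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.tree = np.zeros(2 * capacity - 1)        # binary tree stored as a flat array
        self.data = np.zeros(capacity, dtype=object)  # experiences stored next to the leaves
        self.write = 0                                # next leaf to write to (wraps around)

    def _propagate(self, idx, change):
        parent = (idx - 1) // 2
        self.tree[parent] += change
        if parent != 0:
            self._propagate(parent, change)

    def _retrieve(self, idx, s):
        left, right = 2 * idx + 1, 2 * idx + 2
        if left >= len(self.tree):   # reached a leaf
            return idx
        if s <= self.tree[left]:
            return self._retrieve(left, s)
        return self._retrieve(right, s - self.tree[left])

    def total(self):
        return self.tree[0]          # root = sum of all priorities

    def add(self, p, data):
        idx = self.write + self.capacity - 1
        self.data[self.write] = data
        self.update(idx, p)
        self.write = (self.write + 1) % self.capacity

    def update(self, idx, p):
        change = p - self.tree[idx]
        self.tree[idx] = p
        self._propagate(idx, change)

    def get(self, s):
        idx = self._retrieve(0, s)   # walk down the tree with a value s in [0, total()]
        return (idx, self.tree[idx], self.data[idx - self.capacity + 1])
```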
The details are mostly in the code comments, so here I will only explain the overall flow. The Memory code isn't that long, so I'll post it in full.
Leaner.py
class Memory:  # stores experience together with its priority
    def __init__(self, capacity):
        self.capacity = capacity
        self.data = SumTree(self.capacity)

    def length(self):
        return self.data.write

    def sample(self, num_samples):
        data_length = self.data.total()
        sampling_interval = data_length / num_samples
        batch = []  # the list to return
        for i in range(num_samples):
            l = sampling_interval * i
            r = sampling_interval * (i + 1)
            p = random.uniform(l, r)
            (idx, p_sample, data) = self.data.get(p)
            # idx: index in the underlying binary tree (used when updating the priority)
            # p_sample: priority of the data (not actually used)
            # data: the experience itself
            batch.append([idx, p_sample, data])
        return batch

    def add(self, p, data):  # p: priority (TD error)
        self.data.add(p, data)

    def update_p(self, idx, p):  # update the priority
        self.data.update(idx, p)
- add: add data to Memory together with its priority
- update_p: update the priority of the specified data
- sample: get a sample
- length: the amount of data stored (but this value wraps around, so it is only reliable during the first lap)
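For reference, hypothetical standalone usage of this class (outside the actual Leaner loop) would look something like this:

```python
# assuming a SumTree implementation is available (see the sketch above)
memory = Memory(capacity=100000)
for i in range(100):
    memory.add(p=1.0 + i * 0.01, data=["experience", i])  # priority = |TD error|
batch = memory.sample(32)        # 32 entries of the form [tree_idx, priority, data]
for idx, p, data in batch:
    memory.update_p(idx, 0.3)    # write back a recomputed |TD error|
```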
Leaner

The Leaner and Actor code is long, so I'll go through it one function at a time.
__init__

Initializes the variables.
Leaner.py
    def __init__(self,
                 env,                        # training environment
                 exp_queue,                  # experience queue (for Memory)
                 param_queue,                # for providing weights to the Actors
                 num_actions,                # number of possible actions
                 exp_memory_size,            # Memory capacity (Memory is managed by the Leaner)
                 train_batch_size,           # batch size used for training
                 actor_working,              # whether each Actor is running
                 save_name,                  # file name used when saving
                 myenv=False,                # True if env is a class rather than a gym name
                 gamma=0.99,                 # discount rate
                 update_target_interbal=100, # how often to update the value calculation network
                 load_model_path=None,
                 window_length=3             # number of steps (n) to take into account
                 ):
        # store the arguments
        self.exp_queue=exp_queue
        self.param_queue=param_queue
        self.exp_memory_size=exp_memory_size
        self.train_batch_size=train_batch_size
        self.window_length=window_length
        self.gamma=gamma
        self.save_name=save_name
        self.update_target_interbal=update_target_interbal
        self.actor_working=actor_working
        if myenv:
            self.env=env()
        else:
            self.env=gym.make(env)
        self.num_actions=num_actions

        # define the networks
        self.main_Q=self.build_network()    # Q network for action decisions
        self.target_Q=self.build_network()  # Q network for value calculation
        if load_model_path!=None:
            self.main_Q=load_model(load_model_path)
        self.target_Q.set_weights(self.main_Q.get_weights())

        # create the Memory
        self.memory=Memory(self.exp_memory_size)

        # put the initial weights into the queue
        while not param_queue.full():
            param_queue.put([self.main_Q.get_weights(),self.target_Q.get_weights()])
        # print(param_queue.full())
        return
The first chunk just stores the arguments it receives. The next chunk defines the networks for action decision and value calculation and makes their weights identical (the build_network function is defined below). Then the Memory is created, and finally param_queue is filled up so that the Actors can get the initial weights.
build_network

Builds the network.
Leaner.py
    def build_network(self):
        # network construction
        l_input=Input((1,)+self.env.observation_space.shape,name="Input_Leaner")
        fltn=Flatten()(l_input)
        dense=Dense(units=256,activation="relu")(fltn)
        dense=Dense(units=256,activation="relu")(dense)
        dense=Dense(units=256,activation="relu")(dense)
        v=Dense(units=256,activation="relu")(dense)        # state value stream
        v=Dense(units=1,activation="linear")(v)
        adv=Dense(units=256,activation="relu")(dense)      # Advantage stream
        adv=Dense(units=self.num_actions,activation="linear")(adv)
        y=concatenate([v,adv])
        l_output = Lambda(lambda a: K.expand_dims(a[:, 0], -1) + (a[:, 1:] - K.stop_gradient(K.mean(a[:,1:],keepdims=True))), output_shape=(self.num_actions,))(y)
        model=Model(inputs=l_input,outputs=l_output)
        # compile the model
        model.compile(optimizer="Adam",
                      loss="mse",
                      metrics=["accuracy"])
        return model
The first chunk builds the network and the second chunk compiles the model. There is a lambda expression in the network construction that I can't explain very well because I don't fully understand it ... sorry.
make_batch

Creates a training sample from Memory.
Leaner.py
    def make_batch(self):
        train_batch=self.memory.sample(self.train_batch_size)
        train_batch=np.array(train_batch)
        batch=train_batch[:,2]  # extract only the data part
        # The data currently looks like
        # [[a_1,b_1,c_1],[a_2,b_2,c_2]...[a_n,b_n,c_n]]
        # so reshape it into
        # [[a_1,a_2,...,a_n],[b_1,b_2,...,b_n],[c_1,c_2,...,c_n]]
        # (this lets us run inference on everything at once, which is much faster)
        rewards_batch=[]  # Reward[t] (discounted n-step sum)
        state_batch=[]    # State[t]
        state_n_batch=[]  # State[t+n]
        action_batch=[]   # Action[t]
        for i in batch:
            rewards_batch.append(i[0])
            state_batch.append(i[1][0])
            state_n_batch.append(i[2][0])
            action_batch.append(i[3])
        rewards_batch=np.array(rewards_batch)
        state_batch=np.array(state_batch)
        state_n_batch=np.array(state_n_batch)
        action_batch=np.array(action_batch)
        # create the teacher data
        batch_size=len(state_batch)
        y=self.main_Q.predict(state_batch,batch_size=batch_size)
        # run inference in advance, all at once
        main_predict=self.main_Q.predict(state_n_batch,batch_size=batch_size)
        target_predict=self.target_Q.predict(state_n_batch,batch_size=batch_size)
        for i in range(batch_size):
            action=np.argmax(main_predict[i])  # action selection with the main Q network
            q=target_predict[i][action]        # Q value from the target Q network
            target=rewards_batch[i]+(self.gamma**self.window_length)*q  # target value for Q(State(t),Action(t))
            td_error=y[i][action_batch[i]]-target  # compute the TD error
            self.memory.update_p(train_batch[i][0],abs(td_error))  # update the priority of this experience
            y[i][action_batch[i]]=target  # write the target value into the teacher data
        return state_batch,y
The first block fetches as much data as needed from Memory. The second chunk reorganizes the data and the third converts it into numpy arrays. In the fourth, the array corresponding to $State_{t+n}$ is passed through both $Q$ and $Q_{target}$; as the comments say, inferring everything at once is overwhelmingly faster than inferring one sample at a time (more than 3 times faster in my case). Finally, the last chunk builds the teacher data according to the Ape-X formula.
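To connect this back to the Ape-X formula: with $n = 3$ (window_length), the target computed in the loop corresponds to

target=R_{t+1}+\gamma R_{t+2}+\gamma^{2}R_{t+3}+\gamma^{3}Q_{target}(S_{t+3},argmax_{a}Q(S_{t+3},a))

where the three reward terms arrive already summed (with discounting) from the Actor as rewards_batch[i] (see send_batch below).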
run

This is where the Leaner actually trains.
Leaner.py
    def run(self):
        t=0  # total number of training steps
        # wait until Memory holds enough experience to train on
        while self.memory.length()<self.train_batch_size:
            print("Leaner is waiting for enough experience to train")
            while not self.exp_queue.empty():  # move everything currently in exp_queue into Memory
                batch=self.exp_queue.get()
                self.memory.add(batch[4],batch)
            time.sleep(5)
        print("Leaner starts")
        try:
            while True:
                # stop if no Actor is running any more
                working=False
                for i in self.actor_working:
                    if i:
                        working=True
                        break
                if not working:
                    break
                # take experience from exp_queue and put it into Memory
                while not self.exp_queue.empty():
                    batch=self.exp_queue.get()
                    self.memory.add(batch[4],batch)
                # create a sample
                X,y=self.make_batch()
                X=np.array(X)
                y=np.array(y)
                if t%self.update_target_interbal==0:  # update the value calculation network
                    self.target_Q.set_weights(self.main_Q.get_weights())
                # train the action decision network
                self.main_Q.fit(X,y,epochs=1,verbose=0)
                # refill the queue until it is full
                while not self.param_queue.full():
                    self.param_queue.put([self.main_Q.get_weights(),self.target_Q.get_weights()])
                t+=1
        except KeyboardInterrupt:
            self.main_Q.save(self.save_name)
            print("model has been saved with name '"+self.save_name+"'")
            print("Learning was stopped by user")
            return
        self.main_Q.save(self.save_name)
        print("model has been saved with name '"+self.save_name+"'")
        print("Leaner finished")
        return
First it waits until the required number of experiences has accumulated in Memory, and then learning starts in earnest. In the learning loop it first checks whether any Actor is still running; if none is, it terminates. Then the experience accumulated in exp_queue is moved into Memory and training begins: get a sample with make_batch, update the weights of the value calculation network when needed, and train the action decision network. After training, the current weights are put into param_queue. (The last part handles Ctrl + C.)
Actor

__init__
Actor.py
    def __init__(self,
                 env,                       # gym env (or env class)
                 exp_queue,                 # mp.Queue for Memory
                 param_queue,               # mp.Queue for the Leaner's weights
                 nb_steps,                  # number of steps to act
                 warmup_steps,              # number of steps to act randomly
                 num_actions,               # number of possible actions
                 id_,                       # identification ID
                 num_actors,                # total number of Actors
                 actor_working,             # which Actors are running and which are not
                 myenv=False,               # if True, env is a class instead of a gym name
                 gamma=0.99,                # discount rate
                 max_epsilon=0.4,
                 alpha=7,
                 update_param_interbal=100, # how often to try to update the parameters
                 visualize=False,
                 window_length=3            # number of steps (n) to take into account
                 ):
        if(myenv):
            self.env=env()
        else:
            self.env=gym.make(env)
        # store the arguments
        self.num_actions=num_actions
        self.exp_queue=exp_queue
        self.param_queue=param_queue
        self.id=id_
        self.gamma=gamma
        self.window_length=window_length
        self.update_param_interbal=update_param_interbal
        self.visualize=visualize
        self.nb_steps=nb_steps
        self.actor_working=actor_working
        self.warmup_steps=warmup_steps
        if self.warmup_steps>self.nb_steps:
            raise ValueError("warmup_steps must be lower than nb_steps.")

        # calculate this Actor's epsilon
        if num_actors <= 1:
            self.epsilon = max_epsilon ** (1 + alpha)
        else:
            self.epsilon = max_epsilon ** (1 + id_/(num_actors-1)*alpha)

        self.main_Q=self.build_network()    # action decision network
        self.target_Q=self.build_network()  # value calculation network (used only for the TD error)

        # get the initial weights from the Leaner
        while self.param_queue.empty():
            time.sleep(2)
        param=self.param_queue.get()
        self.main_Q.set_weights(param[0])
        self.target_Q.set_weights(param[1])
What this does is almost the same as the Leaner's __init__, so I'll skip the explanation.
build_network

This is almost the same as the Leaner's. However, the Actor does not train, so the model is not compiled.
send_batch

Processes the collected experience and sends it to exp_queue.
Actor.py
    def send_batch(self,batch):
        # This part is hard to follow without understanding Ape-X's TD error formula,
        # so the corresponding variable is noted for each piece.
        length=len(batch)
        # discounted cumulative reward
        batch_rewards=0
        for i in range(length):
            batch_rewards+=(self.gamma**i)*batch[i][3]
        # State[t]
        batch_state=np.array([[batch[0][0]]])
        # State[t+n]
        batch_state_n=np.array([[batch[length-1][1]]])
        # Action[t]
        batch_action=batch[0][2]
        # TD error calculation
        action=np.argmax(self.main_Q.predict(batch_state_n)[0])
        q=(self.gamma**length)*self.target_Q.predict(batch_state_n)[0][action]
        td_error=batch_rewards+q-self.main_Q.predict(batch_state)[0][batch_action]  # n-step target minus the current estimate
        # build the content to send to exp_queue
        send=[batch_rewards,batch_state,batch_state_n,batch_action,abs(td_error)]
        # send it
        self.exp_queue.put(send)
Let's look at the Ape-X formula once more:
Q(S_{t},A_{t})=R_{t+1}+\gamma R_{t+2}+...+\gamma^{n-1}R_{t+n}+\gamma^{n} Q_{target}(S_{t+n},argmax_{a}Q(S_{t+n},a))
The variables needed for this are:

- $S_t$
- $A_t$
- $R_{t+1}, R_{t+2}, ..., R_{t+n}$
- $S_{t+n}$
- $\gamma$

Of these, $\gamma$ is already known to the Leaner, so the remaining four are sent as data. The rewards are sent as a single cumulative reward that already takes the discount rate into account.
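Together with these, the Actor attaches an initial priority: the absolute value of the TD error assembled from the same quantities,

\delta=R_{t+1}+\gamma R_{t+2}+...+\gamma^{n-1}R_{t+n}+\gamma^{n} Q_{target}(S_{t+n},argmax_{a}Q(S_{t+n},a))-Q(S_{t},A_{t})

which is what send_batch above computes before putting everything into exp_queue.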
run
Actor.py
    def run(self):
        batch=[]  # buffer that accumulates experience before sending it to Memory
        t=0       # total number of steps
        epoch=0
        self.actor_working[self.id]=True
        print("Actor{} starts".format(self.id))
        try:
            while True:
                state=self.env.reset()
                done=False
                epoch_reward=0
                step=0
                while not done and t<self.nb_steps:
                    # decide the action
                    action=self.main_Q.predict(np.array([[state]]))
                    action=np.argmax(action[0])
                    if t<self.warmup_steps or (self.epsilon>=random.random() and (not self.visualize)):
                        action=random.randrange(0,self.num_actions)
                    old_state=state
                    state,reward,done,info=self.env.step(action)  # actually act
                    epoch_reward+=reward
                    if not self.visualize:  # send experience unless we are only visualizing
                        mini_batch=[old_state,state,action,reward]  # create a mini batch
                        batch.append([mini_batch])  # start a new entry in the buffer
                        remove_list=[]  # entries whose data has already been sent (deleted afterwards)
                        for i in range(len(batch)):
                            batch[i].append(mini_batch)
                            if(len(batch[i])>=self.window_length):
                                self.send_batch(batch[i])  # send the experience
                                # no longer needed, so throw it away later
                                remove_list.append(i)
                            elif done:  # the episode is over, so no more consecutive experience will follow; send everything
                                self.send_batch(batch[i])  # send the experience
                                # no longer needed, so throw it away later
                                remove_list.append(i)
                        # deleting from the front shifts the indices, so delete from the back
                        remove_list.sort()
                        remove_list.reverse()
                        for i in remove_list:
                            batch.pop(i)
                    else:  # if visualize=True, only render
                        self.env.render()
                        time.sleep(1/30)
                    if t%self.update_param_interbal==0:  # update the Q network weights
                        failed=0
                        while self.param_queue.empty():
                            if failed>20:  # the Leaner is almost certainly down, so stop
                                print("Actor{} ended.".format(self.id))
                                self.actor_working[self.id]=False
                                return
                            print("Actor{} failed to get new param.".format(self.id))
                            failed+=1
                            time.sleep(5)
                        # get the weights
                        param=self.param_queue.get()
                        self.main_Q.set_weights(param[0])
                        self.target_Q.set_weights(param[1])
                    t+=1
                    step+=1
                    if done:
                        break
                if t>=self.nb_steps:
                    break
                epoch+=1
                print("Actor{} episode:{} nb_step:{}({}) reward:{} step:{}".format(self.id,epoch,t,self.nb_steps,epoch_reward,step))
        except KeyboardInterrupt:
            print("Actor{} ended".format(self.id))
            self.actor_working[self.id]=False
            return
        print("Actor{} ended.".format(self.id))
        self.actor_working[self.id]=False
        return
Unlike the Leaner, an Actor finishes after acting for a specified number of steps. The flow is: reset the environment, choose an action ε-greedily, step the environment, buffer the resulting experience and send completed n-step batches to exp_queue, and periodically fetch fresh weights from param_queue; this repeats until nb_steps is reached. The block at the end handles Ctrl + C.
main.py
import multiprocessing as mp
from Actor import Actor
from Leaner import Leaner
import time
from keras.models import load_model
import ctypes

def A(exp_queue,param_queue,nb_steps,warmup_steps,num_actions,id,num_actors,actor_working):
    if id==0:
        actor=Actor("CartPole-v0",exp_queue,param_queue,nb_steps,warmup_steps,num_actions,id,num_actors,actor_working,max_epsilon=0.7)
    else:
        actor=Actor("CartPole-v0",exp_queue,param_queue,nb_steps,warmup_steps,num_actions,id,num_actors,actor_working,max_epsilon=0.7)
    actor.run()
    return

def L(exp_queue,param_queue,num_actions,memory_size,train_batch_size,actor_working,save_name):
    leaner=Leaner("CartPole-v0",exp_queue,param_queue,num_actions,memory_size,train_batch_size,actor_working,save_name)
    leaner.run()
    return

if __name__=="__main__":
    num_actors=3             # number of Actors
    num_actions=2            # number of possible actions
    nb_steps=10000           # number of steps per Actor
    warmup_steps=200         # number of steps to act randomly
    memory_size=100000       # Memory capacity
    train_batch_size=32      # amount of data the Leaner uses at a time
    save_name="CartPole.h5"  # file name used when saving

    # for inter-process communication
    exp_queue=mp.Queue(5000)
    param_queue=mp.Queue(int(num_actors))
    actor_working=mp.Value(ctypes.c_uint*num_actors)
    for i in range(num_actors):
        actor_working[i]=False

    # parallel processing (Leaner)
    ps=[]
    ps.append(mp.Process(target=L, args=(exp_queue,param_queue,num_actions,memory_size,train_batch_size,actor_working,save_name)))
    ps[0].start()
    try:
        # parallel processing (Actors)
        for i in range(num_actors):
            ps.append(mp.Process(target=A, args=(exp_queue,param_queue,nb_steps,warmup_steps,num_actions,i,num_actors,actor_working)))
            ps[i+1].start()
            time.sleep(1)
        for p in ps:
            print(p)
            p.join()
    except KeyboardInterrupt:
        print("end")
This is the main script. The Actors and the Leaner are run in parallel using the multiprocessing module.
Here is the result of running it (the mouse cursor got captured in the recording...).
... It doesn't learn well. What is wrong? I would appreciate it if anyone could tell me.
The source code is available on GitHub.