I tried to solve the multi-agent tracking problem with Python. As a textbook I used ["Learning and its algorithms: Neural Networks, Genetic Algorithms, Reinforcement Learning" (Institute of Electrical Engineers of Japan)](http://www.amazon.co.jp/dp/4627827512).
We learn the optimal behavior for $2$ agents to capture $1$ target. The state in which the agents sandwich the target is defined as the captured state; "sandwiching" means that the $2$ agents occupy cells in the target's 4-neighborhood.

The detailed rules adopted are those of ["Learning and its algorithms"](http://www.amazon.co.jp/dp/4627827512).
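As a minimal sketch of this capture condition (my own simplified check, not code from the article; the helper `is_captured` and the example coordinates are illustrative assumptions), both agents must sit in the target's 4-neighborhood:

```python
import numpy as np

def is_captured(target_pos, agent_positions):
    # Hypothetical helper: both agents must lie in the target's 4-neighborhood,
    # i.e. at Manhattan distance 1 (directly above, below, left or right).
    target = np.asarray(target_pos)
    distances = [np.abs(np.asarray(a) - target).sum() for a in agent_positions]
    return all(d == 1 for d in distances)

print(is_captured((3, 3), [(3, 2), (2, 3)]))  # True: both agents adjacent
print(is_captured((3, 3), [(3, 1), (2, 3)]))  # False: one agent is two cells away
```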
The $Q$ value is updated with the following formula.

$$
Q_{new}(s_t, a) = Q_{old}(s_t, a) + \alpha\bigl[r_{t+1} + \gamma \max_{a'}Q(s_{t+1}, a') - Q_{old}(s_t, a)\bigr]
$$

$Q(s_t, a)$ represents the value of action $a$ in state $s_t$. Taking action $a$ in state $s_t$ yields the reward $r_{t+1}$. When updating the $Q$ value, the maximum value attainable by an action $a'$ in the next state $s_{t+1}$ is multiplied by $\gamma$ and added to the latest reward $r_{t+1}$. This means that an action in a given state is chosen by considering not only the immediate reward but also the value of the states that follow. $\gamma$ is called the discount rate and $\alpha$ the learning rate.
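As a minimal sketch of this update rule (my own illustration rather than the `Agent.update_q` method shown later; the flattened state index and table shape are simplifying assumptions, while `alpha` and `gamma` match the values used in main.py):

```python
import numpy as np

q = np.zeros((4, 26 * 26))     # hypothetical table: (actions) x (flattened states)
alpha, gamma = 0.3, 0.7        # learning rate and discount rate, as in main.py

def q_update(q, s, a, reward, s_next):
    # One tabular Q-learning step: move Q(s, a) toward r + gamma * max_a' Q(s', a').
    td_target = reward + gamma * q[:, s_next].max()
    q[a, s] += alpha * (td_target - q[a, s])

q_update(q, s=0, a=2, reward=-1, s_next=1)
```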
The code and its key points are described separately below.
I implemented it as follows.
```
.
├── main.py
├── model
│   ├── agent.py
│   ├── area.py
│   ├── movingobject.py
│   └── target.py
└── service
    └── gameservice.py
```
movingobject.py
```python
import numpy
import random
import copy


class MovingObject:
    def __init__(self, area):
        self.state = self.__random_state(area)
        self.actions = self.__create_actions(area.shape)
        self.moves = self.__create_moves()

    # public method
    def reset_state(self, area):
        self.state = self.__random_state(area)

    def get_act_index(self, epsilon):
        return self.__select_action(epsilon)

    def move_state(self, act_index):
        return self.state + self.moves.get(act_index)

    # private method
    def __random_state(self, area):
        h, w = area.shape
        mergin = area.mergin
        while True:
            y = numpy.random.randint(mergin, h - mergin)
            x = numpy.random.randint(mergin, w - mergin)
            if area.state[y, x] == 0:
                break
        return numpy.array([y, x])

    def __create_actions(self, area_shape):
        actions = []
        h, w = area_shape
        for j in range(h):
            actions.append([])
            for i in range(w):
                action_org = [0, 1, 2, 3]
                action = self.__remove_actions(action_org, h, w, j, i)
                actions[j].append(action)
        return numpy.array(actions)

    def __remove_actions(self, action_org, h, w, j, i):
        # remove actions that would take the object outside the playable area
        action = copy.deepcopy(action_org)
        mergin = 2
        if j == mergin:
            action.remove(3)
        if j == h - mergin - 1:
            action.remove(1)
        if i == mergin:
            action.remove(0)
        if i == w - mergin - 1:
            action.remove(2)
        return action

    def __create_moves(self):
        # 0->left, 1->down, 2->right, 3->up
        return {0: numpy.array([0, -1]), 1: numpy.array([1, 0]), 2: numpy.array([0, 1]), 3: numpy.array([-1, 0])}

    def __select_action(self, epsilon):
        # epsilon-greedy: exploit with probability 1 - epsilon, otherwise explore
        y, x = self.state
        action = self.actions[y, x]
        if numpy.random.rand() > epsilon:
            return self.__greedy_action(action)
        else:
            return self.__random_action(action)

    def __greedy_action(self, action):
        sy, sx = self.around
        qmax = self.q[action, sy, sx].max()
        indexes = list(numpy.argwhere(self.q[action, sy, sx] == qmax))
        random.shuffle(indexes)
        return action[indexes[0]]

    def __random_action(self, action):
        act_indexes = copy.deepcopy(action)
        random.shuffle(act_indexes)
        return act_indexes[0]
```
target.py
```python
from movingobject import *


class Target(MovingObject):
    def __init__(self, area):
        MovingObject.__init__(self, area)
        self.value = 127

    # public method
    def act(self, area):
        _y, _x = self.state
        if self.__check_catched(area, _y, _x) == False:
            area.state[_y, _x] = 0
            while True:
                act_index = MovingObject.get_act_index(self, 1.0)
                y, x = MovingObject.move_state(self, act_index)
                if area.state[y, x] == 0:
                    self.state = numpy.array([y, x])
                    area.state[y, x] = self.value
                    break

    # private method
    def __check_catched(self, area, _y, _x):
        # the target is treated as caught only when it is surrounded and sits in a corner
        check = self.__is_surrounded(area)
        check *= self.__is_atcorners(area, _y, _x)
        return check

    def __is_surrounded(self, area):
        t_state = numpy.argwhere(area.state == 127)
        a_state = numpy.argwhere(area.state == 255)
        return numpy.array_equal(numpy.abs((a_state - t_state).sum(axis=1)), numpy.array([1, 1]))

    def __is_atcorners(self, area, _y, _x):
        h, w = area.shape
        mergin = 2
        c = _y == mergin or _y == h - mergin - 1
        c *= _x == mergin or _x == w - mergin - 1
        return c
```
agent.py
```python
from movingobject import *


class Agent(MovingObject):
    def __init__(self, area):
        MovingObject.__init__(self, area)
        self.value = 255
        self.q = self.__creat_q_table()
        self.around = numpy.zeros(2).astype('int')
        self.act_index = -1
        self.reward = 0

    # public method
    def lookout(self, area):
        y, x = self.state
        self.around = self.__get_object_states(area, y, x)

    def act(self, area, epsilon):
        _y, _x = self.state
        area.state[_y, _x] = 0
        if self.__is_alone():
            # neither the target nor the other agent is visible: act randomly
            epsilon = 1.0
        self.act_index = MovingObject.get_act_index(self, epsilon)
        y, x = MovingObject.move_state(self, self.act_index)
        if area.state[y, x] == 0:
            self.state = numpy.array([y, x])
            area.state[y, x] = self.value
        else:
            area.state[_y, _x] = self.value

    def update_q(self, area, alpha, gamma):
        sy, sx = self.around
        self.reward = self.__get_reward(area)
        act_index = self.act_index
        q = self.q[act_index, sy, sx]
        self.q[act_index, sy, sx] = q + alpha * (self.reward + gamma * self.__get_max_q() - q)

    def save_q(self, fname):
        numpy.save(fname, self.q)

    # private method
    def __creat_q_table(self):
        # (number of actions) x (target position) x (other agent's position)
        q = numpy.zeros((4, 26, 26))
        return q

    def __get_object_states(self, area, y, x):
        mergin = area.mergin
        around = area.state[y - mergin: y + mergin + 1, x - mergin: x + mergin + 1].reshape((1, -1))
        t_state = numpy.argwhere(around == 127)[:, 1]
        _a_state = numpy.argwhere(around == 255)[:, 1]
        a_state = numpy.delete(_a_state, numpy.argwhere(_a_state == 12)[:, 0], axis=0)
        if numpy.array_equal(t_state, numpy.array([])):
            t_state = numpy.array([25])
        if numpy.array_equal(a_state, numpy.array([])):
            a_state = numpy.array([25])
        return numpy.r_[t_state, a_state]

    def __is_alone(self):
        return numpy.array_equal(self.around, numpy.array([25, 25]))

    def __get_reward(self, area):
        return 3 if self.__is_surrounding(area) else -1

    def __is_surrounding(self, area):
        t_state = numpy.argwhere(area.state == 127)
        a_state = numpy.argwhere(area.state == 255)
        check = numpy.array_equal(numpy.abs((a_state - t_state).sum(axis=1)), numpy.array([1, 1]))
        check += numpy.array_equal(a_state.mean(axis=0), t_state)
        return check

    def __get_max_q(self):
        y, x = self.state
        actions = self.actions[y, x]
        sy, sx = self.around
        _sy, _sx = self.__next_around(sy, sx)
        return self.q[actions, _sy, _sx].max()

    def __next_around(self, sy, sx):
        act_index = self.act_index
        _sy = self.__next_state(act_index, sy)
        _sx = self.__next_state(act_index, sx)
        return numpy.array([_sy, _sx])

    def __next_state(self, act_index, z):
        # shift the observed position under the assumption that only this agent moves
        if z != 25:
            if act_index == 0 and (z + 1) % 5 != 0:
                z += 1
            elif act_index == 1 and z / 5 != 0:
                z -= 5
            elif act_index == 2 and z % 5 != 0:
                z -= 1
            elif act_index == 3 and z / 5 != 4:
                z += 5
            else:
                z = 25
        return z
```
area.py
```python
import numpy


class Area:
    def __init__(self, shape, mergin):
        self.shape = shape
        self.mergin = mergin
        self.state = self.__init_state()

    # public method
    def update_state(self, mvobj):
        y, x = mvobj.state
        self.state[y, x] = mvobj.value

    def reset_state(self):
        self.state = self.__init_state()

    # private method
    def __init_state(self):
        return numpy.zeros(self.shape).astype('uint8')
```
gameservice.py
```python
from model.area import Area
from model.agent import Agent
from model.target import Target


class GameService:
    def __init__(self):
        'SERVICE'

    def construct(self, area_shape, mergin):
        area = Area(area_shape, mergin)
        agent1 = Agent(area)
        area.update_state(agent1)
        agent2 = Agent(area)
        area.update_state(agent2)
        target = Target(area)
        area.update_state(target)
        return (area, agent1, agent2, target)

    def turn(self, area, agent1, agent2, target, epsilon):
        agent1.lookout(area)
        agent2.lookout(area)
        agent1.act(area, epsilon)
        agent2.act(area, epsilon)
        target.act(area)

    def reset(self, area, agent1, agent2, target):
        area.reset_state()
        agent1.reset_state(area)
        area.update_state(agent1)
        agent2.reset_state(area)
        area.update_state(agent2)
        target.reset_state(area)
        area.update_state(target)
```
main.py
```python
from service.gameservice import GameService
import numpy
import cv2
from matplotlib import pyplot

if __name__ == '__main__':
    # init
    h = 24
    w = 24
    area_shape = (h, w)
    mergin = 2
    epsilon = 0.3
    alpha = 0.3
    gamma = 0.7
    image_shape = (200, 200)

    # count
    total_plays = 300
    total_steps = numpy.zeros(total_plays).astype('int')
    play = 0
    steps = 0

    # construct
    gameservice = GameService()
    area, agent1, agent2, target = gameservice.construct(area_shape, mergin)

    # video writer
    writer = cv2.VideoWriter('q-learning.mv4', cv2.cv.FOURCC('m', 'p', '4', 'v'), 20, image_shape)

    while True:
        # disp
        print u'play: %d, steps: %d' % (play, steps)

        # act
        gameservice.turn(area, agent1, agent2, target, epsilon)

        # update area and agents' q table
        agent1.update_q(area, alpha, gamma)
        agent2.update_q(area, alpha, gamma)

        # show image
        image = cv2.resize(area.state[mergin:h - mergin, mergin:w - mergin], image_shape, interpolation=cv2.INTER_NEAREST)
        writer.write(cv2.cvtColor(image, cv2.COLOR_GRAY2BGR))
        cv2.imshow('', image)
        if cv2.waitKey() == 27:
            break

        # refresh state if agents catch the target
        steps += 1
        if agent1.reward == 3:
            print '\033[32m' + '!!!catch the target!!!' + '\033[0m'
            gameservice.reset(area, agent1, agent2, target)
            agent1.save_q('q1.npy')
            agent2.save_q('q2.npy')
            total_steps[play] = steps
            steps = 0
            play += 1

        # count
        if play == total_plays:
            break

    pyplot.plot(numpy.arange(0, total_plays), total_steps)
    pyplot.savefig('graph.png')
    cv2.destroyAllWindows()
    print '!!!finish!!!'
```
Each agent has an independent $Q$ table. The state $s$ is represented by the position of the target and the position of the other agent within the agent's recognition range, numbered from the upper left as in the table below. When the target or the other agent is not inside the recognition range, its position is represented by the value $25$. Actions are represented by the values $0, 1, 2, 3$, corresponding to left, down, right, and up. Since the size is (number of actions) $\times$ (target position) $\times$ (other agent's position), the $Q$ table has shape $4 \times 26 \times 26$. Because an agent cannot know how the other agent and the target will move when it updates its $Q$ value, it assumes that the surrounding state at its destination depends only on its own action, i.e. the observed positions simply shift according to the agent's own move (a small sketch after the coordinate table below illustrates this encoding).
| 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|
| 5 | 6 | 7 | 8 | 9 |
| 10 | 11 | 12 | 13 | 14 |
| 15 | 16 | 17 | 18 | 19 |
| 20 | 21 | 22 | 23 | 24 |
↑ The agent's recognition range and its coordinates (the agent itself sits at $12$)
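A rough sketch of this state encoding (a simplified re-implementation of `__get_object_states` above; the helper name `encode_state` and the example window are my own) shows how the flattened $5 \times 5$ window yields the two state indices:

```python
import numpy as np

NOT_VISIBLE = 25  # index used when an object is outside the recognition range

def encode_state(window):
    # window: 5x5 patch centered on the agent (127 = target, 255 = agent).
    flat = window.reshape(-1)
    t_idx = np.argwhere(flat == 127).ravel()
    a_idx = np.argwhere(flat == 255).ravel()
    a_idx = a_idx[a_idx != 12]  # drop index 12, the observing agent itself
    t_state = int(t_idx[0]) if t_idx.size else NOT_VISIBLE
    a_state = int(a_idx[0]) if a_idx.size else NOT_VISIBLE
    return t_state, a_state

window = np.zeros((5, 5), dtype=np.uint8)
window[2, 2] = 255  # the observing agent sits at the center, index 12
window[1, 2] = 127  # the target is one cell above the agent, index 7
print(encode_state(window))  # (7, 25): target at 7, other agent not visible
```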
The graph below shows the number of steps needed to capture the target on the vertical axis and the play index on the horizontal axis. A run of $300$ plays was repeated $30$ times independently, and the value on the vertical axis is the number of steps averaged over the runs at each play index. As learning progresses, you can see that the number of steps required to capture the target decreases.
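The averaging over the $30$ runs is not part of main.py above; a minimal sketch of how such a curve could be produced, assuming each run saves its `total_steps` array under hypothetical file names like `steps_run0.npy`:

```python
import numpy
from matplotlib import pyplot

total_plays = 300
runs = 30

# Stack the per-run step counts and average them play by play (file names are assumed).
all_steps = numpy.array([numpy.load('steps_run%d.npy' % r) for r in range(runs)])
mean_steps = all_steps.mean(axis=0)

pyplot.plot(numpy.arange(total_plays), mean_steps)
pyplot.xlabel('play')
pyplot.ylabel('average steps to capture')
pyplot.savefig('graph_mean.png')
```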
I also made an image showing an example of the learned behavior. White is the agents and gray is the target. The agents have learned behaviors that corner the target. There are actually some runs in which learning did not go well, but I have not examined them, so I omit them here.
I tried to solve the multi-agent tracking problem with Python. There may be places where the implementation does not work well, so please point them out. This time I only checked the learning results with both the agent and the target visible, but I will also check the results when only one of them can be seen. If I find anything, I will add it here as needed.