[Reinforcement learning] Tracking by multi-agent

Introduction

I tried to solve the multi-agent tracking problem with Python. As a textbook I used ["Learning and Its Algorithms"](http://www.amazon.co.jp/dp/4627827512) (Institute of Electrical Engineers of Japan, expert committee on GA/neuro-based learning methods and their applications).

Structure of this article

This article first defines the multi-agent tracking problem and its rules, then explains the Q-value update, walks through the implementation, and finally presents the results and a conclusion.

Multi-agent tracking

Two agents learn the optimal behavior for capturing one target. The captured state is defined as the state in which the agents sandwich the target: "sandwiching" means that the two agents occupy two of the four cells adjacent to the target.

Rules

The detailed rules are shown below. They follow the rules given in ["Learning and Its Algorithms"](http://www.amazon.co.jp/dp/4627827512).

  1. Set up a $20 \times 20$ grid-shaped torus plane (the edges wrap around) as the environment.
  2. Place the two agents and the one target at random positions in the environment.
  3. Each agent can observe the cells within two grid squares of itself, i.e., it perceives the $5 \times 5 = 25$ cells centered on its own cell as the state.
  4. Each agent runs Q-learning independently. Based on what it has learned, it selects one action per step: move one cell up, down, left, or right. Every action yields a reward of $-1$. If another agent or the target already occupies the destination cell, the agent stays where it is.
  5. The target moves randomly up, down, left, or right each step. If an agent occupies the destination cell, another random action is selected.
  6. When the two agents sandwich the target, both agents receive a reward and the trial ends (a minimal sketch of this capture check follows this list). The environment then returns to step 2 and the next trial starts.
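
As a concrete illustration of the capture condition in rule 6, here is a minimal sketch; the function name and arguments are hypothetical, and the actual implementation below works on the grid state in area.py instead.

def is_captured(target_pos, agent_positions):
    # The target is captured when both agents occupy cells of its
    # 4-neighborhood (up, down, left, right). The torus wrap-around of
    # the edges is ignored here for brevity.
    ty, tx = target_pos
    neighbours = {(ty - 1, tx), (ty + 1, tx), (ty, tx - 1), (ty, tx + 1)}
    return set(map(tuple, agent_positions)) <= neighbours

# Example: target at (5, 5), agents directly above it and to its left
print(is_captured((5, 5), [(4, 5), (5, 4)]))   # True
print(is_captured((5, 5), [(4, 5), (7, 5)]))   # False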

Updating the Q value

The $Q$ value is updated with the following formula:

Q_{new}(s_t, a) = Q_{old}(s_t, a) + \alpha\bigl[r_{t+1} + \gamma \max_{a'}Q(s_{t+1}, a') - Q_{old}(s_t, a)\bigr]

$Q(s_t, a)$ represents the value of taking action $a$ in state $s_t$. Taking action $a$ in state $s_t$ yields the reward $r_{t+1}$. When updating the $Q$ value, the update target is the latest reward $r_{t+1}$ plus $\gamma$ times the maximum value attainable by an action $a'$ in the next state $s_{t+1}$. In other words, an action in a given state is evaluated by considering not only the immediate reward but also the value of the states that follow. $\gamma$ is called the discount rate and $\alpha$ the learning rate.
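
For reference, a minimal sketch of this update in Python might look like the following; the table layout q[state, action] and the function name are assumptions for illustration, while alpha and gamma default to the values used in main.py later.

import numpy

def update_q(q, s, a, reward, s_next, alpha=0.3, gamma=0.7):
    # One tabular Q-learning step, matching the formula above:
    # the TD target is the latest reward plus gamma times the best
    # Q value attainable in the next state.
    td_target = reward + gamma * q[s_next].max()
    q[s, a] += alpha * (td_target - q[s, a])

# Tiny usage example with a 10-state, 4-action table
q = numpy.zeros((10, 4))
update_q(q, s=0, a=2, reward=-1, s_next=1)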

Implementation

I present the code first and then the key points.

Code

I implemented it as follows.

.
├── main.py
├── model
│   ├── agent.py
│   ├── area.py
│   ├── movingobject.py
│   └── target.py
└── service
    └── gameservice.py

movingobject.py


import numpy
import random
import copy

class MovingObject:

    def __init__(self, area):
        self.state = self.__random_state(area)
        self.actions = self.__create_actions(area.shape)
        self.moves = self.__create_moves()


# public method
    def reset_state(self, area):
        self.state = self.__random_state(area)


    def get_act_index(self, epsilon):
        return self.__select_action(epsilon)


    def move_state(self, act_index):
        return self.state + self.moves.get(act_index)


# private method
    def __random_state(self, area):
        h, w = area.shape
        mergin = area.mergin

        while True:
            y = numpy.random.randint(mergin, h - mergin)
            x = numpy.random.randint(mergin, w - mergin)

            if area.state[y, x] == 0:
                break

        return numpy.array([y, x])


    def __create_actions(self, area_shape):
        actions = []
        h, w = area_shape

        for j in range(h):
            actions.append([])
            for i in range(w):
                action_org = [0, 1, 2, 3]
                action = self.__remove_actions(action_org, h, w, j, i)
                actions[j].append(action)

        return numpy.array(actions)


    def __remove_actions(self, action_org, h, w, j, i):
        action = copy.deepcopy(action_org)
        mergin = 2
        if j == mergin:
            action.remove(3)
        if j == h - mergin - 1:
            action.remove(1)
        if i == mergin:
            action.remove(0)
        if i == w - mergin - 1:
            action.remove(2)

        return action


    def __create_moves(self):
        # 0->left, 1->down, 2-> right, 3->up
        return {0: numpy.array([0, -1]), 1: numpy.array([1, 0]), 2: numpy.array([0, 1]), 3: numpy.array([-1, 0])}


    def __select_action(self, epsilon):
        y, x = self.state
        action = self.actions[y, x]
        if numpy.random.rand() > epsilon:
            return self.__greedy_action(action)
        else:
            return self.__random_action(action)


    def __greedy_action(self, action):
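        # Note: self.q and self.around are set in the Agent subclass; the
        # Target always calls get_act_index with epsilon = 1.0, so it never
        # takes this greedy branch.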
        sy, sx = self.around
        qmax = self.q[action, sy, sx].max()
        indexes = list(numpy.argwhere(self.q[action, sy, sx] == qmax))
        random.shuffle(indexes)

        return action[indexes[0]]


    def __random_action(self, action):
        act_indexes = copy.deepcopy(action)
        random.shuffle(act_indexes)

        return act_indexes[0]

target.py


from movingobject import *

class Target(MovingObject):

	def __init__(self, area):
		MovingObject.__init__(self, area)
		self.value = 127


# public method
	def act(self, area):
		_y, _x = self.state
		if self.__check_catched(area, _y, _x) == False:
			area.state[_y, _x] = 0
			while True:
				act_index = MovingObject.get_act_index(self, 1.0)
				y, x = MovingObject.move_state(self, act_index)
				if area.state[y, x] == 0:
					self.state = numpy.array([y ,x])
					area.state[y, x] = self.value
					break


# private method
	def __check_catched(self, area, _y, _x):
		check = self.__is_surrounded(area)
		check *= self.__is_atcorners(area, _y, _x)

		return check


	def __is_surrounded(self, area):
		t_state = numpy.argwhere(area.state == 127)
		a_state = numpy.argwhere(area.state == 255)

		return numpy.array_equal(numpy.abs((a_state - t_state).sum(axis = 1)), numpy.array([1, 1]))


	def __is_atcorners(self, area, _y, _x):
		h, w = area.shape
		mergin = 2
		c = _y == mergin or _y == h - mergin - 1
		c *= _x == mergin or _x == w - mergin - 1

		return c

agent.py


from movingobject import *

class Agent(MovingObject):

	def __init__(self, area):
		MovingObject.__init__(self, area)
		self.value = 255
		self.q = self.__creat_q_table()
		self.around = numpy.zeros(2).astype('int')
		self.act_index = -1
		self.reward = 0


# public method
	def lookout(self, area):
		y, x = self.state
		self.around = self.__get_object_states(area, y, x)


	def act(self, area, epsilon):
		_y, _x = self.state
		area.state[_y, _x] = 0
		if self.__is_alone():
			epsilon = 1.0
		self.act_index = MovingObject.get_act_index(self, epsilon)
		y, x = MovingObject.move_state(self, self.act_index)
		if area.state[y, x] == 0:
			self.state = numpy.array([y, x])
			area.state[y, x] = self.value
		else:
			area.state[_y, _x] = self.value


	def update_q(self, area, alpha, gamma):
		sy, sx = self.around
		self.reward = self.__get_reward(area)
		act_index = self.act_index
		q = self.q[act_index, sy, sx]
		self.q[act_index, sy, sx] = q + alpha * (self.reward + gamma * self.__get_max_q() - q)


	def save_q(self, fname):
		numpy.save(fname, self.q)


# private method
	def __creat_q_table(self):
		q = numpy.zeros((4, 26, 26))

		return q

	def __get_object_states(self, area, y, x):
		mergin = area.mergin
		around = area.state[y - mergin: y + mergin + 1, x - mergin: x + mergin + 1].reshape((1, -1))
		t_state = numpy.argwhere(around == 127)[:, 1]
		_a_state = numpy.argwhere(around == 255)[:, 1]
		a_state = numpy.delete(_a_state, numpy.argwhere(_a_state == 12)[:, 0], axis = 0)
		if numpy.array_equal(t_state, numpy.array([])):
			t_state = numpy.array([25])
		if numpy.array_equal(a_state, numpy.array([])):
			a_state = numpy.array([25])

		return numpy.r_[t_state, a_state]


	def __is_alone(self):
		return numpy.array_equal(self.around, numpy.array([25, 25]))


	def __get_reward(self, area):
		return 3 if self.__is_surrounding(area) else -1


	def __is_surrounding(self, area):
		t_state = numpy.argwhere(area.state == 127)
		a_state = numpy.argwhere(area.state == 255)

		check = numpy.array_equal(numpy.abs((a_state - t_state).sum(axis = 1)), numpy.array([1, 1]))
		check += numpy.array_equal(a_state.mean(axis = 0), t_state)

		return check


	def __get_max_q(self):
		y, x = self.state
		actions = self.actions[y, x]
		sy, sx = self.around
		_sy, _sx = self.__next_around(sy, sx)
		return self.q[actions, _sy, _sx].max()


	def __next_around(self, sy, sx):
		act_index = self.act_index
		_sy = self.__next_state(act_index, sy)
		_sx = self.__next_state(act_index, sx)

		return numpy.array([_sy, _sx])


	def __next_state(self, act_index, z):
		# Predict how an observed object's window index (see the 5x5 table in
		# the "Points" section) shifts when this agent takes action act_index,
		# assuming the observed object itself does not move; 25 = not visible.
		if z != 25:
			if act_index == 0 and (z + 1) % 5 != 0:
				z += 1    # agent moves left -> object shifts right in the window
			elif act_index == 1 and z / 5 != 0:
				z -= 5    # agent moves down -> object shifts up
			elif act_index == 2 and z % 5 != 0:
				z -= 1    # agent moves right -> object shifts left
			elif act_index == 3 and z / 5 != 4:
				z += 5    # agent moves up -> object shifts down
			else:
				z = 25    # the object leaves the observation window

		return z

area.py


import numpy

class Area:

    def __init__(self, shape, mergin):
        self.shape = shape
        self.mergin = mergin
        self.state = self.__init_state()


# public method
    def update_state(self, mvobj):
        y, x = mvobj.state
        self.state[y, x] = mvobj.value


    def reset_state(self):
        self.state = self.__init_state()


# private method
    def __init_state(self):
        return numpy.zeros(self.shape).astype('uint8')

gameservice.py


from model.area import Area
from model.agent import Agent
from model.target import Target

class GameService:

    def __init__(self):
        'SERVICE'

    def construct(self, area_shape, mergin):
        area = Area(area_shape, mergin)
        agent1 = Agent(area)
        area.update_state(agent1)
        agent2 = Agent(area)
        area.update_state(agent2)
        target = Target(area)
        area.update_state(target)

        return (area, agent1, agent2, target)


    def turn(self, area, agent1, agent2, target, epsilon):
        agent1.lookout(area)
        agent2.lookout(area)
        agent1.act(area, epsilon)
        agent2.act(area, epsilon)
        target.act(area)


    def reset(self, area, agent1, agent2, target):
        area.reset_state()
        agent1.reset_state(area)
        area.update_state(agent1)
        agent2.reset_state(area)
        area.update_state(agent2)
        target.reset_state(area)
        area.update_state(target)

main.py


from service.gameservice import GameService
import numpy
import cv2
from matplotlib import pyplot

if __name__ == '__main__':

	# init
	h = 24
	w = 24
	area_shape = (h, w)
	mergin = 2
	epsilon = 0.3
	alpha = 0.3
	gamma = 0.7
	image_shape = (200, 200)

	# count
	total_plays = 300
	total_steps = numpy.zeros(total_plays).astype('int')
	play = 0
	steps = 0

	# construct
	gameservice = GameService()
	area, agent1, agent2, target = gameservice.construct(area_shape, mergin)

	# video writer
	writer = cv2.VideoWriter('q-learning.mv4', cv2.cv.FOURCC('m', 'p', '4', 'v'), 20, image_shape)

	while True:
		# disp
		print u'play: %d, steps: %d' % (play, steps)

		# act
		gameservice.turn(area, agent1, agent2, target, epsilon)

		# update the agents' Q tables
		agent1.update_q(area, alpha, gamma)
		agent2.update_q(area, alpha, gamma)

		# show image
		image = cv2.resize(area.state[mergin:h - mergin, mergin:w - mergin], image_shape, interpolation = cv2.INTER_NEAREST)
		writer.write(cv2.cvtColor(image, cv2.COLOR_GRAY2BGR))
		cv2.imshow('', image)
		if cv2.waitKey() == 27:
			break

		# reset the environment if the agents catch the target
		steps += 1
		if agent1.reward == 3:
			print '\033[32m' + '!!!catch the target!!!' + '\033[0m'
			gameservice.reset(area, agent1, agent2, target)
			agent1.save_q('q1.npy')
			agent2.save_q('q2.npy')
			total_steps[play] = steps
			steps = 0
			play += 1

		# count
		if play == total_plays:
			break

	pyplot.plot(numpy.arange(0, total_plays), total_steps)
	pyplot.savefig('graph.png')
	cv2.destroyAllWindows()
	print '!!!finish!!!'

Points

Each agent has its own independent $Q$ table. The state $s$ is represented by the pair (position of the target, position of the other agent) as seen inside the agent's observation window, with cells numbered starting from the upper left. When the target or the other agent is not inside the window, its position is represented by the value $25$. Actions are represented by the values $0, 1, 2, 3$ in the order left, down, right, up. The $Q$ table therefore has size (number of actions) $\times$ (target position) $\times$ (other agent position) $= 4 \times 26 \times 26$. Since an agent cannot know how the other agent or the target will move when it updates its $Q$ value, it assumes that the observed positions in the next state change only as a result of its own action.

0 1 2 3 4
5 6 7 8 9
10 11 12 13 14
15 16 17 18 19
20 21 22 23 24

↑ The agent's 5 × 5 observation window and the index assigned to each cell (the agent itself sits at index 12)
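
To make this encoding concrete, here is a small sketch; the helper name is hypothetical, and agent.py computes the same indices by flattening the observed window instead. It maps another object's position to the index shown above, with 25 meaning "not visible".

def relative_index(agent_pos, other_pos, margin=2):
    # Index 0-24 of the other object inside the agent's 5x5 observation
    # window (row-major, as in the table above), or 25 if it is outside.
    dy = other_pos[0] - agent_pos[0]
    dx = other_pos[1] - agent_pos[1]
    if abs(dy) > margin or abs(dx) > margin:
        return 25
    return (dy + margin) * 5 + (dx + margin)

print(relative_index((10, 10), (10, 10)))   # 12: the agent itself is at the center
print(relative_index((10, 10), (9, 11)))    # 8: one cell up and one to the right
print(relative_index((10, 10), (3, 3)))     # 25: outside the observation window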

Results

Graph

The graph below plots the number of steps needed to capture the target on the vertical axis against the trial number on the horizontal axis. I ran $30$ independent runs of $300$ trials each, and the vertical axis shows the number of steps averaged over the runs for each trial. As learning progresses, the number of steps required to capture the target decreases.

Action

I created an animation showing an example of the learned behavior. The white cells are the agents and the gray cell is the target. The agents learn behavior that corners the target. There are also runs where learning did not go well, but I have not analyzed them, so I omit them here.

Conclusion

I tried to solve the multi-agent tracking problem with Python. There may be places where the implementation does not work well, so please point them out if you notice any. This time I only checked the learning results for the case where each agent can observe both the other agent and the target; I also plan to check what is learned when only one of them is visible. If I find anything, I will add it here.
