[Renforcer l'apprentissage] Suivi par multi-agents

introduction

J'ai essayé de résoudre le problème de suivi multi-agent avec python. En tant que manuel ["Learning and its algorithms"](http://www.amazon.co.jp/ Learning and its algorithms-Neural network, gene algorithm, Enhanced learning-Electrical Society GA Méthode d'apprentissage basée sur les neuro et sa recherche d'application Nous avons utilisé le comité d'experts / dp / 4627827512 / ref = sr_1_2? S = books & ie = UTF8 & qid = 1464583596 & sr = 1-2 & keywords = learning et ses algorithmes).

Structure de cet article

Suivi multi-agents

Découvrez le meilleur comportement pour capturer des cibles de 1 $ avec des agents de 2 $. L'état dans lequel l'agent prend en sandwich la cible est défini comme l'état capturé. «Pincer» signifie que l'agent existe à 2 $ $ au voisinage de 4 $ de la cible.

règle

Les règles détaillées sont présentées ci-dessous. ["Learning and its algorithme"](http://www.amazon.co.jp/ Learning et son algorithme - Réseau neuronal, algorithme génétique, méthode d'apprentissage améliorée utilisant GA neuro de l'Institut des ingénieurs électriciens du Japon et de son comité d'experts en recherche d'application La règle de rencontre / dp / 4627827512 / ref = sr_1_2? S = books & ie = UTF8 & qid = 1464583596 & sr = 1-2 & keywords = learning et son algorithme) est adoptée.

  1. Fournir un plan torique en forme de grille à 20 $ \ fois 20 $ comme environnement.
  2. Placez au hasard des agents à 2 $ et des cibles à 1 $ dans l'environnement.
  3. L'agent peut observer l'état de la grille $ 2 $ centrée sur lui-même. Autrement dit, il reconnaît la grille de 25 $, y compris sa propre grille, en tant qu'état.
  4. Chaque agent procède indépendamment au Q-learning. En fonction du résultat de l'apprentissage, l'action de déplacer la grille de 1 $ vers le haut, le bas, la gauche ou la droite par pas de 1 $ est sélectionnée. Vous serez toujours récompensé par -1 $ pour chaque action. Si un autre agent ou cible existe déjà à destination, il y restera.
  5. La cible se déplace aléatoirement dans le sens haut, bas, gauche ou droite pour chaque pas de 1 $. S'il y a un agent à la destination, une autre action aléatoire est sélectionnée.
  6. Lorsque des agents à 2 $ pincent la cible, récompensez chaque agent à 2 $ et terminez l'essai à 1 $. Après cela, revenez à 2. et commencez l'essai suivant.

Mettre à jour la valeur Q

Mettez à jour la valeur $ Q $ avec la formule suivante.

Q_{new}(s_t, a) = Q_{old}(s_t, a) + \alpha\bigl[r_{t+1} + \gamma max_{a'}Q(s_{t+1}, a') - Q_{old}(s_t, a)\bigr]

$ Q (s_t, a) $ représente la valeur de l'action $ a $ dans un état $ s_t $. L'action dans l'état $ s_t $ $ a $ donne la récompense $ r_ {t + 1} $. Lors de la mise à jour de la valeur $ Q $, la récompense la plus récente $ r_ {t + 1} $ est ajoutée à la valeur maximale obtenue par l'action $ a '$ dans l'état suivant $ s_ {t + 1} $. Ajoutez le produit multiplié par gamma $. Cela signifie que l'action dans un certain état est sélectionnée en considérant non seulement la dernière récompense mais également la valeur dans l'état suivant. $ \ Gamma $ est appelé le taux d'actualisation, et $ \ alpha $ est appelé le taux d'apprentissage.

la mise en oeuvre

Je vais le mettre séparément dans le code et le point.

code

Je l'ai implémenté comme suit.

.
├── main.py
├── model
│   ├── agent.py
│   ├── area.py
│   ├── movingobject.py
│   ├── target.py
└── service
    └── gameservice.py

movingobject.py


import numpy
import random
import copy

class MovingObject:

    def __init__(self, area):
        self.state = self.__random_state(area)
        self.actions = self.__create_actions(area.shape)
        self.moves = self.__create_moves()


# public method
    def reset_state(self, area):
        self.state = self.__random_state(area)


    def get_act_index(self, epsilon):
        return self.__select_action(epsilon)


    def move_state(self, act_index):
        return self.state + self.moves.get(act_index)


# private method
    def __random_state(self, area):
        h, w = area.shape
        mergin = area.mergin

        while True:
            y = numpy.random.randint(mergin, h - mergin)
            x = numpy.random.randint(mergin, w - mergin)

            if area.state[y, x] == 0:
                break

        return numpy.array([y, x])


    def __create_actions(self, area_shape):
        actions = []
        h, w = area_shape

        for j in range(h):
            actions.append([])
            for i in range(w):
                action_org = [0, 1, 2, 3]
                action = self.__remove_actions(action_org, h, w, j, i)
                actions[j].append(action)

        return numpy.array(actions)


    def __remove_actions(self, action_org, h, w, j, i):
        action = copy.deepcopy(action_org)
        mergin = 2
        if j == mergin:
            action.remove(3)
        if j == h - mergin - 1:
            action.remove(1)
        if i == mergin:
			action.remove(0)
        if i == w - mergin - 1:
			action.remove(2)

        return action


    def __create_moves(self):
        # 0->left, 1->down, 2-> right, 3->up
        return {0: numpy.array([0, -1]), 1: numpy.array([1, 0]), 2: numpy.array([0, 1]), 3: numpy.array([-1, 0])}


    def __select_action(self, epsilon):
        y, x = self.state
        action = self.actions[y, x]
        if numpy.random.rand() > epsilon:
            return self.__greedy_action(action)
        else:
            return self.__random_action(action)


    def __greedy_action(self, action):
        sy, sx = self.around
        qmax = self.q[action, sy, sx].max()
        indexes = list(numpy.argwhere(self.q[action, sy, sx] == qmax))
        random.shuffle(indexes)

        return action[indexes[0]]


    def __random_action(self, action):
        act_indexes = copy.deepcopy(action)
        random.shuffle(act_indexes)

        return act_indexes[0]

tartget.py


from movingobject import *

class Target(MovingObject):

	def __init__(self, area):
		MovingObject.__init__(self, area)
		self.value = 127


# public method
	def act(self, area):
		_y, _x = self.state
		if self.__check_catched(area, _y, _x) == False:
			area.state[_y, _x] = 0
			while True:
				act_index = MovingObject.get_act_index(self, 1.0)
				y, x = MovingObject.move_state(self, act_index)
				if area.state[y, x] == 0:
					self.state = numpy.array([y ,x])
					area.state[y, x] = self.value
					break


# private method
	def __check_catched(self, area, _y, _x):
		check = self.__is_surrounded(area)
		check *= self.__is_atcorners(area, _y, _x)

		return check


	def __is_surrounded(self, area):
		t_state = numpy.argwhere(area.state == 127)
		a_state = numpy.argwhere(area.state == 255)

		return numpy.array_equal(numpy.abs((a_state - t_state).sum(axis = 1)), numpy.array([1, 1]))


	def __is_atcorners(self, area, _y, _x):
		h, w = area.shape
		mergin = 2
		c = _y == mergin or _y == h - mergin - 1
		c *= _x == mergin or _x == w - mergin - 1

		return c

agent.py


from movingobject import *

class Agent(MovingObject):

	def __init__(self, area):
		MovingObject.__init__(self, area)
		self.value = 255
		self.q = self.__creat_q_table()
		self.around = numpy.zeros(2).astype('int')
		self.act_index = -1
		self.reward = 0


# public method
	def lookout(self, area):
		y, x = self.state
		self.around = self.__get_object_states(area, y, x)


	def act(self, area, epsilon):
		_y, _x = self.state
		area.state[_y, _x] = 0
		if self.__is_alone():
			epsilon = 1.0
		self.act_index = MovingObject.get_act_index(self, epsilon)
		y, x = MovingObject.move_state(self, self.act_index)
		if area.state[y, x] == 0:
			self.state = numpy.array([y, x])
			area.state[y, x] = self.value
		else:
			area.state[_y, _x] = self.value


	def update_q(self, area, alpha, gamma):
		sy, sx = self.around
		self.reward = self.__get_reward(area)
		act_index = self.act_index
		q = self.q[act_index, sy, sx]
		self.q[act_index, sy, sx] = q + alpha * (self.reward + gamma * self.__get_max_q() - q)


	def save_q(self, fname):
		numpy.save(fname, self.q)


# private method
	def __creat_q_table(self):
		q = numpy.zeros((4, 26, 26))

		return q

	def __get_object_states(self, area, y, x):
		mergin = area.mergin
		around = area.state[y - mergin: y + mergin + 1, x - mergin: x + mergin + 1].reshape((1, -1))
		t_state = numpy.argwhere(around == 127)[:, 1]
		_a_state = numpy.argwhere(around == 255)[:, 1]
		a_state = numpy.delete(_a_state, numpy.argwhere(_a_state == 12)[:, 0], axis = 0)
		if numpy.array_equal(t_state, numpy.array([])):
			t_state = numpy.array([25])
		if numpy.array_equal(a_state, numpy.array([])):
			a_state = numpy.array([25])

		return numpy.r_[t_state, a_state]


	def __is_alone(self):
		return numpy.array_equal(self.around, numpy.array([25, 25]))


	def __get_reward(self, area):
		return 3 if self.__is_surrounding(area) else -1


	def __is_surrounding(self, area):
		t_state = numpy.argwhere(area.state == 127)
		a_state = numpy.argwhere(area.state == 255)

		check = numpy.array_equal(numpy.abs((a_state - t_state).sum(axis = 1)), numpy.array([1, 1]))
		check += numpy.array_equal(a_state.mean(axis = 0), t_state)

		return check


	def __get_max_q(self):
		y, x = self.state
		actions = self.actions[y, x]
		sy, sx = self.around
		_sy, _sx = self.__next_around(sy, sx)
		return self.q[actions, _sy, _sx].max()


	def __next_around(self, sy, sx):
		act_index = self.act_index
		_sy = self.__next_state(act_index, sy)
		_sx = self.__next_state(act_index, sx)

		return numpy.array([_sy, _sx])


	def __next_state(self, act_index, z):
		if z != 25:
			if act_index == 0 and (z + 1) % 5 != 0:
				z += 1
			elif act_index == 1 and z / 5 != 0:
				z -= 5
			elif act_index == 2 and z % 5 != 0:
				z -= 1
			elif act_index == 3 and z / 5 != 4:
				z += 5
			else:
				z = 25

		return z

area.py


import numpy

class Area:

    def __init__(self, shape, mergin):
        self.shape = shape
        self.mergin = mergin
        self.state = self.__init_state()


# public method
    def update_state(self, mvobj):
        y, x = mvobj.state
        self.state[y, x] = mvobj.value


    def reset_state(self):
        self.state = self.__init_state()


# private method
    def __init_state(self):
        return numpy.zeros(self.shape).astype('uint8')

gameservice.py


from model.area import Area
from model.agent import Agent
from model.target import Target

class GameService:

    def __init__(self):
        'SERVICE'

    def construct(self, area_shape, mergin):
        area = Area(area_shape, mergin)
    	agent1 = Agent(area)
        area.update_state(agent1)
    	agent2 = Agent(area)
        area.update_state(agent2)
    	target = Target(area)
        area.update_state(target)

        return (area, agent1, agent2, target)


    def turn(self, area, agent1, agent2, target, epsilon):
        agent1.lookout(area)
        agent2.lookout(area)
        agent1.act(area, epsilon)
        agent2.act(area, epsilon)
        target.act(area)


    def reset(self, area, agent1, agent2, target):
        area.reset_state()
        agent1.reset_state(area)
        area.update_state(agent1)
        agent2.reset_state(area)
        area.update_state(agent2)
        target.reset_state(area)
        area.update_state(target)

main.py


from service.gameservice import GameService
import numpy
import cv2
from matplotlib import pyplot

if __name__ == '__main__':

	# init
	h = 24
	w = 24
	area_shape = (h, w)
	mergin = 2
	epsilon = 0.3
	alpha = 0.3
	gamma = 0.7
	image_shape = (200, 200)

	# count
	total_plays = 300
	total_steps = numpy.zeros(total_plays).astype('int')
	play = 0
	steps = 0

	# construct
	gameservice = GameService()
	area, agent1, agent2, target = gameservice.construct(area_shape, mergin)

	# video writer
	writer = cv2.VideoWriter('q-learning.mv4', cv2.cv.FOURCC('m', 'p', '4', 'v'), 20, image_shape)

	while True:
		# disp
		print u'play: %d, steps: %d' % (play, steps)

		# act
		gameservice.turn(area, agent1, agent2, target, epsilon)

		# update area and agents' q talbe
		agent1.update_q(area, alpha, gamma)
		agent2.update_q(area, alpha, gamma)

		# show image
		image = cv2.resize(area.state[mergin:h - mergin, mergin:w - mergin], image_shape, interpolation = cv2.INTER_NEAREST)
		writer.write(cv2.cvtColor(image, cv2.COLOR_GRAY2BGR))
		cv2.imshow('', image)
		if cv2.waitKey() == 27:
		 	break

		# refresh state if agents catch the taget
		steps += 1
		if agent1.reward == 3:
			print '\033[32m' + '!!!catch the target!!!' + '\033[0m'
			gameservice.reset(area, agent1, agent2, target)
			agent1.save_q('q1.npy')
			agent2.save_q('q2.npy')
			total_steps[play] = steps
			steps = 0
			play += 1

		# count
		if play == total_plays:
			break

	pyplot.plot(numpy.arange(0, total_plays), total_steps)
	pyplot.savefig('graph.png')
	cv2.destroyAllWindows()
	print '!!!finish!!!'

point

Chaque agent a une valeur indépendante en $ Q $. L'état $ s $ est représenté par les coordonnées de l'agent et les coordonnées de la cible. Les coordonnées sont déterminées dans l'ordre à partir du coin supérieur gauche. Lorsqu'il n'y a pas d'agent ou de cible dans la zone de reconnaissance, cela est représenté par la valeur numérique de 25 $. Les actions sont exprimées en $ 0, 1, 2, 3 $ dans l'ordre inférieur gauche et supérieur droit. (Nombre d'actions) $ \ fois $ (coordonnées cibles) $ \ fois $ (coordonnées d'agent), donc la taille de la valeur $ Q $ est de 4 $ \ fois 26 \ fois 26 $. Puisqu'il n'est pas possible de savoir comment l'agent ou la cible se comporte lors de la mise à jour de la valeur $ Q $, L'état environnant à la destination ne dépend que de ses propres actions, et on suppose que l'état de l'environnement change de la quantité de cette action.

0 1 2 3 4
5 6 7 8 9
10 11 12 13 14
15 16 17 18 19
20 21 22 23 24

↑ Plage de reconnaissance de l'agent et ses coordonnées

résultat

Graphique

Le graphique ci-dessous montre le nombre d'étapes prises pour capturer la cible sur l'axe vertical et le nombre d'essais sur l'axe horizontal. Les essais indépendants de 300 $ sont réalisés à 30 $ et le nombre moyen d'étapes dans chaque essai est la valeur sur l'axe vertical. Au fur et à mesure que l'apprentissage progresse, vous pouvez voir que le nombre d'étapes requises pour capturer la cible diminue.

action

J'ai fait une image d'un exemple du comportement appris. Le blanc est l'agent et le gris est la cible. J'apprends des comportements qui traquent la cible. En fait, il y a des images qui n'ont pas été bien apprises, mais je ne les ai pas considérées, alors je les omettrai.

en conclusion

J'ai essayé de résoudre le problème de suivi multi-agent avec python. Il peut y avoir des points où la mise en œuvre ne fonctionne pas correctement, veuillez donc le signaler. Cette fois, j'ai seulement confirmé les résultats d'apprentissage avec l'agent et la cible visibles. Je vérifierai également les résultats d'apprentissage lorsqu'un seul d'entre eux sera visible. S'il y a une découverte, je l'ajouterai au besoin.

Recommended Posts

[Renforcer l'apprentissage] Suivi par multi-agents
[Introduction] Renforcer l'apprentissage
Apprentissage par renforcement futur_2
Apprentissage par renforcement futur_1
Renforcer l'apprentissage de la troisième ligne
[Renforcer l'apprentissage] Tâche de bandit
Apprentissage amélioré Python + Unity (apprentissage)
Renforcer l'apprentissage 1 édition introductive
Investissement en actions par apprentissage approfondi (méthode du gradient de politique) (1)
Renforcer l'apprentissage 18 Colaboratory + Acrobat + ChainerRL
Apprentissage amélioré 7 Sortie du journal des données d'apprentissage
Renforcer l'apprentissage 17 Colaboratory + CartPole + ChainerRL
Renforcer l'apprentissage 19 Colaboratory + Mountain_car + ChainerRL
Renforcer l'apprentissage 6 First Chainer RL
Apprentissage amélioré à partir de Python
Renforcer l'apprentissage 20 Colaboratoire + Pendule + ChainerRL
Apprentissage par renforcement 5 Essayez de programmer CartPole?
Apprentissage par renforcement 9 Remodelage magique ChainerRL
Renforcer l'apprentissage Apprendre d'aujourd'hui
Renforcer l'apprentissage 4 CartPole première étape
Apprentissage par renforcement profond 1 Introduction au renforcement de l'apprentissage
4 [/] Quatre arithmétiques par apprentissage automatique
Apprentissage par renforcement profond 2 Mise en œuvre de l'apprentissage par renforcement
DeepMind Enhanced Learning Framework Acme
Apprentissage par renforcement: accélérer l'itération de la valeur
Essayez de faire une stratégie de blackjack en renforçant l'apprentissage ((1) Implémentation du blackjack)
Renforcer l'apprentissage 21 Colaboratoire + Pendule + ChainerRL + A2C
TF2RL: bibliothèque d'apprentissage améliorée pour TensorFlow2.x
Apprentissage par renforcement 34 Créez des vidéos d'agent en continu
Renforcer l'apprentissage 13 Essayez Mountain_car avec ChainerRL.
Construction d'un environnement d'apprentissage amélioré Python + Unity
Résumé de l'apprentissage automatique par les débutants de Python
Renforcer l'apprentissage 22 Colaboratory + CartPole + ChainerRL + A3C
Explorez le labyrinthe avec l'apprentissage augmenté
Renforcer l'apprentissage 8 Essayez d'utiliser l'interface utilisateur de Chainer
Apprentissage profond appris par l'implémentation 1 (édition de retour)
Renforcer l'apprentissage 24 Colaboratory + CartPole + ChainerRL + ACER
Apprentissage par renforcement 3 Méthode de planification dynamique / méthode TD
Mémo d'apprentissage de la planification des sections ~ par python ~
Deep Strengthening Learning 3 Édition pratique: Briser des blocs
J'ai essayé l'apprentissage par renforcement avec PyBrain
Apprenez en faisant! Apprentissage par renforcement profond_1