J'ai essayé de résoudre le problème de suivi multi-agent avec python. En tant que manuel ["Learning and its algorithms"](http://www.amazon.co.jp/ Learning and its algorithms-Neural network, gene algorithm, Enhanced learning-Electrical Society GA Méthode d'apprentissage basée sur les neuro et sa recherche d'application Nous avons utilisé le comité d'experts / dp / 4627827512 / ref = sr_1_2? S = books & ie = UTF8 & qid = 1464583596 & sr = 1-2 & keywords = learning et ses algorithmes).
Structure de cet article
Découvrez le meilleur comportement pour capturer des cibles de 1 $ avec des agents de 2 $. L'état dans lequel l'agent prend en sandwich la cible est défini comme l'état capturé. «Pincer» signifie que l'agent existe à 2 $ $ au voisinage de 4 $ de la cible.
Les règles détaillées sont présentées ci-dessous. ["Learning and its algorithme"](http://www.amazon.co.jp/ Learning et son algorithme - Réseau neuronal, algorithme génétique, méthode d'apprentissage améliorée utilisant GA neuro de l'Institut des ingénieurs électriciens du Japon et de son comité d'experts en recherche d'application La règle de rencontre / dp / 4627827512 / ref = sr_1_2? S = books & ie = UTF8 & qid = 1464583596 & sr = 1-2 & keywords = learning et son algorithme) est adoptée.
Mettez à jour la valeur $ Q $ avec la formule suivante.
Q_{new}(s_t, a) = Q_{old}(s_t, a) + \alpha\bigl[r_{t+1} + \gamma max_{a'}Q(s_{t+1}, a') - Q_{old}(s_t, a)\bigr]
$ Q (s_t, a) $ représente la valeur de l'action $ a $ dans un état $ s_t $. L'action dans l'état $ s_t $ $ a $ donne la récompense $ r_ {t + 1} $. Lors de la mise à jour de la valeur $ Q $, la récompense la plus récente $ r_ {t + 1} $ est ajoutée à la valeur maximale obtenue par l'action $ a '$ dans l'état suivant $ s_ {t + 1} $. Ajoutez le produit multiplié par gamma $. Cela signifie que l'action dans un certain état est sélectionnée en considérant non seulement la dernière récompense mais également la valeur dans l'état suivant. $ \ Gamma $ est appelé le taux d'actualisation, et $ \ alpha $ est appelé le taux d'apprentissage.
Je vais le mettre séparément dans le code et le point.
Je l'ai implémenté comme suit.
.
├── main.py
├── model
│ ├── agent.py
│ ├── area.py
│ ├── movingobject.py
│ ├── target.py
└── service
└── gameservice.py
movingobject.py
import numpy
import random
import copy
class MovingObject:
def __init__(self, area):
self.state = self.__random_state(area)
self.actions = self.__create_actions(area.shape)
self.moves = self.__create_moves()
# public method
def reset_state(self, area):
self.state = self.__random_state(area)
def get_act_index(self, epsilon):
return self.__select_action(epsilon)
def move_state(self, act_index):
return self.state + self.moves.get(act_index)
# private method
def __random_state(self, area):
h, w = area.shape
mergin = area.mergin
while True:
y = numpy.random.randint(mergin, h - mergin)
x = numpy.random.randint(mergin, w - mergin)
if area.state[y, x] == 0:
break
return numpy.array([y, x])
def __create_actions(self, area_shape):
actions = []
h, w = area_shape
for j in range(h):
actions.append([])
for i in range(w):
action_org = [0, 1, 2, 3]
action = self.__remove_actions(action_org, h, w, j, i)
actions[j].append(action)
return numpy.array(actions)
def __remove_actions(self, action_org, h, w, j, i):
action = copy.deepcopy(action_org)
mergin = 2
if j == mergin:
action.remove(3)
if j == h - mergin - 1:
action.remove(1)
if i == mergin:
action.remove(0)
if i == w - mergin - 1:
action.remove(2)
return action
def __create_moves(self):
# 0->left, 1->down, 2-> right, 3->up
return {0: numpy.array([0, -1]), 1: numpy.array([1, 0]), 2: numpy.array([0, 1]), 3: numpy.array([-1, 0])}
def __select_action(self, epsilon):
y, x = self.state
action = self.actions[y, x]
if numpy.random.rand() > epsilon:
return self.__greedy_action(action)
else:
return self.__random_action(action)
def __greedy_action(self, action):
sy, sx = self.around
qmax = self.q[action, sy, sx].max()
indexes = list(numpy.argwhere(self.q[action, sy, sx] == qmax))
random.shuffle(indexes)
return action[indexes[0]]
def __random_action(self, action):
act_indexes = copy.deepcopy(action)
random.shuffle(act_indexes)
return act_indexes[0]
tartget.py
from movingobject import *
class Target(MovingObject):
def __init__(self, area):
MovingObject.__init__(self, area)
self.value = 127
# public method
def act(self, area):
_y, _x = self.state
if self.__check_catched(area, _y, _x) == False:
area.state[_y, _x] = 0
while True:
act_index = MovingObject.get_act_index(self, 1.0)
y, x = MovingObject.move_state(self, act_index)
if area.state[y, x] == 0:
self.state = numpy.array([y ,x])
area.state[y, x] = self.value
break
# private method
def __check_catched(self, area, _y, _x):
check = self.__is_surrounded(area)
check *= self.__is_atcorners(area, _y, _x)
return check
def __is_surrounded(self, area):
t_state = numpy.argwhere(area.state == 127)
a_state = numpy.argwhere(area.state == 255)
return numpy.array_equal(numpy.abs((a_state - t_state).sum(axis = 1)), numpy.array([1, 1]))
def __is_atcorners(self, area, _y, _x):
h, w = area.shape
mergin = 2
c = _y == mergin or _y == h - mergin - 1
c *= _x == mergin or _x == w - mergin - 1
return c
agent.py
from movingobject import *
class Agent(MovingObject):
def __init__(self, area):
MovingObject.__init__(self, area)
self.value = 255
self.q = self.__creat_q_table()
self.around = numpy.zeros(2).astype('int')
self.act_index = -1
self.reward = 0
# public method
def lookout(self, area):
y, x = self.state
self.around = self.__get_object_states(area, y, x)
def act(self, area, epsilon):
_y, _x = self.state
area.state[_y, _x] = 0
if self.__is_alone():
epsilon = 1.0
self.act_index = MovingObject.get_act_index(self, epsilon)
y, x = MovingObject.move_state(self, self.act_index)
if area.state[y, x] == 0:
self.state = numpy.array([y, x])
area.state[y, x] = self.value
else:
area.state[_y, _x] = self.value
def update_q(self, area, alpha, gamma):
sy, sx = self.around
self.reward = self.__get_reward(area)
act_index = self.act_index
q = self.q[act_index, sy, sx]
self.q[act_index, sy, sx] = q + alpha * (self.reward + gamma * self.__get_max_q() - q)
def save_q(self, fname):
numpy.save(fname, self.q)
# private method
def __creat_q_table(self):
q = numpy.zeros((4, 26, 26))
return q
def __get_object_states(self, area, y, x):
mergin = area.mergin
around = area.state[y - mergin: y + mergin + 1, x - mergin: x + mergin + 1].reshape((1, -1))
t_state = numpy.argwhere(around == 127)[:, 1]
_a_state = numpy.argwhere(around == 255)[:, 1]
a_state = numpy.delete(_a_state, numpy.argwhere(_a_state == 12)[:, 0], axis = 0)
if numpy.array_equal(t_state, numpy.array([])):
t_state = numpy.array([25])
if numpy.array_equal(a_state, numpy.array([])):
a_state = numpy.array([25])
return numpy.r_[t_state, a_state]
def __is_alone(self):
return numpy.array_equal(self.around, numpy.array([25, 25]))
def __get_reward(self, area):
return 3 if self.__is_surrounding(area) else -1
def __is_surrounding(self, area):
t_state = numpy.argwhere(area.state == 127)
a_state = numpy.argwhere(area.state == 255)
check = numpy.array_equal(numpy.abs((a_state - t_state).sum(axis = 1)), numpy.array([1, 1]))
check += numpy.array_equal(a_state.mean(axis = 0), t_state)
return check
def __get_max_q(self):
y, x = self.state
actions = self.actions[y, x]
sy, sx = self.around
_sy, _sx = self.__next_around(sy, sx)
return self.q[actions, _sy, _sx].max()
def __next_around(self, sy, sx):
act_index = self.act_index
_sy = self.__next_state(act_index, sy)
_sx = self.__next_state(act_index, sx)
return numpy.array([_sy, _sx])
def __next_state(self, act_index, z):
if z != 25:
if act_index == 0 and (z + 1) % 5 != 0:
z += 1
elif act_index == 1 and z / 5 != 0:
z -= 5
elif act_index == 2 and z % 5 != 0:
z -= 1
elif act_index == 3 and z / 5 != 4:
z += 5
else:
z = 25
return z
area.py
import numpy
class Area:
def __init__(self, shape, mergin):
self.shape = shape
self.mergin = mergin
self.state = self.__init_state()
# public method
def update_state(self, mvobj):
y, x = mvobj.state
self.state[y, x] = mvobj.value
def reset_state(self):
self.state = self.__init_state()
# private method
def __init_state(self):
return numpy.zeros(self.shape).astype('uint8')
gameservice.py
from model.area import Area
from model.agent import Agent
from model.target import Target
class GameService:
def __init__(self):
'SERVICE'
def construct(self, area_shape, mergin):
area = Area(area_shape, mergin)
agent1 = Agent(area)
area.update_state(agent1)
agent2 = Agent(area)
area.update_state(agent2)
target = Target(area)
area.update_state(target)
return (area, agent1, agent2, target)
def turn(self, area, agent1, agent2, target, epsilon):
agent1.lookout(area)
agent2.lookout(area)
agent1.act(area, epsilon)
agent2.act(area, epsilon)
target.act(area)
def reset(self, area, agent1, agent2, target):
area.reset_state()
agent1.reset_state(area)
area.update_state(agent1)
agent2.reset_state(area)
area.update_state(agent2)
target.reset_state(area)
area.update_state(target)
main.py
from service.gameservice import GameService
import numpy
import cv2
from matplotlib import pyplot
if __name__ == '__main__':
# init
h = 24
w = 24
area_shape = (h, w)
mergin = 2
epsilon = 0.3
alpha = 0.3
gamma = 0.7
image_shape = (200, 200)
# count
total_plays = 300
total_steps = numpy.zeros(total_plays).astype('int')
play = 0
steps = 0
# construct
gameservice = GameService()
area, agent1, agent2, target = gameservice.construct(area_shape, mergin)
# video writer
writer = cv2.VideoWriter('q-learning.mv4', cv2.cv.FOURCC('m', 'p', '4', 'v'), 20, image_shape)
while True:
# disp
print u'play: %d, steps: %d' % (play, steps)
# act
gameservice.turn(area, agent1, agent2, target, epsilon)
# update area and agents' q talbe
agent1.update_q(area, alpha, gamma)
agent2.update_q(area, alpha, gamma)
# show image
image = cv2.resize(area.state[mergin:h - mergin, mergin:w - mergin], image_shape, interpolation = cv2.INTER_NEAREST)
writer.write(cv2.cvtColor(image, cv2.COLOR_GRAY2BGR))
cv2.imshow('', image)
if cv2.waitKey() == 27:
break
# refresh state if agents catch the taget
steps += 1
if agent1.reward == 3:
print '\033[32m' + '!!!catch the target!!!' + '\033[0m'
gameservice.reset(area, agent1, agent2, target)
agent1.save_q('q1.npy')
agent2.save_q('q2.npy')
total_steps[play] = steps
steps = 0
play += 1
# count
if play == total_plays:
break
pyplot.plot(numpy.arange(0, total_plays), total_steps)
pyplot.savefig('graph.png')
cv2.destroyAllWindows()
print '!!!finish!!!'
Chaque agent a une valeur indépendante en $ Q $. L'état $ s $ est représenté par les coordonnées de l'agent et les coordonnées de la cible. Les coordonnées sont déterminées dans l'ordre à partir du coin supérieur gauche. Lorsqu'il n'y a pas d'agent ou de cible dans la zone de reconnaissance, cela est représenté par la valeur numérique de 25 $. Les actions sont exprimées en $ 0, 1, 2, 3 $ dans l'ordre inférieur gauche et supérieur droit. (Nombre d'actions) $ \ fois $ (coordonnées cibles) $ \ fois $ (coordonnées d'agent), donc la taille de la valeur $ Q $ est de 4 $ \ fois 26 \ fois 26 $. Puisqu'il n'est pas possible de savoir comment l'agent ou la cible se comporte lors de la mise à jour de la valeur $ Q $, L'état environnant à la destination ne dépend que de ses propres actions, et on suppose que l'état de l'environnement change de la quantité de cette action.
0 | 1 | 2 | 3 | 4 |
5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 |
↑ Plage de reconnaissance de l'agent et ses coordonnées
Le graphique ci-dessous montre le nombre d'étapes prises pour capturer la cible sur l'axe vertical et le nombre d'essais sur l'axe horizontal. Les essais indépendants de 300 $ sont réalisés à 30 $ et le nombre moyen d'étapes dans chaque essai est la valeur sur l'axe vertical. Au fur et à mesure que l'apprentissage progresse, vous pouvez voir que le nombre d'étapes requises pour capturer la cible diminue.
J'ai fait une image d'un exemple du comportement appris. Le blanc est l'agent et le gris est la cible. J'apprends des comportements qui traquent la cible. En fait, il y a des images qui n'ont pas été bien apprises, mais je ne les ai pas considérées, alors je les omettrai.
J'ai essayé de résoudre le problème de suivi multi-agent avec python. Il peut y avoir des points où la mise en œuvre ne fonctionne pas correctement, veuillez donc le signaler. Cette fois, j'ai seulement confirmé les résultats d'apprentissage avec l'agent et la cible visibles. Je vérifierai également les résultats d'apprentissage lorsqu'un seul d'entre eux sera visible. S'il y a une découverte, je l'ajouterai au besoin.
Recommended Posts