Cet article fonctionne dans l'environnement suivant.
article | valeur |
---|---|
CPU | Core i5-8250U |
Ubuntu | 16.04 |
ROS | Kinetic |
Pour l'installation, reportez-vous à ROS Course 02 Installation. Le programme de cet article a également été téléchargé sur github. Veuillez vous référer à ROS Lecture 11 git Repository.
smach est une bibliothèque python pour exécuter des tâches de haut niveau basées sur des transitions d'état. Il existe également des fonctions pratiques telles que l'affichage de la transition d'état sur l'interface graphique. Je vais vous expliquer comment utiliser cette bibliothèque.
installation de smach
sudo apt install ros-kinetic-smach ros-kinetic-smach-viewer
J'expliquerai l'utilisation de base.
La base de smach est l'État. Chaque état effectue un traitement (ʻexcute () ) et produit un résultat (ʻoutcomes
).
State Machine elle-même produit un résultat (ʻoutcomes). Cependant, StateMachine a un état à l'intérieur, et le résultat de cet état se connecte à un autre état ou produit le résultat. Lorsque ʻexecute ()
de StateMachine est exécuté, ʻexecute ()` de l'État enregistré en interne est exécuté dans l'ordre.
py_lecture/scripts/smach_simple1.py
#!/usr/bin/env python
import rospy
import smach
class State1(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done','exit'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state STATE1')
rospy.sleep(2.0)
if self.counter < 3:
self.counter += 1
return 'done'
else:
return 'exit'
class State2(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done'])
def execute(self, userdata):
rospy.loginfo('Executing state STATE2')
rospy.sleep(2.0)
return 'done'
def main():
rospy.init_node('smach_somple1')
sm_top = smach.StateMachine(outcomes=['succeeded'])
with sm_top:
smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'succeeded'})
smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})
outcome = sm_top.execute()
if __name__ == '__main__':
main()
1er terminal
roscore
Deuxième terminal
rosrun py_lecture smach_simple1.py
Lorsque vous l'exécutez, vous pouvez voir que les états changent dans l'ordre comme indiqué ci-dessous.
production
[INFO] [1585988828.285383]: Executing state STATE1
[ INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988830.287671]: Executing state STATE2
[ INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988832.292678]: Executing state STATE1
[ INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988834.297569]: Executing state STATE2
[ INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988836.301891]: Executing state STATE1
[ INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988838.308503]: Executing state STATE2
[ INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988840.314152]: Executing state STATE1
[ INFO ] : State machine terminating 'STATE1':'exit':'succeeded'
StateMachine peut avoir une structure hiérarchique. Vous pouvez également vérifier l'état de State Machine avec l'interface graphique en utilisant une fonction appelée Introspection.
Dans l'exemple précédent, StateMachine n'a ajouté que State, mais vous pouvez également ajouter StateMachine.
py_lecture/scripts/smach_simple2.py
#!/usr/bin/env python
import rospy
import smach
import smach_ros
class State1(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done','exit'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state STATE1')
rospy.sleep(2.0)
if self.counter < 3:
self.counter += 1
return 'done'
else:
return 'exit'
class State2(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done'])
def execute(self, userdata):
rospy.loginfo('Executing state STATE2')
rospy.sleep(2.0)
return 'done'
class State3(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done'])
def execute(self, userdata):
rospy.loginfo('Executing state STATE2')
rospy.sleep(2.0)
return 'done'
# main
def main():
rospy.init_node('smach_somple2')
sm_top = smach.StateMachine(outcomes=['succeeded'])
with sm_top:
smach.StateMachine.add('STATE3', State3(), transitions={'done':'SUB'})
sm_sub = smach.StateMachine(outcomes=['done'])
with sm_sub:
smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'done'})
smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})
smach.StateMachine.add('SUB', sm_sub, transitions={'done':'succeeded'})
sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
sis.start()
outcome = sm_top.execute()
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
1er terminal
roscore
Deuxième terminal
rosrun py_lecture smach_simple1.py
Troisième terminal
rosrun smach_viewer smach_viewer.py
L'écran suivant s'affiche.
Il est nécessaire d'instruire d'autres nœuds ROS à partir du nœud ROS du smach pour effectuer des tâches réalistes. Ici, actionlib est utilisé pour envoyer une commande à move_base. De plus, lorsque la tâche proprement dite est exécutée, il est nécessaire de pouvoir envoyer une annulation par une instruction externe en cours de route.
py_lecture/scripts/smach_task.py
#!/usr/bin/env python
import rospy
import smach
import smach_ros
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from std_msgs.msg import Empty
# define state Foo
class MoveBaseState(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1','outcome2'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state FOO')
if self.counter < 3:
self.counter += 1
return 'outcome1'
else:
return 'outcome2'
# define state Bar
class Bar(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1'])
def execute(self, userdata):
rospy.loginfo('Executing state BAR')
return 'outcome1'
def child_term_cb(outcome_map):
print("child_term_cb")
if outcome_map['MOVE2'] == 'succeeded':
return True
if outcome_map['MONITOR2']:
return True
return False
def out_cb(outcome_map):
print("out_cb")
if outcome_map['MOVE2'] == 'succeeded':
return 'done'
else:
return 'exit'
def monitor_cb(ud, msg):
print("monitor_cb")
return False
def main():
rospy.init_node('smach_example_state_machine')
# Create the top level SMACH state machine
sm_top = smach.StateMachine(outcomes=['done', 'exit'])
# Open the container
with sm_top:
goal1=MoveBaseGoal()
goal1.target_pose.header.frame_id = "dtw_robot1/map"
goal1.target_pose.pose.position.x = 0.5
goal1.target_pose.pose.orientation.w = 1.0
smach.StateMachine.add('MOVE1', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal1), transitions={'succeeded':'TASK2', 'preempted':'done', 'aborted':'done'})
task2_concurrence = smach.Concurrence(outcomes=['done', 'exit'], default_outcome='done', child_termination_cb = child_term_cb, outcome_cb = out_cb)
with task2_concurrence:
goal2=MoveBaseGoal()
goal2.target_pose.header.frame_id = "dtw_robot1/map"
goal2.target_pose.pose.position.x = -0.5
goal2.target_pose.pose.orientation.w = 1.0
smach.Concurrence.add('MOVE2', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal2))
smach.Concurrence.add('MONITOR2', smach_ros.MonitorState("/sm_stop", Empty, monitor_cb))
smach.StateMachine.add('TASK2', task2_concurrence, transitions={'done':'done', 'exit':'exit'})
# Execute SMACH plan
sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
sis.start()
outcome = sm_top.execute()
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
1er terminal
roslaunch nav_lecture move_base.launch
Deuxième terminal
rosrun smach_viewer smach_viewer.py
Troisième terminal
rosrun py_lecture smach_task.py
Si vous exécutez normalement ce qui précède, les deux tâches TASK1 et TASK2 seront exécutées dans l'ordre indiqué ci-dessous.
Si vous tapez la commande suivante pendant l'exécution de TASK2, TASK2 sera arrêté.
4ème terminal
rostopic pub -1 /sm_stop std_msgs/Empty "{}"
Lien vers la table des matières du cours ROS
Recommended Posts