This article works in the following environment.
item | value |
---|---|
CPU | Core i5-8250U |
Ubuntu | 16.04 |
ROS | Kinetic |
For installation, refer to ROS Course 02 Installation. Also, the program in this article has been uploaded to github. See ROS Course 11 git repository.
smach is a python library for executing high-level tasks based on state transitions. There are also convenient functions such as displaying state transitions on the GUI. I will explain how to use this library.
installation of smach
sudo apt install ros-kinetic-smach ros-kinetic-smach-viewer
I will explain the basic usage.
The basis of smach is State. Each State performs processing (ʻexcute () ) and produces one result (ʻoutcomes
).
The State Machine itself produces one result (ʻoutcomes). However, the StateMachine has a State inside, and the result of that State makes a connection to transition to another State or output the result. When ʻexecute ()
of State Machine is executed, ʻexecute ()` of the internally registered State is executed in order.
py_lecture/scripts/smach_simple1.py
#!/usr/bin/env python
import rospy
import smach
class State1(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done','exit'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state STATE1')
rospy.sleep(2.0)
if self.counter < 3:
self.counter += 1
return 'done'
else:
return 'exit'
class State2(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done'])
def execute(self, userdata):
rospy.loginfo('Executing state STATE2')
rospy.sleep(2.0)
return 'done'
def main():
rospy.init_node('smach_somple1')
sm_top = smach.StateMachine(outcomes=['succeeded'])
with sm_top:
smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'succeeded'})
smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})
outcome = sm_top.execute()
if __name__ == '__main__':
main()
1st terminal
roscore
Second terminal
rosrun py_lecture smach_simple1.py
When you execute it, you can see that the states change in order as shown below.
output
[INFO] [1585988828.285383]: Executing state STATE1
[ INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988830.287671]: Executing state STATE2
[ INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988832.292678]: Executing state STATE1
[ INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988834.297569]: Executing state STATE2
[ INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988836.301891]: Executing state STATE1
[ INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988838.308503]: Executing state STATE2
[ INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988840.314152]: Executing state STATE1
[ INFO ] : State machine terminating 'STATE1':'exit':'succeeded'
StateMachines can have a hierarchical structure. You can also check the status of State Machine with GUI by using a function called Introspection.
In the previous example, StateMachine only added State, but you can also add StateMachine.
py_lecture/scripts/smach_simple2.py
#!/usr/bin/env python
import rospy
import smach
import smach_ros
class State1(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done','exit'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state STATE1')
rospy.sleep(2.0)
if self.counter < 3:
self.counter += 1
return 'done'
else:
return 'exit'
class State2(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done'])
def execute(self, userdata):
rospy.loginfo('Executing state STATE2')
rospy.sleep(2.0)
return 'done'
class State3(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['done'])
def execute(self, userdata):
rospy.loginfo('Executing state STATE2')
rospy.sleep(2.0)
return 'done'
# main
def main():
rospy.init_node('smach_somple2')
sm_top = smach.StateMachine(outcomes=['succeeded'])
with sm_top:
smach.StateMachine.add('STATE3', State3(), transitions={'done':'SUB'})
sm_sub = smach.StateMachine(outcomes=['done'])
with sm_sub:
smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'done'})
smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})
smach.StateMachine.add('SUB', sm_sub, transitions={'done':'succeeded'})
sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
sis.start()
outcome = sm_top.execute()
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
1st terminal
roscore
Second terminal
rosrun py_lecture smach_simple1.py
Third terminal
rosrun smach_viewer smach_viewer.py
The following screen will be displayed.
To perform a realistic task, you need to instruct other ROS nodes from the smach's ROS node. Here, actionlib is used to send a command to move_base. In addition, when the actual task is executed, it is necessary to be able to send a cancellation by an external instruction on the way.
py_lecture/scripts/smach_task.py
#!/usr/bin/env python
import rospy
import smach
import smach_ros
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from std_msgs.msg import Empty
# define state Foo
class MoveBaseState(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1','outcome2'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state FOO')
if self.counter < 3:
self.counter += 1
return 'outcome1'
else:
return 'outcome2'
# define state Bar
class Bar(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1'])
def execute(self, userdata):
rospy.loginfo('Executing state BAR')
return 'outcome1'
def child_term_cb(outcome_map):
print("child_term_cb")
if outcome_map['MOVE2'] == 'succeeded':
return True
if outcome_map['MONITOR2']:
return True
return False
def out_cb(outcome_map):
print("out_cb")
if outcome_map['MOVE2'] == 'succeeded':
return 'done'
else:
return 'exit'
def monitor_cb(ud, msg):
print("monitor_cb")
return False
def main():
rospy.init_node('smach_example_state_machine')
# Create the top level SMACH state machine
sm_top = smach.StateMachine(outcomes=['done', 'exit'])
# Open the container
with sm_top:
goal1=MoveBaseGoal()
goal1.target_pose.header.frame_id = "dtw_robot1/map"
goal1.target_pose.pose.position.x = 0.5
goal1.target_pose.pose.orientation.w = 1.0
smach.StateMachine.add('MOVE1', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal1), transitions={'succeeded':'TASK2', 'preempted':'done', 'aborted':'done'})
task2_concurrence = smach.Concurrence(outcomes=['done', 'exit'], default_outcome='done', child_termination_cb = child_term_cb, outcome_cb = out_cb)
with task2_concurrence:
goal2=MoveBaseGoal()
goal2.target_pose.header.frame_id = "dtw_robot1/map"
goal2.target_pose.pose.position.x = -0.5
goal2.target_pose.pose.orientation.w = 1.0
smach.Concurrence.add('MOVE2', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal2))
smach.Concurrence.add('MONITOR2', smach_ros.MonitorState("/sm_stop", Empty, monitor_cb))
smach.StateMachine.add('TASK2', task2_concurrence, transitions={'done':'done', 'exit':'exit'})
# Execute SMACH plan
sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
sis.start()
outcome = sm_top.execute()
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
1st terminal
roslaunch nav_lecture move_base.launch
Second terminal
rosrun smach_viewer smach_viewer.py
Third terminal
rosrun py_lecture smach_task.py
If you execute the above normally, the two tasks TASK1 and TASK2 will be executed in sequence as shown below.
If you type the following command while TASK2 is being executed, TASK2 will be stopped.
4th terminal
rostopic pub -1 /sm_stop std_msgs/Empty "{}"