ROS Lecture 113 Perform tasks with smach

environment

This article works in the following environment.

item value
CPU Core i5-8250U
Ubuntu 16.04
ROS Kinetic

For installation, refer to ROS Course 02 Installation. Also, the program in this article has been uploaded to github. See ROS Course 11 git repository.

Overview

smach is a python library for executing high-level tasks based on state transitions. There are also convenient functions such as displaying state transitions on the GUI. I will explain how to use this library.

Installation

installation of smach


sudo apt install ros-kinetic-smach ros-kinetic-smach-viewer

Example 1 Basic usage

I will explain the basic usage.

Source code

The basis of smach is State. Each State performs processing (ʻexcute () ) and produces one result (ʻoutcomes). The State Machine itself produces one result (ʻoutcomes). However, the StateMachine has a State inside, and the result of that State makes a connection to transition to another State or output the result. When ʻexecute () of State Machine is executed, ʻexecute ()` of the internally registered State is executed in order.

py_lecture/scripts/smach_simple1.py


#!/usr/bin/env python

import rospy
import smach

class State1(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done','exit'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE1')
        rospy.sleep(2.0)
        if self.counter < 3:
            self.counter += 1
            return 'done'
        else:
            return 'exit'

class State2(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

def main():
    rospy.init_node('smach_somple1')

    sm_top = smach.StateMachine(outcomes=['succeeded'])
    with sm_top:
        smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'succeeded'})
        smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})

    outcome = sm_top.execute()

if __name__ == '__main__':
    main()

Run

1st terminal


roscore

Second terminal


rosrun py_lecture smach_simple1.py

When you execute it, you can see that the states change in order as shown below.

output


[INFO] [1585988828.285383]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988830.287671]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988832.292678]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988834.297569]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988836.301891]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988838.308503]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988840.314152]: Executing state STATE1
[  INFO ] : State machine terminating 'STATE1':'exit':'succeeded'

Example 2 State Machine Hierarchy and Introspection

StateMachines can have a hierarchical structure. You can also check the status of State Machine with GUI by using a function called Introspection.

Source code

In the previous example, StateMachine only added State, but you can also add StateMachine.

py_lecture/scripts/smach_simple2.py


#!/usr/bin/env python

import rospy
import smach
import smach_ros

class State1(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done','exit'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE1')
        rospy.sleep(2.0)
        if self.counter < 3:
            self.counter += 1
            return 'done'
        else:
            return 'exit'

class State2(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

class State3(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

# main
def main():
    rospy.init_node('smach_somple2')

    sm_top = smach.StateMachine(outcomes=['succeeded'])
    with sm_top:
        smach.StateMachine.add('STATE3', State3(), transitions={'done':'SUB'})

        sm_sub = smach.StateMachine(outcomes=['done'])
        with sm_sub:
            smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'done'})
            smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})

        smach.StateMachine.add('SUB', sm_sub, transitions={'done':'succeeded'})

    sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
    sis.start()
    outcome = sm_top.execute()
    rospy.spin()
    sis.stop()

if __name__ == '__main__':
    main()

Run

1st terminal


roscore

Second terminal


rosrun py_lecture smach_simple1.py

Third terminal


 rosrun smach_viewer smach_viewer.py 

The following screen will be displayed.

smach2.gif

Example 3 Performing complex tasks

To perform a realistic task, you need to instruct other ROS nodes from the smach's ROS node. Here, actionlib is used to send a command to move_base. In addition, when the actual task is executed, it is necessary to be able to send a cancellation by an external instruction on the way.

Source code

py_lecture/scripts/smach_task.py


#!/usr/bin/env python

import rospy
import smach
import smach_ros
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from std_msgs.msg import Empty

# define state Foo
class MoveBaseState(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'


# define state Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome1'

def child_term_cb(outcome_map):
    print("child_term_cb")
    if outcome_map['MOVE2'] == 'succeeded':
        return True
    if outcome_map['MONITOR2']:
        return True
    return False

def out_cb(outcome_map):
    print("out_cb")
    if outcome_map['MOVE2'] == 'succeeded':
        return 'done'
    else:
        return 'exit'

def monitor_cb(ud, msg):
    print("monitor_cb")
    return False

def main():
    rospy.init_node('smach_example_state_machine')

    # Create the top level SMACH state machine
    sm_top = smach.StateMachine(outcomes=['done', 'exit'])
    
    # Open the container
    with sm_top:
        goal1=MoveBaseGoal()
        goal1.target_pose.header.frame_id = "dtw_robot1/map"
        goal1.target_pose.pose.position.x = 0.5
        goal1.target_pose.pose.orientation.w = 1.0
        smach.StateMachine.add('MOVE1', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal1), transitions={'succeeded':'TASK2', 'preempted':'done', 'aborted':'done'})

        task2_concurrence = smach.Concurrence(outcomes=['done', 'exit'], default_outcome='done', child_termination_cb = child_term_cb, outcome_cb = out_cb)
        with task2_concurrence:
            goal2=MoveBaseGoal()
            goal2.target_pose.header.frame_id = "dtw_robot1/map"
            goal2.target_pose.pose.position.x = -0.5
            goal2.target_pose.pose.orientation.w = 1.0
            smach.Concurrence.add('MOVE2', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal2))
            smach.Concurrence.add('MONITOR2', smach_ros.MonitorState("/sm_stop", Empty, monitor_cb))
        smach.StateMachine.add('TASK2', task2_concurrence, transitions={'done':'done', 'exit':'exit'}) 



    # Execute SMACH plan
    sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
    sis.start()
    outcome = sm_top.execute()
    rospy.spin()
    sis.stop()


if __name__ == '__main__':
    main()

Run

1st terminal


roslaunch nav_lecture move_base.launch 

Second terminal


rosrun smach_viewer smach_viewer.py 

Third terminal


rosrun py_lecture smach_task.py

If you execute the above normally, the two tasks TASK1 and TASK2 will be executed in sequence as shown below.

smach3_1.gif

If you type the following command while TASK2 is being executed, TASK2 will be stopped.

4th terminal


rostopic pub -1 /sm_stop std_msgs/Empty "{}" 

smach3_2.gif

reference

smach tutorial

Link to table of contents page

Link to ROS course table of contents

Recommended Posts

ROS Lecture 113 Perform tasks with smach
ROS Lecture 108 Using Database (mongo) with ROS
Secure ROS with AppArmor
Perform logical operations with Perceptron
Perform Stratified Split with PyTorch
ROS course 105 Operate toio with ROS