Python API to externally trigger MoveIt Studio

The MoveIt Studio web app provides a graphical interface to develop and execute Objectives. However, you may want to execute these Objectives via code or using your own external application.

The Objective Server, which is a part of the MoveIt Studio Agent, is the means by which MoveIt Studio internally schedules Objectives to be run. It advertises a ROS 2 Action Interface for loading, running, and halting Objectives. Specifically, the MoveIt Studio Agent provides a /do_objective action which loads a behavior tree into the Agent and then begins planning and execution.

You can use any ROS 2 client library capable of sending action goals, such as rclcpp or rclpy, to interact with the MoveIt Studio Objective Server directly. By doing this, you can load, run, and halt Objectives programmatically. In this tutorial, you will learn how to interact with the MoveIt Studio Objective Server using Python code.

Using Python to Interact with the Objective Server

Since the Objective Server utilizes a ROS 2 Action Interface, you can interact with it by providing an Action Client to send requests to the Action Server. As an example, this tutorial uses a Python script demonstrating how to create an Action Client for sending Objective requests to the Agent. Feel free to adapt this script to your application or to serve as inspiration for your own implementation.

The example script can be found in the example MoveIt Studio workspace, at moveit_studio_ws/src/moveit_studio_agent_examples/scripts/call_do_objective.py. This example workspace can be found locally in the directory where MoveIt Studio was installed (by default at $HOME/moveit_studio). This script is also provided below:

#!/usr/bin/env python3

# Copyright 2023 Picknik Inc.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#    * Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#
#    * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#
#    * Neither the name of the Picknik Inc. nor the names of its
#      contributors may be used to endorse or promote products derived from
#      this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.


import argparse
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from moveit_msgs.msg import MoveItErrorCodes
from moveit_studio_agent_msgs.action import DoObjectiveSequence


class DoObjectiveSequenceClient(Node):
    """
    ROS 2 node that acts as an Action Client for MoveIt Studio's Objective Server.
    """

    def __init__(self):
        super().__init__("DoObjectiveSequence")
        self._action_client = ActionClient(self, DoObjectiveSequence, "do_objective")

    def send_goal(self, objective_name, cancel):
        """
        Sends a DoObjectiveSequence Goal to the Objective Server via the node's Action Client.

        Args:
            objective_name: the (string) name of an objective to run.

        Returns:
            goal_future: a rclpy.task.Future to a rclpy.action.client.ClientGoalHandle.
        """
        goal_msg = DoObjectiveSequence.Goal()
        goal_msg.objective_name = objective_name
        self.cancel = cancel
        self._action_client.wait_for_server()
        self._send_goal_future = self._action_client.send_goal_async(goal_msg)
        self._send_goal_future.add_done_callback(self.goal_response_callback)
        return self._send_goal_future

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info("Goal rejected.")

            rclpy.shutdown()
            return

        self._goal_handle = goal_handle
        self.get_logger().info("Goal accepted...")

        get_result_future = goal_handle.get_result_async()
        get_result_future.add_done_callback(self.get_result_callback)
        # Cancel the goal after a set amount of time (in seconds)
        if self.cancel:
            self._timer = self.create_timer(2.0, self.cancel_goal)

    def get_result_callback(self, future):
        result = future.result().result
        if result.error_code.val == MoveItErrorCodes.SUCCESS:
            self.get_logger().info("Objective succeeded!")
        elif hasattr(result.error_code, "error_message"):
            self.get_logger().info(
                f"Objective failed: {result.error_code.error_message}"
            )
        else:
            self.get_logger().info(
                f"Objective failed. MoveItErrorCode Value: {result.error_code.val}"
            )

        rclpy.shutdown()

    def cancel_goal(self):
        """
        Cancels an Objective Server's DoObjectiveSequence Goal via the node's Action Client.

        Returns:
            future: a rclpy.task.Future that completes when the goal is canceled.
        """
        self.get_logger().info("Attempting to cancel goal.")
        future = self._goal_handle.cancel_goal_async()
        future.add_done_callback(self.cancel_goal_callback)
        # Cancel the timer that this was a part of.
        self._timer.cancel()
        return future

    def cancel_goal_callback(self, future):
        cancel_response = future.result()
        if cancel_response.goals_canceling:
            self.get_logger().info("Goal successfully canceled.")
        else:
            self.get_logger().info("Goal failed to cancel.")

        rclpy.shutdown()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("objective_name", type=str, help="Name of Objective to run.")
    parser.add_argument(
        "--cancel",
        action="store_true",
        help="Optional boolean for if the objective should be automatically cancelled after a set amount of time.",
    )
    args = parser.parse_args()

    rclpy.init()

    client = DoObjectiveSequenceClient()

    client.send_goal(args.objective_name, args.cancel)

    rclpy.spin(client)


if __name__ == "__main__":
    main()

This is a relatively simple example; the script first sets up a Node which will behave as an Action Client. Next, it takes in an argument from the command line and uses this to construct a DoObjectiveSequence Goal object corresponding to an Objective we wish to run.

Note

The Objective Server uses moveit_studio_agent_msgs to define its internal state and operations, including actions for the Action Interface such as DoObjectiveSequence. Since this is a package within MoveIt Studio, make sure that the package containing your script is part of a MoveIt Studio User Workspace.

Tip

Be sure to add moveit_studio_agent_msgs as a dependency in your packages.xml and CMakeLists.txt if using a custom ROS 2 package.

Finally, the script sends this goal to the Action Client and waits until the asynchronous request is completed. The other functions can be ignored for this section of the tutorial, as these are callbacks useful for logging errors and responses from the Action Server.

To run this for the “Close Gripper” Objective, you can run:

ros2 run moveit_studio_agent_examples call_do_objective.py "Close Gripper"

You should now see the gripper close either in the MoveIt Studio UI if running a simulated robot, or on your physical robot if running on hardware. Congratulations, you just called an Objective manually!

Canceling an Objective

As an example for how to cancel a running Objective, the call_do_objective.py has an optional argument --cancel, which will automatically cancel the running Objective after a fixed amount of time. To do this, the following functions have been implemented:

def cancel_goal(self):
    """
    Cancels an Objective Server's DoObjectiveSequence Goal via the node's Action Client.

    Returns:
        future: a rclpy.task.Future that completes when the goal is canceled.
    """
    self.get_logger().info("Attempting to cancel goal.")
    future = self._goal_handle.cancel_goal_async()
    future.add_done_callback(self.cancel_goal_callback)
    # Cancel the timer that this was a part of.
    self._timer.cancel()
    return future

def cancel_goal_callback(self, future):
    cancel_response = future.result()
    if cancel_response.goals_canceling:
        self.get_logger().info("Goal successfully canceled.")
    else:
        self.get_logger().info("Goal failed to cancel.")

    rclpy.shutdown()

The cancel_goal function has been added to a timer in the goal_response_callback function so we can cancel an accepted Objective. The timer that calls cancel_goal implements how long to wait before attempting to cancel the Objective.

Currently to cancel a running Objective, we need the ClientGoalHandle for the Objective. This handle is stored from the send_goal function as self._send_goal_future. Once we have the goal’s handle, we call cancel_goal_async() on it (or cancel_goal() for the synchronous version).

The cancel_goal_callback function then shows the result of the cancellation attempt by checking the cancel_goal_async()’s future’s result.

To demonstrate an Objective with --cancel, run:

ros2 run moveit_studio_agent_examples call_do_objective.py --cancel "3 Waypoint Pick and Place"

This should begin the “3 Waypoint Pick and Place” example which will be interrupted after a set amount of time due to the cancellation request.

Calling Objectives with Parameters

Some Objectives require additional arguments, which can be added via a BehaviorParameter.

As an example, we will add one such parameter to the “Move to Joint State” Objective, which requires a waypoint_name parameter as input. We will specify the desired goal waypoint as an argument to the script in place of the objective name.

To set this parameter, we are going to need to modify call_do_objective.py.

Feel free to use this example as a reference to implement any Objective using any input ports needed. Go to the directory where MoveIt Studio was installed (the default location being $HOME/moveit_studio). Open moveit_studio_ws/src/moveit_studio_agent_examples/scripts/call_do_objective.py in a text editor or IDE.

We first need to import ROS 2 message types for Behavior Parameters, found in the moveit_studio_behavior_msgs package. Specifically, we will import the BehaviorParameter and BehaviorParameterDescription messages:

from moveit_msgs.msg import MoveItErrorCodes
from moveit_studio_agent_msgs.action import DoObjectiveSequence
from moveit_studio_behavior_msgs.msg import (
    BehaviorParameter,
    BehaviorParameterDescription,
)

Next, edit the send_goal function so that the action goal provides an override for the waypoint_name parameter, as shown below:

def send_goal(self, waypoint_name, cancel):
    goal_msg = DoObjectiveSequence.Goal()
    goal_msg.objective_name = "Move to Joint State"
    self.cancel = cancel

    behavior_parameter = BehaviorParameter()
    behavior_parameter.behavior_namespaces.append("move_to_joint_state")
    behavior_parameter.description.name = "waypoint_name"
    behavior_parameter.description.type = BehaviorParameterDescription.TYPE_STRING
    behavior_parameter.string_value = waypoint_name
    goal_msg.parameter_overrides = [behavior_parameter]

    self._action_client.wait_for_server()
    result = self._action_client.send_goal_async(goal_msg)
    return result

Note that we have changed the parameter to send_goal from objective_name to waypoint_name since this function is hardcoded for the “Move to Joint State” Objective now.

Finally, we update the script’s objective_name argument to waypoint_name to reflect our changes:

def main(args=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("waypoint_name", type=str, help="Name of waypoint to move to.")

Overriding parameters when starting an Objective will vary depending on the parameter name and type. However, the general approach is as follows:

  • We first create a new BehaviorParameter() object.

  • Next, we append the Objective name (in snake-case format) to the parameter object’s behavior_namespaces.

  • We then set the description.name, which is the name of the parameter we are setting.

  • Next, we set the description.type. This and the next line will change depending on the type of Input Port being set which depends on the Objective and the Behaviors it uses. For Move to Joint State we need a string specifying which waypoint to move to. We specify this by setting the description.type to the BehaviorParameterDescription enum type TYPE_STRING.

    Tip

    Other enum options currently supported are TYPE_POSE, TYPE_DOUBLE, TYPE_INT, TYPE_BOOL, and TYPE_JOINT_STATE.

  • We then set the corresponding <type_name>_value (in our case TYPE_STRING corresponds to string_value) to the name of the waypoint we wish to go to.

  • Finally, we add this Behavior parameter to the parameter_overrides list of the goal_msg.

That’s it! You can now rebuild your user workspace and execute this as follows, specifying the name of the waypoint to move to:

ros2 run moveit_studio_agent_examples call_do_objective.py "Behind"

The finished source code for this example is provided in moveit_studio_ws/src/moveit_studio_agent_examples/scripts/call_do_objective_waypoint.py if you want a finished reference. You can make similar modifications for any Objective you wish to call. Feel free to experiment and create your own scripts using your favorite ROS 2 client library.

Note

If you are creating a new ROS 2 package, don’t forget to add the moveit_studio_agent_msgs and moveit_studio_behavior_msgs packages as dependencies in your package’s package.xml and CMakeLists.txt files!