Python API to externally trigger MoveIt Pro
The MoveIt Pro SDK provides an ObjectiveManager
Python class which can be used to start and stop Objectives.
This ObjectiveManager
handles connecting to the required ROS service servers that live in the Bridge (/execute_objective
and /cancel_objective
):
As shown in the image above, users can use the action client directly to control Objectives, but this approach requires users to manage all of the Objective cancellation logic.
To simplify programmatic Objective control, we recommended using the ObjectiveManager
.
The ObjectiveManager
supports starting Objectives both synchronously and asynchronously.
The moveit_studio_py/examples
directory in the MoveIt Pro SDK has example scripts that demonstrate how to use the ObjectiveManager
API.
These example scripts are explained below.
Note
The example commands presented below assume that MoveIt Pro is running with the picknik_ur_mock_hw_config
configuration package.
If MoveIt Pro isn’t already running with this configuration, you can do so by running ./moveit_pro run -c picknik_ur_mock_hw_config
from the MoveIt Pro install directory.
Running Objectives Synchronously
The following example shows how to make a synchronous (blocking) execute Objective call.
The key thing to notice here is that when calling ObjectiveManager.start_objective
, we set blocking=True
:
# POSSIBILITY OF SUCH DAMAGE.
# Start an Objective and block until the Objective has completed execution.
import argparse
import rclpy
from moveit_studio_py.objective_manager import ObjectiveManager
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"objective_name", type=str, help="Name of the Objective to start."
)
args = parser.parse_args()
rclpy.init()
objective_manager = ObjectiveManager()
print(f"Starting {args.objective_name}.")
success, error_message = objective_manager.start_objective(
args.objective_name, blocking=True
)
if success:
print("Objective executed successfully!")
else:
print(error_message)
rclpy.shutdown()
if __name__ == "__main__":
main()
We recommend running an Objective synchronously if the Objective does not take long to complete.
To test this example script, let’s run a simple Objective like Close Gripper
.
Run the following command in the MoveIt Pro Agent container:
ros2 run moveit_studio_py start_blocking.py "Close Gripper"
The logs in the web app should have indicated that the Objective executed successfully. You can also verify that the gripper was closed by inspecting the robot visualization in the web app.
We can also re-open the gripper that we just closed:
ros2 run moveit_studio_py start_blocking.py "Open Gripper"
Again, the logs and robot visualization in the web app should confirm that the gripper is now open.
Running Objectives Asynchronously
For Objectives that can take a long time to complete, or run indefinitely until they fail or a user cancels them, it is typically useful to not block code execution while the Objective runs.
The ObjectiveManager
can also run Objectives asynchronously.
To do this, the blocking
parameter of the start_objective
call should be False
, and a callback (parameter async_callback
) should also be passed to start_objective
.
The callback is triggered when the asynchronous Objective has completed execution.
Here’s an example script that implements this:
# POSSIBILITY OF SUCH DAMAGE.
# Starts an Objective asynchronously, and then stops the running Objective after waiting for a user-defined
# number of seconds.
import argparse
import rclpy
import time
from moveit_msgs.msg import MoveItErrorCodes
from moveit_studio_py.objective_manager import ObjectiveManager
def done_cb(future: rclpy.task.Future) -> None:
"""
Callback that is triggered when an Objective that was started asynchronously is done executing.
Args:
future: the Objective's future, which contains info about the completion status of the Objective.
"""
result = future.result()
if result.error_code.val == MoveItErrorCodes.SUCCESS:
print("Objective executed successfully!")
elif result.error_message:
print(result.error_message)
else:
print(f"MoveItErrorCode Value: {result.error_code.val}")
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"objective_name", type=str, help="Name of the Objective to start."
)
parser.add_argument(
"wait_time",
type=float,
default=0.0,
help="Time to wait (in seconds) before cancelling the Objective.",
)
args = parser.parse_args()
if args.wait_time < 0:
raise ValueError("wait_time must be a value >= 0.")
rclpy.init()
objective_manager = ObjectiveManager()
print(f"Starting {args.objective_name}.")
objective_manager.start_objective(
args.objective_name, blocking=False, async_callback=done_cb
)
time.sleep(args.wait_time)
print(f"Stopping {args.objective_name}.")
objective_manager.stop_objective()
rclpy.shutdown()
if __name__ == "__main__":
main()
The done_cb
method in this example is quite generic, and can be used/modified as needed for your own application.
What’s important to note about the asynchronous callback is that it must accept a rclpy.task.Future
as its only argument.
The Future holds the result of the ExecuteObjective
service type (see moveit_studio_sdk_msgs/srv/ExecuteObjective.srv
in the MoveIt Pro SDK), which lets users know whether the Objective completed successfully or not.
The 3 Waypoints Pick and Place
Objective is a good Objective to run asynchronously, since it will run indefinitely until a user stops the Objective or some other error occurs.
Let’s run this Objective for 10 seconds.
During the 10 second Objective timeout, be sure to look at the web app to verify that the Objective is executing:
ros2 run moveit_studio_py start_stop_async.py "3 Waypoints Pick and Place" 10
Running an Objective With Parameter Overrides
Some Objectives depend on user-defined input.
The ObjectiveManager
supports this through a mechanism called parameter overrides.
The following example shows how a parameter override is used to set the target waypoint for the Move to Joint State
Objective:
# POSSIBILITY OF SUCH DAMAGE.
# Run the "Move to Waypoint" Objective with the waypoint defined as a parameter override.
import argparse
import rclpy
from moveit_studio_py.objective_manager import ObjectiveManager
from moveit_studio_sdk_msgs.msg import BehaviorParameter
from moveit_studio_sdk_msgs.msg import BehaviorParameterDescription
def main():
__OBJECTIVE_NAME = "Move to Waypoint"
parser = argparse.ArgumentParser()
parser.add_argument("waypoint_name", type=str, help="Name of waypoint to move to.")
args = parser.parse_args()
# define the Objective's target waypoint via parameter override
waypoint_override = BehaviorParameter()
waypoint_override.description.name = "waypoint_name"
waypoint_override.description.type = BehaviorParameterDescription.TYPE_STRING
waypoint_override.string_value = args.waypoint_name
waypoint_override.behavior_namespaces = ["move_to_waypoint"]
rclpy.init()
print(f"Starting {__OBJECTIVE_NAME}.")
objective_manager = ObjectiveManager()
parameter_overrides = [waypoint_override]
success, error_msg = objective_manager.start_objective(
__OBJECTIVE_NAME, parameter_overrides=parameter_overrides, blocking=True
)
if success:
print("Objective executed successfully!")
else:
print(error_msg)
rclpy.shutdown()
if __name__ == "__main__":
main()
The waypoint_override
variable (of type BehaviorParameter
- see moveit_studio_sdk_msgs/msg/BehaviorParameter
in the MoveIt Pro SDK) is what implements the parameter override.
Every BehaviorParameter
instance must have the following fields set:
description.name
: The name of the parameter that’s being set. If you’re unsure what the name should be, look for aparameter_name
attribute the Objective’s xml file. This is typically in elements with aGet<X>FromUser
attribute.description.type
: The type of the parameter that’s being set. This information can also be found in the Objective’s xml file. The type is typically embedded in theGet<X>FromUser
attribute.<type>_value
: The value of the parameter, given the parameter’s type.behavior_namespaces
: The name(s) of the Behavior(s) that this parameter applies to. Currently, parameters should only apply to one behavior.
Let’s take a look at the Move to Joint State
Objective’s xml file as an example for determining what all of these values should be for the waypoint parameter override.
You should see the following line in this file:
<Action ID="GetStringFromUser" parameter_name="move_to_waypoint.waypoint_name" parameter_value="{waypoint_name}" />
parameter_name="move_to_waypoint.waypoint_name"
tells us that behavior_namespaces
should be set to move_to_waypoint
, and that description.name
should be waypoint_name
.
ID="GetStringFromUser"
tells us that description.type
is BehaviorParameterDescription.TYPE_STRING
(see moveit_studio_sdk_msgs/msg/BehaviorParameterDescription
in the MoveIt Pro SDK for the full list of parameter types).
Since the parameter type is a string, <type>_value
resolves to string_value
.
Now that the parameter override has been explained, let’s run the script with a valid waypoint as the only runtime argument.
A list of valid waypoints can be found in the “Endpoints” tab of the web app.
This waypoint will be set to the string_value
field of the waypoint_override
variable.
For example, if “Look at Left Table” is a valid waypoint, we could run the following:
ros2 run moveit_studio_py move_to_waypoint.py "Look at Left Table"
The web app should now show the robot moving to the specified waypoint.
Running the MoveIt Pro SDK Separately From the MoveIt Pro Agent
So far, all of the examples above were ran from the MoveIt Pro Agent container.
One powerful feature of the MoveIt Pro SDK is that it can be used in isolation from MoveIt Pro on a ROS 2 enabled system.
To test this in isolation, the README
in the MoveIt Pro SDK documents a Docker workflow that can be ran locally.
Note
Make sure that you use the moveit_studio_sdk
repository tag that matches your MoveIt Pro release.
For example, if you are using the 3.0.0
release, you should clone the repository as follows.
git clone -b 3.0.0 https://github.com/PickNikRobotics/moveit_studio_sdk.git