Python API to externally trigger MoveIt Pro
The MoveIt Pro SDK provides an ObjectiveManager
Python class which can be used to start and stop Objectives.
This ObjectiveManager
handles connecting to the required ROS service servers that live in the Bridge (/execute_objective
and /cancel_objective
):
As shown in the image above, users can use the action client directly to control Objectives, but this approach requires users to manage all of the Objective cancellation logic.
To simplify programmatic Objective control, we recommended using the ObjectiveManager
.
The ObjectiveManager
supports starting Objectives both synchronously and asynchronously.
The moveit_studio_py/examples
directory in the MoveIt Pro SDK has example scripts that demonstrate how to use the ObjectiveManager
API.
These example scripts are explained below.
The example commands presented below assume that MoveIt Pro is running with the picknik_ur_mock_hw_config
configuration package.
If MoveIt Pro isn't already running with this configuration, you can do so by running moveit_pro run -c picknik_ur_mock_hw_config
.
Running Objectives Synchronously
The following example shows how to make a synchronous (blocking) execute Objective call.
The key thing to notice here is that when calling ObjectiveManager.start_objective
, we set blocking=True
:
# Start an Objective and block until the Objective has completed execution.
import argparse
import rclpy
from moveit_studio_py.objective_manager import ObjectiveManager
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"objective_name", type=str, help="Name of the Objective to start."
)
args = parser.parse_args()
rclpy.init()
objective_manager = ObjectiveManager()
print(f"Starting {args.objective_name}.")
success, error_message = objective_manager.start_objective(
args.objective_name, blocking=True
)
if success:
print("Objective executed successfully!")
else:
print(error_message)
rclpy.shutdown()
if __name__ == "__main__":
main()
We recommend running an Objective synchronously if the Objective does not take long to complete.
To test this example script, let's run a simple Objective like Close Gripper
.
Run the following command in the MoveIt Pro Agent container:
ros2 run moveit_studio_py start_blocking.py "Close Gripper"
The logs in the web app should have indicated that the Objective executed successfully. You can also verify that the gripper was closed by inspecting the robot visualization in the web app.
We can also re-open the gripper that we just closed:
ros2 run moveit_studio_py start_blocking.py "Open Gripper"
Again, the logs and robot visualization in the web app should confirm that the gripper is now open.
Running Objectives Asynchronously
For Objectives that can take a long time to complete, or run indefinitely until they fail or a user cancels them, it is typically useful to not block code execution while the Objective runs.
The ObjectiveManager
can also run Objectives asynchronously.
To do this, the blocking
parameter of the start_objective
call should be False
, and a callback (parameter async_callback
) should also be passed to start_objective
.
The callback is triggered when the asynchronous Objective has completed execution.
Here's an example script that implements this:
# Starts an Objective asynchronously, and then stops the running Objective after waiting for a user-defined
# number of seconds.
import argparse
import rclpy
import time
from moveit_msgs.msg import MoveItErrorCodes
from moveit_studio_py.objective_manager import ObjectiveManager
done = False
def done_cb(future: rclpy.task.Future) -> None:
"""
Callback that is triggered when an Objective that was started asynchronously is done executing.
Args:
future: the Objective's future, which contains info about the completion status of the Objective.
"""
result = future.result()
if result.error_code.val == MoveItErrorCodes.SUCCESS:
print("Objective executed successfully!")
elif result.error_code.val == MoveItErrorCodes.PREEMPTED:
print("Objective STOPPED by user")
else:
print("Objective FAILED")
print(f"MoveItErrorCode Value: {result.error_code.val}")
global done
done = True
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"objective_name", type=str, help="Name of the Objective to start."
)
parser.add_argument(
"wait_time",
type=float,
default=0.0,
help="Time to wait (in seconds) before cancelling the Objective.",
)
args = parser.parse_args()
if args.wait_time < 0:
raise ValueError("wait_time must be a value >= 0.")
rclpy.init()
objective_manager = ObjectiveManager()
print(f"Starting {args.objective_name}.")
global done
done = False
objective_manager.start_objective(
args.objective_name, blocking=False, async_callback=done_cb
)
cancel_time = time.time() + args.wait_time
while not done and time.time() < cancel_time:
time.sleep(1)
if not done:
print(f"Stopping {args.objective_name}.")
objective_manager.stop_objective()
rclpy.shutdown()
if __name__ == "__main__":
main()
The done_cb
method in this example is quite generic, and can be used/modified as needed for your own application.
What's important to note about the asynchronous callback is that it must accept a rclpy.task.Future
as its only argument.
The Future holds the result of the ExecuteObjective
service type (see moveit_studio_sdk_msgs/srv/ExecuteObjective.srv
in the MoveIt Pro SDK), which lets users know whether the Objective completed successfully or not.
The 3 Waypoints Pick and Place
Objective is a good Objective to run asynchronously, since it will run indefinitely until a user stops the Objective or some other error occurs.
Let's run this Objective for 10 seconds.
During the 10 second Objective timeout, be sure to look at the web app to verify that the Objective is executing:
ros2 run moveit_studio_py start_stop_async.py "3 Waypoints Pick and Place" 10
Running the MoveIt Pro SDK Separately From the MoveIt Pro Agent
So far, all of the examples above were ran from the MoveIt Pro Agent container.
One powerful feature of the MoveIt Pro SDK is that it can be used in isolation from MoveIt Pro on a ROS 2 enabled system.
To test this in isolation, the README
in the MoveIt Pro SDK documents a Docker workflow that can be ran locally.
Make sure that you use the moveit_studio_sdk
repository tag that matches your MoveIt Pro release.
For example, if you are using the 3.0.0
release, you should clone the repository as follows.
git clone -b 3.0.0 https://github.com/PickNikRobotics/moveit_studio_sdk.git