Skip to main content
Version: 9

Run an Existing Objective

This guide shows how to launch an Objective that already exists in your robot configuration. For both transports the entry point is the /do_objective action (moveit_studio_sdk_msgs/action/DoObjectiveSequence), with objective_name set to the Objective shown in the MoveIt Pro UI. A synchronous service alternative — /execute_objective — is described at the end of this page.

If you have not configured a transport yet, see the Programmatic SDKs overview.

danger

With great power comes great responsibility. If you launch DoObjectiveSequence and lose the goal handle (RCL) or goal ID (websocket), you will no longer be able to cancel the running Objective from that client. If the Objective has no end condition, you will need to use the web UI to cancel it or restart MoveIt Pro.

warning

Only one Objective runs at a time. Sending a new goal cancels any currently running Objective.

Launch an Objective

"""Run an existing Objective via the DoObjectiveSequence action."""

from typing import Any, List, Optional

import rclpy
from rclpy.action import ActionClient
from rclpy.action.client import ClientGoalHandle
from rclpy.node import Node
from rclpy.task import Future

from moveit_studio_sdk_msgs.action import DoObjectiveSequence


def main(args: Optional[List[str]] = None) -> None:
rclpy.init(args=args)
node: Node = Node("objective_client")
action_client: ActionClient = ActionClient(node, DoObjectiveSequence, "/do_objective")
goal_handle: Optional[ClientGoalHandle] = None

try:
if not action_client.wait_for_server(timeout_sec=5.0):
node.get_logger().error("Action server not available after 5s")
return

goal_msg = DoObjectiveSequence.Goal(objective_name="Close Gripper")

send_goal_future: Future = action_client.send_goal_async(goal_msg)
rclpy.spin_until_future_complete(node, send_goal_future)

goal_handle = send_goal_future.result()
if not goal_handle or not goal_handle.accepted:
node.get_logger().error("Objective rejected by server")
return

result_future: Future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(node, result_future)

result_response: Optional[Any] = result_future.result()
if result_response is None:
node.get_logger().error("Failed to get result from action server")
return

result: DoObjectiveSequence.Result = result_response.result
if result.error_code.val == 1: # MoveItErrorCodes.SUCCESS
node.get_logger().info("Objective completed successfully")
else:
node.get_logger().error(
f"Objective failed: code={result.error_code.val} message={result.error_message}"
)
except KeyboardInterrupt:
if goal_handle:
goal_handle.cancel_goal_async()
finally:
node.destroy_node()
rclpy.shutdown()


if __name__ == "__main__":
main()

DoObjectiveSequence.Result.error_code is a moveit_msgs/MoveItErrorCodes. The full list of values is documented in MoveIt's MoveItErrorCodes.msg.

Synchronous vs. Asynchronous Execution

The action interface is inherently asynchronous. The RCL example above blocks the calling thread with spin_until_future_complete; the roslibpy example polls a flag. To overlap the Objective with other work, simply return from the launching code path and react in the result callback (websocket) or the result future (RCL).

def handle_result(future):
result = future.result().result
# ...

def handle_goal_response(future):
goal_handle = future.result()
if not goal_handle.accepted:
return
goal_handle.get_result_async().add_done_callback(handle_result)

send_goal_future = action_client.send_goal_async(goal_msg)
send_goal_future.add_done_callback(handle_goal_response)

Cancel a Running Objective

# Retain the goal handle from the send_goal_async future, then:
cancel_future = goal_handle.cancel_goal_async()
rclpy.spin_until_future_complete(node, cancel_future)

Using the ExecuteObjective Service

/execute_objective (moveit_studio_sdk_msgs/srv/ExecuteObjective) is a synchronous wrapper around DoObjectiveSequence: the service call returns once the Objective finishes. It accepts the same fields as the action goal, including parameter_overrides. Use it when the calling code does not need feedback or asynchronous control and would otherwise spin its own state machine.

import rclpy
from rclpy.node import Node
from moveit_studio_sdk_msgs.srv import ExecuteObjective

rclpy.init()
node = Node("execute_objective_client")
client = node.create_client(ExecuteObjective, "/execute_objective")
client.wait_for_service(timeout_sec=5.0)

request = ExecuteObjective.Request()
request.objective_name = "Look at Table"

future = client.call_async(request)
rclpy.spin_until_future_complete(node, future)
response = future.result()
print(f"Error code: {response.error_code.val}")

node.destroy_node()
rclpy.shutdown()

Next Steps