Skip to main content
Version: 8

Websocket Client Library API

External system diagram

After you have built an application with MoveIt Pro, you may need to integrate it with a higher layer system, typically if you want to run application code that can be ROS agnostic.

This page focuses on using a Websocket Client Library to communicate with MoveIt Pro via rosbridge. If you are using a native ROS 2 Client Library (RCL) such as rclpy or rclcpp, see the Native ROS Client Library API.

Websocket client libraries connect to the rosbridge server and speak the rosbridge JSON protocol, allowing non-ROS applications to publish/subscribe, call services, and interact with ROS actions. Common websocket libraries include:

Setup

The MoveIt Pro Runtime runs a rosbridge server listening on port 3201. To connect to it across your network, expose that port in your robot's firewall settings.

The following examples are in Python but could be implemented in any of the languages above.

Running an Objective

To run an Objective from your application using websockets, send a goal to the /do_objective action.

For example, to communicate with MoveIt Pro from a Python script you can install the roslibpy library. Note that ROS2 action support is only available in the GitHub main branch, not yet in the pip release.

# Create and activate a virtual environment
python3 -m venv ~/roslibpy_venv
source ~/roslibpy_venv/bin/activate

# Install from GitHub main branch for ROS2 action support
pip install git+https://github.com/RobotWebTools/roslibpy.git@main

Note: Remember to activate the virtual environment (source ~/roslibpy_venv/bin/activate) before running your Python scripts that use roslibpy.

Here is an example Python script that will start the "Close Gripper" Objective via the DoObjectiveSequence action, wait for the result, and then print the outcome. Note that you will need to know the IP address of your robot if not running on the same machine.

import roslibpy
from roslibpy import ActionClient
import time

# Connect a client to the rosbridge running on the robot
# Replace the host with the IP of your robot
client = roslibpy.Ros(host='localhost', port=3201)

# Connect first before creating the action client
client.run()

# Define the action client for DoObjectiveSequence using ROS2 ActionClient
action = ActionClient(
client,
'/do_objective',
'moveit_studio_sdk_msgs/action/DoObjectiveSequence'
)

# Track completion
action_done = False
result_data = None

def on_result(result):
global action_done, result_data
print('Action completed!')
# The actual result data is in result['values']
error_code = result.get('values', {}).get('error_code', {})
print(f"Error code: {error_code.get('val', 'unknown')}")
print(f"Error message: {error_code.get('message', 'none')}")
result_data = result
action_done = True

def on_feedback(feedback):
print(f"Feedback: {feedback}")

def on_error(error):
global action_done
print(f"Error: {error}")
action_done = True

# Send goal
goal = {"objective_name": "Close Gripper"}
print(f"Sending goal: {goal['objective_name']}")

goal_id = action.send_goal(goal, on_result, on_feedback, on_error)
print(f"Goal ID: {goal_id}")

# Wait for result
print("Waiting for result...")
timeout = 30
start = time.time()
while not action_done and (time.time() - start) < timeout:
time.sleep(0.1)

if action_done:
print("Goal completed!")
else:
print("Timeout waiting for result")

# Clean up
client.terminate()

Running Objectives Synchronously vs. Asynchronously

The basic example above is synchronous - it waits for the action to complete. For asynchronous operation, you can send the goal and continue with other work while the action runs:

import roslibpy
from roslibpy import ActionClient
import time

# Connect first
client = roslibpy.Ros(host='localhost', port=3201)
client.run()

# Create action client after connection
action = ActionClient(client, '/do_objective', 'moveit_studio_sdk_msgs/action/DoObjectiveSequence')

action_done = False

# Asynchronous example with callbacks
def on_result(result):
global action_done
print('Close Gripper completed!')
error_code = result.get('values', {}).get('error_code', {})
print(f"Error code: {error_code.get('val', 'unknown')}")
action_done = True

def on_feedback(feedback):
print(f"Progress: {feedback}")

def on_error(error):
global action_done
print(f"Action failed: {error}")
action_done = True

# Send goal asynchronously
goal = {"objective_name": "Close Gripper"}
goal_id = action.send_goal(goal, on_result, on_feedback, on_error)
print(f"Started objective with goal ID: {goal_id}")

# Continue doing other work here
for i in range(5):
if action_done:
break
print(f"Doing other work... {i+1}/5")
time.sleep(1)

# Wait if not done
while not action_done:
time.sleep(0.1)

client.terminate()
warning

Only one Objective runs at a time. Sending a new goal will cancel any currently running Objective.

Cancelling Objectives

To explicitly cancel a running objective:

# Store the goal ID when sending
goal_id = action.send_goal(goal, on_result, on_feedback, on_error)

# Cancel it later
action.cancel_goal(goal_id)

Note: Cancellation currently triggers the error callback.

Through the rosbridge protocol you have access to the full ROS messaging system inside MoveIt Pro. Some examples of projects you could build using this interface:

  • Webapp UI
  • Machine Learning Image Pipeline
  • Multi-Robot Coordination
  • Factory Controller
  • Mobile App

Websockets are useful when you need to interact with a ROS based robot system across a network.

Running an Objective with XML-Based Port Remapping

It is also possible to run an Objective or subtree that already exists while remapping ports using custom XML. The following example wraps the Move to Waypoint subtree and remaps the waypoint_name port to Look at Table.

import roslibpy
from roslibpy import ActionClient
import time

client = roslibpy.Ros(host='localhost', port=3201)
client.run()

action = ActionClient(
client,
'/do_objective',
'moveit_studio_sdk_msgs/action/DoObjectiveSequence'
)

# Define an XML string that remaps the waypoint_name port
objective_xml = """
<root BTCPP_format="4" main_tree_to_execute="Move to Waypoint Remapped">
<BehaviorTree ID="Move to Waypoint Remapped">
<Control ID="Sequence" name="TopLevelSequence">
<SubTree
ID="Move to Waypoint"
_collapsed="true"
waypoint_name="Look at Table"
joint_group_name="manipulator"
controller_names="/joint_trajectory_controller"
use_all_planners="false"
/>
</Control>
</BehaviorTree>
</root>
"""

done = False

def on_result(result):
global done
error_code = result.get('values', {}).get('error_code', {})
print(f"Error code: {error_code.get('val', 'unknown')}")
print(f"Error message: {error_code.get('message', 'none')}")
done = True

def on_feedback(feedback):
print(f"Feedback: {feedback}")

def on_error(error):
global done
print(f"Action failed: {error}")
done = True

goal = {"objective_xml_string": objective_xml}
goal_id = action.send_goal(goal, on_result, on_feedback, on_error)
print(f"Started remapped objective, goal ID: {goal_id}")

start = time.time()
while not done and (time.time() - start) < 60:
time.sleep(0.1)

client.terminate()

You can change the waypoint_name value to any valid waypoint in your configuration (for example, Park Far Right) to move the robot to different positions without modifying the underlying Objective.

Overriding Behavior Parameters

An alternative approach to port remapping is using the parameter_overrides field in the DoObjectiveSequence action goal. This allows you to dynamically override typed parameters for specific behaviors without modifying the Objective XML.

Parameter Types

The parameter_overrides field accepts an array of BehaviorParameter messages with the following type codes:

  • 0: STRING
  • 1: POSE
  • 2: DOUBLE
  • 3: INT
  • 4: BOOL
  • 5: JOINT_STATE

Structure Requirements

Each parameter override must include:

  • description: Object with name (string) and type (integer type code)
  • behavior_namespaces: Array of behavior names to apply the override to
  • All value fields initialized (even if unused): string_value, pose_value, double_value, int_value, bool_value, joint_state_value

Example: Overriding Motion Parameters

import roslibpy
from roslibpy import ActionClient
import time

client = roslibpy.Ros(host='localhost', port=3201)
client.run()

action = ActionClient(
client,
'/do_objective',
'moveit_studio_sdk_msgs/action/DoObjectiveSequence'
)

# Helper function to create properly structured parameter overrides
def create_parameter_override(name, param_type, value, behavior_namespace):
"""
Create a BehaviorParameter with all required fields.

Args:
name: Parameter name
param_type: Type code (0=STRING, 1=POSE, 2=DOUBLE, 3=INT, 4=BOOL, 5=JOINT_STATE)
value: The value to set
behavior_namespace: Behavior name to apply this override to
"""
param = {
"description": {
"name": name,
"type": param_type
},
"behavior_namespaces": [behavior_namespace],
# All value fields must be initialized
"string_value": "",
"pose_value": {
"header": {"frame_id": "", "stamp": {"sec": 0, "nanosec": 0}},
"pose": {
"position": {"x": 0.0, "y": 0.0, "z": 0.0},
"orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}
}
},
"double_value": 0.0,
"int_value": 0,
"bool_value": False,
"joint_state_value": {
"header": {"stamp": {"sec": 0, "nanosec": 0}, "frame_id": ""},
"name": [],
"position": [],
"velocity": [],
"effort": []
}
}

# Set the appropriate value field based on type
if param_type == 0: # STRING
param["string_value"] = str(value)
elif param_type == 2: # DOUBLE
param["double_value"] = float(value)
elif param_type == 3: # INT
param["int_value"] = int(value)
elif param_type == 4: # BOOL
param["bool_value"] = bool(value)

return param

# Create parameter overrides for motion control
parameter_overrides = [
create_parameter_override(
name="velocity_scale_factor",
param_type=2, # DOUBLE
value=0.5,
behavior_namespace="Move to Waypoint"
),
create_parameter_override(
name="acceleration_scale_factor",
param_type=2, # DOUBLE
value=0.3,
behavior_namespace="Move to Waypoint"
),
create_parameter_override(
name="waypoint_name",
param_type=0, # STRING
value="Home",
behavior_namespace="Move to Waypoint"
)
]

done = False

def on_result(result):
global done
error_code = result.get('values', {}).get('error_code', {})
print(f"Error code: {error_code.get('val', 'unknown')}")
print(f"Error message: {error_code.get('message', 'none')}")
done = True

def on_feedback(feedback):
print(f"Feedback: {feedback}")

def on_error(error):
global done
print(f"Action failed: {error}")
done = True

# Send goal with parameter overrides
goal = {
"objective_name": "Look at Table",
"parameter_overrides": parameter_overrides
}

goal_id = action.send_goal(goal, on_result, on_feedback, on_error)
print(f"Started objective with parameter overrides, goal ID: {goal_id}")

start = time.time()
while not done and (time.time() - start) < 60:
time.sleep(0.1)

client.terminate()

Using the ExecuteObjective Service

For synchronous execution, you can use the /execute_objective service instead of the action interface:

import roslibpy

client = roslibpy.Ros(host='localhost', port=3201)
client.run()

service = roslibpy.Service(
client,
'/execute_objective',
'moveit_studio_sdk_msgs/srv/ExecuteObjective'
)

# Use the same parameter_overrides structure as above
request = roslibpy.ServiceRequest({
"objective_name": "Look at Table",
# Use overrides here if necessary
# "parameter_overrides": parameter_overrides
})

result = service.call(request)
print(f"Service result: {result}")

client.terminate()
tip

Use parameter_overrides when you need to dynamically change behavior parameters at runtime without modifying Objective XML. This is ideal for applications where parameter values come from external systems (HMI, databases, sensors, etc.).