Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Cartesian paths #8

Merged
merged 3 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/ex_pose_goal.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def main(args=None):
node.get_logger().info(
f"Moving to {{position: {list(position)}, quat_xyzw: {list(quat_xyzw)}}}"
)
moveit2.move_to_pose(position=position, quat_xyzw=quat_xyzw)
moveit2.move_to_pose(position=position, quat_xyzw=quat_xyzw, cartesian=False)
moveit2.wait_until_executed()

rclpy.shutdown()
Expand Down
103 changes: 97 additions & 6 deletions pymoveit2/moveit2.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
OrientationConstraint,
PositionConstraint,
)
from moveit_msgs.srv import GetMotionPlan, GetPositionFK, GetPositionIK
from moveit_msgs.srv import (
GetCartesianPath,
GetMotionPlan,
GetPositionFK,
GetPositionIK,
)
from rclpy.action import ActionClient
from rclpy.callback_groups import CallbackGroup
from rclpy.node import Node
Expand Down Expand Up @@ -131,6 +136,20 @@ def __init__(
)
self.__kinematic_path_request = GetMotionPlan.Request()

# Create a separate service client for Cartesian planning
self._plan_cartesian_path_service = self._node.create_client(
srv_type=GetCartesianPath,
srv_name="compute_cartesian_path",
qos_profile=QoSProfile(
durability=QoSDurabilityPolicy.VOLATILE,
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
),
callback_group=callback_group,
)
self.__cartesian_path_request = GetCartesianPath.Request()

# Create action client for trajectory execution
self.__follow_joint_trajectory_action_client = ActionClient(
node=self._node,
Expand Down Expand Up @@ -208,6 +227,7 @@ def move_to_pose(
tolerance_position: float = 0.001,
tolerance_orientation: float = 0.001,
weight_position: float = 1.0,
cartesian: bool = False,
weight_orientation: float = 1.0,
):
"""
Expand Down Expand Up @@ -255,6 +275,7 @@ def move_to_pose(
tolerance_orientation=tolerance_orientation,
weight_position=weight_position,
weight_orientation=weight_orientation,
cartesian=cartesian,
)
)

Expand All @@ -263,6 +284,7 @@ def move_to_configuration(
joint_positions: List[float],
joint_names: Optional[List[str]] = None,
tolerance: float = 0.001,
cartesian: bool = False,
weight: float = 1.0,
):
"""
Expand Down Expand Up @@ -303,6 +325,7 @@ def move_to_configuration(
joint_names=joint_names,
tolerance_joint_position=tolerance,
weight_joint_position=weight,
cartesian=cartesian,
)
)

Expand All @@ -322,6 +345,7 @@ def plan(
weight_orientation: float = 1.0,
weight_joint_position: float = 1.0,
start_joint_state: Optional[Union[JointState, List[float]]] = None,
cartesian: bool = False,
) -> Optional[JointTrajectory]:
"""
Plan motion based on previously set goals. Optional arguments can be passed in to
Expand Down Expand Up @@ -372,12 +396,15 @@ def plan(
self.__move_action_goal.request.start_state.joint_state = self.joint_state

# Plan trajectory by sending a goal (blocking)
if self.__execute_via_moveit:
# Use action client
joint_trajectory = self._send_goal_move_action_plan_only()
if cartesian:
joint_trajectory = self._plan_cartesian_path()
else:
# Use service
joint_trajectory = self._plan_kinematic_path()
if self.__execute_via_moveit:
# Use action client
joint_trajectory = self._send_goal_move_action_plan_only()
else:
# Use service
joint_trajectory = self._plan_kinematic_path()

# Clear all previous goal constrains
self.clear_goal_constraints()
Expand Down Expand Up @@ -873,6 +900,70 @@ def _plan_kinematic_path(
)
return None

def _plan_cartesian_path(
self,
max_step: float = 0.0025,
wait_for_server_timeout_sec: Optional[float] = 1.0,
) -> Optional[JointTrajectory]:
# Re-use request from move action goal
self.__cartesian_path_request.start_state = (
self.__move_action_goal.request.start_state
)
self.__cartesian_path_request.group_name = (
self.__move_action_goal.request.group_name
)
self.__cartesian_path_request.link_name = self.__end_effector_name
self.__cartesian_path_request.max_step = max_step

stamp = self._node.get_clock().now().to_msg()
self.__cartesian_path_request.header.stamp = stamp

self.__cartesian_path_request.path_constraints = (
self.__move_action_goal.request.path_constraints
)
for (
position_constraint
) in self.__cartesian_path_request.path_constraints.position_constraints:
position_constraint.header.stamp = stamp
for (
orientation_constraint
) in self.__cartesian_path_request.path_constraints.orientation_constraints:
orientation_constraint.header.stamp = stamp
# no header in joint_constraint message type

target_pose = Pose()
target_pose.position = (
self.__move_action_goal.request.goal_constraints[-1]
.position_constraints[-1]
.constraint_region.primitive_poses[0]
.position
)
target_pose.orientation = (
self.__move_action_goal.request.goal_constraints[-1]
.orientation_constraints[-1]
.orientation
)

self.__cartesian_path_request.waypoints = [target_pose]
AndrejOrsula marked this conversation as resolved.
Show resolved Hide resolved

if not self._plan_cartesian_path_service.wait_for_service(
timeout_sec=wait_for_server_timeout_sec
):
self._node.get_logger().warn(
f"Service '{self._plan_cartesian_path_service.srv_name}' is not yet available. Better luck next time!"
)
return None

res = self._plan_cartesian_path_service.call(self.__cartesian_path_request)

if MoveItErrorCodes.SUCCESS == res.error_code.val:
return res.solution.joint_trajectory
else:
self._node.get_logger().warn(
f"Planning failed! Error code: {res.error_code.val}."
)
return None

def _send_goal_async_move_action(
self, wait_for_server_timeout_sec: Optional[float] = 1.0
):
Expand Down