Skip to content

Commit

Permalink
merged cache controls
Browse files Browse the repository at this point in the history
  • Loading branch information
neuenfeldttj committed Mar 19, 2024
2 parents 07a1bb4 + ae5bae3 commit 98ca58e
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 230 deletions.
4 changes: 3 additions & 1 deletion config/teleop.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ teleop:
sensor_actuator:
multiplier: 1
slow_mode_multiplier: 0.5
xbox_index: 5
cache:
multiplier: 1
xbox_index: 4

drive_controls:
forward_back:
Expand Down
235 changes: 118 additions & 117 deletions src/teleoperation/backend/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def connect(self):
self.sa_throttle_cmd_pub = rospy.Publisher("sa_throttle_cmd", Throttle, queue_size=1)
self.sa_velocity_cmd_pub = rospy.Publisher("sa_velocity_cmd", Velocity, queue_size=1)
self.sa_position_cmd_pub = rospy.Publisher("sa_position_cmd", Position, queue_size=1)
self.cache_throttle_cmd_pub = rospy.Publisher("cache_throttle_cmd", Throttle, queue_size=1)
self.cache_velocity_cmd_pub = rospy.Publisher("cache_velocity_cmd", Velocity, queue_size=1)
self.cache_position_cmd_pub = rospy.Publisher("cache_position_cmd", Position, queue_size=1)

# Subscribers
self.pdb_sub = rospy.Subscriber("/pdb_data", PDLB, self.pdb_callback)
Expand Down Expand Up @@ -137,6 +140,10 @@ def disconnect(self, close_code):
self.led_sub.unregister()
self.nav_state_sub.unregister()
self.imu_calibration.unregister()
self.sa_temp_data.unregister()
self.sa_humidity_data.unregister()
self.sa_thermistor_data.unregister()
self.science_spectral.unregister()

def receive(self, text_data):
"""
Expand All @@ -154,8 +161,8 @@ def receive(self, text_data):
self.calibrate_motors(message)
elif message["type"] == "arm_adjust":
self.arm_adjust(message)
elif message["type"] == "arm_values":
self.handle_arm_message(message)
elif message["type"] == "arm_values" or message["type"] == "cache_values":
self.handle_controls_message(message)
elif message["type"] == "sa_arm_values":
self.handle_sa_arm_message(message)
elif message["type"] == "sa_enable_uv_bulb":
Expand Down Expand Up @@ -236,108 +243,84 @@ def to_velocity(self, input: int, joint_name: str, brushless: bool = True) -> fl
* (self.brushed_motors[joint_name]["max_velocity"] - self.brushed_motors[joint_name]["min_velocity"])
/ 2
)

def handle_sa_arm_message(self, msg):

def handle_controls_message(self, msg):
CACHE = ["cache"]
SA_NAMES = ["sa_x", "sa_y", "sa_z", "sampler", "sensor_actuator"]
RA_NAMES = self.RA_NAMES
ra_slow_mode = False
raw_left_trigger = msg["axes"][self.xbox_mappings["left_trigger"]]
left_trigger = raw_left_trigger if raw_left_trigger > 0 else 0
raw_right_trigger = msg["axes"][self.xbox_mappings["right_trigger"]]
right_trigger = raw_right_trigger if raw_right_trigger > 0 else 0
arm_pubs = [self.arm_position_cmd_pub,self.arm_velocity_cmd_pub,self.arm_throttle_cmd_pub,self.arm_ik_pub]
sa_pubs = [self.sa_position_cmd_pub,self.sa_velocity_cmd_pub,self.sa_throttle_cmd_pub]
cache_pubs = [self.cache_position_cmd_pub,self.cache_velocity_cmd_pub,self.cache_throttle_cmd_pub]
publishers = []
controls_names = []
if msg["type"] == "cache_values":
controls_names = CACHE
publishers = cache_pubs
elif msg["type"] == "arm_values":
controls_names = RA_NAMES
publishers = arm_pubs
elif msg["type"] == "sa_arm_values":
controls_names = SA_NAMES
publishers = sa_pubs

if msg["arm_mode"] == "ik":
base_link_in_map = SE3.from_tf_tree(self.tf_buffer, "map", "base_link")

left_trigger = self.filter_xbox_axis(msg["axes"][self.xbox_mappings["left_trigger"]])
if left_trigger < 0:
left_trigger = 0

right_trigger = self.filter_xbox_axis(msg["axes"][self.xbox_mappings["right_trigger"]])
if right_trigger < 0:
right_trigger = 0
base_link_in_map.position[0]+= self.ik_names["x"]*self.filter_xbox_axis(msg["axes"][self.xbox_mappings["left_js_x"]]),
base_link_in_map.position[1]+= self.ik_names["y"]*self.filter_xbox_axis(msg["axes"][self.xbox_mappings["left_js_y"]]),
base_link_in_map.position[2]-=self.ik_names["z"]*left_trigger+self.ik_names["z"]*right_trigger

arm_ik_cmd = IK(pose=Pose(position=Point(*base_link_in_map.position), orientation=Quaternion(*base_link_in_map.rotation.quaternion)))
publishers[3].publish(arm_ik_cmd)


if msg["arm_mode"] == "position":
sa_position_cmd = Position(
names=SA_NAMES,
positions=msg["positions"],
position_names = controls_names
if msg["type"] == "arm_values":
position_names = ["joint_b", "joint_c", "joint_de_pitch", "joint_de_roll"]
position_cmd = Position(
names=position_names,
positions=msg["positions"],
)
self.sa_position_cmd_pub.publish(sa_position_cmd)
publishers[0].publish(position_cmd)

elif msg["arm_mode"] == "velocity":
sa_velocity_cmd = Velocity()
sa_velocity_cmd.names = SA_NAMES
sa_velocity_cmd.velocities = [
velocity_cmd = Velocity()
velocity_cmd.names = controls_names
if msg["type"] == "cache_values":
velocity_cmd.velocities = [
self.sa_config["cache"]["multiplier"] *self.filter_xbox_button(msg["buttons"],
"right_bumper", "left_bumper")
]
elif msg["type"] == "sa_arm_values":
velocity_cmd.velocities = [
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.sa_config["sa_x"]["xbox_index"]]), "sa_x", False
),
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.sa_config["sa_y"]["xbox_index"]]), "sa_y", False
),
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.sa_config["sa_z"]["xbox_index"]]), "sa_z", False
self.filter_xbox_axis(msg["axes"][self.sa_config["sa_z"]["xbox_index"]]), "sa_z", True
),
self.sa_config["sampler"]["multiplier"] * (right_trigger - left_trigger),
self.sa_config["sensor_actuator"]["multiplier"]
* self.filter_xbox_button(msg["buttons"], "right_bumper", "left_bumper"),
]

self.sa_velocity_cmd_pub.publish(sa_velocity_cmd)

elif msg["arm_mode"] == "throttle":
sa_throttle_cmd = Throttle()
sa_throttle_cmd.names = SA_NAMES

sa_throttle_cmd.throttles = [
self.sa_config[name]["multiplier"]
* self.filter_xbox_axis(msg["axes"][self.sa_config["sa_x"]["xbox_index"]], 0.15, True),
self.sa_config[name]["multiplier"]
* self.filter_xbox_axis(msg["axes"][self.sa_config["sa_y"]["xbox_index"]], 0.15, True),
self.sa_config[name]["multiplier"]
* self.filter_xbox_axis(msg["axes"][self.sa_config["sa_z"]["xbox_index"]], 0.15, True),
]
sa_throttle_cmd.throttles.extend(
[
self.sa_config["sampler"]["multiplier"] * (right_trigger - left_trigger),
self.sa_config["sensor_actuator"]["multiplier"]
* self.filter_xbox_button(msg["buttons"], "right_bumper", "left_bumper"),
]
)

fast_mode_activated = msg["buttons"][self.xbox_mappings["a"]] or msg["buttons"][self.xbox_mappings["b"]]
if not fast_mode_activated:
for i, name in enumerate(SA_NAMES):
# When going up (vel > 0) with SA joint 2, we DON'T want slow mode.
if not (name == "sa_y" and sa_throttle_cmd.throttles[i] > 0):
sa_throttle_cmd.throttles[i] *= self.sa_config[name]["slow_mode_multiplier"]

self.sa_throttle_cmd_pub.publish(sa_throttle_cmd)

def handle_arm_message(self, msg):
ra_slow_mode = False
if msg["arm_mode"] == "ik":
base_link_in_map = SE3.from_tf_tree(self.tf_buffer, "map", "base_link")

left_trigger = self.filter_xbox_axis(msg["axes"][self.xbox_mappings["left_trigger"]])
if left_trigger < 0:
left_trigger = 0

right_trigger = self.filter_xbox_axis(msg["axes"][self.xbox_mappings["right_trigger"]])
if right_trigger < 0:
right_trigger = 0
base_link_in_map.position[0] += (
self.ik_names["x"] * self.filter_xbox_axis(msg["axes"][self.xbox_mappings["left_js_x"]]),
)
base_link_in_map.position[1] += (
self.ik_names["y"] * self.filter_xbox_axis(msg["axes"][self.xbox_mappings["left_js_y"]]),
)
base_link_in_map.position[2] -= self.ik_names["z"] * left_trigger + self.ik_names["z"] * right_trigger

arm_ik_cmd = IK(
pose=Pose(
position=Point(*base_link_in_map.position),
orientation=Quaternion(*base_link_in_map.rotation.quaternion),
)
)
self.arm_ik_pub.publish(arm_ik_cmd)

elif msg["arm_mode"] == "position":
arm_position_cmd = Position(
names=["joint_b", "joint_c", "joint_de_pitch", "joint_de_roll"],
positions=msg["positions"],
)
self.arm_position_cmd_pub.publish(arm_position_cmd)

elif msg["arm_mode"] == "velocity":
arm_velocity_cmd = Velocity()
arm_velocity_cmd.names = self.RA_NAMES
arm_velocity_cmd.velocities = [
elif msg["type"] == "arm_values":
velocity_cmd.velocities = [
self.to_velocity(
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_a"]["xbox_index"]]), "joint_a"
),
Expand All @@ -355,36 +338,60 @@ def handle_arm_message(self, msg):
),
self.ra_config["allen_key"]["multiplier"] * self.filter_xbox_button(msg["buttons"], "y", "a"),
self.ra_config["gripper"]["multiplier"] * self.filter_xbox_button(msg["buttons"], "b", "x"),
]

self.arm_velocity_cmd_pub.publish(arm_velocity_cmd)

]
publishers[1].publish(velocity_cmd)

elif msg["arm_mode"] == "throttle":
arm_throttle_cmd = Throttle()
arm_throttle_cmd.names = self.RA_NAMES
d_pad_x = msg["axes"][self.xbox_mappings["d_pad_x"]]
if d_pad_x > 0.5:
ra_slow_mode = True
elif d_pad_x < -0.5:
ra_slow_mode = False

arm_throttle_cmd.throttles = [
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_a"]["xbox_index"]]),
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_b"]["xbox_index"]]),
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_c"]["xbox_index"]]),
(self.filter_xbox_axis(msg["axes"][2]) - self.filter_xbox_axis(msg["axes"][5])) * 0.3,
(self.filter_xbox_button(msg["buttons"], "right_bumper", "left_bumper")) * 0.3,
self.ra_config["allen_key"]["multiplier"] * self.filter_xbox_button(msg["buttons"], "y", "a"),
self.ra_config["gripper"]["multiplier"] * self.filter_xbox_button(msg["buttons"], "b", "x"),
]
throttle_cmd = Throttle()
throttle_cmd.names = controls_names
if msg["type"] == "cache_values":
throttle_cmd.throttles = [
self.sa_config["cache"]["multiplier"] * self.filter_xbox_button(msg["buttons"], "right_bumper", "left_bumper")
]
elif msg["type"] == "sa_arm_values":
d_pad_x = msg["axes"][self.xbox_mappings["d_pad_x"]]
if d_pad_x > 0.5:
ra_slow_mode = True
elif d_pad_x < -0.5:
ra_slow_mode = False

throttle_cmd.throttles = [
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_a"]["xbox_index"]]),
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_b"]["xbox_index"]]),
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_c"]["xbox_index"]]),
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_de_pitch"]["xbox_index"]]),
self.filter_xbox_axis(msg["axes"][self.ra_config["joint_de_roll"]["xbox_index"]]),
self.ra_config["allen_key"]["multiplier"] * self.filter_xbox_button(msg["buttons"], "y", "a"),
self.ra_config["gripper"]["multiplier"] * self.filter_xbox_button(msg["buttons"], "b", "x"),
]

for i, name in enumerate(self.RA_NAMES):
if ra_slow_mode:
arm_throttle_cmd.throttles[i] *= self.ra_config[name]["slow_mode_multiplier"]
if self.ra_config[name]["invert"]:
arm_throttle_cmd.throttles[i] *= -1
for i, name in enumerate(self.RA_NAMES):
if ra_slow_mode:
throttle_cmd.throttles[i] *= self.ra_config[name]["slow_mode_multiplier"]
if self.ra_config[name]["invert"]:
throttle_cmd.throttles[i] *= -1
elif msg["type"] == "sa_values":
throttle_cmd.throttles = [
self.sa_config[name]["multiplier"] * self.filter_xbox_axis(msg["axes"][info["xbox_index"]], 0.15, True)
for name, info in self.sa_config.items()
if name.startswith("sa")
]

self.arm_throttle_cmd_pub.publish(arm_throttle_cmd)
throttle_cmd.throttles.extend(
[
self.sa_config["sampler"]["multiplier"] * (right_trigger - left_trigger),
self.sa_config["sensor_actuator"]["multiplier"]
* self.filter_xbox_button(msg["buttons"], "right_bumper", "left_bumper"),
]
)

fast_mode_activated = msg["buttons"][self.xbox_mappings["a"]] or msg["buttons"][self.xbox_mappings["b"]]
if not fast_mode_activated:
for i, name in enumerate(SA_NAMES):
# When going up (vel > 0) with SA joint 2, we DON'T want slow mode.
if not (name == "sa_y" and throttle_cmd.throttles[i] > 0):
throttle_cmd.throttles[i] *= self.sa_config[name]["slow_mode_multiplier"]
publishers[2].publish(throttle_cmd)

def handle_joystick_message(self, msg):
# Tiny deadzone so we can safely e-stop with dampen switch
Expand Down Expand Up @@ -440,11 +447,6 @@ def handle_joystick_message(self, msg):
def pdb_callback(self, msg):
self.send(text_data=json.dumps({"type": "pdb", "temperatures": msg.temperatures, "currents": msg.currents}))

def calibration_checkbox_callback(self, msg):
self.send(
text_data=json.dumps({"type": "calibration_status", "names": msg.names, "calibrated": msg.calibrated})
)

def arm_controller_callback(self, msg):
hits = []
for n in msg.limit_hit:
Expand Down Expand Up @@ -498,6 +500,7 @@ def limit_switch(self, msg):
fail.append(joint)
except rospy.ServiceException as e:
print(f"Service call failed: {e}")

def sa_uv_bulb_callback(self, msg):
try:
result = self.toggle_uv(data=msg["data"])
Expand Down Expand Up @@ -644,9 +647,7 @@ def mast_gimbal(self, msg):
def change_cameras(self, msg):
try:
camera_cmd = CameraCmd(msg["device"], msg["resolution"])
rospy.logerr(camera_cmd)
result = self.change_cameras_srv(primary=msg["primary"], camera_cmd=camera_cmd)
rospy.logerr(result)
except rospy.ServiceException as e:
print(f"Service call failed: {e}")

Expand Down
Loading

0 comments on commit 98ca58e

Please sign in to comment.