Skip to content

Commit

Permalink
code clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
robotastic committed Apr 1, 2024
1 parent da223e7 commit e746cf1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 36 deletions.
73 changes: 38 additions & 35 deletions axis-ptz-controller/axis_ptz_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
from camera_configuration import CameraConfiguration
from camera_control import CameraControl


class Status(Enum):
SLEEPING = 0
SLEWING = 1
TRACKING = 2


class AxisPtzController(BaseMQTTPubSub):
"""Point the camera at an object using a proportional rate
controller, and capture images while in track."""
Expand Down Expand Up @@ -524,7 +526,9 @@ def _config_callback(
"capture_interval", self.capture_interval
) # [s]
self.capture_dir = config.get("capture_dir", self.capture_dir)
self.tracking_interval = config.get("tracking_interval", self.tracking_interval) # [s]
self.tracking_interval = config.get(
"tracking_interval", self.tracking_interval
) # [s]
# Cannot set tripod yaw, pitch, and roll because of side effects
self.pan_gain = config.get("pan_gain", self.pan_gain) # [1/s]
self.pan_rate_max = config.get("pan_rate_max", self.pan_rate_max)
Expand Down Expand Up @@ -617,8 +621,6 @@ def _log_config(self) -> None:
f"AxisPtzController configuration:\n{json.dumps(config, indent=4)}"
)



def _compute_object_pointing(self, time_since_last_update=0) -> None:
# Compute position in the geocentric (XYZ) coordinate system
# of the object relative to the tripod at time zero, the
Expand All @@ -630,13 +632,14 @@ def _compute_object_pointing(self, time_since_last_update=0) -> None:

# Assign lead time, computing and adding age of object
# message, if enabled
time_delta = self.lead_time # [s] time_since_last_update
time_delta = self.lead_time # [s] time_since_last_update
object_msg_age = 0
if self.include_age:
object_msg_age = time() - self.timestamp_o #datetime.utcnow().timestamp() - self.timestamp_o # [s]
object_msg_age = (
time() - self.timestamp_o
) # datetime.utcnow().timestamp() - self.timestamp_o # [s]
logging.debug(f"Object msg age: {object_msg_age} [s]")
time_delta += object_msg_age


# Compute position and velocity in the topocentric (ENz)
# coordinate system of the object relative to the tripod at
Expand All @@ -650,8 +653,12 @@ def _compute_object_pointing(self, time_since_last_update=0) -> None:
self.vertical_rate_o,
]
)
r_ENz_o_N_t = self.r_ENz_o_0_t + self.v_ENz_o_0_t * object_msg_age # where the object is now
r_ENz_o_1_t = self.r_ENz_o_0_t + self.v_ENz_o_0_t * time_delta # where the object will be in the future, for leading
r_ENz_o_N_t = (
self.r_ENz_o_0_t + self.v_ENz_o_0_t * object_msg_age
) # where the object is now
r_ENz_o_1_t = (
self.r_ENz_o_0_t + self.v_ENz_o_0_t * time_delta
) # where the object will be in the future, for leading

# Compute position, at time one, and velocity, at time zero,
# in the geocentric (XYZ) coordinate system of the object
Expand Down Expand Up @@ -685,7 +692,9 @@ def _compute_object_pointing(self, time_since_last_update=0) -> None:
# Compute pan and tilt to point the camera at the object
r_uvw_o_n_t = np.matmul(self.E_XYZ_to_uvw, self.r_XYZ_o_N_t)
r_uvw_o_1_t = np.matmul(self.E_XYZ_to_uvw, self.r_XYZ_o_1_t)
self.rho_o_now = math.degrees(math.atan2(r_uvw_o_n_t[0], r_uvw_o_n_t[1])) # [deg]
self.rho_o_now = math.degrees(
math.atan2(r_uvw_o_n_t[0], r_uvw_o_n_t[1])
) # [deg]
self.tau_o_now = math.degrees(
math.atan2(r_uvw_o_n_t[2], axis_ptz_utilities.norm(r_uvw_o_n_t[0:2]))
)
Expand All @@ -694,13 +703,14 @@ def _compute_object_pointing(self, time_since_last_update=0) -> None:
math.atan2(r_uvw_o_1_t[2], axis_ptz_utilities.norm(r_uvw_o_1_t[0:2]))
) # [deg]
logging.debug(f"Object pan and tilt: {self.rho_o}, {self.tau_o} [deg]")
logging.debug(f"\t🔭\tObject - Pan: {self.rho_o}\t Obj ENz 1t: {r_ENz_o_1_t}\t Obj ENz 0t: {self.r_ENz_o_0_t}\t Obj Velo 0t: {self.v_ENz_o_0_t}\t Lead time: {time_delta}")
logging.debug(
f"\t🔭\tObject - Pan: {self.rho_o}\t Obj ENz 1t: {r_ENz_o_1_t}\t Obj ENz 0t: {self.r_ENz_o_0_t}\t Obj Velo 0t: {self.v_ENz_o_0_t}\t Lead time: {time_delta}"
)

def _track_object(self, time_since_last_update) -> None:

if self.status != Status.TRACKING:
return

# Make sure Object info is not updated while pointing is being computed
self.object_lock.acquire()

Expand All @@ -716,7 +726,6 @@ def _track_object(self, time_since_last_update) -> None:
else:
logging.debug(f"Controller pan and tilt: {self.rho_c}, {self.tau_c} [deg]")


# Compute angle delta between camera and object pan and tilt
self.delta_rho = self._compute_angle_delta(self.rho_c, self.rho_o)
self.delta_tau = self._compute_angle_delta(self.tau_c, self.tau_o)
Expand Down Expand Up @@ -767,7 +776,9 @@ def _track_object(self, time_since_last_update) -> None:
rho_2_dot_o = math.degrees(-omega_2[2])
tau_2_dot_o = math.degrees(omega_2[0])

logging.info(f"Object pan and tilt orig rates: {self.rho_dot_o}, {self.tau_dot_o} [deg/s] vs {rho_2_dot_o}, {tau_2_dot_o} [deg/s]")
logging.info(
f"Object pan and tilt orig rates: {self.rho_dot_o}, {self.tau_dot_o} [deg/s] vs {rho_2_dot_o}, {tau_2_dot_o} [deg/s]"
)
logging.debug(
f"Object pan and tilt rates: {self.rho_dot_o}, {self.tau_dot_o} [deg/s]"
)
Expand Down Expand Up @@ -823,15 +834,12 @@ def _track_object(self, time_since_last_update) -> None:
self.do_capture = True
self.capture_time = time()

if ( self.do_capture
and time() - self.capture_time > self.capture_interval
):
if self.do_capture and time() - self.capture_time > self.capture_interval:
capture_thread = threading.Thread(target=self._capture_image)
capture_thread.daemon = True
capture_thread.start()


#logging.info(f"\t⏱️\tRATES - 🎥 Pan: {self.rho_dot_c}\tTilt: {self.tau_dot_c}\t🛩️ Pan: {self.rho_dot_o}\tTilt: {self.tau_dot_o} ANGLES: 🎥 Pan: {self.rho_c}\tTilt: {self.tau_c}\t🛩️ Pan: {self.rho_o}\tTilt: {self.tau_o} ")
# logging.info(f"\t⏱️\tRATES - 🎥 Pan: {self.rho_dot_c}\tTilt: {self.tau_dot_c}\t🛩️ Pan: {self.rho_dot_o}\tTilt: {self.tau_dot_o} ANGLES: 🎥 Pan: {self.rho_c}\tTilt: {self.tau_c}\t🛩️ Pan: {self.rho_o}\tTilt: {self.tau_o} ")
elapsed_time = time() - start_time
# Log camera pointing using MQTT
if self.log_to_mqtt:
Expand Down Expand Up @@ -876,15 +884,11 @@ def _track_object(self, time_since_last_update) -> None:
logging.debug(f"Publishing logger msg: {logger_msg}")
self.publish_to_topic(self.logger_topic, logger_msg)




def _slew_camera(self) -> None:

if self.status == Status.SLEWING:
logging.error("Camera is already slewing")
return

self.status = Status.SLEWING

# Get camera pan and tilt
Expand All @@ -898,9 +902,7 @@ def _slew_camera(self) -> None:
f"Absolute move to pan: {self.rho_o}, and tilt: {self.tau_o}, with zoom: {self.zoom}"
)
try:
self.camera_control.absolute_move(
self.rho_o, self.tau_o, self.zoom, 99
)
self.camera_control.absolute_move(self.rho_o, self.tau_o, self.zoom, 99)
except Exception as e:
logging.error(f"Error: {e}")
else:
Expand All @@ -926,7 +928,6 @@ def _slew_camera(self) -> None:
# Start Tracking
self.status = Status.TRACKING


def _orientation_callback(
self,
_client: Union[mqtt.Client, None],
Expand Down Expand Up @@ -1038,7 +1039,9 @@ def _object_callback(
) <= set(data.keys()):
logging.info(f"Required keys missing from object message data: {data}")
return
logging.info(f"\t🗒️\tProcessing object msg data: {data['object_id']} \t {data['latitude']} \t {data['longitude']} \t {data['altitude']}")
logging.info(
f"\t🗒️\tProcessing object msg data: {data['object_id']} \t {data['latitude']} \t {data['longitude']} \t {data['altitude']}"
)
self.timestamp_o = float(data["timestamp"]) # [s]
self.timestamp_c = self.timestamp_o
self.lambda_o = data["longitude"] # [deg]
Expand All @@ -1060,10 +1063,10 @@ def _object_callback(
self.camera_control.stop_move()
except Exception as e:
logging.error(f"Error: {e}")

# Reset the stop timer because we received an object message
self._reset_stop_timer()

if self.use_camera:
# Point the camera at any new object directly
if self.object_id != object_id:
Expand Down Expand Up @@ -1243,7 +1246,6 @@ def _compute_pan_rate_index(self, rho_dot: float):
else:
self.pan_rate_index = round((100 / self.pan_rate_max) * rho_dot)


def _compute_tilt_rate_index(self, tau_dot: float):
"""Compute tilt rate index between -100 and 100 using rates in
deg/s, limiting the results to the specified minimum and
Expand Down Expand Up @@ -1420,7 +1422,6 @@ def main(self) -> None:
self.manual_control_topic, self._manual_control_callback
)


update_tracking_time = time()

# Enter the main loop
Expand All @@ -1435,9 +1436,11 @@ def main(self) -> None:
if not self.use_camera:
self._update_pointing()


# Track object
if ( self.use_camera and time() - update_tracking_time > self.tracking_interval ):
if (
self.use_camera
and time() - update_tracking_time > self.tracking_interval
):
time_since_last_update = time() - update_tracking_time
update_tracking_time = time()
self._track_object(time_since_last_update)
Expand Down
3 changes: 2 additions & 1 deletion axis-ptz-controller/camera_control.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from sensecam_control import vapix_control

logging.getLogger("vapix_control").setLevel(logging.CRITICAL)
logging.getLogger("sensecam_control").setLevel(logging.CRITICAL)
from typing import Tuple, Union
Expand Down Expand Up @@ -55,7 +56,7 @@ def get_ptz(self) -> Tuple[float, float, float, float]:
"""
try:
logging.disable(logging.INFO)
logging.disable(logging.INFO)
resp = self._camera_command({"query": "position"})
logging.disable(logging.NOTSET)
pan = float(resp.text.split()[0].split("=")[1])
Expand Down

0 comments on commit e746cf1

Please sign in to comment.