Skip to content

Commit

Permalink
Update c2_pub_sub.py
Browse files Browse the repository at this point in the history
  • Loading branch information
robotastic committed Feb 27, 2024
1 parent f8ddef5 commit 11cb97f
Showing 1 changed file with 114 additions and 7 deletions.
121 changes: 114 additions & 7 deletions skyscan-c2/c2_pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ class C2PubSub(BaseMQTTPubSub):
def __init__(
self: Any,
hostname: str,
config_topic: str,
c2_topic: str,
ledger_topic: str,
object_topic: str,
prioritized_ledger_topic: str,
manual_override_topic: str,
min_tilt: float,
min_altitude: float,
max_altitude: float,
object_distance_threshold: str,
device_latitude: str,
device_longitude: str,
Expand All @@ -66,6 +70,7 @@ def __init__(
super().__init__(**kwargs)
# initialize attributes
self.hostname = hostname
self.config_topic = config_topic
self.c2_topic = c2_topic
self.ledger_topic = ledger_topic
self.object_topic = object_topic
Expand All @@ -75,6 +80,9 @@ def __init__(
self.device_longitude = float(device_longitude)
self.device_altitude = float(device_altitude)
self.object_distance_threshold = float(object_distance_threshold)
self.min_tilt = min_tilt
self.min_altitude = min_altitude
self.max_altitude = max_altitude
self.lambda_t = self.device_longitude # [deg]
self.varphi_t = self.device_latitude # [deg]
self.h_t = self.device_altitude # [m]
Expand Down Expand Up @@ -138,11 +146,15 @@ def __init__(
logging.info(
f"""C2PubSub initialized with parameters:
hostname = {hostname}
config_topic = {config_topic}
c2_topic = {c2_topic}
ledger_topic = {ledger_topic}
object_topic = {object_topic}
prioritized_ledger_topic = {prioritized_ledger_topic}
manual_override_topic = {manual_override_topic}
min_tilt = {min_tilt}
min_altitude = {min_altitude}
max_altitude = {max_altitude}
object_distance_threshold = {object_distance_threshold}
device_latitude = {device_latitude}
device_longitude = {device_longitude}
Expand Down Expand Up @@ -174,7 +186,7 @@ def _calculate_camera_angles(self: Any, data: Any) -> tuple[float, float, float]
) <= set(data.keys()):
logging.info(f"Required keys missing from object message data: {data}")
return 0.0, 0.0, 0.0
#logging.info(f"Processing object msg data: {data}")
# logging.info(f"Processing object msg data: {data}")
self.timestamp_o = float(data["timestamp"]) # [s]
self.timestamp_c = self.timestamp_o
self.lambda_o = data["longitude"] # [deg]
Expand All @@ -197,7 +209,9 @@ def _calculate_camera_angles(self: Any, data: Any) -> tuple[float, float, float]
lead_time = self.lead_time # [s]

object_msg_age = datetime.utcnow().timestamp() - self.timestamp_o # [s]
logging.debug(f"Object msg age: {object_msg_age} [s] Lead time: {lead_time} [s]")
logging.debug(
f"Object msg age: {object_msg_age} [s] Lead time: {lead_time} [s]"
)
lead_time += object_msg_age

# Compute position and velocity in the topocentric (ENz)
Expand Down Expand Up @@ -240,7 +254,7 @@ def _calculate_camera_angles(self: Any, data: Any) -> tuple[float, float, float]
self.elv_o = math.degrees(
math.atan2(r_ENz_o_1_t[2], axis_ptz_utilities.norm(r_ENz_o_1_t[0:2]))
) # [deg]
#logging.info(f"Object azimuth and elevation: {self.azm_o}, {self.elv_o} [deg]")
# logging.info(f"Object azimuth and elevation: {self.azm_o}, {self.elv_o} [deg]")

# Compute pan and tilt to point the camera at the object
r_uvw_o_1_t = np.matmul(self.E_XYZ_to_uvw, r_XYZ_o_1_t)
Expand Down Expand Up @@ -290,6 +304,76 @@ def _relative_distance_meters(
* 1000
)

def decode_payload(
self, msg: Union[mqtt.MQTTMessage, str], data_payload_type: str
) -> Dict[Any, Any]:
"""
Decode the payload carried by a message.
Parameters
----------
payload: mqtt.MQTTMessage
The MQTT message
data_payload_type: str
The data payload type
Returns
-------
data : Dict[Any, Any]
The data payload of the message payload
"""
if type(msg) == mqtt.MQTTMessage:
payload = msg.payload.decode()
else:
payload = msg
try:
json_payload = json.loads(payload)
data_payload = json_payload[data_payload_type]
except (KeyError, TypeError) as e:
logging.error(f"Error: {e}")
logging.error(json_payload)
logging.error(
f"Data payload type: {data_payload_type} not found in payload: {data_payload}"
)
return {}
return json.loads(data_payload)

def _config_callback(
self,
_client: Union[mqtt.Client, None],
_userdata: Union[Dict[Any, Any], None],
msg: Union[mqtt.MQTTMessage, str],
) -> None:
"""
Process configuration message.
Parameters
----------
_client: Union[mqtt.Client, None]
MQTT client
_userdata: Union[Dict[Any, Any], None]
Any required user data
msg: Union[mqtt.MQTTMessage, Dict[Any, Any]]
An MQTT message, or dictionary
Returns
-------
None
"""
# Assign data attributes allowed to change during operation,
# ignoring config message data without a "axis-ptz-controller"
# key

data = self.decode_payload(msg, "Configuration")
if "skyscan-c2" not in data:
logging.info(f"Configuration message data missing skyscan-c2: {data}")
return
logging.info(f"Processing config msg data: {data}")
config = data["skyscan-c2"]
self.min_tilt = config.get("min_tilt", self.min_tilt)
self.min_altitude = config.get("min_altitude", self.min_altitude)
self.max_altitude = config.get("max_altitude", self.max_altitude)

def _target_selection_callback(
self: Any, _client: mqtt.Client, _userdata: Dict[Any, Any], msg: Any
) -> None:
Expand Down Expand Up @@ -328,12 +412,27 @@ def _target_selection_callback(
axis=1,
)

object_ledger_df["min_tilt_fail"] = (
object_ledger_df["camera_tilt"] < self.min_tilt
)
object_ledger_df["min_altitude_fail"] = (
object_ledger_df["altitude"] < self.min_altitude
)
object_ledger_df["max_altitude_fail"] = (
object_ledger_df["altitude"] > self.max_altitude
)

logging.info(f"Object ledger: {object_ledger_df.to_string()}")
if not object_ledger_df.empty and not self.override_object:
logging.debug("Standard distance thresholding")
object_ledger_df = object_ledger_df[
object_ledger_df["relative_distance"]
<= self.object_distance_threshold
(
object_ledger_df["relative_distance"]
<= self.object_distance_threshold
)
& (object_ledger_df["min_tilt_fail"] == False)
& (object_ledger_df["min_altitude_fail"] == False)
& (object_ledger_df["max_altitude_fail"] == False)
]
if not object_ledger_df.empty:
logging.debug("Object[s] within distance threshold")
Expand Down Expand Up @@ -451,8 +550,12 @@ def main(self: Any) -> None:
)

self.add_subscribe_topics(
[self.ledger_topic, self.manual_override_topic],
[self._target_selection_callback, self._target_selection_callback],
[self.ledger_topic, self.manual_override_topic, self.config_topic],
[
self._target_selection_callback,
self._target_selection_callback,
self._config_callback,
],
[2, 2],
)

Expand All @@ -471,6 +574,7 @@ def main(self: Any) -> None:
c2 = C2PubSub(
hostname=str(os.environ.get("HOSTNAME")),
mqtt_ip=str(os.environ.get("MQTT_IP")),
config_topic=os.environ.get("CONFIG_TOPIC", ""),
c2_topic=str(os.environ.get("C2_TOPIC")),
ledger_topic=str(os.environ.get("LEDGER_TOPIC")),
object_topic=str(os.environ.get("OBJECT_TOPIC")),
Expand All @@ -480,6 +584,9 @@ def main(self: Any) -> None:
device_longitude=str(os.environ.get("TRIPOD_LONGITUDE")),
device_altitude=str(os.environ.get("TRIPOD_ALTITUDE")),
lead_time=float(os.environ.get("LEAD_TIME", 1.0)),
min_tilt=float(os.environ.get("MIN_TILT", 0.0)),
min_altitude=float(os.environ.get("MIN_ALTITUDE", 0.0)),
max_altitude=float(os.environ.get("MAX_ALTITUDE", 100000000.0)),
object_distance_threshold=str(os.environ.get("OBJECT_DISTANCE_THRESHOLD")),
log_level=str(os.environ.get("LOG_LEVEL", "INFO")),
)
Expand Down

0 comments on commit 11cb97f

Please sign in to comment.