Skip to content

Commit

Permalink
[uss_qualifier] NET0450 check that observed details (from a display p…
Browse files Browse the repository at this point in the history
…rovider) correspond to injected ones (#220)

NET0450 check that observed details (on a display provider) correspond to injected ones
  • Loading branch information
Shastick authored Sep 26, 2023
1 parent d15e5e3 commit ca7c10e
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,11 @@ def fail_check():

def evaluate_sp_details(self, details: FlightDetails, participants: List[str]):
self._evaluate_uas_id(details.raw.get("uas_id"), participants)
self._evaluate_operator_id(details.operator_id, participants)
self._evaluate_operator_id(None, details.operator_id, participants)
self._evaluate_operator_location(
None,
None,
None,
details.operator_location,
details.operator_altitude,
details.operator_altitude_type,
Expand All @@ -226,26 +229,39 @@ def evaluate_sp_details(self, details: FlightDetails, participants: List[str]):

def evaluate_dp_details(
self,
injected_details: injection.RIDFlightDetails,
observed_details: Optional[observation_api.GetDetailsResponse],
participants: List[str],
):
if not observed_details:
return

self._evaluate_arbitrary_uas_id(
observed_details.get("uas", {}).get("id"), participants
injected_details.get(
"uas_id", injected_details.get("serial_number", None)
), # fall back on seria number if no UAS ID
observed_details.get("uas", {}).get("id", None),
participants,
)

operator = observed_details.get("operator", {})
self._evaluate_operator_id(operator.get("id"), participants)
operator_obs = observed_details.get("operator", {})

self._evaluate_operator_id(
injected_details.operator_id, operator_obs.get("id", None), participants
)

operator_location = operator.get("location", {})
operator_altitude = operator.get("altitude", {})
operator_altitude_value = operator_altitude.get("altitude")
operator_altitude_obs = operator_obs.get("altitude", {})
operator_altitude_value_obs = operator_altitude_obs.get("altitude")
operator_altitude_inj = injected_details.get("operator_altitude", {})
self._evaluate_operator_location(
operator_location,
Altitude.w84m(value=operator_altitude_value),
operator_altitude.get("altitude_type"),
injected_details.get("operator_location", None),
operator_altitude_inj.get(
"altitude", None
), # should be of the correct type already
operator_altitude_inj.get("altitude_type", None),
operator_obs.get("location", None),
Altitude.w84m(value=operator_altitude_value_obs),
operator_altitude_obs.get("altitude_type", None),
participants,
)

Expand Down Expand Up @@ -296,24 +312,37 @@ def _evaluate_uas_id(self, value: Optional[UASID], participants: List[str]):
message=f"Unsupported version {self._rid_version}: skipping UAS ID evaluation",
)

def _evaluate_arbitrary_uas_id(self, value: str, participants: List[str]):
def _evaluate_arbitrary_uas_id(
self, value_inj: str, value_obs: str, participants: List[str]
):
if self._rid_version == RIDVersion.f3411_22a:
with self._test_scenario.check(
"UAS ID presence in flight details", participants
) as check:
if not value:
if value_obs is None:
check.record_failed(
f"UAS ID not present as required by the Common Dictionary definition: {value}",
f"UAS ID not present as required by the Common Dictionary definition: {value_obs}",
severity=Severity.Medium,
)
return

if SerialNumber(value).valid:
if SerialNumber(value_obs).valid:
self._test_scenario.check(
"UAS ID (Serial Number format) consistency with Common Dictionary",
participants,
).record_passed(participants)

if value_obs is not None:
with self._test_scenario.check(
"UAS ID is consistent with injected one", participants
) as check:
if value_inj != value_obs:
check.record_failed(
"Observed UAS ID not consistent with injected one",
details=f"Observed: {value_obs} - injected: {value_inj}",
severity=Severity.Medium,
)

# TODO: Add registration id format check
# TODO: Add utm id format check
# TODO: Add specific session id format check
Expand Down Expand Up @@ -360,7 +389,7 @@ def _evaluate_timestamp(
if abs(t_inj.datetime - t_obs.datetime).total_seconds() > 1.0:
check.record_failed(
"Observed timestamp inconsistent with injected one",
details=f"Injected timestamp: {timestamp_inj} Observed one: {timestamp_obs}",
details=f"Injected timestamp: {timestamp_inj} - Observed one: {timestamp_obs}",
severity=Severity.Medium,
)
else:
Expand All @@ -369,18 +398,35 @@ def _evaluate_timestamp(
message=f"Unsupported version {self._rid_version}: skipping timestamp evaluation",
)

def _evaluate_operator_id(self, value: Optional[str], participants: List[str]):
def _evaluate_operator_id(
self,
value_inj: Optional[str],
value_obs: Optional[str],
participants: List[str],
):
if self._rid_version == RIDVersion.f3411_22a:
if value:
if value_obs:
with self._test_scenario.check(
"Operator ID consistency with Common Dictionary", participants
) as check:
is_ascii = all([0 <= ord(c) < 128 for c in value])
is_ascii = all([0 <= ord(c) < 128 for c in value_obs])
if not is_ascii:
check.record_failed(
"Operator ID contains non-ascii characters",
severity=Severity.Medium,
)

if value_inj is not None:
with self._test_scenario.check(
"Operator ID is consistent with injected one", participants
) as check:
if value_inj != value_obs:
check.record_failed(
"Observed Operator ID not consistent with injected one",
details=f"Observed: {value_obs} - injected: {value_inj}",
severity=Severity.Medium,
)

else:
self._test_scenario.record_note(
key="skip_reason",
Expand Down Expand Up @@ -418,7 +464,7 @@ def _evaluate_speed(
if abs(speed_obs - speed_inj) > 0.125:
check.record_failed(
"Observed speed different from injected speed",
details=f"Injected speed was {speed_inj} observed speed is {speed_obs}",
details=f"Injected speed was {speed_inj} - observed speed is {speed_obs}",
severity=Severity.Medium,
)
else:
Expand Down Expand Up @@ -462,7 +508,7 @@ def _evaluate_track(
if abs_track_diff > 0.5:
check.record_failed(
"Observed track direction different from injected one",
details=f"Inject track was {track_inj} observed one is {track_obs}",
details=f"Inject track was {track_inj} - observed one is {track_obs}",
severity=Severity.Medium,
)

Expand Down Expand Up @@ -510,7 +556,7 @@ def _evaluate_position(
):
check.record_failed(
"Observed position inconsistent with injected one",
details=f"Injected Position: {position_inj} Observed Position: {position_obs}",
details=f"Injected Position: {position_inj} - Observed Position: {position_obs}",
severity=Severity.Medium,
)
else:
Expand Down Expand Up @@ -551,7 +597,7 @@ def _evaluate_height(
):
check.record_failed(
"Observed Height is inconsistent with injected one",
details=f"Observed height: {height_obs} injected: {height_inj}",
details=f"Observed height: {height_obs} - injected: {height_inj}",
severity=Severity.Medium,
)
else:
Expand All @@ -562,24 +608,27 @@ def _evaluate_height(

def _evaluate_operator_location(
self,
position: Optional[LatLngPoint],
altitude: Optional[Altitude],
altitude_type: Optional[observation_api.OperatorAltitudeAltitudeType],
position_inj: Optional[LatLngPoint],
altitude_inj: Optional[Altitude],
altitude_type_inj: Optional[injection.OperatorAltitudeAltitudeType],
position_obs: Optional[LatLngPoint],
altitude_obs: Optional[Altitude],
altitude_type_obs: Optional[observation_api.OperatorAltitudeAltitudeType],
participants: List[str],
):
if self._rid_version == RIDVersion.f3411_22a:
with self._test_scenario.check(
"Operator Location consistency with Common Dictionary", participants
) as check:
if not position:
if not position_obs:
check.record_failed(
"Missing Operator Location position",
details=f"Invalid position: {position}",
details=f"Invalid position: {position_obs}",
severity=Severity.Medium,
)
return

lat = position.lat
lat = position_obs.lat
try:
lat = validate_lat(lat)
except ValueError:
Expand All @@ -588,17 +637,33 @@ def _evaluate_operator_location(
details=f"Invalid latitude: {lat}",
severity=Severity.Medium,
)
lng = position.lng
lng = position_obs.lng
try:
lng = validate_lng(lng)
position_valid = True
except ValueError:
position_valid = False
check.record_failed(
"Operator Location contains an invalid longitude",
details=f"Invalid longitude: {lng}",
severity=Severity.Medium,
)

alt = altitude
if position_valid and position_obs is not None and position_inj is not None:
with self._test_scenario.check(
"Operator Location is consistent with injected one", participants
) as check:
if (
abs(position_obs.lat - position_inj.lat) > 0.01
or abs(position_obs.lng - position_obs.lng) > 0.01
):
check.record_failed(
summary="Operator Location not consistent with injected one",
details=f"Observed: {position_obs} - injected: {position_inj}",
severity=Severity.Medium,
)

alt = altitude_obs
if alt:
with self._test_scenario.check(
"Operator Altitude consistency with Common Dictionary",
Expand All @@ -616,8 +681,24 @@ def _evaluate_operator_location(
details=f"Invalid Operator Altitude units: {alt.units}",
severity=Severity.Medium,
)
if altitude_inj is not None:

with self._test_scenario.check(
"Operator Altitude is consistent with injected one",
participants,
) as check:
if (
alt.units != altitude_inj.units
or alt.reference != altitude_inj.reference
or abs(alt.value - altitude_inj.value) > 1
):
check.record_failed(
"Observed operator altitude inconsistent with injected one",
details=f"Observed: {alt} - injected: {altitude_inj}",
severity=Severity.Medium,
)

alt_type = altitude_type
alt_type = altitude_type_obs
if alt_type:
with self._test_scenario.check(
"Operator Altitude Type consistency with Common Dictionary",
Expand All @@ -634,6 +715,18 @@ def _evaluate_operator_location(
severity=Severity.Medium,
)

if altitude_type_inj is not None:
with self._test_scenario.check(
"Operator Altitude Type is consistent with injected one",
participants,
) as check:
if alt_type != altitude_type_inj:
check.record_failed(
"Observed Operator Altitude Type is inconsistent with injected one",
details=f"Observed: {alt_type} - Injected: {altitude_type_inj}",
severity=Severity.Medium,
)

else:
self._test_scenario.record_note(
key="skip_reason",
Expand Down Expand Up @@ -670,7 +763,7 @@ def _evaluate_operational_status(
if not value_obs == value_inj:
check.record_failed(
"Observed operational status inconsistent with injected one",
details=f"Injected operational status: {value_inj} Observed {value_obs}",
details=f"Injected operational status: {value_inj} - Observed {value_obs}",
)

else:
Expand Down
Loading

0 comments on commit ca7c10e

Please sign in to comment.