Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[uss_qualifier] NET0450 check that observed details (from a display provider) correspond to injected ones #220

Merged
merged 1 commit into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,11 @@ def fail_check():

def evaluate_sp_details(self, details: FlightDetails, participants: List[str]):
self._evaluate_uas_id(details.raw.get("uas_id"), participants)
self._evaluate_operator_id(details.operator_id, participants)
self._evaluate_operator_id(None, details.operator_id, participants)
self._evaluate_operator_location(
None,
None,
None,
details.operator_location,
details.operator_altitude,
details.operator_altitude_type,
Expand All @@ -226,26 +229,39 @@ def evaluate_sp_details(self, details: FlightDetails, participants: List[str]):

def evaluate_dp_details(
self,
injected_details: injection.RIDFlightDetails,
observed_details: Optional[observation_api.GetDetailsResponse],
participants: List[str],
):
if not observed_details:
return

self._evaluate_arbitrary_uas_id(
observed_details.get("uas", {}).get("id"), participants
injected_details.get(
"uas_id", injected_details.get("serial_number", None)
), # fall back on seria number if no UAS ID
observed_details.get("uas", {}).get("id", None),
participants,
)

operator = observed_details.get("operator", {})
self._evaluate_operator_id(operator.get("id"), participants)
operator_obs = observed_details.get("operator", {})

self._evaluate_operator_id(
injected_details.operator_id, operator_obs.get("id", None), participants
)

operator_location = operator.get("location", {})
operator_altitude = operator.get("altitude", {})
operator_altitude_value = operator_altitude.get("altitude")
operator_altitude_obs = operator_obs.get("altitude", {})
operator_altitude_value_obs = operator_altitude_obs.get("altitude")
operator_altitude_inj = injected_details.get("operator_altitude", {})
self._evaluate_operator_location(
operator_location,
Altitude.w84m(value=operator_altitude_value),
operator_altitude.get("altitude_type"),
injected_details.get("operator_location", None),
operator_altitude_inj.get(
"altitude", None
), # should be of the correct type already
operator_altitude_inj.get("altitude_type", None),
operator_obs.get("location", None),
Altitude.w84m(value=operator_altitude_value_obs),
operator_altitude_obs.get("altitude_type", None),
participants,
)

Expand Down Expand Up @@ -296,24 +312,37 @@ def _evaluate_uas_id(self, value: Optional[UASID], participants: List[str]):
message=f"Unsupported version {self._rid_version}: skipping UAS ID evaluation",
)

def _evaluate_arbitrary_uas_id(self, value: str, participants: List[str]):
def _evaluate_arbitrary_uas_id(
self, value_inj: str, value_obs: str, participants: List[str]
):
if self._rid_version == RIDVersion.f3411_22a:
with self._test_scenario.check(
"UAS ID presence in flight details", participants
) as check:
if not value:
if value_obs is None:
check.record_failed(
f"UAS ID not present as required by the Common Dictionary definition: {value}",
f"UAS ID not present as required by the Common Dictionary definition: {value_obs}",
severity=Severity.Medium,
)
return

if SerialNumber(value).valid:
if SerialNumber(value_obs).valid:
self._test_scenario.check(
"UAS ID (Serial Number format) consistency with Common Dictionary",
participants,
).record_passed(participants)

if value_obs is not None:
with self._test_scenario.check(
"UAS ID is consistent with injected one", participants
) as check:
if value_inj != value_obs:
check.record_failed(
"Observed UAS ID not consistent with injected one",
details=f"Observed: {value_obs} - injected: {value_inj}",
severity=Severity.Medium,
)

# TODO: Add registration id format check
# TODO: Add utm id format check
# TODO: Add specific session id format check
Expand Down Expand Up @@ -360,7 +389,7 @@ def _evaluate_timestamp(
if abs(t_inj.datetime - t_obs.datetime).total_seconds() > 1.0:
check.record_failed(
"Observed timestamp inconsistent with injected one",
details=f"Injected timestamp: {timestamp_inj} Observed one: {timestamp_obs}",
details=f"Injected timestamp: {timestamp_inj} - Observed one: {timestamp_obs}",
severity=Severity.Medium,
)
else:
Expand All @@ -369,18 +398,35 @@ def _evaluate_timestamp(
message=f"Unsupported version {self._rid_version}: skipping timestamp evaluation",
)

def _evaluate_operator_id(self, value: Optional[str], participants: List[str]):
def _evaluate_operator_id(
self,
value_inj: Optional[str],
value_obs: Optional[str],
participants: List[str],
):
if self._rid_version == RIDVersion.f3411_22a:
if value:
if value_obs:
with self._test_scenario.check(
"Operator ID consistency with Common Dictionary", participants
) as check:
is_ascii = all([0 <= ord(c) < 128 for c in value])
is_ascii = all([0 <= ord(c) < 128 for c in value_obs])
if not is_ascii:
check.record_failed(
"Operator ID contains non-ascii characters",
severity=Severity.Medium,
)

if value_inj is not None:
with self._test_scenario.check(
"Operator ID is consistent with injected one", participants
) as check:
if value_inj != value_obs:
check.record_failed(
"Observed Operator ID not consistent with injected one",
details=f"Observed: {value_obs} - injected: {value_inj}",
severity=Severity.Medium,
)

else:
self._test_scenario.record_note(
key="skip_reason",
Expand Down Expand Up @@ -418,7 +464,7 @@ def _evaluate_speed(
if abs(speed_obs - speed_inj) > 0.125:
check.record_failed(
"Observed speed different from injected speed",
details=f"Injected speed was {speed_inj} observed speed is {speed_obs}",
details=f"Injected speed was {speed_inj} - observed speed is {speed_obs}",
severity=Severity.Medium,
)
else:
Expand Down Expand Up @@ -462,7 +508,7 @@ def _evaluate_track(
if abs_track_diff > 0.5:
check.record_failed(
"Observed track direction different from injected one",
details=f"Inject track was {track_inj} observed one is {track_obs}",
details=f"Inject track was {track_inj} - observed one is {track_obs}",
severity=Severity.Medium,
)

Expand Down Expand Up @@ -510,7 +556,7 @@ def _evaluate_position(
):
check.record_failed(
"Observed position inconsistent with injected one",
details=f"Injected Position: {position_inj} Observed Position: {position_obs}",
details=f"Injected Position: {position_inj} - Observed Position: {position_obs}",
severity=Severity.Medium,
)
else:
Expand Down Expand Up @@ -551,7 +597,7 @@ def _evaluate_height(
):
check.record_failed(
"Observed Height is inconsistent with injected one",
details=f"Observed height: {height_obs} injected: {height_inj}",
details=f"Observed height: {height_obs} - injected: {height_inj}",
severity=Severity.Medium,
)
else:
Expand All @@ -562,24 +608,27 @@ def _evaluate_height(

def _evaluate_operator_location(
self,
position: Optional[LatLngPoint],
altitude: Optional[Altitude],
altitude_type: Optional[observation_api.OperatorAltitudeAltitudeType],
position_inj: Optional[LatLngPoint],
altitude_inj: Optional[Altitude],
altitude_type_inj: Optional[injection.OperatorAltitudeAltitudeType],
position_obs: Optional[LatLngPoint],
altitude_obs: Optional[Altitude],
altitude_type_obs: Optional[observation_api.OperatorAltitudeAltitudeType],
participants: List[str],
):
if self._rid_version == RIDVersion.f3411_22a:
with self._test_scenario.check(
"Operator Location consistency with Common Dictionary", participants
) as check:
if not position:
if not position_obs:
check.record_failed(
"Missing Operator Location position",
details=f"Invalid position: {position}",
details=f"Invalid position: {position_obs}",
severity=Severity.Medium,
)
return

lat = position.lat
lat = position_obs.lat
try:
lat = validate_lat(lat)
except ValueError:
Expand All @@ -588,17 +637,33 @@ def _evaluate_operator_location(
details=f"Invalid latitude: {lat}",
severity=Severity.Medium,
)
lng = position.lng
lng = position_obs.lng
try:
lng = validate_lng(lng)
position_valid = True
except ValueError:
position_valid = False
check.record_failed(
"Operator Location contains an invalid longitude",
details=f"Invalid longitude: {lng}",
severity=Severity.Medium,
)

alt = altitude
if position_valid and position_obs is not None and position_inj is not None:
with self._test_scenario.check(
"Operator Location is consistent with injected one", participants
) as check:
if (
abs(position_obs.lat - position_inj.lat) > 0.01
or abs(position_obs.lng - position_obs.lng) > 0.01
):
check.record_failed(
summary="Operator Location not consistent with injected one",
details=f"Observed: {position_obs} - injected: {position_inj}",
severity=Severity.Medium,
)

alt = altitude_obs
if alt:
with self._test_scenario.check(
"Operator Altitude consistency with Common Dictionary",
Expand All @@ -616,8 +681,24 @@ def _evaluate_operator_location(
details=f"Invalid Operator Altitude units: {alt.units}",
severity=Severity.Medium,
)
if altitude_inj is not None:

with self._test_scenario.check(
"Operator Altitude is consistent with injected one",
participants,
) as check:
if (
alt.units != altitude_inj.units
or alt.reference != altitude_inj.reference
or abs(alt.value - altitude_inj.value) > 1
):
check.record_failed(
"Observed operator altitude inconsistent with injected one",
details=f"Observed: {alt} - injected: {altitude_inj}",
severity=Severity.Medium,
)

alt_type = altitude_type
alt_type = altitude_type_obs
if alt_type:
with self._test_scenario.check(
"Operator Altitude Type consistency with Common Dictionary",
Expand All @@ -634,6 +715,18 @@ def _evaluate_operator_location(
severity=Severity.Medium,
)

if altitude_type_inj is not None:
with self._test_scenario.check(
"Operator Altitude Type is consistent with injected one",
participants,
) as check:
if alt_type != altitude_type_inj:
check.record_failed(
"Observed Operator Altitude Type is inconsistent with injected one",
details=f"Observed: {alt_type} - Injected: {altitude_type_inj}",
severity=Severity.Medium,
)

else:
self._test_scenario.record_note(
key="skip_reason",
Expand Down Expand Up @@ -670,7 +763,7 @@ def _evaluate_operational_status(
if not value_obs == value_inj:
check.record_failed(
"Observed operational status inconsistent with injected one",
details=f"Injected operational status: {value_inj} Observed {value_obs}",
details=f"Injected operational status: {value_inj} - Observed {value_obs}",
)

else:
Expand Down
Loading
Loading