From 72c4322c0e47c5a7c14b5929a4589d00c861a811 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Misbach?= Date: Tue, 10 Dec 2024 16:17:52 +0100 Subject: [PATCH] [uss_qualifier/scenarios/netrid] Move recent position timings checks to display_data_evaluator from common_data_dictionary --- .../netrid/common_dictionary_evaluator.py | 133 +--------- .../common_dictionary_evaluator_test.py | 232 +---------------- .../astm/netrid/display_data_evaluator.py | 125 ++++++++- .../netrid/display_data_evaluator_test.py | 246 ++++++++++++++++++ .../common_dictionary_evaluator_sp_flight.md | 2 +- .../common_dictionary_evaluator_sp_flight.md | 2 +- 6 files changed, 374 insertions(+), 366 deletions(-) create mode 100644 monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator_test.py diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator.py b/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator.py index dc1f11d934..f4232bcda1 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator.py +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator.py @@ -1,8 +1,6 @@ -import datetime import math from typing import List, Optional -import s2sphere from arrow import ParserError from implicitdict import StringBasedDateTime from uas_standards.ansi_cta_2063_a import SerialNumber @@ -25,22 +23,15 @@ ) from monitoring.monitorlib.fetch.rid import ( - FetchedFlights, FlightDetails, ) from monitoring.monitorlib.fetch.rid import Flight, Position from monitoring.monitorlib.geo import validate_lat, validate_lng, Altitude, LatLngPoint from monitoring.monitorlib.rid import RIDVersion from monitoring.uss_qualifier.common_data_definitions import Severity +from monitoring.uss_qualifier.configurations.configuration import ParticipantID from monitoring.uss_qualifier.resources.netrid.evaluation import EvaluationConfiguration -from monitoring.uss_qualifier.scenarios.scenario import TestScenarioType, PendingCheck - -# SP responses to /flights endpoint's p99 should be below this: -SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC = 3 -NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC = 60 -_POSITION_TIMESTAMP_MAX_AGE_SEC = ( - NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC + SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC -) +from monitoring.uss_qualifier.scenarios.scenario import TestScenarioType class RIDCommonDictionaryEvaluator(object): @@ -54,34 +45,17 @@ def __init__( self._test_scenario = test_scenario self._rid_version = rid_version - def evaluate_sp_flights( + def evaluate_sp_flight( self, - requested_area: s2sphere.LatLngRect, - observed_flights: FetchedFlights, - participants: List[str], + observed_flight: Flight, + participant_id: ParticipantID, ): """Implements fragment documented in `common_dictionary_evaluator_sp_flight.md`.""" - for url, uss_flights in observed_flights.uss_flight_queries.items(): - # For the timing checks, we want to look at the flights relative to the query - # they came from, as they may be provided from different SP's. - for f in uss_flights.flights: - self.evaluate_sp_flight_recent_positions_times( - f, - uss_flights.query.response.reported.datetime, - participants, - ) - - self.evaluate_sp_flight_recent_positions_crossing_area_boundary( - requested_area, f, participants - ) - - for f in observed_flights.flights: - # Evaluate on all flights regardless of where they came from - self._evaluate_operational_status( - f.operational_status, - participants, - ) + self._evaluate_operational_status( + observed_flight.operational_status, + [participant_id], + ) def evaluate_dp_flight( self, @@ -121,95 +95,6 @@ def evaluate_dp_flight( participants, ) - def _evaluate_recent_position_time( - self, p: Position, query_time: datetime.datetime, check: PendingCheck - ): - """Check that the position's timestamp is at most 60 seconds before the request time.""" - if (query_time - p.time).total_seconds() > _POSITION_TIMESTAMP_MAX_AGE_SEC: - check.record_failed( - "A Position timestamp was older than the tolerance.", - details=f"Position timestamp: {p.time}, query time: {query_time}", - severity=Severity.Medium, - ) - - def evaluate_sp_flight_recent_positions_times( - self, f: Flight, query_time: datetime.datetime, participants: List[str] - ): - with self._test_scenario.check( - "Recent positions timestamps", participants - ) as check: - for p in f.recent_positions: - self._evaluate_recent_position_time(p, query_time, check) - - def _chronological_positions(self, f: Flight) -> List[s2sphere.LatLng]: - """ - Returns the recent positions of the flight, ordered by time with the oldest first, and the most recent last. - """ - return [ - s2sphere.LatLng.from_degrees(p.lat, p.lng) - for p in sorted(f.recent_positions, key=lambda p: p.time) - ] - - def _sliding_triples( - self, points: List[s2sphere.LatLng] - ) -> List[List[s2sphere.LatLng]]: - """ - Returns a list of triples of consecutive positions in passed the list. - """ - return [ - (points[i], points[i + 1], points[i + 2]) for i in range(len(points) - 2) - ] - - def evaluate_sp_flight_recent_positions_crossing_area_boundary( - self, requested_area: s2sphere.LatLngRect, f: Flight, participants: List[str] - ): - with self._test_scenario.check( - "Recent positions for aircraft crossing the requested area boundary show only one position before or after crossing", - participants, - ) as check: - - def fail_check(): - check.record_failed( - "A position outside the area was neither preceded nor followed by a position inside the area.", - details=f"Positions: {f.recent_positions}, requested_area: {requested_area}", - severity=Severity.Medium, - ) - - positions = self._chronological_positions(f) - if len(positions) < 2: - # Check does not apply in this case - return - - if len(positions) == 2: - # Only one of the positions can be outside the area. If both are, we fail. - if not requested_area.contains( - positions[0] - ) and not requested_area.contains(positions[1]): - fail_check() - return - - # For each sliding triple we check that if the middle position is outside the area, then either - # the first or the last position is inside the area. This means checking for any point that is inside the - # area in the triple and failing otherwise - for triple in self._sliding_triples(self._chronological_positions(f)): - if not ( - requested_area.contains(triple[0]) - or requested_area.contains(triple[1]) - or requested_area.contains(triple[2]) - ): - fail_check() - - # Finally we need to check for the forbidden corner cases of having the two first or two last positions being outside. - # (These won't be caught by the iteration on the triples above) - if ( - not requested_area.contains(positions[0]) - and not requested_area.contains(positions[1]) - ) or ( - not requested_area.contains(positions[-1]) - and not requested_area.contains(positions[-2]) - ): - fail_check() - def evaluate_sp_details(self, details: FlightDetails, participants: List[str]): """Implements fragment documented in `common_dictionary_evaluator_sp_flight_details.md`.""" diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator_test.py b/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator_test.py index f87c9bdc5e..ef9a11697e 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator_test.py +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/common_dictionary_evaluator_test.py @@ -1,7 +1,6 @@ -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from typing import List, Tuple, Optional -import s2sphere from implicitdict import StringBasedDateTime from uas_standards.astm.f3411 import v22a from uas_standards.astm.f3411.v22a.api import ( @@ -380,235 +379,6 @@ def test_height(): ) # mismatching reference -def _assert_evaluate_sp_flight_recent_positions( - f: Flight, query_time: datetime, outcome: bool -): - def step_under_test(self: UnitTestScenario): - evaluator = RIDCommonDictionaryEvaluator( - config=EvaluationConfiguration(), - test_scenario=self, - rid_version=RIDVersion.f3411_22a, - ) - evaluator.evaluate_sp_flight_recent_positions_times( - f, query_time, RIDVersion.f3411_22a - ) - - unit_test_scenario = UnitTestScenario(step_under_test).execute_unit_test() - assert unit_test_scenario.get_report().successful == outcome - - -def test_evaluate_sp_flight_recent_positions(): - - some_time = datetime.now(timezone.utc) - # All samples within last minute: should pass - _assert_evaluate_sp_flight_recent_positions( - mock_flight(some_time, 7, 10), some_time, True - ) - # oldest sample outside last minute: should fail - _assert_evaluate_sp_flight_recent_positions( - mock_flight(some_time, 8, 10), some_time, False - ) - # No positions: not expected but this is not this test's problem - _assert_evaluate_sp_flight_recent_positions( - mock_flight(some_time, 0, 10), some_time, True - ) - - -def _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - requested_area: s2sphere.LatLngRect, f: Flight, outcome: bool -): - def step_under_test(self: UnitTestScenario): - evaluator = RIDCommonDictionaryEvaluator( - config=EvaluationConfiguration(), - test_scenario=self, - rid_version=RIDVersion.f3411_22a, - ) - evaluator.evaluate_sp_flight_recent_positions_crossing_area_boundary( - requested_area, f, RIDVersion.f3411_22a - ) - - unit_test_scenario = UnitTestScenario(step_under_test).execute_unit_test() - assert unit_test_scenario.get_report().successful == outcome - - -def test_evaluate_sp_flight_recent_positions_crossing_area_boundary(): - # Mock flight with no recent position: should pass - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(0.5, 0.5), - ), - mock_flight(datetime.now(timezone.utc), 0, 10), - True, - ) - # Mock flight with one recent position: should pass event if outside of area - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(0.5, 0.5), - ), - mock_flight(datetime.now(timezone.utc), 1, 10), - True, - ) - - # Mock flight with two recent positions within area: should pass - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(2, 2), - ), - mock_flight(datetime.now(timezone.utc), 2, 10), - True, - ) - - # Mock flight with two recent positions outside area: should fail - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(0.5, 0.5), - ), - mock_flight(datetime.now(timezone.utc), 2, 10), - False, - ) - - # Mock flight with two recent positions, one of which is outside area: should pass - f2_1 = mock_flight(datetime.now(timezone.utc), 0, 0) - f2_1.v22a_value.recent_positions = to_positions( - [(1.0, 1.0), (-1.0, -1.0)], datetime.now(timezone.utc) - ) - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(2.0, 2.0), - ), - f2_1, - True, - ) - - f2_2 = mock_flight(datetime.now(timezone.utc), 0, 0) - f2_2.v22a_value.recent_positions = to_positions( - [(-1.0, -1.0), (1.0, 1.0)], datetime.now(timezone.utc) - ) - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(2.0, 2.0), - ), - f2_2, - True, - ) - - # Mock flight with 3 recent positions completely outside requested area: should fail - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(0.5, 0.5), - ), - mock_flight(datetime.now(timezone.utc), 3, 10), - False, - ) - - # Mock flight with 3 recent positions, the second of which is in the area: should pass - f3_1 = mock_flight(datetime.now(timezone.utc), 0, 0) - f3_1.v22a_value.recent_positions = to_positions( - [(-1.0, -1.0), (1.0, 1.0), (3.0, 3.0)], datetime.now(timezone.utc) - ) - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(2.0, 2.0), - ), - f3_1, - True, - ) - - # Mock flight with 3 recent positions, only the last of which is in the area: should fail - f3_2 = mock_flight(datetime.now(timezone.utc), 0, 0) - f3_2.v22a_value.recent_positions = to_positions( - [(-1.0, -1.0), (3.0, 3.0), (1.0, 1.0)], datetime.now(timezone.utc) - ) - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(2.0, 2.0), - ), - f3_2, - False, - ) - - # Mock flight with 3 recent positions, only the first of which is in the area: should fail - f3_3 = mock_flight(datetime.now(timezone.utc), 0, 0) - f3_3.v22a_value.recent_positions = to_positions( - [(1.0, 1.0), (3.0, 3.0), (-1.0, -1.0)], datetime.now(timezone.utc) - ) - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(2.0, 2.0), - ), - f3_3, - False, - ) - - # Mock flight with 3 recent positions within requested area: should pass - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(2, 2), - ), - mock_flight(datetime.now(timezone.utc), 3, 10), - True, - ) - - # Mock flight with 4 recent positions, last position outside requested area: should pass - f4_1 = mock_flight(datetime.now(timezone.utc), 0, 0) - f4_1.v22a_value.recent_positions = to_positions( - [(1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (3.0, 3.0)], datetime.now(timezone.utc) - ) - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(2.0, 2.0), - ), - f4_1, - True, - ) - - # Mock flight with 4 recent positions, first position outside requested area: should pass - f4_2 = mock_flight(datetime.now(timezone.utc), 0, 0) - f4_2.v22a_value.recent_positions = to_positions( - [(3.0, 3.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0)], datetime.now(timezone.utc) - ) - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(2.0, 2.0), - ), - f4_2, - True, - ) - - # Mock flight completely within requested area: should pass - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(2, 2), - ), - mock_flight(datetime.now(timezone.utc), 7, 10), - True, - ) - - # Mock flight completely outside requested area: should fail - _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( - s2sphere.LatLngRect( - s2sphere.LatLng.from_degrees(0.0, 0.0), - s2sphere.LatLng.from_degrees(0.5, 0.5), - ), - mock_flight(datetime.now(timezone.utc), 7, 10), - False, - ) - - def mock_flight( last_position_time: datetime, positions_count: int, diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator.py b/monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator.py index 61b74480fb..628645ec7a 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator.py +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator.py @@ -1,12 +1,13 @@ +import datetime + import math from dataclasses import dataclass -from typing import List, Optional, Dict, Union, Set, Tuple +from typing import List, Optional, Dict, Union, Set, Tuple, cast import arrow import s2sphere from loguru import logger from s2sphere import LatLng, LatLngRect -from uas_standards.astm.f3411.v22a.api import RIDHeightReference from uas_standards.interuss.automated_testing.rid.v1.observation import ( Flight, @@ -24,6 +25,7 @@ ) from monitoring.monitorlib.rid import RIDVersion from monitoring.uss_qualifier.common_data_definitions import Severity +from monitoring.uss_qualifier.configurations.configuration import ParticipantID from monitoring.uss_qualifier.resources.astm.f3411.dss import DSSInstance from monitoring.uss_qualifier.resources.netrid.evaluation import EvaluationConfiguration from monitoring.uss_qualifier.resources.netrid.observers import RIDSystemObserver @@ -54,6 +56,13 @@ def _rect_str(rect) -> str: TIMESTAMP_ACCURACY_PRECISION = 0.05 HEIGHT_PRECISION_M = 1 +# SP responses to /flights endpoint's p99 should be below this: +SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC = 3 +NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC = 60 +_POSITION_TIMESTAMP_MAX_AGE_SEC = ( + NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC + SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC +) + @dataclass class DPObservedFlight(object): @@ -879,8 +888,9 @@ def _evaluate_normal_sp_observation( set(), ) - # Verify that flights queries returned correctly-formatted data for mapping in mappings.values(): + participant_id = mapping.injected_flight.uss_participant_id + observed_flight = mapping.observed_flight.flight flights_queries = [ q for flight_url, q in sp_observation.uss_flight_queries.items() @@ -888,16 +898,18 @@ def _evaluate_normal_sp_observation( ] if len(flights_queries) != 1: raise RuntimeError( - f"Found {len(flights_queries)} flights queries (instead of the expected 1) for flight {mapping.observed_flight.id} corresponding to injection ID {mapping.injected_flight.flight.injection_id} for {mapping.injected_flight.uss_participant_id}" + f"Found {len(flights_queries)} flights queries (instead of the expected 1) for flight {mapping.observed_flight.id} corresponding to injection ID {mapping.injected_flight.flight.injection_id} for {participant_id}" ) flights_query = flights_queries[0] + + # Verify that flights queries returned correctly-formatted data errors = schema_validation.validate( self._rid_version.openapi_path, self._rid_version.openapi_flights_response_path, flights_query.query.response.json, ) with self._test_scenario.check( - "Flights data format", [mapping.injected_flight.uss_participant_id] + "Flights data format", participant_id ) as check: if errors: check.record_failed( @@ -910,10 +922,21 @@ def _evaluate_normal_sp_observation( ), query_timestamps=[flights_query.query.request.timestamp], ) - self._common_dictionary_evaluator.evaluate_sp_flights( - requested_area, - sp_observation, - participants=[mapping.injected_flight.uss_participant_id], + + # Check recent positions timings + self._evaluate_sp_flight_recent_positions_times( + observed_flight, + flights_query.query.response.reported.datetime, + participant_id, + ) + self._evaluate_sp_flight_recent_positions_crossing_area_boundary( + requested_area, observed_flight, participant_id + ) + + # Check flight consistency with common data dictionary + self._common_dictionary_evaluator.evaluate_sp_flight( + observed_flight, + participant_id, ) # Check that required fields are present and match for any observed flights matching injected flights @@ -1199,3 +1222,87 @@ def _evaluate_area_too_large_sp_observation( mapping.observed_flight.query.query.request.timestamp ], ) + + def _evaluate_sp_flight_recent_positions_times( + self, f: Flight, query_time: datetime.datetime, participant: ParticipantID + ): + with self._test_scenario.check( + "Recent positions timestamps", participant + ) as check: + for p in f.recent_positions: + # check that the position's timestamp is at most 60 seconds before the request time + if ( + query_time - p.time + ).total_seconds() > _POSITION_TIMESTAMP_MAX_AGE_SEC: + check.record_failed( + "A Position timestamp was older than the tolerance.", + details=f"Position timestamp: {p.time}, query time: {query_time}", + severity=Severity.Medium, + ) + + def _evaluate_sp_flight_recent_positions_crossing_area_boundary( + self, requested_area: s2sphere.LatLngRect, f: Flight, participant: ParticipantID + ): + with self._test_scenario.check( + "Recent positions for aircraft crossing the requested area boundary show only one position before or after crossing", + participant, + ) as check: + + def fail_check(): + check.record_failed( + "A position outside the area was neither preceded nor followed by a position inside the area.", + details=f"Positions: {f.recent_positions}, requested_area: {requested_area}", + severity=Severity.Medium, + ) + + positions = _chronological_positions(f) + if len(positions) < 2: + # Check does not apply in this case + return + + if len(positions) == 2: + # Only one of the positions can be outside the area. If both are, we fail. + if not requested_area.contains( + positions[0] + ) and not requested_area.contains(positions[1]): + fail_check() + return + + # For each sliding triple we check that if the middle position is outside the area, then either + # the first or the last position is inside the area. This means checking for any point that is inside the + # area in the triple and failing otherwise + for triple in _sliding_triples(_chronological_positions(f)): + if not ( + requested_area.contains(triple[0]) + or requested_area.contains(triple[1]) + or requested_area.contains(triple[2]) + ): + fail_check() + + # Finally we need to check for the forbidden corner cases of having the two first or two last positions being outside. + # (These won't be caught by the iteration on the triples above) + if ( + not requested_area.contains(positions[0]) + and not requested_area.contains(positions[1]) + ) or ( + not requested_area.contains(positions[-1]) + and not requested_area.contains(positions[-2]) + ): + fail_check() + + +def _chronological_positions(f: Flight) -> List[s2sphere.LatLng]: + """ + Returns the recent positions of the flight, ordered by time with the oldest first, and the most recent last. + """ + return [ + s2sphere.LatLng.from_degrees(p.lat, p.lng) + for p in sorted(f.recent_positions, key=lambda p: p.time) + ] + + +def _sliding_triples(points: List[s2sphere.LatLng]) -> List[List[s2sphere.LatLng]]: + """ + Returns a list of triples of consecutive positions in passed the list. + """ + return [[points[i], points[i + 1], points[i + 2]] for i in range(len(points) - 2)] diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator_test.py b/monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator_test.py new file mode 100644 index 0000000000..2d8769b392 --- /dev/null +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator_test.py @@ -0,0 +1,246 @@ +from datetime import datetime, timezone + +import s2sphere + +from monitoring.monitorlib.fetch.rid import Flight +from monitoring.monitorlib.rid import RIDVersion +from monitoring.uss_qualifier.resources.netrid.evaluation import EvaluationConfiguration +from monitoring.uss_qualifier.scenarios.astm.netrid.common_dictionary_evaluator_test import ( + mock_flight, + to_positions, +) +from monitoring.uss_qualifier.scenarios.astm.netrid.display_data_evaluator import ( + RIDObservationEvaluator, +) +from monitoring.uss_qualifier.scenarios.interuss.unit_test import UnitTestScenario + + +def _assert_evaluate_sp_flight_recent_positions( + f: Flight, query_time: datetime, outcome: bool +): + def step_under_test(self: UnitTestScenario): + evaluator = RIDObservationEvaluator( + config=EvaluationConfiguration(), + test_scenario=self, + rid_version=RIDVersion.f3411_22a, + injected_flights=[], + ) + evaluator._evaluate_sp_flight_recent_positions_times( + f, query_time, RIDVersion.f3411_22a + ) + + unit_test_scenario = UnitTestScenario(step_under_test).execute_unit_test() + assert unit_test_scenario.get_report().successful == outcome + + +def test_evaluate_sp_flight_recent_positions(): + + some_time = datetime.now(timezone.utc) + # All samples within last minute: should pass + _assert_evaluate_sp_flight_recent_positions( + mock_flight(some_time, 7, 10), some_time, True + ) + # oldest sample outside last minute: should fail + _assert_evaluate_sp_flight_recent_positions( + mock_flight(some_time, 8, 10), some_time, False + ) + # No positions: not expected but this is not this test's problem + _assert_evaluate_sp_flight_recent_positions( + mock_flight(some_time, 0, 10), some_time, True + ) + + +def _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + requested_area: s2sphere.LatLngRect, f: Flight, outcome: bool +): + def step_under_test(self: UnitTestScenario): + evaluator = RIDObservationEvaluator( + config=EvaluationConfiguration(), + test_scenario=self, + rid_version=RIDVersion.f3411_22a, + injected_flights=[], + ) + evaluator._evaluate_sp_flight_recent_positions_crossing_area_boundary( + requested_area, f, RIDVersion.f3411_22a + ) + + unit_test_scenario = UnitTestScenario(step_under_test).execute_unit_test() + assert unit_test_scenario.get_report().successful == outcome + + +def test_evaluate_sp_flight_recent_positions_crossing_area_boundary(): + # Mock flight with no recent position: should pass + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(0.5, 0.5), + ), + mock_flight(datetime.now(timezone.utc), 0, 10), + True, + ) + # Mock flight with one recent position: should pass event if outside of area + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(0.5, 0.5), + ), + mock_flight(datetime.now(timezone.utc), 1, 10), + True, + ) + + # Mock flight with two recent positions within area: should pass + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(2, 2), + ), + mock_flight(datetime.now(timezone.utc), 2, 10), + True, + ) + + # Mock flight with two recent positions outside area: should fail + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(0.5, 0.5), + ), + mock_flight(datetime.now(timezone.utc), 2, 10), + False, + ) + + # Mock flight with two recent positions, one of which is outside area: should pass + f2_1 = mock_flight(datetime.now(timezone.utc), 0, 0) + f2_1.v22a_value.recent_positions = to_positions( + [(1.0, 1.0), (-1.0, -1.0)], datetime.now(timezone.utc) + ) + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(2.0, 2.0), + ), + f2_1, + True, + ) + + f2_2 = mock_flight(datetime.now(timezone.utc), 0, 0) + f2_2.v22a_value.recent_positions = to_positions( + [(-1.0, -1.0), (1.0, 1.0)], datetime.now(timezone.utc) + ) + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(2.0, 2.0), + ), + f2_2, + True, + ) + + # Mock flight with 3 recent positions completely outside requested area: should fail + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(0.5, 0.5), + ), + mock_flight(datetime.now(timezone.utc), 3, 10), + False, + ) + + # Mock flight with 3 recent positions, the second of which is in the area: should pass + f3_1 = mock_flight(datetime.now(timezone.utc), 0, 0) + f3_1.v22a_value.recent_positions = to_positions( + [(-1.0, -1.0), (1.0, 1.0), (3.0, 3.0)], datetime.now(timezone.utc) + ) + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(2.0, 2.0), + ), + f3_1, + True, + ) + + # Mock flight with 3 recent positions, only the last of which is in the area: should fail + f3_2 = mock_flight(datetime.now(timezone.utc), 0, 0) + f3_2.v22a_value.recent_positions = to_positions( + [(-1.0, -1.0), (3.0, 3.0), (1.0, 1.0)], datetime.now(timezone.utc) + ) + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(2.0, 2.0), + ), + f3_2, + False, + ) + + # Mock flight with 3 recent positions, only the first of which is in the area: should fail + f3_3 = mock_flight(datetime.now(timezone.utc), 0, 0) + f3_3.v22a_value.recent_positions = to_positions( + [(1.0, 1.0), (3.0, 3.0), (-1.0, -1.0)], datetime.now(timezone.utc) + ) + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(2.0, 2.0), + ), + f3_3, + False, + ) + + # Mock flight with 3 recent positions within requested area: should pass + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(2, 2), + ), + mock_flight(datetime.now(timezone.utc), 3, 10), + True, + ) + + # Mock flight with 4 recent positions, last position outside requested area: should pass + f4_1 = mock_flight(datetime.now(timezone.utc), 0, 0) + f4_1.v22a_value.recent_positions = to_positions( + [(1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (3.0, 3.0)], datetime.now(timezone.utc) + ) + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(2.0, 2.0), + ), + f4_1, + True, + ) + + # Mock flight with 4 recent positions, first position outside requested area: should pass + f4_2 = mock_flight(datetime.now(timezone.utc), 0, 0) + f4_2.v22a_value.recent_positions = to_positions( + [(3.0, 3.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0)], datetime.now(timezone.utc) + ) + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(2.0, 2.0), + ), + f4_2, + True, + ) + + # Mock flight completely within requested area: should pass + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(2, 2), + ), + mock_flight(datetime.now(timezone.utc), 7, 10), + True, + ) + + # Mock flight completely outside requested area: should fail + _assert_evaluate_sp_flight_recent_positions_crossing_area_boundary( + s2sphere.LatLngRect( + s2sphere.LatLng.from_degrees(0.0, 0.0), + s2sphere.LatLng.from_degrees(0.5, 0.5), + ), + mock_flight(datetime.now(timezone.utc), 7, 10), + False, + ) diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/v19/common_dictionary_evaluator_sp_flight.md b/monitoring/uss_qualifier/scenarios/astm/netrid/v19/common_dictionary_evaluator_sp_flight.md index c5cf0f5d24..e1c7da0830 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/v19/common_dictionary_evaluator_sp_flight.md +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/v19/common_dictionary_evaluator_sp_flight.md @@ -1,6 +1,6 @@ # ASTM NetRID v19 Service Provider flight consistency with Common Data Dictionary test step fragment -This fragment is implemented in `common_dictionary_evaluator.py:RIDCommonDictionaryEvaluator.evaluate_sp_flights`. +This fragment is implemented in `common_dictionary_evaluator.py:RIDCommonDictionaryEvaluator.evaluate_sp_flight`. ## Service Provider altitude check diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/v22a/common_dictionary_evaluator_sp_flight.md b/monitoring/uss_qualifier/scenarios/astm/netrid/v22a/common_dictionary_evaluator_sp_flight.md index ac71dcf28b..d4ffe022d9 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/v22a/common_dictionary_evaluator_sp_flight.md +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/v22a/common_dictionary_evaluator_sp_flight.md @@ -1,6 +1,6 @@ # ASTM NetRID v22a Service Provider flight consistency with Common Data Dictionary test step fragment -This fragment is implemented in `common_dictionary_evaluator.py:RIDCommonDictionaryEvaluator.evaluate_sp_flights`. +This fragment is implemented in `common_dictionary_evaluator.py:RIDCommonDictionaryEvaluator.evaluate_sp_flight`. ## Service Provider altitude check