diff --git a/monitoring/monitorlib/geo.py b/monitoring/monitorlib/geo.py index 5d6c138bcb..f06be05c73 100644 --- a/monitoring/monitorlib/geo.py +++ b/monitoring/monitorlib/geo.py @@ -7,12 +7,20 @@ import s2sphere import shapely.geometry from uas_standards.astm.f3548.v21 import api as f3548v21 +from uas_standards.astm.f3411.v19 import api as f3411v19 +from uas_standards.astm.f3411.v22a import api as f3411v22a +from uas_standards.interuss.automated_testing.rid.v1 import ( + injection as f3411testing_injection, +) EARTH_CIRCUMFERENCE_KM = 40075 EARTH_CIRCUMFERENCE_M = EARTH_CIRCUMFERENCE_KM * 1000 EARTH_RADIUS_M = 40075 * 1000 / (2 * math.pi) EARTH_AREA_M2 = 4 * math.pi * math.pow(EARTH_RADIUS_M, 2) +DISTANCE_TOLERANCE_M = 0.01 +COORD_TOLERANCE_DEG = 360 / EARTH_CIRCUMFERENCE_M * DISTANCE_TOLERANCE_M + class DistanceUnits(str, Enum): M = "M" @@ -31,9 +39,29 @@ class LatLngPoint(ImplicitDict): lng: float """Longitude (degrees)""" + @staticmethod + def from_f3411( + position: Union[ + f3411v19.RIDAircraftPosition, + f3411v22a.RIDAircraftPosition, + f3411testing_injection.RIDAircraftPosition, + ] + ): + return LatLngPoint( + lat=position.lat, + lng=position.lng, + ) + def as_s2sphere(self) -> s2sphere.LatLng: return s2sphere.LatLng.from_degrees(self.lat, self.lng) + def match(self, other: LatLngPoint) -> bool: + """Determine whether two points may be mistaken for each other.""" + return ( + abs(self.lat - other.lat) < COORD_TOLERANCE_DEG + and abs(self.lng - other.lng) < COORD_TOLERANCE_DEG + ) + class Radius(ImplicitDict): value: float diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/common/misbehavior.py b/monitoring/uss_qualifier/scenarios/astm/netrid/common/misbehavior.py index e89377589c..1aa3ae1e29 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/common/misbehavior.py +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/common/misbehavior.py @@ -1,15 +1,11 @@ -import time import traceback -from typing import List, Dict +from typing import List, Set -import arrow import s2sphere -from loguru import logger from requests.exceptions import RequestException +from s2sphere import LatLngRect -from monitoring.monitorlib import fetch from monitoring.monitorlib.fetch import rid -from monitoring.monitorlib.fetch.rid import FetchedFlights from monitoring.monitorlib.infrastructure import UTMClientSession from monitoring.monitorlib.rid import RIDVersion from monitoring.uss_qualifier.common_data_definitions import Severity @@ -19,15 +15,20 @@ NetRIDServiceProviders, EvaluationConfigurationResource, ) -from monitoring.uss_qualifier.scenarios.astm.netrid.common import nominal_behavior -from monitoring.uss_qualifier.scenarios.astm.netrid.display_data_evaluator import ( - DPObservedFlight, - map_observations_to_injected_flights, +from monitoring.uss_qualifier.scenarios.astm.netrid import ( + injection, + display_data_evaluator, +) +from monitoring.uss_qualifier.scenarios.astm.netrid.injected_flight_collection import ( + InjectedFlightCollection, ) from monitoring.uss_qualifier.scenarios.astm.netrid.injection import ( InjectedFlight, InjectedTest, ) +from monitoring.uss_qualifier.scenarios.astm.netrid.virtual_observer import ( + VirtualObserver, +) from monitoring.uss_qualifier.scenarios.scenario import GenericTestScenario @@ -39,6 +40,7 @@ class Misbehavior(GenericTestScenario): _flights_data: FlightDataResource _service_providers: NetRIDServiceProviders _evaluation_configuration: EvaluationConfigurationResource + _injected_flights: List[InjectedFlight] _injected_tests: List[InjectedTest] @@ -53,8 +55,6 @@ def __init__( self._flights_data = flights_data self._service_providers = service_providers self._evaluation_configuration = evaluation_configuration - self._injected_flights = [] - self._injected_tests = [] if len(dss_pool.dss_instances) == 0: raise ValueError( "The Misbehavior Scenario requires at least one DSS instance" @@ -85,77 +85,53 @@ def run(self): self.end_test_scenario() def _inject_flights(self): - ( - self._injected_flights, - self._injected_tests, - ) = nominal_behavior.inject_flights( - test_scenario=self, - flights_data=self._flights_data, - service_providers=self._service_providers, - evaluation_configuration=self._evaluation_configuration, - realtime_period=self._rid_version.realtime_period, + (self._injected_flights, self._injected_tests) = injection.inject_flights( + self, self._flights_data, self._service_providers ) def _poll_unauthenticated_during_flights(self): config = self._evaluation_configuration.configuration + virtual_observer = VirtualObserver( + injected_flights=InjectedFlightCollection(self._injected_flights), + repeat_query_rect_period=config.repeat_query_rect_period, + min_query_diagonal_m=config.min_query_diagonal, + relevant_past_data_period=self._rid_version.realtime_period + + config.max_propagation_latency.timedelta, + ) - t_end = self._virtual_observer.get_last_time_of_interest() - t_now = arrow.utcnow() + remaining_injection_ids = set( + inj_flight.flight.injection_id for inj_flight in self._injected_flights + ) - if t_now > t_end: - raise RuntimeError( - f"Cannot evaluate RID system: injected test flights ended at {t_end}, which is before now ({t_now})" - ) + def poll_fct(rect: LatLngRect) -> bool: + nonlocal remaining_injection_ids - logger.debug(f"Polling from {t_now} until {t_end}") - for f in self._injected_flights: - span = f.flight.get_span() - logger.debug( - f"Flight {f.uss_participant_id}/{f.flight.injection_id} {span[0].isoformat()} to {span[1].isoformat()}", - ) + tested_inj_ids = self._evaluate_and_test_authentication(rect) + remaining_injection_ids -= tested_inj_ids - t_next = arrow.utcnow() - dt = config.min_polling_interval.timedelta - while arrow.utcnow() < t_end: - # Evaluate the system at an instant in time for various areas - diagonals_m = [ + # interrupt polling if there are no more injection IDs to cover + return len(remaining_injection_ids) == 0 + + virtual_observer.start_polling( + config.min_polling_interval.timedelta, + [ self._rid_version.max_diagonal_km * 1000 + 500, # too large self._rid_version.max_diagonal_km * 1000 - 100, # clustered self._rid_version.max_details_diagonal_km * 1000 - 100, # details - ] - auth_tests = [] - for diagonal_m in diagonals_m: - rect = self._virtual_observer.get_query_rect(diagonal_m) - auth_tests.append(self._evaluate_and_test_authentication(rect)) - - # If we checked for all diagonals that flights queries are properly authenticated, - # we can stop polling - if all(auth_tests): - logger.debug( - "Authentication check is complete, ending polling now.", - ) - break - - # Wait until minimum polling interval elapses - while t_next < arrow.utcnow(): - t_next += dt - if t_next > t_end: - break - delay = t_next - arrow.utcnow() - if delay.total_seconds() > 0: - logger.debug( - f"Waiting {delay.total_seconds()} seconds before polling RID system again..." - ) - time.sleep(delay.total_seconds()) + ], + poll_fct, + ) def _evaluate_and_test_authentication( self, rect: s2sphere.LatLngRect, - ) -> bool: + ) -> Set[str]: """Queries all flights in the expected way, then repeats the queries to SPs without credentials. returns true once queries to SPS have been made without credentials. False otherwise, such as when no flights were yet returned by the authenticated queries. + + :returns: set of injection IDs that were encountered and tested """ # We grab all flights from the SP's (which we know how to reach by first querying the DSS). @@ -166,70 +142,83 @@ def _evaluate_and_test_authentication( get_details=True, rid_version=self._rid_version, session=self._dss.client, - dss_server_id=self._dss.participant_id, + dss_participant_id=self._dss.participant_id, ) - # Set the participant ID on the SP queries wherever possible - _attribute_sp_queries_to_participant_id(self._injected_flights, sp_observation) - - # We fish out the queries that were used to grab the flights from the SP, - # and attempt to re-query without credentials. This should fail. - unauthenticated_session = UTMClientSession( - prefix_url=self._dss.client.get_prefix_url(), - auth_adapter=None, - timeout_seconds=self._dss.client.timeout_seconds, + mapping_by_injection_id = ( + display_data_evaluator.map_fetched_to_injected_flights( + self._injected_flights, list(sp_observation.uss_flight_queries.values()) + ) ) + for q in sp_observation.queries: + self.record_query(q) - queries_to_repeat = list(sp_observation.uss_flight_queries.values()) + list( - sp_observation.uss_flight_details_queries.values() - ) + for injection_id, mapping in mapping_by_injection_id.items(): + participant_id = mapping.injected_flight.uss_participant_id + flights_url = mapping.observed_flight.query.flights_url + unauthenticated_session = UTMClientSession(flights_url, None) - if len(queries_to_repeat) == 0: - logger.debug("no flights queries to repeat at this point.") - return False + self.record_note( + f"{participant_id}/{injection_id}/missing_credentials_queries", + f"Will attempt querying with missing credentials at flights URL {flights_url} for a flights list and {len(mapping.observed_flight.query.flights)} flight details.", + ) - logger.debug( - f"about to repeat {len(queries_to_repeat)} flights queries without credentials" - ) + with self.check("Missing credentials", [participant_id]) as check: - # Attempt to re-query the flights and flight details URLs: - for fq in queries_to_repeat: - sp_server_id = fq.query.get("server_id", "unknown") - # We may well have queried SP's for real flights that happen - # to be present in the area in which we inject flights; - if sp_server_id == "unknown": - # TODO we could skip these once development on the qualifier has stopped, - # however, unattributed queries are likely the cause of a bug, - # and we want to find out. - logger.error(f"got unattributed SP query to: {fq.query.request.url}") - - failed_q = fetch.query_and_describe( - client=unauthenticated_session, - verb=fq.query.request.method, - url=fq.query.request.url, - json=fq.query.request.json, - data=fq.query.request.body, - server_id=sp_server_id, - ) - logger.info( - f"Repeating query to {fq.query.request.url} without credentials" - ) - with self.check("Missing credentials", [sp_server_id]) as check: - if failed_q.response.code not in [401, 403]: + # check uss flights query + uss_flights_query = rid.uss_flights( + flights_url, + rect, + True, + self._rid_version, + unauthenticated_session, + participant_id, + ) + self.record_query(uss_flights_query.query) + + if uss_flights_query.success: check.record_failed( - "unauthenticated request was fulfilled", - participants=[sp_server_id], - severity=Severity.MEDIUM, - details=f"queried flights on {fq.query.request.url} with no credentials, expected a failure but got a success reply", + "Unauthenticated request for flights to USS was fulfilled", + participants=[participant_id], + severity=Severity.Medium, + details=f"Queried flights on {flights_url} for USS {participant_id} with no credentials, expected a failure but got a success reply.", ) - else: - logger.info( - f"participant with id {sp_server_id} properly authenticated the request" + elif uss_flights_query.status_code != 401: + check.record_failed( + "Unauthenticated request for flights failed with wrong HTTP code", + participants=[participant_id], + severity=Severity.Medium, + details=f"Queried flights on {flights_url} for USS {participant_id} with no credentials, expected an HTTP 401 but got an HTTP {uss_flights_query.status_code}.", ) - # Keep track of the failed queries, too - self.record_query(failed_q) - return True + # check flight details query + for flight in mapping.observed_flight.query.flights: + uss_flight_details_query = rid.flight_details( + flights_url, + flight.id, + False, + self._rid_version, + unauthenticated_session, + participant_id, + ) + self.record_query(uss_flight_details_query.query) + + if uss_flight_details_query.success: + check.record_failed( + "Unauthenticated request for flight details to USS was fulfilled", + participants=[participant_id], + severity=Severity.Medium, + details=f"Queried flight details on {flights_url} for USS {participant_id} for flight {flight.id} with no credentials, expected a failure but got a success reply.", + ) + elif uss_flight_details_query.status_code != 401: + check.record_failed( + "Unauthenticated request for flight details failed with wrong HTTP code", + participants=[participant_id], + severity=Severity.Medium, + details=f"Queried flight details on {flights_url} for USS {participant_id} for flight {flight.id} with no credentials, expected an HTTP 401 but got an HTTP {uss_flight_details_query.status_code}.", + ) + + return set(mapping_by_injection_id.keys()) def cleanup(self): self.begin_cleanup() @@ -265,24 +254,3 @@ def cleanup(self): details=f"While trying to delete a test flight from {sp.participant_id}, encountered error:\n{stacktrace}", ) self.end_cleanup() - - -def _attribute_sp_queries_to_participant_id( - injected_flights: List[InjectedFlight], - fetched_flights: FetchedFlights, -): - observed_flights = [] - for uss_query in fetched_flights.uss_flight_queries.values(): - for f in range(len(uss_query.flights)): - observed_flights.append(DPObservedFlight(query=uss_query, flight=f)) - - mapping_by_injection_id = map_observations_to_injected_flights( - injected_flights, observed_flights - ) - - for telemetry_mapping in mapping_by_injection_id.values(): - # For flights that were mapped to an injection ID, - # update the observation queries with the participant id for future use in the aggregate checks - telemetry_mapping.observed_flight.query.set_server_id( - telemetry_mapping.injected_flight.uss_participant_id - ) diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/common/nominal_behavior.py b/monitoring/uss_qualifier/scenarios/astm/netrid/common/nominal_behavior.py index 928954c285..0bbca48d26 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/common/nominal_behavior.py +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/common/nominal_behavior.py @@ -1,20 +1,10 @@ -import time import traceback -import uuid -from datetime import timedelta -from typing import List, Optional, Tuple +from typing import List, Optional -import arrow -from implicitdict import ImplicitDict -from loguru import logger from requests.exceptions import RequestException -from uas_standards.interuss.automated_testing.rid.v1.injection import ChangeTestResponse +from s2sphere import LatLngRect from monitoring.monitorlib.rid import RIDVersion -from monitoring.monitorlib.rid_automated_testing.injection_api import ( - CreateTestParameters, -) -from monitoring.monitorlib.rid_automated_testing.injection_api import TestFlight from monitoring.uss_qualifier.common_data_definitions import Severity from monitoring.uss_qualifier.resources.astm.f3411.dss import DSSInstancesResource from monitoring.uss_qualifier.resources.netrid import ( @@ -23,7 +13,10 @@ NetRIDObserversResource, EvaluationConfigurationResource, ) -from monitoring.uss_qualifier.scenarios.astm.netrid import display_data_evaluator +from monitoring.uss_qualifier.scenarios.astm.netrid import ( + display_data_evaluator, + injection, +) from monitoring.uss_qualifier.scenarios.astm.netrid.injected_flight_collection import ( InjectedFlightCollection, ) @@ -34,10 +27,7 @@ from monitoring.uss_qualifier.scenarios.astm.netrid.virtual_observer import ( VirtualObserver, ) -from monitoring.uss_qualifier.scenarios.scenario import ( - GenericTestScenario, - TestScenario, -) +from monitoring.uss_qualifier.scenarios.scenario import GenericTestScenario class NominalBehavior(GenericTestScenario): @@ -62,8 +52,6 @@ def __init__( self._service_providers = service_providers self._observers = observers self._evaluation_configuration = evaluation_configuration - self._injected_flights = [] - self._injected_tests = [] self._dss_pool = dss_pool @property @@ -86,18 +74,19 @@ def run(self): self.end_test_scenario() def _inject_flights(self): - (self._injected_flights, self._injected_tests) = inject_flights( - test_scenario=self, - flights_data=self._flights_data, - service_providers=self._service_providers, - evaluation_configuration=self._evaluation_configuration, - realtime_period=self._rid_version.realtime_period, + (self._injected_flights, self._injected_tests) = injection.inject_flights( + self, self._flights_data, self._service_providers ) def _poll_during_flights(self): config = self._evaluation_configuration.configuration - - # Evaluate observed RID system states + virtual_observer = VirtualObserver( + injected_flights=InjectedFlightCollection(self._injected_flights), + repeat_query_rect_period=config.repeat_query_rect_period, + min_query_diagonal_m=config.min_query_diagonal, + relevant_past_data_period=self._rid_version.realtime_period + + config.max_propagation_latency.timedelta, + ) evaluator = display_data_evaluator.RIDObservationEvaluator( self, self._injected_flights, @@ -106,46 +95,19 @@ def _poll_during_flights(self): self._dss_pool.dss_instances[0] if self._dss_pool else None, ) - t_end = self._virtual_observer.get_last_time_of_interest() - t_now = arrow.utcnow() - if t_now > t_end: - raise RuntimeError( - f"Cannot evaluate RID system: injected test flights ended at {t_end}, which is before now ({t_now})" - ) - - logger.debug(f"Polling from {t_now} until {t_end}") - for f in self._injected_flights: - span = f.flight.get_span() - logger.debug( - f"Flight {f.uss_participant_id}/{f.flight.injection_id} {span[0].isoformat()} to {span[1].isoformat()}", - ) + def poll_fct(rect: LatLngRect) -> bool: + evaluator.evaluate_system_instantaneously(self._observers.observers, rect) + return False - t_next = arrow.utcnow() - dt = config.min_polling_interval.timedelta - while arrow.utcnow() < t_end: - # Evaluate the system at an instant in time for various areas - diagonals_m = [ + virtual_observer.start_polling( + config.min_polling_interval.timedelta, + [ self._rid_version.max_diagonal_km * 1000 + 500, # too large self._rid_version.max_diagonal_km * 1000 - 100, # clustered self._rid_version.max_details_diagonal_km * 1000 - 100, # details - ] - for diagonal_m in diagonals_m: - rect = self._virtual_observer.get_query_rect(diagonal_m) - evaluator.evaluate_system_instantaneously( - self._observers.observers, rect - ) - - # Wait until minimum polling interval elapses - while t_next < arrow.utcnow(): - t_next += dt - if t_next > t_end: - break - delay = t_next - arrow.utcnow() - if delay.total_seconds() > 0: - logger.debug( - f"Waiting {delay.total_seconds()} seconds before polling RID system again..." - ) - time.sleep(delay.total_seconds()) + ], + poll_fct, + ) def cleanup(self): self.begin_cleanup() @@ -181,101 +143,3 @@ def cleanup(self): details=f"While trying to delete a test flight from {sp.participant_id}, encountered error:\n{stacktrace}", ) self.end_cleanup() - - -def inject_flights( - test_scenario: TestScenario, - flights_data: FlightDataResource, - service_providers: NetRIDServiceProviders, - evaluation_configuration: EvaluationConfigurationResource, - realtime_period: timedelta, -) -> Tuple[List[InjectedFlight], List[InjectedTest]]: - test_id = str(uuid.uuid4()) - test_flights = flights_data.get_test_flights() - service_providers = service_providers.service_providers - - injected_flights: List[InjectedFlight] = [] - injected_tests: List[InjectedTest] = [] - - if len(service_providers) > len(test_flights): - raise ValueError( - "{} service providers were specified, but data for only {} test flights were provided".format( - len(service_providers), len(test_flights) - ) - ) - for i, target in enumerate(service_providers): - p = CreateTestParameters(requested_flights=[test_flights[i]]) - check = test_scenario.check("Successful injection", [target.participant_id]) - try: - query = target.submit_test(p, test_id) - except RequestException as e: - stacktrace = "".join( - traceback.format_exception(type(e), value=e, tb=e.__traceback__) - ) - check.record_failed( - summary="Error while trying to inject test flight", - severity=Severity.High, - details=f"While trying to inject a test flight into {target.participant_id}, encountered error:\n{stacktrace}", - ) - raise RuntimeError("High-severity issue did not abort test scenario") - test_scenario.record_query(query) - try: - if query.status_code != 200: - raise ValueError( - f"Expected response code 200 but received {query.status_code} instead" - ) - if "json" not in query.response: - raise ValueError("Response did not contain a JSON body") - changed_test: ChangeTestResponse = ImplicitDict.parse( - query.response.json, ChangeTestResponse - ) - injected_tests.append( - InjectedTest( - participant_id=target.participant_id, - test_id=test_id, - version=changed_test.version, - ) - ) - injections = changed_test.injected_flights - check.record_passed() - except ValueError as e: - check.record_failed( - summary="Error injecting test flight", - severity=Severity.High, - details=f"Attempting to inject a test flight into {target.participant_id}, encountered status code {query.status_code}: {str(e)}", - query_timestamps=[query.request.timestamp], - ) - raise RuntimeError("High-severity issue did not abort test scenario") - - for flight in injections: - injected_flights.append( - InjectedFlight( - uss_participant_id=target.participant_id, - test_id=test_id, - flight=TestFlight(flight), - query_timestamp=query.request.timestamp, - ) - ) - - # Make sure the injected flights can be identified correctly by the test harness - with test_scenario.check("Identifiable flights") as check: - errors = display_data_evaluator.injected_flights_errors(injected_flights) - if errors: - check.record_failed( - "Injected flights not suitable for test", - Severity.High, - details="When checking the suitability of the flights (as injected) for the test, found:\n" - + "\n".join(errors), - query_timestamps=[f.query_timestamp for f in injected_flights], - ) - raise RuntimeError("High-severity issue did not abort test scenario") - - config = evaluation_configuration.configuration - test_scenario._virtual_observer = VirtualObserver( - injected_flights=InjectedFlightCollection(injected_flights), - repeat_query_rect_period=config.repeat_query_rect_period, - min_query_diagonal_m=config.min_query_diagonal, - relevant_past_data_period=realtime_period - + config.max_propagation_latency.timedelta, - ) - return injected_flights, injected_tests diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator.py b/monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator.py index 89aa0536b5..0db59a97f5 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator.py +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/display_data_evaluator.py @@ -6,11 +6,12 @@ import math import s2sphere from s2sphere import LatLng, LatLngRect + from monitoring.uss_qualifier.scenarios.astm.netrid.common_dictionary_evaluator import ( RIDCommonDictionaryEvaluator, ) -from monitoring.monitorlib.fetch import Query, QueryType +from monitoring.monitorlib.fetch import Query from monitoring.monitorlib.fetch.rid import ( all_flights, FetchedFlights, @@ -18,7 +19,6 @@ Position, ) from monitoring.uss_qualifier.resources.astm.f3411.dss import DSSInstance -from uas_standards.interuss.automated_testing.rid.v1.injection import RIDAircraftState from uas_standards.interuss.automated_testing.rid.v1.observation import ( Flight, GetDisplayDataResponse, @@ -36,15 +36,9 @@ from monitoring.uss_qualifier.scenarios.astm.netrid.virtual_observer import ( VirtualObserver, ) -from monitoring.uss_qualifier.scenarios.scenario import ( - TestScenarioType, - TestScenario, -) +from monitoring.uss_qualifier.scenarios.scenario import TestScenario from monitoring.uss_qualifier.scenarios.astm.netrid.injection import InjectedFlight -DISTANCE_TOLERANCE_M = 0.01 -COORD_TOLERANCE_DEG = 360 / geo.EARTH_CIRCUMFERENCE_M * DISTANCE_TOLERANCE_M - def _rect_str(rect) -> str: return "({}, {})-({}, {})".format( @@ -55,41 +49,6 @@ def _rect_str(rect) -> str: ) -def _telemetry_match(t1: RIDAircraftState, t2: RIDAircraftState) -> bool: - """Determine whether two telemetry points may be mistaken for each other.""" - return ( - abs(t1.position.lat - t2.position.lat) < COORD_TOLERANCE_DEG - and abs(t1.position.lng == t2.position.lng) < COORD_TOLERANCE_DEG - ) - - -def injected_flights_errors(injected_flights: List[InjectedFlight]) -> List[str]: - """Determine whether each telemetry in each injected flight can be easily distinguished from each other. - - Args: - injected_flights: Full set of flights injected into Service Providers. - - Returns: List of error messages, or an empty list if no errors. - """ - errors: List[str] = [] - for f1, injected_flight in enumerate(injected_flights): - for t1, injected_telemetry in enumerate(injected_flight.flight.telemetry): - for t2, other_telmetry in enumerate( - injected_flight.flight.telemetry[t1 + 1 :] - ): - if _telemetry_match(injected_telemetry, other_telmetry): - errors.append( - f"{injected_flight.uss_participant_id}'s flight with injection ID {injected_flight.flight.injection_id} in test {injected_flight.test_id} has telemetry at indices {t1} and {t1 + 1 + t2} which can be mistaken for each other" - ) - for f2, other_flight in enumerate(injected_flights[f1 + 1 :]): - for t2, other_telemetry in enumerate(other_flight.flight.telemetry): - if _telemetry_match(injected_telemetry, other_telemetry): - errors.append( - f"{injected_flight.uss_participant_id}'s flight with injection ID {injected_flight.flight.injection_id} in test {injected_flight.test_id} has telemetry at index {t1} that can be mistaken for telemetry index {t2} in {other_flight.uss_participant_id}'s flight with injection ID {other_flight.flight.injection_id} in test {other_flight.test_id}" - ) - return errors - - @dataclass class DPObservedFlight(object): query: FetchedUSSFlights @@ -128,7 +87,7 @@ def map_observations_to_injected_flights( injected_flights: Flights injected into RID Service Providers under test. observed_flights: Flight observed from an RID Display Provider under test. - Returns: Mapping betweenInjectedFlight and observed Flight, indexed by injection_id. + Returns: Mapping between InjectedFlight and observed Flight, indexed by injection_id. """ mapping: Dict[str, TelemetryMapping] = {} for injected_flight in injected_flights: @@ -154,7 +113,7 @@ def map_observations_to_injected_flights( for t1, injected_telemetry in enumerate(injected_flight.flight.telemetry): dlat = abs(p.lat - injected_telemetry.position.lat) dlng = abs(p.lng - injected_telemetry.position.lng) - if dlat < COORD_TOLERANCE_DEG and dlng < COORD_TOLERANCE_DEG: + if dlat < geo.COORD_TOLERANCE_DEG and dlng < geo.COORD_TOLERANCE_DEG: new_distance = math.sqrt(math.pow(dlat, 2) + math.pow(dlng, 2)) if new_distance < smallest_distance: best_match = TelemetryMapping( @@ -181,6 +140,37 @@ def map_observations_to_injected_flights( return mapping +def map_fetched_to_injected_flights( + injected_flights: List[InjectedFlight], + fetched_flights: List[FetchedUSSFlights], +) -> Dict[str, TelemetryMapping]: + """Identify which of the fetched flights (if any) matches to each of the injected flights. + + See `map_observations_to_injected_flights`. + If it is not already set, sets the observation flight's server ID to the one of the matching injected flight. + + :param injected_flights: Flights injected into RID Service Providers under test. + :param fetched_flights: Flight observed from an RID Display Provider under test. + :return: Mapping between InjectedFlight and observed Flight, indexed by injection_id. + """ + observed_flights = [] + for uss_query in fetched_flights: + for f in range(len(uss_query.flights)): + observed_flights.append(DPObservedFlight(query=uss_query, flight=f)) + + tel_mapping = map_observations_to_injected_flights( + injected_flights, observed_flights + ) + + for mapping in tel_mapping.values(): + if mapping.observed_flight.query.participant_id is None: + mapping.observed_flight.query.participant_id = ( + mapping.injected_flight.uss_participant_id + ) + + return tel_mapping + + class RIDObservationEvaluator(object): """Evaluates observations of an RID system over time. @@ -239,15 +229,18 @@ def evaluate_system_instantaneously( get_details=True, rid_version=self._rid_version, session=self._dss.client, - dss_server_id=self._dss.participant_id, + dss_participant_id=self._dss.participant_id, + ) + + # map observed flights to injected flight and attribute participant ID + mapping_by_injection_id = map_fetched_to_injected_flights( + self._injected_flights, list(sp_observation.uss_flight_queries.values()) ) for q in sp_observation.queries: self._test_scenario.record_query(q) # Evaluate observations - # (Note this also takes care of setting the server_id to the relevant - # participant_id on queries where possible) - self._evaluate_sp_observation(sp_observation, rect) + self._evaluate_sp_observation(rect, sp_observation, mapping_by_injection_id) step_report = self._test_scenario.end_test_step() perform_observation = step_report.successful() @@ -347,8 +340,6 @@ def _evaluate_normal_observation( mapping_by_injection_id = map_observations_to_injected_flights( self._injected_flights, observation.flights ) - # TODO check if we need to set some server ids on observation - # queries here (if we have unattributed queries this might be a source) self._evaluate_flight_presence( observer.participant_id, @@ -384,7 +375,7 @@ def _evaluate_normal_observation( ) as check: if ( abs(observed_position.alt - injected_position.alt) - > DISTANCE_TOLERANCE_M + > geo.DISTANCE_TOLERANCE_M ): check.record_failed( "Observed altitude does not match injected altitude", @@ -785,7 +776,7 @@ def _evaluate_obfuscated_clusters_observation( * geo.EARTH_CIRCUMFERENCE_M / 360 ) - if distance <= DISTANCE_TOLERANCE_M: + if distance <= geo.DISTANCE_TOLERANCE_M: # Flight was not obfuscated check.record_failed( summary="Error while evaluating obfuscation of individual flights. Flight was not obfuscated: it is at the center of the cluster.", @@ -818,8 +809,9 @@ def _evaluate_obfuscated_clusters_observation( def _evaluate_sp_observation( self, + requested_area: s2sphere.LatLngRect, sp_observation: FetchedFlights, - rect: s2sphere.LatLngRect, + mappings: Dict[str, TelemetryMapping], ) -> None: # Note: This step currently uses the DSS endpoint to perform a one-time query for ISAs, but this # endpoint is not strictly required. The PUT Subscription endpoint, followed immediately by the @@ -840,31 +832,18 @@ def _evaluate_sp_observation( ) return - observed_flights = [] - for uss_query in sp_observation.uss_flight_queries.values(): - for f in range(len(uss_query.flights)): - observed_flights.append(DPObservedFlight(query=uss_query, flight=f)) - mapping_by_injection_id = map_observations_to_injected_flights( - self._injected_flights, observed_flights - ) - - for telemetry_mapping in mapping_by_injection_id.values(): - # For flights that were mapped to an injection ID, - # update the observation queries with the participant id for future use in the aggregate checks - telemetry_mapping.observed_flight.query.set_server_id( - telemetry_mapping.injected_flight.uss_participant_id - ) - diagonal_km = ( - rect.lo().get_distance(rect.hi()).degrees * geo.EARTH_CIRCUMFERENCE_KM / 360 + requested_area.lo().get_distance(requested_area.hi()).degrees + * geo.EARTH_CIRCUMFERENCE_KM + / 360 ) if diagonal_km > self._rid_version.max_diagonal_km: self._evaluate_area_too_large_sp_observation( - mapping_by_injection_id, rect, diagonal_km + mappings, requested_area, diagonal_km ) else: self._evaluate_normal_sp_observation( - rect, sp_observation, mapping_by_injection_id + requested_area, sp_observation, mappings ) def _evaluate_normal_sp_observation( @@ -933,7 +912,7 @@ def _evaluate_normal_sp_observation( ) as check: if ( abs(observed_position.alt - injected_position.alt) - > DISTANCE_TOLERANCE_M + > geo.DISTANCE_TOLERANCE_M ): check.record_failed( "Altitude reported by Service Provider does not match injected altitude", diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/injection.py b/monitoring/uss_qualifier/scenarios/astm/netrid/injection.py index 618613b581..799f5098bb 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/injection.py +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/injection.py @@ -1,8 +1,21 @@ +import uuid from datetime import datetime +from typing import List, Tuple from implicitdict import ImplicitDict +from uas_standards.interuss.automated_testing.rid.v1.injection import ChangeTestResponse -from monitoring.monitorlib.rid_automated_testing.injection_api import TestFlight +from monitoring.monitorlib import geo +from monitoring.monitorlib.rid_automated_testing.injection_api import ( + TestFlight, + CreateTestParameters, +) +from monitoring.uss_qualifier.common_data_definitions import Severity +from monitoring.uss_qualifier.resources.netrid import ( + FlightDataResource, + NetRIDServiceProviders, +) +from monitoring.uss_qualifier.scenarios.scenario import TestScenario class InjectedFlight(ImplicitDict): @@ -16,3 +29,110 @@ class InjectedTest(ImplicitDict): participant_id: str test_id: str version: str + + +def inject_flights( + test_scenario: TestScenario, + flights_data_res: FlightDataResource, + service_providers_res: NetRIDServiceProviders, +) -> Tuple[List[InjectedFlight], List[InjectedTest]]: + test_id = str(uuid.uuid4()) + test_flights = flights_data_res.get_test_flights() + service_providers = service_providers_res.service_providers + + injected_flights: List[InjectedFlight] = [] + injected_tests: List[InjectedTest] = [] + + if len(service_providers) > len(test_flights): + raise ValueError( + f"{len(service_providers)} service providers were specified, but data for only {len(test_flights)} test flights were provided" + ) + for i, target in enumerate(service_providers): + p = CreateTestParameters(requested_flights=[test_flights[i]]) + with test_scenario.check( + "Successful injection", [target.participant_id] + ) as check: + query = target.submit_test(p, test_id) + test_scenario.record_query(query) + + if query.status_code != 200: + check.record_failed( + summary="Error while trying to inject test flight", + severity=Severity.High, + details=f"Expected response code 200 from {target.participant_id} but received {query.status_code} while trying to inject a test flight", + query_timestamps=[query.request.timestamp], + ) + + if "json" not in query.response or query.response.json is None: + check.record_failed( + summary="Response to test flight injection request did not contain a JSON body", + severity=Severity.High, + details=f"Expected a JSON body in response to flight injection request", + query_timestamps=[query.request.timestamp], + ) + + changed_test: ChangeTestResponse = ImplicitDict.parse( + query.response.json, ChangeTestResponse + ) + injected_tests.append( + InjectedTest( + participant_id=target.participant_id, + test_id=test_id, + version=changed_test.version, + ) + ) + + for flight in changed_test.injected_flights: + injected_flights.append( + InjectedFlight( + uss_participant_id=target.participant_id, + test_id=test_id, + flight=TestFlight(flight), + query_timestamp=query.request.timestamp, + ) + ) + + # Make sure the injected flights can be identified correctly by the test harness + with test_scenario.check("Identifiable flights") as check: + errors = injected_flights_errors(injected_flights) + if errors: + check.record_failed( + summary="Injected flights not suitable for test", + severity=Severity.High, + details="When checking the suitability of the flights (as injected) for the test, found:\n" + + "\n".join(errors), + query_timestamps=[f.query_timestamp for f in injected_flights], + ) + + return injected_flights, injected_tests + + +def injected_flights_errors(injected_flights: List[InjectedFlight]) -> List[str]: + """Determine whether each telemetry in each injected flight can be easily distinguished from each other. + + Args: + injected_flights: Full set of flights injected into Service Providers. + + Returns: List of error messages, or an empty list if no errors. + """ + errors: List[str] = [] + for f1, injected_flight in enumerate(injected_flights): + for t1, injected_telemetry in enumerate(injected_flight.flight.telemetry): + for t2, other_telemetry in enumerate( + injected_flight.flight.telemetry[t1 + 1 :] + ): + if geo.LatLngPoint.from_f3411(injected_telemetry).match( + geo.LatLngPoint.from_f3411(other_telemetry) + ): + errors.append( + f"{injected_flight.uss_participant_id}'s flight with injection ID {injected_flight.flight.injection_id} in test {injected_flight.test_id} has telemetry at indices {t1} and {t1 + 1 + t2} which can be mistaken for each other" + ) + for f2, other_flight in enumerate(injected_flights[f1 + 1 :]): + for t2, other_telemetry in enumerate(other_flight.flight.telemetry): + if geo.LatLngPoint.from_f3411(injected_telemetry).match( + geo.LatLngPoint.from_f3411(other_telemetry) + ): + errors.append( + f"{injected_flight.uss_participant_id}'s flight with injection ID {injected_flight.flight.injection_id} in test {injected_flight.test_id} has telemetry at index {t1} that can be mistaken for telemetry index {t2} in {other_flight.uss_participant_id}'s flight with injection ID {other_flight.flight.injection_id} in test {other_flight.test_id}" + ) + return errors diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/virtual_observer.py b/monitoring/uss_qualifier/scenarios/astm/netrid/virtual_observer.py index fad4ca7759..4c7033cf21 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/virtual_observer.py +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/virtual_observer.py @@ -1,5 +1,7 @@ +import time from datetime import timedelta, datetime -from typing import Optional +from typing import Optional, Callable, List +from loguru import logger import arrow from s2sphere import LatLngRect @@ -69,3 +71,50 @@ def get_last_time_of_interest(self) -> datetime: self._injected_flights.get_end_of_injected_data() + self._relevant_past_data_period ) + + def start_polling( + self, + interval: timedelta, + diagonals_m: List[float], + poll_fct: Callable[[LatLngRect], bool], + ) -> None: + """ + Start polling of the RID system. + + :param interval: polling interval. + :param diagonals_m: list of the query rectangle diagonals (in meters). + :param poll_fct: polling function to invoke. If it returns True, the polling will be immediately interrupted before the end. + """ + t_end = self.get_last_time_of_interest() + t_now = arrow.utcnow() + if t_now > t_end: + raise ValueError( + f"Cannot poll RID system: instructed to poll until {t_end}, which is before now ({t_now})" + ) + + logger.info(f"Polling from {t_now} until {t_end} every {interval}") + t_next = arrow.utcnow() + while arrow.utcnow() < t_end: + interrupt_polling = False + for diagonal_m in diagonals_m: + rect = self.get_query_rect(diagonal_m) + interrupt_polling = poll_fct(rect) + if interrupt_polling: + break + + if interrupt_polling: + logger.info(f"Polling ended early at {arrow.utcnow()}.") + break + + # Wait until minimum polling interval elapses + while t_next < arrow.utcnow(): + t_next += interval + if t_next > t_end: + logger.info(f"Polling ended normally at {t_end}.") + break + delay = t_next - arrow.utcnow() + if delay.total_seconds() > 0: + logger.debug( + f"Waiting {delay.total_seconds()} seconds before polling RID system again..." + ) + time.sleep(delay.total_seconds())