[uss_qualifier/scenarios/netrid] Move recent position timings checks to display_data_evaluator from common_data_dictionary
mickmis committed Dec 10, 2024
1 parent b280061 commit 6f9fbf9
Showing 4 changed files with 127 additions and 135 deletions.
@@ -1,8 +1,6 @@
import datetime
import math
from typing import List, Optional

import s2sphere
from arrow import ParserError
from implicitdict import StringBasedDateTime
from uas_standards.ansi_cta_2063_a import SerialNumber
@@ -25,22 +23,15 @@
)

from monitoring.monitorlib.fetch.rid import (
FetchedFlights,
FlightDetails,
)
from monitoring.monitorlib.fetch.rid import Flight, Position
from monitoring.monitorlib.geo import validate_lat, validate_lng, Altitude, LatLngPoint
from monitoring.monitorlib.rid import RIDVersion
from monitoring.uss_qualifier.common_data_definitions import Severity
from monitoring.uss_qualifier.configurations.configuration import ParticipantID
from monitoring.uss_qualifier.resources.netrid.evaluation import EvaluationConfiguration
from monitoring.uss_qualifier.scenarios.scenario import TestScenarioType, PendingCheck

# SP responses to /flights endpoint's p99 should be below this:
SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC = 3
NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC = 60
_POSITION_TIMESTAMP_MAX_AGE_SEC = (
NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC + SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC
)
from monitoring.uss_qualifier.scenarios.scenario import TestScenarioType


class RIDCommonDictionaryEvaluator(object):
@@ -54,34 +45,17 @@ def __init__(
self._test_scenario = test_scenario
self._rid_version = rid_version

def evaluate_sp_flights(
def evaluate_sp_flight(
self,
requested_area: s2sphere.LatLngRect,
observed_flights: FetchedFlights,
participants: List[str],
observed_flight: Flight,
participant_id: ParticipantID,
):
"""Implements fragment documented in `common_dictionary_evaluator_sp_flight.md`."""

for url, uss_flights in observed_flights.uss_flight_queries.items():
# For the timing checks, we want to look at the flights relative to the query
# they came from, as they may be provided from different SP's.
for f in uss_flights.flights:
self.evaluate_sp_flight_recent_positions_times(
f,
uss_flights.query.response.reported.datetime,
participants,
)

self.evaluate_sp_flight_recent_positions_crossing_area_boundary(
requested_area, f, participants
)

for f in observed_flights.flights:
# Evaluate on all flights regardless of where they came from
self._evaluate_operational_status(
f.operational_status,
participants,
)
self._evaluate_operational_status(
observed_flight.operational_status,
[participant_id],
)

def evaluate_dp_flight(
self,
@@ -121,95 +95,6 @@ def evaluate_dp_flight(
participants,
)

def _evaluate_recent_position_time(
self, p: Position, query_time: datetime.datetime, check: PendingCheck
):
"""Check that the position's timestamp is at most 60 seconds before the request time."""
if (query_time - p.time).total_seconds() > _POSITION_TIMESTAMP_MAX_AGE_SEC:
check.record_failed(
"A Position timestamp was older than the tolerance.",
details=f"Position timestamp: {p.time}, query time: {query_time}",
severity=Severity.Medium,
)

def evaluate_sp_flight_recent_positions_times(
self, f: Flight, query_time: datetime.datetime, participants: List[str]
):
with self._test_scenario.check(
"Recent positions timestamps", participants
) as check:
for p in f.recent_positions:
self._evaluate_recent_position_time(p, query_time, check)

def _chronological_positions(self, f: Flight) -> List[s2sphere.LatLng]:
"""
Returns the recent positions of the flight, ordered by time with the oldest first, and the most recent last.
"""
return [
s2sphere.LatLng.from_degrees(p.lat, p.lng)
for p in sorted(f.recent_positions, key=lambda p: p.time)
]

def _sliding_triples(
self, points: List[s2sphere.LatLng]
) -> List[List[s2sphere.LatLng]]:
"""
Returns a list of triples of consecutive positions in the passed list.
"""
return [
(points[i], points[i + 1], points[i + 2]) for i in range(len(points) - 2)
]

def evaluate_sp_flight_recent_positions_crossing_area_boundary(
self, requested_area: s2sphere.LatLngRect, f: Flight, participants: List[str]
):
with self._test_scenario.check(
"Recent positions for aircraft crossing the requested area boundary show only one position before or after crossing",
participants,
) as check:

def fail_check():
check.record_failed(
"A position outside the area was neither preceded nor followed by a position inside the area.",
details=f"Positions: {f.recent_positions}, requested_area: {requested_area}",
severity=Severity.Medium,
)

positions = self._chronological_positions(f)
if len(positions) < 2:
# Check does not apply in this case
return

if len(positions) == 2:
# Only one of the positions can be outside the area. If both are, we fail.
if not requested_area.contains(
positions[0]
) and not requested_area.contains(positions[1]):
fail_check()
return

# For each sliding triple we check that if the middle position is outside the area, then either
# the first or the last position is inside the area. This means checking for any point that is inside the
# area in the triple and failing otherwise
for triple in self._sliding_triples(self._chronological_positions(f)):
if not (
requested_area.contains(triple[0])
or requested_area.contains(triple[1])
or requested_area.contains(triple[2])
):
fail_check()

# Finally we need to check for the forbidden corner cases of having the two first or two last positions being outside.
# (These won't be caught by the iteration on the triples above)
if (
not requested_area.contains(positions[0])
and not requested_area.contains(positions[1])
) or (
not requested_area.contains(positions[-1])
and not requested_area.contains(positions[-2])
):
fail_check()

def evaluate_sp_details(self, details: FlightDetails, participants: List[str]):
"""Implements fragment documented in `common_dictionary_evaluator_sp_flight_details.md`."""

@@ -1,12 +1,13 @@
import datetime

import math
from dataclasses import dataclass
from typing import List, Optional, Dict, Union, Set, Tuple
from typing import List, Optional, Dict, Union, Set, Tuple, cast

import arrow
import s2sphere
from loguru import logger
from s2sphere import LatLng, LatLngRect
from uas_standards.astm.f3411.v22a.api import RIDHeightReference

from uas_standards.interuss.automated_testing.rid.v1.observation import (
Flight,
@@ -24,6 +25,7 @@
)
from monitoring.monitorlib.rid import RIDVersion
from monitoring.uss_qualifier.common_data_definitions import Severity
from monitoring.uss_qualifier.configurations.configuration import ParticipantID
from monitoring.uss_qualifier.resources.astm.f3411.dss import DSSInstance
from monitoring.uss_qualifier.resources.netrid.evaluation import EvaluationConfiguration
from monitoring.uss_qualifier.resources.netrid.observers import RIDSystemObserver
@@ -54,6 +56,13 @@ def _rect_str(rect) -> str:
TIMESTAMP_ACCURACY_PRECISION = 0.05
HEIGHT_PRECISION_M = 1

# SP responses to /flights endpoint's p99 should be below this:
SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC = 3
NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC = 60
_POSITION_TIMESTAMP_MAX_AGE_SEC = (
NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC + SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC
)


@dataclass
class DPObservedFlight(object):
@@ -879,25 +888,28 @@ def _evaluate_normal_sp_observation(
set(),
)

# Verify that flights queries returned correctly-formatted data
for mapping in mappings.values():
participant_id = mapping.injected_flight.uss_participant_id
observed_flight = mapping.observed_flight.flight
flights_queries = [
q
for flight_url, q in sp_observation.uss_flight_queries.items()
if mapping.observed_flight.id in {f.id for f in q.flights}
]
if len(flights_queries) != 1:
raise RuntimeError(
f"Found {len(flights_queries)} flights queries (instead of the expected 1) for flight {mapping.observed_flight.id} corresponding to injection ID {mapping.injected_flight.flight.injection_id} for {mapping.injected_flight.uss_participant_id}"
f"Found {len(flights_queries)} flights queries (instead of the expected 1) for flight {mapping.observed_flight.id} corresponding to injection ID {mapping.injected_flight.flight.injection_id} for {participant_id}"
)
flights_query = flights_queries[0]

# Verify that flights queries returned correctly-formatted data
errors = schema_validation.validate(
self._rid_version.openapi_path,
self._rid_version.openapi_flights_response_path,
flights_query.query.response.json,
)
with self._test_scenario.check(
"Flights data format", [mapping.injected_flight.uss_participant_id]
"Flights data format", participant_id
) as check:
if errors:
check.record_failed(
@@ -910,10 +922,21 @@
),
query_timestamps=[flights_query.query.request.timestamp],
)
self._common_dictionary_evaluator.evaluate_sp_flights(
requested_area,
sp_observation,
participants=[mapping.injected_flight.uss_participant_id],

# Check recent positions timings
self._evaluate_sp_flight_recent_positions_times(
observed_flight,
flights_query.query.response.reported.datetime,
participant_id,
)
self._evaluate_sp_flight_recent_positions_crossing_area_boundary(
requested_area, observed_flight, participant_id
)

# Check flight consistency with common data dictionary
self._common_dictionary_evaluator.evaluate_sp_flight(
observed_flight,
participant_id,
)

# Check that required fields are present and match for any observed flights matching injected flights
@@ -1199,3 +1222,87 @@ def _evaluate_area_too_large_sp_observation(
mapping.observed_flight.query.query.request.timestamp
],
)

def _evaluate_sp_flight_recent_positions_times(
self, f: Flight, query_time: datetime.datetime, participant: ParticipantID
):
with self._test_scenario.check(
"Recent positions timestamps", participant
) as check:
for p in f.recent_positions:
# check that the position's timestamp is at most 60 seconds before the request time
if (
query_time - p.time
).total_seconds() > _POSITION_TIMESTAMP_MAX_AGE_SEC:
check.record_failed(
"A Position timestamp was older than the tolerance.",
details=f"Position timestamp: {p.time}, query time: {query_time}",
severity=Severity.Medium,
)

def _evaluate_sp_flight_recent_positions_crossing_area_boundary(
self, requested_area: s2sphere.LatLngRect, f: Flight, participant: ParticipantID
):
with self._test_scenario.check(
"Recent positions for aircraft crossing the requested area boundary show only one position before or after crossing",
participant,
) as check:

def fail_check():
check.record_failed(
"A position outside the area was neither preceded nor followed by a position inside the area.",
details=f"Positions: {f.recent_positions}, requested_area: {requested_area}",
severity=Severity.Medium,
)

positions = _chronological_positions(f)
if len(positions) < 2:
# Check does not apply in this case
return

if len(positions) == 2:
# Only one of the positions can be outside the area. If both are, we fail.
if not requested_area.contains(
positions[0]
) and not requested_area.contains(positions[1]):
fail_check()
return

# For each sliding triple we check that if the middle position is outside the area, then either
# the first or the last position is inside the area. This means checking for any point that is inside the
# area in the triple and failing otherwise
for triple in _sliding_triples(_chronological_positions(f)):
if not (
requested_area.contains(triple[0])
or requested_area.contains(triple[1])
or requested_area.contains(triple[2])
):
fail_check()

# Finally we need to check for the forbidden corner cases of having the two first or two last positions being outside.
# (These won't be caught by the iteration on the triples above)
if (
not requested_area.contains(positions[0])
and not requested_area.contains(positions[1])
) or (
not requested_area.contains(positions[-1])
and not requested_area.contains(positions[-2])
):
fail_check()


def _chronological_positions(f: Flight) -> List[s2sphere.LatLng]:
"""
Returns the recent positions of the flight, ordered by time with the oldest first, and the most recent last.
"""
return [
s2sphere.LatLng.from_degrees(p.lat, p.lng)
for p in sorted(f.recent_positions, key=lambda p: p.time)
]


def _sliding_triples(points: List[s2sphere.LatLng]) -> List[List[s2sphere.LatLng]]:
"""
Returns a list of triples of consecutive positions in the passed list.
"""
return [[points[i], points[i + 1], points[i + 2]] for i in range(len(points) - 2)]
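
The boundary-crossing rule moved into display_data_evaluator above can be summarized as: in the chronologically ordered recent positions, any position outside the requested area must be immediately preceded or followed by a position inside it; otherwise the check fails. The standalone sketch below is not part of this commit — the function name `violates_boundary_rule` and the sample coordinates are illustrative assumptions — and condenses `_chronological_positions`, `_sliding_triples` and the corner-case handling into a single predicate:

from typing import List

import s2sphere


def violates_boundary_rule(
    area: s2sphere.LatLngRect, points: List[s2sphere.LatLng]
) -> bool:
    """True if a position outside the area is neither preceded nor followed by one inside it."""
    if len(points) < 2:
        return False  # the check does not apply
    inside = [area.contains(p) for p in points]
    if len(points) == 2:
        return not (inside[0] or inside[1])
    # A triple of consecutive positions entirely outside the area violates the rule,
    # as do the first two or the last two positions both being outside.
    triple_outside = any(
        not (inside[i] or inside[i + 1] or inside[i + 2])
        for i in range(len(inside) - 2)
    )
    edges_outside = (not inside[0] and not inside[1]) or (
        not inside[-1] and not inside[-2]
    )
    return triple_outside or edges_outside


area = s2sphere.LatLngRect.from_point_pair(
    s2sphere.LatLng.from_degrees(46.0, 7.0), s2sphere.LatLng.from_degrees(46.1, 7.1)
)
inside_pt = s2sphere.LatLng.from_degrees(46.05, 7.05)
outside_pt = s2sphere.LatLng.from_degrees(47.0, 8.0)
# A track that crosses the boundary shows one position outside on each side: acceptable.
assert not violates_boundary_rule(area, [outside_pt, inside_pt, inside_pt, outside_pt])
# Two consecutive leading positions outside the area: the check fails.
assert violates_boundary_rule(area, [outside_pt, outside_pt, inside_pt])
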
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# ASTM NetRID v19 Service Provider flight consistency with Common Data Dictionary test step fragment

This fragment is implemented in `common_dictionary_evaluator.py:RIDCommonDictionaryEvaluator.evaluate_sp_flights`.
This fragment is implemented in `common_dictionary_evaluator.py:RIDCommonDictionaryEvaluator.evaluate_sp_flight`.

## Service Provider altitude check

@@ -1,6 +1,6 @@
# ASTM NetRID v22a Service Provider flight consistency with Common Data Dictionary test step fragment

This fragment is implemented in `common_dictionary_evaluator.py:RIDCommonDictionaryEvaluator.evaluate_sp_flights`.
This fragment is implemented in `common_dictionary_evaluator.py:RIDCommonDictionaryEvaluator.evaluate_sp_flight`.

## Service Provider altitude check

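A closing note on the timing constants relocated by this commit: a recent position is acceptable if its timestamp is no older, relative to the query time, than NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC (60 s) plus the allowed Service Provider response time SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC (3 s), i.e. 63 seconds in total. The short sketch below is not part of this commit — the helper `position_is_fresh` is an illustrative assumption — and restates that arithmetic:

import datetime

NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC = 60
SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC = 3
MAX_POSITION_AGE_SEC = (
    NET_MAX_NEAR_REAL_TIME_DATA_PERIOD_SEC + SP_FLIGHTS_RESPONSE_TIME_TOLERANCE_SEC
)  # 63 seconds


def position_is_fresh(
    position_time: datetime.datetime, query_time: datetime.datetime
) -> bool:
    """True if the position timestamp is within the allowed age at query time."""
    return (query_time - position_time).total_seconds() <= MAX_POSITION_AGE_SEC


query_time = datetime.datetime(2024, 12, 10, 12, 0, 0, tzinfo=datetime.timezone.utc)
assert position_is_fresh(query_time - datetime.timedelta(seconds=45), query_time)
assert not position_is_fresh(query_time - datetime.timedelta(seconds=90), query_time)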
