Skip to content

Commit

Permalink
[uss_qualifier] Implement MSL altitude scenario (#450)
Browse files Browse the repository at this point in the history
Implement MSL altitude scenario
  • Loading branch information
BenjaminPelletier authored Jan 3, 2024
1 parent 5559408 commit 13f62fb
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 10 deletions.
14 changes: 11 additions & 3 deletions monitoring/mock_uss/riddp/routes_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
)
from monitoring.mock_uss import webapp
from monitoring.mock_uss.auth import requires_scope
from uas_standards.interuss.automated_testing.rid.v1.observation import (
AltitudeReference,
MSLAltitude,
)
from . import clustering, database, utm_client
from .behavior import DisplayProviderBehavior
from .config import KEY_RID_VERSION
Expand Down Expand Up @@ -62,8 +66,8 @@ def _make_flight_observation(
paths.append(current_path)

p = flight.most_recent_position
msl_alt = p.alt - egm96_geoid_offset(s2sphere.LatLng.from_degrees(p.lat, p.lng))
# TODO: Return msl_alt in observation information
msl_alt_m = p.alt - egm96_geoid_offset(s2sphere.LatLng.from_degrees(p.lat, p.lng))
msl_alt = MSLAltitude(meters=msl_alt_m, reference_datum=AltitudeReference.EGM96)
current_state = observation_api.CurrentState(
timestamp=p.time.isoformat(),
operational_status=flight.operational_status,
Expand All @@ -76,7 +80,11 @@ def _make_flight_observation(
return observation_api.Flight(
id=flight.id,
most_recent_position=observation_api.Position(
lat=p.lat, lng=p.lng, alt=p.alt, height=h
lat=p.lat,
lng=p.lng,
alt=p.alt,
height=h,
msl_alt=msl_alt,
),
recent_paths=[observation_api.Path(positions=path) for path in paths],
current_state=current_state,
Expand Down
8 changes: 8 additions & 0 deletions monitoring/monitorlib/fetch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ class QueryType(str, Enum):
"interuss.automated_testing.flight_planning.v1.DeleteFlightPlan"
)

# InterUSS RID observation interface
InterUSSRIDObservationV1GetDisplayData = (
"interuss.automated_testing.rid.v1.observation.getDisplayData"
)
InterUSSRIDObservationV1GetDetails = (
"interuss.automated_testing.rid.v1.observation.getDetails"
)

def __str__(self):
return self.value

Expand Down
2 changes: 2 additions & 0 deletions monitoring/uss_qualifier/resources/netrid/observers.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def observe_system(
url,
scope=Scope.Observe,
participant_id=self.participant_id,
query_type=QueryType.InterUSSRIDObservationV1GetDisplayData,
)
try:
result = (
Expand All @@ -72,6 +73,7 @@ def observe_flight_details(
f"/display_data/{flight_id}",
scope=Scope.Observe,
participant_id=self.participant_id,
query_type=QueryType.InterUSSRIDObservationV1GetDetails,
)
# Record query metadata for later use in the aggregate checks
query.participant_id = self.participant_id
Expand Down
6 changes: 5 additions & 1 deletion monitoring/uss_qualifier/scenarios/uspace/netrid/msl.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ If an appropriate test report cannot be found, this scenario will be discontinue

#### ⚠️ Message contains MSL altitude check

If the response message for the remote identification observation made by the virtual/automated authorised user does not contain the UAS's MSL altitude, the USSP will have failed to comply with **[uspace.article8.MSLAltitude](../../../requirements/uspace/article8.md)**.
If a response message for the remote identification observation made by the virtual/automated authorised user does not contain the UAS's MSL altitude, the USSP will have failed to comply with **[uspace.article8.MSLAltitude](../../../requirements/uspace/article8.md)**.

#### ⚠️ MSL altitude is reported using an acceptable datum check

While the WGS84 ellipsoid is one approximation of sea level, it is not included as a recommended definition of mean sea level in the Guidance Material. If the USSP reports MSL altitude relative to a datum not recommended in the Guidance Material, the USSP will have failed to comply with **[uspace.article8.MSLAltitude](../../../requirements/uspace/article8.md)** according to the Guidance Material.

#### ⚠️ MSL altitude is correct check

Expand Down
118 changes: 114 additions & 4 deletions monitoring/uss_qualifier/scenarios/uspace/netrid/msl.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
from typing import List
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Set

from implicitdict import ImplicitDict
import s2sphere

from uas_standards.interuss.automated_testing.rid.v1.observation import (
AltitudeReference,
GetDisplayDataResponse,
)

from monitoring.monitorlib.fetch import Query, QueryType
from monitoring.monitorlib.geo import egm96_geoid_offset
from monitoring.uss_qualifier.configurations.configuration import ParticipantID
from monitoring.uss_qualifier.resources.netrid import NetRIDObserversResource
from monitoring.uss_qualifier.scenarios.astm.netrid.common.nominal_behavior import (
NominalBehavior,
)
from monitoring.uss_qualifier.scenarios.scenario import TestScenario


MAXIMUM_MSL_ERROR_M = 0.5 # meters maximum difference between expected MSL altitude and reported MSL altitude
ACCEPTABLE_DATUMS = {AltitudeReference.EGM96, AltitudeReference.EGM2008}


class MSLAltitude(TestScenario):
_ussps: List[ParticipantID]

Expand All @@ -18,14 +37,105 @@ def run(self, context):
self.begin_test_case("UAS observations evaluation")

self.begin_test_step("Find nominal behavior report")
# TODO: Find test report for NetRID nominal behavior scenario
reports = context.find_test_scenario_reports(NominalBehavior)
self.end_test_step()

if not reports:
self.record_note(
"Skip reason",
f"Nominal behavior test scenario report could not be found for any of the scenario types {', '.join(SCENARIO_TYPES)}",
)
self.end_test_scenario()
return

self.begin_test_step("Evaluate UAS observations")
# TODO: Examine observation queries in test report to see if MSL was present
# TODO: When MSL is present, verify that its value matches injected altitude above ellipsoid
for report in reports:
self._evaluate_msl_altitude(report.queries())
self.end_test_step()

self.end_test_case()

self.end_test_scenario()

def _evaluate_msl_altitude(self, queries: List[Query]):
for query in queries:
if (
"query_type" not in query
or query.query_type != QueryType.InterUSSRIDObservationV1GetDisplayData
):
continue
if "json" not in query.response or not query.response.json:
# Invalid observation; this should already have been recorded as a failure
continue
try:
resp: GetDisplayDataResponse = ImplicitDict.parse(
query.response.json, GetDisplayDataResponse
)
except ValueError as e:
# Invalid observation; this should already have been recorded as a failure
continue
if "flights" not in resp or not resp.flights:
continue

self.record_query(query)
participant_id = query.participant_id if "participant_id" in query else None
q = query.request.timestamp
for flight in resp.flights:
with self.check(
"Message contains MSL altitude", participant_id
) as check:
if (
"msl_alt" not in flight.most_recent_position
or flight.most_recent_position.msl_alt is None
):
check.record_failed(
summary="MSL altitude missing from observation",
details=f"Flight {flight.id} was missing `msl_alt` field in the noted RID observation",
query_timestamps=[q],
)
continue

with self.check(
"MSL altitude is reported using an acceptable datum", participant_id
) as check:
if (
"reference_datum" not in flight.most_recent_position.msl_alt
or flight.most_recent_position.msl_alt.reference_datum
not in ACCEPTABLE_DATUMS
):
check.record_failed(
summary=f"MSL altitude reported relative to {flight.most_recent_position.msl_alt.reference_datum.value} rather than {' or '.join(ACCEPTABLE_DATUMS)}",
details=f"The only acceptable MSL altitude datums for U-space are {' or '.join(ACCEPTABLE_DATUMS)}, and {flight.most_recent_position.msl_alt.reference_datum.value} reported for flight {flight.id} is not one of them",
query_timestamps=[q],
)
continue

if (
"alt" in flight.most_recent_position
and flight.most_recent_position is not None
):
with self.check("MSL altitude is correct", participant_id) as check:
geoid_offset = egm96_geoid_offset(
s2sphere.LatLng.from_degrees(
flight.most_recent_position.lat,
flight.most_recent_position.lng,
)
)
expected_msl_alt = (
flight.most_recent_position.alt - geoid_offset
)
if (
abs(
expected_msl_alt
- flight.most_recent_position.msl_alt.meters
)
> MAXIMUM_MSL_ERROR_M
):
check.record_failed(
summary=f"Reported MSL altitude {flight.most_recent_position.msl_alt.meters:.1f}m does not match expected MSL altitude {expected_msl_alt:.1f}m",
details=f"Altitude for flight {flight.id} at {flight.most_recent_position.lat}, {flight.most_recent_position.lng} was reported as {flight.most_recent_position.alt} meters above the WGS84 ellipsoid, and the EGM96 geoid is {geoid_offset} meters above the WGS84 ellipsoid at this point, but the MSL altitude was reported as {flight.most_recent_position.msl_alt.meters} meters above {flight.most_recent_position.msl_alt.reference_datum} rather than the expected {expected_msl_alt} meters",
query_timestamps=[q],
)
else:
pass
# TODO: check MSL altitude against injection if WGS84 altitude is not specified in observation
27 changes: 27 additions & 0 deletions monitoring/uss_qualifier/suites/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import re
from typing import Dict, List, Optional, Union, Iterator
from typing import Type

import arrow

Expand Down Expand Up @@ -52,6 +53,7 @@
ScenarioCannotContinueError,
TestRunCannotContinueError,
)
from monitoring.uss_qualifier.scenarios.scenario import get_scenario_type_by_name
from monitoring.uss_qualifier.suites.definitions import (
TestSuiteActionDeclaration,
TestSuiteDefinition,
Expand Down Expand Up @@ -407,6 +409,31 @@ def sibling_queries(self) -> Iterator[Query]:
for q in child.report.queries():
yield q

def find_test_scenario_reports(
self, scenario_type: Type[TestScenario]
) -> List[TestScenarioReport]:
"""Find reports for all currently-completed instances of the specified test scenario type."""
return self._find_test_scenario_reports(scenario_type, self.top_frame)

def _find_test_scenario_reports(
self, scenario_type: Type[TestScenario], frame: ActionStackFrame
) -> List[TestScenarioReport]:
results = []
if (
frame.report is not None
and "test_scenario" in frame.report
and frame.report.test_scenario is not None
):
report_scenario_type = get_scenario_type_by_name(
frame.report.test_scenario.scenario_type
)
if issubclass(report_scenario_type, scenario_type):
results.append(frame.report.test_scenario)
for child in frame.children:
new_results = self._find_test_scenario_reports(scenario_type, child)
results.extend(new_results)
return results

@property
def stop_fast(self) -> bool:
if (
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ scipy==1.10.1
shapely==1.7.1
structlog==21.5.0 # deployment_manager
termcolor==1.1.0
uas_standards==3.0.0
uas_standards==3.1.0
4 changes: 3 additions & 1 deletion schemas/monitoring/monitorlib/fetch/Query.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
"interuss.automated_testing.flight_planning.v1.GetStatus",
"interuss.automated_testing.flight_planning.v1.ClearArea",
"interuss.automated_testing.flight_planning.v1.UpsertFlightPlan",
"interuss.automated_testing.flight_planning.v1.DeleteFlightPlan"
"interuss.automated_testing.flight_planning.v1.DeleteFlightPlan",
"interuss.automated_testing.rid.v1.observation.getDisplayData",
"interuss.automated_testing.rid.v1.observation.getDetails"
],
"type": [
"string",
Expand Down

0 comments on commit 13f62fb

Please sign in to comment.