From ef76e72f860f18ac5d823ff9ea9b8f1fe702bc02 Mon Sep 17 00:00:00 2001 From: Benjamin Pelletier Date: Fri, 3 Nov 2023 12:54:20 -0700 Subject: [PATCH] [mock_uss] Clean up mock_uss flight planning (#311) * Clean up mock_uss flight planning * Factor common functionality into business object --- .../mock_uss/f3548v21/flight_planning.py | 247 ++++++++++++++++-- .../scd_injection/routes_injection.py | 211 +++------------ .../clients/flight_planning/flight_info.py | 19 ++ .../flight_planning/flight_info_template.py | 33 +-- .../conflict_equal_priority_not_permitted.py | 2 +- 5 files changed, 285 insertions(+), 227 deletions(-) diff --git a/monitoring/mock_uss/f3548v21/flight_planning.py b/monitoring/mock_uss/f3548v21/flight_planning.py index c477ce1067..7faddc4880 100644 --- a/monitoring/mock_uss/f3548v21/flight_planning.py +++ b/monitoring/mock_uss/f3548v21/flight_planning.py @@ -1,18 +1,26 @@ +import uuid from datetime import datetime from typing import Optional, List, Callable import arrow +from monitoring.mock_uss import webapp +from monitoring.mock_uss.config import KEY_BASE_URL +from monitoring.monitorlib.clients.flight_planning.flight_info import ( + FlightInfo, + AirspaceUsageState, + UasState, +) from monitoring.uss_qualifier.resources.overrides import apply_overrides from uas_standards.astm.f3548.v21 import api as f3548_v21 -from uas_standards.astm.f3548.v21.api import OperationalIntentDetails, OperationalIntent from uas_standards.astm.f3548.v21.constants import OiMaxVertices, OiMaxPlanHorizonDays from uas_standards.interuss.automated_testing.scd.v1 import api as scd_api -from monitoring.mock_uss.flights.database import FlightRecord +from monitoring.mock_uss.f3548v21 import utm_client +from monitoring.mock_uss.flights.database import FlightRecord, db +from monitoring.monitorlib.clients import scd as scd_client from monitoring.monitorlib.geotemporal import Volume4DCollection from monitoring.monitorlib.locality import Locality -from uas_standards.interuss.automated_testing.scd.v1.api import OperationalIntentState class PlanningError(Exception): @@ -24,7 +32,6 @@ def validate_request(req_body: scd_api.InjectFlightRequest) -> None: Args: req_body: Information about the requested flight. - locality: Jurisdictional requirements which the mock_uss should follow. """ # Validate max number of vertices nb_vertices = 0 @@ -67,7 +74,7 @@ def validate_request(req_body: scd_api.InjectFlightRequest) -> None: # Validate intent is currently active if in Activated state # I.e. at least one volume has start time in the past and end time in the future - if req_body.operational_intent.state == OperationalIntentState.Activated: + if req_body.operational_intent.state == scd_api.OperationalIntentState.Activated: now = arrow.utcnow().datetime active_volume = Volume4DCollection.from_interuss_scd_api( req_body.operational_intent.volumes @@ -80,7 +87,7 @@ def validate_request(req_body: scd_api.InjectFlightRequest) -> None: def check_for_disallowed_conflicts( - req_body: scd_api.InjectFlightRequest, + new_op_intent: f3548_v21.OperationalIntent, existing_flight: Optional[FlightRecord], op_intents: List[f3548_v21.OperationalIntent], locality: Locality, @@ -89,7 +96,7 @@ def check_for_disallowed_conflicts( """Raise a PlannerError if there are any disallowed conflicts. Args: - req_body: Information about the requested flight. + new_op_intent: The prospective operational intent. existing_flight: The existing state of the flight (to be changed by the request), or None if this request is to create a new flight. op_intents: Full information for all potentially-relevant operational intents. @@ -99,14 +106,14 @@ def check_for_disallowed_conflicts( if log is None: log = lambda msg: None - if req_body.operational_intent.state not in ( - OperationalIntentState.Accepted, - OperationalIntentState.Activated, + if new_op_intent.reference.state not in ( + scd_api.OperationalIntentState.Accepted, + scd_api.OperationalIntentState.Activated, ): # No conflicts are disallowed if the flight is not nominal return - v1 = Volume4DCollection.from_interuss_scd_api(req_body.operational_intent.volumes) + v1 = Volume4DCollection.from_interuss_scd_api(new_op_intent.details.volumes) for op_intent in op_intents: if ( @@ -117,16 +124,14 @@ def check_for_disallowed_conflicts( f"intersection with {op_intent.reference.id} not considered: intersection with a past version of this flight" ) continue - if req_body.operational_intent.priority > op_intent.details.priority: + if new_op_intent.details.priority > op_intent.details.priority: log( f"intersection with {op_intent.reference.id} not considered: intersection with lower-priority operational intents" ) continue if ( - req_body.operational_intent.priority == op_intent.details.priority - and locality.allows_same_priority_intersections( - req_body.operational_intent.priority - ) + new_op_intent.details.priority == op_intent.details.priority + and locality.allows_same_priority_intersections(op_intent.details.priority) ): log( f"intersection with {op_intent.reference.id} not considered: intersection with same-priority operational intents (if allowed)" @@ -141,8 +146,7 @@ def check_for_disallowed_conflicts( existing_flight and existing_flight.op_intent.reference.state == scd_api.OperationalIntentState.Activated - and req_body.operational_intent.state - == scd_api.OperationalIntentState.Activated + and op_intent.reference.state == scd_api.OperationalIntentState.Activated ) if modifying_activated: preexisting_conflict = Volume4DCollection.from_interuss_scd_api( @@ -156,7 +160,7 @@ def check_for_disallowed_conflicts( if v1.intersects_vol4s(v2): raise PlanningError( - f"Requested flight (priority {req_body.operational_intent.priority}) intersected {op_intent.reference.manager}'s operational intent {op_intent.reference.id} (priority {op_intent.details.priority})" + f"Requested flight (priority {new_op_intent.details.priority}) intersected {op_intent.reference.manager}'s operational intent {op_intent.reference.id} (priority {op_intent.details.priority})" ) @@ -217,14 +221,55 @@ def op_intent_transition_valid( return False -def op_intent_from_flightrecord(flight: FlightRecord, method: str) -> OperationalIntent: +def op_intent_from_flightinfo( + flight_info: FlightInfo, flight_id: str +) -> f3548_v21.OperationalIntent: + volumes = [v.to_f3548v21() for v in flight_info.basic_information.area] + off_nominal_volumes = [] + + state = flight_info.basic_information.f3548v21_op_intent_state() + if state in ( + f3548_v21.OperationalIntentState.Nonconforming, + f3548_v21.OperationalIntentState.Contingent, + ): + off_nominal_volumes = volumes + volumes = [] + + v4c = Volume4DCollection(volumes=flight_info.basic_information.area) + + reference = f3548_v21.OperationalIntentReference( + id=f3548_v21.EntityID(flight_id), + manager="UNKNOWN", + uss_availability=f3548_v21.UssAvailabilityState.Unknown, + version=0, + state=state, + ovn="UNKNOWN", + time_start=v4c.time_start.to_f3548v21(), + time_end=v4c.time_end.to_f3548v21(), + uss_base_url="{}/mock/scd".format(webapp.config[KEY_BASE_URL]), + subscription_id="UNKNOWN", + ) + details = f3548_v21.OperationalIntentDetails( + volumes=volumes, + off_nominal_volumes=off_nominal_volumes, + priority=flight_info.astm_f3548_21.priority, + ) + return f3548_v21.OperationalIntent( + reference=reference, + details=details, + ) + + +def op_intent_from_flightrecord( + flight: FlightRecord, method: str +) -> f3548_v21.OperationalIntent: ref = flight.op_intent.reference - details = OperationalIntentDetails( + details = f3548_v21.OperationalIntentDetails( volumes=flight.op_intent.details.volumes, off_nominal_volumes=flight.op_intent.details.off_nominal_volumes, priority=flight.op_intent.details.priority, ) - op_intent = OperationalIntent(reference=ref, details=details) + op_intent = f3548_v21.OperationalIntent(reference=ref, details=details) if flight.mod_op_sharing_behavior: mod_op_sharing_behavior = flight.mod_op_sharing_behavior if mod_op_sharing_behavior.modify_sharing_methods is not None: @@ -235,3 +280,161 @@ def op_intent_from_flightrecord(flight: FlightRecord, method: str) -> Operationa ) return op_intent + + +def query_operational_intents( + area_of_interest: f3548_v21.Volume4D, +) -> List[f3548_v21.OperationalIntent]: + """Retrieve a complete set of operational intents in an area, including details. + + :param area_of_interest: Area where intersecting operational intents must be discovered + :return: Full definition for every operational intent discovered + """ + op_intent_refs = scd_client.query_operational_intent_references( + utm_client, area_of_interest + ) + tx = db.value + get_details_for = [] + own_flights = {f.op_intent.reference.id: f for f in tx.flights.values() if f} + result = [] + for op_intent_ref in op_intent_refs: + if op_intent_ref.id in own_flights: + # This is our own flight + result.append( + op_intent_from_flightrecord(own_flights[op_intent_ref.id], "GET") + ) + elif ( + op_intent_ref.id in tx.cached_operations + and tx.cached_operations[op_intent_ref.id].reference.version + == op_intent_ref.version + ): + # We have a current version of this op intent cached + result.append(tx.cached_operations[op_intent_ref.id]) + else: + # We need to get the details for this op intent + get_details_for.append(op_intent_ref) + + updated_op_intents = [] + for op_intent_ref in get_details_for: + op_intent, _ = scd_client.get_operational_intent_details( + utm_client, op_intent_ref.uss_base_url, op_intent_ref.id + ) + updated_op_intents.append(op_intent) + result.extend(updated_op_intents) + + with db as tx: + for op_intent in updated_op_intents: + tx.cached_operations[op_intent.reference.id] = op_intent + + return result + + +def check_op_intent( + new_flight: FlightRecord, + existing_flight: Optional[FlightRecord], + locality: Locality, + log: Callable[[str], None], +) -> List[f3548_v21.EntityOVN]: + # Check the transition is valid + state_transition_from = ( + f3548_v21.OperationalIntentState(existing_flight.op_intent.reference.state) + if existing_flight + else None + ) + state_transition_to = f3548_v21.OperationalIntentState( + new_flight.op_intent.reference.state + ) + if not op_intent_transition_valid(state_transition_from, state_transition_to): + raise PlanningError( + f"Operational intent state transition from {state_transition_from} to {state_transition_to} is invalid" + ) + + if new_flight.op_intent.reference.state in ( + f3548_v21.OperationalIntentState.Accepted, + f3548_v21.OperationalIntentState.Activated, + ): + # Check for intersections if the flight is nominal + + # Check for operational intents in the DSS + log("Obtaining latest operational intent information") + v1 = Volume4DCollection.from_interuss_scd_api( + new_flight.op_intent.details.volumes + + new_flight.op_intent.details.off_nominal_volumes + ) + vol4 = v1.bounding_volume.to_f3548v21() + op_intents = query_operational_intents(vol4) + + # Check for intersections + log( + f"Checking for intersections with {', '.join(op_intent.reference.id for op_intent in op_intents)}" + ) + check_for_disallowed_conflicts( + new_flight.op_intent, existing_flight, op_intents, locality, log + ) + + key = [f3548_v21.EntityOVN(op.reference.ovn) for op in op_intents] + else: + # Flight is not nominal and therefore doesn't need to check intersections + key = [] + + return key + + +def share_op_intent( + new_flight: FlightRecord, + existing_flight: Optional[FlightRecord], + key: List[f3548_v21.EntityOVN], + log: Callable[[str], None], +): + # Create operational intent in DSS + log("Sharing operational intent with DSS") + base_url = new_flight.op_intent.reference.uss_base_url + req = f3548_v21.PutOperationalIntentReferenceParameters( + extents=new_flight.op_intent.details.volumes + + new_flight.op_intent.details.off_nominal_volumes, + key=key, + state=new_flight.op_intent.reference.state, + uss_base_url=base_url, + new_subscription=f3548_v21.ImplicitSubscriptionParameters( + uss_base_url=base_url + ), + ) + if existing_flight: + id = existing_flight.op_intent.reference.id + log(f"Updating existing operational intent {id} in DSS") + result = scd_client.update_operational_intent_reference( + utm_client, + id, + existing_flight.op_intent.reference.ovn, + req, + ) + else: + id = str(uuid.uuid4()) + log(f"Creating new operational intent {id} in DSS") + result = scd_client.create_operational_intent_reference(utm_client, id, req) + + # Notify subscribers + true_op_intent = f3548_v21.OperationalIntent( + reference=result.operational_intent_reference, + details=new_flight.op_intent.details, + ) + record = FlightRecord( + op_intent=true_op_intent, + flight_info=new_flight.flight_info, + mod_op_sharing_behavior=new_flight.mod_op_sharing_behavior, + ) + operational_intent = op_intent_from_flightrecord(record, "POST") + for subscriber in result.subscribers: + if subscriber.uss_base_url == base_url: + # Do not notify ourselves + continue + update = f3548_v21.PutOperationalIntentDetailsParameters( + operational_intent_id=result.operational_intent_reference.id, + operational_intent=operational_intent, + subscriptions=subscriber.subscriptions, + ) + log(f"Notifying subscriber at {subscriber.uss_base_url}") + scd_client.notify_operational_intent_details_changed( + utm_client, subscriber.uss_base_url, update + ) + return record diff --git a/monitoring/mock_uss/scd_injection/routes_injection.py b/monitoring/mock_uss/scd_injection/routes_injection.py index db7e8d7afe..4b14e6bdf9 100644 --- a/monitoring/mock_uss/scd_injection/routes_injection.py +++ b/monitoring/mock_uss/scd_injection/routes_injection.py @@ -2,8 +2,7 @@ import traceback from datetime import datetime, timedelta import time -from typing import List, Tuple, Optional -import uuid +from typing import Tuple, Optional import flask from implicitdict import ImplicitDict, StringBasedDateTime @@ -12,14 +11,12 @@ from monitoring.mock_uss.flights.planning import lock_flight, release_flight_lock from monitoring.mock_uss.f3548v21 import utm_client -from monitoring.monitorlib.clients.flight_planning.flight_info import FlightInfo -from uas_standards.astm.f3548.v21 import api +from monitoring.monitorlib.clients.flight_planning.flight_info import ( + FlightInfo, + AirspaceUsageState, +) from uas_standards.astm.f3548.v21.api import ( - OperationalIntent, PutOperationalIntentDetailsParameters, - ImplicitSubscriptionParameters, - PutOperationalIntentReferenceParameters, - OperationalIntentDetails, ) from uas_standards.interuss.automated_testing.scd.v1.api import ( InjectFlightResponse, @@ -31,35 +28,32 @@ ClearAreaResponse, Capability, CapabilitiesResponse, - OperationalIntentState, ) from monitoring.mock_uss import webapp, require_config_value, uspace from monitoring.mock_uss.auth import requires_scope from monitoring.mock_uss.config import KEY_BASE_URL from monitoring.mock_uss.dynamic_configuration.configuration import get_locality -from monitoring.mock_uss.flights import database from monitoring.mock_uss.flights.database import db, FlightRecord from monitoring.mock_uss.f3548v21.flight_planning import ( validate_request, - check_for_disallowed_conflicts, PlanningError, - op_intent_from_flightrecord, - op_intent_transition_valid, + check_op_intent, + share_op_intent, + op_intent_from_flightinfo, ) import monitoring.mock_uss.uspace.flight_auth from monitoring.monitorlib import versioning from monitoring.monitorlib.clients import scd as scd_client from monitoring.monitorlib.fetch import QueryError from monitoring.monitorlib.geo import Polygon -from monitoring.monitorlib.geotemporal import Volume4D, Volume4DCollection +from monitoring.monitorlib.geotemporal import Volume4D from monitoring.monitorlib.idempotency import idempotent_request from monitoring.monitorlib.scd_automated_testing.scd_injection_api import ( SCOPE_SCD_QUALIFIER_INJECT, ) from monitoring.monitorlib.clients.mock_uss.mock_uss_scd_injection_api import ( MockUSSInjectFlightRequest, - MockUssFlightBehavior, ) require_config_value(KEY_BASE_URL) @@ -73,53 +67,6 @@ def _make_stacktrace(e) -> str: ) -def query_operational_intents( - area_of_interest: api.Volume4D, -) -> List[OperationalIntent]: - """Retrieve a complete set of operational intents in an area, including details. - - :param area_of_interest: Area where intersecting operational intents must be discovered - :return: Full definition for every operational intent discovered - """ - op_intent_refs = scd_client.query_operational_intent_references( - utm_client, area_of_interest - ) - tx = db.value - get_details_for = [] - own_flights = {f.op_intent.reference.id: f for f in tx.flights.values() if f} - result = [] - for op_intent_ref in op_intent_refs: - if op_intent_ref.id in own_flights: - # This is our own flight - result.append( - op_intent_from_flightrecord(own_flights[op_intent_ref.id], "GET") - ) - elif ( - op_intent_ref.id in tx.cached_operations - and tx.cached_operations[op_intent_ref.id].reference.version - == op_intent_ref.version - ): - # We have a current version of this op intent cached - result.append(tx.cached_operations[op_intent_ref.id]) - else: - # We need to get the details for this op intent - get_details_for.append(op_intent_ref) - - updated_op_intents = [] - for op_intent_ref in get_details_for: - op_intent, _ = scd_client.get_operational_intent_details( - utm_client, op_intent_ref.uss_base_url, op_intent_ref.id - ) - updated_op_intents.append(op_intent) - result.extend(updated_op_intents) - - with db as tx: - for op_intent in updated_op_intents: - tx.cached_operations[op_intent.reference.id] = op_intent - - return result - - @webapp.route("/scdsc/v1/status", methods=["GET"]) @requires_scope(SCOPE_SCD_QUALIFIER_INJECT) def scdsc_injection_status() -> Tuple[str, int]: @@ -182,15 +129,6 @@ def log(msg): return flask.jsonify(json), code -def _mock_uss_flight_behavior_in_req( - req_body: MockUSSInjectFlightRequest, -) -> Optional[MockUssFlightBehavior]: - if "behavior" in req_body: - return req_body.behavior - else: - return None - - def inject_flight( flight_id: str, req_body: MockUSSInjectFlightRequest, @@ -202,8 +140,16 @@ def inject_flight( def log(msg: str): logger.debug(f"[inject_flight/{pid}:{flight_id}] {msg}") + # Construct potential new flight + flight_info = FlightInfo.from_scd_inject_flight_request(req_body) + op_intent = op_intent_from_flightinfo(flight_info, flight_id) + new_flight = FlightRecord( + flight_info=flight_info, + op_intent=op_intent, + mod_op_sharing_behavior=req_body.behavior if "behavior" in req_body else None, + ) + # Validate request - log("Validating request") try: if locality.is_uspace_applicable(): uspace.flight_auth.validate_request(req_body) @@ -218,120 +164,20 @@ def log(msg: str): step_name = "performing unknown operation" try: - # Check the transition is valid - state_transition_from = ( - OperationalIntentState(existing_flight.op_intent.reference.state) - if existing_flight - else None - ) - state_transition_to = OperationalIntentState(req_body.operational_intent.state) - if not op_intent_transition_valid(state_transition_from, state_transition_to): + step_name = "checking F3548-21 operational intent" + try: + key = check_op_intent(new_flight, existing_flight, locality, log) + except PlanningError as e: return ( InjectFlightResponse( result=InjectFlightResponseResult.Rejected, - notes=f"Operational intent state transition from {state_transition_from} to {state_transition_to} is invalid", + notes=str(e), ), 200, ) - if req_body.operational_intent.state in ( - OperationalIntentState.Accepted, - OperationalIntentState.Activated, - ): - # Check for intersections if the flight is nominal - - # Check for operational intents in the DSS - step_name = "querying for operational intents" - log("Obtaining latest operational intent information") - v1 = Volume4DCollection.from_interuss_scd_api( - req_body.operational_intent.volumes - + req_body.operational_intent.off_nominal_volumes - ) - vol4 = v1.bounding_volume.to_f3548v21() - op_intents = query_operational_intents(vol4) - - # Check for intersections - step_name = "checking for intersections" - log( - f"Checking for intersections with {', '.join(op_intent.reference.id for op_intent in op_intents)}" - ) - try: - check_for_disallowed_conflicts( - req_body, existing_flight, op_intents, locality, log - ) - except PlanningError as e: - return ( - InjectFlightResponse( - result=InjectFlightResponseResult.ConflictWithFlight, - notes=str(e), - ), - 200, - ) - - key = [op.reference.ovn for op in op_intents] - else: - # Flight is not nominal and therefore doesn't need to check intersections - key = [] - - # Create operational intent in DSS step_name = "sharing operational intent in DSS" - log("Sharing operational intent with DSS") - base_url = "{}/mock/scd".format(webapp.config[KEY_BASE_URL]) - req = PutOperationalIntentReferenceParameters( - extents=req_body.operational_intent.volumes - + req_body.operational_intent.off_nominal_volumes, - key=key, - state=req_body.operational_intent.state, - uss_base_url=base_url, - new_subscription=ImplicitSubscriptionParameters(uss_base_url=base_url), - ) - if existing_flight: - id = existing_flight.op_intent.reference.id - step_name = f"updating existing operational intent {id} in DSS" - log(step_name) - result = scd_client.update_operational_intent_reference( - utm_client, - id, - existing_flight.op_intent.reference.ovn, - req, - ) - else: - id = str(uuid.uuid4()) - step_name = f"creating new operational intent {id} in DSS" - log(step_name) - result = scd_client.create_operational_intent_reference(utm_client, id, req) - - # Notify subscribers - subscriber_list = ", ".join(s.uss_base_url for s in result.subscribers) - step_name = f"notifying subscribers {{{subscriber_list}}}" - op_intent = OperationalIntent( - reference=result.operational_intent_reference, - details=OperationalIntentDetails( - volumes=req_body.operational_intent.volumes, - off_nominal_volumes=req_body.operational_intent.off_nominal_volumes, - priority=req_body.operational_intent.priority, - ), - ) - record = database.FlightRecord( - op_intent=op_intent, - flight_info=FlightInfo.from_scd_inject_flight_request(req_body), - mod_op_sharing_behavior=_mock_uss_flight_behavior_in_req(req_body), - ) - operational_intent = op_intent_from_flightrecord(record, "POST") - for subscriber in result.subscribers: - if subscriber.uss_base_url == base_url: - # Do not notify ourselves - continue - update = PutOperationalIntentDetailsParameters( - operational_intent_id=result.operational_intent_reference.id, - operational_intent=operational_intent, - subscriptions=subscriber.subscriptions, - ) - log(f"Notifying subscriber at {subscriber.uss_base_url}") - step_name = f"notifying subscriber {{{subscriber.uss_base_url}}}" - scd_client.notify_operational_intent_details_changed( - utm_client, subscriber.uss_base_url, update - ) + record = share_op_intent(new_flight, existing_flight, key, log) # Store flight in database step_name = "storing flight in database" @@ -343,14 +189,17 @@ def log(msg: str): log("Complete.") if ( - result.operational_intent_reference.state - == OperationalIntentState.Activated + new_flight.flight_info.basic_information.usage_state + == AirspaceUsageState.InUse ): injection_result = InjectFlightResponseResult.ReadyToFly else: injection_result = InjectFlightResponseResult.Planned return ( - InjectFlightResponse(result=injection_result, operational_intent_id=id), + InjectFlightResponse( + result=injection_result, + operational_intent_id=new_flight.op_intent.reference.id, + ), 200, ) except (ValueError, ConnectionError) as e: diff --git a/monitoring/monitorlib/clients/flight_planning/flight_info.py b/monitoring/monitorlib/clients/flight_planning/flight_info.py index 1df3136909..477d93ac84 100644 --- a/monitoring/monitorlib/clients/flight_planning/flight_info.py +++ b/monitoring/monitorlib/clients/flight_planning/flight_info.py @@ -4,6 +4,7 @@ from implicitdict import ImplicitDict from uas_standards.ansi_cta_2063_a import SerialNumber +from uas_standards.astm.f3548.v21 import api as f3548v21 from uas_standards.en4709_02 import OperatorRegistrationNumber from uas_standards.interuss.automated_testing.scd.v1 import api as scd_api from uas_standards.interuss.automated_testing.flight_planning.v1 import api as fp_api @@ -228,6 +229,24 @@ def to_flight_planning_api(self) -> fp_api.BasicFlightPlanInformation: area=[v.to_flight_planning_api() for v in self.area], ) + def f3548v21_op_intent_state(self) -> f3548v21.OperationalIntentState: + usage_state = self.usage_state + if usage_state == AirspaceUsageState.Planned: + state = f3548v21.OperationalIntentState.Accepted + elif usage_state == AirspaceUsageState.InUse: + uas_state = self.uas_state + if uas_state == UasState.Nominal: + state = f3548v21.OperationalIntentState.Activated + elif uas_state == UasState.OffNominal: + state = f3548v21.OperationalIntentState.Nonconforming + elif uas_state == UasState.Contingent: + state = f3548v21.OperationalIntentState.Contingent + else: + raise ValueError(f"Unknown uas_state '{uas_state}'") + else: + raise ValueError(f"Unknown usage_state '{usage_state}'") + return state + class FlightInfo(ImplicitDict): """Details of user's intent to create or modify a flight plan.""" diff --git a/monitoring/monitorlib/clients/flight_planning/flight_info_template.py b/monitoring/monitorlib/clients/flight_planning/flight_info_template.py index a46c7dec2d..cc112fe6b6 100644 --- a/monitoring/monitorlib/clients/flight_planning/flight_info_template.py +++ b/monitoring/monitorlib/clients/flight_planning/flight_info_template.py @@ -71,29 +71,16 @@ def scd_inject_request( f"Legacy SCD injection API requires uspace_flight_authorisation to be specified in FlightInfo" ) volumes = [v.to_interuss_scd_api() for v in info.basic_information.area] - if info.basic_information.usage_state == AirspaceUsageState.Planned: - state = scd_api.OperationalIntentState.Accepted - off_nominal_volumes = [] - elif info.basic_information.usage_state == AirspaceUsageState.InUse: - if info.basic_information.uas_state == UasState.Nominal: - state = scd_api.OperationalIntentState.Activated - off_nominal_volumes = [] - elif info.basic_information.uas_state == UasState.OffNominal: - state = scd_api.OperationalIntentState.Nonconforming - off_nominal_volumes = volumes - volumes = [] - elif info.basic_information.uas_state == UasState.Contingent: - state = scd_api.OperationalIntentState.Contingent - off_nominal_volumes = volumes - volumes = [] - else: - raise ValueError( - f"Unrecognized uas_state '{info.basic_information.uas_state}'" - ) - else: - raise ValueError( - f"Unrecognized usage_state '{info.basic_information.usage_state}'" - ) + off_nominal_volumes = [] + state = scd_api.OperationalIntentState( + info.basic_information.f3548v21_op_intent_state() + ) + if state in ( + scd_api.OperationalIntentState.Nonconforming, + scd_api.OperationalIntentState.Contingent, + ): + off_nominal_volumes = volumes + volumes = [] operational_intent = scd_api.OperationalIntentTestInjection( state=state, priority=scd_api.Priority(info.astm_f3548_21.priority), diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/nominal_planning/conflict_equal_priority_not_permitted/conflict_equal_priority_not_permitted.py b/monitoring/uss_qualifier/scenarios/astm/utm/nominal_planning/conflict_equal_priority_not_permitted/conflict_equal_priority_not_permitted.py index 06e9c7d22f..75f99a550a 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/nominal_planning/conflict_equal_priority_not_permitted/conflict_equal_priority_not_permitted.py +++ b/monitoring/uss_qualifier/scenarios/astm/utm/nominal_planning/conflict_equal_priority_not_permitted/conflict_equal_priority_not_permitted.py @@ -487,7 +487,7 @@ def _modify_activated_flight_preexisting_conflict( "Declare flight 2 non-conforming", "Successful transition to non-conforming state", { - InjectFlightResponseResult.Planned, + InjectFlightResponseResult.ReadyToFly, InjectFlightResponseResult.NotSupported, }, {InjectFlightResponseResult.Failed: "Failure"},