Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[uss_qualifier/scenarios/utm] Add requirement SCD0100 check; Fix some validation issues #444

Merged
merged 3 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion monitoring/mock_uss/f3548v21/routes_scd.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from typing import Optional

import flask

from monitoring.mock_uss.f3548v21.flight_planning import op_intent_from_flightrecord
from monitoring.monitorlib import scd
from monitoring.mock_uss import webapp
from monitoring.mock_uss.auth import requires_scope
from monitoring.mock_uss.flights.database import db
from monitoring.mock_uss.flights.database import db, FlightRecord
from uas_standards.astm.f3548.v21.api import (
ErrorResponse,
GetOperationalIntentDetailsResponse,
GetOperationalIntentTelemetryResponse,
OperationalIntentState,
)


Expand Down Expand Up @@ -44,6 +48,58 @@ def scdsc_get_operational_intent_details(entityid: str):
return flask.jsonify(response), 200


@webapp.route(
"/mock/scd/uss/v1/operational_intents/<entityid>/telemetry", methods=["GET"]
)
@requires_scope(scd.SCOPE_CM_SA)
def scdsc_get_operational_intent_telemetry(entityid: str):
"""Implements getOperationalIntentTelemetry in ASTM SCD API."""

# Look up entityid in database
tx = db.value
flight: Optional[FlightRecord] = None
for f in tx.flights.values():
if f and f.op_intent.reference.id == entityid:
flight = f
break

# If requested operational intent doesn't exist, return 404
if flight is None:
return (
flask.jsonify(
ErrorResponse(
message="Operational intent {} not known by this USS".format(
entityid
)
)
),
404,
)

elif flight.op_intent.reference.state not in {
OperationalIntentState.Contingent,
OperationalIntentState.Nonconforming,
}:
return (
flask.jsonify(
ErrorResponse(
message=f"Operational intent {entityid} is not in a state that provides telemetry ({flight.op_intent.reference.state})"
)
),
409,
)

# TODO: implement support for telemetry
return (
flask.jsonify(
ErrorResponse(
message=f"Operational intent {entityid} has no telemetry data available."
)
),
412,
)


@webapp.route("/mock/scd/uss/v1/operational_intents", methods=["POST"])
@requires_scope(scd.SCOPE_SC)
def scdsc_notify_operational_intent_details_changed():
Expand Down
27 changes: 26 additions & 1 deletion monitoring/uss_qualifier/resources/astm/f3548/v21/dss.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from monitoring.monitorlib import infrastructure, fetch
from monitoring.monitorlib.fetch import QueryType
from monitoring.monitorlib.scd import SCOPE_SC, SCOPE_AA
from monitoring.monitorlib.scd import SCOPE_SC, SCOPE_AA, SCOPE_CM_SA
from monitoring.uss_qualifier.resources.resource import Resource
from monitoring.uss_qualifier.resources.communications import AuthAdapterResource
from uas_standards.astm.f3548.v21.api import (
Expand All @@ -30,6 +30,8 @@
GetOperationalIntentReferenceResponse,
OPERATIONS,
OperationID,
GetOperationalIntentTelemetryResponse,
VehicleTelemetry,
)

# A base URL for a USS that is not expected to be ever called
Expand Down Expand Up @@ -164,6 +166,29 @@ def get_full_op_intent_without_validation(

return result, query

def get_op_intent_telemetry(
self,
op_intent_ref: OperationalIntentReference,
uss_participant_id: Optional[str] = None,
) -> Tuple[Optional[VehicleTelemetry], fetch.Query]:
op = OPERATIONS[OperationID.GetOperationalIntentTelemetry]
query = fetch.query_and_describe(
self.client,
op.verb,
f"{op_intent_ref.uss_base_url}{op.path.format(entityid=op_intent_ref.id)}",
QueryType.F3548v21USSGetOperationalIntentTelemetry,
uss_participant_id,
scope=SCOPE_CM_SA,
)
if query.status_code == 200:
result: GetOperationalIntentTelemetryResponse = ImplicitDict.parse(
query.response.json, GetOperationalIntentTelemetryResponse
)
telemetry = result.telemetry if "telemetry" in result else None
return telemetry, query
else:
return None, query

def put_op_intent(
self,
extents: List[Volume4D],
Expand Down
Loading
Loading