Skip to content

Commit

Permalink
[mock_uss] Mitigate #28 (#172)
Browse files Browse the repository at this point in the history
Mitigate #28
  • Loading branch information
BenjaminPelletier authored Aug 18, 2023
1 parent aeb59ec commit 2c3a704
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 59 deletions.
130 changes: 87 additions & 43 deletions monitoring/mock_uss/scdsc/routes_injection.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import os
import traceback
from datetime import datetime, timedelta
import time
from typing import List, Tuple
from typing import List, Tuple, Dict, Optional
import uuid

import flask
from loguru import logger
import requests.exceptions

from monitoring.mock_uss.scdsc.routes_scdsc import op_intent_from_flightrecord
from uas_standards.astm.f3548.v21.constants import OiMaxPlanHorizonDays, OiMaxVertices

from monitoring.mock_uss.config import KEY_BASE_URL, KEY_BEHAVIOR_LOCALITY
Expand Down Expand Up @@ -34,14 +37,14 @@
from monitoring.mock_uss import webapp, require_config_value
from monitoring.mock_uss.auth import requires_scope
from monitoring.mock_uss.scdsc import database, utm_client
from monitoring.mock_uss.scdsc.database import db
from monitoring.mock_uss.scdsc.database import db, FlightRecord
from monitoring.monitorlib.uspace import problems_with_flight_authorisation


require_config_value(KEY_BASE_URL)
require_config_value(KEY_BEHAVIOR_LOCALITY)

DEADLOCK_TIMEOUT = timedelta(seconds=60)
DEADLOCK_TIMEOUT = timedelta(seconds=5)


def _make_stacktrace(e) -> str:
Expand All @@ -63,12 +66,21 @@ def query_operational_intents(
)
tx = db.value
get_details_for = []
own_flights = {f.op_intent_reference.id: f for f in tx.flights.values() if f}
result = []
for op_intent_ref in op_intent_refs:
if (
op_intent_ref.id not in tx.cached_operations
or tx.cached_operations[op_intent_ref.id].reference.version
!= op_intent_ref.version
if op_intent_ref.id in own_flights:
# This is our own flight
result.append(op_intent_from_flightrecord(own_flights[op_intent_ref.id]))
elif (
op_intent_ref.id in tx.cached_operations
and tx.cached_operations[op_intent_ref.id].reference.version
== op_intent_ref.version
):
# We have a current version of this op intent cached
result.append(tx.cached_operations[op_intent_ref.id])
else:
# We need to get the details for this op intent
get_details_for.append(op_intent_ref)

updated_op_intents = []
Expand All @@ -78,13 +90,13 @@ def query_operational_intents(
utm_client, op_intent_ref.uss_base_url, op_intent_ref.id
)
)
result.extend(updated_op_intents)

with db as tx:
for op_intent in updated_op_intents:
tx.cached_operations[op_intent.reference.id] = op_intent
return [
tx.cached_operations[op_intent_ref.id] for op_intent_ref in op_intent_refs
]

return result


@webapp.route("/scdsc/v1/status", methods=["GET"])
Expand Down Expand Up @@ -127,7 +139,7 @@ def scd_capabilities() -> Tuple[dict, int]:
@requires_scope([SCOPE_SCD_QUALIFIER_INJECT])
def scdsc_inject_flight(flight_id: str) -> Tuple[str, int]:
"""Implements flight injection in SCD automated testing injection API."""
logger.debug(f"[inject_flight:{flight_id}] Starting handler")
logger.debug(f"[inject_flight/{os.getpid()}:{flight_id}] Starting handler")
try:
json = flask.request.json
if json is None:
Expand All @@ -141,11 +153,14 @@ def scdsc_inject_flight(flight_id: str) -> Tuple[str, int]:


def inject_flight(flight_id: str, req_body: InjectFlightRequest) -> Tuple[dict, int]:
pid = os.getpid()
locality = webapp.config[KEY_BEHAVIOR_LOCALITY]

if locality.is_uspace_applicable():
# Validate flight authorisation
logger.debug(f"[inject_flight:{flight_id}] Validating flight authorisation")
logger.debug(
f"[inject_flight/{pid}:{flight_id}] Validating flight authorisation"
)
problems = problems_with_flight_authorisation(req_body.flight_authorisation)
if problems:
return (
Expand Down Expand Up @@ -212,12 +227,14 @@ def inject_flight(flight_id: str, req_body: InjectFlightRequest) -> Tuple[dict,
existing_flight = tx.flights[flight_id]
if existing_flight and not existing_flight.locked:
logger.debug(
f"[inject_flight:{flight_id}] Existing flight locked for update"
f"[inject_flight/{pid}:{flight_id}] Existing flight locked for update"
)
existing_flight.locked = True
break
else:
logger.debug(f"[inject_flight:{flight_id}] Request is for a new flight")
logger.debug(
f"[inject_flight/{pid}:{flight_id}] Request is for a new flight (lock established)"
)
tx.flights[flight_id] = None
existing_flight = None
break
Expand Down Expand Up @@ -249,9 +266,9 @@ def inject_flight(flight_id: str, req_body: InjectFlightRequest) -> Tuple[dict,
step_name = "performing unknown operation"
try:
# Check for operational intents in the DSS
step_name = "querying DSS for operational intents"
step_name = "querying for operational intents"
logger.debug(
f"[inject_flight:{flight_id}] Checking for operational intents in the DSS"
f"[inject_flight/{pid}:{flight_id}] Obtaining latest operational intent information"
)
start_time = scd.start_of(req_body.operational_intent.volumes)
end_time = scd.end_of(req_body.operational_intent.volumes)
Expand All @@ -271,7 +288,7 @@ def inject_flight(flight_id: str, req_body: InjectFlightRequest) -> Tuple[dict,
# Check for intersections
step_name = "checking for intersections"
logger.debug(
f"[inject_flight:{flight_id}] Checking for intersections with {', '.join(op_intent.reference.id for op_intent in op_intents)}"
f"[inject_flight/{pid}:{flight_id}] Checking for intersections with {', '.join(op_intent.reference.id for op_intent in op_intents)}"
)
v1 = req_body.operational_intent.volumes
for op_intent in op_intents:
Expand All @@ -280,12 +297,12 @@ def inject_flight(flight_id: str, req_body: InjectFlightRequest) -> Tuple[dict,
and existing_flight.op_intent_reference.id == op_intent.reference.id
):
logger.debug(
f"[inject_flight:{flight_id}] intersection with {op_intent.reference.id} not considered: intersection with a past version of this flight"
f"[inject_flight/{pid}:{flight_id}] intersection with {op_intent.reference.id} not considered: intersection with a past version of this flight"
)
continue
if req_body.operational_intent.priority > op_intent.details.priority:
logger.debug(
f"[inject_flight:{flight_id}] intersection with {op_intent.reference.id} not considered: intersection with lower-priority operational intents"
f"[inject_flight/{pid}:{flight_id}] intersection with {op_intent.reference.id} not considered: intersection with lower-priority operational intents"
)
continue
if (
Expand All @@ -295,7 +312,7 @@ def inject_flight(flight_id: str, req_body: InjectFlightRequest) -> Tuple[dict,
)
):
logger.debug(
f"[inject_flight:{flight_id}] intersection with {op_intent.reference.id} not considered: intersection with same-priority operational intents (if allowed)"
f"[inject_flight/{pid}:{flight_id}] intersection with {op_intent.reference.id} not considered: intersection with same-priority operational intents (if allowed)"
)
continue

Expand All @@ -312,7 +329,7 @@ def inject_flight(flight_id: str, req_body: InjectFlightRequest) -> Tuple[dict,
)
):
logger.debug(
f"[inject_flight:{flight_id}] intersection with {op_intent.reference.id} not considered: modification of Activated operational intent with a pre-existing conflict"
f"[inject_flight/{pid}:{flight_id}] intersection with {op_intent.reference.id} not considered: modification of Activated operational intent with a pre-existing conflict"
)
continue

Expand All @@ -327,7 +344,9 @@ def inject_flight(flight_id: str, req_body: InjectFlightRequest) -> Tuple[dict,

# Create operational intent in DSS
step_name = "sharing operational intent in DSS"
logger.debug(f"[inject_flight:{flight_id}] Sharing operational intent with DSS")
logger.debug(
f"[inject_flight/{pid}:{flight_id}] Sharing operational intent with DSS"
)
base_url = "{}/mock/scd".format(webapp.config[KEY_BASE_URL])
req = scd.PutOperationalIntentReferenceParameters(
extents=req_body.operational_intent.volumes
Expand All @@ -354,22 +373,30 @@ def inject_flight(flight_id: str, req_body: InjectFlightRequest) -> Tuple[dict,
# Notify subscribers
subscriber_list = ", ".join(s.uss_base_url for s in result.subscribers)
step_name = f"notifying subscribers {{{subscriber_list}}}"
logger.debug(
f"[inject_flight:{flight_id}] Notifying subscribers {subscriber_list}"
)
scd_client.notify_subscribers(
utm_client,
result.operational_intent_reference.id,
scd.OperationalIntent(
reference=result.operational_intent_reference,
details=req_body.operational_intent,
),
result.subscribers,
operational_intent = scd.OperationalIntent(
reference=result.operational_intent_reference,
details=req_body.operational_intent,
)
for subscriber in result.subscribers:
if subscriber.uss_base_url == base_url:
# Do not notify ourselves
continue
update = scd.PutOperationalIntentDetailsParameters(
operational_intent_id=result.operational_intent_reference.id,
operational_intent=operational_intent,
subscriptions=subscriber.subscriptions,
)
logger.debug(
f"[inject_flight/{pid}:{flight_id}] Notifying subscriber at {subscriber.uss_base_url}"
)
step_name = f"notifying subscriber {{{subscriber.uss_base_url}}}"
scd_client.notify_operational_intent_details_changed(
utm_client, subscriber.uss_base_url, update
)

# Store flight in database
step_name = "storing flight in database"
logger.debug(f"[inject_flight:{flight_id}] Storing flight in database")
logger.debug(f"[inject_flight/{pid}:{flight_id}] Storing flight in database")
record = database.FlightRecord(
op_intent_reference=result.operational_intent_reference,
op_intent_injection=req_body.operational_intent,
Expand All @@ -379,7 +406,7 @@ def inject_flight(flight_id: str, req_body: InjectFlightRequest) -> Tuple[dict,
tx.flights[flight_id] = record

step_name = "returning final successful result"
logger.debug(f"[inject_flight:{flight_id}] Complete.")
logger.debug(f"[inject_flight/{pid}:{flight_id}] Complete.")

if (
result.operational_intent_reference.state
Expand Down Expand Up @@ -416,13 +443,13 @@ def inject_flight(flight_id: str, req_body: InjectFlightRequest) -> Tuple[dict,
if tx.flights[flight_id]:
# FlightRecord was a true existing flight
logger.debug(
f"[inject_flight] Releasing placeholder for flight_id {flight_id}"
f"[inject_flight/{pid}] Releasing placeholder for flight_id {flight_id}"
)
tx.flights[flight_id].locked = False
else:
# FlightRecord was just a placeholder for a new flight
logger.debug(
f"[inject_flight] Releasing lock on existing flight_id {flight_id}"
f"[inject_flight/{pid}] Releasing lock on existing flight_id {flight_id}"
)
del tx.flights[flight_id]

Expand All @@ -436,6 +463,8 @@ def scdsc_delete_flight(flight_id: str) -> Tuple[str, int]:


def delete_flight(flight_id) -> Tuple[dict, int]:
pid = os.getpid()
logger.debug(f"[delete_flight/{pid}:{flight_id}] Acquiring flight")
deadline = datetime.utcnow() + DEADLOCK_TIMEOUT
while True:
with db as tx:
Expand All @@ -453,6 +482,7 @@ def delete_flight(flight_id) -> Tuple[dict, int]:
time.sleep(0.5)

if datetime.utcnow() > deadline:
logger.debug(f"[delete_flight/{pid}:{flight_id}] Deadlock")
raise RuntimeError(
f"Deadlock in delete_flight while attempting to gain access to flight {flight_id}"
)
Expand All @@ -470,39 +500,53 @@ def delete_flight(flight_id) -> Tuple[dict, int]:
step_name = "performing unknown operation"
try:
step_name = f"deleting operational intent {flight.op_intent_reference.id} with OVN {flight.op_intent_reference.ovn} from DSS"
logger.debug(f"[delete_flight/{pid}:{flight_id}] {step_name}")
result = scd_client.delete_operational_intent_reference(
utm_client,
flight.op_intent_reference.id,
flight.op_intent_reference.ovn,
)

step_name = "notifying subscribers"
scd_client.notify_subscribers(
utm_client,
result.operational_intent_reference.id,
None,
result.subscribers,
)
base_url = "{}/mock/scd".format(webapp.config[KEY_BASE_URL])
for subscriber in result.subscribers:
if subscriber.uss_base_url == base_url:
# Do not notify ourselves
continue
update = scd.PutOperationalIntentDetailsParameters(
operational_intent_id=result.operational_intent_reference.id,
subscriptions=subscriber.subscriptions,
)
logger.debug(
f"[delete_flight/{pid}:{flight_id}] Notifying {subscriber.uss_base_url}"
)
scd_client.notify_operational_intent_details_changed(
utm_client, subscriber.uss_base_url, update
)
except (ValueError, ConnectionError) as e:
notes = (
f"{e.__class__.__name__} while {step_name} for flight {flight_id}: {str(e)}"
)
logger.debug(f"[delete_flight/{pid}:{flight_id}] {notes}")
return (
DeleteFlightResponse(result=DeleteFlightResult.Failed, notes=notes),
200,
)
except requests.exceptions.ConnectionError as e:
notes = f"Connection error to {e.request.method} {e.request.url} while {step_name} for flight {flight_id}: {str(e)}"
logger.debug(f"[delete_flight/{pid}:{flight_id}] {notes}")
response = DeleteFlightResponse(result=DeleteFlightResult.Failed, notes=notes)
response["stacktrace"] = _make_stacktrace(e)
return response, 200
except QueryError as e:
notes = f"Unexpected response from remote server while {step_name} for flight {flight_id}: {str(e)}"
logger.debug(f"[delete_flight/{pid}:{flight_id}] {notes}")
response = DeleteFlightResponse(result=DeleteFlightResult.Failed, notes=notes)
response["queries"] = e.queries
response["stacktrace"] = e.stacktrace
return response, 200

logger.debug(f"[delete_flight/{pid}:{flight_id}] Complete.")
return DeleteFlightResponse(result=DeleteFlightResult.Closed), 200


Expand Down
22 changes: 13 additions & 9 deletions monitoring/mock_uss/scdsc/routes_scdsc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from monitoring.monitorlib import scd
from monitoring.mock_uss import webapp
from monitoring.mock_uss.auth import requires_scope
from monitoring.mock_uss.scdsc.database import db
from monitoring.mock_uss.scdsc.database import db, FlightRecord


@webapp.route("/mock/scd/uss/v1/operational_intents/<entityid>", methods=["GET"])
Expand Down Expand Up @@ -34,18 +34,22 @@ def scdsc_get_operational_intent_details(entityid: str):

# Return nominal response with details
response = scd.GetOperationalIntentDetailsResponse(
operational_intent=scd.OperationalIntent(
reference=flight.op_intent_reference,
details=scd.OperationalIntentDetails(
volumes=flight.op_intent_injection.volumes,
off_nominal_volumes=flight.op_intent_injection.off_nominal_volumes,
priority=flight.op_intent_injection.priority,
),
)
operational_intent=op_intent_from_flightrecord(flight),
)
return flask.jsonify(response), 200


def op_intent_from_flightrecord(flight: FlightRecord) -> scd.OperationalIntent:
return scd.OperationalIntent(
reference=flight.op_intent_reference,
details=scd.OperationalIntentDetails(
volumes=flight.op_intent_injection.volumes,
off_nominal_volumes=flight.op_intent_injection.off_nominal_volumes,
priority=flight.op_intent_injection.priority,
),
)


@webapp.route("/mock/scd/uss/v1/operational_intents", methods=["POST"])
@requires_scope([scd.SCOPE_SC])
def scdsc_notify_operational_intent_details_changed():
Expand Down
Loading

0 comments on commit 2c3a704

Please sign in to comment.