Skip to content

Commit

Permalink
[uss_qualifier] DSS0030 heavy_traffic_concurrent migration
Browse files Browse the repository at this point in the history
  • Loading branch information
Shastick committed Nov 27, 2023
1 parent 3295d62 commit a1a8210
Show file tree
Hide file tree
Showing 23 changed files with 1,286 additions and 79 deletions.
15 changes: 15 additions & 0 deletions monitoring/monitorlib/fetch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from enum import Enum
from urllib.parse import urlparse

import aiohttp
import flask
from loguru import logger
import requests
Expand Down Expand Up @@ -145,6 +146,20 @@ def describe_response(resp: requests.Response) -> ResponseDescription:
return ResponseDescription(**kwargs)


def describe_aiohttp_response(
status: int, headers: Dict, resp_json: Dict, duration: datetime.timedelta
) -> ResponseDescription:
kwargs = {
"code": status,
"headers": headers,
"elapsed_s": duration.total_seconds(),
"reported": StringBasedDateTime(datetime.datetime.utcnow()),
"json": resp_json,
}

return ResponseDescription(**kwargs)


def describe_flask_response(resp: flask.Response, elapsed_s: float):
headers = {k: v for k, v in resp.headers.items()}
kwargs = {
Expand Down
38 changes: 31 additions & 7 deletions monitoring/monitorlib/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import functools
from typing import Dict, List, Optional
import urllib.parse
from aiohttp import ClientSession
from aiohttp import ClientSession, ClientResponse

import jwt
import requests
Expand Down Expand Up @@ -189,19 +189,35 @@ def adjust_request_kwargs(self, url, method, kwargs):
kwargs["timeout"] = self.timeout_seconds
return kwargs

async def put(self, url, **kwargs):
async def put_with_headers(self, url, **kwargs):
url = self._prefix_url + url
if "auth" not in kwargs:
kwargs = self.adjust_request_kwargs(url, "PUT", kwargs)
async with self._client.put(url, **kwargs) as response:
return response.status, await response.json()
return (
response.status,
{k: v for k, v in response.headers.items()},
await response.json(),
)

async def get(self, url, **kwargs):
async def put(self, url, **kwargs):
(status, _, json) = await self.put_with_headers(url, **kwargs)
return status, json

async def get_with_headers(self, url, **kwargs):
url = self._prefix_url + url
if "auth" not in kwargs:
kwargs = self.adjust_request_kwargs(url, "GET", kwargs)
async with self._client.get(url, **kwargs) as response:
return response.status, await response.json()
return (
response.status,
{k: v for k, v in response.headers.items()},
await response.json(),
)

async def get(self, url, **kwargs):
(status, _, json) = await self.get_with_headers(url, **kwargs)
return status, json

async def post(self, url, **kwargs):
url = self._prefix_url + url
Expand All @@ -210,12 +226,20 @@ async def post(self, url, **kwargs):
async with self._client.post(url, **kwargs) as response:
return response.status, await response.json()

async def delete(self, url, **kwargs):
async def delete_with_headers(self, url, **kwargs):
url = self._prefix_url + url
if "auth" not in kwargs:
kwargs = self.adjust_request_kwargs(url, "DELETE", kwargs)
async with self._client.delete(url, **kwargs) as response:
return response.status, await response.json()
return (
response.status,
{k: v for k, v in response.headers.items()},
await response.json(),
)

async def delete(self, url, **kwargs):
(status, _, json) = await self.delete_with_headers(url, **kwargs)
return status, json


def default_scopes(scopes: List[str]):
Expand Down
100 changes: 72 additions & 28 deletions monitoring/monitorlib/mutate/rid.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from implicitdict import ImplicitDict
import s2sphere
from uas_standards import Operation

from monitoring.monitorlib.fetch.rid import RIDQuery, Subscription, ISA
from monitoring.monitorlib.rid import RIDVersion
Expand Down Expand Up @@ -439,22 +440,19 @@ class ISAChange(ImplicitDict):
"""Mapping from USS base URL to change notification query"""


def put_isa(
def build_isa_payload(
area_vertices: List[s2sphere.LatLng],
alt_lo: float,
alt_hi: float,
start_time: datetime.datetime,
end_time: datetime.datetime,
uss_base_url: str,
isa_id: str,
rid_version: RIDVersion,
utm_client: infrastructure.UTMClientSession,
isa_version: Optional[str] = None,
participant_id: Optional[str] = None,
) -> ISAChange:
mutation = "create" if isa_version is None else "update"
) -> Dict[str, any]:
"""Build the payload expected to PUT or UPDATE an ISA on a DSS,
in accordance with the specified rid_version."""
if rid_version == RIDVersion.f3411_19:
body = {
return {
"extents": rid_v1.make_volume_4d(
area_vertices,
alt_lo,
Expand All @@ -465,25 +463,8 @@ def put_isa(
"flights_url": uss_base_url
+ v19.api.OPERATIONS[v19.api.OperationID.SearchFlights].path,
}
if isa_version is None:
op = v19.api.OPERATIONS[v19.api.OperationID.CreateIdentificationServiceArea]
url = op.path.format(id=isa_id)
else:
op = v19.api.OPERATIONS[v19.api.OperationID.UpdateIdentificationServiceArea]
url = op.path.format(id=isa_id, version=isa_version)
dss_response = ChangedISA(
mutation=mutation,
v19_query=fetch.query_and_describe(
utm_client,
op.verb,
url,
json=body,
scope=v19.constants.Scope.Write,
participant_id=participant_id,
),
)
elif rid_version == RIDVersion.f3411_22a:
body = {
return {
"extents": rid_v2.make_volume_4d(
area_vertices,
alt_lo,
Expand All @@ -493,16 +474,79 @@ def put_isa(
),
"uss_base_url": uss_base_url,
}
else:
raise NotImplementedError(
f"Cannot build ISA payload for RID version {rid_version}"
)


def build_isa_url(
rid_version: RIDVersion, isa_id: str, isa_version: Optional[str] = None
) -> (Operation, str):
"""Build the required URL to create, get, update or delete an ISA on a DSS,
in accordance with the specified rid_version and isa_version, if it is available.
Note that for mutations and deletions, isa_version must be provided.
"""
if rid_version == RIDVersion.f3411_19:
if isa_version is None:
op = v19.api.OPERATIONS[v19.api.OperationID.CreateIdentificationServiceArea]
return (op, op.path.format(id=isa_id))
else:
op = v19.api.OPERATIONS[v19.api.OperationID.UpdateIdentificationServiceArea]
return (op, op.path.format(id=isa_id, version=isa_version))
elif rid_version == RIDVersion.f3411_22a:
if isa_version is None:
op = v22a.api.OPERATIONS[
v22a.api.OperationID.CreateIdentificationServiceArea
]
url = op.path.format(id=isa_id)
return (op, op.path.format(id=isa_id))
else:
op = v22a.api.OPERATIONS[
v22a.api.OperationID.UpdateIdentificationServiceArea
]
url = op.path.format(id=isa_id, version=isa_version)
return (op, op.path.format(id=isa_id, version=isa_version))
else:
raise NotImplementedError(f"Cannot build ISA URL for RID version {rid_version}")


def put_isa(
area_vertices: List[s2sphere.LatLng],
alt_lo: float,
alt_hi: float,
start_time: datetime.datetime,
end_time: datetime.datetime,
uss_base_url: str,
isa_id: str,
rid_version: RIDVersion,
utm_client: infrastructure.UTMClientSession,
isa_version: Optional[str] = None,
participant_id: Optional[str] = None,
) -> ISAChange:
mutation = "create" if isa_version is None else "update"
body = build_isa_payload(
area_vertices,
alt_lo,
alt_hi,
start_time,
end_time,
uss_base_url,
rid_version,
)
(op, url) = build_isa_url(rid_version, isa_id, isa_version)
if rid_version == RIDVersion.f3411_19:
dss_response = ChangedISA(
mutation=mutation,
v19_query=fetch.query_and_describe(
utm_client,
op.verb,
url,
json=body,
scope=v19.constants.Scope.Write,
participant_id=participant_id,
),
)
elif rid_version == RIDVersion.f3411_22a:
dss_response = ChangedISA(
mutation=mutation,
v22a_query=fetch.query_and_describe(
Expand Down
9 changes: 9 additions & 0 deletions monitoring/monitorlib/rid.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ def openapi_search_isas_response_path(self) -> str:
else:
raise ValueError(f"Unsupported RID version '{self}'")

@property
def openapi_get_isa_response_path(self) -> str:
if self == RIDVersion.f3411_19:
return schema_validation.F3411_19.GetIdentificationServiceAreaResponse
elif self == RIDVersion.f3411_22a:
return schema_validation.F3411_22a.GetIdentificationServiceAreaResponse
else:
raise ValueError(f"Unsupported RID version '{self}'")

@property
def openapi_put_isa_response_path(self) -> str:
if self == RIDVersion.f3411_19:
Expand Down
6 changes: 6 additions & 0 deletions monitoring/monitorlib/schema_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ class F3411_19(str, Enum):
SearchIdentificationServiceAreasResponse = (
"components.schemas.SearchIdentificationServiceAreasResponse"
)
GetIdentificationServiceAreaResponse = (
"components.schemas.GetIdentificationServiceAreaResponse"
)
PutIdentificationServiceAreaResponse = (
"components.schemas.PutIdentificationServiceAreaResponse"
)
Expand All @@ -34,6 +37,9 @@ class F3411_22a(str, Enum):
SearchIdentificationServiceAreasResponse = (
"components.schemas.SearchIdentificationServiceAreasResponse"
)
GetIdentificationServiceAreaResponse = (
"components.schemas.GetIdentificationServiceAreaResponse"
)
PutIdentificationServiceAreaResponse = (
"components.schemas.PutIdentificationServiceAreaResponse"
)
Expand Down
2 changes: 1 addition & 1 deletion monitoring/prober/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def wrapper_default_scope(*args, **kwargs):
resource_type_code_descriptions: Dict[ResourceType, str] = {}


# Next code: 373
# Next code: 374
def register_resource_type(code: int, description: str) -> ResourceType:
"""Register that the specified code refers to the described resource.
Expand Down
Loading

0 comments on commit a1a8210

Please sign in to comment.