Skip to content

Commit

Permalink
[monitorlib] Make RID mutation operations version-independent (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenjaminPelletier authored Feb 15, 2023
1 parent 5ca2a84 commit 2a01f84
Show file tree
Hide file tree
Showing 11 changed files with 705 additions and 370 deletions.
21 changes: 21 additions & 0 deletions monitoring/atproxy/gunicorn.conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import os

from gunicorn.http import Request
from gunicorn.http.wsgi import Response
from gunicorn.workers.base import Worker
from loguru import logger


def pre_request(worker: Worker, req: Request):
"""gunicorn server hook called just before a worker processes the request."""
logger.debug("gunicorn pre_request from worker {} (OS PID {}): {} {}", worker.pid, os.getpid(), req.method, req.path)


def post_request(worker: Worker, req: Request, environ: dict, resp: Response):
"""gunicorn server hook called after a worker processes the request."""
logger.debug("gunicorn post_request from worker {} (OS PID {}): {} {} -> {}", worker.pid, os.getpid(), req.method, req.path, resp.status_code)


def worker_abort(worker: Worker):
"""gunicorn server hook called when a worker received the SIGABRT signal."""
logger.debug("gunicorn worker_abort from worker {} (OS PID {})", worker.pid, os.getpid())
2 changes: 2 additions & 0 deletions monitoring/atproxy/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ gunicorn \
--workers=4 \
--timeout 60 \
--bind=0.0.0.0:5000 \
--log-level debug \
--config ./gunicorn.conf.py \
monitoring.atproxy.app:webapp
33 changes: 33 additions & 0 deletions monitoring/mock_uss/gunicorn.conf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os

from gunicorn.arbiter import Arbiter
from gunicorn.http import Request
from gunicorn.http.wsgi import Response
from gunicorn.workers.base import Worker
from loguru import logger

from monitoring.mock_uss import webapp
Expand All @@ -18,6 +21,36 @@ def when_ready(server: Arbiter):
webapp.start_periodic_tasks_daemon()


def pre_request(worker: Worker, req: Request):
"""gunicorn server hook called just before a worker processes the request."""
logger.debug(
"gunicorn pre_request from worker {} (OS PID {}): {} {}",
worker.pid,
os.getpid(),
req.method,
req.path,
)


def post_request(worker: Worker, req: Request, environ: dict, resp: Response):
"""gunicorn server hook called after a worker processes the request."""
logger.debug(
"gunicorn post_request from worker {} (OS PID {}): {} {} -> {}",
worker.pid,
os.getpid(),
req.method,
req.path,
resp.status_code,
)


def worker_abort(worker: Worker):
"""gunicorn server hook called when a worker received the SIGABRT signal."""
logger.debug(
"gunicorn worker_abort from worker {} (OS PID {})", worker.pid, os.getpid()
)


def on_exit(server: Arbiter):
"""gunicorn server hook called just before exiting Gunicorn."""
logger.debug(
Expand Down
65 changes: 40 additions & 25 deletions monitoring/mock_uss/ridsp/routes_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

# Time after the last position report during which the created ISA will still
# exist. This value must be at least 60 seconds per NET0610.
from ...monitorlib.rid import RIDVersion

RECENT_POSITIONS_BUFFER = datetime.timedelta(seconds=60.2)


Expand All @@ -45,32 +47,42 @@ def ridsp_create_test(test_id: str) -> Tuple[str, int]:
(t0, t1) = req_body.get_span()
t1 += RECENT_POSITIONS_BUFFER
rect = req_body.get_rect()
flights_url = "{}/mock/ridsp/v1/uss/flights".format(
webapp.config.get(config.KEY_BASE_URL)
)
uss_base_url = "{}/mock/ridsp".format(webapp.config.get(config.KEY_BASE_URL))
mutated_isa = mutate.put_isa(
resources.utm_client, rect, t0, t1, flights_url, record.version
area=rect,
start_time=t0,
end_time=t1,
uss_base_url=uss_base_url,
isa_id=record.version,
rid_version=RIDVersion.f3411_19,
utm_client=resources.utm_client,
)
if not mutated_isa.dss_response.success:
logger.error("Unable to create ISA in DSS")
response = ErrorResponse(message="Unable to create ISA in DSS")
response["errors"] = mutated_isa.dss_response.errors
if not mutated_isa.dss_query.success:
errors = "\n".join(mutated_isa.dss_query.errors)
msg = f"Unable to create ISA in DSS ({mutated_isa.dss_query.status_code}): {errors}"
logger.error(msg)
response = ErrorResponse(message=msg)
response["errors"] = mutated_isa.dss_query.errors
response["dss_query"] = mutated_isa.dss_query
return flask.jsonify(response), 412
bounds = f"(lat {rect.lat_lo().degrees}, lng {rect.lng_lo().degrees})-(lat {rect.lat_hi().degrees}, lng {rect.lng_hi().degrees})"
isa = mutated_isa.dss_query.isa
logger.info(
f"Created ISA {mutated_isa.dss_response.isa.id} from {t0} to {t1} at {bounds}"
f"Created ISA {isa.id} version {isa.version} from {t0} to {t1} at {bounds}"
)
record.isa_version = mutated_isa.dss_response.isa.version
record.isa_version = mutated_isa.dss_query.isa.version
for (url, notification) in mutated_isa.notifications.items():
code = notification.response.status_code
code = notification.query.status_code
if code == 200:
logger.warning(
f"Notification to {notification.request['url']} incorrectly returned 200 rather than 204"
f"Notification to {notification.query.request.url} incorrectly returned 200 rather than 204"
)
elif code != 204:
logger.error(
f"Notification failure {code} to {notification.request['url']}: "
)
msg = f"Notification failure {code} to {notification.query.request.url}"
logger.error(msg)
response = ErrorResponse(message=msg)
response["query"] = notification.query
return flask.jsonify(response), 412

with db as tx:
tx.tests[test_id] = record
Expand All @@ -91,27 +103,30 @@ def ridsp_delete_test(test_id: str) -> Tuple[str, int]:

# Delete ISA from DSS
deleted_isa = mutate.delete_isa(
resources.utm_client, record.version, record.isa_version
isa_id=record.version,
isa_version=record.isa_version,
rid_version=RIDVersion.f3411_19,
utm_client=resources.utm_client,
)
if not deleted_isa.dss_response.success:
if not deleted_isa.dss_query.success:
logger.error(f"Unable to delete ISA {record.version} from DSS")
response = ErrorResponse(message="Unable to delete ISA from DSS")
response["errors"] = deleted_isa.dss_response.errors
response["errors"] = deleted_isa.dss_query.errors
return flask.jsonify(response), 412
logger.info(f"Created ISA {deleted_isa.dss_response.isa.id}")
logger.info(f"Created ISA {deleted_isa.dss_query.isa.id}")
result = ChangeTestResponse(version=record.version, injected_flights=record.flights)
for (url, notification) in deleted_isa.notifications.items():
code = notification.response.status_code
code = notification.query.status_code
if code == 200:
logger.warning(
f"Notification to {notification.request['url']} incorrectly returned 200 rather than 204"
f"Notification to {notification.query.request.url} incorrectly returned 200 rather than 204"
)
elif code != 204:
logger.error(
f"Notification failure {code} to {notification.request['url']}: "
f"Notification failure {code} to {notification.query.request.url}"
)
result["query"] = notification.query

with db as tx:
del tx.tests[test_id]
return flask.jsonify(
ChangeTestResponse(version=record.version, injected_flights=record.flights)
)
return flask.jsonify(result)
15 changes: 9 additions & 6 deletions monitoring/mock_uss/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
MAX_PERIODIC_LATENCY = timedelta(seconds=5)


def _get_trace(e: Exception) -> str:
return "".join(
traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)
)


class TaskTrigger(str, Enum):
Setup = "Setup"
Shutdown = "Shutdown"
Expand Down Expand Up @@ -99,11 +105,11 @@ def _run_one_time_tasks(self, trigger: TaskTrigger):
tx.task_errors.append(TaskError.from_exception(trigger, e))
if trigger == TaskTrigger.Shutdown:
logger.error(
f"{type(e).__name__} error in '{task_name}' on process ID {os.getpid()} while shutting down mock_uss: {str(e)}"
f"{type(e).__name__} error in '{task_name}' on process ID {os.getpid()} while shutting down mock_uss: {str(e)}\n{_get_trace(e)}"
)
else:
logger.error(
f"Stopping mock_uss due to {type(e).__name__} error in '{task_name}' {trigger} task on process ID {os.getpid()}: {str(e)}"
f"Stopping mock_uss due to {type(e).__name__} error in '{task_name}' {trigger} task on process ID {os.getpid()}: {str(e)}\n{_get_trace(e)}"
)
self.stop()
return
Expand Down Expand Up @@ -229,11 +235,8 @@ def _periodic_tasks_daemon_loop(self):
dt = MAX_PERIODIC_LATENCY
time.sleep(dt.total_seconds())
except Exception as e:
trace = "".join(
traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)
)
logger.error(
f"Shutting down mock_uss due to {type(e).__name__} error while executing '{task_to_execute}' periodic task: {str(e)}\n{trace}"
f"Shutting down mock_uss due to {type(e).__name__} error while executing '{task_to_execute}' periodic task: {str(e)}\n{_get_trace(e)}"
)
with db as tx:
tx.task_errors.append(TaskError.from_exception(TaskTrigger.Setup, e))
Expand Down
28 changes: 14 additions & 14 deletions monitoring/mock_uss/tracer/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ def _subscribe(
if base_url.endswith("/"):
base_url = base_url[0:-1]
if monitor_rid:
_subscribe_rid(
resources, base_url + "/tracer/f3411v19/v1/uss/identification_service_areas"
)
_subscribe_rid(resources, base_url + "/tracer/f3411v19")
if monitor_scd:
_subscribe_scd(resources, base_url)

Expand All @@ -138,16 +136,17 @@ def _rid_subscription_id() -> str:
RID_SUBSCRIPTION_KEY = "subscribe_ridsubscription"


def _subscribe_rid(resources: ResourceSet, callback_url: str) -> None:
def _subscribe_rid(resources: ResourceSet, uss_base_url: str) -> None:
_clear_existing_rid_subscription(resources, "old")

create_result = mutate.rid.put_subscription(
resources.dss_client,
resources.area,
resources.start_time,
resources.end_time,
callback_url,
_rid_subscription_id(),
create_result = mutate.rid.upsert_subscription(
area=resources.area,
start_time=resources.start_time,
end_time=resources.end_time,
uss_base_url=uss_base_url,
subscription_id=_rid_subscription_id(),
rid_version=RIDVersion.f3411_19,
utm_client=resources.dss_client,
)
resources.logger.log_new(RID_SUBSCRIPTION_KEY, create_result)
if not create_result.success:
Expand All @@ -168,9 +167,10 @@ def _clear_existing_rid_subscription(resources: ResourceSet, suffix: str) -> Non

if existing_result.subscription is not None:
del_result = mutate.rid.delete_subscription(
resources.dss_client,
_rid_subscription_id(),
existing_result.subscription.version,
subscription_id=_rid_subscription_id(),
subscription_version=existing_result.subscription.version,
rid_version=RIDVersion.f3411_19,
utm_client=resources.dss_client,
)
logfile = resources.logger.log_new(
"{}_{}_del".format(RID_SUBSCRIPTION_KEY, suffix), del_result
Expand Down
2 changes: 1 addition & 1 deletion monitoring/mock_uss/tracer/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,4 @@ def tracer_catch_all(u_path) -> Tuple[str, int]:
label = colored("Bad route", "red")
logger.error("{} to {} ({}): {}".format(label, u_path, owner, log_name))

return RESULT
return f"Path is not a supported endpoint: {u_path}", 404
Loading

0 comments on commit 2a01f84

Please sign in to comment.