Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[monitorlib] Make RID mutation operations version-independent #41

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions monitoring/atproxy/gunicorn.conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import os

from gunicorn.http import Request
from gunicorn.http.wsgi import Response
from gunicorn.workers.base import Worker
from loguru import logger


def pre_request(worker: Worker, req: Request):
"""gunicorn server hook called just before a worker processes the request."""
logger.debug("gunicorn pre_request from worker {} (OS PID {}): {} {}", worker.pid, os.getpid(), req.method, req.path)


def post_request(worker: Worker, req: Request, environ: dict, resp: Response):
"""gunicorn server hook called after a worker processes the request."""
logger.debug("gunicorn post_request from worker {} (OS PID {}): {} {} -> {}", worker.pid, os.getpid(), req.method, req.path, resp.status_code)


def worker_abort(worker: Worker):
"""gunicorn server hook called when a worker received the SIGABRT signal."""
logger.debug("gunicorn worker_abort from worker {} (OS PID {})", worker.pid, os.getpid())
2 changes: 2 additions & 0 deletions monitoring/atproxy/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ gunicorn \
--workers=4 \
--timeout 60 \
--bind=0.0.0.0:5000 \
--log-level debug \
--config ./gunicorn.conf.py \
monitoring.atproxy.app:webapp
33 changes: 33 additions & 0 deletions monitoring/mock_uss/gunicorn.conf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os

from gunicorn.arbiter import Arbiter
from gunicorn.http import Request
from gunicorn.http.wsgi import Response
from gunicorn.workers.base import Worker
from loguru import logger

from monitoring.mock_uss import webapp
Expand All @@ -18,6 +21,36 @@ def when_ready(server: Arbiter):
webapp.start_periodic_tasks_daemon()


def pre_request(worker: Worker, req: Request):
"""gunicorn server hook called just before a worker processes the request."""
logger.debug(
"gunicorn pre_request from worker {} (OS PID {}): {} {}",
worker.pid,
os.getpid(),
req.method,
req.path,
)


def post_request(worker: Worker, req: Request, environ: dict, resp: Response):
"""gunicorn server hook called after a worker processes the request."""
logger.debug(
"gunicorn post_request from worker {} (OS PID {}): {} {} -> {}",
worker.pid,
os.getpid(),
req.method,
req.path,
resp.status_code,
)


def worker_abort(worker: Worker):
"""gunicorn server hook called when a worker received the SIGABRT signal."""
logger.debug(
"gunicorn worker_abort from worker {} (OS PID {})", worker.pid, os.getpid()
)


def on_exit(server: Arbiter):
"""gunicorn server hook called just before exiting Gunicorn."""
logger.debug(
Expand Down
65 changes: 40 additions & 25 deletions monitoring/mock_uss/ridsp/routes_injection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

# Time after the last position report during which the created ISA will still
# exist. This value must be at least 60 seconds per NET0610.
from ...monitorlib.rid import RIDVersion

RECENT_POSITIONS_BUFFER = datetime.timedelta(seconds=60.2)


Expand All @@ -45,32 +47,42 @@ def ridsp_create_test(test_id: str) -> Tuple[str, int]:
(t0, t1) = req_body.get_span()
t1 += RECENT_POSITIONS_BUFFER
rect = req_body.get_rect()
flights_url = "{}/mock/ridsp/v1/uss/flights".format(
webapp.config.get(config.KEY_BASE_URL)
)
uss_base_url = "{}/mock/ridsp".format(webapp.config.get(config.KEY_BASE_URL))
mutated_isa = mutate.put_isa(
resources.utm_client, rect, t0, t1, flights_url, record.version
area=rect,
start_time=t0,
end_time=t1,
uss_base_url=uss_base_url,
isa_id=record.version,
rid_version=RIDVersion.f3411_19,
utm_client=resources.utm_client,
)
if not mutated_isa.dss_response.success:
logger.error("Unable to create ISA in DSS")
response = ErrorResponse(message="Unable to create ISA in DSS")
response["errors"] = mutated_isa.dss_response.errors
if not mutated_isa.dss_query.success:
errors = "\n".join(mutated_isa.dss_query.errors)
msg = f"Unable to create ISA in DSS ({mutated_isa.dss_query.status_code}): {errors}"
logger.error(msg)
response = ErrorResponse(message=msg)
response["errors"] = mutated_isa.dss_query.errors
response["dss_query"] = mutated_isa.dss_query
return flask.jsonify(response), 412
bounds = f"(lat {rect.lat_lo().degrees}, lng {rect.lng_lo().degrees})-(lat {rect.lat_hi().degrees}, lng {rect.lng_hi().degrees})"
isa = mutated_isa.dss_query.isa
logger.info(
f"Created ISA {mutated_isa.dss_response.isa.id} from {t0} to {t1} at {bounds}"
f"Created ISA {isa.id} version {isa.version} from {t0} to {t1} at {bounds}"
)
record.isa_version = mutated_isa.dss_response.isa.version
record.isa_version = mutated_isa.dss_query.isa.version
for (url, notification) in mutated_isa.notifications.items():
code = notification.response.status_code
code = notification.query.status_code
if code == 200:
logger.warning(
f"Notification to {notification.request['url']} incorrectly returned 200 rather than 204"
f"Notification to {notification.query.request.url} incorrectly returned 200 rather than 204"
)
elif code != 204:
logger.error(
f"Notification failure {code} to {notification.request['url']}: "
)
msg = f"Notification failure {code} to {notification.query.request.url}"
logger.error(msg)
response = ErrorResponse(message=msg)
response["query"] = notification.query
return flask.jsonify(response), 412

with db as tx:
tx.tests[test_id] = record
Expand All @@ -91,27 +103,30 @@ def ridsp_delete_test(test_id: str) -> Tuple[str, int]:

# Delete ISA from DSS
deleted_isa = mutate.delete_isa(
resources.utm_client, record.version, record.isa_version
isa_id=record.version,
isa_version=record.isa_version,
rid_version=RIDVersion.f3411_19,
utm_client=resources.utm_client,
)
if not deleted_isa.dss_response.success:
if not deleted_isa.dss_query.success:
logger.error(f"Unable to delete ISA {record.version} from DSS")
response = ErrorResponse(message="Unable to delete ISA from DSS")
response["errors"] = deleted_isa.dss_response.errors
response["errors"] = deleted_isa.dss_query.errors
return flask.jsonify(response), 412
logger.info(f"Created ISA {deleted_isa.dss_response.isa.id}")
logger.info(f"Created ISA {deleted_isa.dss_query.isa.id}")
result = ChangeTestResponse(version=record.version, injected_flights=record.flights)
for (url, notification) in deleted_isa.notifications.items():
code = notification.response.status_code
code = notification.query.status_code
if code == 200:
logger.warning(
f"Notification to {notification.request['url']} incorrectly returned 200 rather than 204"
f"Notification to {notification.query.request.url} incorrectly returned 200 rather than 204"
)
elif code != 204:
logger.error(
f"Notification failure {code} to {notification.request['url']}: "
f"Notification failure {code} to {notification.query.request.url}"
)
result["query"] = notification.query

with db as tx:
del tx.tests[test_id]
return flask.jsonify(
ChangeTestResponse(version=record.version, injected_flights=record.flights)
)
return flask.jsonify(result)
15 changes: 9 additions & 6 deletions monitoring/mock_uss/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
MAX_PERIODIC_LATENCY = timedelta(seconds=5)


def _get_trace(e: Exception) -> str:
return "".join(
traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)
)


class TaskTrigger(str, Enum):
Setup = "Setup"
Shutdown = "Shutdown"
Expand Down Expand Up @@ -99,11 +105,11 @@ def _run_one_time_tasks(self, trigger: TaskTrigger):
tx.task_errors.append(TaskError.from_exception(trigger, e))
if trigger == TaskTrigger.Shutdown:
logger.error(
f"{type(e).__name__} error in '{task_name}' on process ID {os.getpid()} while shutting down mock_uss: {str(e)}"
f"{type(e).__name__} error in '{task_name}' on process ID {os.getpid()} while shutting down mock_uss: {str(e)}\n{_get_trace(e)}"
)
else:
logger.error(
f"Stopping mock_uss due to {type(e).__name__} error in '{task_name}' {trigger} task on process ID {os.getpid()}: {str(e)}"
f"Stopping mock_uss due to {type(e).__name__} error in '{task_name}' {trigger} task on process ID {os.getpid()}: {str(e)}\n{_get_trace(e)}"
)
self.stop()
return
Expand Down Expand Up @@ -229,11 +235,8 @@ def _periodic_tasks_daemon_loop(self):
dt = MAX_PERIODIC_LATENCY
time.sleep(dt.total_seconds())
except Exception as e:
trace = "".join(
traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)
)
logger.error(
f"Shutting down mock_uss due to {type(e).__name__} error while executing '{task_to_execute}' periodic task: {str(e)}\n{trace}"
f"Shutting down mock_uss due to {type(e).__name__} error while executing '{task_to_execute}' periodic task: {str(e)}\n{_get_trace(e)}"
)
with db as tx:
tx.task_errors.append(TaskError.from_exception(TaskTrigger.Setup, e))
Expand Down
28 changes: 14 additions & 14 deletions monitoring/mock_uss/tracer/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ def _subscribe(
if base_url.endswith("/"):
base_url = base_url[0:-1]
if monitor_rid:
_subscribe_rid(
resources, base_url + "/tracer/f3411v19/v1/uss/identification_service_areas"
)
_subscribe_rid(resources, base_url + "/tracer/f3411v19")
if monitor_scd:
_subscribe_scd(resources, base_url)

Expand All @@ -138,16 +136,17 @@ def _rid_subscription_id() -> str:
RID_SUBSCRIPTION_KEY = "subscribe_ridsubscription"


def _subscribe_rid(resources: ResourceSet, callback_url: str) -> None:
def _subscribe_rid(resources: ResourceSet, uss_base_url: str) -> None:
_clear_existing_rid_subscription(resources, "old")

create_result = mutate.rid.put_subscription(
resources.dss_client,
resources.area,
resources.start_time,
resources.end_time,
callback_url,
_rid_subscription_id(),
create_result = mutate.rid.upsert_subscription(
area=resources.area,
start_time=resources.start_time,
end_time=resources.end_time,
uss_base_url=uss_base_url,
subscription_id=_rid_subscription_id(),
rid_version=RIDVersion.f3411_19,
utm_client=resources.dss_client,
)
resources.logger.log_new(RID_SUBSCRIPTION_KEY, create_result)
if not create_result.success:
Expand All @@ -168,9 +167,10 @@ def _clear_existing_rid_subscription(resources: ResourceSet, suffix: str) -> Non

if existing_result.subscription is not None:
del_result = mutate.rid.delete_subscription(
resources.dss_client,
_rid_subscription_id(),
existing_result.subscription.version,
subscription_id=_rid_subscription_id(),
subscription_version=existing_result.subscription.version,
rid_version=RIDVersion.f3411_19,
utm_client=resources.dss_client,
)
logfile = resources.logger.log_new(
"{}_{}_del".format(RID_SUBSCRIPTION_KEY, suffix), del_result
Expand Down
2 changes: 1 addition & 1 deletion monitoring/mock_uss/tracer/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,4 @@ def tracer_catch_all(u_path) -> Tuple[str, int]:
label = colored("Bad route", "red")
logger.error("{} to {} ({}): {}".format(label, u_path, owner, log_name))

return RESULT
return f"Path is not a supported endpoint: {u_path}", 404
Loading