From ce82dbc3335cc7728957d2877e643fa7811ca7b4 Mon Sep 17 00:00:00 2001 From: Punam Date: Wed, 18 Dec 2024 16:19:49 -0800 Subject: [PATCH] Position Reporter - initial implementation --- monitoring/position_reporter/README.md | 3 + monitoring/position_reporter/__init__.py | 82 ++++++++ .../api/pos_report_plan.yaml | 195 ++++++++++++++++++ .../position_reporter/docker-compose.yaml | 30 +++ monitoring/position_reporter/gunicorn.conf.py | 70 +++++++ .../position_reports/__init__.py | 0 .../position_reports/position_report.py | 26 +++ .../position_reports/sender.py | 75 +++++++ .../position_reporter/positionreporter.py | 12 ++ monitoring/position_reporter/routes.py | 48 +++++ monitoring/position_reporter/run_locally.sh | 40 ++++ monitoring/position_reporter/server.py | 27 +++ monitoring/position_reporter/start.sh | 29 +++ 13 files changed, 637 insertions(+) create mode 100644 monitoring/position_reporter/README.md create mode 100644 monitoring/position_reporter/__init__.py create mode 100644 monitoring/position_reporter/api/pos_report_plan.yaml create mode 100644 monitoring/position_reporter/docker-compose.yaml create mode 100644 monitoring/position_reporter/gunicorn.conf.py create mode 100644 monitoring/position_reporter/position_reports/__init__.py create mode 100644 monitoring/position_reporter/position_reports/position_report.py create mode 100644 monitoring/position_reporter/position_reports/sender.py create mode 100644 monitoring/position_reporter/positionreporter.py create mode 100644 monitoring/position_reporter/routes.py create mode 100755 monitoring/position_reporter/run_locally.sh create mode 100644 monitoring/position_reporter/server.py create mode 100755 monitoring/position_reporter/start.sh diff --git a/monitoring/position_reporter/README.md b/monitoring/position_reporter/README.md new file mode 100644 index 0000000000..0f08295763 --- /dev/null +++ b/monitoring/position_reporter/README.md @@ -0,0 +1,3 @@ +Position Reporter + +Reports positions as per the position report plan sent to it. diff --git a/monitoring/position_reporter/__init__.py b/monitoring/position_reporter/__init__.py new file mode 100644 index 0000000000..4c38b24257 --- /dev/null +++ b/monitoring/position_reporter/__init__.py @@ -0,0 +1,82 @@ +import inspect +import os +from typing import Optional, Callable, Any +import logging +from monitoring.position_reporter.server import PositionReporter +from monitoring.monitorlib import auth + +webapp = PositionReporter(__name__) + + +def import_environment_variable( + var_name: str, + required: bool = True, + default: Optional[str] = None, + mutator: Optional[Callable[[str], Any]] = None, +) -> None: + """Import a value from a named environment variable into the webapp configuration. + + Args: + var_name: Environment variable name (key). Also used as the webapp configuration key for that variable. + required: Whether the variable must be specified by the user. If True, a ValueError will be raised if the + variable is not specified by the user. If False, the webapp configuration will not be populated if no + default is provided. If default is specified, the default value is treated as specification by the user. + default: If the variable is not required, then use this value when it is not specified by the user. The default + value should be the string from the environment variable rather than the output of the mutator, if present. + mutator: If specified, apply this function to the string value of the environment variable to obtain the + variable to actually store in the configuration. + """ + if var_name in os.environ: + str_value = os.environ[var_name] + elif default is not None: + str_value = default + elif required: + stack = inspect.stack() + raise ValueError( + f"System cannot proceed because required environment variable '{var_name}' was not found. Required from {stack[1].filename}:{stack[1].lineno}" + ) + else: + str_value = None + + if str_value is not None: + webapp.config[var_name] = str_value if mutator is None else mutator(str_value) + + +def require_config_value(config_key: str) -> None: + if config_key not in webapp.config: + stack = inspect.stack() + raise ValueError( + f"System cannot proceed because required configuration key '{config_key}' was not found. Required from {stack[1].filename}:{stack[1].lineno}" + ) + + +fmt = "%(asctime)s, %(filename)s:%(lineno)-4s | %(message)s" +datefmt = "%Y-%m-%d:%H:%M:%S" +# Configure the root logger +logging.basicConfig( + format=fmt, + datefmt=datefmt, + level=logging.INFO +) + +AUTH_SPEC = "POS_REP_AUTH_SPEC" +import_environment_variable(AUTH_SPEC) +require_config_value(AUTH_SPEC) + +msg = ( + "################################################################################\n" + + "################################ Configuration ################################\n" + + "\n".join("## {}: {}".format(key, webapp.config[key]) for key in webapp.config) + + "\n" + + "################################################################################" +) +logging.info("Configuration:\n" + msg) + +auth_client = auth.make_auth_adapter(webapp.config[AUTH_SPEC]) + + +def get_auth_client(): + return auth_client + + +from monitoring.position_reporter import routes diff --git a/monitoring/position_reporter/api/pos_report_plan.yaml b/monitoring/position_reporter/api/pos_report_plan.yaml new file mode 100644 index 0000000000..78afb84066 --- /dev/null +++ b/monitoring/position_reporter/api/pos_report_plan.yaml @@ -0,0 +1,195 @@ +openapi: 3.0.2 +info: + title: Position Report Plan Interface + version: 0.4.4 + description: >- + This interface is between uss_qualifier and position_reporter. + + Uss_qualifier posts position report plan to position reporter. Position reporter then send the position reports + to the specified url of a participant USS. + +components: + securitySchemes: + Authority: + type: oauth2 + flows: + clientCredentials: + tokenUrl: https://auth.example.com/oauth/token + scopes: + test.flight_data.direct_automated_test: |- + Test framework may determine the test-readiness of the Position Reporter. + test.flight_data.position_report_plan: |- + Test framework send the position report plan to the Position Reporter. + description: |- + Authorization from, or on behalf of, an authorization authority, augmenting standard strategic conflict detection or other similar authorization for the purpose of automated testing. + + This authority will issue access tokens that are JSON Web Tokens as defined in RFC 7519, using the `RS256` algorithm for the signature, and publish to all providers the public key for verifying that signature. + + The following fields must be included in the JWT claim for access tokens issued by this authority: + + * `exp`, with a time no further than 1 hour in the future. + + * `sub`, with unique ID of the client requesting the access token. + + * `scope`, with a string composed of a space-separated list of strings indicating the scopes granted, per RFC 6749. + + * `aud`, with the fully qualified domain name of the URL the access token will be used to access. For example, if a USS were querying the endpoint at https://uss.example.com:8888/flight_planning/v1/flight_plans/db41f454-b255-470e-98d6-4c5096a295a1, the access token included in the request should specify `"aud": "uss.example.com"`. + + Clients must provide these access tokens in an `Authorization` header in the form `Bearer ` in accordance with RFC 6750. + + schemas: + FlightPlanID: + description: >- + String identifying a user flight plan. Format matches a version-4 UUID according to RFC 4122. + maxLength: 36 + minLength: 36 + type: string + format: uuid + pattern: >- + ^[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-4[0-9a-fA-F]{3}\-[8-b][0-9a-fA-F]{3}\-[0-9a-fA-F]{12}$ + example: 03e5572a-f733-49af-bc14-8a18bd53ee39 + + StatusResponse: + type: object + required: + - status + properties: + status: + description: >- + The status of this automated testing interface. + + - `Starting`: the interface is starting and the automated test driver should wait before sending requests. + + - `Ready`: the interface is ready to receive test requests. + type: string + enum: [Starting, Ready] + example: Ready + api_name: + description: |- + Indication of the API implemented at this URL. Must be "Position Report Plan Interface". + type: string + example: Position Report Plan Interface + api_version: + description: |- + Indication of the API version implemented at this URL. Must be "v0.4.4" when implementing this version of the API. + type: string + example: v0.4.4 + + PositionReportPlan: + description: >- + Position report plan . + required: + - id + - pos_url + - positions + type: object + properties: + latitude: + type: number + format: float + maximum: 90 + exclusiveMaximum: false + minimum: -90 + exclusiveMinimum: false + longitude: + type: number + format: float + minimum: -180 + exclusiveMaximum: false + maximum: 180 + exclusiveMinimum: false + altitude: + type: number + format: float + minimum: -8000 + exclusiveMinimum: false + maximum: 100000 + exclusiveMaximum: false + offset_ms: + description: >- + Time offset, in milliseconds, from the previous position + type: integer + speed: + type: number + format: float + track: + type: number + format: float + + PostPositionReportPlanRequest: + description: >- + Post the flight position plan to the client + type: object + required: + - id + - pos_url + - positions + properties: + id: + description: participant id of the USS to which the position reports are to be sent. + type: string + pos_url: + description: >- + The url to which position reports have to be sent + type: string + positions: + type: array + items: + $ref: '#/components/schemas/PositionReportPlan' + +paths: + /status: + get: + security: + - Authority: + - interuss.flight_planning.direct_automated_test + responses: + '200': + content: + application/json: + schema: + $ref: '#/components/schemas/StatusResponse' + description: >- + This automated testing interface is available and its status was retrieved successfully. + '401': + description: Bearer access token was not provided in Authorization header, token could not be decoded, or token was invalid. + '403': + description: The access token was decoded successfully but did not include a scope appropriate to this endpoint. + '404': + description: This automated testing interface is not available. + summary: Status of this automated testing interface + description: Get the status of this automated testing interface. + operationId: GetStatus + + /position_report_plan/{flight_plan_id}: + parameters: + - name: flight_plan_id + in: path + required: true + description: A UUID-formatted string identifying the user's flight plan intent. + schema: + $ref: '#/components/schemas/FlightPlanID' + post: + security: + - Authority: + - interuss.flight_data.position_report_plan + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/PostPositionReportPlanRequest' + required: true + responses: + '200': + description: Requested data was processed successfully + '401': + description: Bearer access token was not provided in Authorization header, token could not be decoded, or token was invalid. + '403': + description: The access token was decoded successfully but did not include a scope appropriate to this endpoint. + '409': + description: The request contains a duplicate request_id and the response for that request is not available, or another conflict condition occurred. + + summary: Post flight's position reports plan + operationId: PostPositionReportPlan + description: >- + This endpoint simulates position reports being submitted for flight. diff --git a/monitoring/position_reporter/docker-compose.yaml b/monitoring/position_reporter/docker-compose.yaml new file mode 100644 index 0000000000..aa028a4af0 --- /dev/null +++ b/monitoring/position_reporter/docker-compose.yaml @@ -0,0 +1,30 @@ +version: '3.8' + +services: + + position_reporter: + container_name: position_reporter_trial1 + hostname: scdsc.position_reporter_trial1 + image: interuss/monitoring + command: position_reporter/start.sh + environment: + - POS_REP_AUTH_SPEC=DummyOAuth(http://oauth.authority.localutm:8085/token,uss1) + - MOCK_USS_BASE_URL=http://scdsc.position_reporter_trial1 + # TODO: remove interaction_logging once dedicated mock_uss is involved in tests + - POS_REP_PORT=80 + - MOCK_USS_PROXY_VALUES=x_for=1,x_proto=1,x_host=1,x_prefix=1,x_port=1 + expose: + - 80 + ports: + - 8074:80 + user: "${UID_GID}" + restart: always + networks: + - interop_ecosystem_network + extra_hosts: + - host.docker.internal:host-gateway + + +networks: + interop_ecosystem_network: + external: true diff --git a/monitoring/position_reporter/gunicorn.conf.py b/monitoring/position_reporter/gunicorn.conf.py new file mode 100644 index 0000000000..92552c9fb2 --- /dev/null +++ b/monitoring/position_reporter/gunicorn.conf.py @@ -0,0 +1,70 @@ +import os + +from gunicorn.arbiter import Arbiter +from gunicorn.http import Request +from gunicorn.http.wsgi import Response +from gunicorn.workers.base import Worker +import logging + +from monitoring.position_reporter import webapp + + +def on_starting(server: Arbiter): + """gunicorn server hook called just before master process is initialized.""" + logging.debug("on_starting") + webapp.setup() + + +def when_ready(server: Arbiter): + """gunicorn server hook called just after the server is started.""" + logging.debug("when_ready") + webapp.start_periodic_tasks_daemon() + + +def _skip_logging(req: Request) -> bool: + # Status endpoint is polled constantly for liveness; to avoid filling logs, we don't log it + if req.path == "/status" and req.method == "GET": + return True + return False + + +def pre_request(worker: Worker, req: Request): + """gunicorn server hook called just before a worker processes the request.""" + if not _skip_logging(req): + logging.debug( + "gunicorn pre_request from worker {} (OS PID {}): {} {}".format( + worker.pid, + os.getpid(), + req.method, + req.path, + ) + ) + + +def post_request(worker: Worker, req: Request, environ: dict, resp: Response): + """gunicorn server hook called after a worker processes the request.""" + if not _skip_logging(req): + logging.debug( + "gunicorn post_request from worker {} (OS PID {}): {} {} -> {}".format( + worker.pid, + os.getpid(), + req.method, + req.path, + resp.status_code, + ) + ) + + +def worker_abort(worker: Worker): + """gunicorn server hook called when a worker received the SIGABRT signal.""" + logging.debug( + "gunicorn worker_abort from worker {} (OS PID {})".format(worker.pid, os.getpid()) + ) + + +def on_exit(server: Arbiter): + """gunicorn server hook called just before exiting Gunicorn.""" + logging.debug( + f"on_exit from process {os.getpid()} with arbiter process {server.pid}" + ) + webapp.shutdown(None, None) diff --git a/monitoring/position_reporter/position_reports/__init__.py b/monitoring/position_reporter/position_reports/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/monitoring/position_reporter/position_reports/position_report.py b/monitoring/position_reporter/position_reports/position_report.py new file mode 100644 index 0000000000..12397125a6 --- /dev/null +++ b/monitoring/position_reporter/position_reports/position_report.py @@ -0,0 +1,26 @@ +from implicitdict import ImplicitDict, StringBasedDateTime +from typing import List, Optional + + +class PositionReport(ImplicitDict): + longitude: float + latitude: float + altitude: float + time_measured: StringBasedDateTime + speed: float + track: float + + +class PositionReportPlan(ImplicitDict): + longitude: float + latitude: float + altitude: float + offset_ms: int + speed: float + track: float + + +class PositionReportsPlan(ImplicitDict): + id: Optional[str] + pos_url: str + positions: List[PositionReportPlan] diff --git a/monitoring/position_reporter/position_reports/sender.py b/monitoring/position_reporter/position_reports/sender.py new file mode 100644 index 0000000000..59dc1a1ed0 --- /dev/null +++ b/monitoring/position_reporter/position_reports/sender.py @@ -0,0 +1,75 @@ +import asyncio +import aiohttp +import logging +import datetime +import time +import json + +from implicitdict import StringBasedDateTime +from monitoring.position_reporter.position_reports.position_report import PositionReport, PositionReportPlan, \ + PositionReportsPlan +from monitoring.monitorlib.infrastructure import AuthAdapter + + +def get_position_report_from_plan(pr: PositionReportPlan, time_measured: datetime.datetime) -> PositionReport: + return PositionReport( + latitude=pr.latitude, + longitude=pr.longitude, + altitude=pr.latitude, + time_measured=StringBasedDateTime(time_measured), + speed=pr.speed, + track=pr.track + ) + + +async def post_position(pos_base_url: str, pr: PositionReport, client: aiohttp.ClientSession): + logging.info(f"Posted position to {pos_base_url} : \n{json.dumps(pr, indent=4)}") + async with client.post(url=pos_base_url, data=json.dumps(pr)) as response: + content = await response.text() + logging.info(f"Response status received: {response.status}") + logging.debug(f"Response received: {content}") + + +async def send_positions_periodically(pos_base_url: str, prs: PositionReportsPlan, client: aiohttp.ClientSession, + time_start: datetime.datetime): + tasks = [] + prev_time = time_start + i = 0 + for pr in prs: + i += 1 + next_time = prev_time + datetime.timedelta(seconds=pr.offset_ms / 1000) + prev_time = next_time + position_report = get_position_report_from_plan(pr, next_time) + logging.info(f"\n Position {i} - {position_report}\n") + await asyncio.sleep(pr.offset_ms / 1000) + task = asyncio.create_task(post_position(pos_base_url, position_report, client)) + tasks.append(task) + + results = await asyncio.gather(*tasks) + + +async def send_position_reports_async(req_id: str, pos_base_url: str, auth_client: AuthAdapter, + prs: PositionReportsPlan, + time_start: datetime.datetime): + auth_token = auth_client.get_headers(url=pos_base_url, scopes=["interuss.flight_data.position"]) + async with aiohttp.ClientSession(headers=auth_token) as client: + task2 = asyncio.create_task(send_positions_periodically(pos_base_url, prs, client, time_start)) + await task2 + logging.info("All position reports sent") + + await client.close() + logging.debug("Closed session") + + logging.info(f"==Req id {req_id} : Sent all position reports.") + + +def send_position_reports(req_id: str, pos_url: str, auth_client: AuthAdapter, prs: PositionReportsPlan, + time_start: datetime.datetime): + logging.info(f"Start req id {req_id} at {time_start}") + wait_to_start = time_start - datetime.datetime.now() + if wait_to_start.total_seconds() >= 0: + time.sleep(wait_to_start.total_seconds()) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(send_position_reports_async(req_id, pos_url, auth_client, prs, time_start)) diff --git a/monitoring/position_reporter/positionreporter.py b/monitoring/position_reporter/positionreporter.py new file mode 100644 index 0000000000..531aa6ff6a --- /dev/null +++ b/monitoring/position_reporter/positionreporter.py @@ -0,0 +1,12 @@ +import sys +from monitoring.position_reporter import webapp + + +def main(argv): + del argv + webapp.setup() + webapp.run(host="localhost", port=5000) + + +if __name__ == "__main__": + main(sys.argv) diff --git a/monitoring/position_reporter/routes.py b/monitoring/position_reporter/routes.py new file mode 100644 index 0000000000..51c4553546 --- /dev/null +++ b/monitoring/position_reporter/routes.py @@ -0,0 +1,48 @@ +import random + +import logging +import json +import threading +import datetime +import flask +from implicitdict import ImplicitDict + +from monitoring.position_reporter import get_auth_client +from monitoring.position_reporter.position_reports.position_report import PositionReportsPlan +from monitoring.position_reporter.position_reports.sender import send_position_reports +from monitoring.position_reporter import webapp + + +@webapp.route('/status', methods=["GET"]) +def status(): + # req = flask.request.json + res = { + "status": "Ready", + "api_name": "Position Report Plan Interface", + "api_version": "v0.1.0" + } + return flask.jsonify(res), 200 + + +@webapp.route('/send_pos', methods=['POST']) +def send_pos(): + # Start a new thread to send the periodic POST requests + req_plan = flask.request.json + + req_id = random.randint(1, 100) + + positions_plan = ImplicitDict.parse(req_plan, PositionReportsPlan) + + logging.info(f"******* Incoming req thread - {threading.current_thread().name}") + + time_start = datetime.datetime.now() + + logging.info(f"Time start for positions - {time_start}") + + threading.Thread(target=send_position_reports, + kwargs={"req_id": req_id, "pos_url": positions_plan.pos_url, "auth_client": get_auth_client(), + "prs": positions_plan.positions, "time_start": time_start}).start() + + logging.info(f"******* End of req id {req_id} main thread - {threading.current_thread().name}") + + return f"Completed POST requests to {positions_plan.pos_url} at {datetime.datetime.now()}" diff --git a/monitoring/position_reporter/run_locally.sh b/monitoring/position_reporter/run_locally.sh new file mode 100755 index 0000000000..75ccdba6f4 --- /dev/null +++ b/monitoring/position_reporter/run_locally.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +set -eo pipefail + +# Find and change to repo root directory +OS=$(uname) +if [[ "$OS" == "Darwin" ]]; then + # OSX uses BSD readlink + BASEDIR="$(dirname "$0")" +else + BASEDIR=$(readlink -e "$(dirname "$0")") +fi +cd "${BASEDIR}/../.." || exit 1 + +( +cd monitoring || exit 1 +make image +) + +AUTH_SPEC="DummyOAuth(http://host.docker.internal:8085/token,uss1)" +container_name="position_reporter" + +PORT=8074 +#BASE_URL="http://${MOCK_USS_TOKEN_AUDIENCE:-host.docker.internal}:${PORT}" + +if [ "$CI" == "true" ]; then + docker_args="--add-host host.docker.internal:host-gateway" # Required to reach other containers in Ubuntu (used for Github Actions) +else + docker_args="-it" +fi + +docker container rm -f ${container_name} || echo "No pre-existing ${container_name} container to remove" + +# shellcheck disable=SC2086 +docker run ${docker_args} --name ${container_name} \ + -e POS_REP_AUTH_SPEC="${AUTH_SPEC}" \ + -p ${PORT}:5000 \ + "$@" \ + interuss/monitoring \ + position_reporter/start.sh diff --git a/monitoring/position_reporter/server.py b/monitoring/position_reporter/server.py new file mode 100644 index 0000000000..b50044172f --- /dev/null +++ b/monitoring/position_reporter/server.py @@ -0,0 +1,27 @@ +import flask +import os +import logging +from typing import Optional + + +class PositionReporter(flask.Flask): + + def __init__(self, *args, **kwargs): + self._pid = os.getpid() + super(PositionReporter, self).__init__(*args, **kwargs) + + def setup(self): + logging.info(f"Position Reporter Setup from process {self._pid}") + + def start_periodic_tasks_daemon(self): + logging.info(f"Initiating periodic task daemon from process {self._pid}") + + def shutdown(self, signal_number: Optional[int], stack): + if os.getpid() != self._pid: + logging.debug(f"Process {os.getpid()} skipping shutdown procedure") + return + + logging.debug( + f"Process {os.getpid()} stopped with signal {signal_number} while MockUSS server was not stopping" + ) + diff --git a/monitoring/position_reporter/start.sh b/monitoring/position_reporter/start.sh new file mode 100755 index 0000000000..ab2bf5cfa2 --- /dev/null +++ b/monitoring/position_reporter/start.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash + +# This script is intended to be called from within a Docker container running +# mock_uss via the interuss/monitoring image. In that context, this script is +# the entrypoint into the mock_uss server. + +# Ensure mock_uss is the working directory +OS=$(uname) +if [[ $OS == "Darwin" ]]; then + # OSX uses BSD readlink + BASEDIR="$(dirname "$0")" +else + BASEDIR=$(readlink -e "$(dirname "$0")") +fi +cd "${BASEDIR}" || exit 1 + +# Use mock_uss's health check +cp health_check.sh /app + +# Start mock_uss server +port=${POS_PEP_PORT:-5000} +export PYTHONUNBUFFERED=TRUE +gunicorn \ + --preload \ + --config ./gunicorn.conf.py \ + --workers=4 \ + --worker-tmp-dir="/dev/shm" \ + "--bind=0.0.0.0:${port}" \ + monitoring.position_reporter:webapp