Skip to content

Commit

Permalink
Adjust timeouts and improve logging (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenjaminPelletier authored Feb 24, 2023
1 parent 422b92b commit e9a69df
Show file tree
Hide file tree
Showing 17 changed files with 90 additions and 48 deletions.
3 changes: 3 additions & 0 deletions monitoring/atproxy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
ENV_KEY_PUBLIC_KEY = '{}_PUBLIC_KEY'.format(ENV_KEY_PREFIX)
ENV_KEY_TOKEN_AUDIENCE = '{}_TOKEN_AUDIENCE'.format(ENV_KEY_PREFIX)
ENV_KEY_CLIENT_BASIC_AUTH = '{}_CLIENT_BASIC_AUTH'.format(ENV_KEY_PREFIX)
ENV_KEY_QUERY_TIMEOUT = '{}_QUERY_TIMEOUT'.format(ENV_KEY_PREFIX)

# These keys map to entries in the Config class
KEY_TOKEN_PUBLIC_KEY = 'TOKEN_PUBLIC_KEY'
KEY_TOKEN_AUDIENCE = 'TOKEN_AUDIENCE'
KEY_CLIENT_BASIC_AUTH = 'CLIENT_BASIC_AUTH'
KEY_QUERY_TIMEOUT = 'QUERY_TIMEOUT'

KEY_CODE_VERSION = 'MONITORING_VERSION'

Expand All @@ -25,6 +27,7 @@ class Config(object):
os.environ.get(ENV_KEY_PUBLIC_KEY, '')).encode('utf-8')
TOKEN_AUDIENCE = os.environ.get(ENV_KEY_TOKEN_AUDIENCE, '')
CLIENT_BASIC_AUTH = os.environ[ENV_KEY_CLIENT_BASIC_AUTH]
QUERY_TIMEOUT = float(os.environ.get(ENV_KEY_QUERY_TIMEOUT, "59"))
CODE_VERSION = os.environ.get(KEY_CODE_VERSION, 'Unknown')


Expand Down
4 changes: 2 additions & 2 deletions monitoring/atproxy/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ class PutQueryRequest(ImplicitDict):
"""HTTP return code."""


def fulfill_query(req: ImplicitDict) -> Tuple[str, int]:
def fulfill_query(req: ImplicitDict, timeout: timedelta) -> Tuple[str, int]:
"""Fulfill an incoming automated testing query.
:param req: Request descriptor from requests.py.
:param timeout: Maximum amount of time client has to fulfill query.
:return: Flask endpoint handler result (content, HTTP code).
"""
t_start = datetime.utcnow()
query = Query(type=req.request_type_name(), request=req)
timeout = timedelta(seconds=59)
id = str(uuid.uuid4())
logger.debug('Attempting to fulfill {} query {} from worker {}', query.type, id, os.getpid())

Expand Down
8 changes: 6 additions & 2 deletions monitoring/atproxy/routes_rid_injection.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from datetime import timedelta
from typing import Tuple

import flask

from . import handling
from .app import webapp
from .config import KEY_QUERY_TIMEOUT
from .oauth import requires_scope
from .requests import RIDInjectionCreateTestRequest, RIDInjectionDeleteTestRequest
from monitoring.monitorlib.rid_automated_testing import injection_api
from implicitdict import ImplicitDict

timeout = timedelta(seconds=webapp.config[KEY_QUERY_TIMEOUT])


@webapp.route('/ridsp/injection/tests/<test_id>', methods=['PUT'])
@requires_scope([injection_api.SCOPE_RID_QUALIFIER_INJECT])
Expand All @@ -23,11 +27,11 @@ def rid_injection_create_test(test_id: str) -> Tuple[str, int]:
msg = 'Create test {} unable to parse JSON: {}'.format(test_id, e)
return msg, 400

return handling.fulfill_query(RIDInjectionCreateTestRequest(test_id=test_id, request_body=req_body))
return handling.fulfill_query(RIDInjectionCreateTestRequest(test_id=test_id, request_body=req_body), timeout)


@webapp.route('/ridsp/injection/tests/<test_id>/<version>', methods=['DELETE'])
@requires_scope([injection_api.SCOPE_RID_QUALIFIER_INJECT])
def rid_injection_delete_test(test_id: str, version: str) -> Tuple[str, int]:
"""Implements test deletion in RID automated testing injection API."""
return handling.fulfill_query(RIDInjectionDeleteTestRequest(test_id=test_id, version=version))
return handling.fulfill_query(RIDInjectionDeleteTestRequest(test_id=test_id, version=version), timeout)
8 changes: 6 additions & 2 deletions monitoring/atproxy/routes_rid_observation.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
from datetime import timedelta
from typing import Tuple

import flask
from uas_standards.astm.f3411.v19.constants import Scope

from . import handling
from .app import webapp
from .config import KEY_QUERY_TIMEOUT
from .oauth import requires_scope
from .requests import RIDObservationGetDisplayDataRequest, RIDObservationGetDetailsRequest

timeout = timedelta(seconds=webapp.config[KEY_QUERY_TIMEOUT])


@webapp.route('/riddp/observation/display_data', methods=['GET'])
@requires_scope([Scope.Read])
def rid_observation_display_data() -> Tuple[str, int]:
"""Implements retrieval of current display data per automated testing API."""
return handling.fulfill_query(RIDObservationGetDisplayDataRequest(view=flask.request.args['view']))
return handling.fulfill_query(RIDObservationGetDisplayDataRequest(view=flask.request.args['view']), timeout)


@webapp.route('/riddp/observation/display_data/<flight_id>', methods=['GET'])
@requires_scope([Scope.Read])
def rid_observation_flight_details(flight_id: str) -> Tuple[str, int]:
"""Implements get flight details endpoint per automated testing API."""
return handling.fulfill_query(RIDObservationGetDetailsRequest(id=flight_id))
return handling.fulfill_query(RIDObservationGetDetailsRequest(id=flight_id), timeout)
15 changes: 10 additions & 5 deletions monitoring/atproxy/routes_scd.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
from typing import Tuple

import flask
Expand All @@ -7,25 +8,29 @@

from . import handling
from .app import webapp
from .config import KEY_QUERY_TIMEOUT
from .oauth import requires_scope
from .requests import SCDInjectionStatusRequest, \
SCDInjectionCapabilitiesRequest, SCDInjectionPutFlightRequest, \
SCDInjectionDeleteFlightRequest, SCDInjectionClearAreaRequest
from monitoring.monitorlib.scd_automated_testing.scd_injection_api import SCOPE_SCD_QUALIFIER_INJECT


timeout = timedelta(seconds=webapp.config[KEY_QUERY_TIMEOUT])


@webapp.route('/scd/v1/status', methods=['GET'])
@requires_scope([SCOPE_SCD_QUALIFIER_INJECT])
def scd_injection_status() -> Tuple[str, int]:
"""Implements status in SCD automated testing injection API."""
return handling.fulfill_query(SCDInjectionStatusRequest())
return handling.fulfill_query(SCDInjectionStatusRequest(), timeout)


@webapp.route('/scd/v1/capabilities', methods=['GET'])
@requires_scope([SCOPE_SCD_QUALIFIER_INJECT])
def scd_injection_capabilities() -> Tuple[str, int]:
"""Implements capabilities in SCD automated testing injection API."""
return handling.fulfill_query(SCDInjectionCapabilitiesRequest())
return handling.fulfill_query(SCDInjectionCapabilitiesRequest(), timeout)


@webapp.route('/scd/v1/flights/<flight_id>', methods=['PUT'])
Expand All @@ -40,14 +45,14 @@ def scd_injection_put_flight(flight_id: str) -> Tuple[str, int]:
except ValueError as e:
msg = 'Upsert flight {} unable to parse JSON: {}'.format(flight_id, e)
return msg, 400
return handling.fulfill_query(SCDInjectionPutFlightRequest(flight_id=flight_id, request_body=req_body))
return handling.fulfill_query(SCDInjectionPutFlightRequest(flight_id=flight_id, request_body=req_body), timeout)


@webapp.route('/scd/v1/flights/<flight_id>', methods=['DELETE'])
@requires_scope([SCOPE_SCD_QUALIFIER_INJECT])
def scd_injection_delete_flight(flight_id: str) -> Tuple[str, int]:
"""Implements flight deletion in SCD automated testing injection API."""
return handling.fulfill_query(SCDInjectionDeleteFlightRequest(flight_id=flight_id))
return handling.fulfill_query(SCDInjectionDeleteFlightRequest(flight_id=flight_id), timeout)


@webapp.route('/scd/v1/clear_area_requests', methods=['POST'])
Expand All @@ -63,4 +68,4 @@ def scd_injection_clear_area() -> Tuple[str, int]:
msg = 'Clear area request unable to parse JSON: {}'.format(e)
return msg, 400

return handling.fulfill_query(SCDInjectionClearAreaRequest(request_body=req_body))
return handling.fulfill_query(SCDInjectionClearAreaRequest(request_body=req_body), timeout)
4 changes: 2 additions & 2 deletions monitoring/atproxy/run_locally.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ PORT=8075
if [ "$CI" == "true" ]; then
docker_args="--add-host host.docker.internal:host-gateway" # Required to reach other containers in Ubuntu (used for Github Actions)
else
docker_args=""
docker_args="-it"
fi

# shellcheck disable=SC2086
docker run ${docker_args} --name atproxy \
--rm \
-e ATPROXY_CLIENT_BASIC_AUTH="${CLIENT_BASIC_AUTH}" \
-e ATPROXY_PUBLIC_KEY="${PUBLIC_KEY}" \
-e ATPROXY_TOKEN_AUDIENCE="${AUD}" \
-e ATPROXY_QUERY_TIMEOUT="${ATPROXY_QUERY_TIMEOUT:-5}" \
-e PYTHONUNBUFFERED=TRUE \
-p ${PORT}:5000 \
-v "$(pwd)/build/test-certs:/var/test-certs:ro" \
Expand Down
1 change: 1 addition & 0 deletions monitoring/atproxy/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ cp health_check.sh /app
gunicorn \
--preload \
--workers=4 \
--threads=2 \
--timeout 60 \
--bind=0.0.0.0:5000 \
--log-level debug \
Expand Down
20 changes: 7 additions & 13 deletions monitoring/mock_uss/atproxy_client/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _wait_for_atproxy() -> None:
while not webapp.is_stopping():
resp = None
try:
resp = requests.get(status_url, auth=basic_auth)
resp = requests.get(status_url, auth=basic_auth, timeout=8)
if resp.status_code == 200:
break
logger.info(
Expand Down Expand Up @@ -82,17 +82,14 @@ def _poll_atproxy() -> None:
basic_auth = mock_uss.webapp.config[config.KEY_ATPROXY_BASIC_AUTH].tuple

# Poll atproxy to see if there are any requests pending
query = fetch.query_and_describe(None, "GET", query_url, auth=basic_auth)
query = fetch.query_and_describe(None, "GET", query_url, auth=basic_auth, timeout=8)
if query.status_code != 200:
logger.error(
"Error {} polling {}:\n{}",
query.status_code,
query_url,
"JSON: " + json.dumps(query.response.json, indent=2)
if query.response.json
else f"Body: {query.response.content}",
json.dumps(query, indent=2),
)
time.sleep(5)
return
try:
queries_resp: ListQueriesResponse = ImplicitDict.parse(
Expand All @@ -102,7 +99,6 @@ def _poll_atproxy() -> None:
logger.error(
"Error parsing atproxy response to request for queries: {}", str(e)
)
time.sleep(5)
return
if not queries_resp.requests:
logger.debug("No queries currently pending.")
Expand Down Expand Up @@ -145,6 +141,7 @@ def _poll_atproxy() -> None:
f"{query_url}/{request_to_handle.id}",
json=fulfillment,
auth=basic_auth,
timeout=(1, 2),
)
if query.status_code != 204:
logger.error(
Expand All @@ -153,14 +150,11 @@ def _poll_atproxy() -> None:
fulfillment.return_code,
request_to_handle.id,
attempt,
"JSON: " + json.dumps(query.response.json, indent=2)
if query.response.json
else f"Body: {query.response.content}",
)
logger.debug(
"Query details for failed attempt to report response to atproxy:\n{}",
json.dumps(query, indent=2),
)
if query.status_code != 999 and query.status_code != 500:
# Error response is not retryable
break
else:
logger.info(
f"Delivered response to request {request_to_handle.id} to atproxy on attempt {attempt}"
Expand Down
9 changes: 9 additions & 0 deletions monitoring/mock_uss/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def _run_one_time_tasks(self, trigger: TaskTrigger):
if not tasks:
logger.info(f"No {trigger} tasks to initiate from process ID {os.getpid()}")
return
logger.info(
"Running {} {} task{}", len(tasks), trigger, "s" if len(tasks) > 1 else ""
)
for task_name, setup_task in tasks.items():
logger.info(
f"Initiating '{task_name}' {trigger} task from process ID {os.getpid()}"
Expand All @@ -113,6 +116,12 @@ def _run_one_time_tasks(self, trigger: TaskTrigger):
)
self.stop()
return
logger.info(
"Completed running {} {} task{}",
len(tasks),
trigger,
"s" if len(tasks) > 1 else "",
)

def setup(self):
self._run_one_time_tasks(TaskTrigger.Setup)
Expand Down
3 changes: 2 additions & 1 deletion monitoring/monitorlib/fetch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ def query_and_describe(
else:
utm_session = True
req_kwargs = kwargs.copy()
req_kwargs["timeout"] = TIMEOUTS
if "timeout" not in req_kwargs:
req_kwargs["timeout"] = TIMEOUTS
t0 = datetime.datetime.utcnow()
try:
return describe_query(client.request(verb, url, **req_kwargs), t0)
Expand Down
25 changes: 19 additions & 6 deletions monitoring/monitorlib/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

EPOCH = datetime.datetime.utcfromtimestamp(0)
TOKEN_REFRESH_MARGIN = datetime.timedelta(seconds=15)
CLIENT_TIMEOUT = 60 # seconds
CLIENT_TIMEOUT = 10 # seconds


class AuthAdapter(object):
Expand Down Expand Up @@ -75,12 +75,18 @@ class UTMClientSession(requests.Session):
DSS).
"""

def __init__(self, prefix_url: str, auth_adapter: Optional[AuthAdapter] = None):
def __init__(
self,
prefix_url: str,
auth_adapter: Optional[AuthAdapter] = None,
timeout_seconds: Optional[float] = None,
):
super().__init__()

self._prefix_url = prefix_url[0:-1] if prefix_url[-1] == "/" else prefix_url
self.auth_adapter = auth_adapter
self.default_scopes = None
self.timeout_seconds = timeout_seconds or CLIENT_TIMEOUT

# Overrides method on requests.Session
def prepare_request(self, request, **kwargs):
Expand Down Expand Up @@ -113,7 +119,8 @@ def auth(
return prepared_request

kwargs["auth"] = auth
kwargs["timeout"] = CLIENT_TIMEOUT
if "timeout" not in kwargs:
kwargs["timeout"] = self.timeout_seconds
return kwargs

def request(self, method, url, **kwargs):
Expand All @@ -130,14 +137,20 @@ class AsyncUTMTestSession:
* Automatically applies authorization according to adapter, when present
"""

def __init__(self, prefix_url: str, auth_adapter: Optional[AuthAdapter] = None):
def __init__(
self,
prefix_url: str,
auth_adapter: Optional[AuthAdapter] = None,
timeout_seconds: Optional[float] = None,
):
self._client = None
loop = asyncio.get_event_loop()
loop.run_until_complete(self.build_session())

self._prefix_url = prefix_url[0:-1] if prefix_url[-1] == "/" else prefix_url
self.auth_adapter = auth_adapter
self.default_scopes = None
self.timeout_seconds = timeout_seconds or CLIENT_TIMEOUT

async def build_session(self):
self._client = ClientSession()
Expand Down Expand Up @@ -168,8 +181,8 @@ def adjust_request_kwargs(self, url, method, kwargs):
if method == "PUT" and kwargs.get("data"):
kwargs["json"] = kwargs["data"]
del kwargs["data"]

kwargs["timeout"] = CLIENT_TIMEOUT
if "timeout" not in kwargs:
kwargs["timeout"] = self.timeout_seconds
return kwargs

async def put(self, url, **kwargs):
Expand Down
1 change: 0 additions & 1 deletion monitoring/uss_qualifier/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def main() -> int:

if config.artifacts:
if config.artifacts.report:
logger.info(json.dumps(config.artifacts.report))
if config.artifacts.report.redact_access_tokens:
logger.info("Redacting access tokens in report")
redact_access_tokens(report)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class FlightPlannerConfiguration(ImplicitDict):
injection_base_url: str
"""Base URL for the flight planner's implementation of the interfaces/automated-testing/scd/scd.yaml API"""

timeout_seconds: Optional[float] = None
"""Number of seconds to allow for requests to this flight planner. If None, use default."""

def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
try:
Expand Down Expand Up @@ -58,7 +61,7 @@ def __init__(
):
self.config = config
self.client = infrastructure.UTMClientSession(
self.config.injection_base_url, auth_adapter
self.config.injection_base_url, auth_adapter, config.timeout_seconds
)

# Flights injected by this target.
Expand Down
3 changes: 2 additions & 1 deletion monitoring/uss_qualifier/resources/netrid/observers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Optional, Tuple

from loguru import logger
import s2sphere
from implicitdict import ImplicitDict

Expand Down Expand Up @@ -45,7 +46,7 @@ def observe_system(
else None
)
except ValueError as e:
print("Error parsing observation response: {}".format(e))
logger.error("Error parsing observation response: {}", e)
result = None
return result, query

Expand Down
Loading

0 comments on commit e9a69df

Please sign in to comment.