Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust timeouts and improve logging #53

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions monitoring/atproxy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
ENV_KEY_PUBLIC_KEY = '{}_PUBLIC_KEY'.format(ENV_KEY_PREFIX)
ENV_KEY_TOKEN_AUDIENCE = '{}_TOKEN_AUDIENCE'.format(ENV_KEY_PREFIX)
ENV_KEY_CLIENT_BASIC_AUTH = '{}_CLIENT_BASIC_AUTH'.format(ENV_KEY_PREFIX)
ENV_KEY_QUERY_TIMEOUT = '{}_QUERY_TIMEOUT'.format(ENV_KEY_PREFIX)

# These keys map to entries in the Config class
KEY_TOKEN_PUBLIC_KEY = 'TOKEN_PUBLIC_KEY'
KEY_TOKEN_AUDIENCE = 'TOKEN_AUDIENCE'
KEY_CLIENT_BASIC_AUTH = 'CLIENT_BASIC_AUTH'
KEY_QUERY_TIMEOUT = 'QUERY_TIMEOUT'

KEY_CODE_VERSION = 'MONITORING_VERSION'

Expand All @@ -25,6 +27,7 @@ class Config(object):
os.environ.get(ENV_KEY_PUBLIC_KEY, '')).encode('utf-8')
TOKEN_AUDIENCE = os.environ.get(ENV_KEY_TOKEN_AUDIENCE, '')
CLIENT_BASIC_AUTH = os.environ[ENV_KEY_CLIENT_BASIC_AUTH]
QUERY_TIMEOUT = float(os.environ.get(ENV_KEY_QUERY_TIMEOUT, "59"))
CODE_VERSION = os.environ.get(KEY_CODE_VERSION, 'Unknown')


Expand Down
4 changes: 2 additions & 2 deletions monitoring/atproxy/handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ class PutQueryRequest(ImplicitDict):
"""HTTP return code."""


def fulfill_query(req: ImplicitDict) -> Tuple[str, int]:
def fulfill_query(req: ImplicitDict, timeout: timedelta) -> Tuple[str, int]:
"""Fulfill an incoming automated testing query.

:param req: Request descriptor from requests.py.
:param timeout: Maximum amount of time client has to fulfill query.
:return: Flask endpoint handler result (content, HTTP code).
"""
t_start = datetime.utcnow()
query = Query(type=req.request_type_name(), request=req)
timeout = timedelta(seconds=59)
id = str(uuid.uuid4())
logger.debug('Attempting to fulfill {} query {} from worker {}', query.type, id, os.getpid())

Expand Down
8 changes: 6 additions & 2 deletions monitoring/atproxy/routes_rid_injection.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from datetime import timedelta
from typing import Tuple

import flask

from . import handling
from .app import webapp
from .config import KEY_QUERY_TIMEOUT
from .oauth import requires_scope
from .requests import RIDInjectionCreateTestRequest, RIDInjectionDeleteTestRequest
from monitoring.monitorlib.rid_automated_testing import injection_api
from implicitdict import ImplicitDict

timeout = timedelta(seconds=webapp.config[KEY_QUERY_TIMEOUT])


@webapp.route('/ridsp/injection/tests/<test_id>', methods=['PUT'])
@requires_scope([injection_api.SCOPE_RID_QUALIFIER_INJECT])
Expand All @@ -23,11 +27,11 @@ def rid_injection_create_test(test_id: str) -> Tuple[str, int]:
msg = 'Create test {} unable to parse JSON: {}'.format(test_id, e)
return msg, 400

return handling.fulfill_query(RIDInjectionCreateTestRequest(test_id=test_id, request_body=req_body))
return handling.fulfill_query(RIDInjectionCreateTestRequest(test_id=test_id, request_body=req_body), timeout)


@webapp.route('/ridsp/injection/tests/<test_id>/<version>', methods=['DELETE'])
@requires_scope([injection_api.SCOPE_RID_QUALIFIER_INJECT])
def rid_injection_delete_test(test_id: str, version: str) -> Tuple[str, int]:
"""Implements test deletion in RID automated testing injection API."""
return handling.fulfill_query(RIDInjectionDeleteTestRequest(test_id=test_id, version=version))
return handling.fulfill_query(RIDInjectionDeleteTestRequest(test_id=test_id, version=version), timeout)
8 changes: 6 additions & 2 deletions monitoring/atproxy/routes_rid_observation.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
from datetime import timedelta
from typing import Tuple

import flask
from uas_standards.astm.f3411.v19.constants import Scope

from . import handling
from .app import webapp
from .config import KEY_QUERY_TIMEOUT
from .oauth import requires_scope
from .requests import RIDObservationGetDisplayDataRequest, RIDObservationGetDetailsRequest

timeout = timedelta(seconds=webapp.config[KEY_QUERY_TIMEOUT])


@webapp.route('/riddp/observation/display_data', methods=['GET'])
@requires_scope([Scope.Read])
def rid_observation_display_data() -> Tuple[str, int]:
"""Implements retrieval of current display data per automated testing API."""
return handling.fulfill_query(RIDObservationGetDisplayDataRequest(view=flask.request.args['view']))
return handling.fulfill_query(RIDObservationGetDisplayDataRequest(view=flask.request.args['view']), timeout)


@webapp.route('/riddp/observation/display_data/<flight_id>', methods=['GET'])
@requires_scope([Scope.Read])
def rid_observation_flight_details(flight_id: str) -> Tuple[str, int]:
"""Implements get flight details endpoint per automated testing API."""
return handling.fulfill_query(RIDObservationGetDetailsRequest(id=flight_id))
return handling.fulfill_query(RIDObservationGetDetailsRequest(id=flight_id), timeout)
15 changes: 10 additions & 5 deletions monitoring/atproxy/routes_scd.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
from typing import Tuple

import flask
Expand All @@ -7,25 +8,29 @@

from . import handling
from .app import webapp
from .config import KEY_QUERY_TIMEOUT
from .oauth import requires_scope
from .requests import SCDInjectionStatusRequest, \
SCDInjectionCapabilitiesRequest, SCDInjectionPutFlightRequest, \
SCDInjectionDeleteFlightRequest, SCDInjectionClearAreaRequest
from monitoring.monitorlib.scd_automated_testing.scd_injection_api import SCOPE_SCD_QUALIFIER_INJECT


timeout = timedelta(seconds=webapp.config[KEY_QUERY_TIMEOUT])


@webapp.route('/scd/v1/status', methods=['GET'])
@requires_scope([SCOPE_SCD_QUALIFIER_INJECT])
def scd_injection_status() -> Tuple[str, int]:
"""Implements status in SCD automated testing injection API."""
return handling.fulfill_query(SCDInjectionStatusRequest())
return handling.fulfill_query(SCDInjectionStatusRequest(), timeout)


@webapp.route('/scd/v1/capabilities', methods=['GET'])
@requires_scope([SCOPE_SCD_QUALIFIER_INJECT])
def scd_injection_capabilities() -> Tuple[str, int]:
"""Implements capabilities in SCD automated testing injection API."""
return handling.fulfill_query(SCDInjectionCapabilitiesRequest())
return handling.fulfill_query(SCDInjectionCapabilitiesRequest(), timeout)


@webapp.route('/scd/v1/flights/<flight_id>', methods=['PUT'])
Expand All @@ -40,14 +45,14 @@ def scd_injection_put_flight(flight_id: str) -> Tuple[str, int]:
except ValueError as e:
msg = 'Upsert flight {} unable to parse JSON: {}'.format(flight_id, e)
return msg, 400
return handling.fulfill_query(SCDInjectionPutFlightRequest(flight_id=flight_id, request_body=req_body))
return handling.fulfill_query(SCDInjectionPutFlightRequest(flight_id=flight_id, request_body=req_body), timeout)


@webapp.route('/scd/v1/flights/<flight_id>', methods=['DELETE'])
@requires_scope([SCOPE_SCD_QUALIFIER_INJECT])
def scd_injection_delete_flight(flight_id: str) -> Tuple[str, int]:
"""Implements flight deletion in SCD automated testing injection API."""
return handling.fulfill_query(SCDInjectionDeleteFlightRequest(flight_id=flight_id))
return handling.fulfill_query(SCDInjectionDeleteFlightRequest(flight_id=flight_id), timeout)


@webapp.route('/scd/v1/clear_area_requests', methods=['POST'])
Expand All @@ -63,4 +68,4 @@ def scd_injection_clear_area() -> Tuple[str, int]:
msg = 'Clear area request unable to parse JSON: {}'.format(e)
return msg, 400

return handling.fulfill_query(SCDInjectionClearAreaRequest(request_body=req_body))
return handling.fulfill_query(SCDInjectionClearAreaRequest(request_body=req_body), timeout)
4 changes: 2 additions & 2 deletions monitoring/atproxy/run_locally.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ PORT=8075
if [ "$CI" == "true" ]; then
docker_args="--add-host host.docker.internal:host-gateway" # Required to reach other containers in Ubuntu (used for Github Actions)
else
docker_args=""
docker_args="-it"
fi

# shellcheck disable=SC2086
docker run ${docker_args} --name atproxy \
--rm \
-e ATPROXY_CLIENT_BASIC_AUTH="${CLIENT_BASIC_AUTH}" \
-e ATPROXY_PUBLIC_KEY="${PUBLIC_KEY}" \
-e ATPROXY_TOKEN_AUDIENCE="${AUD}" \
-e ATPROXY_QUERY_TIMEOUT="${ATPROXY_QUERY_TIMEOUT:-5}" \
-e PYTHONUNBUFFERED=TRUE \
-p ${PORT}:5000 \
-v "$(pwd)/build/test-certs:/var/test-certs:ro" \
Expand Down
1 change: 1 addition & 0 deletions monitoring/atproxy/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ cp health_check.sh /app
gunicorn \
--preload \
--workers=4 \
--threads=2 \
--timeout 60 \
--bind=0.0.0.0:5000 \
--log-level debug \
Expand Down
20 changes: 7 additions & 13 deletions monitoring/mock_uss/atproxy_client/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _wait_for_atproxy() -> None:
while not webapp.is_stopping():
resp = None
try:
resp = requests.get(status_url, auth=basic_auth)
resp = requests.get(status_url, auth=basic_auth, timeout=8)
if resp.status_code == 200:
break
logger.info(
Expand Down Expand Up @@ -82,17 +82,14 @@ def _poll_atproxy() -> None:
basic_auth = mock_uss.webapp.config[config.KEY_ATPROXY_BASIC_AUTH].tuple

# Poll atproxy to see if there are any requests pending
query = fetch.query_and_describe(None, "GET", query_url, auth=basic_auth)
query = fetch.query_and_describe(None, "GET", query_url, auth=basic_auth, timeout=8)
if query.status_code != 200:
logger.error(
"Error {} polling {}:\n{}",
query.status_code,
query_url,
"JSON: " + json.dumps(query.response.json, indent=2)
if query.response.json
else f"Body: {query.response.content}",
json.dumps(query, indent=2),
)
time.sleep(5)
return
try:
queries_resp: ListQueriesResponse = ImplicitDict.parse(
Expand All @@ -102,7 +99,6 @@ def _poll_atproxy() -> None:
logger.error(
"Error parsing atproxy response to request for queries: {}", str(e)
)
time.sleep(5)
return
if not queries_resp.requests:
logger.debug("No queries currently pending.")
Expand Down Expand Up @@ -145,6 +141,7 @@ def _poll_atproxy() -> None:
f"{query_url}/{request_to_handle.id}",
json=fulfillment,
auth=basic_auth,
timeout=(1, 2),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those values seem low compared to the others.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intentional and hopefully correct :) Both calls to atproxy handler endpoints should have very little overhead -- just accessing shared memory and returning the result. The first one (query) is given a longer timeout (8 seconds) because it intentionally blocks for 5 seconds, optimistically hoping that a request might show up on the queue. The fulfillment query (this one) merely writes to shared memory and returns, so it should never reasonably take longer than this (I think).

I'm not 100% sure this is correct, though: there is obviously something I don't understand, because #28 continues to be a problem, especially on my local machine, where I can reproduce it pretty much every time.

)
if query.status_code != 204:
logger.error(
Expand All @@ -153,14 +150,11 @@ def _poll_atproxy() -> None:
fulfillment.return_code,
request_to_handle.id,
attempt,
"JSON: " + json.dumps(query.response.json, indent=2)
if query.response.json
else f"Body: {query.response.content}",
)
logger.debug(
"Query details for failed attempt to report response to atproxy:\n{}",
json.dumps(query, indent=2),
)
if query.status_code != 999 and query.status_code != 500:
# Error response is not retryable
break
else:
logger.info(
f"Delivered response to request {request_to_handle.id} to atproxy on attempt {attempt}"
Expand Down
9 changes: 9 additions & 0 deletions monitoring/mock_uss/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def _run_one_time_tasks(self, trigger: TaskTrigger):
if not tasks:
logger.info(f"No {trigger} tasks to initiate from process ID {os.getpid()}")
return
logger.info(
"Running {} {} task{}", len(tasks), trigger, "s" if len(tasks) > 1 else ""
)
for task_name, setup_task in tasks.items():
logger.info(
f"Initiating '{task_name}' {trigger} task from process ID {os.getpid()}"
Expand All @@ -113,6 +116,12 @@ def _run_one_time_tasks(self, trigger: TaskTrigger):
)
self.stop()
return
logger.info(
"Completed running {} {} task{}",
len(tasks),
trigger,
"s" if len(tasks) > 1 else "",
)

def setup(self):
self._run_one_time_tasks(TaskTrigger.Setup)
Expand Down
3 changes: 2 additions & 1 deletion monitoring/monitorlib/fetch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ def query_and_describe(
else:
utm_session = True
req_kwargs = kwargs.copy()
req_kwargs["timeout"] = TIMEOUTS
if "timeout" not in req_kwargs:
req_kwargs["timeout"] = TIMEOUTS
t0 = datetime.datetime.utcnow()
try:
return describe_query(client.request(verb, url, **req_kwargs), t0)
Expand Down
25 changes: 19 additions & 6 deletions monitoring/monitorlib/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

EPOCH = datetime.datetime.utcfromtimestamp(0)
TOKEN_REFRESH_MARGIN = datetime.timedelta(seconds=15)
CLIENT_TIMEOUT = 60 # seconds
CLIENT_TIMEOUT = 10 # seconds


class AuthAdapter(object):
Expand Down Expand Up @@ -75,12 +75,18 @@ class UTMClientSession(requests.Session):
DSS).
"""

def __init__(self, prefix_url: str, auth_adapter: Optional[AuthAdapter] = None):
def __init__(
self,
prefix_url: str,
auth_adapter: Optional[AuthAdapter] = None,
timeout_seconds: Optional[float] = None,
):
super().__init__()

self._prefix_url = prefix_url[0:-1] if prefix_url[-1] == "/" else prefix_url
self.auth_adapter = auth_adapter
self.default_scopes = None
self.timeout_seconds = timeout_seconds or CLIENT_TIMEOUT

# Overrides method on requests.Session
def prepare_request(self, request, **kwargs):
Expand Down Expand Up @@ -113,7 +119,8 @@ def auth(
return prepared_request

kwargs["auth"] = auth
kwargs["timeout"] = CLIENT_TIMEOUT
if "timeout" not in kwargs:
kwargs["timeout"] = self.timeout_seconds
return kwargs

def request(self, method, url, **kwargs):
Expand All @@ -130,14 +137,20 @@ class AsyncUTMTestSession:
* Automatically applies authorization according to adapter, when present
"""

def __init__(self, prefix_url: str, auth_adapter: Optional[AuthAdapter] = None):
def __init__(
self,
prefix_url: str,
auth_adapter: Optional[AuthAdapter] = None,
timeout_seconds: Optional[float] = None,
):
self._client = None
loop = asyncio.get_event_loop()
loop.run_until_complete(self.build_session())

self._prefix_url = prefix_url[0:-1] if prefix_url[-1] == "/" else prefix_url
self.auth_adapter = auth_adapter
self.default_scopes = None
self.timeout_seconds = timeout_seconds or CLIENT_TIMEOUT

async def build_session(self):
self._client = ClientSession()
Expand Down Expand Up @@ -168,8 +181,8 @@ def adjust_request_kwargs(self, url, method, kwargs):
if method == "PUT" and kwargs.get("data"):
kwargs["json"] = kwargs["data"]
del kwargs["data"]

kwargs["timeout"] = CLIENT_TIMEOUT
if "timeout" not in kwargs:
kwargs["timeout"] = self.timeout_seconds
return kwargs

async def put(self, url, **kwargs):
Expand Down
1 change: 0 additions & 1 deletion monitoring/uss_qualifier/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def main() -> int:

if config.artifacts:
if config.artifacts.report:
logger.info(json.dumps(config.artifacts.report))
if config.artifacts.report.redact_access_tokens:
logger.info("Redacting access tokens in report")
redact_access_tokens(report)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class FlightPlannerConfiguration(ImplicitDict):
injection_base_url: str
"""Base URL for the flight planner's implementation of the interfaces/automated-testing/scd/scd.yaml API"""

timeout_seconds: Optional[float] = None
"""Number of seconds to allow for requests to this flight planner. If None, use default."""

def __init__(self, *args, **kwargs):
super().__init__(**kwargs)
try:
Expand Down Expand Up @@ -58,7 +61,7 @@ def __init__(
):
self.config = config
self.client = infrastructure.UTMClientSession(
self.config.injection_base_url, auth_adapter
self.config.injection_base_url, auth_adapter, config.timeout_seconds
)

# Flights injected by this target.
Expand Down
3 changes: 2 additions & 1 deletion monitoring/uss_qualifier/resources/netrid/observers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Optional, Tuple

from loguru import logger
import s2sphere
from implicitdict import ImplicitDict

Expand Down Expand Up @@ -45,7 +46,7 @@ def observe_system(
else None
)
except ValueError as e:
print("Error parsing observation response: {}".format(e))
logger.error("Error parsing observation response: {}", e)
result = None
return result, query

Expand Down
Loading