diff --git a/monitoring/atproxy/config.py b/monitoring/atproxy/config.py index cc50dab8b2..7dfd760a2a 100644 --- a/monitoring/atproxy/config.py +++ b/monitoring/atproxy/config.py @@ -9,11 +9,13 @@ ENV_KEY_PUBLIC_KEY = '{}_PUBLIC_KEY'.format(ENV_KEY_PREFIX) ENV_KEY_TOKEN_AUDIENCE = '{}_TOKEN_AUDIENCE'.format(ENV_KEY_PREFIX) ENV_KEY_CLIENT_BASIC_AUTH = '{}_CLIENT_BASIC_AUTH'.format(ENV_KEY_PREFIX) +ENV_KEY_QUERY_TIMEOUT = '{}_QUERY_TIMEOUT'.format(ENV_KEY_PREFIX) # These keys map to entries in the Config class KEY_TOKEN_PUBLIC_KEY = 'TOKEN_PUBLIC_KEY' KEY_TOKEN_AUDIENCE = 'TOKEN_AUDIENCE' KEY_CLIENT_BASIC_AUTH = 'CLIENT_BASIC_AUTH' +KEY_QUERY_TIMEOUT = 'QUERY_TIMEOUT' KEY_CODE_VERSION = 'MONITORING_VERSION' @@ -25,6 +27,7 @@ class Config(object): os.environ.get(ENV_KEY_PUBLIC_KEY, '')).encode('utf-8') TOKEN_AUDIENCE = os.environ.get(ENV_KEY_TOKEN_AUDIENCE, '') CLIENT_BASIC_AUTH = os.environ[ENV_KEY_CLIENT_BASIC_AUTH] + QUERY_TIMEOUT = float(os.environ.get(ENV_KEY_QUERY_TIMEOUT, "59")) CODE_VERSION = os.environ.get(KEY_CODE_VERSION, 'Unknown') diff --git a/monitoring/atproxy/handling.py b/monitoring/atproxy/handling.py index e0bb9c8b77..7dafc3a9b6 100644 --- a/monitoring/atproxy/handling.py +++ b/monitoring/atproxy/handling.py @@ -41,15 +41,15 @@ class PutQueryRequest(ImplicitDict): """HTTP return code.""" -def fulfill_query(req: ImplicitDict) -> Tuple[str, int]: +def fulfill_query(req: ImplicitDict, timeout: timedelta) -> Tuple[str, int]: """Fulfill an incoming automated testing query. :param req: Request descriptor from requests.py. + :param timeout: Maximum amount of time client has to fulfill query. :return: Flask endpoint handler result (content, HTTP code). """ t_start = datetime.utcnow() query = Query(type=req.request_type_name(), request=req) - timeout = timedelta(seconds=59) id = str(uuid.uuid4()) logger.debug('Attempting to fulfill {} query {} from worker {}', query.type, id, os.getpid()) diff --git a/monitoring/atproxy/routes_rid_injection.py b/monitoring/atproxy/routes_rid_injection.py index 8c8d3ab9f0..2d010dffd6 100644 --- a/monitoring/atproxy/routes_rid_injection.py +++ b/monitoring/atproxy/routes_rid_injection.py @@ -1,14 +1,18 @@ +from datetime import timedelta from typing import Tuple import flask from . import handling from .app import webapp +from .config import KEY_QUERY_TIMEOUT from .oauth import requires_scope from .requests import RIDInjectionCreateTestRequest, RIDInjectionDeleteTestRequest from monitoring.monitorlib.rid_automated_testing import injection_api from implicitdict import ImplicitDict +timeout = timedelta(seconds=webapp.config[KEY_QUERY_TIMEOUT]) + @webapp.route('/ridsp/injection/tests/', methods=['PUT']) @requires_scope([injection_api.SCOPE_RID_QUALIFIER_INJECT]) @@ -23,11 +27,11 @@ def rid_injection_create_test(test_id: str) -> Tuple[str, int]: msg = 'Create test {} unable to parse JSON: {}'.format(test_id, e) return msg, 400 - return handling.fulfill_query(RIDInjectionCreateTestRequest(test_id=test_id, request_body=req_body)) + return handling.fulfill_query(RIDInjectionCreateTestRequest(test_id=test_id, request_body=req_body), timeout) @webapp.route('/ridsp/injection/tests//', methods=['DELETE']) @requires_scope([injection_api.SCOPE_RID_QUALIFIER_INJECT]) def rid_injection_delete_test(test_id: str, version: str) -> Tuple[str, int]: """Implements test deletion in RID automated testing injection API.""" - return handling.fulfill_query(RIDInjectionDeleteTestRequest(test_id=test_id, version=version)) + return handling.fulfill_query(RIDInjectionDeleteTestRequest(test_id=test_id, version=version), timeout) diff --git a/monitoring/atproxy/routes_rid_observation.py b/monitoring/atproxy/routes_rid_observation.py index fbbb3d3d7b..ce259d1cbd 100644 --- a/monitoring/atproxy/routes_rid_observation.py +++ b/monitoring/atproxy/routes_rid_observation.py @@ -1,3 +1,4 @@ +from datetime import timedelta from typing import Tuple import flask @@ -5,19 +6,22 @@ from . import handling from .app import webapp +from .config import KEY_QUERY_TIMEOUT from .oauth import requires_scope from .requests import RIDObservationGetDisplayDataRequest, RIDObservationGetDetailsRequest +timeout = timedelta(seconds=webapp.config[KEY_QUERY_TIMEOUT]) + @webapp.route('/riddp/observation/display_data', methods=['GET']) @requires_scope([Scope.Read]) def rid_observation_display_data() -> Tuple[str, int]: """Implements retrieval of current display data per automated testing API.""" - return handling.fulfill_query(RIDObservationGetDisplayDataRequest(view=flask.request.args['view'])) + return handling.fulfill_query(RIDObservationGetDisplayDataRequest(view=flask.request.args['view']), timeout) @webapp.route('/riddp/observation/display_data/', methods=['GET']) @requires_scope([Scope.Read]) def rid_observation_flight_details(flight_id: str) -> Tuple[str, int]: """Implements get flight details endpoint per automated testing API.""" - return handling.fulfill_query(RIDObservationGetDetailsRequest(id=flight_id)) + return handling.fulfill_query(RIDObservationGetDetailsRequest(id=flight_id), timeout) diff --git a/monitoring/atproxy/routes_scd.py b/monitoring/atproxy/routes_scd.py index 8a197176a7..f23716e363 100644 --- a/monitoring/atproxy/routes_scd.py +++ b/monitoring/atproxy/routes_scd.py @@ -1,3 +1,4 @@ +from datetime import timedelta from typing import Tuple import flask @@ -7,6 +8,7 @@ from . import handling from .app import webapp +from .config import KEY_QUERY_TIMEOUT from .oauth import requires_scope from .requests import SCDInjectionStatusRequest, \ SCDInjectionCapabilitiesRequest, SCDInjectionPutFlightRequest, \ @@ -14,18 +16,21 @@ from monitoring.monitorlib.scd_automated_testing.scd_injection_api import SCOPE_SCD_QUALIFIER_INJECT +timeout = timedelta(seconds=webapp.config[KEY_QUERY_TIMEOUT]) + + @webapp.route('/scd/v1/status', methods=['GET']) @requires_scope([SCOPE_SCD_QUALIFIER_INJECT]) def scd_injection_status() -> Tuple[str, int]: """Implements status in SCD automated testing injection API.""" - return handling.fulfill_query(SCDInjectionStatusRequest()) + return handling.fulfill_query(SCDInjectionStatusRequest(), timeout) @webapp.route('/scd/v1/capabilities', methods=['GET']) @requires_scope([SCOPE_SCD_QUALIFIER_INJECT]) def scd_injection_capabilities() -> Tuple[str, int]: """Implements capabilities in SCD automated testing injection API.""" - return handling.fulfill_query(SCDInjectionCapabilitiesRequest()) + return handling.fulfill_query(SCDInjectionCapabilitiesRequest(), timeout) @webapp.route('/scd/v1/flights/', methods=['PUT']) @@ -40,14 +45,14 @@ def scd_injection_put_flight(flight_id: str) -> Tuple[str, int]: except ValueError as e: msg = 'Upsert flight {} unable to parse JSON: {}'.format(flight_id, e) return msg, 400 - return handling.fulfill_query(SCDInjectionPutFlightRequest(flight_id=flight_id, request_body=req_body)) + return handling.fulfill_query(SCDInjectionPutFlightRequest(flight_id=flight_id, request_body=req_body), timeout) @webapp.route('/scd/v1/flights/', methods=['DELETE']) @requires_scope([SCOPE_SCD_QUALIFIER_INJECT]) def scd_injection_delete_flight(flight_id: str) -> Tuple[str, int]: """Implements flight deletion in SCD automated testing injection API.""" - return handling.fulfill_query(SCDInjectionDeleteFlightRequest(flight_id=flight_id)) + return handling.fulfill_query(SCDInjectionDeleteFlightRequest(flight_id=flight_id), timeout) @webapp.route('/scd/v1/clear_area_requests', methods=['POST']) @@ -63,4 +68,4 @@ def scd_injection_clear_area() -> Tuple[str, int]: msg = 'Clear area request unable to parse JSON: {}'.format(e) return msg, 400 - return handling.fulfill_query(SCDInjectionClearAreaRequest(request_body=req_body)) + return handling.fulfill_query(SCDInjectionClearAreaRequest(request_body=req_body), timeout) diff --git a/monitoring/atproxy/run_locally.sh b/monitoring/atproxy/run_locally.sh index a6aca9c03a..09f3afa422 100755 --- a/monitoring/atproxy/run_locally.sh +++ b/monitoring/atproxy/run_locally.sh @@ -21,15 +21,15 @@ PORT=8075 if [ "$CI" == "true" ]; then docker_args="--add-host host.docker.internal:host-gateway" # Required to reach other containers in Ubuntu (used for Github Actions) else - docker_args="" + docker_args="-it" fi # shellcheck disable=SC2086 docker run ${docker_args} --name atproxy \ - --rm \ -e ATPROXY_CLIENT_BASIC_AUTH="${CLIENT_BASIC_AUTH}" \ -e ATPROXY_PUBLIC_KEY="${PUBLIC_KEY}" \ -e ATPROXY_TOKEN_AUDIENCE="${AUD}" \ + -e ATPROXY_QUERY_TIMEOUT="${ATPROXY_QUERY_TIMEOUT:-5}" \ -e PYTHONUNBUFFERED=TRUE \ -p ${PORT}:5000 \ -v "$(pwd)/build/test-certs:/var/test-certs:ro" \ diff --git a/monitoring/atproxy/start.sh b/monitoring/atproxy/start.sh index 0babcdabf3..771aabd4e6 100755 --- a/monitoring/atproxy/start.sh +++ b/monitoring/atproxy/start.sh @@ -21,6 +21,7 @@ cp health_check.sh /app gunicorn \ --preload \ --workers=4 \ + --threads=2 \ --timeout 60 \ --bind=0.0.0.0:5000 \ --log-level debug \ diff --git a/monitoring/mock_uss/atproxy_client/daemon.py b/monitoring/mock_uss/atproxy_client/daemon.py index 41fa125690..afb9dde275 100644 --- a/monitoring/mock_uss/atproxy_client/daemon.py +++ b/monitoring/mock_uss/atproxy_client/daemon.py @@ -53,7 +53,7 @@ def _wait_for_atproxy() -> None: while not webapp.is_stopping(): resp = None try: - resp = requests.get(status_url, auth=basic_auth) + resp = requests.get(status_url, auth=basic_auth, timeout=8) if resp.status_code == 200: break logger.info( @@ -82,17 +82,14 @@ def _poll_atproxy() -> None: basic_auth = mock_uss.webapp.config[config.KEY_ATPROXY_BASIC_AUTH].tuple # Poll atproxy to see if there are any requests pending - query = fetch.query_and_describe(None, "GET", query_url, auth=basic_auth) + query = fetch.query_and_describe(None, "GET", query_url, auth=basic_auth, timeout=8) if query.status_code != 200: logger.error( "Error {} polling {}:\n{}", query.status_code, query_url, - "JSON: " + json.dumps(query.response.json, indent=2) - if query.response.json - else f"Body: {query.response.content}", + json.dumps(query, indent=2), ) - time.sleep(5) return try: queries_resp: ListQueriesResponse = ImplicitDict.parse( @@ -102,7 +99,6 @@ def _poll_atproxy() -> None: logger.error( "Error parsing atproxy response to request for queries: {}", str(e) ) - time.sleep(5) return if not queries_resp.requests: logger.debug("No queries currently pending.") @@ -145,6 +141,7 @@ def _poll_atproxy() -> None: f"{query_url}/{request_to_handle.id}", json=fulfillment, auth=basic_auth, + timeout=(1, 2), ) if query.status_code != 204: logger.error( @@ -153,14 +150,11 @@ def _poll_atproxy() -> None: fulfillment.return_code, request_to_handle.id, attempt, - "JSON: " + json.dumps(query.response.json, indent=2) - if query.response.json - else f"Body: {query.response.content}", - ) - logger.debug( - "Query details for failed attempt to report response to atproxy:\n{}", json.dumps(query, indent=2), ) + if query.status_code != 999 and query.status_code != 500: + # Error response is not retryable + break else: logger.info( f"Delivered response to request {request_to_handle.id} to atproxy on attempt {attempt}" diff --git a/monitoring/mock_uss/server.py b/monitoring/mock_uss/server.py index 83fd442be9..6dcc2b7370 100644 --- a/monitoring/mock_uss/server.py +++ b/monitoring/mock_uss/server.py @@ -94,6 +94,9 @@ def _run_one_time_tasks(self, trigger: TaskTrigger): if not tasks: logger.info(f"No {trigger} tasks to initiate from process ID {os.getpid()}") return + logger.info( + "Running {} {} task{}", len(tasks), trigger, "s" if len(tasks) > 1 else "" + ) for task_name, setup_task in tasks.items(): logger.info( f"Initiating '{task_name}' {trigger} task from process ID {os.getpid()}" @@ -113,6 +116,12 @@ def _run_one_time_tasks(self, trigger: TaskTrigger): ) self.stop() return + logger.info( + "Completed running {} {} task{}", + len(tasks), + trigger, + "s" if len(tasks) > 1 else "", + ) def setup(self): self._run_one_time_tasks(TaskTrigger.Setup) diff --git a/monitoring/monitorlib/fetch/__init__.py b/monitoring/monitorlib/fetch/__init__.py index b0060b3e83..bd945b4b09 100644 --- a/monitoring/monitorlib/fetch/__init__.py +++ b/monitoring/monitorlib/fetch/__init__.py @@ -195,7 +195,8 @@ def query_and_describe( else: utm_session = True req_kwargs = kwargs.copy() - req_kwargs["timeout"] = TIMEOUTS + if "timeout" not in req_kwargs: + req_kwargs["timeout"] = TIMEOUTS t0 = datetime.datetime.utcnow() try: return describe_query(client.request(verb, url, **req_kwargs), t0) diff --git a/monitoring/monitorlib/infrastructure.py b/monitoring/monitorlib/infrastructure.py index 2c783efdab..48924f0293 100644 --- a/monitoring/monitorlib/infrastructure.py +++ b/monitoring/monitorlib/infrastructure.py @@ -18,7 +18,7 @@ EPOCH = datetime.datetime.utcfromtimestamp(0) TOKEN_REFRESH_MARGIN = datetime.timedelta(seconds=15) -CLIENT_TIMEOUT = 60 # seconds +CLIENT_TIMEOUT = 10 # seconds class AuthAdapter(object): @@ -75,12 +75,18 @@ class UTMClientSession(requests.Session): DSS). """ - def __init__(self, prefix_url: str, auth_adapter: Optional[AuthAdapter] = None): + def __init__( + self, + prefix_url: str, + auth_adapter: Optional[AuthAdapter] = None, + timeout_seconds: Optional[float] = None, + ): super().__init__() self._prefix_url = prefix_url[0:-1] if prefix_url[-1] == "/" else prefix_url self.auth_adapter = auth_adapter self.default_scopes = None + self.timeout_seconds = timeout_seconds or CLIENT_TIMEOUT # Overrides method on requests.Session def prepare_request(self, request, **kwargs): @@ -113,7 +119,8 @@ def auth( return prepared_request kwargs["auth"] = auth - kwargs["timeout"] = CLIENT_TIMEOUT + if "timeout" not in kwargs: + kwargs["timeout"] = self.timeout_seconds return kwargs def request(self, method, url, **kwargs): @@ -130,7 +137,12 @@ class AsyncUTMTestSession: * Automatically applies authorization according to adapter, when present """ - def __init__(self, prefix_url: str, auth_adapter: Optional[AuthAdapter] = None): + def __init__( + self, + prefix_url: str, + auth_adapter: Optional[AuthAdapter] = None, + timeout_seconds: Optional[float] = None, + ): self._client = None loop = asyncio.get_event_loop() loop.run_until_complete(self.build_session()) @@ -138,6 +150,7 @@ def __init__(self, prefix_url: str, auth_adapter: Optional[AuthAdapter] = None): self._prefix_url = prefix_url[0:-1] if prefix_url[-1] == "/" else prefix_url self.auth_adapter = auth_adapter self.default_scopes = None + self.timeout_seconds = timeout_seconds or CLIENT_TIMEOUT async def build_session(self): self._client = ClientSession() @@ -168,8 +181,8 @@ def adjust_request_kwargs(self, url, method, kwargs): if method == "PUT" and kwargs.get("data"): kwargs["json"] = kwargs["data"] del kwargs["data"] - - kwargs["timeout"] = CLIENT_TIMEOUT + if "timeout" not in kwargs: + kwargs["timeout"] = self.timeout_seconds return kwargs async def put(self, url, **kwargs): diff --git a/monitoring/uss_qualifier/main.py b/monitoring/uss_qualifier/main.py index 9508132618..363d97f259 100644 --- a/monitoring/uss_qualifier/main.py +++ b/monitoring/uss_qualifier/main.py @@ -80,7 +80,6 @@ def main() -> int: if config.artifacts: if config.artifacts.report: - logger.info(json.dumps(config.artifacts.report)) if config.artifacts.report.redact_access_tokens: logger.info("Redacting access tokens in report") redact_access_tokens(report) diff --git a/monitoring/uss_qualifier/resources/flight_planning/flight_planner.py b/monitoring/uss_qualifier/resources/flight_planning/flight_planner.py index bb66de1fb4..2362625592 100644 --- a/monitoring/uss_qualifier/resources/flight_planning/flight_planner.py +++ b/monitoring/uss_qualifier/resources/flight_planning/flight_planner.py @@ -31,6 +31,9 @@ class FlightPlannerConfiguration(ImplicitDict): injection_base_url: str """Base URL for the flight planner's implementation of the interfaces/automated-testing/scd/scd.yaml API""" + timeout_seconds: Optional[float] = None + """Number of seconds to allow for requests to this flight planner. If None, use default.""" + def __init__(self, *args, **kwargs): super().__init__(**kwargs) try: @@ -58,7 +61,7 @@ def __init__( ): self.config = config self.client = infrastructure.UTMClientSession( - self.config.injection_base_url, auth_adapter + self.config.injection_base_url, auth_adapter, config.timeout_seconds ) # Flights injected by this target. diff --git a/monitoring/uss_qualifier/resources/netrid/observers.py b/monitoring/uss_qualifier/resources/netrid/observers.py index bcb8c4233b..a48d004613 100644 --- a/monitoring/uss_qualifier/resources/netrid/observers.py +++ b/monitoring/uss_qualifier/resources/netrid/observers.py @@ -1,5 +1,6 @@ from typing import List, Optional, Tuple +from loguru import logger import s2sphere from implicitdict import ImplicitDict @@ -45,7 +46,7 @@ def observe_system( else None ) except ValueError as e: - print("Error parsing observation response: {}".format(e)) + logger.error("Error parsing observation response: {}", e) result = None return result, query diff --git a/monitoring/uss_qualifier/scenarios/astm/netrid/nominal_behavior.py b/monitoring/uss_qualifier/scenarios/astm/netrid/nominal_behavior.py index 5cbb8b2242..ca1d390544 100644 --- a/monitoring/uss_qualifier/scenarios/astm/netrid/nominal_behavior.py +++ b/monitoring/uss_qualifier/scenarios/astm/netrid/nominal_behavior.py @@ -5,6 +5,7 @@ import arrow from implicitdict import ImplicitDict +from loguru import logger from requests.exceptions import RequestException from monitoring.monitorlib.rid_automated_testing.injection_api import ( @@ -188,7 +189,7 @@ def _poll_during_flights(self): break delay = t_next - arrow.utcnow() if delay.total_seconds() > 0: - print( + logger.debug( f"Waiting {delay.total_seconds()} seconds before polling RID system again..." ) time.sleep(delay.total_seconds()) diff --git a/monitoring/uss_qualifier/scenarios/scenario.py b/monitoring/uss_qualifier/scenarios/scenario.py index fc9b68a80a..bb05f817e0 100644 --- a/monitoring/uss_qualifier/scenarios/scenario.py +++ b/monitoring/uss_qualifier/scenarios/scenario.py @@ -5,8 +5,8 @@ from typing import Callable, Dict, List, Optional, TypeVar, Union, Set import arrow - from implicitdict import StringBasedDateTime +from loguru import logger from monitoring import uss_qualifier as uss_qualifier_module from monitoring.monitorlib import fetch, inspection @@ -250,7 +250,7 @@ def record_note(self, key: str, message: str) -> None: message=message, timestamp=StringBasedDateTime(arrow.utcnow().datetime), ) - print(f"Note: {key} -> {message}") + logger.info(f"Note: {key} -> {message}") def begin_test_scenario(self) -> None: self._expect_phase(ScenarioPhase.NotStarted) @@ -303,7 +303,7 @@ def record_query(self, query: fetch.Query) -> None: if "queries" not in self._step_report: self._step_report.queries = [] self._step_report.queries.append(query) - print( + logger.debug( f"Queried {query.request['method']} {query.request['url']} -> {query.response.status_code}" ) diff --git a/monitoring/uss_qualifier/suites/suite.py b/monitoring/uss_qualifier/suites/suite.py index e1477cfc16..aea32c8582 100644 --- a/monitoring/uss_qualifier/suites/suite.py +++ b/monitoring/uss_qualifier/suites/suite.py @@ -5,6 +5,7 @@ from typing import Dict, List, Optional, TypeVar, Generic from implicitdict import StringBasedDateTime, ImplicitDict +from loguru import logger import yaml from monitoring import uss_qualifier as uss_qualifier_module @@ -42,9 +43,10 @@ def _print_failed_check(failed_check: FailedCheck) -> None: - print("New failed check:") yaml_lines = yaml.dump(json.loads(json.dumps(failed_check))).split("\n") - print("\n".join(" " + line for line in yaml_lines)) + logger.warning( + "New failed check:\n{}", "\n".join(" " + line for line in yaml_lines) + ) class TestSuiteAction(object): @@ -92,7 +94,7 @@ def run(self) -> TestSuiteActionReport: def _run_test_scenario(self) -> TestScenarioReport: scenario = self.test_scenario - print(f'Running "{scenario.documentation.name}" scenario...') + logger.info(f'Running "{scenario.documentation.name}" scenario...') scenario.on_failed_check = _print_failed_check try: try: @@ -107,18 +109,20 @@ def _run_test_scenario(self) -> TestScenarioReport: scenario.record_execution_error(e) report = scenario.get_report() if report.successful: - print(f'SUCCESS for "{scenario.documentation.name}" scenario') + logger.info(f'SUCCESS for "{scenario.documentation.name}" scenario') else: if "execution_error" in report: lines = report.execution_error.stacktrace.split("\n") - print("\n".join(" " + line for line in lines)) - print(f'FAILURE for "{scenario.documentation.name}" scenario') + logger.error( + "Execution error:\n{}", "\n".join(" " + line for line in lines) + ) + logger.warning(f'FAILURE for "{scenario.documentation.name}" scenario') return report def _run_test_suite(self) -> TestSuiteReport: - print(f"Beginning test suite {self.test_suite.definition.name}...") + logger.info(f"Beginning test suite {self.test_suite.definition.name}...") report = self.test_suite.run() - print(f"Completed test suite {self.test_suite.definition.name}") + logger.info(f"Completed test suite {self.test_suite.definition.name}") return report def _run_action_generator(self) -> ActionGeneratorReport: