diff --git a/monitoring/monitorlib/schema_validation.py b/monitoring/monitorlib/schema_validation.py index 8865e09914..72bffa250c 100644 --- a/monitoring/monitorlib/schema_validation.py +++ b/monitoring/monitorlib/schema_validation.py @@ -50,6 +50,7 @@ class F3411_22a(str, Enum): class F3548_21(str, Enum): OpenAPIPath = "interfaces/astm-utm/Protocol/utm.yaml" + ErrorResponse = "components.schemas.ErrorResponse" GetOperationalIntentDetailsResponse = ( "components.schemas.GetOperationalIntentDetailsResponse" ) diff --git a/monitoring/uss_qualifier/resources/astm/f3548/v21/dss.py b/monitoring/uss_qualifier/resources/astm/f3548/v21/dss.py index 530de8e116..b176eedb7d 100644 --- a/monitoring/uss_qualifier/resources/astm/f3548/v21/dss.py +++ b/monitoring/uss_qualifier/resources/astm/f3548/v21/dss.py @@ -446,6 +446,9 @@ def __init__( def can_use_scope(self, scope: str) -> bool: return scope in self._auth_adapter.scopes + def get_authorized_scopes(self) -> Set[str]: + return self._auth_adapter.scopes.copy() + def get_instance(self, scopes_required: Dict[str, str]) -> DSSInstance: """Get a client object ready to be used. diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md index efca4dc598..387760901b 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md @@ -17,9 +17,12 @@ for multiple scopes (so that a wrong scope may be used in place of the correct o This scenario will check for the scope's availability and transparently ignore checks that can't be conducted. -The scopes the scenario is expected to be allowed to use are: +Required scopes for running this scenario: - `utm.strategic_coordination` + +Optional scopes that will allow the scenario to provide additional coverage: + - `utm.availability_arbitration` - `""` (empty string) diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py index 5f42b398a9..6891720f67 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py @@ -1,3 +1,4 @@ +import random from datetime import datetime, timedelta from uas_standards.astm.f3548.v21.constants import ( @@ -28,10 +29,11 @@ class AuthenticationValidation(TestScenario): """ - A scenario that verifies that the DSS properly authenticates requests to all its endpoints. + A scenario that verifies that the DSS properly authenticates requests to all its endpoints, + and properly validates the scopes of the requests depending on the action being performed. - This scenario does not (yet) cover anything related to authorization: this first version - is intended to cover DSS0210,A2-7-2,7 + Note that this scenario does not verif that a DSS only allows an entity owner to modify the: + this is covered in other scenarios. """ SUB_TYPE = register_resource_type( @@ -58,14 +60,18 @@ def __init__( """ super().__init__() scopes = {Scope.StrategicCoordination: "create and delete subscriptions"} - # We use the AvailabilityArbitration scope as the 'wrong' scope for some tests - # this checks if we are allowed to use it - self._wrong_scope = None - if dss.can_use_scope(Scope.AvailabilityArbitration): + # For the 'wrong' scope we pick anything from the available scopes that isn't the SCD or empty scope: + available_scopes = dss.get_authorized_scopes() + available_scopes.remove(Scope.StrategicCoordination) + available_scopes.remove("") + + self._wrong_scope = ( + random.choice(list(available_scopes)) if available_scopes else None + ) + if self._wrong_scope: scopes[ - Scope.AvailabilityArbitration + self._wrong_scope ] = "Attempt to query subscriptions with wrong scope" - self._wrong_scope = Scope.AvailabilityArbitration self._test_missing_scope = False if dss.can_use_scope(""): diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/generic.py b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/generic.py index 9dbcb4eaed..da6f8df0f0 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/generic.py +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/generic.py @@ -1,8 +1,9 @@ from uas_standards.astm.f3548.v21.constants import Scope -from monitoring.monitorlib import fetch +from monitoring.monitorlib import fetch, schema_validation from monitoring.monitorlib.auth import InvalidTokenSignatureAuth from monitoring.monitorlib.infrastructure import UTMClientSession +from monitoring.monitorlib.schema_validation import F3548_21 from monitoring.uss_qualifier.resources.astm.f3548.v21.dss import DSSInstance from monitoring.uss_qualifier.scenarios.scenario import TestScenario @@ -39,7 +40,7 @@ def query_invalid_token(self, **query_kwargs) -> fetch.Query: """ q = fetch.query_and_describe( client=self._invalid_token_session, - scope=Scope.StrategicCoordination, + scope=self._valid_scope, **query_kwargs, ) self._scenario.record_query(q) @@ -80,22 +81,20 @@ def query_valid_auth(self, **query_kwargs) -> fetch.Query: return q def verify_4xx_response(self, q: fetch.Query): - """Verifies that the passed query response's body is a valid ErrorResponse: - it is either empty or contains a single 'message' field, as per the OpenAPI spec. + """Verifies that the passed query response's body is a valid ErrorResponse, as per the OpenAPI spec.""" - Note that 409 responses to Operational Intent Reference mutations will contain more fields, - these are not handled here. - """ with self._scenario.check( "Unauthorized requests return the proper error message body" ) as check: - if len(q.response.json) == 0: - return - elif len(q.response.json) == 1 and "message" in q.response.json: - return - else: + errors = schema_validation.validate( + F3548_21.OpenAPIPath, + F3548_21.ErrorResponse, + q.response.json, + ) + if errors: check.record_failed( summary="Unexpected error response body", - details=f"Response body for {q.request.method} query to {q.request.url} should be empty or contain a single 'message' field. Was: {q.response.json}", + details=f"Response body for {q.request.method} query to {q.request.url} failed validation: {errors}, " + f"body content was: {q.response.json}", query_timestamps=[q.request.timestamp], )