diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md index 171b6d3b57..d83a7242ca 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md @@ -20,10 +20,11 @@ This scenario will check for the scope's availability and transparently ignore c Required scopes for running this scenario: - `utm.strategic_coordination` +- `utm.constraint_management` +- `utm.constraint_processing` Optional scopes that will allow the scenario to provide additional coverage: -- `utm.availability_arbitration` - `""` (empty string) ### id_generator @@ -317,6 +318,146 @@ it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../require If the DSS does not allow searching for operational intents when valid credentials are presented, it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. +### Constraint reference endpoints authentication test step + +#### 🛑 Unauthorized requests return the proper error message body check + +If the DSS under test does not return a proper error message body when an unauthorized request is received, +it fails to properly implement the OpenAPI specification that is part of **[astm.f3548.v21.DSS0005,3](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with missing credentials check + +If the DSS under test allows the creation of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with invalid credentials check + +If the DSS under test allows the creation of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with missing scope check + +If the DSS under test allows the creation of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with incorrect scope check + +If the DSS under test allows the creation of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with valid credentials check + +If the DSS does not allow the creation of a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### [Create response format](../fragments/cr/crud/create_format.md) + +Check response format of a creation request. + +#### 🛑 Get constraint reference with missing credentials check + +If the DSS under test allows the fetching of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with invalid credentials check + +If the DSS under test allows the fetching of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with missing scope check + +If the DSS under test allows the fetching of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with incorrect scope check + +If the DSS under test allows the fetching of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with valid credentials check + +If the DSS does not allow fetching a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with missing credentials check + +If the DSS under test allows the mutation of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with invalid credentials check + +If the DSS under test allows the mutation of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with missing scope check + +If the DSS under test allows the mutation of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with incorrect scope check + +If the DSS under test allows the mutation of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with valid credentials check + +If the DSS does not allow the mutation of a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### [Mutate response format](../fragments/cr/crud/update_format.md) + +Check response format of a mutation. + +#### 🛑 Delete constraint reference with missing credentials check + +If the DSS under test allows the deletion of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with invalid credentials check + +If the DSS under test allows the deletion of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with missing scope check + +If the DSS under test allows the deletion of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with incorrect scope check + +If the DSS under test allows the deletion of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with valid credentials check + +If the DSS does not allow the deletion of a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with missing credentials check + +If the DSS under test allows searching for constraint references without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with invalid credentials check + +If the DSS under test allows searching for constraint references with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with missing scope check + +If the DSS under test allows searching for constraint references with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with incorrect scope check + +If the DSS under test allows searching for constraint references with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with valid credentials check + +If the DSS does not allow searching for constraint references when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + ### Availability endpoints authentication test step #### 🛑 Unauthorized requests return the proper error message body check diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py index 868d5f0685..1c642e0674 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py @@ -1,30 +1,25 @@ from datetime import datetime, timedelta from uas_standards.astm.f3548.v21.api import UssAvailabilityState - -from monitoring.monitorlib.fetch import QueryError -from monitoring.uss_qualifier.resources.astm.f3548.v21.subscription_params import ( - SubscriptionParams, -) from uas_standards.astm.f3548.v21.constants import ( Scope, ) from monitoring.monitorlib.auth import InvalidTokenSignatureAuth +from monitoring.monitorlib.fetch import QueryError from monitoring.monitorlib.geotemporal import Volume4D from monitoring.monitorlib.infrastructure import UTMClientSession from monitoring.prober.infrastructure import register_resource_type from monitoring.uss_qualifier.resources.astm.f3548.v21.dss import ( DSSInstanceResource, - DSSInstance, ) from monitoring.uss_qualifier.resources.astm.f3548.v21.planning_area import ( PlanningAreaResource, ) from monitoring.uss_qualifier.resources.interuss.id_generator import IDGeneratorResource from monitoring.uss_qualifier.scenarios.astm.utm.dss import test_step_fragments -from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.availability_api_validator import ( - AvailabilityAuthValidator, +from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.cr_api_validator import ( + ConstraintRefAuthValidator, ) from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.generic import ( GenericAuthValidator, @@ -32,6 +27,9 @@ from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.oir_api_validator import ( OperationalIntentRefAuthValidator, ) +from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.availability_api_validator import ( + AvailabilityAuthValidator, +) from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.sub_api_validator import ( SubscriptionAuthValidator, ) @@ -46,7 +44,7 @@ class AuthenticationValidation(TestScenario): A scenario that verifies that the DSS properly authenticates requests to all its endpoints, and properly validates the scopes of the requests depending on the action being performed. - Note that this scenario does not verif that a DSS only allows an entity owner to modify the: + Note that this scenario does not verif that a DSS only allows an entity owner to mutate or delete them:: this is covered in other scenarios. """ @@ -59,18 +57,6 @@ class AuthenticationValidation(TestScenario): _test_id: str """Base identifier for the entities that will be created""" - _sub_validator: SubscriptionAuthValidator - _oir_validator: OperationalIntentRefAuthValidator - _availability_validator: AvailabilityAuthValidator - - _sub_params: SubscriptionParams - - _scd_dss: DSSInstance - _availability_dss: DSSInstance - - _wrong_scope_for_availability: Scope - _wrong_scope_for_scd: Scope - def __init__( self, dss: DSSInstanceResource, @@ -85,10 +71,11 @@ def __init__( but has no other requirements. """ super().__init__() - # This is the proper scope for interactions with the DSS for subscriptions and operational intent # references in this scenario - scd_scopes = {Scope.StrategicCoordination: "create and delete subscriptions"} + scd_scopes = { + Scope.StrategicCoordination: "create and delete subscriptions and operational intent resources" + } # For the 'wrong' scope we pick anything from the available scopes that isn't the SCD, CMSA or empty scope: self._wrong_scope_for_scd = dss.get_authorized_scope_not_in( @@ -103,11 +90,7 @@ def __init__( if self._wrong_scope_for_scd is not None: scd_scopes[ self._wrong_scope_for_scd - ] = "Attempt to query subscriptions with wrong scope" - - availability_scopes = { - Scope.AvailabilityArbitration: "read and set availability for a USS" - } + ] = "Attempt to query subscriptions and OIRs with wrong scope" self._wrong_scope_for_availability = dss.get_authorized_scope_not_in( [ @@ -118,22 +101,45 @@ def __init__( ] ) + availability_scopes = { + Scope.AvailabilityArbitration: "read and set availability for a USS" + } + if self._wrong_scope_for_availability is not None: availability_scopes[ self._wrong_scope_for_availability ] = "Attempt to query availability with wrong scope" + constraints_scopes = { + Scope.ConstraintManagement: "Create, update, and delete constraints", + } + + # For the 'wrong' scope for constraints endpoints, we pick anything from the available scopes that isn't constraint management, processing or empty scope: + self._wrong_scope_for_constraints = dss.get_authorized_scope_not_in( + [Scope.ConstraintManagement, Scope.ConstraintProcessing, ""] + ) + + if self._wrong_scope_for_constraints is not None: + constraints_scopes[ + self._wrong_scope_for_constraints + ] = "Attempt to query constraints with wrong scope" + self._test_missing_scope = False if dss.can_use_scope(""): - scd_scopes[""] = "Attempt to query subscriptions with missing scope" + scd_scopes[ + "" + ] = "Attempt to query subscriptions and OIRs with missing scope" + availability_scopes[""] = "Attempt to query availability with missing scope" + constraints_scopes[""] = "Attempt to query constraints with missing scope" self._test_missing_scope = True # Note: .get_instance should be called once we know every scope we will need, # in order to guarantee that they are indeed available. self._scd_dss = dss.get_instance(scd_scopes) + self._constraints_dss = dss.get_instance(constraints_scopes) self._availability_dss = dss.get_instance(availability_scopes) - self._pid = [dss.participant_id] + self._pid = [self._scd_dss.participant_id] self._test_id = id_generator.id_factory.make_id(self.SUB_TYPE) self._planning_area = planning_area.specification @@ -144,21 +150,42 @@ def __init__( ) # Session that won't provide a token at all - self._no_auth_session = UTMClientSession(dss.base_url, auth_adapter=None) + self._no_auth_session = UTMClientSession( + self._scd_dss.base_url, auth_adapter=None + ) # Session that should provide a well-formed token with a wrong signature self._invalid_token_session = UTMClientSession( - dss.base_url, auth_adapter=InvalidTokenSignatureAuth() + self._scd_dss.base_url, auth_adapter=InvalidTokenSignatureAuth() ) def run(self, context: ExecutionContext): - generic_validator = GenericAuthValidator( + scd_generic_validator = GenericAuthValidator( self, self._scd_dss, Scope.StrategicCoordination ) - self._sub_validator = SubscriptionAuthValidator( + self.begin_test_scenario(context) + self._setup_case() + self.begin_test_case("Endpoint authorization") + + if self._test_missing_scope: + self.record_note("missing_scope", "Missing scope testing enabled.") + else: + self.record_note("missing_scope", "Missing scope testing disabled.") + + self.begin_test_step("Subscription endpoints authentication") + + if self._wrong_scope_for_scd: + self.record_note( + "wrong_scope_for_scd", + f"Incorrect scope testing enabled with scope {self._wrong_scope_for_scd}.", + ) + else: + self.record_note("wrong_scope", "Incorrect scope testing disabled.") + + sub_validator = SubscriptionAuthValidator( scenario=self, - generic_validator=generic_validator, + generic_validator=scd_generic_validator, dss=self._scd_dss, test_id=self._test_id, planning_area=self._planning_area, @@ -168,10 +195,16 @@ def run(self, context: ExecutionContext): test_wrong_scope=self._wrong_scope_for_scd, test_missing_scope=self._test_missing_scope, ) + sub_validator.verify_sub_endpoints_authentication() + + self.end_test_step() - self._oir_validator = OperationalIntentRefAuthValidator( + self.begin_test_step("Operational intents endpoints authentication") + # The validator relies on the 'current' time, so it should be instantiated + # just before being run + oir_validator = OperationalIntentRefAuthValidator( scenario=self, - generic_validator=generic_validator, + generic_validator=scd_generic_validator, dss=self._scd_dss, test_id=self._test_id, planning_area=self._planning_area, @@ -181,44 +214,37 @@ def run(self, context: ExecutionContext): test_wrong_scope=self._wrong_scope_for_scd, test_missing_scope=self._test_missing_scope, ) + oir_validator.verify_oir_endpoints_authentication() + self.end_test_step() - self._availability_validator = AvailabilityAuthValidator( + if self._wrong_scope_for_constraints: + self.record_note( + "wrong_scope_for_constraints", + f"Incorrect scope testing enabled for Constraint Reference endpoints with scope {self._wrong_scope_for_constraints}.", + ) + else: + self.record_note( + "wrong_scope_scd", + "Incorrect scope testing disabled for Constraint Reference endpoints", + ) + + self.begin_test_step("Constraint reference endpoints authentication") + cr_validator = ConstraintRefAuthValidator( scenario=self, generic_validator=GenericAuthValidator( - self, self._availability_dss, Scope.AvailabilityArbitration + self, self._constraints_dss, Scope.ConstraintManagement ), - dss=self._availability_dss, + dss=self._constraints_dss, test_id=self._test_id, + planning_area=self._planning_area, + planning_area_volume4d=self._planning_area_volume4d, no_auth_session=self._no_auth_session, invalid_token_session=self._invalid_token_session, - test_wrong_scope=self._wrong_scope_for_availability, + test_wrong_scope=self._wrong_scope_for_constraints, test_missing_scope=self._test_missing_scope, ) - - self._sub_params = self._planning_area.get_new_subscription_params( - subscription_id=self._test_id, - # Set this slightly in the past: we will update the subscriptions - # to a later value that still needs to be roughly 'now' without getting into the future - start_time=datetime.now().astimezone() - timedelta(seconds=10), - duration=timedelta(minutes=45), - # This is a planning area without constraint processing - notify_for_op_intents=True, - notify_for_constraints=False, - ) - - self.begin_test_scenario(context) - self._setup_case() - self.begin_test_case("Endpoint authorization") - - if self._wrong_scope_for_scd: - self.record_note( - "wrong_scope_scd", - f"Incorrect scope testing enabled for SCD endpoints with scope {self._wrong_scope_for_scd}.", - ) - else: - self.record_note( - "wrong_scope_scd", "Incorrect scope testing disabled for SCD endpoints" - ) + cr_validator.verify_cr_endpoints_authentication() + self.end_test_step() if self._wrong_scope_for_availability: self.record_note( @@ -231,22 +257,21 @@ def run(self, context: ExecutionContext): "Incorrect scope testing disabled for availability endpoints", ) - if self._test_missing_scope: - self.record_note("missing_scope", "Missing scope testing enabled.") - else: - self.record_note("missing_scope", "Missing scope testing disabled.") - - self.begin_test_step("Subscription endpoints authentication") - self._sub_validator.verify_sub_endpoints_authentication() - - self.end_test_step() - - self.begin_test_step("Operational intents endpoints authentication") - self._oir_validator.verify_oir_endpoints_authentication() - self.end_test_step() - self.begin_test_step("Availability endpoints authentication") - self._availability_validator.verify_availability_endpoints_authentication() + + availability_validator = AvailabilityAuthValidator( + scenario=self, + generic_validator=GenericAuthValidator( + self, self._availability_dss, Scope.AvailabilityArbitration + ), + dss=self._availability_dss, + test_id=self._test_id, + no_auth_session=self._no_auth_session, + invalid_token_session=self._invalid_token_session, + test_wrong_scope=self._wrong_scope_for_availability, + test_missing_scope=self._test_missing_scope, + ) + availability_validator.verify_availability_endpoints_authentication() self.end_test_step() self.end_test_case() diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/cr_api_validator.py b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/cr_api_validator.py new file mode 100644 index 0000000000..60b8b98205 --- /dev/null +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/cr_api_validator.py @@ -0,0 +1,559 @@ +from datetime import datetime, timedelta +from typing import Optional + +from implicitdict import ImplicitDict, StringBasedDateTime +from uas_standards.astm.f3548.v21.api import ( + OPERATIONS, + OperationID, + OperationalIntentState, + ChangeOperationalIntentReferenceResponse, + PutOperationalIntentReferenceParameters, + Time, + QueryOperationalIntentReferenceParameters, + PutConstraintReferenceParameters, + ChangeConstraintReferenceResponse, + QueryConstraintReferenceParameters, +) + +from monitoring.monitorlib import fetch +from monitoring.monitorlib.fetch import QueryType, QueryError +from monitoring.monitorlib.geotemporal import Volume4D +from monitoring.monitorlib.infrastructure import UTMClientSession +from monitoring.uss_qualifier.resources.astm.f3548.v21.dss import DSSInstance +from monitoring.uss_qualifier.resources.astm.f3548.v21.planning_area import ( + PlanningAreaSpecification, +) +from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.generic import ( + GenericAuthValidator, +) +from monitoring.uss_qualifier.scenarios.scenario import TestScenario, PendingCheck + +TIME_TOLERANCE_SEC = 1 + + +class ConstraintRefAuthValidator: + def __init__( + self, + scenario: TestScenario, + generic_validator: GenericAuthValidator, + dss: DSSInstance, + test_id: str, + planning_area: PlanningAreaSpecification, + planning_area_volume4d: Volume4D, + no_auth_session: UTMClientSession, + invalid_token_session: UTMClientSession, + test_wrong_scope: Optional[str] = None, + test_missing_scope: bool = False, + ): + """ + + Args: + scenario: Scenario on which the checks will be done + generic_validator: Provides generic verification methods for DSS API calls + dss: the DSS instance being tested + test_id: identifier to use for the OIRs that will be created + planning_area: the planning area to use for the subscriptions + planning_area_volume4d: a volume 4d encompassing the planning area + no_auth_session: an unauthenticated session + invalid_token_session: a session using a well-formed token that has an invalid signature + test_wrong_scope: a valid scope that is not allowed to perform operations on subscriptions, if available. + If None, checks using a wrong scope will be skipped. + test_missing_scope: if True, will attempt to perform operations without specifying a scope using the valid credentials. + """ + self._scenario = scenario + self._gen_val = generic_validator + self._dss = dss + self._pid = dss.participant_id + self._test_id = test_id + self._planning_area = planning_area + + time_start = datetime.now().astimezone() - timedelta(seconds=10) + time_end = time_start + timedelta(minutes=20) + + self._cr_params = planning_area.get_new_constraint_ref_params( + time_start=time_start, + time_end=time_end, + ) + self._planning_area_volume4d = planning_area_volume4d + self._no_auth_session = no_auth_session + self._invalid_token_session = invalid_token_session + + self._test_wrong_scope = test_wrong_scope + self._test_missing_scope = test_missing_scope + + def verify_cr_endpoints_authentication(self): + self._verify_cr_creation() + self._verify_cr_get() + self._verify_cr_mutation() + self._verify_cr_deletion() + self._verify_cr_search() + + def _verify_cr_creation(self): + op = OPERATIONS[OperationID.CreateConstraintReference] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id), + json=self._cr_params, + query_type=QueryType.F3548v21DSSCreateConstraintReference, + participant_id=self._dss.participant_id, + ) + + # No auth + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Create constraint reference with missing credentials", self._pid + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, no_auth_q) + + self._gen_val.verify_4xx_response(no_auth_q) + + # Invalid token + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Create constraint reference with invalid credentials", self._pid + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, invalid_token_q) + + self._gen_val.verify_4xx_response(invalid_token_q) + + # Valid credentials but missing scope: + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Create constraint reference with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, no_scope_q) + + self._gen_val.verify_4xx_response(no_scope_q) + + # Valid credentials but wrong scope: + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Create constraint reference with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, wrong_scope_q) + + self._gen_val.verify_4xx_response(wrong_scope_q) + + # Valid credentials + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Create constraint reference with valid credentials", self._pid + ) as check: + if valid_q.status_code != 201: # As specified in OpenAPI spec + check.record_failed( + summary=f"Expected 201, got {valid_q.status_code}", + details=f"Error message: {valid_q.error_message}", + query_timestamps=[valid_q.request.timestamp], + ) + + with self._scenario.check( + "Create constraint reference response format conforms to spec", + self._pid, + ) as check: + try: + oir_resp = ImplicitDict.parse( + valid_q.response.json, ChangeConstraintReferenceResponse + ) + except ValueError as e: + check.record_failed( + summary="Could not parse the response body", + details=f"Failed to parse the response body as a ChangeConstraintReferenceResponse: {e}", + query_timestamps=[valid_q.request.timestamp], + ) + + # Save the current OIR + self._current_cr = oir_resp.constraint_reference + + def _verify_cr_get(self): + op = OPERATIONS[OperationID.GetConstraintReference] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id), + query_type=QueryType.F3548v21DSSGetConstraintReference, + participant_id=self._dss.participant_id, + ) + + # No Auth + query_no_auth = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Get constraint reference with missing credentials", self._pid + ) as check: + if query_no_auth.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {query_no_auth.status_code}", + query_timestamps=[query_no_auth.request.timestamp], + ) + self._gen_val.verify_4xx_response(query_no_auth) + + # Invalid token + query_invalid_token = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Get constraint reference with invalid credentials", self._pid + ) as check: + if query_invalid_token.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {query_invalid_token.status_code}", + query_timestamps=[query_invalid_token.request.timestamp], + ) + + self._gen_val.verify_4xx_response(query_invalid_token) + + # Valid credentials but missing scope + if self._test_missing_scope: + query_missing_scope = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Get constraint reference with missing scope", self._pid + ) as check: + if query_missing_scope.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {query_missing_scope.status_code}", + query_timestamps=[query_missing_scope.request.timestamp], + ) + + self._gen_val.verify_4xx_response(query_missing_scope) + + # Valid credentials but wrong scope + if self._test_wrong_scope: + query_wrong_scope = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Get constraint reference with incorrect scope", self._pid + ) as check: + if query_wrong_scope.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {query_wrong_scope.status_code}", + query_timestamps=[query_wrong_scope.request.timestamp], + ) + + self._gen_val.verify_4xx_response(query_wrong_scope) + + # Valid credentials + query_valid_auth = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Get constraint reference with valid credentials", self._pid + ) as check: + if query_valid_auth.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {query_valid_auth.status_code}", + query_timestamps=[query_valid_auth.request.timestamp], + ) + + def _verify_cr_mutation(self): + op = OPERATIONS[OperationID.UpdateConstraintReference] + new_params = PutConstraintReferenceParameters(**self._cr_params) + updated_volume = new_params.extents[0] + new_end = updated_volume.time_end.value.datetime - timedelta(seconds=10) + updated_volume.time_end = Time(value=StringBasedDateTime(new_end)) + new_params.extents = [updated_volume] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id, ovn=self._current_cr.ovn), + json=new_params, + query_type=QueryType.F3548v21DSSUpdateConstraintReference, + participant_id=self._dss.participant_id, + ) + + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with missing credentials", self._pid + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, no_auth_q) + + self._gen_val.verify_4xx_response(no_auth_q) + + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with invalid credentials", self._pid + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + self._sanity_check_oir_not_updated(check, invalid_token_q) + + self._gen_val.verify_4xx_response(invalid_token_q) + + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + self._sanity_check_oir_not_updated(check, no_scope_q) + + self._gen_val.verify_4xx_response(no_scope_q) + + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Mutate constraint reference with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + self._sanity_check_oir_not_updated(check, wrong_scope_q) + + self._gen_val.verify_4xx_response(wrong_scope_q) + + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with valid credentials", self._pid + ) as check: + if valid_q.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {valid_q.status_code}", + details=f"Mutation is expected to have succeeded, but got status {valid_q.status_code} with error {valid_q.error_message} instead", + query_timestamps=[valid_q.request.timestamp], + ) + + with self._scenario.check( + "Mutate constraint reference response format conforms to spec", + self._pid, + ) as check: + try: + parsed_oir = ImplicitDict.parse( + valid_q.response.json, ChangeConstraintReferenceResponse + ) + except ValueError as e: + check.record_failed( + summary="Could not parse the response body", + details=f"Failed to parse the response body as a ChangeConstraintReferenceResponse: {e}", + query_timestamps=[valid_q.request.timestamp], + ) + + self._current_cr = parsed_oir.constraint_reference + + def _verify_cr_deletion(self): + op = OPERATIONS[OperationID.DeleteConstraintReference] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id, ovn=self._current_cr.ovn), + query_type=QueryType.F3548v21DSSDeleteConstraintReference, + participant_id=self._dss.participant_id, + ) + + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with missing credentials", self._pid + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(no_auth_q) + + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with invalid credentials", self._pid + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(invalid_token_q) + + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(no_scope_q) + + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Delete constraint reference with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(wrong_scope_q) + + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with valid credentials", self._pid + ) as check: + if valid_q.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {valid_q.status_code}", + query_timestamps=[valid_q.request.timestamp], + ) + + self._current_cr = None + + def _verify_cr_search(self): + op = OPERATIONS[OperationID.QueryConstraintReferences] + query_kwargs = dict( + verb=op.verb, + url=op.path, + query_type=QueryType.F3548v21DSSQueryConstraintReferences, + json=QueryConstraintReferenceParameters( + area_of_interest=self._planning_area_volume4d.to_f3548v21() + ), + participant_id=self._dss.participant_id, + ) + + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Search constraint references with missing credentials", + self._pid, + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(no_auth_q) + + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Search constraint references with invalid credentials", + self._pid, + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(invalid_token_q) + + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Search constraint references with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(no_scope_q) + + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Search constraint references with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(wrong_scope_q) + + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Search constraint references with valid credentials", self._pid + ) as check: + if valid_q.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {valid_q.status_code}", + query_timestamps=[valid_q.request.timestamp], + ) + + def _sanity_check_cr_not_created( + self, check: PendingCheck, creation_q: fetch.Query + ): + try: + _, sanity_check = self._dss.get_constraint_ref(self._test_id) + self._scenario.record_query(sanity_check) + except QueryError as qe: + self._scenario.record_queries(qe.queries) + if qe.cause_status_code != 404: + check.record_failed( + summary="OIR was created by an unauthorized request.", + details="The Operational Intent Reference should not have been created, as the creation attempt was not authenticated.", + query_timestamps=[ + creation_q.request.timestamp, + qe.cause.request.timestamp, + ], + ) + self._gen_val.verify_4xx_response(qe.cause) + + def _sanity_check_oir_not_updated( + self, check: PendingCheck, creation_q: fetch.Query + ): + try: + oir, sanity_check = self._dss.get_constraint_ref(self._test_id) + self._scenario.record_query(sanity_check) + if ( + abs( + oir.time_end.value.datetime + - self._current_cr.time_end.value.datetime + ).total_seconds() + > TIME_TOLERANCE_SEC + ): + check.record_failed( + summary="OIR was updated by an unauthorized request.", + details=f"The Constraint Reference with id {self._test_id} should not have been updated, as the update attempt was not authenticated.", + query_timestamps=[ + creation_q.request.timestamp, + sanity_check.request.timestamp, + ], + ) + except QueryError as qe: + self._scenario.record_queries(qe.queries) + check.record_failed( + summary="Could not fetch CR to confirm it has not been mutated", + details=f"The Constraint Reference with id {self._test_id} could not be fetched to confirm it has not been mutated: {qe}", + query_timestamps=[ + creation_q.request.timestamp, + qe.queries[0].request.timestamp, + ], + )