diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md index a661a3a047..634d321645 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md @@ -20,10 +20,11 @@ This scenario will check for the scope's availability and transparently ignore c Required scopes for running this scenario: - `utm.strategic_coordination` +- `utm.constraint_management` +- `utm.constraint_processing` Optional scopes that will allow the scenario to provide additional coverage: -- `utm.availability_arbitration` - `""` (empty string) ### id_generator @@ -313,6 +314,146 @@ it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../require If the DSS does not allow searching for operational intents when valid credentials are presented, it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. +### Constraint reference endpoints authentication test step + +#### 🛑 Unauthorized requests return the proper error message body check + +If the DSS under test does not return a proper error message body when an unauthorized request is received, +it fails to properly implement the OpenAPI specification that is part of **[astm.f3548.v21.DSS0005,3](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with missing credentials check + +If the DSS under test allows the creation of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with invalid credentials check + +If the DSS under test allows the creation of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with missing scope check + +If the DSS under test allows the creation of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with incorrect scope check + +If the DSS under test allows the creation of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with valid credentials check + +If the DSS does not allow the creation of a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### [Create response format](../fragments/cr/crud/create_format.md) + +Check response format of a creation request. + +#### 🛑 Get constraint reference with missing credentials check + +If the DSS under test allows the fetching of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with invalid credentials check + +If the DSS under test allows the fetching of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with missing scope check + +If the DSS under test allows the fetching of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with incorrect scope check + +If the DSS under test allows the fetching of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with valid credentials check + +If the DSS does not allow fetching a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with missing credentials check + +If the DSS under test allows the mutation of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with invalid credentials check + +If the DSS under test allows the mutation of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with missing scope check + +If the DSS under test allows the mutation of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with incorrect scope check + +If the DSS under test allows the mutation of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with valid credentials check + +If the DSS does not allow the mutation of a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### [Mutate response format](../fragments/cr/crud/update_format.md) + +Check response format of a mutation. + +#### 🛑 Delete constraint reference with missing credentials check + +If the DSS under test allows the deletion of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with invalid credentials check + +If the DSS under test allows the deletion of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with missing scope check + +If the DSS under test allows the deletion of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with incorrect scope check + +If the DSS under test allows the deletion of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with valid credentials check + +If the DSS does not allow the deletion of a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with missing credentials check + +If the DSS under test allows searching for constraint references without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with invalid credentials check + +If the DSS under test allows searching for constraint references with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with missing scope check + +If the DSS under test allows searching for constraint references with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with incorrect scope check + +If the DSS under test allows searching for constraint references with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with valid credentials check + +If the DSS does not allow searching for constraint references when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + ## [Cleanup](../clean_workspace.md) The cleanup phase of this test scenario removes the subscription with the known test ID if it has not been removed before. diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py index d725dd9742..26e242ace5 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py @@ -17,6 +17,9 @@ ) from monitoring.uss_qualifier.resources.interuss.id_generator import IDGeneratorResource from monitoring.uss_qualifier.scenarios.astm.utm.dss import test_step_fragments +from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.cr_api_validator import ( + ConstraintRefAuthValidator, +) from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.generic import ( GenericAuthValidator, ) @@ -37,7 +40,7 @@ class AuthenticationValidation(TestScenario): A scenario that verifies that the DSS properly authenticates requests to all its endpoints, and properly validates the scopes of the requests depending on the action being performed. - Note that this scenario does not verif that a DSS only allows an entity owner to modify the: + Note that this scenario does not verif that a DSS only allows an entity owner to mutate or delete them:: this is covered in other scenarios. """ @@ -50,10 +53,6 @@ class AuthenticationValidation(TestScenario): _test_id: str """Base identifier for the entities that will be created""" - _sub_validator: SubscriptionAuthValidator - _oir_validator: OperationalIntentRefAuthValidator - _sub_params: SubscriptionParams - def __init__( self, dss: DSSInstanceResource, @@ -68,34 +67,59 @@ def __init__( but has no other requirements. """ super().__init__() - # This is the proper scope for interactions with the DSS in this scenario - scopes = {Scope.StrategicCoordination: "create and delete subscriptions"} + # This is the proper scope for interactions with the DSS for subscriptions and operational intent + # references in this scenario + scd_scopes = { + Scope.StrategicCoordination: "create and delete subscriptions and operational intent resources" + } + + constraints_scopes = { + Scope.ConstraintManagement: "Create, update, and delete constraints", + } + + # For the 'wrong' scope for SCD endpoints, we pick anything from the available scopes that isn't the SCD or empty scope: + available_scopes_scd = dss.get_authorized_scopes() + available_scopes_scd.discard(Scope.StrategicCoordination) + available_scopes_scd.discard("") + + if len(available_scopes_scd) > 0: + # Sort the scopes to obtain a deterministic order, pick the first one + available_scopes_scd = sorted(available_scopes_scd) + self._wrong_scope_for_scd = available_scopes_scd[0] + scd_scopes[ + self._wrong_scope_for_scd + ] = "Attempt to query subscriptions and OIRs with wrong scope" + else: + self._wrong_scope_for_scd = None - # For the 'wrong' scope we pick anything from the available scopes that isn't the SCD or empty scope: - available_scopes = dss.get_authorized_scopes() - available_scopes.discard(Scope.StrategicCoordination) - available_scopes.discard("") + # TODO can we assume that constraint management/processing scopes will always be obtainable for a test? + # If not, we can easily make this part of the scenario optional. + # For the 'wrong' scope for constraints endpoints, we pick anything from the available scopes that isn't constraint management, processing or empty scope: + available_scopes_constraints = dss.get_authorized_scopes() + available_scopes_constraints.discard(Scope.ConstraintManagement) + available_scopes_constraints.discard(Scope.ConstraintProcessing) + available_scopes_constraints.discard("") - if len(available_scopes) > 0: + if len(available_scopes_constraints) > 0: # Sort the scopes to obtain a deterministic order, pick the first one - available_scopes = sorted(available_scopes) - self._wrong_scope = available_scopes[0] - scopes[ - self._wrong_scope - ] = "Attempt to query subscriptions with wrong scope" - else: - self._wrong_scope = None + available_scopes_constraints = sorted(available_scopes_constraints) + self._wrong_scope_for_constraints = available_scopes_constraints[0] + constraints_scopes[ + self._wrong_scope_for_constraints + ] = "Attempt to query constraints with wrong scope" self._test_missing_scope = False if dss.can_use_scope(""): - scopes[""] = "Attempt to query subscriptions with missing scope" + scd_scopes[""] = "Attempt to query subscriptions with missing scope" + constraints_scopes[""] = "Attempt to query constraints with missing scope" self._test_missing_scope = True # Note: .get_instance should be called once we know every scope we will need, # in order to guarantee that they are indeed available. - self._dss = dss.get_instance(scopes) + self._scd_dss = dss.get_instance(scd_scopes) + self._constraints_dss = dss.get_instance(constraints_scopes) - self._pid = [self._dss.participant_id] + self._pid = [self._scd_dss.participant_id] self._test_id = id_generator.id_factory.make_id(self.SUB_TYPE) self._planning_area = planning_area.specification @@ -106,63 +130,28 @@ def __init__( ) # Session that won't provide a token at all - self._no_auth_session = UTMClientSession(self._dss.base_url, auth_adapter=None) + self._no_auth_session = UTMClientSession( + self._scd_dss.base_url, auth_adapter=None + ) # Session that should provide a well-formed token with a wrong signature self._invalid_token_session = UTMClientSession( - self._dss.base_url, auth_adapter=InvalidTokenSignatureAuth() + self._scd_dss.base_url, auth_adapter=InvalidTokenSignatureAuth() ) def run(self, context: ExecutionContext): - generic_validator = GenericAuthValidator( - self, self._dss, Scope.StrategicCoordination - ) - - self._sub_validator = SubscriptionAuthValidator( - scenario=self, - generic_validator=generic_validator, - dss=self._dss, - test_id=self._test_id, - planning_area=self._planning_area, - planning_area_volume4d=self._planning_area_volume4d, - no_auth_session=self._no_auth_session, - invalid_token_session=self._invalid_token_session, - test_wrong_scope=self._wrong_scope, - test_missing_scope=self._test_missing_scope, - ) - - self._oir_validator = OperationalIntentRefAuthValidator( - scenario=self, - generic_validator=generic_validator, - dss=self._dss, - test_id=self._test_id, - planning_area=self._planning_area, - planning_area_volume4d=self._planning_area_volume4d, - no_auth_session=self._no_auth_session, - invalid_token_session=self._invalid_token_session, - test_wrong_scope=self._wrong_scope, - test_missing_scope=self._test_missing_scope, - ) - - self._sub_params = self._planning_area.get_new_subscription_params( - subscription_id=self._test_id, - # Set this slightly in the past: we will update the subscriptions - # to a later value that still needs to be roughly 'now' without getting into the future - start_time=datetime.now().astimezone() - timedelta(seconds=10), - duration=timedelta(minutes=45), - # This is a planning area without constraint processing - notify_for_op_intents=True, - notify_for_constraints=False, + scd_generic_validator = GenericAuthValidator( + self, self._scd_dss, Scope.StrategicCoordination ) self.begin_test_scenario(context) self._setup_case() self.begin_test_case("Endpoint authorization") - if self._wrong_scope: + if self._wrong_scope_for_scd: self.record_note( "wrong_scope", - f"Incorrect scope testing enabled with scope {self._wrong_scope}.", + f"Incorrect scope testing enabled with scope {self._wrong_scope_for_scd}.", ) else: self.record_note("wrong_scope", "Incorrect scope testing disabled.") @@ -173,12 +162,57 @@ def run(self, context: ExecutionContext): self.record_note("missing_scope", "Missing scope testing disabled.") self.begin_test_step("Subscription endpoints authentication") - self._sub_validator.verify_sub_endpoints_authentication() + + sub_validator = SubscriptionAuthValidator( + scenario=self, + generic_validator=scd_generic_validator, + dss=self._scd_dss, + test_id=self._test_id, + planning_area=self._planning_area, + planning_area_volume4d=self._planning_area_volume4d, + no_auth_session=self._no_auth_session, + invalid_token_session=self._invalid_token_session, + test_wrong_scope=self._wrong_scope_for_scd, + test_missing_scope=self._test_missing_scope, + ) + sub_validator.verify_sub_endpoints_authentication() self.end_test_step() self.begin_test_step("Operational intents endpoints authentication") - self._oir_validator.verify_oir_endpoints_authentication() + # The validator relies on the 'current' time, so it should be instantiated + # just before being run + oir_validator = OperationalIntentRefAuthValidator( + scenario=self, + generic_validator=scd_generic_validator, + dss=self._scd_dss, + test_id=self._test_id, + planning_area=self._planning_area, + planning_area_volume4d=self._planning_area_volume4d, + no_auth_session=self._no_auth_session, + invalid_token_session=self._invalid_token_session, + test_wrong_scope=self._wrong_scope_for_scd, + test_missing_scope=self._test_missing_scope, + ) + oir_validator.verify_oir_endpoints_authentication() + self.end_test_step() + + self.begin_test_step("Constraint reference endpoints authentication") + cr_validator = ConstraintRefAuthValidator( + self, + GenericAuthValidator( + self, self._constraints_dss, Scope.ConstraintManagement + ), + self._constraints_dss, + self._test_id, + self._planning_area, + self._planning_area_volume4d, + self._no_auth_session, + self._invalid_token_session, + self._wrong_scope_for_scd, + self._test_missing_scope, + ) + cr_validator.verify_cr_endpoints_authentication() self.end_test_step() self.end_test_case() @@ -203,13 +237,13 @@ def _ensure_test_entities_dont_exist(self): # Drop OIR's first: subscriptions may be tied to them and can't be deleted # as long as they exist - test_step_fragments.cleanup_op_intent(self, self._dss, self._test_id) - test_step_fragments.cleanup_sub(self, self._dss, self._test_id) + test_step_fragments.cleanup_op_intent(self, self._scd_dss, self._test_id) + test_step_fragments.cleanup_sub(self, self._scd_dss, self._test_id) def _ensure_no_active_subs_exist(self): test_step_fragments.cleanup_active_subs( self, - self._dss, + self._scd_dss, self._planning_area_volume4d, ) diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/cr_api_validator.py b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/cr_api_validator.py new file mode 100644 index 0000000000..60b8b98205 --- /dev/null +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/cr_api_validator.py @@ -0,0 +1,559 @@ +from datetime import datetime, timedelta +from typing import Optional + +from implicitdict import ImplicitDict, StringBasedDateTime +from uas_standards.astm.f3548.v21.api import ( + OPERATIONS, + OperationID, + OperationalIntentState, + ChangeOperationalIntentReferenceResponse, + PutOperationalIntentReferenceParameters, + Time, + QueryOperationalIntentReferenceParameters, + PutConstraintReferenceParameters, + ChangeConstraintReferenceResponse, + QueryConstraintReferenceParameters, +) + +from monitoring.monitorlib import fetch +from monitoring.monitorlib.fetch import QueryType, QueryError +from monitoring.monitorlib.geotemporal import Volume4D +from monitoring.monitorlib.infrastructure import UTMClientSession +from monitoring.uss_qualifier.resources.astm.f3548.v21.dss import DSSInstance +from monitoring.uss_qualifier.resources.astm.f3548.v21.planning_area import ( + PlanningAreaSpecification, +) +from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.generic import ( + GenericAuthValidator, +) +from monitoring.uss_qualifier.scenarios.scenario import TestScenario, PendingCheck + +TIME_TOLERANCE_SEC = 1 + + +class ConstraintRefAuthValidator: + def __init__( + self, + scenario: TestScenario, + generic_validator: GenericAuthValidator, + dss: DSSInstance, + test_id: str, + planning_area: PlanningAreaSpecification, + planning_area_volume4d: Volume4D, + no_auth_session: UTMClientSession, + invalid_token_session: UTMClientSession, + test_wrong_scope: Optional[str] = None, + test_missing_scope: bool = False, + ): + """ + + Args: + scenario: Scenario on which the checks will be done + generic_validator: Provides generic verification methods for DSS API calls + dss: the DSS instance being tested + test_id: identifier to use for the OIRs that will be created + planning_area: the planning area to use for the subscriptions + planning_area_volume4d: a volume 4d encompassing the planning area + no_auth_session: an unauthenticated session + invalid_token_session: a session using a well-formed token that has an invalid signature + test_wrong_scope: a valid scope that is not allowed to perform operations on subscriptions, if available. + If None, checks using a wrong scope will be skipped. + test_missing_scope: if True, will attempt to perform operations without specifying a scope using the valid credentials. + """ + self._scenario = scenario + self._gen_val = generic_validator + self._dss = dss + self._pid = dss.participant_id + self._test_id = test_id + self._planning_area = planning_area + + time_start = datetime.now().astimezone() - timedelta(seconds=10) + time_end = time_start + timedelta(minutes=20) + + self._cr_params = planning_area.get_new_constraint_ref_params( + time_start=time_start, + time_end=time_end, + ) + self._planning_area_volume4d = planning_area_volume4d + self._no_auth_session = no_auth_session + self._invalid_token_session = invalid_token_session + + self._test_wrong_scope = test_wrong_scope + self._test_missing_scope = test_missing_scope + + def verify_cr_endpoints_authentication(self): + self._verify_cr_creation() + self._verify_cr_get() + self._verify_cr_mutation() + self._verify_cr_deletion() + self._verify_cr_search() + + def _verify_cr_creation(self): + op = OPERATIONS[OperationID.CreateConstraintReference] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id), + json=self._cr_params, + query_type=QueryType.F3548v21DSSCreateConstraintReference, + participant_id=self._dss.participant_id, + ) + + # No auth + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Create constraint reference with missing credentials", self._pid + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, no_auth_q) + + self._gen_val.verify_4xx_response(no_auth_q) + + # Invalid token + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Create constraint reference with invalid credentials", self._pid + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, invalid_token_q) + + self._gen_val.verify_4xx_response(invalid_token_q) + + # Valid credentials but missing scope: + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Create constraint reference with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, no_scope_q) + + self._gen_val.verify_4xx_response(no_scope_q) + + # Valid credentials but wrong scope: + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Create constraint reference with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, wrong_scope_q) + + self._gen_val.verify_4xx_response(wrong_scope_q) + + # Valid credentials + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Create constraint reference with valid credentials", self._pid + ) as check: + if valid_q.status_code != 201: # As specified in OpenAPI spec + check.record_failed( + summary=f"Expected 201, got {valid_q.status_code}", + details=f"Error message: {valid_q.error_message}", + query_timestamps=[valid_q.request.timestamp], + ) + + with self._scenario.check( + "Create constraint reference response format conforms to spec", + self._pid, + ) as check: + try: + oir_resp = ImplicitDict.parse( + valid_q.response.json, ChangeConstraintReferenceResponse + ) + except ValueError as e: + check.record_failed( + summary="Could not parse the response body", + details=f"Failed to parse the response body as a ChangeConstraintReferenceResponse: {e}", + query_timestamps=[valid_q.request.timestamp], + ) + + # Save the current OIR + self._current_cr = oir_resp.constraint_reference + + def _verify_cr_get(self): + op = OPERATIONS[OperationID.GetConstraintReference] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id), + query_type=QueryType.F3548v21DSSGetConstraintReference, + participant_id=self._dss.participant_id, + ) + + # No Auth + query_no_auth = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Get constraint reference with missing credentials", self._pid + ) as check: + if query_no_auth.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {query_no_auth.status_code}", + query_timestamps=[query_no_auth.request.timestamp], + ) + self._gen_val.verify_4xx_response(query_no_auth) + + # Invalid token + query_invalid_token = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Get constraint reference with invalid credentials", self._pid + ) as check: + if query_invalid_token.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {query_invalid_token.status_code}", + query_timestamps=[query_invalid_token.request.timestamp], + ) + + self._gen_val.verify_4xx_response(query_invalid_token) + + # Valid credentials but missing scope + if self._test_missing_scope: + query_missing_scope = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Get constraint reference with missing scope", self._pid + ) as check: + if query_missing_scope.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {query_missing_scope.status_code}", + query_timestamps=[query_missing_scope.request.timestamp], + ) + + self._gen_val.verify_4xx_response(query_missing_scope) + + # Valid credentials but wrong scope + if self._test_wrong_scope: + query_wrong_scope = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Get constraint reference with incorrect scope", self._pid + ) as check: + if query_wrong_scope.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {query_wrong_scope.status_code}", + query_timestamps=[query_wrong_scope.request.timestamp], + ) + + self._gen_val.verify_4xx_response(query_wrong_scope) + + # Valid credentials + query_valid_auth = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Get constraint reference with valid credentials", self._pid + ) as check: + if query_valid_auth.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {query_valid_auth.status_code}", + query_timestamps=[query_valid_auth.request.timestamp], + ) + + def _verify_cr_mutation(self): + op = OPERATIONS[OperationID.UpdateConstraintReference] + new_params = PutConstraintReferenceParameters(**self._cr_params) + updated_volume = new_params.extents[0] + new_end = updated_volume.time_end.value.datetime - timedelta(seconds=10) + updated_volume.time_end = Time(value=StringBasedDateTime(new_end)) + new_params.extents = [updated_volume] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id, ovn=self._current_cr.ovn), + json=new_params, + query_type=QueryType.F3548v21DSSUpdateConstraintReference, + participant_id=self._dss.participant_id, + ) + + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with missing credentials", self._pid + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, no_auth_q) + + self._gen_val.verify_4xx_response(no_auth_q) + + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with invalid credentials", self._pid + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + self._sanity_check_oir_not_updated(check, invalid_token_q) + + self._gen_val.verify_4xx_response(invalid_token_q) + + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + self._sanity_check_oir_not_updated(check, no_scope_q) + + self._gen_val.verify_4xx_response(no_scope_q) + + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Mutate constraint reference with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + self._sanity_check_oir_not_updated(check, wrong_scope_q) + + self._gen_val.verify_4xx_response(wrong_scope_q) + + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with valid credentials", self._pid + ) as check: + if valid_q.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {valid_q.status_code}", + details=f"Mutation is expected to have succeeded, but got status {valid_q.status_code} with error {valid_q.error_message} instead", + query_timestamps=[valid_q.request.timestamp], + ) + + with self._scenario.check( + "Mutate constraint reference response format conforms to spec", + self._pid, + ) as check: + try: + parsed_oir = ImplicitDict.parse( + valid_q.response.json, ChangeConstraintReferenceResponse + ) + except ValueError as e: + check.record_failed( + summary="Could not parse the response body", + details=f"Failed to parse the response body as a ChangeConstraintReferenceResponse: {e}", + query_timestamps=[valid_q.request.timestamp], + ) + + self._current_cr = parsed_oir.constraint_reference + + def _verify_cr_deletion(self): + op = OPERATIONS[OperationID.DeleteConstraintReference] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id, ovn=self._current_cr.ovn), + query_type=QueryType.F3548v21DSSDeleteConstraintReference, + participant_id=self._dss.participant_id, + ) + + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with missing credentials", self._pid + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(no_auth_q) + + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with invalid credentials", self._pid + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(invalid_token_q) + + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(no_scope_q) + + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Delete constraint reference with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(wrong_scope_q) + + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with valid credentials", self._pid + ) as check: + if valid_q.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {valid_q.status_code}", + query_timestamps=[valid_q.request.timestamp], + ) + + self._current_cr = None + + def _verify_cr_search(self): + op = OPERATIONS[OperationID.QueryConstraintReferences] + query_kwargs = dict( + verb=op.verb, + url=op.path, + query_type=QueryType.F3548v21DSSQueryConstraintReferences, + json=QueryConstraintReferenceParameters( + area_of_interest=self._planning_area_volume4d.to_f3548v21() + ), + participant_id=self._dss.participant_id, + ) + + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Search constraint references with missing credentials", + self._pid, + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(no_auth_q) + + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Search constraint references with invalid credentials", + self._pid, + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(invalid_token_q) + + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Search constraint references with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(no_scope_q) + + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Search constraint references with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(wrong_scope_q) + + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Search constraint references with valid credentials", self._pid + ) as check: + if valid_q.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {valid_q.status_code}", + query_timestamps=[valid_q.request.timestamp], + ) + + def _sanity_check_cr_not_created( + self, check: PendingCheck, creation_q: fetch.Query + ): + try: + _, sanity_check = self._dss.get_constraint_ref(self._test_id) + self._scenario.record_query(sanity_check) + except QueryError as qe: + self._scenario.record_queries(qe.queries) + if qe.cause_status_code != 404: + check.record_failed( + summary="OIR was created by an unauthorized request.", + details="The Operational Intent Reference should not have been created, as the creation attempt was not authenticated.", + query_timestamps=[ + creation_q.request.timestamp, + qe.cause.request.timestamp, + ], + ) + self._gen_val.verify_4xx_response(qe.cause) + + def _sanity_check_oir_not_updated( + self, check: PendingCheck, creation_q: fetch.Query + ): + try: + oir, sanity_check = self._dss.get_constraint_ref(self._test_id) + self._scenario.record_query(sanity_check) + if ( + abs( + oir.time_end.value.datetime + - self._current_cr.time_end.value.datetime + ).total_seconds() + > TIME_TOLERANCE_SEC + ): + check.record_failed( + summary="OIR was updated by an unauthorized request.", + details=f"The Constraint Reference with id {self._test_id} should not have been updated, as the update attempt was not authenticated.", + query_timestamps=[ + creation_q.request.timestamp, + sanity_check.request.timestamp, + ], + ) + except QueryError as qe: + self._scenario.record_queries(qe.queries) + check.record_failed( + summary="Could not fetch CR to confirm it has not been mutated", + details=f"The Constraint Reference with id {self._test_id} could not be fetched to confirm it has not been mutated: {qe}", + query_timestamps=[ + creation_q.request.timestamp, + qe.queries[0].request.timestamp, + ], + )