diff --git a/monitoring/uss_qualifier/resources/astm/f3548/v21/dss.py b/monitoring/uss_qualifier/resources/astm/f3548/v21/dss.py index 24d99ab831..582a26a5e1 100644 --- a/monitoring/uss_qualifier/resources/astm/f3548/v21/dss.py +++ b/monitoring/uss_qualifier/resources/astm/f3548/v21/dss.py @@ -717,7 +717,7 @@ def participant_id(self) -> str: def base_url(self) -> str: return self._specification.base_url - def get_authorized_scope_not_in(self, ignored_scopes: List[str]) -> Optional[str]: + def get_authorized_scope_not_in(self, ignored_scopes: List[str]) -> Optional[Scope]: """Returns a scope that this DSS Resource is allowed to use but that is not any of the ones that are passed in 'ignored_scopes'. If no such scope is found, None is returned. diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md index fff406295e..cea394356f 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md @@ -22,6 +22,7 @@ At least one of the following scopes needs to be available for this scenario to - `utm.strategic_coordination` - `utm.availability_arbitration` +- `utm.constraint_management` In order to verify each endpoint group, all scopes above must be available. @@ -387,6 +388,146 @@ it is in violation of **[astm.f3548.v21.DSS0100,1](../../../../../requirements/a The response to a successful USS Availability Set request is expected to conform to the format defined by the OpenAPI specification under the `A3.1` Annex of ASTM F3548−21, otherwise, the DSS is failing to implement **[astm.f3548.v21.DSS0100,1](../../../../../requirements/astm/f3548/v21.md)**. +### Constraint reference endpoints authentication test step + +#### 🛑 Unauthorized requests return the proper error message body check + +If the DSS under test does not return a proper error message body when an unauthorized request is received, +it fails to properly implement the OpenAPI specification that is part of **[astm.f3548.v21.DSS0005,3](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with missing credentials check + +If the DSS under test allows the creation of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with invalid credentials check + +If the DSS under test allows the creation of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with missing scope check + +If the DSS under test allows the creation of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with incorrect scope check + +If the DSS under test allows the creation of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Create constraint reference with valid credentials check + +If the DSS does not allow the creation of a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### [Create response format](../fragments/cr/crud/create_format.md) + +Check response format of a creation request. + +#### 🛑 Get constraint reference with missing credentials check + +If the DSS under test allows the fetching of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with invalid credentials check + +If the DSS under test allows the fetching of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with missing scope check + +If the DSS under test allows the fetching of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with incorrect scope check + +If the DSS under test allows the fetching of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Get constraint reference with valid credentials check + +If the DSS does not allow fetching a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with missing credentials check + +If the DSS under test allows the mutation of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with invalid credentials check + +If the DSS under test allows the mutation of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with missing scope check + +If the DSS under test allows the mutation of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with incorrect scope check + +If the DSS under test allows the mutation of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Mutate constraint reference with valid credentials check + +If the DSS does not allow the mutation of a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### [Mutate response format](../fragments/cr/crud/update_format.md) + +Check response format of a mutation. + +#### 🛑 Delete constraint reference with missing credentials check + +If the DSS under test allows the deletion of a constraint reference without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with invalid credentials check + +If the DSS under test allows the deletion of a constraint reference with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with missing scope check + +If the DSS under test allows the deletion of a constraint reference with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with incorrect scope check + +If the DSS under test allows the deletion of a constraint reference with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Delete constraint reference with valid credentials check + +If the DSS does not allow the deletion of a constraint reference when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with missing credentials check + +If the DSS under test allows searching for constraint references without any credentials being presented, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with invalid credentials check + +If the DSS under test allows searching for constraint references with credentials that are well-formed but invalid, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with missing scope check + +If the DSS under test allows searching for constraint references with valid credentials but a missing scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with incorrect scope check + +If the DSS under test allows searching for constraint references with valid credentials but an incorrect scope, +it is in violation of **[astm.f3548.v21.DSS0210,A2-7-2,7](../../../../../requirements/astm/f3548/v21.md)**. + +#### 🛑 Search constraint references with valid credentials check + +If the DSS does not allow searching for constraint references when valid credentials are presented, +it is in violation of **[astm.f3548.v21.DSS0005,1](../../../../../requirements/astm/f3548/v21.md)**. + ## [Cleanup](../clean_workspace.md) ### [Availability can be requested](../fragments/availability/read.md) diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py index aa005154e8..b7f41b145b 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py @@ -24,6 +24,9 @@ from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.availability_api_validator import ( AvailabilityAuthValidator, ) +from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.cr_api_validator import ( + ConstraintRefAuthValidator, +) from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.generic import ( GenericAuthValidator, ) @@ -59,6 +62,7 @@ class AuthenticationValidation(TestScenario): _scd_dss: Optional[DSSInstance] = None _availability_dss: Optional[DSSInstance] = None + _constraints_dss: Optional[DSSInstance] = None def __init__( self, @@ -119,6 +123,27 @@ def __init__( availability_scopes = None self._wrong_scope_for_availability = None + if dss.can_use_scope(Scope.ConstraintManagement): + constraints_scopes = { + Scope.ConstraintManagement: "Create, update, and delete constraints", + } + + self._wrong_scope_for_constraints = dss.get_authorized_scope_not_in( + [ + Scope.ConstraintManagement, # Allowed to get and update + Scope.ConstraintProcessing, # Allowed to get + "", # Already Used for empty scope testing + ] + ) + + if self._wrong_scope_for_constraints is not None: + constraints_scopes[ + self._wrong_scope_for_constraints + ] = "Attempt to query constraints with wrong scope" + else: + constraints_scopes = None + self._wrong_scope_for_constraints = None + self._test_missing_scope = False if dss.can_use_scope(""): # Add empty scope to every map when they are non-empty: @@ -131,6 +156,10 @@ def __init__( availability_scopes[ "" ] = "Attempt to query availability with missing scope" + if constraints_scopes: + constraints_scopes[ + "" + ] = "Attempt to query constraints with missing scope" self._test_missing_scope = True # Note: .get_instance should be called once we know every scope we will need, @@ -139,6 +168,8 @@ def __init__( # and skip .get_instance altogether (otherwise the scenario would not be run) if scd_scopes: self._scd_dss = dss.get_instance(scd_scopes) + if constraints_scopes: + self._constraints_dss = dss.get_instance(constraints_scopes) if availability_scopes: self._availability_dss = dss.get_instance(availability_scopes) @@ -162,7 +193,11 @@ def __init__( self._scd_dss.base_url, auth_adapter=InvalidTokenSignatureAuth() ) - if not self._scd_dss and not self._availability_dss: + if ( + not self._scd_dss + and not self._constraints_dss + and not self._availability_dss + ): raise MissingResourceError( f"AuthAdapterResource provided to {fullname(type(self))} has none of the required scopes for this scenario.", "<unknown>", @@ -269,6 +304,37 @@ def run(self, context: ExecutionContext): else: self.record_note("availability", "Skipping Availability endpoints") + if self._constraints_dss: + self.record_note("constraints", "Testing Constraint Reference endpoints") + self.begin_test_step("Constraint reference endpoints authentication") + if self._wrong_scope_for_constraints: + self.record_note( + "constraints_wrong_scope", + f"Incorrect scope testing enabled with scope {self._wrong_scope_for_constraints}.", + ) + else: + self.record_note( + "constraints_wrong_scope", "Incorrect scope testing disabled." + ) + cr_validator = ConstraintRefAuthValidator( + scenario=self, + generic_validator=GenericAuthValidator( + self, self._constraints_dss, Scope.ConstraintManagement + ), + dss=self._constraints_dss, + test_id=self._test_id, + planning_area=self._planning_area, + planning_area_volume4d=self._planning_area_volume4d, + no_auth_session=self._no_auth_session, + invalid_token_session=self._invalid_token_session, + test_wrong_scope=self._wrong_scope_for_constraints, + test_missing_scope=self._test_missing_scope, + ) + cr_validator.verify_cr_endpoints_authentication() + self.end_test_step() + else: + self.record_note("constraints", "Skipping Constraint Reference endpoints") + self.end_test_case() self.end_test_scenario() diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/cr_api_validator.py b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/cr_api_validator.py new file mode 100644 index 0000000000..69591316e8 --- /dev/null +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/cr_api_validator.py @@ -0,0 +1,559 @@ +from datetime import datetime, timedelta +from typing import Optional + +from implicitdict import ImplicitDict, StringBasedDateTime +from uas_standards.astm.f3548.v21.api import ( + OPERATIONS, + OperationID, + OperationalIntentState, + ChangeOperationalIntentReferenceResponse, + PutOperationalIntentReferenceParameters, + Time, + QueryOperationalIntentReferenceParameters, + PutConstraintReferenceParameters, + ChangeConstraintReferenceResponse, + QueryConstraintReferenceParameters, +) + +from monitoring.monitorlib import fetch +from monitoring.monitorlib.fetch import QueryType, QueryError +from monitoring.monitorlib.geotemporal import Volume4D +from monitoring.monitorlib.infrastructure import UTMClientSession +from monitoring.uss_qualifier.resources.astm.f3548.v21.dss import DSSInstance +from monitoring.uss_qualifier.resources.astm.f3548.v21.planning_area import ( + PlanningAreaSpecification, +) +from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.generic import ( + GenericAuthValidator, +) +from monitoring.uss_qualifier.scenarios.scenario import TestScenario, PendingCheck + +TIME_TOLERANCE_SEC = 1 + + +class ConstraintRefAuthValidator: + def __init__( + self, + scenario: TestScenario, + generic_validator: GenericAuthValidator, + dss: DSSInstance, + test_id: str, + planning_area: PlanningAreaSpecification, + planning_area_volume4d: Volume4D, + no_auth_session: UTMClientSession, + invalid_token_session: UTMClientSession, + test_wrong_scope: Optional[str] = None, + test_missing_scope: bool = False, + ): + """ + + Args: + scenario: Scenario on which the checks will be done + generic_validator: Provides generic verification methods for DSS API calls + dss: the DSS instance being tested + test_id: identifier to use for the CRs that will be created + planning_area: the planning area to use for the subscriptions + planning_area_volume4d: a volume 4d encompassing the planning area + no_auth_session: an unauthenticated session + invalid_token_session: a session using a well-formed token that has an invalid signature + test_wrong_scope: a valid scope that is not allowed to perform operations on subscriptions, if available. + If None, checks using a wrong scope will be skipped. + test_missing_scope: if True, will attempt to perform operations without specifying a scope using the valid credentials. + """ + self._scenario = scenario + self._gen_val = generic_validator + self._dss = dss + self._pid = dss.participant_id + self._test_id = test_id + self._planning_area = planning_area + + time_start = datetime.now().astimezone() - timedelta(seconds=10) + time_end = time_start + timedelta(minutes=20) + + self._cr_params = planning_area.get_new_constraint_ref_params( + time_start=time_start, + time_end=time_end, + ) + self._planning_area_volume4d = planning_area_volume4d + self._no_auth_session = no_auth_session + self._invalid_token_session = invalid_token_session + + self._test_wrong_scope = test_wrong_scope + self._test_missing_scope = test_missing_scope + + def verify_cr_endpoints_authentication(self): + self._verify_cr_creation() + self._verify_cr_get() + self._verify_cr_mutation() + self._verify_cr_deletion() + self._verify_cr_search() + + def _verify_cr_creation(self): + op = OPERATIONS[OperationID.CreateConstraintReference] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id), + json=self._cr_params, + query_type=QueryType.F3548v21DSSCreateConstraintReference, + participant_id=self._dss.participant_id, + ) + + # No auth + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Create constraint reference with missing credentials", self._pid + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, no_auth_q) + + self._gen_val.verify_4xx_response(no_auth_q) + + # Invalid token + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Create constraint reference with invalid credentials", self._pid + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, invalid_token_q) + + self._gen_val.verify_4xx_response(invalid_token_q) + + # Valid credentials but missing scope: + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Create constraint reference with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, no_scope_q) + + self._gen_val.verify_4xx_response(no_scope_q) + + # Valid credentials but wrong scope: + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Create constraint reference with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, wrong_scope_q) + + self._gen_val.verify_4xx_response(wrong_scope_q) + + # Valid credentials + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Create constraint reference with valid credentials", self._pid + ) as check: + if valid_q.status_code != 201: # As specified in OpenAPI spec + check.record_failed( + summary=f"Expected 201, got {valid_q.status_code}", + details=f"Error message: {valid_q.error_message}", + query_timestamps=[valid_q.request.timestamp], + ) + + with self._scenario.check( + "Create constraint reference response format conforms to spec", + self._pid, + ) as check: + try: + cr_resp = ImplicitDict.parse( + valid_q.response.json, ChangeConstraintReferenceResponse + ) + except ValueError as e: + check.record_failed( + summary="Could not parse the response body", + details=f"Failed to parse the response body as a ChangeConstraintReferenceResponse: {e}", + query_timestamps=[valid_q.request.timestamp], + ) + + # Save the current CR + self._current_cr = cr_resp.constraint_reference + + def _verify_cr_get(self): + op = OPERATIONS[OperationID.GetConstraintReference] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id), + query_type=QueryType.F3548v21DSSGetConstraintReference, + participant_id=self._dss.participant_id, + ) + + # No Auth + query_no_auth = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Get constraint reference with missing credentials", self._pid + ) as check: + if query_no_auth.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {query_no_auth.status_code}", + query_timestamps=[query_no_auth.request.timestamp], + ) + self._gen_val.verify_4xx_response(query_no_auth) + + # Invalid token + query_invalid_token = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Get constraint reference with invalid credentials", self._pid + ) as check: + if query_invalid_token.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {query_invalid_token.status_code}", + query_timestamps=[query_invalid_token.request.timestamp], + ) + + self._gen_val.verify_4xx_response(query_invalid_token) + + # Valid credentials but missing scope + if self._test_missing_scope: + query_missing_scope = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Get constraint reference with missing scope", self._pid + ) as check: + if query_missing_scope.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {query_missing_scope.status_code}", + query_timestamps=[query_missing_scope.request.timestamp], + ) + + self._gen_val.verify_4xx_response(query_missing_scope) + + # Valid credentials but wrong scope + if self._test_wrong_scope: + query_wrong_scope = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Get constraint reference with incorrect scope", self._pid + ) as check: + if query_wrong_scope.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {query_wrong_scope.status_code}", + query_timestamps=[query_wrong_scope.request.timestamp], + ) + + self._gen_val.verify_4xx_response(query_wrong_scope) + + # Valid credentials + query_valid_auth = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Get constraint reference with valid credentials", self._pid + ) as check: + if query_valid_auth.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {query_valid_auth.status_code}", + query_timestamps=[query_valid_auth.request.timestamp], + ) + + def _verify_cr_mutation(self): + op = OPERATIONS[OperationID.UpdateConstraintReference] + new_params = PutConstraintReferenceParameters(**self._cr_params) + updated_volume = new_params.extents[0] + new_end = updated_volume.time_end.value.datetime - timedelta(seconds=10) + updated_volume.time_end = Time(value=StringBasedDateTime(new_end)) + new_params.extents = [updated_volume] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id, ovn=self._current_cr.ovn), + json=new_params, + query_type=QueryType.F3548v21DSSUpdateConstraintReference, + participant_id=self._dss.participant_id, + ) + + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with missing credentials", self._pid + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + self._sanity_check_cr_not_created(check, no_auth_q) + + self._gen_val.verify_4xx_response(no_auth_q) + + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with invalid credentials", self._pid + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + self._sanity_check_cr_not_updated(check, invalid_token_q) + + self._gen_val.verify_4xx_response(invalid_token_q) + + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + self._sanity_check_cr_not_updated(check, no_scope_q) + + self._gen_val.verify_4xx_response(no_scope_q) + + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Mutate constraint reference with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + self._sanity_check_cr_not_updated(check, wrong_scope_q) + + self._gen_val.verify_4xx_response(wrong_scope_q) + + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Mutate constraint reference with valid credentials", self._pid + ) as check: + if valid_q.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {valid_q.status_code}", + details=f"Mutation is expected to have succeeded, but got status {valid_q.status_code} with error {valid_q.error_message} instead", + query_timestamps=[valid_q.request.timestamp], + ) + + with self._scenario.check( + "Mutate constraint reference response format conforms to spec", + self._pid, + ) as check: + try: + parsed_cr = ImplicitDict.parse( + valid_q.response.json, ChangeConstraintReferenceResponse + ) + except ValueError as e: + check.record_failed( + summary="Could not parse the response body", + details=f"Failed to parse the response body as a ChangeConstraintReferenceResponse: {e}", + query_timestamps=[valid_q.request.timestamp], + ) + + self._current_cr = parsed_cr.constraint_reference + + def _verify_cr_deletion(self): + op = OPERATIONS[OperationID.DeleteConstraintReference] + query_kwargs = dict( + verb=op.verb, + url=op.path.format(entityid=self._test_id, ovn=self._current_cr.ovn), + query_type=QueryType.F3548v21DSSDeleteConstraintReference, + participant_id=self._dss.participant_id, + ) + + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with missing credentials", self._pid + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(no_auth_q) + + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with invalid credentials", self._pid + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(invalid_token_q) + + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(no_scope_q) + + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Delete constraint reference with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + self._gen_val.verify_4xx_response(wrong_scope_q) + + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Delete constraint reference with valid credentials", self._pid + ) as check: + if valid_q.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {valid_q.status_code}", + query_timestamps=[valid_q.request.timestamp], + ) + + self._current_cr = None + + def _verify_cr_search(self): + op = OPERATIONS[OperationID.QueryConstraintReferences] + query_kwargs = dict( + verb=op.verb, + url=op.path, + query_type=QueryType.F3548v21DSSQueryConstraintReferences, + json=QueryConstraintReferenceParameters( + area_of_interest=self._planning_area_volume4d.to_f3548v21() + ), + participant_id=self._dss.participant_id, + ) + + no_auth_q = self._gen_val.query_no_auth(**query_kwargs) + with self._scenario.check( + "Search constraint references with missing credentials", + self._pid, + ) as check: + if no_auth_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_auth_q.status_code}", + query_timestamps=[no_auth_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(no_auth_q) + + invalid_token_q = self._gen_val.query_invalid_token(**query_kwargs) + with self._scenario.check( + "Search constraint references with invalid credentials", + self._pid, + ) as check: + if invalid_token_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {invalid_token_q.status_code}", + query_timestamps=[invalid_token_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(invalid_token_q) + + if self._test_missing_scope: + no_scope_q = self._gen_val.query_missing_scope(**query_kwargs) + with self._scenario.check( + "Search constraint references with missing scope", self._pid + ) as check: + if no_scope_q.status_code != 401: + check.record_failed( + summary=f"Expected 401, got {no_scope_q.status_code}", + query_timestamps=[no_scope_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(no_scope_q) + + if self._test_wrong_scope: + wrong_scope_q = self._gen_val.query_wrong_scope( + scope=self._test_wrong_scope, **query_kwargs + ) + with self._scenario.check( + "Search constraint references with incorrect scope", self._pid + ) as check: + if wrong_scope_q.status_code != 403: + check.record_failed( + summary=f"Expected 403, got {wrong_scope_q.status_code}", + query_timestamps=[wrong_scope_q.request.timestamp], + ) + + self._gen_val.verify_4xx_response(wrong_scope_q) + + valid_q = self._gen_val.query_valid_auth(**query_kwargs) + with self._scenario.check( + "Search constraint references with valid credentials", self._pid + ) as check: + if valid_q.status_code != 200: + check.record_failed( + summary=f"Expected 200, got {valid_q.status_code}", + query_timestamps=[valid_q.request.timestamp], + ) + + def _sanity_check_cr_not_created( + self, check: PendingCheck, creation_q: fetch.Query + ): + try: + _, sanity_check = self._dss.get_constraint_ref(self._test_id) + self._scenario.record_query(sanity_check) + except QueryError as qe: + self._scenario.record_queries(qe.queries) + if qe.cause_status_code != 404: + check.record_failed( + summary="CR was created by an unauthorized request.", + details="The Operational Intent Reference should not have been created, as the creation attempt was not authenticated.", + query_timestamps=[ + creation_q.request.timestamp, + qe.cause.request.timestamp, + ], + ) + self._gen_val.verify_4xx_response(qe.cause) + + def _sanity_check_cr_not_updated( + self, check: PendingCheck, creation_q: fetch.Query + ): + try: + cr, sanity_check = self._dss.get_constraint_ref(self._test_id) + self._scenario.record_query(sanity_check) + if ( + abs( + cr.time_end.value.datetime + - self._current_cr.time_end.value.datetime + ).total_seconds() + > TIME_TOLERANCE_SEC + ): + check.record_failed( + summary="CR was updated by an unauthorized request.", + details=f"The Constraint Reference with id {self._test_id} should not have been updated, as the update attempt was not authenticated.", + query_timestamps=[ + creation_q.request.timestamp, + sanity_check.request.timestamp, + ], + ) + except QueryError as qe: + self._scenario.record_queries(qe.queries) + check.record_failed( + summary="Could not fetch CR to confirm it has not been mutated", + details=f"The Constraint Reference with id {self._test_id} could not be fetched to confirm it has not been mutated: {qe}", + query_timestamps=[ + creation_q.request.timestamp, + qe.queries[0].request.timestamp, + ], + )