From fbf42bc22a2f45aaef3ad63ba1c893aa362f6d47 Mon Sep 17 00:00:00 2001 From: Julien Perrochet Date: Fri, 9 Aug 2024 10:17:19 +0200 Subject: [PATCH] [uss_qualifier] don't skip authentication validation if certain scopes are unavailable --- .../authentication_validation.md | 9 +- .../authentication_validation.py | 297 ++++++++++-------- 2 files changed, 166 insertions(+), 140 deletions(-) diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md index 171b6d3b57..fff406295e 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.md @@ -13,17 +13,20 @@ Note that this does not cover authorization. [`DSSInstanceResource`](../../../../../resources/astm/f3548/v21/dss.py) to be tested in this scenario. Note that to benefit from the maximum coverage, the DSS' AuthAdapterResource must be able to obtain credentials -for multiple scopes (so that a wrong scope may be used in place of the correct one) as well as an empty scope (that is, provide credentials where the scope is an empty string). +for multiple scopes (so that a wrong scope may be used in place of the correct one) as well as an empty scope +(that is, provide credentials where the scope is an empty string). This scenario will check for the scope's availability and transparently ignore checks that can't be conducted. -Required scopes for running this scenario: +At least one of the following scopes needs to be available for this scenario to at least partially run: - `utm.strategic_coordination` +- `utm.availability_arbitration` + +In order to verify each endpoint group, all scopes above must be available. Optional scopes that will allow the scenario to provide additional coverage: -- `utm.availability_arbitration` - `""` (empty string) ### id_generator diff --git a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py index 868d5f0685..707187a919 100644 --- a/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py +++ b/monitoring/uss_qualifier/scenarios/astm/utm/dss/authentication/authentication_validation.py @@ -1,18 +1,15 @@ -from datetime import datetime, timedelta +from typing import Optional from uas_standards.astm.f3548.v21.api import UssAvailabilityState - -from monitoring.monitorlib.fetch import QueryError -from monitoring.uss_qualifier.resources.astm.f3548.v21.subscription_params import ( - SubscriptionParams, -) from uas_standards.astm.f3548.v21.constants import ( Scope, ) from monitoring.monitorlib.auth import InvalidTokenSignatureAuth +from monitoring.monitorlib.fetch import QueryError from monitoring.monitorlib.geotemporal import Volume4D from monitoring.monitorlib.infrastructure import UTMClientSession +from monitoring.monitorlib.inspection import fullname from monitoring.prober.infrastructure import register_resource_type from monitoring.uss_qualifier.resources.astm.f3548.v21.dss import ( DSSInstanceResource, @@ -22,6 +19,7 @@ PlanningAreaResource, ) from monitoring.uss_qualifier.resources.interuss.id_generator import IDGeneratorResource +from monitoring.uss_qualifier.resources.resource import MissingResourceError from monitoring.uss_qualifier.scenarios.astm.utm.dss import test_step_fragments from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.availability_api_validator import ( AvailabilityAuthValidator, @@ -46,7 +44,7 @@ class AuthenticationValidation(TestScenario): A scenario that verifies that the DSS properly authenticates requests to all its endpoints, and properly validates the scopes of the requests depending on the action being performed. - Note that this scenario does not verif that a DSS only allows an entity owner to modify the: + Note that this scenario does not verif that a DSS only allows an entity owner to mutate or delete them: this is covered in other scenarios. """ @@ -59,17 +57,8 @@ class AuthenticationValidation(TestScenario): _test_id: str """Base identifier for the entities that will be created""" - _sub_validator: SubscriptionAuthValidator - _oir_validator: OperationalIntentRefAuthValidator - _availability_validator: AvailabilityAuthValidator - - _sub_params: SubscriptionParams - - _scd_dss: DSSInstance - _availability_dss: DSSInstance - - _wrong_scope_for_availability: Scope - _wrong_scope_for_scd: Scope + _scd_dss: Optional[DSSInstance] = None + _availability_dss: Optional[DSSInstance] = None def __init__( self, @@ -86,54 +75,74 @@ def __init__( """ super().__init__() - # This is the proper scope for interactions with the DSS for subscriptions and operational intent - # references in this scenario - scd_scopes = {Scope.StrategicCoordination: "create and delete subscriptions"} - - # For the 'wrong' scope we pick anything from the available scopes that isn't the SCD, CMSA or empty scope: - self._wrong_scope_for_scd = dss.get_authorized_scope_not_in( - [ - Scope.StrategicCoordination, - # CMSA is excluded too, as it is allowed to do certain operations on the OIR endpoints - Scope.ConformanceMonitoringForSituationalAwareness, - "", - ] - ) + # Check if we can test SCD endpoints: + if dss.can_use_scope(Scope.StrategicCoordination): + scd_scopes = { + Scope.StrategicCoordination: "create and delete subscriptions and operational intent resources" + } + self._wrong_scope_for_scd = dss.get_authorized_scope_not_in( + [ + Scope.StrategicCoordination, # Allowed to get and update + # CMSA is excluded too, as it is allowed to do certain operations on the OIR endpoints + Scope.ConformanceMonitoringForSituationalAwareness, + "", # Already Used for empty scope testing + ] + ) - if self._wrong_scope_for_scd is not None: - scd_scopes[ - self._wrong_scope_for_scd - ] = "Attempt to query subscriptions with wrong scope" - - availability_scopes = { - Scope.AvailabilityArbitration: "read and set availability for a USS" - } - - self._wrong_scope_for_availability = dss.get_authorized_scope_not_in( - [ - Scope.AvailabilityArbitration, # Allowed to get and update - Scope.ConformanceMonitoringForSituationalAwareness, # Allowed to get - Scope.StrategicCoordination, # Allowed to get - "", - ] - ) + if self._wrong_scope_for_scd is not None: + scd_scopes[ + self._wrong_scope_for_scd + ] = "Attempt to query subscriptions and OIRs with wrong scope" + else: + scd_scopes = None + self._wrong_scope_for_scd = None + + # Check if we can test availability endpoints: + if dss.can_use_scope(Scope.AvailabilityArbitration): + availability_scopes = { + Scope.AvailabilityArbitration: "read and set availability for a USS" + } + self._wrong_scope_for_availability = dss.get_authorized_scope_not_in( + [ + Scope.AvailabilityArbitration, # Allowed to get and update + Scope.ConformanceMonitoringForSituationalAwareness, # Allowed to get + Scope.StrategicCoordination, # Allowed to get + "", # Already Used for empty scope testing + ] + ) - if self._wrong_scope_for_availability is not None: - availability_scopes[ - self._wrong_scope_for_availability - ] = "Attempt to query availability with wrong scope" + if self._wrong_scope_for_availability is not None: + availability_scopes[ + self._wrong_scope_for_availability + ] = "Attempt to query availability with wrong scope" + else: + availability_scopes = None + self._wrong_scope_for_availability = None self._test_missing_scope = False if dss.can_use_scope(""): - scd_scopes[""] = "Attempt to query subscriptions with missing scope" + # Add empty scope to every map when they are non-empty: + # (Empty means the endpoint group should not be tested at all) + if scd_scopes: + scd_scopes[ + "" + ] = "Attempt to query subscriptions and OIRs with missing scope" + if availability_scopes: + availability_scopes[ + "" + ] = "Attempt to query availability with missing scope" self._test_missing_scope = True # Note: .get_instance should be called once we know every scope we will need, # in order to guarantee that they are indeed available. - self._scd_dss = dss.get_instance(scd_scopes) - self._availability_dss = dss.get_instance(availability_scopes) - - self._pid = [dss.participant_id] + # If the scopes for an endpoint group are empty, it means we're not allowed to obtain them + # and skip .get_instance altogether (otherwise the scenario would not be run) + if scd_scopes: + self._scd_dss = dss.get_instance(scd_scopes) + if availability_scopes: + self._availability_dss = dss.get_instance(availability_scopes) + + self._pid = [self._scd_dss.participant_id] self._test_id = id_generator.id_factory.make_id(self.SUB_TYPE) self._planning_area = planning_area.specification @@ -144,110 +153,124 @@ def __init__( ) # Session that won't provide a token at all - self._no_auth_session = UTMClientSession(dss.base_url, auth_adapter=None) + self._no_auth_session = UTMClientSession( + self._scd_dss.base_url, auth_adapter=None + ) # Session that should provide a well-formed token with a wrong signature self._invalid_token_session = UTMClientSession( - dss.base_url, auth_adapter=InvalidTokenSignatureAuth() + self._scd_dss.base_url, auth_adapter=InvalidTokenSignatureAuth() ) + if ( + not self._scd_dss + and not self._availability_dss + ): + raise MissingResourceError( + f"AuthAdapterResource provided to {fullname(type(self))} has none of the required scopes for this scenario.", + "", + ) + def run(self, context: ExecutionContext): - generic_validator = GenericAuthValidator( + scd_generic_validator = GenericAuthValidator( self, self._scd_dss, Scope.StrategicCoordination ) - self._sub_validator = SubscriptionAuthValidator( - scenario=self, - generic_validator=generic_validator, - dss=self._scd_dss, - test_id=self._test_id, - planning_area=self._planning_area, - planning_area_volume4d=self._planning_area_volume4d, - no_auth_session=self._no_auth_session, - invalid_token_session=self._invalid_token_session, - test_wrong_scope=self._wrong_scope_for_scd, - test_missing_scope=self._test_missing_scope, - ) - - self._oir_validator = OperationalIntentRefAuthValidator( - scenario=self, - generic_validator=generic_validator, - dss=self._scd_dss, - test_id=self._test_id, - planning_area=self._planning_area, - planning_area_volume4d=self._planning_area_volume4d, - no_auth_session=self._no_auth_session, - invalid_token_session=self._invalid_token_session, - test_wrong_scope=self._wrong_scope_for_scd, - test_missing_scope=self._test_missing_scope, - ) - - self._availability_validator = AvailabilityAuthValidator( - scenario=self, - generic_validator=GenericAuthValidator( - self, self._availability_dss, Scope.AvailabilityArbitration - ), - dss=self._availability_dss, - test_id=self._test_id, - no_auth_session=self._no_auth_session, - invalid_token_session=self._invalid_token_session, - test_wrong_scope=self._wrong_scope_for_availability, - test_missing_scope=self._test_missing_scope, - ) - - self._sub_params = self._planning_area.get_new_subscription_params( - subscription_id=self._test_id, - # Set this slightly in the past: we will update the subscriptions - # to a later value that still needs to be roughly 'now' without getting into the future - start_time=datetime.now().astimezone() - timedelta(seconds=10), - duration=timedelta(minutes=45), - # This is a planning area without constraint processing - notify_for_op_intents=True, - notify_for_constraints=False, - ) - self.begin_test_scenario(context) self._setup_case() self.begin_test_case("Endpoint authorization") - if self._wrong_scope_for_scd: - self.record_note( - "wrong_scope_scd", - f"Incorrect scope testing enabled for SCD endpoints with scope {self._wrong_scope_for_scd}.", - ) + if self._test_missing_scope: + self.record_note("missing_scope", "Missing scope testing enabled.") else: + self.record_note("missing_scope", "Missing scope testing disabled.") + + if self._scd_dss: self.record_note( - "wrong_scope_scd", "Incorrect scope testing disabled for SCD endpoints" + "scd", + "Testing Strategic Coordination endpoints (Subscriptions and OIRs)", ) + self.begin_test_step("Subscription endpoints authentication") - if self._wrong_scope_for_availability: - self.record_note( - "wrong_scope_availability", - f"Incorrect scope testing enabled for availability endpoints with scope {self._wrong_scope_for_availability}.", + if self._wrong_scope_for_scd: + self.record_note( + "scd_wrong_scope", + f"Incorrect scope testing enabled with scope {self._wrong_scope_for_scd}.", + ) + else: + self.record_note("scd_wrong_scope", "Incorrect scope testing disabled.") + + sub_validator = SubscriptionAuthValidator( + scenario=self, + generic_validator=scd_generic_validator, + dss=self._scd_dss, + test_id=self._test_id, + planning_area=self._planning_area, + planning_area_volume4d=self._planning_area_volume4d, + no_auth_session=self._no_auth_session, + invalid_token_session=self._invalid_token_session, + test_wrong_scope=self._wrong_scope_for_scd, + test_missing_scope=self._test_missing_scope, ) + sub_validator.verify_sub_endpoints_authentication() + + self.end_test_step() + + self.begin_test_step("Operational intents endpoints authentication") + + # The validator relies on the 'current' time, so it should be instantiated + # just before being run + oir_validator = OperationalIntentRefAuthValidator( + scenario=self, + generic_validator=scd_generic_validator, + dss=self._scd_dss, + test_id=self._test_id, + planning_area=self._planning_area, + planning_area_volume4d=self._planning_area_volume4d, + no_auth_session=self._no_auth_session, + invalid_token_session=self._invalid_token_session, + test_wrong_scope=self._wrong_scope_for_scd, + test_missing_scope=self._test_missing_scope, + ) + oir_validator.verify_oir_endpoints_authentication() + self.end_test_step() else: self.record_note( - "wrong_scope_availability", - "Incorrect scope testing disabled for availability endpoints", + "scd", + "Skipping Strategic Coordination endpoints (Subscriptions and OIRs)", ) - if self._test_missing_scope: - self.record_note("missing_scope", "Missing scope testing enabled.") - else: - self.record_note("missing_scope", "Missing scope testing disabled.") - - self.begin_test_step("Subscription endpoints authentication") - self._sub_validator.verify_sub_endpoints_authentication() + if self._availability_dss: + self.record_note("availability", "Testing Availability endpoints") + self.begin_test_step("Availability endpoints authentication") - self.end_test_step() - - self.begin_test_step("Operational intents endpoints authentication") - self._oir_validator.verify_oir_endpoints_authentication() - self.end_test_step() + if self._wrong_scope_for_availability: + self.record_note( + "availability_wrong_scope", + f"Incorrect scope testing enabled for availability endpoints with scope {self._wrong_scope_for_availability}.", + ) + else: + self.record_note( + "availability_wrong_scope", + "Incorrect scope testing disabled for availability endpoints", + ) - self.begin_test_step("Availability endpoints authentication") - self._availability_validator.verify_availability_endpoints_authentication() - self.end_test_step() + availability_validator = AvailabilityAuthValidator( + scenario=self, + generic_validator=GenericAuthValidator( + self, self._availability_dss, Scope.AvailabilityArbitration + ), + dss=self._availability_dss, + test_id=self._test_id, + no_auth_session=self._no_auth_session, + invalid_token_session=self._invalid_token_session, + test_wrong_scope=self._wrong_scope_for_availability, + test_missing_scope=self._test_missing_scope, + ) + availability_validator.verify_availability_endpoints_authentication() + self.end_test_step() + else: + self.record_note("availability", "Skipping Availability endpoints") self.end_test_case() self.end_test_scenario()