Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[uss_qualifier] authentication validation: don't skip if a scope is missing, improve dependency on current time #749

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@ Note that this does not cover authorization.
[`DSSInstanceResource`](../../../../../resources/astm/f3548/v21/dss.py) to be tested in this scenario.

Note that to benefit from the maximum coverage, the DSS' AuthAdapterResource must be able to obtain credentials
for multiple scopes (so that a wrong scope may be used in place of the correct one) as well as an empty scope (that is, provide credentials where the scope is an empty string).
for multiple scopes (so that a wrong scope may be used in place of the correct one) as well as an empty scope
(that is, provide credentials where the scope is an empty string).

This scenario will check for the scope's availability and transparently ignore checks that can't be conducted.

Required scopes for running this scenario:
At least one of the following scopes needs to be available for this scenario to at least partially run:

- `utm.strategic_coordination`
- `utm.availability_arbitration`

In order to verify each endpoint group, all scopes above must be available.

Optional scopes that will allow the scenario to provide additional coverage:

- `utm.availability_arbitration`
- `""` (empty string)

### id_generator
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
from datetime import datetime, timedelta
from typing import Optional

from uas_standards.astm.f3548.v21.api import UssAvailabilityState

from monitoring.monitorlib.fetch import QueryError
from monitoring.uss_qualifier.resources.astm.f3548.v21.subscription_params import (
SubscriptionParams,
)
from uas_standards.astm.f3548.v21.constants import (
Scope,
)

from monitoring.monitorlib.auth import InvalidTokenSignatureAuth
from monitoring.monitorlib.fetch import QueryError
from monitoring.monitorlib.geotemporal import Volume4D
from monitoring.monitorlib.infrastructure import UTMClientSession
from monitoring.monitorlib.inspection import fullname
from monitoring.prober.infrastructure import register_resource_type
from monitoring.uss_qualifier.resources.astm.f3548.v21.dss import (
DSSInstanceResource,
Expand All @@ -22,6 +19,7 @@
PlanningAreaResource,
)
from monitoring.uss_qualifier.resources.interuss.id_generator import IDGeneratorResource
from monitoring.uss_qualifier.resources.resource import MissingResourceError
from monitoring.uss_qualifier.scenarios.astm.utm.dss import test_step_fragments
from monitoring.uss_qualifier.scenarios.astm.utm.dss.authentication.availability_api_validator import (
AvailabilityAuthValidator,
Expand All @@ -46,7 +44,7 @@ class AuthenticationValidation(TestScenario):
A scenario that verifies that the DSS properly authenticates requests to all its endpoints,
and properly validates the scopes of the requests depending on the action being performed.

Note that this scenario does not verif that a DSS only allows an entity owner to modify the:
Note that this scenario does not verif that a DSS only allows an entity owner to mutate or delete them:
this is covered in other scenarios.
"""

Expand All @@ -59,17 +57,8 @@ class AuthenticationValidation(TestScenario):
_test_id: str
"""Base identifier for the entities that will be created"""

_sub_validator: SubscriptionAuthValidator
_oir_validator: OperationalIntentRefAuthValidator
_availability_validator: AvailabilityAuthValidator

_sub_params: SubscriptionParams

_scd_dss: DSSInstance
_availability_dss: DSSInstance

_wrong_scope_for_availability: Scope
_wrong_scope_for_scd: Scope
_scd_dss: Optional[DSSInstance] = None
_availability_dss: Optional[DSSInstance] = None

def __init__(
self,
Expand All @@ -86,54 +75,74 @@ def __init__(
"""
super().__init__()

# This is the proper scope for interactions with the DSS for subscriptions and operational intent
# references in this scenario
scd_scopes = {Scope.StrategicCoordination: "create and delete subscriptions"}

# For the 'wrong' scope we pick anything from the available scopes that isn't the SCD, CMSA or empty scope:
self._wrong_scope_for_scd = dss.get_authorized_scope_not_in(
[
Scope.StrategicCoordination,
# CMSA is excluded too, as it is allowed to do certain operations on the OIR endpoints
Scope.ConformanceMonitoringForSituationalAwareness,
"",
]
)
# Check if we can test SCD endpoints:
if dss.can_use_scope(Scope.StrategicCoordination):
scd_scopes = {
Scope.StrategicCoordination: "create and delete subscriptions and operational intent resources"
}
self._wrong_scope_for_scd = dss.get_authorized_scope_not_in(
[
Scope.StrategicCoordination, # Allowed to get and update
# CMSA is excluded too, as it is allowed to do certain operations on the OIR endpoints
Scope.ConformanceMonitoringForSituationalAwareness,
"", # Already Used for empty scope testing
]
)

if self._wrong_scope_for_scd is not None:
scd_scopes[
self._wrong_scope_for_scd
] = "Attempt to query subscriptions with wrong scope"

availability_scopes = {
Scope.AvailabilityArbitration: "read and set availability for a USS"
}

self._wrong_scope_for_availability = dss.get_authorized_scope_not_in(
[
Scope.AvailabilityArbitration, # Allowed to get and update
Scope.ConformanceMonitoringForSituationalAwareness, # Allowed to get
Scope.StrategicCoordination, # Allowed to get
"",
]
)
if self._wrong_scope_for_scd is not None:
scd_scopes[
self._wrong_scope_for_scd
] = "Attempt to query subscriptions and OIRs with wrong scope"
else:
scd_scopes = None
self._wrong_scope_for_scd = None

# Check if we can test availability endpoints:
if dss.can_use_scope(Scope.AvailabilityArbitration):
availability_scopes = {
Scope.AvailabilityArbitration: "read and set availability for a USS"
}
self._wrong_scope_for_availability = dss.get_authorized_scope_not_in(
[
Scope.AvailabilityArbitration, # Allowed to get and update
Scope.ConformanceMonitoringForSituationalAwareness, # Allowed to get
Scope.StrategicCoordination, # Allowed to get
"", # Already Used for empty scope testing
]
)

if self._wrong_scope_for_availability is not None:
availability_scopes[
self._wrong_scope_for_availability
] = "Attempt to query availability with wrong scope"
if self._wrong_scope_for_availability is not None:
availability_scopes[
self._wrong_scope_for_availability
] = "Attempt to query availability with wrong scope"
else:
availability_scopes = None
self._wrong_scope_for_availability = None

self._test_missing_scope = False
if dss.can_use_scope(""):
scd_scopes[""] = "Attempt to query subscriptions with missing scope"
# Add empty scope to every map when they are non-empty:
# (Empty means the endpoint group should not be tested at all)
if scd_scopes:
scd_scopes[
""
] = "Attempt to query subscriptions and OIRs with missing scope"
if availability_scopes:
availability_scopes[
""
] = "Attempt to query availability with missing scope"
self._test_missing_scope = True

# Note: .get_instance should be called once we know every scope we will need,
# in order to guarantee that they are indeed available.
self._scd_dss = dss.get_instance(scd_scopes)
self._availability_dss = dss.get_instance(availability_scopes)

self._pid = [dss.participant_id]
# If the scopes for an endpoint group are empty, it means we're not allowed to obtain them
# and skip .get_instance altogether (otherwise the scenario would not be run)
if scd_scopes:
self._scd_dss = dss.get_instance(scd_scopes)
if availability_scopes:
self._availability_dss = dss.get_instance(availability_scopes)

self._pid = [self._scd_dss.participant_id]
self._test_id = id_generator.id_factory.make_id(self.SUB_TYPE)
self._planning_area = planning_area.specification

Expand All @@ -144,110 +153,121 @@ def __init__(
)

# Session that won't provide a token at all
self._no_auth_session = UTMClientSession(dss.base_url, auth_adapter=None)
self._no_auth_session = UTMClientSession(
self._scd_dss.base_url, auth_adapter=None
)

# Session that should provide a well-formed token with a wrong signature
self._invalid_token_session = UTMClientSession(
dss.base_url, auth_adapter=InvalidTokenSignatureAuth()
self._scd_dss.base_url, auth_adapter=InvalidTokenSignatureAuth()
)

if not self._scd_dss and not self._availability_dss:
raise MissingResourceError(
f"AuthAdapterResource provided to {fullname(type(self))} has none of the required scopes for this scenario.",
"<unknown>",
)

def run(self, context: ExecutionContext):
generic_validator = GenericAuthValidator(
scd_generic_validator = GenericAuthValidator(
self, self._scd_dss, Scope.StrategicCoordination
)

self._sub_validator = SubscriptionAuthValidator(
scenario=self,
generic_validator=generic_validator,
dss=self._scd_dss,
test_id=self._test_id,
planning_area=self._planning_area,
planning_area_volume4d=self._planning_area_volume4d,
no_auth_session=self._no_auth_session,
invalid_token_session=self._invalid_token_session,
test_wrong_scope=self._wrong_scope_for_scd,
test_missing_scope=self._test_missing_scope,
)

self._oir_validator = OperationalIntentRefAuthValidator(
scenario=self,
generic_validator=generic_validator,
dss=self._scd_dss,
test_id=self._test_id,
planning_area=self._planning_area,
planning_area_volume4d=self._planning_area_volume4d,
no_auth_session=self._no_auth_session,
invalid_token_session=self._invalid_token_session,
test_wrong_scope=self._wrong_scope_for_scd,
test_missing_scope=self._test_missing_scope,
)

self._availability_validator = AvailabilityAuthValidator(
scenario=self,
generic_validator=GenericAuthValidator(
self, self._availability_dss, Scope.AvailabilityArbitration
),
dss=self._availability_dss,
test_id=self._test_id,
no_auth_session=self._no_auth_session,
invalid_token_session=self._invalid_token_session,
test_wrong_scope=self._wrong_scope_for_availability,
test_missing_scope=self._test_missing_scope,
)

self._sub_params = self._planning_area.get_new_subscription_params(
subscription_id=self._test_id,
# Set this slightly in the past: we will update the subscriptions
# to a later value that still needs to be roughly 'now' without getting into the future
start_time=datetime.now().astimezone() - timedelta(seconds=10),
duration=timedelta(minutes=45),
# This is a planning area without constraint processing
notify_for_op_intents=True,
notify_for_constraints=False,
)

self.begin_test_scenario(context)
self._setup_case()
self.begin_test_case("Endpoint authorization")

if self._wrong_scope_for_scd:
self.record_note(
"wrong_scope_scd",
f"Incorrect scope testing enabled for SCD endpoints with scope {self._wrong_scope_for_scd}.",
)
if self._test_missing_scope:
self.record_note("missing_scope", "Missing scope testing enabled.")
else:
self.record_note("missing_scope", "Missing scope testing disabled.")

if self._scd_dss:
self.record_note(
"wrong_scope_scd", "Incorrect scope testing disabled for SCD endpoints"
"scd",
"Testing Strategic Coordination endpoints (Subscriptions and OIRs)",
)
self.begin_test_step("Subscription endpoints authentication")

if self._wrong_scope_for_availability:
self.record_note(
"wrong_scope_availability",
f"Incorrect scope testing enabled for availability endpoints with scope {self._wrong_scope_for_availability}.",
if self._wrong_scope_for_scd:
self.record_note(
"scd_wrong_scope",
f"Incorrect scope testing enabled with scope {self._wrong_scope_for_scd}.",
)
else:
self.record_note("scd_wrong_scope", "Incorrect scope testing disabled.")

sub_validator = SubscriptionAuthValidator(
scenario=self,
generic_validator=scd_generic_validator,
dss=self._scd_dss,
test_id=self._test_id,
planning_area=self._planning_area,
planning_area_volume4d=self._planning_area_volume4d,
no_auth_session=self._no_auth_session,
invalid_token_session=self._invalid_token_session,
test_wrong_scope=self._wrong_scope_for_scd,
test_missing_scope=self._test_missing_scope,
)
sub_validator.verify_sub_endpoints_authentication()

self.end_test_step()

self.begin_test_step("Operational intents endpoints authentication")

# The validator relies on the 'current' time, so it should be instantiated
# just before being run
oir_validator = OperationalIntentRefAuthValidator(
scenario=self,
generic_validator=scd_generic_validator,
dss=self._scd_dss,
test_id=self._test_id,
planning_area=self._planning_area,
planning_area_volume4d=self._planning_area_volume4d,
no_auth_session=self._no_auth_session,
invalid_token_session=self._invalid_token_session,
test_wrong_scope=self._wrong_scope_for_scd,
test_missing_scope=self._test_missing_scope,
)
oir_validator.verify_oir_endpoints_authentication()
self.end_test_step()
else:
self.record_note(
"wrong_scope_availability",
"Incorrect scope testing disabled for availability endpoints",
"scd",
"Skipping Strategic Coordination endpoints (Subscriptions and OIRs)",
)

if self._test_missing_scope:
self.record_note("missing_scope", "Missing scope testing enabled.")
else:
self.record_note("missing_scope", "Missing scope testing disabled.")

self.begin_test_step("Subscription endpoints authentication")
self._sub_validator.verify_sub_endpoints_authentication()
if self._availability_dss:
self.record_note("availability", "Testing Availability endpoints")
self.begin_test_step("Availability endpoints authentication")

self.end_test_step()

self.begin_test_step("Operational intents endpoints authentication")
self._oir_validator.verify_oir_endpoints_authentication()
self.end_test_step()
if self._wrong_scope_for_availability:
self.record_note(
"availability_wrong_scope",
f"Incorrect scope testing enabled for availability endpoints with scope {self._wrong_scope_for_availability}.",
)
else:
self.record_note(
"availability_wrong_scope",
"Incorrect scope testing disabled for availability endpoints",
)

self.begin_test_step("Availability endpoints authentication")
self._availability_validator.verify_availability_endpoints_authentication()
self.end_test_step()
availability_validator = AvailabilityAuthValidator(
scenario=self,
generic_validator=GenericAuthValidator(
self, self._availability_dss, Scope.AvailabilityArbitration
),
dss=self._availability_dss,
test_id=self._test_id,
no_auth_session=self._no_auth_session,
invalid_token_session=self._invalid_token_session,
test_wrong_scope=self._wrong_scope_for_availability,
test_missing_scope=self._test_missing_scope,
)
availability_validator.verify_availability_endpoints_authentication()
self.end_test_step()
else:
self.record_note("availability", "Skipping Availability endpoints")

self.end_test_case()
self.end_test_scenario()
Expand Down
Loading