From 6ca81e512d6632dd03d8db016de94f0c6e028830 Mon Sep 17 00:00:00 2001 From: jctanner Date: Tue, 20 Aug 2024 14:30:30 -0400 Subject: [PATCH] Revamp the can delete permission check to match reality. (#2231) * Allow namespace owners to delete their collections. * A collection publisher can now also delete their collections. Revamp related dab rbac integration tests. No-Issue Signed-off-by: James Tanner --- galaxy_ng/app/access_control/access_policy.py | 36 ++- .../tests/integration/api/test_rbac_roles.py | 2 + .../tests/integration/dab/test_dab_rbac.py | 249 +++++++----------- 3 files changed, 122 insertions(+), 165 deletions(-) diff --git a/galaxy_ng/app/access_control/access_policy.py b/galaxy_ng/app/access_control/access_policy.py index 47e39a267b..54f4810b30 100644 --- a/galaxy_ng/app/access_control/access_policy.py +++ b/galaxy_ng/app/access_control/access_policy.py @@ -261,24 +261,34 @@ def v3_can_view_repo_content(self, request, view, action): return True def v3_can_destroy_collections(self, request, view, action): - SOCIAL_AUTH_GITHUB_KEY = settings.get("SOCIAL_AUTH_GITHUB_KEY", default=None) - SOCIAL_AUTH_GITHUB_SECRET = settings.get("SOCIAL_AUTH_GITHUB_SECRET", default=None) - is_social_auth = all([SOCIAL_AUTH_GITHUB_KEY, SOCIAL_AUTH_GITHUB_SECRET]) - user = request.user - perm = "ansible.delete_collection" - social_perm = "galaxy.change_namespace" - collection = view.get_object() + + # first check for global permissions ... + for delete_permission in ["galaxy.change_namespace", "ansible.delete_collection"]: + if user.has_perm(delete_permission): + return True + + # could be a collection or could be a collectionversion ... + obj = view.get_object() + model_name = obj.__class__.__name__ + if model_name == 'Collection': + collection = obj + elif model_name == 'CollectionVersion': + collection = obj.collection + else: + raise Exception( + f'model type {model_name} is not suitable for v3_can_destroy_collections' + ) namespace = models.Namespace.objects.get(name=collection.namespace) - if not is_social_auth and user.has_perm(perm) and self.v3_can_view_repo_content( - request, - view, - action, - ): + # check namespace object level permissions ... + if user.has_perm("galaxy.change_namespace", namespace): return True - elif is_social_auth and user.has_perm(social_perm, namespace): + + # check collection object level permissions ... + if user.has_perm("ansible.delete_collection", collection): return True + return False def v3_can_view_users(self, request, view, action): diff --git a/galaxy_ng/tests/integration/api/test_rbac_roles.py b/galaxy_ng/tests/integration/api/test_rbac_roles.py index c097d54638..857b27e7d5 100644 --- a/galaxy_ng/tests/integration/api/test_rbac_roles.py +++ b/galaxy_ng/tests/integration/api/test_rbac_roles.py @@ -273,12 +273,14 @@ upload_collection_to_custom_staging_repo, deprecate_collections, undeprecate_collections, + delete_collection, }, "galaxy.collection_publisher": { create_collection_namespace, change_collection_namespace, upload_collection_to_namespace, upload_collection_to_custom_staging_repo, + delete_collection, deprecate_collections, undeprecate_collections, }, diff --git a/galaxy_ng/tests/integration/dab/test_dab_rbac.py b/galaxy_ng/tests/integration/dab/test_dab_rbac.py index 918171ec11..ebb99e7677 100644 --- a/galaxy_ng/tests/integration/dab/test_dab_rbac.py +++ b/galaxy_ng/tests/integration/dab/test_dab_rbac.py @@ -1,10 +1,13 @@ import json -import os +from collections import namedtuple import pytest from galaxykit.client import GalaxyClient from galaxykit.collections import upload_test_collection +from galaxykit.utils import wait_for_task + +from ..utils import set_certification pytestmark = pytest.mark.qa # noqa: F821 @@ -36,174 +39,81 @@ def test_dab_roledefs_match_pulp_roles(galaxy_client): ''' -@pytest.mark.skip(reason=( - "the galaxy.collection_namespace_owner role is global" - " and does not allow object assignment" -)) @pytest.mark.deployment_standalone -def test_dab_rbac_namespace_owner_by_user( +@pytest.mark.parametrize("use_team", [False, True]) +def test_dab_rbac_namespace_owner_by_user_or_team( + use_team, settings, + ansible_config, galaxy_client, random_namespace, random_username ): - """Tests the galaxy.system_auditor role can be added to a user and has the right perms.""" + """ + Integration test to assert granting an object level namespace owner + role definition to a user gives them the ability to upload, update + and delete collections in the namespace and to alter the namespace. + + * Assumes that galaxy.collection_namespace_owner roledef exists + * Assumes that galaxy.collection_namespace_owner lets the user change the + namespace's company name + * Assumes having galaxy.collection_namespace_owner implies a user can upload + * Assumes having galaxy.collection_namespace_owner implies a user can delete + * Assumes deletion is permissible even if the namespace owner may not be able + to view a private repository that includes their collection. + """ + + if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False: + pytest.skip("this test relies on local resource creation") gc = galaxy_client("admin", ignore_cache=True) - if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False: - if not os.environ.get("JWT_PROXY"): - pytest.skip(reason="this only works with the jwtproxy") - # create the user in the proxy ... - gc.post( - "/api/gateway/v1/users/", - body=json.dumps({"username": random_username, "password": "redhat1234"}) - ) - else: - # create the user in ui/v2 ... - gc.post( - "_ui/v2/users/", - body=json.dumps({ - "username": random_username, - "email": random_username + '@localhost', - "password": "redhat1234"} - ) + # create the user in ui/v2 ... + gc.post( + "_ui/v2/users/", + body=json.dumps({ + "username": random_username, + "email": random_username + '@localhost', + "password": "redhat1234"} ) + ) # get the user's galaxy level details ... auth = {'username': random_username, 'password': 'redhat1234'} ugc = GalaxyClient(gc.galaxy_root, auth=auth) me_ds = ugc.get('_ui/v1/me/') + uid = me_ds['id'] # find the role for namespace owner ... rd = gc.get('_ui/v2/role_definitions/?name=galaxy.collection_namespace_owner') role_id = rd['results'][0]['id'] - # assign the user role ... - payload = { - 'user': me_ds['id'], - 'role_definition': role_id, - 'content_type': 'galaxy.namespace', - 'object_id': random_namespace['id'], - } - gc.post('_ui/v2/role_user_assignments/', body=payload) - - # try to update the namespace ... - ugc.put( - f"_ui/v1/namespaces/{random_namespace['name']}/", - body=json.dumps({ - "name": random_namespace['name'], - "company": "foobar", - }) - ) - - # try to upload a collection as the user... - upload_test_collection(ugc, namespace=random_namespace['name']) - - -@pytest.mark.deployment_standalone -@pytest.mark.skip(reason=( - "the galaxy.collection_namespace_owner role is global" - " and does not allow object assignment" -)) -def test_dab_rbac_namespace_owner_by_team( - settings, - galaxy_client, - random_namespace, - random_username -): - """Tests the galaxy.system_auditor role can be added to a user and has the right perms.""" - - if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False: - pytest.skip("galaxykit uses drf tokens, which bypass JWT auth and claims processing") - - org_name = random_username.replace('user_', 'org_') - team_name = random_username.replace('user_', 'team_') - - gc = galaxy_client("admin", ignore_cache=True) - - # make the user ... - if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False: - if not os.environ.get("JWT_PROXY"): - pytest.skip(reason="this only works with the jwtproxy") - # create the user in the proxy ... - gc.post( - "/api/gateway/v1/users/", - body=json.dumps({"username": random_username, "password": "redhat1234"}) - ) + if not use_team: + # assign the user role ... + payload = { + 'user': uid, + 'role_definition': role_id, + 'content_type': 'galaxy.namespace', + 'object_id': random_namespace['id'], + } + gc.post('_ui/v2/role_user_assignments/', body=payload) - auth = {'username': random_username, 'password': 'redhat1234'} - ugc = GalaxyClient(gc.galaxy_root, auth=auth) - me_ds = ugc.get('_ui/v1/me/') - user_id = me_ds['id'] else: - user_data = gc.post( - "_ui/v2/users/", - body=json.dumps({ - "username": random_username, - "password": "redhat1234", - "email": random_username + '@localhost' - }) - ) - user_id = user_data['id'] - auth = {'username': random_username, 'password': 'redhat1234'} - ugc = GalaxyClient(gc.galaxy_root, auth=auth) + org_name = random_username.replace('user_', 'org_') + team_name = random_username.replace('user_', 'team_') - # make the team ... - if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False: - - # create an org (Default doesn't sync) - org_data = gc.post( - "/api/gateway/v1/organizations/", + # make the org ... + gc.post( + "_ui/v2/organizations/", body=json.dumps({"name": org_name}) ) - org_id = org_data['id'] - # create a team - team_data = gc.post( - "/api/gateway/v1/teams/", - body=json.dumps({"name": team_name, "organization": org_id}) - ) - team_id = team_data['id'] - - # get the gateway's userid for this user ... - # FIXME - pagination or filtering support? - users_data = gc.get( - "/api/gateway/v1/users/", - ) - gateway_uid = None - for user in users_data['results']: - if user['username'] == random_username: - gateway_uid = user['id'] - break - - # add user to the team - # Unforunately the API contract for this endpoint is to return - # HTTP/1.1 204 No Content ... which means galaxyclient blows up - # on a non-json response. - try: - gc.post( - f"/api/gateway/v1/teams/{team_id}/users/associate/", - body=json.dumps({"instances": [gateway_uid]}) - ) - except Exception: - pass - - ''' - # FIXME - galaxykit only wants to use tokens, which bypasses - # jwt & claims processing - - # check memberships in galaxy ... - me_rr = ugc.get(f'_ui/v1/me/', use_token=False) - #user_rr = ugc.get(f'_ui/v2/users/?username={random_username}') - import epdb; epdb.st() - ''' - - else: + # make the team ... team_data = gc.post( "_ui/v2/teams/", body=json.dumps({ "name": team_name, + "organization": org_name, }) ) team_id = team_data['id'] @@ -211,20 +121,17 @@ def test_dab_rbac_namespace_owner_by_team( # add the user to the team ... gc.post( f'_ui/v2/teams/{team_id}/users/associate/', - body=json.dumps({'instances': [user_id]}) + body=json.dumps({'instances': [uid]}) ) - # find the role for namespace owner ... - rd = gc.get('_ui/v2/role_definitions/?name=galaxy.collection_namespace_owner') - role_id = rd['results'][0]['id'] - - # assign the team role ... - payload = { - 'team': team_id, - 'role_definition': role_id, - 'object_id': str(random_namespace['id']), - } - gc.post('_ui/v2/role_team_assignments/', body=payload) + # assign the user role ... + payload = { + 'team': team_id, + 'role_definition': role_id, + 'content_type': 'galaxy.namespace', + 'object_id': random_namespace['id'], + } + gc.post('_ui/v2/role_team_assignments/', body=payload) # try to update the namespace ... ugc.put( @@ -235,8 +142,46 @@ def test_dab_rbac_namespace_owner_by_team( }) ) + # we need an artifact-like object for the set_certification function .. + Artifact = namedtuple('Artifact', ['name', 'namespace', 'published', 'version']) + # try to upload a collection as the user... - upload_test_collection(ugc, namespace=random_namespace['name']) + import_result = upload_test_collection( + ugc, + namespace=random_namespace['name'], + version='1.0.0' + ) + artifact = Artifact(**import_result) + ir2 = upload_test_collection( + ugc, + namespace=artifact.namespace, + collection_name=artifact.name, + version='1.0.1' + ) + artifact2 = Artifact(**ir2) + + # certify both versions + if settings.get('GALAXY_REQUIRE_CONTENT_APPROVAL') is True: + set_certification(ansible_config(), gc, artifact) + set_certification(ansible_config(), gc, artifact2) + + # try to delete the new version directly ... + cv_url = ( + 'v3/plugin/ansible/content/published/collections/index/' + + f'{artifact2.namespace}/{artifact2.name}/versions/{artifact2.version}/' + ) + del_task = ugc.delete(cv_url) + result = wait_for_task(ugc, del_task) + assert result['state'] == 'completed' + + # try to delete the collection as the user... + collection_url = ( + 'v3/plugin/ansible/content/published/collections/index/' + + f'{artifact.namespace}/{artifact.name}/' + ) + del_task = ugc.delete(collection_url) + result = wait_for_task(ugc, del_task) + assert result['state'] == 'completed' @pytest.mark.deployment_standalone