Skip to content

Commit

Permalink
Revamp the can delete permission check to match reality. (#2231)
Browse files Browse the repository at this point in the history
* Allow namespace owners to delete their collections.
* A collection publisher can now also delete their collections.

Revamp related dab rbac integration tests.

No-Issue

Signed-off-by: James Tanner <[email protected]>
  • Loading branch information
jctanner authored Aug 20, 2024
1 parent 4281733 commit 6ca81e5
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 165 deletions.
36 changes: 23 additions & 13 deletions galaxy_ng/app/access_control/access_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,24 +261,34 @@ def v3_can_view_repo_content(self, request, view, action):
return True

def v3_can_destroy_collections(self, request, view, action):
SOCIAL_AUTH_GITHUB_KEY = settings.get("SOCIAL_AUTH_GITHUB_KEY", default=None)
SOCIAL_AUTH_GITHUB_SECRET = settings.get("SOCIAL_AUTH_GITHUB_SECRET", default=None)
is_social_auth = all([SOCIAL_AUTH_GITHUB_KEY, SOCIAL_AUTH_GITHUB_SECRET])

user = request.user
perm = "ansible.delete_collection"
social_perm = "galaxy.change_namespace"
collection = view.get_object()

# first check for global permissions ...
for delete_permission in ["galaxy.change_namespace", "ansible.delete_collection"]:
if user.has_perm(delete_permission):
return True

# could be a collection or could be a collectionversion ...
obj = view.get_object()
model_name = obj.__class__.__name__
if model_name == 'Collection':
collection = obj
elif model_name == 'CollectionVersion':
collection = obj.collection
else:
raise Exception(
f'model type {model_name} is not suitable for v3_can_destroy_collections'
)
namespace = models.Namespace.objects.get(name=collection.namespace)

if not is_social_auth and user.has_perm(perm) and self.v3_can_view_repo_content(
request,
view,
action,
):
# check namespace object level permissions ...
if user.has_perm("galaxy.change_namespace", namespace):
return True
elif is_social_auth and user.has_perm(social_perm, namespace):

# check collection object level permissions ...
if user.has_perm("ansible.delete_collection", collection):
return True

return False

def v3_can_view_users(self, request, view, action):
Expand Down
2 changes: 2 additions & 0 deletions galaxy_ng/tests/integration/api/test_rbac_roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,14 @@
upload_collection_to_custom_staging_repo,
deprecate_collections,
undeprecate_collections,
delete_collection,
},
"galaxy.collection_publisher": {
create_collection_namespace,
change_collection_namespace,
upload_collection_to_namespace,
upload_collection_to_custom_staging_repo,
delete_collection,
deprecate_collections,
undeprecate_collections,
},
Expand Down
249 changes: 97 additions & 152 deletions galaxy_ng/tests/integration/dab/test_dab_rbac.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import json
import os
from collections import namedtuple

import pytest

from galaxykit.client import GalaxyClient
from galaxykit.collections import upload_test_collection
from galaxykit.utils import wait_for_task

from ..utils import set_certification


pytestmark = pytest.mark.qa # noqa: F821
Expand Down Expand Up @@ -36,195 +39,99 @@ def test_dab_roledefs_match_pulp_roles(galaxy_client):
'''


@pytest.mark.skip(reason=(
"the galaxy.collection_namespace_owner role is global"
" and does not allow object assignment"
))
@pytest.mark.deployment_standalone
def test_dab_rbac_namespace_owner_by_user(
@pytest.mark.parametrize("use_team", [False, True])
def test_dab_rbac_namespace_owner_by_user_or_team(
use_team,
settings,
ansible_config,
galaxy_client,
random_namespace,
random_username
):
"""Tests the galaxy.system_auditor role can be added to a user and has the right perms."""
"""
Integration test to assert granting an object level namespace owner
role definition to a user gives them the ability to upload, update
and delete collections in the namespace and to alter the namespace.
* Assumes that galaxy.collection_namespace_owner roledef exists
* Assumes that galaxy.collection_namespace_owner lets the user change the
namespace's company name
* Assumes having galaxy.collection_namespace_owner implies a user can upload
* Assumes having galaxy.collection_namespace_owner implies a user can delete
* Assumes deletion is permissible even if the namespace owner may not be able
to view a private repository that includes their collection.
"""

if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False:
pytest.skip("this test relies on local resource creation")

gc = galaxy_client("admin", ignore_cache=True)

if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False:
if not os.environ.get("JWT_PROXY"):
pytest.skip(reason="this only works with the jwtproxy")
# create the user in the proxy ...
gc.post(
"/api/gateway/v1/users/",
body=json.dumps({"username": random_username, "password": "redhat1234"})
)
else:
# create the user in ui/v2 ...
gc.post(
"_ui/v2/users/",
body=json.dumps({
"username": random_username,
"email": random_username + '@localhost',
"password": "redhat1234"}
)
# create the user in ui/v2 ...
gc.post(
"_ui/v2/users/",
body=json.dumps({
"username": random_username,
"email": random_username + '@localhost',
"password": "redhat1234"}
)
)

# get the user's galaxy level details ...
auth = {'username': random_username, 'password': 'redhat1234'}
ugc = GalaxyClient(gc.galaxy_root, auth=auth)
me_ds = ugc.get('_ui/v1/me/')
uid = me_ds['id']

# find the role for namespace owner ...
rd = gc.get('_ui/v2/role_definitions/?name=galaxy.collection_namespace_owner')
role_id = rd['results'][0]['id']

# assign the user role ...
payload = {
'user': me_ds['id'],
'role_definition': role_id,
'content_type': 'galaxy.namespace',
'object_id': random_namespace['id'],
}
gc.post('_ui/v2/role_user_assignments/', body=payload)

# try to update the namespace ...
ugc.put(
f"_ui/v1/namespaces/{random_namespace['name']}/",
body=json.dumps({
"name": random_namespace['name'],
"company": "foobar",
})
)

# try to upload a collection as the user...
upload_test_collection(ugc, namespace=random_namespace['name'])


@pytest.mark.deployment_standalone
@pytest.mark.skip(reason=(
"the galaxy.collection_namespace_owner role is global"
" and does not allow object assignment"
))
def test_dab_rbac_namespace_owner_by_team(
settings,
galaxy_client,
random_namespace,
random_username
):
"""Tests the galaxy.system_auditor role can be added to a user and has the right perms."""

if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False:
pytest.skip("galaxykit uses drf tokens, which bypass JWT auth and claims processing")

org_name = random_username.replace('user_', 'org_')
team_name = random_username.replace('user_', 'team_')

gc = galaxy_client("admin", ignore_cache=True)

# make the user ...
if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False:
if not os.environ.get("JWT_PROXY"):
pytest.skip(reason="this only works with the jwtproxy")
# create the user in the proxy ...
gc.post(
"/api/gateway/v1/users/",
body=json.dumps({"username": random_username, "password": "redhat1234"})
)
if not use_team:
# assign the user role ...
payload = {
'user': uid,
'role_definition': role_id,
'content_type': 'galaxy.namespace',
'object_id': random_namespace['id'],
}
gc.post('_ui/v2/role_user_assignments/', body=payload)

auth = {'username': random_username, 'password': 'redhat1234'}
ugc = GalaxyClient(gc.galaxy_root, auth=auth)
me_ds = ugc.get('_ui/v1/me/')
user_id = me_ds['id']
else:
user_data = gc.post(
"_ui/v2/users/",
body=json.dumps({
"username": random_username,
"password": "redhat1234",
"email": random_username + '@localhost'
})
)
user_id = user_data['id']
auth = {'username': random_username, 'password': 'redhat1234'}
ugc = GalaxyClient(gc.galaxy_root, auth=auth)
org_name = random_username.replace('user_', 'org_')
team_name = random_username.replace('user_', 'team_')

# make the team ...
if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False:

# create an org (Default doesn't sync)
org_data = gc.post(
"/api/gateway/v1/organizations/",
# make the org ...
gc.post(
"_ui/v2/organizations/",
body=json.dumps({"name": org_name})
)
org_id = org_data['id']

# create a team
team_data = gc.post(
"/api/gateway/v1/teams/",
body=json.dumps({"name": team_name, "organization": org_id})
)
team_id = team_data['id']

# get the gateway's userid for this user ...
# FIXME - pagination or filtering support?
users_data = gc.get(
"/api/gateway/v1/users/",
)
gateway_uid = None
for user in users_data['results']:
if user['username'] == random_username:
gateway_uid = user['id']
break

# add user to the team
# Unforunately the API contract for this endpoint is to return
# HTTP/1.1 204 No Content ... which means galaxyclient blows up
# on a non-json response.
try:
gc.post(
f"/api/gateway/v1/teams/{team_id}/users/associate/",
body=json.dumps({"instances": [gateway_uid]})
)
except Exception:
pass

'''
# FIXME - galaxykit only wants to use tokens, which bypasses
# jwt & claims processing
# check memberships in galaxy ...
me_rr = ugc.get(f'_ui/v1/me/', use_token=False)
#user_rr = ugc.get(f'_ui/v2/users/?username={random_username}')
import epdb; epdb.st()
'''

else:
# make the team ...
team_data = gc.post(
"_ui/v2/teams/",
body=json.dumps({
"name": team_name,
"organization": org_name,
})
)
team_id = team_data['id']

# add the user to the team ...
gc.post(
f'_ui/v2/teams/{team_id}/users/associate/',
body=json.dumps({'instances': [user_id]})
body=json.dumps({'instances': [uid]})
)

# find the role for namespace owner ...
rd = gc.get('_ui/v2/role_definitions/?name=galaxy.collection_namespace_owner')
role_id = rd['results'][0]['id']

# assign the team role ...
payload = {
'team': team_id,
'role_definition': role_id,
'object_id': str(random_namespace['id']),
}
gc.post('_ui/v2/role_team_assignments/', body=payload)
# assign the user role ...
payload = {
'team': team_id,
'role_definition': role_id,
'content_type': 'galaxy.namespace',
'object_id': random_namespace['id'],
}
gc.post('_ui/v2/role_team_assignments/', body=payload)

# try to update the namespace ...
ugc.put(
Expand All @@ -235,8 +142,46 @@ def test_dab_rbac_namespace_owner_by_team(
})
)

# we need an artifact-like object for the set_certification function ..
Artifact = namedtuple('Artifact', ['name', 'namespace', 'published', 'version'])

# try to upload a collection as the user...
upload_test_collection(ugc, namespace=random_namespace['name'])
import_result = upload_test_collection(
ugc,
namespace=random_namespace['name'],
version='1.0.0'
)
artifact = Artifact(**import_result)
ir2 = upload_test_collection(
ugc,
namespace=artifact.namespace,
collection_name=artifact.name,
version='1.0.1'
)
artifact2 = Artifact(**ir2)

# certify both versions
if settings.get('GALAXY_REQUIRE_CONTENT_APPROVAL') is True:
set_certification(ansible_config(), gc, artifact)
set_certification(ansible_config(), gc, artifact2)

# try to delete the new version directly ...
cv_url = (
'v3/plugin/ansible/content/published/collections/index/'
+ f'{artifact2.namespace}/{artifact2.name}/versions/{artifact2.version}/'
)
del_task = ugc.delete(cv_url)
result = wait_for_task(ugc, del_task)
assert result['state'] == 'completed'

# try to delete the collection as the user...
collection_url = (
'v3/plugin/ansible/content/published/collections/index/'
+ f'{artifact.namespace}/{artifact.name}/'
)
del_task = ugc.delete(collection_url)
result = wait_for_task(ugc, del_task)
assert result['state'] == 'completed'


@pytest.mark.deployment_standalone
Expand Down

0 comments on commit 6ca81e5

Please sign in to comment.