diff --git a/galaxy_ng/app/migrations/_dab_rbac.py b/galaxy_ng/app/migrations/_dab_rbac.py index f7012fed23..7ea9344c27 100644 --- a/galaxy_ng/app/migrations/_dab_rbac.py +++ b/galaxy_ng/app/migrations/_dab_rbac.py @@ -8,6 +8,23 @@ logger = logging.getLogger(__name__) +PULP_TO_ROLEDEF = { + 'galaxy.auditor': 'Platform Auditor', +} + + +ROLEDEF_TO_PULP = { + 'Platform Auditor': 'galaxy.auditor', +} + + +def pulp_role_to_single_content_type_or_none(pulprole): + content_types = set(perm.content_type for perm in pulprole.permissions.all()) + if len(list(content_types)) == 1: + return list(content_types)[0] + return None + + def create_permissions_as_operation(apps, schema_editor): # TODO: possibly create permissions for more apps here for app_label in {'ansible', 'container', 'core', 'galaxy'}: @@ -54,11 +71,14 @@ def copy_roles_to_role_definitions(apps, schema_editor): dab_perms.append(dabperm) if dab_perms: + roledef_name = PULP_TO_ROLEDEF.get(corerole.name, corerole.name) + content_type = pulp_role_to_single_content_type_or_none(corerole) roledef, created = RoleDefinition.objects.get_or_create( - name=corerole.name, + name=roledef_name, defaults={ 'description': corerole.description or corerole.name, 'managed': corerole.locked, + 'content_type': content_type, } ) if created: diff --git a/galaxy_ng/app/models/__init__.py b/galaxy_ng/app/models/__init__.py index a916315a2a..3c5fc93aa2 100644 --- a/galaxy_ng/app/models/__init__.py +++ b/galaxy_ng/app/models/__init__.py @@ -53,6 +53,7 @@ CollectionRemote, ContainerRegistryRemote, Namespace, + Organization, Team, parent_field_name=None ) diff --git a/galaxy_ng/app/signals/handlers.py b/galaxy_ng/app/signals/handlers.py index b76b3cd96d..26faa1937d 100644 --- a/galaxy_ng/app/signals/handlers.py +++ b/galaxy_ng/app/signals/handlers.py @@ -168,6 +168,13 @@ def rbac_signal_in_progress(): return bool(rbac_state.dab_action or rbac_state.pulp_action) +def pulp_role_to_single_content_type_or_none(pulprole): + content_types = set(perm.content_type for perm in pulprole.permissions.all()) + if len(list(content_types)) == 1: + return list(content_types)[0] + return None + + def copy_permissions_role_to_role(roleA, roleB): """Make permissions on roleB match roleA @@ -217,9 +224,15 @@ def copy_role_to_role_definition(sender, instance, created, **kwargs): roledef_name = PULP_TO_ROLEDEF.get(instance.name, instance.name) rd = RoleDefinition.objects.filter(name=roledef_name).first() if not rd: + content_type = pulp_role_to_single_content_type_or_none(instance) + logger.info( + f'CREATE ROLEDEF name:{roledef_name}' + + f' managed:{instance.locked} ctype:{content_type}' + ) RoleDefinition.objects.create( name=roledef_name, managed=instance.locked, + content_type=content_type, description=instance.description or instance.name, ) # TODO: other fields? like description diff --git a/galaxy_ng/tests/integration/dab/test_dab_rbac.py b/galaxy_ng/tests/integration/dab/test_dab_rbac.py index 578be77183..918171ec11 100644 --- a/galaxy_ng/tests/integration/dab/test_dab_rbac.py +++ b/galaxy_ng/tests/integration/dab/test_dab_rbac.py @@ -431,3 +431,65 @@ def test_dab_team_platform_auditor_bidirectional_sync( # ensure the role was removed pulp_assignments = gc.get(f"pulp/api/v3/groups/{guid}/roles/") assert pulp_assignments['count'] == 0 + + +@pytest.mark.deployment_standalone +def test_dab_user_assignment_filtering_as_user( + settings, + galaxy_client, + random_namespace, + random_username, +): + """ + Integration test to assert a user can be assigned as the owner + of a namespace and then also be able to query their role assignments. + + * This assumes there is a galaxy.collection_namespace_owner roledef + and that it has a content type defined. + * This also assumes the role_user_assignments endpoint is user + accessible and filterable. + * The role_user_assignments endpoint behaves differently for + evaluating a superuser vs a user for access. + """ + if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False: + pytest.skip("this test relies on local resource creation") + + gc = galaxy_client("admin", ignore_cache=True) + + # find the namespace owner roledef ... + roledef = gc.get( + '_ui/v2/role_definitions/?name=galaxy.collection_namespace_owner' + )['results'][0] + + # make the user ... + user_data = gc.post( + "_ui/v2/users/", + body=json.dumps({ + "username": random_username, + "password": "redhat1234", + "email": random_username + '@localhost' + }) + ) + uid = user_data['id'] + + # assign the user to the namespace ... + assignment = gc.post( + '_ui/v2/role_user_assignments/', + body=json.dumps({ + 'user': uid, + 'role_definition': roledef['id'], + 'object_id': str(random_namespace['id']), + }) + ) + + # see if we can find the assignment through filtering as the user ... + auth = {'username': random_username, 'password': 'redhat1234'} + ugc = GalaxyClient(gc.galaxy_root, auth=auth) + queryparams = [ + f"object_id={random_namespace['id']}", + f"object_id={random_namespace['id']}&content_type__model=namespace", + ] + for qp in queryparams: + resp = ugc.get(f'_ui/v2/role_user_assignments/?{qp}') + assert resp['count'] == 1 + assert resp['results'][0]['id'] == assignment['id']