Skip to content

Commit

Permalink
Fix SA2.0 usage in tool_shed.webapp.security
Browse files Browse the repository at this point in the history
  • Loading branch information
jdavcs committed Oct 6, 2023
1 parent bdcd16e commit 9d17792
Showing 1 changed file with 19 additions and 21 deletions.
40 changes: 19 additions & 21 deletions lib/tool_shed/webapp/security/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@
from typing import List

from sqlalchemy import (
and_,
false,
select,
)

from galaxy.model.base import transaction
from galaxy.util import listify
from galaxy.util.bunch import Bunch
from tool_shed.webapp.model import (
Group,
Role,
)

IUC_NAME = "Intergalactic Utilities Commission"

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -159,16 +165,7 @@ def get_item_actions(self, action, item):
return [permission for permission in item.actions if permission.action == action.action]

def get_private_user_role(self, user, auto_create=False):
role = (
self.sa_session.query(self.model.Role)
.filter(
and_(
self.model.Role.table.c.name == user.email,
self.model.Role.table.c.type == self.model.Role.types.PRIVATE,
)
)
.first()
)
role = _get_private_user_role(self.sa_session, user.email)
if not role:
if auto_create:
return self.create_private_user_role(user)
Expand Down Expand Up @@ -276,16 +273,7 @@ def user_can_import_repository_archive(self, user, archive_owner):
if user.username == archive_owner:
return True
# A member of the IUC is authorized to create new repositories that are owned by another user.
iuc_group = (
self.sa_session.query(self.model.Group)
.filter(
and_(
self.model.Group.table.c.name == "Intergalactic Utilities Commission",
self.model.Group.table.c.deleted == false(),
)
)
.first()
)
iuc_group = get_iuc_group(self.sa_session)
if iuc_group is not None:
for uga in iuc_group.users:
if uga.user.id == user.id:
Expand All @@ -300,3 +288,13 @@ def get_permitted_actions(filter=None):
tmp_bunch = Bunch()
[tmp_bunch.__dict__.__setitem__(k, v) for k, v in RBACAgent.permitted_actions.items() if k.startswith(filter)]
return tmp_bunch


def get_iuc_group(session):
stmt = select(Group).where(Group.name == IUC_NAME).where(Group.deleted == false()).limit(1)
return session.scalars(stmt).first()


def _get_private_user_role(session, user_email):
stmt = select(Role).where(Role.name == user_email).where(Role.type == Role.types.PRIVATE).limit(1)
return session.scalars(stmt).first()

0 comments on commit 9d17792

Please sign in to comment.