From 01e627fdd5cbc5dda10050c1a59ab08151c5c82e Mon Sep 17 00:00:00 2001 From: John Davis Date: Tue, 14 Nov 2023 09:26:22 -0500 Subject: [PATCH] Move access permissions checks to service; add typing --- lib/galaxy/managers/jobs.py | 10 ++------ lib/galaxy/webapps/galaxy/services/jobs.py | 27 +++++++++++++++++----- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 3e2513b566f0..fcb09f1b9a84 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -6,6 +6,7 @@ datetime, ) +import sqlalchemy from boltons.iterutils import remap from pydantic import ( BaseModel, @@ -27,7 +28,6 @@ from galaxy import model from galaxy.exceptions import ( - AdminRequiredException, ItemAccessibilityException, ObjectNotFound, RequestParameterInvalidException, @@ -104,7 +104,7 @@ def __init__(self, app: StructuredApp): self.app = app self.dataset_manager = DatasetManager(app) - def index_query(self, trans, payload: JobIndexQueryPayload): + def index_query(self, trans, payload: JobIndexQueryPayload) -> sqlalchemy.engine.Result: is_admin = trans.user_is_admin user_details = payload.user_details decoded_user_id = payload.user_id @@ -192,12 +192,6 @@ def add_search_criteria(stmt): stmt = stmt.filter(raw_text_column_filter(columns, term)) return stmt - if not is_admin: - if user_details: - raise AdminRequiredException("Only admins can index the jobs with user details enabled") - if decoded_user_id is not None and decoded_user_id != trans.user.id: - raise AdminRequiredException("Only admins can index the jobs of others") - stmt = select(Job) if is_admin: diff --git a/lib/galaxy/webapps/galaxy/services/jobs.py b/lib/galaxy/webapps/galaxy/services/jobs.py index 98ac5bbb8f9e..5a92d629603a 100644 --- a/lib/galaxy/webapps/galaxy/services/jobs.py +++ b/lib/galaxy/webapps/galaxy/services/jobs.py @@ -2,6 +2,7 @@ from typing import ( Any, Dict, + Optional, ) from galaxy import ( @@ -59,15 +60,18 @@ def index( ): security = trans.security is_admin = trans.user_is_admin - if payload.view == JobIndexViewEnum.admin_job_list: + view = payload.view + if view == JobIndexViewEnum.admin_job_list: payload.user_details = True user_details = payload.user_details - if payload.view == JobIndexViewEnum.admin_job_list and not is_admin: - raise exceptions.AdminRequiredException("Only admins can use the admin_job_list view") - query = self.job_manager.index_query(trans, payload) + decoded_user_id = payload.user_id + + if not is_admin: + self._check_nonadmin_access(view, user_details, decoded_user_id, trans.user.id) + + jobs = self.job_manager.index_query(trans, payload) out = [] - view = payload.view - for job in query.yield_per(model.YIELD_PER_ROWS): + for job in jobs.yield_per(model.YIELD_PER_ROWS): job_dict = job.to_dict(view, system_details=is_admin) j = security.encode_all_ids(job_dict, True) if view == JobIndexViewEnum.admin_job_list: @@ -77,3 +81,14 @@ def index( out.append(j) return out + + def _check_nonadmin_access( + self, view: str, user_details: bool, decoded_user_id: Optional[DecodedDatabaseIdField], trans_user_id: int + ): + """Verify admin-only resources are not being accessed.""" + if view == JobIndexViewEnum.admin_job_list: + raise exceptions.AdminRequiredException("Only admins can use the admin_job_list view") + if user_details: + raise exceptions.AdminRequiredException("Only admins can index the jobs with user details enabled") + if decoded_user_id is not None and decoded_user_id != trans_user_id: + raise exceptions.AdminRequiredException("Only admins can index the jobs of others")