diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 3f115f3c1cfa..fcb09f1b9a84 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -6,6 +6,7 @@ datetime, ) +import sqlalchemy from boltons.iterutils import remap from pydantic import ( BaseModel, @@ -27,7 +28,6 @@ from galaxy import model from galaxy.exceptions import ( - AdminRequiredException, ItemAccessibilityException, ObjectNotFound, RequestParameterInvalidException, @@ -41,9 +41,13 @@ from galaxy.managers.hdas import HDAManager from galaxy.managers.lddas import LDDAManager from galaxy.model import ( + ImplicitCollectionJobsJobAssociation, Job, JobParameter, User, + Workflow, + WorkflowInvocation, + WorkflowInvocationStep, YIELD_PER_ROWS, ) from galaxy.model.base import transaction @@ -100,76 +104,50 @@ def __init__(self, app: StructuredApp): self.app = app self.dataset_manager = DatasetManager(app) - def index_query(self, trans, payload: JobIndexQueryPayload): + def index_query(self, trans, payload: JobIndexQueryPayload) -> sqlalchemy.engine.Result: is_admin = trans.user_is_admin user_details = payload.user_details - decoded_user_id = payload.user_id + history_id = payload.history_id + workflow_id = payload.workflow_id + invocation_id = payload.invocation_id + search = payload.search + order_by = payload.order_by - if is_admin: - if decoded_user_id is not None: - query = trans.sa_session.query(model.Job).filter(model.Job.user_id == decoded_user_id) - else: - query = trans.sa_session.query(model.Job) - if user_details: - query = query.outerjoin(model.Job.user) - - else: - if user_details: - raise AdminRequiredException("Only admins can index the jobs with user details enabled") - if decoded_user_id is not None and decoded_user_id != trans.user.id: - raise AdminRequiredException("Only admins can index the jobs of others") - query = trans.sa_session.query(model.Job).filter(model.Job.user_id == trans.user.id) - - def build_and_apply_filters(query, objects, filter_func): + def build_and_apply_filters(stmt, objects, filter_func): if objects is not None: if isinstance(objects, (str, date, datetime)): - query = query.filter(filter_func(objects)) + stmt = stmt.where(filter_func(objects)) elif isinstance(objects, list): t = [] for obj in objects: t.append(filter_func(obj)) - query = query.filter(or_(*t)) - return query + stmt = stmt.where(or_(*t)) + return stmt - query = build_and_apply_filters(query, payload.states, lambda s: model.Job.state == s) - query = build_and_apply_filters(query, payload.tool_ids, lambda t: model.Job.tool_id == t) - query = build_and_apply_filters(query, payload.tool_ids_like, lambda t: model.Job.tool_id.like(t)) - query = build_and_apply_filters(query, payload.date_range_min, lambda dmin: model.Job.update_time >= dmin) - query = build_and_apply_filters(query, payload.date_range_max, lambda dmax: model.Job.update_time <= dmax) - - history_id = payload.history_id - workflow_id = payload.workflow_id - invocation_id = payload.invocation_id - if history_id is not None: - query = query.filter(model.Job.history_id == history_id) - if workflow_id or invocation_id: + def add_workflow_jobs(): + wfi_step = select(WorkflowInvocationStep) if workflow_id is not None: wfi_step = ( - trans.sa_session.query(model.WorkflowInvocationStep) - .join(model.WorkflowInvocation) - .join(model.Workflow) - .filter( - model.Workflow.stored_workflow_id == workflow_id, - ) - .subquery() + wfi_step.join(WorkflowInvocation).join(Workflow).where(Workflow.stored_workflow_id == workflow_id) ) elif invocation_id is not None: - wfi_step = ( - trans.sa_session.query(model.WorkflowInvocationStep) - .filter(model.WorkflowInvocationStep.workflow_invocation_id == invocation_id) - .subquery() - ) - query1 = query.join(wfi_step) - query2 = query.join(model.ImplicitCollectionJobsJobAssociation).join( + wfi_step = wfi_step.where(WorkflowInvocationStep.workflow_invocation_id == invocation_id) + wfi_step = wfi_step.subquery() + + stmt1 = stmt.join(wfi_step) + stmt2 = stmt.join(ImplicitCollectionJobsJobAssociation).join( wfi_step, - model.ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id + ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id == wfi_step.c.implicit_collection_jobs_id, ) - query = query1.union(query2) + # Ensure the result is models, not tuples + sq = stmt1.union(stmt2).subquery() + # SQLite won't recognize Job.foo as a valid column for the ORDER BY clause due to the UNION clause, so we'll use the subquery `columns` collection (`sq.c`). + # Ref: https://github.com/galaxyproject/galaxy/pull/16852#issuecomment-1804676322 + return select(aliased(Job, sq)), sq.c - search = payload.search - if search: + def add_search_criteria(stmt): search_filters = { "tool": "tool", "t": "tool", @@ -191,36 +169,63 @@ def build_and_apply_filters(query, objects, filter_func): "h": "handler", } ) + assert search parsed_search = parse_filters_structured(search, search_filters) for term in parsed_search.terms: if isinstance(term, FilteredTerm): key = term.filter if key == "user": - query = query.filter(text_column_filter(model.User.email, term)) + stmt = stmt.where(text_column_filter(User.email, term)) elif key == "tool": - query = query.filter(text_column_filter(model.Job.tool_id, term)) + stmt = stmt.where(text_column_filter(Job.tool_id, term)) elif key == "handler": - query = query.filter(text_column_filter(model.Job.handler, term)) + stmt = stmt.where(text_column_filter(Job.handler, term)) elif key == "runner": - query = query.filter(text_column_filter(model.Job.job_runner_name, term)) + stmt = stmt.where(text_column_filter(Job.job_runner_name, term)) elif isinstance(term, RawTextTerm): - columns = [model.Job.tool_id] + columns = [Job.tool_id] if user_details: - columns.append(model.User.email) + columns.append(User.email) if is_admin: - columns.append(model.Job.handler) - columns.append(model.Job.job_runner_name) - query = query.filter(raw_text_column_filter(columns, term)) + columns.append(Job.handler) + columns.append(Job.job_runner_name) + stmt = stmt.filter(raw_text_column_filter(columns, term)) + return stmt + + stmt = select(Job) + + if is_admin: + if decoded_user_id is not None: + stmt = stmt.where(Job.user_id == decoded_user_id) + if user_details: + stmt = stmt.outerjoin(Job.user) + else: + stmt = stmt.where(Job.user_id == trans.user.id) + + stmt = build_and_apply_filters(stmt, payload.states, lambda s: model.Job.state == s) + stmt = build_and_apply_filters(stmt, payload.tool_ids, lambda t: model.Job.tool_id == t) + stmt = build_and_apply_filters(stmt, payload.tool_ids_like, lambda t: model.Job.tool_id.like(t)) + stmt = build_and_apply_filters(stmt, payload.date_range_min, lambda dmin: model.Job.update_time >= dmin) + stmt = build_and_apply_filters(stmt, payload.date_range_max, lambda dmax: model.Job.update_time <= dmax) + + if history_id is not None: + stmt = stmt.where(Job.history_id == history_id) + + order_by_columns = Job + if workflow_id or invocation_id: + stmt, order_by_columns = add_workflow_jobs() + + if search: + stmt = add_search_criteria(stmt) - if payload.order_by == JobIndexSortByEnum.create_time: - order_by = model.Job.create_time.desc() + if order_by == JobIndexSortByEnum.create_time: + stmt = stmt.order_by(order_by_columns.create_time.desc()) else: - order_by = model.Job.update_time.desc() - query = query.order_by(order_by) + stmt = stmt.order_by(order_by_columns.update_time.desc()) - query = query.offset(payload.offset) - query = query.limit(payload.limit) - return query + stmt = stmt.offset(payload.offset) + stmt = stmt.limit(payload.limit) + return trans.sa_session.scalars(stmt) def job_lock(self) -> JobLock: return JobLock(active=self.app.job_manager.job_lock) diff --git a/lib/galaxy/webapps/galaxy/services/jobs.py b/lib/galaxy/webapps/galaxy/services/jobs.py index 98ac5bbb8f9e..5a92d629603a 100644 --- a/lib/galaxy/webapps/galaxy/services/jobs.py +++ b/lib/galaxy/webapps/galaxy/services/jobs.py @@ -2,6 +2,7 @@ from typing import ( Any, Dict, + Optional, ) from galaxy import ( @@ -59,15 +60,18 @@ def index( ): security = trans.security is_admin = trans.user_is_admin - if payload.view == JobIndexViewEnum.admin_job_list: + view = payload.view + if view == JobIndexViewEnum.admin_job_list: payload.user_details = True user_details = payload.user_details - if payload.view == JobIndexViewEnum.admin_job_list and not is_admin: - raise exceptions.AdminRequiredException("Only admins can use the admin_job_list view") - query = self.job_manager.index_query(trans, payload) + decoded_user_id = payload.user_id + + if not is_admin: + self._check_nonadmin_access(view, user_details, decoded_user_id, trans.user.id) + + jobs = self.job_manager.index_query(trans, payload) out = [] - view = payload.view - for job in query.yield_per(model.YIELD_PER_ROWS): + for job in jobs.yield_per(model.YIELD_PER_ROWS): job_dict = job.to_dict(view, system_details=is_admin) j = security.encode_all_ids(job_dict, True) if view == JobIndexViewEnum.admin_job_list: @@ -77,3 +81,14 @@ def index( out.append(j) return out + + def _check_nonadmin_access( + self, view: str, user_details: bool, decoded_user_id: Optional[DecodedDatabaseIdField], trans_user_id: int + ): + """Verify admin-only resources are not being accessed.""" + if view == JobIndexViewEnum.admin_job_list: + raise exceptions.AdminRequiredException("Only admins can use the admin_job_list view") + if user_details: + raise exceptions.AdminRequiredException("Only admins can index the jobs with user details enabled") + if decoded_user_id is not None and decoded_user_id != trans_user_id: + raise exceptions.AdminRequiredException("Only admins can index the jobs of others")