Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Upgrade job manager's index_query method to SA2.0 #17020

Merged
merged 9 commits into from
Nov 14, 2023
143 changes: 77 additions & 66 deletions lib/galaxy/managers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@
from galaxy.managers.hdas import HDAManager
from galaxy.managers.lddas import LDDAManager
from galaxy.model import (
ImplicitCollectionJobsJobAssociation,
Job,
JobParameter,
User,
Workflow,
WorkflowInvocation,
WorkflowInvocationStep,
YIELD_PER_ROWS,
)
from galaxy.model.base import transaction
Expand Down Expand Up @@ -103,73 +107,47 @@ def __init__(self, app: StructuredApp):
def index_query(self, trans, payload: JobIndexQueryPayload):
is_admin = trans.user_is_admin
user_details = payload.user_details

decoded_user_id = payload.user_id
history_id = payload.history_id
workflow_id = payload.workflow_id
invocation_id = payload.invocation_id
search = payload.search
order_by = payload.order_by

if is_admin:
if decoded_user_id is not None:
query = trans.sa_session.query(model.Job).filter(model.Job.user_id == decoded_user_id)
else:
query = trans.sa_session.query(model.Job)
if user_details:
query = query.outerjoin(model.Job.user)

else:
if user_details:
raise AdminRequiredException("Only admins can index the jobs with user details enabled")
if decoded_user_id is not None and decoded_user_id != trans.user.id:
raise AdminRequiredException("Only admins can index the jobs of others")
query = trans.sa_session.query(model.Job).filter(model.Job.user_id == trans.user.id)

def build_and_apply_filters(query, objects, filter_func):
def build_and_apply_filters(stmt, objects, filter_func):
if objects is not None:
if isinstance(objects, (str, date, datetime)):
query = query.filter(filter_func(objects))
stmt = stmt.where(filter_func(objects))
elif isinstance(objects, list):
t = []
for obj in objects:
t.append(filter_func(obj))
query = query.filter(or_(*t))
return query

query = build_and_apply_filters(query, payload.states, lambda s: model.Job.state == s)
query = build_and_apply_filters(query, payload.tool_ids, lambda t: model.Job.tool_id == t)
query = build_and_apply_filters(query, payload.tool_ids_like, lambda t: model.Job.tool_id.like(t))
query = build_and_apply_filters(query, payload.date_range_min, lambda dmin: model.Job.update_time >= dmin)
query = build_and_apply_filters(query, payload.date_range_max, lambda dmax: model.Job.update_time <= dmax)
stmt = stmt.where(or_(*t))
return stmt

history_id = payload.history_id
workflow_id = payload.workflow_id
invocation_id = payload.invocation_id
if history_id is not None:
query = query.filter(model.Job.history_id == history_id)
if workflow_id or invocation_id:
def add_workflow_jobs():
wfi_step = select(WorkflowInvocationStep)
if workflow_id is not None:
wfi_step = (
trans.sa_session.query(model.WorkflowInvocationStep)
.join(model.WorkflowInvocation)
.join(model.Workflow)
.filter(
model.Workflow.stored_workflow_id == workflow_id,
)
.subquery()
wfi_step.join(WorkflowInvocation).join(Workflow).where(Workflow.stored_workflow_id == workflow_id)
)
elif invocation_id is not None:
wfi_step = (
trans.sa_session.query(model.WorkflowInvocationStep)
.filter(model.WorkflowInvocationStep.workflow_invocation_id == invocation_id)
.subquery()
)
query1 = query.join(wfi_step)
query2 = query.join(model.ImplicitCollectionJobsJobAssociation).join(
wfi_step = wfi_step.where(WorkflowInvocationStep.workflow_invocation_id == invocation_id)
wfi_step = wfi_step.subquery()

stmt1 = stmt.join(wfi_step)
stmt2 = stmt.join(ImplicitCollectionJobsJobAssociation).join(
wfi_step,
model.ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id
ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id
== wfi_step.c.implicit_collection_jobs_id,
)
query = query1.union(query2)
# Ensure the result is models, not tuples
sq = stmt1.union(stmt2).subquery()
# SQLite won't recognize Job.foo as a valid column for the ORDER BY clause due to the UNION clause, so we'll use the subquery `columns` collection (`sq.c`).
# Ref: https://github.com/galaxyproject/galaxy/pull/16852#issuecomment-1804676322
return select(aliased(Job, sq)), sq.c

search = payload.search
if search:
def add_search_criteria(stmt):
search_filters = {
"tool": "tool",
"t": "tool",
Expand All @@ -191,36 +169,69 @@ def build_and_apply_filters(query, objects, filter_func):
"h": "handler",
}
)
assert search
parsed_search = parse_filters_structured(search, search_filters)
for term in parsed_search.terms:
if isinstance(term, FilteredTerm):
key = term.filter
if key == "user":
query = query.filter(text_column_filter(model.User.email, term))
stmt = stmt.where(text_column_filter(User.email, term))
elif key == "tool":
query = query.filter(text_column_filter(model.Job.tool_id, term))
stmt = stmt.where(text_column_filter(Job.tool_id, term))
elif key == "handler":
query = query.filter(text_column_filter(model.Job.handler, term))
stmt = stmt.where(text_column_filter(Job.handler, term))
elif key == "runner":
query = query.filter(text_column_filter(model.Job.job_runner_name, term))
stmt = stmt.where(text_column_filter(Job.job_runner_name, term))
elif isinstance(term, RawTextTerm):
columns = [model.Job.tool_id]
columns = [Job.tool_id]
if user_details:
columns.append(model.User.email)
columns.append(User.email)
if is_admin:
columns.append(model.Job.handler)
columns.append(model.Job.job_runner_name)
query = query.filter(raw_text_column_filter(columns, term))
columns.append(Job.handler)
columns.append(Job.job_runner_name)
stmt = stmt.filter(raw_text_column_filter(columns, term))
return stmt

if not is_admin:
jdavcs marked this conversation as resolved.
Show resolved Hide resolved
if user_details:
raise AdminRequiredException("Only admins can index the jobs with user details enabled")
if decoded_user_id is not None and decoded_user_id != trans.user.id:
raise AdminRequiredException("Only admins can index the jobs of others")

stmt = select(Job)

if is_admin:
if decoded_user_id is not None:
stmt = stmt.where(Job.user_id == decoded_user_id)
if user_details:
stmt = stmt.outerjoin(Job.user)
else:
stmt = stmt.where(Job.user_id == trans.user.id)

stmt = build_and_apply_filters(stmt, payload.states, lambda s: model.Job.state == s)
stmt = build_and_apply_filters(stmt, payload.tool_ids, lambda t: model.Job.tool_id == t)
stmt = build_and_apply_filters(stmt, payload.tool_ids_like, lambda t: model.Job.tool_id.like(t))
stmt = build_and_apply_filters(stmt, payload.date_range_min, lambda dmin: model.Job.update_time >= dmin)
stmt = build_and_apply_filters(stmt, payload.date_range_max, lambda dmax: model.Job.update_time <= dmax)

if history_id is not None:
stmt = stmt.where(Job.history_id == history_id)

order_by_columns = Job
if workflow_id or invocation_id:
stmt, order_by_columns = add_workflow_jobs()

if search:
stmt = add_search_criteria(stmt)

if payload.order_by == JobIndexSortByEnum.create_time:
order_by = model.Job.create_time.desc()
if order_by == JobIndexSortByEnum.create_time:
stmt = stmt.order_by(order_by_columns.create_time.desc())
else:
order_by = model.Job.update_time.desc()
query = query.order_by(order_by)
stmt = stmt.order_by(order_by_columns.update_time.desc())

query = query.offset(payload.offset)
query = query.limit(payload.limit)
return query
stmt = stmt.offset(payload.offset)
stmt = stmt.limit(payload.limit)
return trans.sa_session.scalars(stmt)

def job_lock(self) -> JobLock:
return JobLock(active=self.app.job_manager.job_lock)
Expand Down
Loading