Skip to content

Commit

Permalink
Move jobs-to-check query from handler to repo
Browse files Browse the repository at this point in the history
  • Loading branch information
jdavcs committed Sep 20, 2023
1 parent f6a07ff commit 1d8f696
Showing 1 changed file with 2 additions and 18 deletions.
20 changes: 2 additions & 18 deletions lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
)
from galaxy.jobs.mapper import JobNotReadyException
from galaxy.model.base import transaction
from galaxy.model.repositories import get_jobs_to_check_at_startup
from galaxy.structured_app import MinimalManagerApp
from galaxy.util import unicodify
from galaxy.util.custom_logging import get_logger
Expand Down Expand Up @@ -272,10 +273,9 @@ def __check_jobs_at_startup(self):
the database and requeues or cleans up as necessary. Only run as the job handler starts.
In case the activation is enforced it will filter out the jobs of inactive users.
"""
stmt = self._build_check_jobs_at_startup_statement()
with self.sa_session() as session, session.begin():
try:
for job in session.scalars(stmt):
for job in get_jobs_to_check_at_startup(session, self.track_jobs_in_database, self.app.config):
with session.begin_nested():
self._check_job_at_startup(job)
finally:
Expand Down Expand Up @@ -319,22 +319,6 @@ def _check_job_at_startup(self, job):
self.dispatcher.recover(job, job_wrapper)
pass

def _build_check_jobs_at_startup_statement(self):
if self.track_jobs_in_database:
in_list = (model.Job.states.QUEUED, model.Job.states.RUNNING, model.Job.states.STOPPED)
else:
in_list = (model.Job.states.NEW, model.Job.states.QUEUED, model.Job.states.RUNNING)

stmt = (
select(model.Job)
.execution_options(yield_per=model.YIELD_PER_ROWS)
.filter(model.Job.state.in_(in_list) & (model.Job.handler == self.app.config.server_name))
)
if self.app.config.user_activation_on:
# Filter out the jobs of inactive users.
stmt = stmt.outerjoin(model.User).filter(or_((model.Job.user_id == null()), (model.User.active == true())))
return stmt

def __recover_job_wrapper(self, job):
# Already dispatched and running
job_wrapper = self.job_wrapper(job)
Expand Down

0 comments on commit 1d8f696

Please sign in to comment.