From 1d8f696024d2d4d70d40bd74d067865b9bf7b184 Mon Sep 17 00:00:00 2001 From: John Davis Date: Wed, 20 Sep 2023 16:22:34 -0400 Subject: [PATCH] Move jobs-to-check query from handler to repo --- lib/galaxy/jobs/handler.py | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 7148a58884b9..33348e1c47f2 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -35,6 +35,7 @@ ) from galaxy.jobs.mapper import JobNotReadyException from galaxy.model.base import transaction +from galaxy.model.repositories import get_jobs_to_check_at_startup from galaxy.structured_app import MinimalManagerApp from galaxy.util import unicodify from galaxy.util.custom_logging import get_logger @@ -272,10 +273,9 @@ def __check_jobs_at_startup(self): the database and requeues or cleans up as necessary. Only run as the job handler starts. In case the activation is enforced it will filter out the jobs of inactive users. """ - stmt = self._build_check_jobs_at_startup_statement() with self.sa_session() as session, session.begin(): try: - for job in session.scalars(stmt): + for job in get_jobs_to_check_at_startup(session, self.track_jobs_in_database, self.app.config): with session.begin_nested(): self._check_job_at_startup(job) finally: @@ -319,22 +319,6 @@ def _check_job_at_startup(self, job): self.dispatcher.recover(job, job_wrapper) pass - def _build_check_jobs_at_startup_statement(self): - if self.track_jobs_in_database: - in_list = (model.Job.states.QUEUED, model.Job.states.RUNNING, model.Job.states.STOPPED) - else: - in_list = (model.Job.states.NEW, model.Job.states.QUEUED, model.Job.states.RUNNING) - - stmt = ( - select(model.Job) - .execution_options(yield_per=model.YIELD_PER_ROWS) - .filter(model.Job.state.in_(in_list) & (model.Job.handler == self.app.config.server_name)) - ) - if self.app.config.user_activation_on: - # Filter out the jobs of inactive users. - stmt = stmt.outerjoin(model.User).filter(or_((model.Job.user_id == null()), (model.User.active == true()))) - return stmt - def __recover_job_wrapper(self, job): # Already dispatched and running job_wrapper = self.job_wrapper(job)