diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 7148a58884b9..33348e1c47f2 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -35,6 +35,7 @@ ) from galaxy.jobs.mapper import JobNotReadyException from galaxy.model.base import transaction +from galaxy.model.repositories import get_jobs_to_check_at_startup from galaxy.structured_app import MinimalManagerApp from galaxy.util import unicodify from galaxy.util.custom_logging import get_logger @@ -272,10 +273,9 @@ def __check_jobs_at_startup(self): the database and requeues or cleans up as necessary. Only run as the job handler starts. In case the activation is enforced it will filter out the jobs of inactive users. """ - stmt = self._build_check_jobs_at_startup_statement() with self.sa_session() as session, session.begin(): try: - for job in session.scalars(stmt): + for job in get_jobs_to_check_at_startup(session, self.track_jobs_in_database, self.app.config): with session.begin_nested(): self._check_job_at_startup(job) finally: @@ -319,22 +319,6 @@ def _check_job_at_startup(self, job): self.dispatcher.recover(job, job_wrapper) pass - def _build_check_jobs_at_startup_statement(self): - if self.track_jobs_in_database: - in_list = (model.Job.states.QUEUED, model.Job.states.RUNNING, model.Job.states.STOPPED) - else: - in_list = (model.Job.states.NEW, model.Job.states.QUEUED, model.Job.states.RUNNING) - - stmt = ( - select(model.Job) - .execution_options(yield_per=model.YIELD_PER_ROWS) - .filter(model.Job.state.in_(in_list) & (model.Job.handler == self.app.config.server_name)) - ) - if self.app.config.user_activation_on: - # Filter out the jobs of inactive users. - stmt = stmt.outerjoin(model.User).filter(or_((model.Job.user_id == null()), (model.User.active == true()))) - return stmt - def __recover_job_wrapper(self, job): # Already dispatched and running job_wrapper = self.job_wrapper(job)