Skip to content

Commit

Permalink
Let job handler work queue call job wrapper fail
Browse files Browse the repository at this point in the history
This avoids committing the transaction and then have it fail on the next
iteration. Should fix the app startup failing in galaxyproject#17079
  • Loading branch information
mvdbeek committed Nov 24, 2023
1 parent 41b302d commit d95fe4f
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
3 changes: 2 additions & 1 deletion lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Galaxy job handler, prepares, runs, tracks, and finishes Galaxy jobs
"""
import datetime
import functools
import os
import time
from collections import defaultdict
Expand Down Expand Up @@ -1268,7 +1269,7 @@ def recover(self, job, job_wrapper):
except ObjectNotFound:
msg = "Could not recover job working directory after Galaxy restart"
log.exception(f"recover(): ({job_wrapper.job_id}) {msg}")
job_wrapper.fail(msg)
runner.work_queue.put(functools.partial(job_wrapper.fail, msg))

def shutdown(self):
failures = []
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/jobs/runners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ def __init__(self, app: "GalaxyManagerApplication", nworkers: int, **kwargs):
self.runner_params = RunnerParams(specs=runner_param_specs, params=kwargs)
self.runner_state_handlers = build_state_handlers()
self._should_stop = False
self.work_queue = Queue()

def start(self):
for start_method in self.start_methods:
getattr(self, start_method, lambda: None)()

def _init_worker_threads(self):
"""Start ``nworkers`` worker threads."""
self.work_queue = Queue()
self.work_threads = []
log.debug(f"Starting {self.nworkers} {self.runner_name} workers")
for i in range(self.nworkers):
Expand Down

0 comments on commit d95fe4f

Please sign in to comment.