Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[23.1] Fix closed transaction error on galaxy startup/check jobs #16687

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 45 additions & 45 deletions lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,55 +269,55 @@ def job_pair_for_id(self, id):
def __check_jobs_at_startup(self):
"""
Checks all jobs that are in the 'new', 'queued', 'running', or 'stopped' state in
the database and requeues or cleans up as necessary. Only run as the
job handler starts.
the database and requeues or cleans up as necessary. Only run as the job handler starts.
In case the activation is enforced it will filter out the jobs of inactive users.
"""
stmt = self._build_check_jobs_at_startup_statement()
with self.sa_session() as session, session.begin():
jobs_at_startup = session.scalars(stmt)
for job in jobs_at_startup:
if not self.app.toolbox.has_tool(job.tool_id, job.tool_version, exact=True):
log.warning(f"({job.id}) Tool '{job.tool_id}' removed from tool config, unable to recover job")
self.job_wrapper(job).fail(
"This tool was disabled before the job completed. Please contact your Galaxy administrator."
)
elif job.job_runner_name is not None and job.job_runner_external_id is None:
# This could happen during certain revisions of Galaxy where a runner URL was persisted before the job was dispatched to a runner.
log.debug(
f"({job.id}) Job runner assigned but no external ID recorded, adding to the job handler queue"
)
job.job_runner_name = None
if self.track_jobs_in_database:
job.set_state(model.Job.states.NEW)
else:
self.queue.put((job.id, job.tool_id))
elif (
job.job_runner_name is not None
and job.job_runner_external_id is not None
and job.destination_id is None
):
# This is the first start after upgrading from URLs to destinations, convert the URL to a destination and persist
job_wrapper = self.job_wrapper(job)
job_destination = self.dispatcher.url_to_destination(job.job_runner_name)
if job_destination.id is None:
job_destination.id = "legacy_url"
job_wrapper.set_job_destination(job_destination, job.job_runner_external_id)
self.dispatcher.recover(job, job_wrapper)
log.info(f"({job.id}) Converted job from a URL to a destination and recovered")
elif job.job_runner_name is None:
# Never (fully) dispatched
log.debug(
f"({job.id}) No job runner assigned and job still in '{job.state}' state, adding to the job handler queue"
)
if self.track_jobs_in_database:
job.set_state(model.Job.states.NEW)
else:
self.queue.put((job.id, job.tool_id))
else:
# Already dispatched and running
job_wrapper = self.__recover_job_wrapper(job)
self.dispatcher.recover(job, job_wrapper)
try:
for job in session.scalars(stmt):
with session.begin_nested():
self._check_job_at_startup(job)
finally:
session.commit()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this method fails, startup will fail entirely, so anticipating failure seems weird here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is the actual fix the savepoint, and the finally is there just in case something not related to the bug I posted raises an exception ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IOW with begin_nested can we commit and continue to emit SQL without a new begin() ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


def _check_job_at_startup(self, job):
if not self.app.toolbox.has_tool(job.tool_id, job.tool_version, exact=True):
log.warning(f"({job.id}) Tool '{job.tool_id}' removed from tool config, unable to recover job")
self.job_wrapper(job).fail(
"This tool was disabled before the job completed. Please contact your Galaxy administrator."
)
elif job.job_runner_name is not None and job.job_runner_external_id is None:
# This could happen during certain revisions of Galaxy where a runner URL was persisted before the job was dispatched to a runner.
log.debug(f"({job.id}) Job runner assigned but no external ID recorded, adding to the job handler queue")
job.job_runner_name = None
if self.track_jobs_in_database:
job.set_state(model.Job.states.NEW)
else:
self.queue.put((job.id, job.tool_id))
elif job.job_runner_name is not None and job.job_runner_external_id is not None and job.destination_id is None:
# This is the first start after upgrading from URLs to destinations, convert the URL to a destination and persist
job_wrapper = self.job_wrapper(job)
job_destination = self.dispatcher.url_to_destination(job.job_runner_name)
if job_destination.id is None:
job_destination.id = "legacy_url"
job_wrapper.set_job_destination(job_destination, job.job_runner_external_id)
self.dispatcher.recover(job, job_wrapper)
log.info(f"({job.id}) Converted job from a URL to a destination and recovered")
elif job.job_runner_name is None:
# Never (fully) dispatched
log.debug(
f"({job.id}) No job runner assigned and job still in '{job.state}' state, adding to the job handler queue"
)
if self.track_jobs_in_database:
job.set_state(model.Job.states.NEW)
else:
self.queue.put((job.id, job.tool_id))
else:
# Already dispatched and running
job_wrapper = self.__recover_job_wrapper(job)
self.dispatcher.recover(job, job_wrapper)
pass

def _build_check_jobs_at_startup_statement(self):
if self.track_jobs_in_database:
Expand Down