From cc9856d7e542b8af421caa433f0435bff67d3154 Mon Sep 17 00:00:00 2001 From: John Davis Date: Thu, 10 Aug 2023 22:04:33 -0400 Subject: [PATCH] [WIP] Fix SA2.0 (query->select) in galaxy.jobs --- lib/galaxy/jobs/__init__.py | 20 ++++++++++---------- lib/galaxy/jobs/runners/__init__.py | 9 ++++++--- lib/galaxy/jobs/runners/godocker.py | 2 +- lib/galaxy/jobs/runners/pulsar.py | 7 +++---- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 318a410d4221..af1718708100 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -25,6 +25,7 @@ import yaml from packaging.version import Version from pulsar.client.staging import COMMAND_VERSION_FILENAME +from sqlalchemy import select from galaxy import ( model, @@ -1042,11 +1043,8 @@ def metadata_strategy(self): def remote_command_line(self): use_remote = self.get_destination_configuration("tool_evaluation_strategy") == "remote" # It wouldn't be hard to support history export, but we want to do this in task queue workers anyway ... - return ( - use_remote - and self.external_output_metadata.extended - and not self.sa_session.query(model.JobExportHistoryArchive).filter_by(job=self.get_job()).first() - ) + stmt = select(model.JobExportHistoryArchive).filter_by(job=self.get_job()).limit(1) + return use_remote and self.external_output_metadata.extended and not self.sa_session.scalars(stmt).first() def tool_directory(self): tool_dir = self.tool and self.tool.tool_dir @@ -1181,7 +1179,7 @@ def galaxy_url(self): return self.get_destination_configuration("galaxy_infrastructure_url") def get_job(self) -> model.Job: - return self.sa_session.query(model.Job).get(self.job_id) + return self.sa_session.get(model.Job, self.job_id) def get_id_tag(self): # For compatibility with drmaa, which uses job_id right now, and TaskWrapper @@ -1232,10 +1230,12 @@ def prepare(self, compute_environment=None): job = self._load_job() def get_special(): - jeha = self.sa_session.query(model.JobExportHistoryArchive).filter_by(job=job).first() + stmt = select(model.JobExportHistoryArchive).filter_by(job=job).limit(1) + jeha = self.sa_session.scalars(stmt).first() if jeha: return jeha.fda - return self.sa_session.query(model.GenomeIndexToolData).filter_by(job=job).first() + stmt = select(model.GenomeIndexToolData).filter_by(job=job).limit(1) + return self.sa_session.scalars(stmt).first() # TODO: The upload tool actions that create the paramfile can probably be turned in to a configfile to remove this special casing if job.tool_id == "upload1": @@ -2579,12 +2579,12 @@ def can_split(self): def get_job(self): if self.job_id: - return self.sa_session.query(model.Job).get(self.job_id) + return self.sa_session.get(model.Job, self.job_id) else: return None def get_task(self): - return self.sa_session.query(model.Task).get(self.task_id) + return self.sa_session.get(model.Task, self.task_id) def get_id_tag(self): # For compatibility with drmaa job runner and TaskWrapper, instead of using job_id directly diff --git a/lib/galaxy/jobs/runners/__init__.py b/lib/galaxy/jobs/runners/__init__.py index c79d1926d0e9..99d4006cae5e 100644 --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -15,6 +15,8 @@ Queue, ) +from sqlalchemy import select + import galaxy.jobs from galaxy import model from galaxy.exceptions import ConfigurationError @@ -379,11 +381,12 @@ def _walk_dataset_outputs(self, job: model.Job): dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations ): if isinstance(dataset, self.app.model.HistoryDatasetAssociation): - joda = ( - self.sa_session.query(self.app.model.JobToOutputDatasetAssociation) + stmt = ( + select(self.app.model.JobToOutputDatasetAssociation) .filter_by(job=job, dataset=dataset) - .first() + .limit(1) ) + joda = self.sa_session.scalars(stmt).first() yield (joda, dataset) # TODO: why is this not just something easy like: # for dataset_assoc in job.output_datasets + job.output_library_datasets: diff --git a/lib/galaxy/jobs/runners/godocker.py b/lib/galaxy/jobs/runners/godocker.py index 869994a8838b..ed3379e3ee99 100644 --- a/lib/galaxy/jobs/runners/godocker.py +++ b/lib/galaxy/jobs/runners/godocker.py @@ -236,7 +236,7 @@ def check_watched_item(self, job_state): def stop_job(self, job_wrapper): """Attempts to delete a dispatched executing Job in GoDocker""" # This function is called by fail_job() where - # param job = self.sa_session.query(self.app.model.Job).get(job_state.job_wrapper.job_id) + # param job = self.sa_session.get(self.app.model.Job, job_state.job_wrapper.job_id) # No Return data expected job_id = job_wrapper.job_id log.debug(f"STOP JOB EXECUTION OF JOB ID: {str(job_id)}") diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py index b7bdd6100f6f..5fd534c48dc5 100644 --- a/lib/galaxy/jobs/runners/pulsar.py +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -35,6 +35,7 @@ # TODO: Perform pulsar release with this included in the client package from pulsar.client.staging import DEFAULT_DYNAMIC_COLLECTION_PATTERN +from sqlalchemy import select from galaxy import model from galaxy.job_execution.compute_environment import ComputeEnvironment @@ -949,10 +950,8 @@ def __async_update(self, full_status): remote_job_id = full_status["job_id"] if len(remote_job_id) == 32: # It is a UUID - assign_ids = uuid in destination params... - sa_session = self.app.model.session - galaxy_job_id = ( - sa_session.query(model.Job).filter(model.Job.job_runner_external_id == remote_job_id).one().id - ) + stmt = select(model.Job).filter(model.Job.job_runner_external_id == remote_job_id) + galaxy_job_id = self.app.model.session.execute(stmt).scalar_one().id else: galaxy_job_id = remote_job_id job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(galaxy_job_id)