diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index 59d6d74fd603..0497277d4cd6 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -25,6 +25,7 @@
 import yaml
 from packaging.version import Version
 from pulsar.client.staging import COMMAND_VERSION_FILENAME
+from sqlalchemy import select
 
 from galaxy import (
     model,
@@ -59,7 +60,11 @@
     JobState,
 )
 from galaxy.metadata import get_metadata_compute_strategy
-from galaxy.model import store
+from galaxy.model import (
+    Job,
+    store,
+    Task,
+)
 from galaxy.model.base import transaction
 from galaxy.model.store.discover import MaxDiscoveredFilesExceededError
 from galaxy.objectstore import ObjectStorePopulator
@@ -1043,11 +1048,8 @@ def metadata_strategy(self):
     def remote_command_line(self):
         use_remote = self.get_destination_configuration("tool_evaluation_strategy") == "remote"
         # It wouldn't be hard to support history export, but we want to do this in task queue workers anyway ...
-        return (
-            use_remote
-            and self.external_output_metadata.extended
-            and not self.sa_session.query(model.JobExportHistoryArchive).filter_by(job=self.get_job()).first()
-        )
+        stmt = select(model.JobExportHistoryArchive).filter_by(job=self.get_job()).limit(1)
+        return use_remote and self.external_output_metadata.extended and not self.sa_session.scalars(stmt).first()
 
     def tool_directory(self):
         tool_dir = self.tool and self.tool.tool_dir
@@ -1182,7 +1184,7 @@ def galaxy_url(self):
         return self.get_destination_configuration("galaxy_infrastructure_url")
 
     def get_job(self) -> model.Job:
-        return self.sa_session.query(model.Job).get(self.job_id)
+        return self.sa_session.get(Job, self.job_id)
 
     def get_id_tag(self):
         # For compatibility with drmaa, which uses job_id right now, and TaskWrapper
@@ -1233,10 +1235,12 @@ def prepare(self, compute_environment=None):
         job = self._load_job()
 
         def get_special():
-            jeha = self.sa_session.query(model.JobExportHistoryArchive).filter_by(job=job).first()
+            stmt = select(model.JobExportHistoryArchive).filter_by(job=job).limit(1)
+            jeha = self.sa_session.scalars(stmt).first()
             if jeha:
                 return jeha.fda
-            return self.sa_session.query(model.GenomeIndexToolData).filter_by(job=job).first()
+            stmt = select(model.GenomeIndexToolData).filter_by(job=job).limit(1)
+            return self.sa_session.scalars(stmt).first()
 
         # TODO: The upload tool actions that create the paramfile can probably be turned in to a configfile to remove this special casing
         if job.tool_id == "upload1":
@@ -2583,12 +2587,12 @@ def can_split(self):
 
     def get_job(self):
         if self.job_id:
-            return self.sa_session.query(model.Job).get(self.job_id)
+            return self.sa_session.get(Job, self.job_id)
         else:
             return None
 
     def get_task(self):
-        return self.sa_session.query(model.Task).get(self.task_id)
+        return self.sa_session.get(Task, self.task_id)
 
     def get_id_tag(self):
         # For compatibility with drmaa job runner and TaskWrapper, instead of using job_id directly
diff --git a/lib/galaxy/jobs/runners/__init__.py b/lib/galaxy/jobs/runners/__init__.py
index 6c95de6312b3..c14aea726f03 100644
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -15,6 +15,7 @@
     Queue,
 )
 
+from sqlalchemy import select
 from sqlalchemy.orm import object_session
 
 import galaxy.jobs
@@ -397,11 +398,12 @@ def _walk_dataset_outputs(self, job: model.Job):
                 dataset_assoc.dataset.dataset.history_associations
                 + dataset_assoc.dataset.dataset.library_associations
             ):
                 if isinstance(dataset, self.app.model.HistoryDatasetAssociation):
-                    joda = (
-                        self.sa_session.query(self.app.model.JobToOutputDatasetAssociation)
+                    stmt = (
+                        select(self.app.model.JobToOutputDatasetAssociation)
                         .filter_by(job=job, dataset=dataset)
-                        .first()
+                        .limit(1)
                     )
+                    joda = self.sa_session.scalars(stmt).first()
                     yield (joda, dataset)
         # TODO: why is this not just something easy like:
         # for dataset_assoc in job.output_datasets + job.output_library_datasets:
diff --git a/lib/galaxy/jobs/runners/godocker.py b/lib/galaxy/jobs/runners/godocker.py
index 869994a8838b..bcacb84fb700 100644
--- a/lib/galaxy/jobs/runners/godocker.py
+++ b/lib/galaxy/jobs/runners/godocker.py
@@ -235,8 +235,7 @@ def check_watched_item(self, job_state):
 
     def stop_job(self, job_wrapper):
         """Attempts to delete a dispatched executing Job in GoDocker"""
-        # This function is called by fail_job() where
-        # param job = self.sa_session.query(self.app.model.Job).get(job_state.job_wrapper.job_id)
+        # This function is called by fail_job()
         # No Return data expected
         job_id = job_wrapper.job_id
         log.debug(f"STOP JOB EXECUTION OF JOB ID: {str(job_id)}")
diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py
index aa3ee8cb6f17..2bba3504024a 100644
--- a/lib/galaxy/jobs/runners/pulsar.py
+++ b/lib/galaxy/jobs/runners/pulsar.py
@@ -36,6 +36,7 @@
 
 # TODO: Perform pulsar release with this included in the client package
 from pulsar.client.staging import DEFAULT_DYNAMIC_COLLECTION_PATTERN
+from sqlalchemy import select
 
 from galaxy import model
 from galaxy.job_execution.compute_environment import (
@@ -974,10 +975,8 @@ def __async_update(self, full_status):
         remote_job_id = full_status["job_id"]
         if len(remote_job_id) == 32:
             # It is a UUID - assign_ids = uuid in destination params...
-            sa_session = self.app.model.session
-            galaxy_job_id = (
-                sa_session.query(model.Job).filter(model.Job.job_runner_external_id == remote_job_id).one().id
-            )
+            stmt = select(model.Job).filter(model.Job.job_runner_external_id == remote_job_id)
+            galaxy_job_id = self.app.model.session.execute(stmt).scalar_one().id
         else:
             galaxy_job_id = remote_job_id
         job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(galaxy_job_id)
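
Note: the rewrite above is mechanical throughout; each legacy Query call becomes a SQLAlchemy 2.0-style select() statement executed through the session. Below is a minimal standalone sketch of the three patterns this diff uses. The Job model, table name, and external_id column here are hypothetical toy stand-ins, not Galaxy's real mappings; only the SQLAlchemy calls mirror the diff.

# Minimal standalone sketch of the query-API migration applied above.
# Assumptions: Job, "job", and external_id are toy stand-ins; only the
# SQLAlchemy calls match the patterns in the diff.
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, primary_key=True)
    external_id = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Job(id=1, external_id="abc"))
    session.commit()

    # Pattern 1: fetch by primary key.
    # Legacy:  session.query(Job).get(1)
    # 2.0:     session.get(Job, 1)
    job = session.get(Job, 1)

    # Pattern 2: fetch the first match / check whether any row matches.
    # Legacy Query.first() implied LIMIT 1; with select() the limit must be
    # explicit, hence the .limit(1) added throughout the diff.
    # Legacy:  session.query(Job).filter_by(external_id="abc").first()
    stmt = select(Job).filter_by(external_id="abc").limit(1)
    first_match = session.scalars(stmt).first()

    # Pattern 3: exactly one row expected (raises otherwise).
    # Legacy:  session.query(Job).filter(...).one()
    stmt = select(Job).filter(Job.external_id == "abc")
    the_job = session.execute(stmt).scalar_one()

session.scalars() yields ORM instances directly, while execute(...).scalar_one() additionally asserts that exactly one row matched, mirroring the strictness of the legacy .one() used in the pulsar runner.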