[WIP] Fix SA2.0 (query->select) in galaxy.jobs
jdavcs committed Aug 13, 2023
1 parent a8f28b6 commit cc9856d
Showing 4 changed files with 20 additions and 18 deletions.
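For readers tracking the SQLAlchemy 2.0 migration, the diffs below apply two recurring conversions: legacy Query chains such as `session.query(Model).filter_by(...).first()` become `select()` statements executed through `Session.scalars()`, and primary-key lookups move from `Query.get()` to `Session.get()`. The following is a minimal, self-contained sketch of both patterns against a hypothetical toy `Job` model and an in-memory SQLite database; it is an illustration of the idiom, not Galaxy code.

```python
# Minimal sketch of the query -> select conversion; toy model, not Galaxy code.
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, primary_key=True)
    state = Column(String(32), default="new")


engine = create_engine("sqlite://")  # in-memory SQLite, for illustration only
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Job(state="queued"))
    session.flush()

    # Legacy 1.x style (the form this commit removes):
    #   job = session.query(Job).filter_by(state="queued").first()
    # 2.0 style: build a select() statement, then execute it through the session.
    stmt = select(Job).filter_by(state="queued").limit(1)
    job = session.scalars(stmt).first()

    # Legacy primary-key lookup:
    #   job = session.query(Job).get(job.id)
    # 2.0 style:
    assert session.get(Job, job.id) is job
```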
20 changes: 10 additions & 10 deletions lib/galaxy/jobs/__init__.py
@@ -25,6 +25,7 @@
 import yaml
 from packaging.version import Version
 from pulsar.client.staging import COMMAND_VERSION_FILENAME
+from sqlalchemy import select
 
 from galaxy import (
     model,
@@ -1042,11 +1043,8 @@ def metadata_strategy(self):
     def remote_command_line(self):
         use_remote = self.get_destination_configuration("tool_evaluation_strategy") == "remote"
         # It wouldn't be hard to support history export, but we want to do this in task queue workers anyway ...
-        return (
-            use_remote
-            and self.external_output_metadata.extended
-            and not self.sa_session.query(model.JobExportHistoryArchive).filter_by(job=self.get_job()).first()
-        )
+        stmt = select(model.JobExportHistoryArchive).filter_by(job=self.get_job()).limit(1)
+        return use_remote and self.external_output_metadata.extended and not self.sa_session.scalars(stmt).first()
 
     def tool_directory(self):
         tool_dir = self.tool and self.tool.tool_dir
@@ -1181,7 +1179,7 @@ def galaxy_url(self):
         return self.get_destination_configuration("galaxy_infrastructure_url")
 
     def get_job(self) -> model.Job:
-        return self.sa_session.query(model.Job).get(self.job_id)
+        return self.sa_session.get(model.Job, self.job_id)
 
     def get_id_tag(self):
         # For compatibility with drmaa, which uses job_id right now, and TaskWrapper
@@ -1232,10 +1230,12 @@ def prepare(self, compute_environment=None):
         job = self._load_job()
 
         def get_special():
-            jeha = self.sa_session.query(model.JobExportHistoryArchive).filter_by(job=job).first()
+            stmt = select(model.JobExportHistoryArchive).filter_by(job=job).limit(1)
+            jeha = self.sa_session.scalars(stmt).first()
             if jeha:
                 return jeha.fda
-            return self.sa_session.query(model.GenomeIndexToolData).filter_by(job=job).first()
+            stmt = select(model.GenomeIndexToolData).filter_by(job=job).limit(1)
+            return self.sa_session.scalars(stmt).first()
 
         # TODO: The upload tool actions that create the paramfile can probably be turned in to a configfile to remove this special casing
         if job.tool_id == "upload1":
@@ -2579,12 +2579,12 @@ def can_split(self):
 
     def get_job(self):
         if self.job_id:
-            return self.sa_session.query(model.Job).get(self.job_id)
+            return self.sa_session.get(model.Job, self.job_id)
         else:
             return None
 
     def get_task(self):
-        return self.sa_session.query(model.Task).get(self.task_id)
+        return self.sa_session.get(model.Task, self.task_id)
 
     def get_id_tag(self):
         # For compatibility with drmaa job runner and TaskWrapper, instead of using job_id directly
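A note on the `Query.get()` → `Session.get()` conversions above: both forms perform a primary-key lookup that consults the session's identity map first, so an object already loaded in the session is returned without a new SQL round trip. A small self-contained sketch of that behavior, again with a toy model rather than Galaxy's:

```python
# Hedged sketch of why Query.get() maps onto Session.get(); toy model, not Galaxy code.
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Task(Base):
    __tablename__ = "task"
    id = Column(Integer, primary_key=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    task = Task(id=1)
    session.add(task)
    session.flush()  # INSERT is sent; the instance stays un-expired in the identity map

    # The lookup returns the very same in-memory object; because it is already
    # present in the identity map, Session.get() emits no new SELECT here.
    assert session.get(Task, 1) is task
    # Legacy spelling removed by this commit (same semantics):
    #   assert session.query(Task).get(1) is task
```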
9 changes: 6 additions & 3 deletions lib/galaxy/jobs/runners/__init__.py
@@ -15,6 +15,8 @@
     Queue,
 )
 
+from sqlalchemy import select
+
 import galaxy.jobs
 from galaxy import model
 from galaxy.exceptions import ConfigurationError
@@ -379,11 +381,12 @@ def _walk_dataset_outputs(self, job: model.Job):
                 dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations
             ):
                 if isinstance(dataset, self.app.model.HistoryDatasetAssociation):
-                    joda = (
-                        self.sa_session.query(self.app.model.JobToOutputDatasetAssociation)
+                    stmt = (
+                        select(self.app.model.JobToOutputDatasetAssociation)
                         .filter_by(job=job, dataset=dataset)
-                        .first()
+                        .limit(1)
                     )
+                    joda = self.sa_session.scalars(stmt).first()
                     yield (joda, dataset)
         # TODO: why is this not just something easy like:
         # for dataset_assoc in job.output_datasets + job.output_library_datasets:
2 changes: 1 addition & 1 deletion lib/galaxy/jobs/runners/godocker.py
@@ -236,7 +236,7 @@ def check_watched_item(self, job_state):
     def stop_job(self, job_wrapper):
         """Attempts to delete a dispatched executing Job in GoDocker"""
         # This function is called by fail_job() where
-        # param job = self.sa_session.query(self.app.model.Job).get(job_state.job_wrapper.job_id)
+        # param job = self.sa_session.get(self.app.model.Job, job_state.job_wrapper.job_id)
         # No Return data expected
         job_id = job_wrapper.job_id
         log.debug(f"STOP JOB EXECUTION OF JOB ID: {str(job_id)}")
7 changes: 3 additions & 4 deletions lib/galaxy/jobs/runners/pulsar.py
@@ -35,6 +35,7 @@
 
 # TODO: Perform pulsar release with this included in the client package
 from pulsar.client.staging import DEFAULT_DYNAMIC_COLLECTION_PATTERN
+from sqlalchemy import select
 
 from galaxy import model
 from galaxy.job_execution.compute_environment import ComputeEnvironment
@@ -949,10 +950,8 @@ def __async_update(self, full_status):
             remote_job_id = full_status["job_id"]
             if len(remote_job_id) == 32:
                 # It is a UUID - assign_ids = uuid in destination params...
-                sa_session = self.app.model.session
-                galaxy_job_id = (
-                    sa_session.query(model.Job).filter(model.Job.job_runner_external_id == remote_job_id).one().id
-                )
+                stmt = select(model.Job).filter(model.Job.job_runner_external_id == remote_job_id)
+                galaxy_job_id = self.app.model.session.execute(stmt).scalar_one().id
             else:
                 galaxy_job_id = remote_job_id
             job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(galaxy_job_id)
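In the pulsar hunk above, the converted lookup uses `session.execute(stmt).scalar_one()` rather than `scalars(stmt).first()`, mirroring the original `.one()`: it raises if the statement matches zero rows or more than one, whereas `.first()` quietly returns `None` when nothing matches. A small self-contained illustration of the difference (toy model, not Galaxy code):

```python
# Sketch of scalar_one() vs scalars().first(); toy model, not Galaxy code.
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Job(Base):
    __tablename__ = "job"
    id = Column(Integer, primary_key=True)
    job_runner_external_id = Column(String(64))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Job(id=7, job_runner_external_id="abc123"))
    session.flush()

    stmt = select(Job).filter(Job.job_runner_external_id == "abc123")
    # scalar_one(): exactly one matching row expected, like the legacy Query.one()
    assert session.execute(stmt).scalar_one().id == 7

    missing = select(Job).filter(Job.job_runner_external_id == "no-such-id")
    # first() tolerates an empty result ...
    assert session.scalars(missing).first() is None
    # ... while scalar_one() raises NoResultFound
    try:
        session.execute(missing).scalar_one()
    except NoResultFound:
        pass
```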
