[WIP] Fix SA2.0 (query->select) in galaxy.jobs
jdavcs committed Sep 20, 2023
1 parent 849b50d commit f6a07ff
Showing 4 changed files with 24 additions and 20 deletions.
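Note on the recurring pattern: throughout this commit, legacy SQLAlchemy Query calls (session.query(...)) are rewritten in the 2.0 style, where a select() statement is built first and then executed through the session. A minimal self-contained sketch of the .first() rewrite, using a hypothetical Widget model standing in for Galaxy's mapped classes (e.g. JobExportHistoryArchive):

    from sqlalchemy import create_engine, select
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

    class Base(DeclarativeBase):
        pass

    class Widget(Base):  # hypothetical model, not part of galaxy.model
        __tablename__ = "widget"
        id: Mapped[int] = mapped_column(primary_key=True)
        name: Mapped[str]

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        session.add(Widget(name="a"))
        session.commit()
        # Old 1.x style: session.query(Widget).filter_by(name="a").first()
        stmt = select(Widget).filter_by(name="a").limit(1)
        widget = session.scalars(stmt).first()  # first match, or None

The explicit .limit(1) reproduces what Query.first() emitted implicitly, so the rewritten statements still fetch at most one row.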
26 changes: 15 additions & 11 deletions lib/galaxy/jobs/__init__.py
@@ -25,6 +25,7 @@
 import yaml
 from packaging.version import Version
 from pulsar.client.staging import COMMAND_VERSION_FILENAME
+from sqlalchemy import select
 
 from galaxy import (
     model,
@@ -59,7 +60,11 @@
     JobState,
 )
 from galaxy.metadata import get_metadata_compute_strategy
-from galaxy.model import store
+from galaxy.model import (
+    Job,
+    store,
+    Task,
+)
 from galaxy.model.base import transaction
 from galaxy.model.store.discover import MaxDiscoveredFilesExceededError
 from galaxy.objectstore import ObjectStorePopulator
@@ -1043,11 +1048,8 @@ def metadata_strategy(self):
     def remote_command_line(self):
         use_remote = self.get_destination_configuration("tool_evaluation_strategy") == "remote"
         # It wouldn't be hard to support history export, but we want to do this in task queue workers anyway ...
-        return (
-            use_remote
-            and self.external_output_metadata.extended
-            and not self.sa_session.query(model.JobExportHistoryArchive).filter_by(job=self.get_job()).first()
-        )
+        stmt = select(model.JobExportHistoryArchive).filter_by(job=self.get_job()).limit(1)
+        return use_remote and self.external_output_metadata.extended and not self.sa_session.scalars(stmt).first()
 
     def tool_directory(self):
         tool_dir = self.tool and self.tool.tool_dir
@@ -1182,7 +1184,7 @@ def galaxy_url(self):
         return self.get_destination_configuration("galaxy_infrastructure_url")
 
     def get_job(self) -> model.Job:
-        return self.sa_session.query(model.Job).get(self.job_id)
+        return self.sa_session.get(Job, self.job_id)
 
     def get_id_tag(self):
         # For compatibility with drmaa, which uses job_id right now, and TaskWrapper
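Primary-key lookups get a different replacement: Query.get() is legacy in SQLAlchemy 2.0 and becomes Session.get(), which takes the mapped class and the key and consults the identity map before querying. This is also why the commit imports Job and Task directly from galaxy.model. Continuing the hypothetical Widget sketch from above:

    with Session(engine) as session:
        # Old 1.x style: session.query(Widget).get(1)
        widget = session.get(Widget, 1)  # instance or None; no SQL if already in the identity map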
@@ -1233,10 +1235,12 @@ def prepare(self, compute_environment=None):
         job = self._load_job()
 
         def get_special():
-            jeha = self.sa_session.query(model.JobExportHistoryArchive).filter_by(job=job).first()
+            stmt = select(model.JobExportHistoryArchive).filter_by(job=job).limit(1)
+            jeha = self.sa_session.scalars(stmt).first()
             if jeha:
                 return jeha.fda
-            return self.sa_session.query(model.GenomeIndexToolData).filter_by(job=job).first()
+            stmt = select(model.GenomeIndexToolData).filter_by(job=job).limit(1)
+            return self.sa_session.scalars(stmt).first()
 
         # TODO: The upload tool actions that create the paramfile can probably be turned in to a configfile to remove this special casing
         if job.tool_id == "upload1":
@@ -2583,12 +2587,12 @@ def can_split(self):
 
     def get_job(self):
         if self.job_id:
-            return self.sa_session.query(model.Job).get(self.job_id)
+            return self.sa_session.get(Job, self.job_id)
         else:
            return None
 
     def get_task(self):
-        return self.sa_session.query(model.Task).get(self.task_id)
+        return self.sa_session.get(Task, self.task_id)
 
     def get_id_tag(self):
         # For compatibility with drmaa job runner and TaskWrapper, instead of using job_id directly
8 changes: 5 additions & 3 deletions lib/galaxy/jobs/runners/__init__.py
@@ -15,6 +15,7 @@
     Queue,
 )
 
+from sqlalchemy import select
 from sqlalchemy.orm import object_session
 
 import galaxy.jobs
@@ -397,11 +398,12 @@ def _walk_dataset_outputs(self, job: model.Job):
             dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations
         ):
             if isinstance(dataset, self.app.model.HistoryDatasetAssociation):
-                joda = (
-                    self.sa_session.query(self.app.model.JobToOutputDatasetAssociation)
-                    .filter_by(job=job, dataset=dataset)
-                    .first()
-                )
+                stmt = (
+                    select(self.app.model.JobToOutputDatasetAssociation)
+                    .filter_by(job=job, dataset=dataset)
+                    .limit(1)
+                )
+                joda = self.sa_session.scalars(stmt).first()
                 yield (joda, dataset)
         # TODO: why is this not just something easy like:
         # for dataset_assoc in job.output_datasets + job.output_library_datasets:
3 changes: 1 addition & 2 deletions lib/galaxy/jobs/runners/godocker.py
@@ -235,8 +235,7 @@ def check_watched_item(self, job_state):
 
     def stop_job(self, job_wrapper):
         """Attempts to delete a dispatched executing Job in GoDocker"""
-        # This function is called by fail_job() where
-        # param job = self.sa_session.query(self.app.model.Job).get(job_state.job_wrapper.job_id)
+        # This function is called by fail_job()
         # No Return data expected
         job_id = job_wrapper.job_id
         log.debug(f"STOP JOB EXECUTION OF JOB ID: {str(job_id)}")
7 changes: 3 additions & 4 deletions lib/galaxy/jobs/runners/pulsar.py
@@ -36,6 +36,7 @@
 
 # TODO: Perform pulsar release with this included in the client package
 from pulsar.client.staging import DEFAULT_DYNAMIC_COLLECTION_PATTERN
+from sqlalchemy import select
 
 from galaxy import model
 from galaxy.job_execution.compute_environment import (
@@ -974,10 +975,8 @@ def __async_update(self, full_status):
         remote_job_id = full_status["job_id"]
         if len(remote_job_id) == 32:
             # It is a UUID - assign_ids = uuid in destination params...
-            sa_session = self.app.model.session
-            galaxy_job_id = (
-                sa_session.query(model.Job).filter(model.Job.job_runner_external_id == remote_job_id).one().id
-            )
+            stmt = select(model.Job).filter(model.Job.job_runner_external_id == remote_job_id)
+            galaxy_job_id = self.app.model.session.execute(stmt).scalar_one().id
         else:
             galaxy_job_id = remote_job_id
         job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(galaxy_job_id)
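This last hunk replaces Query.one() rather than .first(): execute(stmt).scalar_one() returns exactly one scalar and raises NoResultFound or MultipleResultsFound otherwise, matching the old .one() semantics. Again with the hypothetical Widget model:

    with Session(engine) as session:
        stmt = select(Widget).filter(Widget.name == "a")
        # Old 1.x style: session.query(Widget).filter(Widget.name == "a").one()
        widget = session.execute(stmt).scalar_one()  # raises unless exactly one row matches

session.scalars(stmt).one() would be an equivalent spelling; the commit uses execute(...).scalar_one() here.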