Skip to content

Commit

Permalink
Merge pull request #18758 from jmchilton/refactoring_execute
Browse files Browse the repository at this point in the history
More typing, docs, and decomposition around tool execution
  • Loading branch information
mvdbeek authored Sep 7, 2024
2 parents b669011 + f7b9a17 commit 0e4e03a
Show file tree
Hide file tree
Showing 19 changed files with 539 additions and 264 deletions.
35 changes: 31 additions & 4 deletions lib/galaxy/managers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@
Safety,
)
from galaxy.managers.collections import DatasetCollectionManager
from galaxy.managers.context import ProvidesUserContext
from galaxy.managers.context import (
ProvidesHistoryContext,
ProvidesUserContext,
)
from galaxy.managers.datasets import DatasetManager
from galaxy.managers.hdas import HDAManager
from galaxy.managers.lddas import LDDAManager
Expand All @@ -70,6 +73,10 @@
)
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.structured_app import StructuredApp
from galaxy.tools._types import (
ToolStateDumpedToJsonInternalT,
ToolStateJobInstancePopulatedT,
)
from galaxy.util import (
defaultdict,
ExecutionTimer,
Expand All @@ -83,6 +90,9 @@

log = logging.getLogger(__name__)

JobStateT = str
JobStatesT = Union[JobStateT, List[JobStateT]]


class JobLock(BaseModel):
active: bool = Field(title="Job lock status", description="If active, jobs will not dispatch")
Expand Down Expand Up @@ -313,7 +323,15 @@ def __init__(
self.ldda_manager = ldda_manager
self.decode_id = id_encoding_helper.decode_id

def by_tool_input(self, trans, tool_id, tool_version, param=None, param_dump=None, job_state="ok"):
def by_tool_input(
self,
trans: ProvidesHistoryContext,
tool_id: str,
tool_version: Optional[str],
param: ToolStateJobInstancePopulatedT,
param_dump: ToolStateDumpedToJsonInternalT,
job_state: Optional[JobStatesT] = "ok",
):
"""Search for jobs producing same results using the 'inputs' part of a tool POST."""
user = trans.user
input_data = defaultdict(list)
Expand Down Expand Up @@ -355,7 +373,14 @@ def populate_input_data_input_id(path, key, value):
)

def __search(
self, tool_id, tool_version, user, input_data, job_state=None, param_dump=None, wildcard_param_dump=None
self,
tool_id: str,
tool_version: Optional[str],
user: model.User,
input_data,
job_state: Optional[JobStatesT],
param_dump: ToolStateDumpedToJsonInternalT,
wildcard_param_dump=None,
):
search_timer = ExecutionTimer()

Expand Down Expand Up @@ -464,7 +489,9 @@ def replace_dataset_ids(path, key, value):
log.info("No equivalent jobs found %s", search_timer)
return None

def _build_job_subquery(self, tool_id, user_id, tool_version, job_state, wildcard_param_dump):
def _build_job_subquery(
self, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump
):
"""Build subquery that selects a job with correct job parameters."""
stmt = select(model.Job.id).where(
and_(
Expand Down
Loading

0 comments on commit 0e4e03a

Please sign in to comment.