Skip to content

Commit

Permalink
Improved typing of tool execution code.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Jul 31, 2024
1 parent e90825d commit fa73985
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
32 changes: 19 additions & 13 deletions lib/galaxy/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
from galaxy.tools.actions import (
DefaultToolAction,
ToolAction,
ToolExecutionCache,
)
from galaxy.tools.actions.data_manager import DataManagerToolAction
from galaxy.tools.actions.data_source import DataSourceToolAction
Expand Down Expand Up @@ -183,7 +184,12 @@
from galaxy.version import VERSION_MAJOR
from galaxy.work.context import proxy_work_context_for_history
from .execute import (
DEFAULT_USE_CACHED_JOB,
DEFAULT_PREFERRED_OBJECT_STORE_ID,
DEFAULT_RERUN_REMAP_JOB_ID,
execute as execute_job,
ExecutionSlice,
JobCallbackT,
MappingParameters,
)

Expand Down Expand Up @@ -1863,9 +1869,9 @@ def handle_input(
self,
trans,
incoming,
history=None,
use_cached_job=False,
preferred_object_store_id: Optional[str] = None,
history: Optional[model.History] = None,
use_cached_job: bool = DEFAULT_USE_CACHED_JOB,
preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID,
input_format="legacy",
):
"""
Expand Down Expand Up @@ -1942,16 +1948,16 @@ def handle_incoming_errors(self, all_errors):
def handle_single_execution(
self,
trans,
rerun_remap_job_id,
execution_slice,
history,
execution_cache=None,
completed_job=None,
collection_info=None,
job_callback=None,
preferred_object_store_id=None,
flush_job=True,
skip=False,
rerun_remap_job_id: Optional[int],
execution_slice: ExecutionSlice,
history: model.History,
execution_cache: ToolExecutionCache,
completed_job: Optional[model.Job],
collection_info: Optional[MatchingCollections],
job_callback: Optional[JobCallbackT],
preferred_object_store_id: Optional[str],
flush_job: bool,
skip: bool,
):
"""
Return a pair with whether execution is successful as well as either
Expand Down
30 changes: 21 additions & 9 deletions lib/galaxy/tools/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,30 +45,42 @@
BATCH_EXECUTION_MESSAGE = "Created ${job_count} job(s) for tool ${tool_id} request"


CompletedJobsT = Dict[int, Optional[model.Job]]
JobCallbackT = Callable
WorkflowResourceParametersT = Dict[str, Any]
# Input dictionary from the API, may include map/reduce instructions
ToolParameterRequestT = Dict[str, Any]
# Input dictionary extracted from a tool request for running a tool individually
ToolParameterRequestInstanceT = Dict[str, Any]
DEFAULT_USE_CACHED_JOB = False
DEFAULT_PREFERRED_OBJECT_STORE_ID: Optional[str] = None
DEFAULT_RERUN_REMAP_JOB_ID: Optional[int] = None


class PartialJobExecution(Exception):
def __init__(self, execution_tracker):
def __init__(self, execution_tracker: "ExecutionTracker"):
self.execution_tracker = execution_tracker


class MappingParameters(NamedTuple):
param_template: Dict[str, Any]
param_combinations: List[Dict[str, Any]]
param_template: ToolParameterRequestT
param_combinations: List[ToolParameterRequestInstanceT]


def execute(
trans,
tool: "Tool",
mapping_params: MappingParameters,
history: model.History,
rerun_remap_job_id: Optional[int] = None,
preferred_object_store_id: Optional[str] = None,
rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID,
preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID,
collection_info: Optional[MatchingCollections] = None,
workflow_invocation_uuid: Optional[str] = None,
invocation_step: Optional[model.WorkflowInvocationStep] = None,
max_num_jobs: Optional[int] = None,
job_callback: Optional[Callable] = None,
completed_jobs: Optional[Dict[int, Optional[model.Job]]] = None,
workflow_resource_parameters: Optional[Dict[str, Any]] = None,
job_callback: Optional[JobCallbackT] = None,
completed_jobs: Optional[CompletedJobsT] = None,
workflow_resource_parameters: Optional[WorkflowResourceParametersT] = None,
validate_outputs: bool = False,
):
"""
Expand All @@ -95,7 +107,7 @@ def execute(
)
execution_cache = ToolExecutionCache(trans)

def execute_single_job(execution_slice, completed_job, skip=False):
def execute_single_job(execution_slice: "ExecutionSlice", completed_job: Optional[model.Job], skip: bool = False):
job_timer = tool.app.execution_timer_factory.get_timer(
"internals.galaxy.tools.execute.job_single", SINGLE_EXECUTION_SUCCESS_MESSAGE
)
Expand Down

0 comments on commit fa73985

Please sign in to comment.