From fa739857c942666ea365d9c84fb621985067d519 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Tue, 30 Jul 2024 17:24:44 -0400 Subject: [PATCH] Improved typing of tool execution code. --- lib/galaxy/tools/__init__.py | 32 +++++++++++++++++++------------- lib/galaxy/tools/execute.py | 30 +++++++++++++++++++++--------- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index fdb530888a5a..d0aa59f1a5fe 100644 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -107,6 +107,7 @@ from galaxy.tools.actions import ( DefaultToolAction, ToolAction, + ToolExecutionCache, ) from galaxy.tools.actions.data_manager import DataManagerToolAction from galaxy.tools.actions.data_source import DataSourceToolAction @@ -183,7 +184,12 @@ from galaxy.version import VERSION_MAJOR from galaxy.work.context import proxy_work_context_for_history from .execute import ( + DEFAULT_USE_CACHED_JOB, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, execute as execute_job, + ExecutionSlice, + JobCallbackT, MappingParameters, ) @@ -1863,9 +1869,9 @@ def handle_input( self, trans, incoming, - history=None, - use_cached_job=False, - preferred_object_store_id: Optional[str] = None, + history: Optional[model.History] = None, + use_cached_job: bool = DEFAULT_USE_CACHED_JOB, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, input_format="legacy", ): """ @@ -1942,16 +1948,16 @@ def handle_incoming_errors(self, all_errors): def handle_single_execution( self, trans, - rerun_remap_job_id, - execution_slice, - history, - execution_cache=None, - completed_job=None, - collection_info=None, - job_callback=None, - preferred_object_store_id=None, - flush_job=True, - skip=False, + rerun_remap_job_id: Optional[int], + execution_slice: ExecutionSlice, + history: model.History, + execution_cache: ToolExecutionCache, + completed_job: Optional[model.Job], + collection_info: Optional[MatchingCollections], + job_callback: Optional[JobCallbackT], + preferred_object_store_id: Optional[str], + flush_job: bool, + skip: bool, ): """ Return a pair with whether execution is successful as well as either diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py index c81d57252843..8ba38e948ef7 100644 --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -45,14 +45,26 @@ BATCH_EXECUTION_MESSAGE = "Created ${job_count} job(s) for tool ${tool_id} request" +CompletedJobsT = Dict[int, Optional[model.Job]] +JobCallbackT = Callable +WorkflowResourceParametersT = Dict[str, Any] +# Input dictionary from the API, may include map/reduce instructions +ToolParameterRequestT = Dict[str, Any] +# Input dictionary extracted from a tool request for running a tool individually +ToolParameterRequestInstanceT = Dict[str, Any] +DEFAULT_USE_CACHED_JOB = False +DEFAULT_PREFERRED_OBJECT_STORE_ID: Optional[str] = None +DEFAULT_RERUN_REMAP_JOB_ID: Optional[int] = None + + class PartialJobExecution(Exception): - def __init__(self, execution_tracker): + def __init__(self, execution_tracker: "ExecutionTracker"): self.execution_tracker = execution_tracker class MappingParameters(NamedTuple): - param_template: Dict[str, Any] - param_combinations: List[Dict[str, Any]] + param_template: ToolParameterRequestT + param_combinations: List[ToolParameterRequestInstanceT] def execute( @@ -60,15 +72,15 @@ def execute( tool: "Tool", mapping_params: MappingParameters, history: model.History, - rerun_remap_job_id: Optional[int] = None, - preferred_object_store_id: Optional[str] = None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, collection_info: Optional[MatchingCollections] = None, workflow_invocation_uuid: Optional[str] = None, invocation_step: Optional[model.WorkflowInvocationStep] = None, max_num_jobs: Optional[int] = None, - job_callback: Optional[Callable] = None, - completed_jobs: Optional[Dict[int, Optional[model.Job]]] = None, - workflow_resource_parameters: Optional[Dict[str, Any]] = None, + job_callback: Optional[JobCallbackT] = None, + completed_jobs: Optional[CompletedJobsT] = None, + workflow_resource_parameters: Optional[WorkflowResourceParametersT] = None, validate_outputs: bool = False, ): """ @@ -95,7 +107,7 @@ def execute( ) execution_cache = ToolExecutionCache(trans) - def execute_single_job(execution_slice, completed_job, skip=False): + def execute_single_job(execution_slice: "ExecutionSlice", completed_job: Optional[model.Job], skip: bool = False): job_timer = tool.app.execution_timer_factory.get_timer( "internals.galaxy.tools.execute.job_single", SINGLE_EXECUTION_SUCCESS_MESSAGE )