From 1bcf28ef159f7a8af883af9d7097078ba3b53384 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Tue, 30 Jul 2024 17:40:30 -0400 Subject: [PATCH] Improved typing/code structure around tool actions. --- lib/galaxy/managers/datasets.py | 4 +- lib/galaxy/managers/histories.py | 2 +- lib/galaxy/tools/__init__.py | 62 +++++++++++++++++--- lib/galaxy/tools/actions/__init__.py | 45 +++++++++----- lib/galaxy/tools/actions/history_imp_exp.py | 10 ++-- lib/galaxy/tools/actions/metadata.py | 60 +++++++++++++------ lib/galaxy/tools/actions/model_operations.py | 6 +- lib/galaxy/tools/execute.py | 19 ++++-- lib/galaxy/tools/execution_helpers.py | 1 + lib/galaxy/workflow/modules.py | 2 +- test/unit/app/tools/test_actions.py | 2 +- 11 files changed, 154 insertions(+), 59 deletions(-) diff --git a/lib/galaxy/managers/datasets.py b/lib/galaxy/managers/datasets.py index 3a23a25e19a1..3db7f10b0cea 100644 --- a/lib/galaxy/managers/datasets.py +++ b/lib/galaxy/managers/datasets.py @@ -547,7 +547,7 @@ def set_metadata(self, trans, dataset_assoc, overwrite=False, validate=True): if overwrite: self.overwrite_metadata(data) - job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute( + job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute_via_trans( self.app.datatypes_registry.set_external_metadata_tool, trans, incoming={"input1": data, "validate": validate}, @@ -883,7 +883,7 @@ def deserialize_datatype(self, item, key, val, **context): assert ( trans ), "Logic error in Galaxy, deserialize_datatype not send a transation object" # TODO: restructure this for stronger typing - job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute( + job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute_via_trans( self.app.datatypes_registry.set_external_metadata_tool, trans, incoming={"input1": item}, overwrite=False ) # overwrite is False as per existing behavior 
trans.app.job_manager.enqueue(job, tool=trans.app.datatypes_registry.set_external_metadata_tool) diff --git a/lib/galaxy/managers/histories.py b/lib/galaxy/managers/histories.py index 84db5dd38c2f..c56cf7406f99 100644 --- a/lib/galaxy/managers/histories.py +++ b/lib/galaxy/managers/histories.py @@ -424,7 +424,7 @@ def queue_history_export( # Run job to do export. history_exp_tool = trans.app.toolbox.get_tool(export_tool_id) - job, *_ = history_exp_tool.execute(trans, incoming=params, history=history, set_output_hid=True) + job, *_ = history_exp_tool.execute(trans, incoming=params, history=history) trans.app.job_manager.enqueue(job, tool=history_exp_tool) return job diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index d0aa59f1a5fe..2d15127bdaf3 100644 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -51,6 +51,7 @@ StoredWorkflow, ) from galaxy.model.base import transaction +from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.tool_shed.util.repository_util import get_installed_repository from galaxy.tool_shed.util.shed_util_common import set_image_paths from galaxy.tool_util.deps import ( @@ -107,13 +108,13 @@ from galaxy.tools.actions import ( DefaultToolAction, ToolAction, - ToolExecutionCache, ) from galaxy.tools.actions.data_manager import DataManagerToolAction from galaxy.tools.actions.data_source import DataSourceToolAction from galaxy.tools.actions.model_operations import ModelOperationToolAction from galaxy.tools.cache import ToolDocumentCache from galaxy.tools.evaluation import global_tool_errors +from galaxy.tools.execution_helpers import ToolExecutionCache from galaxy.tools.imp_exp import JobImportHistoryArchiveWrapper from galaxy.tools.parameters import ( check_param, @@ -184,13 +185,16 @@ from galaxy.version import VERSION_MAJOR from galaxy.work.context import proxy_work_context_for_history from .execute import ( - DEFAULT_USE_CACHED_JOB, + 
DatasetCollectionElementsSliceT, DEFAULT_PREFERRED_OBJECT_STORE_ID, DEFAULT_RERUN_REMAP_JOB_ID, + DEFAULT_USE_CACHED_JOB, execute as execute_job, ExecutionSlice, JobCallbackT, MappingParameters, + ToolParameterRequestInstanceT, + ToolParameterRequestT, ) if TYPE_CHECKING: @@ -1868,11 +1872,11 @@ def expand_incoming(self, trans, incoming, request_context, input_format="legacy def handle_input( self, trans, - incoming, + incoming: ToolParameterRequestT, history: Optional[model.History] = None, use_cached_job: bool = DEFAULT_USE_CACHED_JOB, preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, - input_format="legacy", + input_format: str = "legacy", ): """ Process incoming parameters for this tool from the dict `incoming`, @@ -1964,7 +1968,7 @@ def handle_single_execution( resulting output data or an error message indicating the problem. """ try: - rval = self.execute( + rval = self._execute( trans, incoming=execution_slice.param_combination, history=history, @@ -2051,18 +2055,58 @@ def get_static_param_values(self, trans): args[key] = param.get_initial_value(trans, None) return args - def execute(self, trans, incoming=None, set_output_hid=True, history=None, **kwargs): + def execute( + self, trans, incoming: Optional[ToolParameterRequestInstanceT] = None, history: Optional[model.History] = None + ): """ Execute the tool using parameter values in `incoming`. This just dispatches to the `ToolAction` instance specified by `self.tool_action`. In general this will create a `Job` that when run will build the tool's outputs, e.g. `DefaultToolAction`. + + _execute has many more options but should be accessed through + handle_single_execution. The public interface to execute should be + rarely used and in more specific ways. 
""" + return self._execute( + trans, + incoming=incoming, + history=history, + ) + + def _execute( + self, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[model.History] = None, + rerun_remap_job_id: Optional[int] = None, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = None, + completed_job: Optional[model.Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = None, + preferred_object_store_id: Optional[str] = None, + flush_job: bool = True, + skip: bool = False, + ): if incoming is None: incoming = {} try: return self.tool_action.execute( - self, trans, incoming=incoming, set_output_hid=set_output_hid, history=history, **kwargs + self, + trans, + incoming=incoming, + history=history, + job_params=None, + rerun_remap_job_id=rerun_remap_job_id, + execution_cache=execution_cache, + dataset_collection_elements=dataset_collection_elements, + completed_job=completed_job, + collection_info=collection_info, + job_callback=job_callback, + preferred_object_store_id=preferred_object_store_id, + flush_job=flush_job, + skip=skip, ) except exceptions.ToolExecutionError as exc: job = exc.job @@ -2994,7 +3038,9 @@ class SetMetadataTool(Tool): requires_setting_metadata = False tool_action: "SetMetadataToolAction" - def regenerate_imported_metadata_if_needed(self, hda, history, user, session_id): + def regenerate_imported_metadata_if_needed( + self, hda: model.HistoryDatasetAssociation, history: model.History, user: model.User, session_id: int + ): if hda.has_metadata_files: job, *_ = self.tool_action.execute_via_app( self, diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py index d7278e4286c4..b255258d5c32 100644 --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -9,6 +9,7 @@ cast, Dict, List, + Optional, Set, TYPE_CHECKING, Union, 
@@ -24,6 +25,7 @@ from galaxy.job_execution.actions.post import ActionBox from galaxy.managers.context import ProvidesHistoryContext from galaxy.model import ( + History, HistoryDatasetAssociation, Job, LibraryDatasetDatasetAssociation, @@ -31,12 +33,23 @@ ) from galaxy.model.base import transaction from galaxy.model.dataset_collections.builder import CollectionBuilder +from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.model.none_like import NoneDataset from galaxy.objectstore import ObjectStorePopulator +from galaxy.tools.execute import ( + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + DEFAULT_USE_CACHED_JOB, + ExecutionSlice, + JobCallbackT, + MappingParameters, + ToolParameterRequestInstanceT, +) from galaxy.tools.execution_helpers import ( - ToolExecutionCache, filter_output, on_text_for_names, + ToolExecutionCache, ) from galaxy.tools.parameters import update_dataset_ids from galaxy.tools.parameters.basic import ( @@ -66,14 +79,15 @@ class ToolAction: """ @abstractmethod - def execute(self, tool, trans, incoming=None, set_output_hid=True, **kwargs): + def execute(self, tool, trans, incoming=None, **kwargs): pass class DefaultToolAction(ToolAction): """Default tool action is to run an external command""" - produces_real_jobs = True + produces_real_jobs: bool = True + set_output_hid: bool = True def _collect_input_datasets( self, @@ -366,20 +380,18 @@ def execute( self, tool, trans, - incoming=None, - return_job=False, - set_output_hid=True, - history=None, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, job_params=None, - rerun_remap_job_id=None, - execution_cache=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, dataset_collection_elements=None, - completed_job=None, - collection_info=None, - job_callback=None, - preferred_object_store_id=None, - 
flush_job=True, - skip=False, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + flush_job: bool = True, + skip: bool = False, ): """ Executes a tool, creating job and tool outputs, associating them, and @@ -401,6 +413,7 @@ def execute( preserved_hdca_tags, all_permissions, ) = self._collect_inputs(tool, trans, incoming, history, current_user_roles, collection_info) + assert history # tell type system we've set history and it is no longer optional # Build name for output datasets based on tool name and input names on_text = self._get_on_text(inp_data) @@ -646,7 +659,7 @@ def handle_output(name, output, hidden=None): if name not in incoming and name not in child_dataset_names: # don't add already existing datasets, i.e. async created history.stage_addition(data) - history.add_pending_items(set_output_hid=set_output_hid) + history.add_pending_items(set_output_hid=self.set_output_hid) log.info(add_datasets_timer) job_setup_timer = ExecutionTimer() diff --git a/lib/galaxy/tools/actions/history_imp_exp.py b/lib/galaxy/tools/actions/history_imp_exp.py index d98764c405fd..5a77a38e5e19 100644 --- a/lib/galaxy/tools/actions/history_imp_exp.py +++ b/lib/galaxy/tools/actions/history_imp_exp.py @@ -18,9 +18,10 @@ class ImportHistoryToolAction(ToolAction): """Tool action used for importing a history to an archive.""" - produces_real_jobs = True + produces_real_jobs: bool = True + set_output_hid: bool = False - def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=True, history=None, **kwargs): + def execute(self, tool, trans, incoming=None, history=None, **kwargs): # # Create job. 
# @@ -78,9 +79,10 @@ def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=Tr class ExportHistoryToolAction(ToolAction): """Tool action used for exporting a history to an archive.""" - produces_real_jobs = True + produces_real_jobs: bool = True + set_output_hid: bool = True - def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=True, history=None, **kwargs): + def execute(self, tool, trans, incoming=None, history=None, **kwargs): trans.check_user_activation() # # Get history to export. diff --git a/lib/galaxy/tools/actions/metadata.py b/lib/galaxy/tools/actions/metadata.py index 04db0ab30e17..c5ed0300cdd1 100644 --- a/lib/galaxy/tools/actions/metadata.py +++ b/lib/galaxy/tools/actions/metadata.py @@ -1,9 +1,18 @@ import logging import os from json import dumps +from typing import ( + Any, + Dict, + Optional, +) from galaxy.job_execution.datasets import DatasetPath from galaxy.metadata import get_metadata_compute_strategy +from galaxy.model import ( + History, + User, +) from galaxy.model.base import transaction from galaxy.util import asbool from . import ToolAction @@ -14,47 +23,62 @@ class SetMetadataToolAction(ToolAction): """Tool action used for setting external metadata on an existing dataset""" - produces_real_jobs = False + produces_real_jobs: bool = False + set_output_hid: bool = False - def execute( - self, tool, trans, incoming=None, set_output_hid=False, overwrite=True, history=None, job_params=None, **kwargs - ): + def execute(self, tool, trans, incoming=None, history=None, job_params=None, **kwargs): """ Execute using a web transaction. """ + overwrite = True + job, odict = self.execute_via_trans( + tool, + trans, + incoming, + overwrite, + history, + job_params, + ) + # FIXME: can remove this when logging in execute_via_app method. 
+ trans.log_event(f"Added set external metadata job to the job queue, id: {str(job.id)}", tool_id=job.tool_id) + return job, odict + + def execute_via_trans( + self, + tool, + trans, + incoming: Optional[Dict[str, Any]], + overwrite: bool = True, + history: Optional[History] = None, + job_params: Optional[Dict[str, Any]] = None, + ): trans.check_user_activation() session = trans.get_galaxy_session() session_id = session and session.id history_id = trans.history and trans.history.id incoming = incoming or {} - job, odict = self.execute_via_app( + return self.execute_via_app( tool, trans.app, session_id, history_id, trans.user, incoming, - set_output_hid, - overwrite, history, job_params, ) - # FIXME: can remove this when logging in execute_via_app method. - trans.log_event(f"Added set external metadata job to the job queue, id: {str(job.id)}", tool_id=job.tool_id) - return job, odict def execute_via_app( self, tool, app, - session_id, - history_id, - user=None, - incoming=None, - set_output_hid=False, - overwrite=True, - history=None, - job_params=None, + session_id: Optional[int], + history_id: Optional[int], + user: Optional[User] = None, + incoming: Optional[Dict[str, Any]] = None, + overwrite: bool = True, + history: Optional[History] = None, + job_params: Optional[Dict[str, Any]] = None, ): """ Execute using application. 
diff --git a/lib/galaxy/tools/actions/model_operations.py b/lib/galaxy/tools/actions/model_operations.py index bedba42d4aa3..e3c14f0d29a3 100644 --- a/lib/galaxy/tools/actions/model_operations.py +++ b/lib/galaxy/tools/actions/model_operations.py @@ -5,8 +5,8 @@ from galaxy.tools.actions import ( DefaultToolAction, OutputCollections, - ToolExecutionCache, ) +from galaxy.tools.execution_helpers import ToolExecutionCache if TYPE_CHECKING: from galaxy.managers.context import ProvidesUserContext @@ -15,7 +15,7 @@ class ModelOperationToolAction(DefaultToolAction): - produces_real_jobs = False + produces_real_jobs: bool = False def check_inputs_ready(self, tool, trans, incoming, history, execution_cache=None, collection_info=None): if execution_cache is None: @@ -33,8 +33,6 @@ def execute( tool, trans, incoming=None, - set_output_hid=False, - overwrite=True, history=None, job_params=None, execution_cache=None, diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py index 8ba38e948ef7..fcf500b372d2 100644 --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -52,9 +52,11 @@ ToolParameterRequestT = Dict[str, Any] # Input dictionary extracted from a tool request for running a tool individually ToolParameterRequestInstanceT = Dict[str, Any] +DatasetCollectionElementsSliceT = Dict[str, model.DatasetCollectionElement] DEFAULT_USE_CACHED_JOB = False DEFAULT_PREFERRED_OBJECT_STORE_ID: Optional[str] = None DEFAULT_RERUN_REMAP_JOB_ID: Optional[int] = None +DEFAULT_JOB_CALLBACK: Optional[JobCallbackT] = None class PartialJobExecution(Exception): @@ -78,7 +80,7 @@ def execute( workflow_invocation_uuid: Optional[str] = None, invocation_step: Optional[model.WorkflowInvocationStep] = None, max_num_jobs: Optional[int] = None, - job_callback: Optional[JobCallbackT] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, completed_jobs: Optional[CompletedJobsT] = None, workflow_resource_parameters: Optional[WorkflowResourceParametersT] = 
None, validate_outputs: bool = False, @@ -136,8 +138,8 @@ def execute_single_job(execution_slice: "ExecutionSlice", completed_job: Optiona execution_cache, completed_job, collection_info, - job_callback=job_callback, - preferred_object_store_id=preferred_object_store_id, + job_callback, + preferred_object_store_id, flush_job=False, skip=skip, ) @@ -237,7 +239,16 @@ def execute_single_job(execution_slice: "ExecutionSlice", completed_job: Optiona class ExecutionSlice: - def __init__(self, job_index, param_combination, dataset_collection_elements=None): + job_index: int + param_combination: ToolParameterRequestInstanceT + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] + + def __init__( + self, + job_index: int, + param_combination: ToolParameterRequestInstanceT, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = None, + ): self.job_index = job_index self.param_combination = param_combination self.dataset_collection_elements = dataset_collection_elements diff --git a/lib/galaxy/tools/execution_helpers.py b/lib/galaxy/tools/execution_helpers.py index 147a47de186d..66ae3c853681 100644 --- a/lib/galaxy/tools/execution_helpers.py +++ b/lib/galaxy/tools/execution_helpers.py @@ -3,6 +3,7 @@ Lower-level things that prevent interwoven dependencies between tool code, tool execution code, and tool action code. 
""" + import logging log = logging.getLogger(__name__) diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index d0eb5025d5c4..eeece34fe554 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -52,12 +52,12 @@ DefaultToolState, get_safe_version, ) -from galaxy.tools.actions import filter_output from galaxy.tools.execute import ( execute, MappingParameters, PartialJobExecution, ) +from galaxy.tools.execution_helpers import filter_output from galaxy.tools.expressions import do_eval from galaxy.tools.parameters import ( check_param, diff --git a/test/unit/app/tools/test_actions.py b/test/unit/app/tools/test_actions.py index 1e6b69528ac7..b63f2951cdfd 100644 --- a/test/unit/app/tools/test_actions.py +++ b/test/unit/app/tools/test_actions.py @@ -14,8 +14,8 @@ from galaxy.tools.actions import ( DefaultToolAction, determine_output_format, - on_text_for_names, ) +from galaxy.tools.execution_helpers import on_text_for_names from galaxy.util import XML from galaxy.util.unittest import TestCase