diff --git a/lib/galaxy/managers/datasets.py b/lib/galaxy/managers/datasets.py index 3a23a25e19a1..3db7f10b0cea 100644 --- a/lib/galaxy/managers/datasets.py +++ b/lib/galaxy/managers/datasets.py @@ -547,7 +547,7 @@ def set_metadata(self, trans, dataset_assoc, overwrite=False, validate=True): if overwrite: self.overwrite_metadata(data) - job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute( + job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute_via_trans( self.app.datatypes_registry.set_external_metadata_tool, trans, incoming={"input1": data, "validate": validate}, @@ -883,7 +883,7 @@ def deserialize_datatype(self, item, key, val, **context): assert ( trans ), "Logic error in Galaxy, deserialize_datatype not send a transation object" # TODO: restructure this for stronger typing - job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute( + job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute_via_trans( self.app.datatypes_registry.set_external_metadata_tool, trans, incoming={"input1": item}, overwrite=False ) # overwrite is False as per existing behavior trans.app.job_manager.enqueue(job, tool=trans.app.datatypes_registry.set_external_metadata_tool) diff --git a/lib/galaxy/managers/histories.py b/lib/galaxy/managers/histories.py index 84db5dd38c2f..c56cf7406f99 100644 --- a/lib/galaxy/managers/histories.py +++ b/lib/galaxy/managers/histories.py @@ -424,7 +424,7 @@ def queue_history_export( # Run job to do export. 
history_exp_tool = trans.app.toolbox.get_tool(export_tool_id) - job, *_ = history_exp_tool.execute(trans, incoming=params, history=history, set_output_hid=True) + job, *_ = history_exp_tool.execute(trans, incoming=params, history=history) trans.app.job_manager.enqueue(job, tool=history_exp_tool) return job diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index d0aa59f1a5fe..95b03fb5d3fa 100644 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -51,6 +51,7 @@ StoredWorkflow, ) from galaxy.model.base import transaction +from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.tool_shed.util.repository_util import get_installed_repository from galaxy.tool_shed.util.shed_util_common import set_image_paths from galaxy.tool_util.deps import ( @@ -107,13 +108,13 @@ from galaxy.tools.actions import ( DefaultToolAction, ToolAction, - ToolExecutionCache, ) from galaxy.tools.actions.data_manager import DataManagerToolAction from galaxy.tools.actions.data_source import DataSourceToolAction from galaxy.tools.actions.model_operations import ModelOperationToolAction from galaxy.tools.cache import ToolDocumentCache from galaxy.tools.evaluation import global_tool_errors +from galaxy.tools.execution_helpers import ToolExecutionCache from galaxy.tools.imp_exp import JobImportHistoryArchiveWrapper from galaxy.tools.parameters import ( check_param, @@ -184,13 +185,17 @@ from galaxy.version import VERSION_MAJOR from galaxy.work.context import proxy_work_context_for_history from .execute import ( - DEFAULT_USE_CACHED_JOB, + DatasetCollectionElementsSliceT, + DEFAULT_JOB_CALLBACK, DEFAULT_PREFERRED_OBJECT_STORE_ID, DEFAULT_RERUN_REMAP_JOB_ID, + DEFAULT_USE_CACHED_JOB, execute as execute_job, ExecutionSlice, JobCallbackT, MappingParameters, + ToolParameterRequestInstanceT, + ToolParameterRequestT, ) if TYPE_CHECKING: @@ -1868,11 +1873,11 @@ def expand_incoming(self, trans, incoming, request_context, 
input_format="legacy def handle_input( self, trans, - incoming, + incoming: ToolParameterRequestT, history: Optional[model.History] = None, use_cached_job: bool = DEFAULT_USE_CACHED_JOB, preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, - input_format="legacy", + input_format: str = "legacy", ): """ Process incoming parameters for this tool from the dict `incoming`, @@ -1964,7 +1969,7 @@ def handle_single_execution( resulting output data or an error message indicating the problem. """ try: - rval = self.execute( + rval = self._execute( trans, incoming=execution_slice.param_combination, history=history, @@ -2051,18 +2056,58 @@ def get_static_param_values(self, trans): args[key] = param.get_initial_value(trans, None) return args - def execute(self, trans, incoming=None, set_output_hid=True, history=None, **kwargs): + def execute( + self, trans, incoming: Optional[ToolParameterRequestInstanceT] = None, history: Optional[model.History] = None + ): """ Execute the tool using parameter values in `incoming`. This just dispatches to the `ToolAction` instance specified by `self.tool_action`. In general this will create a `Job` that when run will build the tool's outputs, e.g. `DefaultToolAction`. + + _execute has many more options but should be accessed through + handle_single_execution. The public interface to execute should be + rarely used and in more specific ways. 
""" + return self._execute( + trans, + incoming=incoming, + history=history, + ) + + def _execute( + self, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[model.History] = None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = None, + completed_job: Optional[model.Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + flush_job: bool = True, + skip: bool = True, + ): if incoming is None: incoming = {} try: return self.tool_action.execute( - self, trans, incoming=incoming, set_output_hid=set_output_hid, history=history, **kwargs + self, + trans, + incoming=incoming, + history=history, + job_params=None, + rerun_remap_job_id=rerun_remap_job_id, + execution_cache=execution_cache, + dataset_collection_elements=dataset_collection_elements, + completed_job=completed_job, + collection_info=collection_info, + job_callback=job_callback, + preferred_object_store_id=preferred_object_store_id, + flush_job=flush_job, + skip=skip, ) except exceptions.ToolExecutionError as exc: job = exc.job @@ -2994,7 +3039,9 @@ class SetMetadataTool(Tool): requires_setting_metadata = False tool_action: "SetMetadataToolAction" - def regenerate_imported_metadata_if_needed(self, hda, history, user, session_id): + def regenerate_imported_metadata_if_needed( + self, hda: model.HistoryDatasetAssociation, history: model.History, user: model.User, session_id: int + ): if hda.has_metadata_files: job, *_ = self.tool_action.execute_via_app( self, diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py index d7278e4286c4..f5e04b231280 100644 --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ 
-9,7 +9,9 @@ cast, Dict, List, + Optional, Set, + Tuple, TYPE_CHECKING, Union, ) @@ -24,6 +26,7 @@ from galaxy.job_execution.actions.post import ActionBox from galaxy.managers.context import ProvidesHistoryContext from galaxy.model import ( + History, HistoryDatasetAssociation, Job, LibraryDatasetDatasetAssociation, @@ -31,12 +34,22 @@ ) from galaxy.model.base import transaction from galaxy.model.dataset_collections.builder import CollectionBuilder +from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.model.none_like import NoneDataset from galaxy.objectstore import ObjectStorePopulator +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + JobCallbackT, + ToolParameterRequestInstanceT, +) from galaxy.tools.execution_helpers import ( - ToolExecutionCache, filter_output, on_text_for_names, + ToolExecutionCache, ) from galaxy.tools.parameters import update_dataset_ids from galaxy.tools.parameters.basic import ( @@ -59,6 +72,10 @@ log = logging.getLogger(__name__) +OutputDatasetsT = Dict[str, DatasetInstance] +ToolActionExecuteResult = Union[Tuple[Job, OutputDatasetsT, History], Tuple[Job, OutputDatasetsT]] + + class ToolAction: """ The actions to be taken when a tool is run (after parameters have @@ -66,14 +83,31 @@ class ToolAction: """ @abstractmethod - def execute(self, tool, trans, incoming=None, set_output_hid=True, **kwargs): - pass + def execute( + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: 
Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: + """Perform target tool action.""" class DefaultToolAction(ToolAction): """Default tool action is to run an external command""" - produces_real_jobs = True + produces_real_jobs: bool = True + set_output_hid: bool = True def _collect_input_datasets( self, @@ -366,21 +400,19 @@ def execute( self, tool, trans, - incoming=None, - return_job=False, - set_output_hid=True, - history=None, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, job_params=None, - rerun_remap_job_id=None, - execution_cache=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, dataset_collection_elements=None, - completed_job=None, - collection_info=None, - job_callback=None, - preferred_object_store_id=None, - flush_job=True, - skip=False, - ): + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: """ Executes a tool, creating job and tool outputs, associating them, and submitting the job to the job queue. 
If history is not specified, use @@ -401,6 +433,7 @@ def execute( preserved_hdca_tags, all_permissions, ) = self._collect_inputs(tool, trans, incoming, history, current_user_roles, collection_info) + assert history # tell type system we've set history and it is no longer optional # Build name for output datasets based on tool name and input names on_text = self._get_on_text(inp_data) @@ -646,7 +679,7 @@ def handle_output(name, output, hidden=None): if name not in incoming and name not in child_dataset_names: # don't add already existing datasets, i.e. async created history.stage_addition(data) - history.add_pending_items(set_output_hid=set_output_hid) + history.add_pending_items(set_output_hid=self.set_output_hid) log.info(add_datasets_timer) job_setup_timer = ExecutionTimer() diff --git a/lib/galaxy/tools/actions/data_manager.py b/lib/galaxy/tools/actions/data_manager.py index ce7d69e1048c..31135b5f7868 100644 --- a/lib/galaxy/tools/actions/data_manager.py +++ b/lib/galaxy/tools/actions/data_manager.py @@ -1,7 +1,26 @@ import logging +from typing import Optional from galaxy.model.base import transaction -from . import DefaultToolAction +from galaxy.model.dataset_collections.matching import MatchingCollections +from galaxy.model import ( + History, + Job, +) +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + JobCallbackT, + ToolParameterRequestInstanceT, +) +from galaxy.tools.execution_helpers import ToolExecutionCache +from . 
import ( + DefaultToolAction, + ToolActionExecuteResult, +) log = logging.getLogger(__name__) @@ -9,8 +28,39 @@ class DataManagerToolAction(DefaultToolAction): """Tool action used for Data Manager Tools""" - def execute(self, tool, trans, **kwds): - rval = super().execute(tool, trans, **kwds) + def execute( + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: + rval = super().execute( + tool, + trans, + incoming=incoming, + history=history, + job_params=job_params, + rerun_remap_job_id=rerun_remap_job_id, + execution_cache=execution_cache, + dataset_collection_elements=dataset_collection_elements, + completed_job=completed_job, + collection_info=collection_info, + job_callback=job_callback, + preferred_object_store_id=preferred_object_store_id, + flush_job=flush_job, + skip=skip, + ) if isinstance(rval, tuple) and len(rval) >= 2 and isinstance(rval[0], trans.app.model.Job): assoc = trans.app.model.DataManagerJobAssociation(job=rval[0], data_manager_id=tool.data_manager_id) trans.sa_session.add(assoc) diff --git a/lib/galaxy/tools/actions/history_imp_exp.py b/lib/galaxy/tools/actions/history_imp_exp.py index d98764c405fd..4d7fdc3baeb9 100644 --- a/lib/galaxy/tools/actions/history_imp_exp.py +++ b/lib/galaxy/tools/actions/history_imp_exp.py @@ -2,10 +2,29 @@ import logging import os import tempfile +from typing import Optional from 
galaxy.job_execution.setup import create_working_directory_for_job from galaxy.model.base import transaction -from galaxy.tools.actions import ToolAction +from galaxy.model.dataset_collections.matching import MatchingCollections +from galaxy.model import ( + History, + Job, +) +from galaxy.tools.actions import ( + ToolAction, + ToolActionExecuteResult, +) +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + JobCallbackT, + ToolParameterRequestInstanceT, +) +from galaxy.tools.execution_helpers import ToolExecutionCache from galaxy.tools.imp_exp import ( JobExportHistoryArchiveWrapper, JobImportHistoryArchiveWrapper, @@ -18,9 +37,26 @@ class ImportHistoryToolAction(ToolAction): """Tool action used for importing a history to an archive.""" - produces_real_jobs = True - - def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=True, history=None, **kwargs): + produces_real_jobs: bool = True + set_output_hid: bool = False + + def execute( + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: # # Create job. 
# @@ -78,9 +114,26 @@ def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=Tr class ExportHistoryToolAction(ToolAction): """Tool action used for exporting a history to an archive.""" - produces_real_jobs = True - - def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=True, history=None, **kwargs): + produces_real_jobs: bool = True + set_output_hid: bool = True + + def execute( + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: trans.check_user_activation() # # Get history to export. 
diff --git a/lib/galaxy/tools/actions/metadata.py b/lib/galaxy/tools/actions/metadata.py index 04db0ab30e17..bef1795a6be7 100644 --- a/lib/galaxy/tools/actions/metadata.py +++ b/lib/galaxy/tools/actions/metadata.py @@ -1,10 +1,31 @@ import logging import os from json import dumps +from typing import ( + Any, + Dict, + Optional, +) from galaxy.job_execution.datasets import DatasetPath from galaxy.metadata import get_metadata_compute_strategy +from galaxy.model import ( + History, + Job, + User, +) from galaxy.model.base import transaction +from galaxy.model.dataset_collections.matching import MatchingCollections +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + JobCallbackT, + ToolParameterRequestInstanceT, +) +from galaxy.tools.execution_helpers import ToolExecutionCache from galaxy.util import asbool from . import ToolAction @@ -14,47 +35,79 @@ class SetMetadataToolAction(ToolAction): """Tool action used for setting external metadata on an existing dataset""" - produces_real_jobs = False + produces_real_jobs: bool = False + set_output_hid: bool = False def execute( - self, tool, trans, incoming=None, set_output_hid=False, overwrite=True, history=None, job_params=None, **kwargs + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + flush_job: bool = True, + skip: 
bool = False, ): """ Execute using a web transaction. """ + overwrite = True + job, odict = self.execute_via_trans( + tool, + trans, + incoming, + overwrite, + history, + job_params, + ) + # FIXME: can remove this when logging in execute_via_app method. + trans.log_event(f"Added set external metadata job to the job queue, id: {str(job.id)}", tool_id=job.tool_id) + return job, odict + + def execute_via_trans( + self, + tool, + trans, + incoming: Optional[Dict[str, Any]], + overwrite: bool = True, + history: Optional[History] = None, + job_params: Optional[Dict[str, Any]] = None, + ): trans.check_user_activation() session = trans.get_galaxy_session() session_id = session and session.id history_id = trans.history and trans.history.id incoming = incoming or {} - job, odict = self.execute_via_app( + return self.execute_via_app( tool, trans.app, session_id, history_id, trans.user, incoming, - set_output_hid, overwrite, history, job_params, ) - # FIXME: can remove this when logging in execute_via_app method. - trans.log_event(f"Added set external metadata job to the job queue, id: {str(job.id)}", tool_id=job.tool_id) - return job, odict def execute_via_app( self, tool, app, - session_id, - history_id, - user=None, - incoming=None, - set_output_hid=False, - overwrite=True, - history=None, - job_params=None, + session_id: Optional[int], + history_id: Optional[int], + user: Optional[User] = None, + incoming: Optional[Dict[str, Any]] = None, + overwrite: bool = True, + history: Optional[History] = None, + job_params: Optional[Dict[str, Any]] = None, ): """ Execute using application. 
diff --git a/lib/galaxy/tools/actions/model_operations.py b/lib/galaxy/tools/actions/model_operations.py index bedba42d4aa3..76360882e9a3 100644 --- a/lib/galaxy/tools/actions/model_operations.py +++ b/lib/galaxy/tools/actions/model_operations.py @@ -1,12 +1,31 @@ import logging -from typing import TYPE_CHECKING +from typing import ( + Optional, + TYPE_CHECKING, +) +from galaxy.model.dataset_collections.matching import MatchingCollections +from galaxy.model import ( + History, + Job, +) from galaxy.objectstore import ObjectStorePopulator from galaxy.tools.actions import ( DefaultToolAction, OutputCollections, - ToolExecutionCache, + OutputDatasetsT, + ToolActionExecuteResult, ) +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + JobCallbackT, + ToolParameterRequestInstanceT, +) +from galaxy.tools.execution_helpers import ToolExecutionCache if TYPE_CHECKING: from galaxy.managers.context import ProvidesUserContext @@ -15,7 +34,7 @@ class ModelOperationToolAction(DefaultToolAction): - produces_real_jobs = False + produces_real_jobs: bool = False def check_inputs_ready(self, tool, trans, incoming, history, execution_cache=None, collection_info=None): if execution_cache is None: @@ -32,17 +51,19 @@ def execute( self, tool, trans, - incoming=None, - set_output_hid=False, - overwrite=True, - history=None, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, job_params=None, - execution_cache=None, - collection_info=None, - job_callback=None, - skip=False, - **kwargs, - ): + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: 
Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: incoming = incoming or {} trans.check_user_activation() @@ -65,7 +86,7 @@ def execute( # wrapped params are used by change_format action and by output.label; only perform this wrapping once, as needed wrapped_params = self._wrapped_params(trans, tool, incoming) - out_data = {} + out_data: OutputDatasetsT = {} input_collections = {k: v[0][0] for k, v in inp_dataset_collections.items()} output_collections = OutputCollections( trans, @@ -73,7 +94,7 @@ def execute( tool=tool, tool_action=self, input_collections=input_collections, - dataset_collection_elements=kwargs.get("dataset_collection_elements", None), + dataset_collection_elements=dataset_collection_elements, on_text=on_text, incoming=incoming, params=wrapped_params.params, diff --git a/lib/galaxy/tools/actions/upload.py b/lib/galaxy/tools/actions/upload.py index b38a85419513..96fa480b632a 100644 --- a/lib/galaxy/tools/actions/upload.py +++ b/lib/galaxy/tools/actions/upload.py @@ -1,14 +1,33 @@ import json import logging import os +from typing import Optional from galaxy.exceptions import RequestParameterMissingException from galaxy.model.base import transaction +from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.model.dataset_collections.structure import UninitializedTree +from galaxy.model import ( + History, + Job, +) from galaxy.tools.actions import upload_common +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + JobCallbackT, + ToolParameterRequestInstanceT, +) +from galaxy.tools.execution_helpers import ToolExecutionCache from galaxy.util import ExecutionTimer from 
galaxy.util.bunch import Bunch -from . import ToolAction +from . import ( + ToolAction, + ToolActionExecuteResult, +) log = logging.getLogger(__name__) @@ -16,7 +35,23 @@ class BaseUploadToolAction(ToolAction): produces_real_jobs = True - def execute(self, tool, trans, incoming=None, history=None, **kwargs): + def execute( + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: trans.check_user_activation() incoming = incoming or {} dataset_upload_inputs = [] diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py index 8ba38e948ef7..a8bd63d9ea04 100644 --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -52,9 +52,12 @@ ToolParameterRequestT = Dict[str, Any] # Input dictionary extracted from a tool request for running a tool individually ToolParameterRequestInstanceT = Dict[str, Any] +DatasetCollectionElementsSliceT = Dict[str, model.DatasetCollectionElement] DEFAULT_USE_CACHED_JOB = False DEFAULT_PREFERRED_OBJECT_STORE_ID: Optional[str] = None DEFAULT_RERUN_REMAP_JOB_ID: Optional[int] = None +DEFAULT_JOB_CALLBACK: Optional[JobCallbackT] = None +DEFAULT_DATASET_COLLECTION_ELEMENTS: Optional[DatasetCollectionElementsSliceT] = None class PartialJobExecution(Exception): @@ -78,7 +81,7 @@ def execute( workflow_invocation_uuid: Optional[str] = None, invocation_step: Optional[model.WorkflowInvocationStep] = 
None, max_num_jobs: Optional[int] = None, - job_callback: Optional[JobCallbackT] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, completed_jobs: Optional[CompletedJobsT] = None, workflow_resource_parameters: Optional[WorkflowResourceParametersT] = None, validate_outputs: bool = False, @@ -136,8 +139,8 @@ def execute_single_job(execution_slice: "ExecutionSlice", completed_job: Optiona execution_cache, completed_job, collection_info, - job_callback=job_callback, - preferred_object_store_id=preferred_object_store_id, + job_callback, + preferred_object_store_id, flush_job=False, skip=skip, ) @@ -237,7 +240,17 @@ def execute_single_job(execution_slice: "ExecutionSlice", completed_job: Optiona class ExecutionSlice: - def __init__(self, job_index, param_combination, dataset_collection_elements=None): + job_index: int + param_combination: ToolParameterRequestInstanceT + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] + history: Optional[model.History] + + def __init__( + self, + job_index: int, + param_combination: ToolParameterRequestInstanceT, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + ): self.job_index = job_index self.param_combination = param_combination self.dataset_collection_elements = dataset_collection_elements diff --git a/lib/galaxy/tools/execution_helpers.py b/lib/galaxy/tools/execution_helpers.py index 147a47de186d..66ae3c853681 100644 --- a/lib/galaxy/tools/execution_helpers.py +++ b/lib/galaxy/tools/execution_helpers.py @@ -3,6 +3,7 @@ Lower-level things that prevent interwoven dependencies between tool code, tool execution code, and tool action code. 
""" + import logging log = logging.getLogger(__name__) diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index d0eb5025d5c4..eeece34fe554 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -52,12 +52,12 @@ DefaultToolState, get_safe_version, ) -from galaxy.tools.actions import filter_output from galaxy.tools.execute import ( execute, MappingParameters, PartialJobExecution, ) +from galaxy.tools.execution_helpers import filter_output from galaxy.tools.expressions import do_eval from galaxy.tools.parameters import ( check_param, diff --git a/test/unit/app/tools/test_actions.py b/test/unit/app/tools/test_actions.py index 1e6b69528ac7..b63f2951cdfd 100644 --- a/test/unit/app/tools/test_actions.py +++ b/test/unit/app/tools/test_actions.py @@ -14,8 +14,8 @@ from galaxy.tools.actions import ( DefaultToolAction, determine_output_format, - on_text_for_names, ) +from galaxy.tools.execution_helpers import on_text_for_names from galaxy.util import XML from galaxy.util.unittest import TestCase