From ef27369680df2a5619b6cdfbc1683478a6039337 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 18 Dec 2024 10:51:30 +0100 Subject: [PATCH] Record NO_REPLACEMENT as step output for unspecified value --- lib/galaxy/tools/parameters/__init__.py | 19 ++++++++++-- lib/galaxy/tools/parameters/workflow_utils.py | 9 ++++++ lib/galaxy/workflow/modules.py | 16 +++++----- lib/galaxy/workflow/refactor/execute.py | 6 ++-- lib/galaxy/workflow/run.py | 30 +++++++++---------- 5 files changed, 49 insertions(+), 31 deletions(-) diff --git a/lib/galaxy/tools/parameters/__init__.py b/lib/galaxy/tools/parameters/__init__.py index ee3a8709817d..8bb957904bd6 100644 --- a/lib/galaxy/tools/parameters/__init__.py +++ b/lib/galaxy/tools/parameters/__init__.py @@ -31,6 +31,7 @@ ) from .workflow_utils import ( is_runtime_value, + NO_REPLACEMENT, runtime_to_json, ) from .wrapped import flat_to_nested_state @@ -180,8 +181,22 @@ def callback_helper(input, input_values, name_prefix, label_prefix, parent_prefi replace = new_value != no_replacement_value if replace: input_values[input.name] = new_value - elif replace_optional_connections and is_runtime_value(value) and hasattr(input, "value"): - input_values[input.name] = input.value + elif replace_optional_connections: + # Only used in workflow context + has_default = hasattr(input, "value") + if new_value is NO_REPLACEMENT: + # NO_REPLACEMENT means value was connected but left unspecified + if has_default: + # Use default if we have one + input_values[input.name] = input.value + else: + # Should fail if input is not optional and does not have default value + # Effectively however depends on parameter implementation. + # We might want to raise an exception here, instead of depending on a tool parameter value error. + input_values[input.name] = None + + elif is_runtime_value(value) and has_default: + input_values[input.name] = input.value def get_current_case(input, input_values): test_parameter = input.test_param diff --git a/lib/galaxy/tools/parameters/workflow_utils.py b/lib/galaxy/tools/parameters/workflow_utils.py index 73fcb688dfa1..178f08a773a6 100644 --- a/lib/galaxy/tools/parameters/workflow_utils.py +++ b/lib/galaxy/tools/parameters/workflow_utils.py @@ -1,6 +1,15 @@ from collections.abc import MutableMapping +class NoReplacement: + + def __str__(self): + return "NO_REPLACEMENT singleton" + + +NO_REPLACEMENT = NoReplacement() + + class workflow_building_modes: DISABLED = False ENABLED = True diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index 23602ffaef0f..96c4eb46093d 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -96,6 +96,8 @@ from galaxy.tools.parameters.workflow_utils import ( ConnectedValue, is_runtime_value, + NO_REPLACEMENT, + NoReplacement, runtime_to_json, workflow_building_modes, ) @@ -129,14 +131,6 @@ POSSIBLE_PARAMETER_TYPES: Tuple[INPUT_PARAMETER_TYPES] = get_args(INPUT_PARAMETER_TYPES) -class NoReplacement: - def __str__(self): - return "NO_REPLACEMENT singleton" - - -NO_REPLACEMENT = NoReplacement() - - class ConditionalStepWhen(BooleanToolParameter): pass @@ -2266,7 +2260,11 @@ def decode_runtime_state(self, step, runtime_state): ) def execute( - self, trans, progress: "WorkflowProgress", invocation_step, use_cached_job: bool = False + self, + trans, + progress: "WorkflowProgress", + invocation_step: "WorkflowInvocationStep", + use_cached_job: bool = False, ) -> Optional[bool]: invocation = invocation_step.workflow_invocation step = invocation_step.workflow_step diff --git a/lib/galaxy/workflow/refactor/execute.py b/lib/galaxy/workflow/refactor/execute.py index 264fa843d830..1e4c22c52bc7 100644 --- a/lib/galaxy/workflow/refactor/execute.py +++ b/lib/galaxy/workflow/refactor/execute.py @@ -9,6 +9,7 @@ from galaxy.tools.parameters.basic import contains_workflow_parameter from galaxy.tools.parameters.workflow_utils import ( ConnectedValue, + NO_REPLACEMENT, runtime_to_json, ) from .schema import ( @@ -41,10 +42,7 @@ UpgradeSubworkflowAction, UpgradeToolAction, ) -from ..modules import ( - InputParameterModule, - NO_REPLACEMENT, -) +from ..modules import InputParameterModule log = logging.getLogger(__name__) diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index d634f678999a..ac52cb8bd22e 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -1,5 +1,6 @@ import logging import uuid +from collections.abc import MutableMapping from typing import ( Any, Dict, @@ -37,6 +38,10 @@ WarningReason, ) from galaxy.tools.parameters.basic import raw_to_galaxy +from galaxy.tools.parameters.workflow_utils import ( + NO_REPLACEMENT, + NoReplacement, +) from galaxy.tools.parameters.wrapped import nested_key_to_path from galaxy.util import ExecutionTimer from galaxy.workflow import modules @@ -432,11 +437,11 @@ def remaining_steps( def replacement_for_input(self, trans, step: "WorkflowStep", input_dict: Dict[str, Any]): replacement: Union[ - modules.NoReplacement, + NoReplacement, model.DatasetCollectionInstance, List[model.DatasetCollectionInstance], HistoryItem, - ] = modules.NO_REPLACEMENT + ] = NO_REPLACEMENT prefixed_name = input_dict["name"] multiple = input_dict["multiple"] is_data = input_dict["input_type"] in ["dataset", "dataset_collection"] @@ -494,6 +499,8 @@ def replacement_for_connection(self, connection: "WorkflowStepConnection", is_da dependent_workflow_step_id=output_step_id, ) ) + if isinstance(replacement, MutableMapping) and replacement.get("__class__") == "NoReplacement": + return NO_REPLACEMENT if isinstance(replacement, model.HistoryDatasetCollectionAssociation): if not replacement.collection.populated: if not replacement.waiting_for_elements: @@ -574,19 +581,8 @@ def set_outputs_for_input( if self.inputs_by_step_id: step_id = step.id if step_id not in self.inputs_by_step_id and "output" not in outputs: - default_value = step.get_input_default_value(modules.NO_REPLACEMENT) - if default_value is not modules.NO_REPLACEMENT: - outputs["output"] = default_value - else: - log.error(f"{step.log_str()} not found in inputs_step_id {self.inputs_by_step_id}") - raise modules.FailWorkflowEvaluation( - why=InvocationFailureOutputNotFound( - reason=FailureReason.output_not_found, - workflow_step_id=invocation_step.workflow_step_id, - output_name="output", - dependent_workflow_step_id=invocation_step.workflow_step_id, - ) - ) + default_value = step.get_input_default_value(NO_REPLACEMENT) + outputs["output"] = default_value elif step_id in self.inputs_by_step_id: if self.inputs_by_step_id[step_id] is not None or "output" not in outputs: outputs["output"] = self.inputs_by_step_id[step_id] @@ -620,7 +616,7 @@ def set_step_outputs( # Add this non-data, non workflow-output output to the workflow outputs. # This is required for recovering the output in the next scheduling iteration, # and should be replaced with a WorkflowInvocationStepOutputValue ASAP. - if not workflow_outputs_by_name.get(output_name) and not output_object == modules.NO_REPLACEMENT: + if not workflow_outputs_by_name.get(output_name): workflow_output = model.WorkflowOutput(step, output_name=output_name) step.workflow_outputs.append(workflow_output) for workflow_output in step.workflow_outputs: @@ -645,6 +641,8 @@ def set_step_outputs( ) def _record_workflow_output(self, step: "WorkflowStep", workflow_output: "WorkflowOutput", output: Any) -> None: + if output is NO_REPLACEMENT: + output = {"__class__": "NoReplacement"} self.workflow_invocation.add_output(workflow_output, step, output) def mark_step_outputs_delayed(self, step: "WorkflowStep", why: Optional[str] = None) -> None: