From 5b6c3952a41aeaa3c4d86b58ff89baf5b8e0dfc9 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 18 Dec 2024 10:51:30 +0100 Subject: [PATCH] Record NO_REPLACEMENT as step output for unspecified value --- lib/galaxy/model/__init__.py | 2 +- lib/galaxy/tools/parameters/__init__.py | 19 ++++++++++-- lib/galaxy/tools/parameters/basic.py | 14 +++++++-- lib/galaxy/tools/parameters/workflow_utils.py | 9 ++++++ lib/galaxy/workflow/modules.py | 26 ++++++++-------- lib/galaxy/workflow/refactor/execute.py | 6 ++-- lib/galaxy/workflow/run.py | 30 +++++++++---------- lib/galaxy/workflow/run_request.py | 4 ++- .../workflow/default_values.gxwf-tests.yml | 3 +- .../default_values_optional.gxwf-tests.yml | 3 +- 10 files changed, 73 insertions(+), 43 deletions(-) diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index e5057dd6bee4..ead37ac4e1a7 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -8108,7 +8108,7 @@ class WorkflowStep(Base, RepresentById, UsesCreateAndUpdateTime): tool_errors: Mapped[Optional[bytes]] = mapped_column(JSONType) position: Mapped[Optional[bytes]] = mapped_column(MutableJSONType) config: Mapped[Optional[bytes]] = mapped_column(JSONType) - order_index: Mapped[Optional[int]] + order_index: Mapped[int] when_expression: Mapped[Optional[bytes]] = mapped_column(JSONType) uuid: Mapped[Optional[Union[UUID, str]]] = mapped_column(UUIDType) label: Mapped[Optional[str]] = mapped_column(Unicode(255)) diff --git a/lib/galaxy/tools/parameters/__init__.py b/lib/galaxy/tools/parameters/__init__.py index ee3a8709817d..04c968910b45 100644 --- a/lib/galaxy/tools/parameters/__init__.py +++ b/lib/galaxy/tools/parameters/__init__.py @@ -31,6 +31,7 @@ ) from .workflow_utils import ( is_runtime_value, + NO_REPLACEMENT, runtime_to_json, ) from .wrapped import flat_to_nested_state @@ -180,8 +181,22 @@ def callback_helper(input, input_values, name_prefix, label_prefix, parent_prefi replace = new_value != no_replacement_value if replace: input_values[input.name] = new_value - elif replace_optional_connections and is_runtime_value(value) and hasattr(input, "value"): - input_values[input.name] = input.value + elif replace_optional_connections: + # Only used in workflow context + has_default = hasattr(input, "value") + if new_value is value is NO_REPLACEMENT: + # NO_REPLACEMENT means value was connected but left unspecified + if has_default: + # Use default if we have one + input_values[input.name] = input.value + else: + # Should fail if input is not optional and does not have default value + # Effectively however depends on parameter implementation. + # We might want to raise an exception here, instead of depending on a tool parameter value error. + input_values[input.name] = None + + elif is_runtime_value(value) and has_default: + input_values[input.name] = input.value def get_current_case(input, input_values): test_parameter = input.test_param diff --git a/lib/galaxy/tools/parameters/basic.py b/lib/galaxy/tools/parameters/basic.py index 34b325a0d123..1767c7e4e145 100644 --- a/lib/galaxy/tools/parameters/basic.py +++ b/lib/galaxy/tools/parameters/basic.py @@ -49,7 +49,10 @@ ParameterParseException, text_input_is_optional, ) -from galaxy.tools.parameters.workflow_utils import workflow_building_modes +from galaxy.tools.parameters.workflow_utils import ( + NO_REPLACEMENT, + workflow_building_modes, +) from galaxy.util import ( sanitize_param, string_as_bool, @@ -247,6 +250,8 @@ def to_python(self, value, app): def value_to_basic(self, value, app, use_security=False): if is_runtime_value(value): return runtime_to_json(value) + elif value == NO_REPLACEMENT: + return {"__class__": "NoReplacement"} return self.to_json(value, app, use_security) def value_from_basic(self, value, app, ignore_errors=False): @@ -255,8 +260,11 @@ def value_from_basic(self, value, app, ignore_errors=False): if isinstance(self, HiddenToolParameter): raise ParameterValueError(message_suffix="Runtime Parameter not valid", parameter_name=self.name) return runtime_to_object(value) - elif isinstance(value, MutableMapping) and value.get("__class__") == "UnvalidatedValue": - return value["value"] + elif isinstance(value, MutableMapping): + if value.get("__class__") == "UnvalidatedValue": + return value["value"] + elif value.get("__class__") == "NoReplacement": + return NO_REPLACEMENT # Delegate to the 'to_python' method if ignore_errors: try: diff --git a/lib/galaxy/tools/parameters/workflow_utils.py b/lib/galaxy/tools/parameters/workflow_utils.py index 73fcb688dfa1..178f08a773a6 100644 --- a/lib/galaxy/tools/parameters/workflow_utils.py +++ b/lib/galaxy/tools/parameters/workflow_utils.py @@ -1,6 +1,15 @@ from collections.abc import MutableMapping +class NoReplacement: + + def __str__(self): + return "NO_REPLACEMENT singleton" + + +NO_REPLACEMENT = NoReplacement() + + class workflow_building_modes: DISABLED = False ENABLED = True diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index 23602ffaef0f..0ed5c5113004 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -96,6 +96,8 @@ from galaxy.tools.parameters.workflow_utils import ( ConnectedValue, is_runtime_value, + NO_REPLACEMENT, + NoReplacement, runtime_to_json, workflow_building_modes, ) @@ -129,14 +131,6 @@ POSSIBLE_PARAMETER_TYPES: Tuple[INPUT_PARAMETER_TYPES] = get_args(INPUT_PARAMETER_TYPES) -class NoReplacement: - def __str__(self): - return "NO_REPLACEMENT singleton" - - -NO_REPLACEMENT = NoReplacement() - - class ConditionalStepWhen(BooleanToolParameter): pass @@ -954,7 +948,7 @@ class InputModule(WorkflowModule): def get_runtime_state(self): state = DefaultToolState() - state.inputs = dict(input=None) + state.inputs = dict(input=NO_REPLACEMENT) return state def get_all_inputs(self, data_only=False, connectable_only=False): @@ -966,7 +960,7 @@ def execute( invocation = invocation_step.workflow_invocation step = invocation_step.workflow_step input_value = step.state.inputs["input"] - if input_value is None: + if input_value is NO_REPLACEMENT: default_value = step.get_input_default_value(NO_REPLACEMENT) if default_value is not NO_REPLACEMENT: input_value = raw_to_galaxy(trans.app, trans.history, default_value) @@ -993,7 +987,7 @@ def execute( # everything should come in from the API and this can be eliminated. if not invocation.has_input_for_step(step.id): content = next(iter(step_outputs.values())) - if content: + if content and content is not NO_REPLACEMENT: invocation.add_input(content, step.id) progress.set_outputs_for_input(invocation_step, step_outputs) return None @@ -1582,7 +1576,7 @@ def _parameter_def_list_to_options(parameter_value): def get_runtime_state(self): state = DefaultToolState() - state.inputs = dict(input=None) + state.inputs = dict(input=NO_REPLACEMENT) return state def get_all_outputs(self, data_only=False): @@ -1609,7 +1603,7 @@ def execute( input_value = progress.inputs_by_step_id[step.id] else: input_value = step.state.inputs["input"] - if input_value is None: + if input_value is NO_REPLACEMENT: default_value = step.get_input_default_value(NO_REPLACEMENT) # TODO: look at parameter type and infer if value should be a dictionary # instead. Guessing only field parameter types in CWL branch would have @@ -2266,7 +2260,11 @@ def decode_runtime_state(self, step, runtime_state): ) def execute( - self, trans, progress: "WorkflowProgress", invocation_step, use_cached_job: bool = False + self, + trans, + progress: "WorkflowProgress", + invocation_step: "WorkflowInvocationStep", + use_cached_job: bool = False, ) -> Optional[bool]: invocation = invocation_step.workflow_invocation step = invocation_step.workflow_step diff --git a/lib/galaxy/workflow/refactor/execute.py b/lib/galaxy/workflow/refactor/execute.py index 264fa843d830..1e4c22c52bc7 100644 --- a/lib/galaxy/workflow/refactor/execute.py +++ b/lib/galaxy/workflow/refactor/execute.py @@ -9,6 +9,7 @@ from galaxy.tools.parameters.basic import contains_workflow_parameter from galaxy.tools.parameters.workflow_utils import ( ConnectedValue, + NO_REPLACEMENT, runtime_to_json, ) from .schema import ( @@ -41,10 +42,7 @@ UpgradeSubworkflowAction, UpgradeToolAction, ) -from ..modules import ( - InputParameterModule, - NO_REPLACEMENT, -) +from ..modules import InputParameterModule log = logging.getLogger(__name__) diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index d634f678999a..5210a336d4d9 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -1,5 +1,6 @@ import logging import uuid +from collections.abc import MutableMapping from typing import ( Any, Dict, @@ -37,6 +38,10 @@ WarningReason, ) from galaxy.tools.parameters.basic import raw_to_galaxy +from galaxy.tools.parameters.workflow_utils import ( + NO_REPLACEMENT, + NoReplacement, +) from galaxy.tools.parameters.wrapped import nested_key_to_path from galaxy.util import ExecutionTimer from galaxy.workflow import modules @@ -432,11 +437,11 @@ def remaining_steps( def replacement_for_input(self, trans, step: "WorkflowStep", input_dict: Dict[str, Any]): replacement: Union[ - modules.NoReplacement, + NoReplacement, model.DatasetCollectionInstance, List[model.DatasetCollectionInstance], HistoryItem, - ] = modules.NO_REPLACEMENT + ] = NO_REPLACEMENT prefixed_name = input_dict["name"] multiple = input_dict["multiple"] is_data = input_dict["input_type"] in ["dataset", "dataset_collection"] @@ -494,6 +499,8 @@ def replacement_for_connection(self, connection: "WorkflowStepConnection", is_da dependent_workflow_step_id=output_step_id, ) ) + if isinstance(replacement, MutableMapping) and replacement.get("__class__") == "NoReplacement": + return NO_REPLACEMENT if isinstance(replacement, model.HistoryDatasetCollectionAssociation): if not replacement.collection.populated: if not replacement.waiting_for_elements: @@ -574,19 +581,8 @@ def set_outputs_for_input( if self.inputs_by_step_id: step_id = step.id if step_id not in self.inputs_by_step_id and "output" not in outputs: - default_value = step.get_input_default_value(modules.NO_REPLACEMENT) - if default_value is not modules.NO_REPLACEMENT: - outputs["output"] = default_value - else: - log.error(f"{step.log_str()} not found in inputs_step_id {self.inputs_by_step_id}") - raise modules.FailWorkflowEvaluation( - why=InvocationFailureOutputNotFound( - reason=FailureReason.output_not_found, - workflow_step_id=invocation_step.workflow_step_id, - output_name="output", - dependent_workflow_step_id=invocation_step.workflow_step_id, - ) - ) + default_value = step.get_input_default_value(NO_REPLACEMENT) + outputs["output"] = default_value elif step_id in self.inputs_by_step_id: if self.inputs_by_step_id[step_id] is not None or "output" not in outputs: outputs["output"] = self.inputs_by_step_id[step_id] @@ -620,7 +616,7 @@ def set_step_outputs( # Add this non-data, non workflow-output output to the workflow outputs. # This is required for recovering the output in the next scheduling iteration, # and should be replaced with a WorkflowInvocationStepOutputValue ASAP. - if not workflow_outputs_by_name.get(output_name) and not output_object == modules.NO_REPLACEMENT: + if not workflow_outputs_by_name.get(output_name) and output_object is not NO_REPLACEMENT: workflow_output = model.WorkflowOutput(step, output_name=output_name) step.workflow_outputs.append(workflow_output) for workflow_output in step.workflow_outputs: @@ -645,6 +641,8 @@ def set_step_outputs( ) def _record_workflow_output(self, step: "WorkflowStep", workflow_output: "WorkflowOutput", output: Any) -> None: + if output is NO_REPLACEMENT: + output = {"__class__": "NoReplacement"} self.workflow_invocation.add_output(workflow_output, step, output) def mark_step_outputs_delayed(self, step: "WorkflowStep", why: Optional[str] = None) -> None: diff --git a/lib/galaxy/workflow/run_request.py b/lib/galaxy/workflow/run_request.py index baeb5492dd6d..94b47feeec31 100644 --- a/lib/galaxy/workflow/run_request.py +++ b/lib/galaxy/workflow/run_request.py @@ -30,6 +30,7 @@ from galaxy.tool_util.parameters import DataRequestUri from galaxy.tools.parameters.basic import ParameterValueError from galaxy.tools.parameters.meta import expand_workflow_inputs +from galaxy.tools.parameters.workflow_utils import NO_REPLACEMENT from galaxy.workflow.modules import WorkflowModuleInjector from galaxy.workflow.resources import get_resource_mapper_function @@ -599,7 +600,8 @@ def add_parameter(name: str, value: str, type: WorkflowRequestInputParameter.typ type=param_types.REPLACEMENT_PARAMETERS, ) for step_id, content in run_config.inputs.items(): - workflow_invocation.add_input(content, step_id) + if content is not NO_REPLACEMENT: + workflow_invocation.add_input(content, step_id) for step_id, param_dict in run_config.param_map.items(): add_parameter( name=str(step_id), diff --git a/lib/galaxy_test/workflow/default_values.gxwf-tests.yml b/lib/galaxy_test/workflow/default_values.gxwf-tests.yml index 92d0ad6e03c5..f5f7de07fee7 100644 --- a/lib/galaxy_test/workflow/default_values.gxwf-tests.yml +++ b/lib/galaxy_test/workflow/default_values.gxwf-tests.yml @@ -8,7 +8,8 @@ - that: has_text text: "1" - doc: | - Test that null is replaced with default value (follows https://www.commonwl.org/v1.2/Workflow.html#WorkflowInputParameter) + Test that explicit null is not replaced and fails + expect_failure: true job: required_int_with_default: type: raw diff --git a/lib/galaxy_test/workflow/default_values_optional.gxwf-tests.yml b/lib/galaxy_test/workflow/default_values_optional.gxwf-tests.yml index 7af1432006cb..9cd1d0abc328 100644 --- a/lib/galaxy_test/workflow/default_values_optional.gxwf-tests.yml +++ b/lib/galaxy_test/workflow/default_values_optional.gxwf-tests.yml @@ -8,7 +8,8 @@ - that: has_text text: "1" - doc: | - Test that null is replaced with default value (follows https://www.commonwl.org/v1.2/Workflow.html#WorkflowInputParameter) + Test that explicit null is not replaced and fails + expect_failure: true job: optional_int_with_default: type: raw