Skip to content

Commit

Permalink
Merge pull request #18535 from mvdbeek/runtime_input_map_over_fix
Browse files Browse the repository at this point in the history
[24.1] Fix map over calculation for runtime inputs
  • Loading branch information
mvdbeek authored Jul 17, 2024
2 parents d184979 + b403bf8 commit 4de96cc
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 2 deletions.
31 changes: 30 additions & 1 deletion lib/galaxy/tools/parameters/wrapped.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
Any,
Dict,
List,
Sequence,
Union,
)

from galaxy.tools.parameters.basic import (
Expand Down Expand Up @@ -187,11 +189,38 @@ def process_key(incoming_key: str, incoming_value: Any, d: Dict[str, Any]):
process_key("|".join(key_parts[1:]), incoming_value=incoming_value, d=subdict)


def nested_key_to_path(key: str) -> Sequence[Union[str, int]]:
"""
Convert a tool state key that is separated with '|' and '_n' into path iterable.
E.g. "cond|repeat_0|paramA" -> ["cond", "repeat", 0, "paramA"].
Return value can be used with `boltons.iterutils.get_path`.
"""
path: List[Union[str, int]] = []
key_parts = key.split("|")
if len(key_parts) == 1:
return key_parts
for key_part in key_parts:
if "_" in key_part:
input_name, _index = key_part.rsplit("_", 1)
if _index.isdigit():
path.extend((input_name, int(_index)))
continue
path.append(key_part)
return path


def flat_to_nested_state(incoming: Dict[str, Any]):
nested_state: Dict[str, Any] = {}
for key, value in incoming.items():
process_key(key, value, nested_state)
return nested_state


__all__ = ("LegacyUnprefixedDict", "WrappedParameters", "make_dict_copy", "process_key", "flat_to_nested_state")
__all__ = (
"LegacyUnprefixedDict",
"WrappedParameters",
"make_dict_copy",
"process_key",
"flat_to_nested_state",
"nested_key_to_path",
)
6 changes: 6 additions & 0 deletions lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Union,
)

from boltons.iterutils import get_path
from typing_extensions import Protocol

from galaxy import model
Expand All @@ -35,6 +36,7 @@
WarningReason,
)
from galaxy.tools.parameters.basic import raw_to_galaxy
from galaxy.tools.parameters.wrapped import nested_key_to_path
from galaxy.util import ExecutionTimer
from galaxy.workflow import modules
from galaxy.workflow.run_request import (
Expand Down Expand Up @@ -444,6 +446,10 @@ def replacement_for_input(self, trans, step: "WorkflowStep", input_dict: Dict[st
replacement = temp
else:
replacement = self.replacement_for_connection(connection[0], is_data=is_data)
elif step.state and (state_input := get_path(step.state.inputs, nested_key_to_path(prefixed_name), None)):
# workflow submitted with step parameters populates state directly
# via populate_module_and_state
replacement = state_input
else:
for step_input in step.inputs:
if step_input.name == prefixed_name and step_input.default_value_set:
Expand Down
29 changes: 29 additions & 0 deletions lib/galaxy_test/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -6982,6 +6982,35 @@ def test_parameter_substitution_validation_value_errors_0(self):
# Take a valid stat and make it invalid, assert workflow won't run.
self._assert_status_code_is(invocation_response, 400)

@skip_without_tool("collection_paired_test")
def test_run_map_over_with_step_parameter_dict(self):
# Tests what the legacy run form submits
with self.dataset_populator.test_history() as history_id:
hdca = self.dataset_collection_populator.create_list_of_pairs_in_history(history_id).json()["outputs"][0]
workflow_id = self._upload_yaml_workflow(
"""
class: GalaxyWorkflow
steps:
"0":
tool_id: collection_paired_conditional_structured_like
state:
cond:
input1:
__class__: RuntimeValue
"""
)
workflow_request = {
"history": f"hist_id={history_id}",
"parameters": dumps({"0": {"cond|input1": {"values": [{"id": hdca["id"], "src": "hdca"}]}}}),
"parameters_normalized": True,
}
url = f"workflows/{workflow_id}/invocations"
invocation_response = self._post(url, data=workflow_request, json=True)
invocation_response.raise_for_status()
self.workflow_populator.wait_for_invocation_and_jobs(
history_id=history_id, workflow_id=workflow_id, invocation_id=invocation_response.json()["id"]
)

@skip_without_tool("validation_default")
def test_parameter_substitution_validation_value_errors_1(self):
substitions = dict(select_param='" ; echo "moo')
Expand Down
20 changes: 19 additions & 1 deletion test/unit/app/tools/test_parameter_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,28 @@
Dict,
)

from galaxy.tools.parameters.wrapped import process_key
from galaxy.tools.parameters.wrapped import (
nested_key_to_path,
process_key,
)
from .util import BaseParameterTestCase


def test_nested_key_to_path():
assert nested_key_to_path("param") == ["param"]
assert nested_key_to_path("param_x") == ["param_x"]
assert nested_key_to_path("cond|param_x") == ["cond", "param_x"]
assert nested_key_to_path("param_") == ["param_"]
assert nested_key_to_path("cond|param_") == ["cond", "param_"]
assert nested_key_to_path("repeat_1|inner_repeat_1|data_table_column_value") == [
"repeat",
1,
"inner_repeat",
1,
"data_table_column_value",
]


class TestProcessKey:
def test_process_key(self):
nested_dict: Dict[str, Any] = {}
Expand Down

0 comments on commit 4de96cc

Please sign in to comment.