Skip to content

Commit

Permalink
WIP:CWL default file value_from work
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Nov 4, 2023
1 parent 1c3e245 commit 62a5c79
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 13 deletions.
11 changes: 7 additions & 4 deletions lib/galaxy/managers/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1782,11 +1782,14 @@ def __module_from_dict(
step_input.merge_type = input_dict.get("merge_type", step_input.default_merge_type)
step_input.scatter_type = input_dict.get("scatter_type", step_input.default_scatter_type)
value_from = input_dict.get("value_from", None)
if value_from is None:
# Super hacky - we probably need distinct value from and
# default handling.
value_from = input_dict.get("default")
# if value_from is None:
# # Super hacky - we probably need distinct value from and
# # default handling.
# value_from = input_dict.get("default")
step_input.value_from = value_from
step_input.default_value = input_dict.get("default")
if step_input.default_value:
step_input.default_value_set = True

# Create the model class for the step
steps.append(step)
Expand Down
22 changes: 15 additions & 7 deletions lib/galaxy/workflow/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class ConditionalStepWhen(BooleanToolParameter):
pass


def to_cwl(value, hda_references, step):
def to_cwl(value, hda_references, step, require_ok=True):
element_identifier = None
if isinstance(value, model.DatasetCollectionElement) and value.hda:
element_identifier = value.element_identifier
Expand All @@ -135,7 +135,8 @@ def to_cwl(value, hda_references, step):
if not value.dataset.in_ready_state():
why = "dataset [%s] is needed for valueFrom expression and is non-ready" % value.id
raise DelayedWorkflowEvaluation(why=why)
if not value.is_ok:
if require_ok and not value.is_ok:
# materialize and delay ?
raise FailWorkflowEvaluation(
why=InvocationFailureDatasetFailed(
reason=FailureReason.dataset_failed, hda_id=value.id, workflow_step_id=step.id
Expand Down Expand Up @@ -2209,14 +2210,17 @@ def decode_runtime_state(self, step, runtime_state):
f"Tool {self.tool_id} missing. Cannot recover runtime state.", tool_id=self.tool_id
)

def evaluate_value_from_expressions(self, progress, step, execution_state, extra_step_state):
def evaluate_value_from_expressions(self, progress, step, execution_state, extra_step_state, use_default=False):
value_from_expressions = {}
replacements: Dict = {}

for key in execution_state.inputs.keys():
step_input = step.inputs_by_name.get(key)
if step_input and step_input.value_from is not None:
value_from_expressions[key] = step_input.value_from
if step_input:
if step_input.value_from is not None and not use_default:
value_from_expressions[key] = step_input.value_from
else:
value_from_expressions[key] = step_input.default_value

if not value_from_expressions:
return replacements
Expand All @@ -2226,7 +2230,8 @@ def evaluate_value_from_expressions(self, progress, step, execution_state, extra
for key, value in extra_step_state.items():
step_state[key] = to_cwl(value, hda_references=hda_references, step=step)
for key, value in execution_state.inputs.items():
step_state[key] = to_cwl(value, hda_references=hda_references, step=step)
# require_ok = False for deferred datasets ... might instead need to materialize ??
step_state[key] = to_cwl(value, hda_references=hda_references, step=step, require_ok=False)

for key, value_from in value_from_expressions.items():
as_cwl_value = do_eval(
Expand Down Expand Up @@ -2373,11 +2378,14 @@ def callback(input, prefixed_name, **kwargs):
execution_state.inputs["__when_value__"] = when_value

unmatched_input_connections = expected_replacement_keys - found_replacement_keys
use_default = False
if unmatched_input_connections:
log.warning(f"Failed to use input connections for inputs [{unmatched_input_connections}]")
# Hack
use_default=True

expression_replacements = self.evaluate_value_from_expressions(
progress, step, execution_state, extra_step_state
progress, step, execution_state, extra_step_state, use_default=True
)

def expression_callback(input, prefixed_name, **kwargs):
Expand Down
12 changes: 10 additions & 2 deletions lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,16 +535,24 @@ def replacement_for_input(self, trans, step: "WorkflowStep", input_dict: Dict[st
else:
replacement = temp
else:
if is_data:
for step_input in step.inputs:
if step_input.name == prefixed_name and step_input.value_from:
log.debug("THIS?")
log.debug("GOT DEFAULT VALUE %s, value_from %s", step_input.default_value, step_input.value_from)
return raw_to_galaxy(trans.app, trans.history, step_input.value_from)
replacement = self.replacement_for_input_connections(
step,
input_dict,
connection,
)
else:
for step_input in step.inputs:
if step_input.name == prefixed_name and step_input.default_value_set:
if step_input.name == prefixed_name:
log.debug("GOT DEFAULT VALUE %s, value_from %s", step_input.default_value, step_input.value_from)
if step_input.name == prefixed_name and (step_input.default_value or step_input.value_from):
if is_data:
replacement = raw_to_galaxy(trans.app, trans.history, step_input.default_value)
replacement = raw_to_galaxy(trans.app, trans.history, step_input.value_from or step_input.default_value)
return replacement

def replacement_for_connection(self, connection: "WorkflowStepConnection", is_data: bool = True) -> Any:
Expand Down

0 comments on commit 62a5c79

Please sign in to comment.