Skip to content

Commit

Permalink
Implement recovering normalized workflow request from invocation.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Oct 14, 2024
1 parent fb70217 commit 99be1fb
Show file tree
Hide file tree
Showing 10 changed files with 419 additions and 67 deletions.
63 changes: 59 additions & 4 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8014,7 +8014,7 @@ class WorkflowStep(Base, RepresentById, UsesCreateAndUpdateTime):
type: Mapped[Optional[str]] = mapped_column(String(64))
tool_id: Mapped[Optional[str]] = mapped_column(TEXT)
tool_version: Mapped[Optional[str]] = mapped_column(TEXT)
tool_inputs: Mapped[Optional[bytes]] = mapped_column(JSONType)
tool_inputs: Mapped[Dict[str, Any]] = mapped_column(JSONType)
tool_errors: Mapped[Optional[bytes]] = mapped_column(JSONType)
position: Mapped[Optional[bytes]] = mapped_column(MutableJSONType)
config: Mapped[Optional[bytes]] = mapped_column(JSONType)
Expand Down Expand Up @@ -8084,11 +8084,13 @@ def init_on_load(self):
def tool_uuid(self):
return self.dynamic_tool and self.dynamic_tool.uuid

@property
def is_input_type(self) -> bool:
    """Tell whether this step's ``type`` is a key of ``STEP_TYPE_TO_INPUT_TYPE``.

    A missing or empty ``type`` is never an input type.
    """
    step_type = self.type
    if not step_type:
        return False
    return step_type in self.STEP_TYPE_TO_INPUT_TYPE

@property
def input_type(self):
    """Return the ``STEP_TYPE_TO_INPUT_TYPE`` entry for this step's ``type``.

    Only meaningful for input steps: the precondition is asserted once via
    ``is_input_type`` (the earlier spelled-out check tested the same
    condition and was redundant).
    """
    assert self.is_input_type, "step.input_type can only be called on input step types"
    return self.STEP_TYPE_TO_INPUT_TYPE[self.type]

@property
Expand Down Expand Up @@ -8309,6 +8311,15 @@ def log_str(self):
f"WorkflowStep[index={self.order_index},type={self.type},label={self.label},uuid={self.uuid},id={self.id}]"
)

@property
def effective_label(self) -> Optional[str]:
    """Return the best human-readable label available for this step.

    The explicit ``label`` wins whenever it is set (even if empty). For input
    steps without a label, the ``"name"`` entry of ``tool_inputs`` is used as
    a fallback. Returns ``None`` when neither is available.
    """
    label = self.label
    if label is not None:
        return label
    if self.is_input_type:
        # tool_inputs is a JSON column; guard against a missing value so a
        # step without stored inputs yields None instead of AttributeError.
        tool_inputs = self.tool_inputs
        if tool_inputs is not None:
            return cast(Optional[str], tool_inputs.get("name"))
    return None

def clear_module_extras(self):
# the module code adds random dynamic state to the step, this
# attempts to clear that.
Expand Down Expand Up @@ -9111,6 +9122,50 @@ def attach_step(request_to_content):
attach_step(request_to_content)
self.input_step_parameters.append(request_to_content)

def recover_inputs(self) -> Tuple[Dict[str, Any], str]:
    """Rebuild the ``inputs`` mapping of the request that produced this invocation.

    Walks the invocation's input datasets, input dataset collections and
    input step parameters (in that order) and keys each value by the owning
    step's ``effective_label``, falling back to the stringified
    ``order_index`` when the step has no label.

    Returns a ``(inputs, inputs_by)`` tuple where ``inputs_by`` is ``"name"``
    unless at least one step had to be referenced by order index, in which
    case it is ``"name|step_index"``.

    Raises ``galaxy.exceptions.InconsistentDatabase`` if an input association
    has no workflow step.
    """
    recovered: Dict[str, Any] = {}
    used_order_index = False

    def require_step(step: Optional["WorkflowStep"]) -> "WorkflowStep":
        if step is None:
            raise galaxy.exceptions.InconsistentDatabase(
                "workflow input found without step definition, this should not happen"
            )
        assert step
        return step

    def reference_for(step: "WorkflowStep") -> str:
        nonlocal used_order_index
        label = step.effective_label
        if label is None:
            # No usable label: remember that order indexes were needed.
            used_order_index = True
            return str(step.order_index)
        return label

    # (attribute holding the associations, attribute holding the value);
    # looked up lazily by name so relationships are touched in the same order
    # as three separate loops would touch them.
    association_sources = (
        ("input_datasets", "dataset"),
        ("input_dataset_collections", "dataset_collection"),
        ("input_step_parameters", "parameter_value"),
    )
    for associations_attr, value_attr in association_sources:
        for association in getattr(self, associations_attr):
            step = require_step(association.workflow_step)
            value = getattr(association, value_attr)
            recovered[reference_for(step)] = value

    inputs_by = "name|step_index" if used_order_index else "name"
    return recovered, inputs_by

def add_message(self, message: "InvocationMessageUnion"):
self.messages.append(
WorkflowInvocationMessage( # type:ignore[abstract]
Expand Down
60 changes: 57 additions & 3 deletions lib/galaxy/schema/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@
UpdateTimeField,
WithModelClass,
)
from .workflows import (
INPUTS_BY_DESCRIPTION,
PreferredIntermediateObjectStoreIdField,
PreferredObjectStoreIdField,
PreferredOutputsObjectStoreIdField,
ReplacementParametersField,
ResourceParametersField,
STEP_PARAMETERS_DESCRIPTION,
STEP_PARAMETERS_NORMALIZED_DESCRIPTION,
STEP_PARAMETERS_NORMALIZED_TITLE,
STEP_PARAMETERS_TITLE,
TargetHistoryIdField,
UseCachedJobField,
)

INVOCATION_STEP_OUTPUT_SRC = Literal["hda"]
INVOCATION_STEP_COLLECTION_OUTPUT_SRC = Literal["hdca"]
Expand Down Expand Up @@ -531,13 +545,16 @@ class InvocationOutputCollection(InvocationIOBase):
)


# Field metadata (title/description) for the encoded workflow ID of an
# invocation, defined once at module level so it can be assigned by name.
InvocationWorkflowIdField = Field(
    title="Workflow ID", description="The encoded Workflow ID associated with the invocation."
)


class WorkflowInvocationCollectionView(Model, WithModelClass):
id: EncodedDatabaseIdField = InvocationIdField
create_time: datetime = CreateTimeField
update_time: datetime = UpdateTimeField
workflow_id: EncodedDatabaseIdField = Field(
title="Workflow ID", description="The encoded Workflow ID associated with the invocation."
)
workflow_id: EncodedDatabaseIdField = InvocationWorkflowIdField
history_id: EncodedDatabaseIdField = Field(
default=...,
title="History ID",
Expand Down Expand Up @@ -583,6 +600,43 @@ class WorkflowInvocationResponse(RootModel):
]


class WorkflowInvocationRequestModel(Model):
    """Model a workflow invocation request (InvokeWorkflowPayload) for an existing invocation."""

    # Required: where the workflow ran and which workflow it was.
    history_id: str = Field(
        ...,
        title="History ID",
        description="The encoded history id the workflow was run in.",
    )
    workflow_id: str = Field(title="Workflow ID", description="The encoded Workflow ID associated with the invocation.")
    # Required: the recovered input values and the scheme used to key them.
    inputs: Dict[str, Any] = Field(
        ...,
        title="Inputs",
        description="Values for inputs",
    )
    inputs_by: str = Field(
        ...,
        title="Inputs by",
        description=INPUTS_BY_DESCRIPTION,
    )
    # Optional settings reusing the field definitions imported from .workflows.
    replacement_params: Optional[Dict[str, Any]] = ReplacementParametersField
    resource_params: Optional[Dict[str, Any]] = ResourceParametersField
    use_cached_job: bool = UseCachedJobField
    preferred_object_store_id: Optional[str] = PreferredObjectStoreIdField
    preferred_intermediate_object_store_id: Optional[str] = PreferredIntermediateObjectStoreIdField
    preferred_outputs_object_store_id: Optional[str] = PreferredOutputsObjectStoreIdField
    # Typed as Literal[True] with default True: this model only admits the
    # normalized form of legacy step parameters.
    parameters_normalized: Literal[True] = Field(
        True,
        title=STEP_PARAMETERS_NORMALIZED_TITLE,
        description=STEP_PARAMETERS_NORMALIZED_DESCRIPTION,
    )
    parameters: Optional[Dict[str, Any]] = Field(
        None,
        title=STEP_PARAMETERS_TITLE,
        description=f"{STEP_PARAMETERS_DESCRIPTION} If these are set, the workflow was not executed in a best-practice fashion and the resulting invocation request may not fully reflect the executed workflow state.",
    )


class InvocationJobsSummaryBaseModel(Model):
id: EncodedDatabaseIdField = InvocationIdField
states: Dict[JobState, int] = Field(
Expand Down
126 changes: 79 additions & 47 deletions lib/galaxy/schema/workflows.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import json
from typing import (
Any,
cast,
Dict,
List,
Optional,
Union,
)

from pydantic import (
AfterValidator,
Field,
field_validator,
)
Expand All @@ -22,13 +24,73 @@
Organization,
PauseStep,
Person,
PreferredObjectStoreIdField,
StoredWorkflowSummary,
SubworkflowStep,
ToolStep,
WorkflowInput,
)

# Field definitions and description strings kept at module level so they can
# be referenced by name from several payload/response models.
TargetHistoryIdField = Field(
    None,
    title="History ID",
    # description="The history to import the workflow into.",
    description="The encoded history id into which to import.",
)
INPUTS_BY_DESCRIPTION = (
    "How the 'inputs' field maps its inputs (datasets/collections/step parameters) to workflows steps."
)
STEP_PARAMETERS_NORMALIZED_TITLE = "Legacy Step Parameters Normalized"
STEP_PARAMETERS_NORMALIZED_DESCRIPTION = "Indicates if legacy parameters are already normalized to be indexed by the order_index and are specified as a dictionary per step. Legacy-style parameters could previously be specified as one parameter per step or by tool ID."
STEP_PARAMETERS_TITLE = "Legacy Step Parameters"
STEP_PARAMETERS_DESCRIPTION = "Parameters specified per-step for the workflow invocation, this is legacy and you should generally use inputs and only specify the formal parameters of a workflow instead."
ReplacementParametersField = Field(
    {},
    title="Replacement Parameters",
    description="Class of parameters mostly used for string replacement in PJAs. In best practice workflows, these should be replaced with input parameters",
)
UseCachedJobField = Field(
    False,
    title="Use cached job",
    description="Indicated whether to use a cached job for workflow invocation.",
)
# NOTE(review): a name PreferredObjectStoreIdField also appears in this
# module's import list; confirm this definition is meant to supersede it.
PreferredObjectStoreIdField = Field(
    default=None,
    title="Preferred Object Store ID",
    description="The ID of the object store that should be used to store all datasets (can instead specify object store IDs for intermediate and outputs datasets separately) - - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences",
)
PreferredIntermediateObjectStoreIdField = Field(
    None,
    title="Preferred Intermediate Object Store ID",
    description="The ID of the object store that should be used to store the intermediate datasets of this workflow - - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences",
)
PreferredOutputsObjectStoreIdField = Field(
    None,
    title="Preferred Outputs Object Store ID",
    description="The ID of the object store that should be used to store the marked output datasets of this workflow - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences.",
)
ResourceParametersField = Field(
    {},
    title="Resource Parameters",
    description="If a workflow_resource_params_file file is defined and the target workflow is configured to consume resource parameters, they can be specified with this parameter. See https://github.com/galaxyproject/galaxy/pull/4830 for more information.",
)

VALID_INPUTS_BY_ITEMS = ["step_id", "step_index", "step_uuid", "name"]


def validateInputsBy(inputsBy: Optional[str]) -> Optional[str]:
    """Validate an ``inputs_by`` value and return it unchanged.

    ``None`` is accepted as-is. Otherwise the value must be a string made of
    ``|``-separated items, each of which must appear in
    ``VALID_INPUTS_BY_ITEMS``.

    Raises ``ValueError`` for a non-string value or an unknown item.
    """
    if inputsBy is None:
        return inputsBy
    if not isinstance(inputsBy, str):
        raise ValueError(f"Invalid type for inputsBy {inputsBy}")
    for component in inputsBy.split("|"):
        if component not in VALID_INPUTS_BY_ITEMS:
            raise ValueError(f"Invalid inputsBy delineation {component}")
    return inputsBy


# Wraps validateInputsBy in pydantic's AfterValidator so it can be attached to
# a field through an Annotated type.
InputsByValidator = AfterValidator(validateInputsBy)


class GetTargetHistoryPayload(Model):
# TODO - Are the descriptions correct?
Expand All @@ -38,12 +100,7 @@ class GetTargetHistoryPayload(Model):
# description="The encoded history id - passed exactly like this 'hist_id=...' - to import the workflow into. Or the name of the new history to import the workflow into.",
description="The encoded history id - passed exactly like this 'hist_id=...' - into which to import. Or the name of the new history into which to import.",
)
history_id: Optional[str] = Field(
None,
title="History ID",
# description="The history to import the workflow into.",
description="The encoded history id into which to import.",
)
history_id: Optional[str] = TargetHistoryIdField
new_history_name: Optional[str] = Field(
None,
title="New History Name",
Expand Down Expand Up @@ -85,15 +142,11 @@ class InvokeWorkflowPayload(GetTargetHistoryPayload):
title="Allow tool state corrections",
description="Indicates if tool state corrections are allowed for workflow invocation.",
)
use_cached_job: Optional[bool] = Field(
False,
title="Use cached job",
description="Indicated whether to use a cached job for workflow invocation.",
)
use_cached_job: Optional[bool] = UseCachedJobField
parameters_normalized: Optional[bool] = Field(
False,
title="Parameters Normalized",
description="Indicates if parameters are already normalized for workflow invocation.",
title=STEP_PARAMETERS_NORMALIZED_TITLE,
description=STEP_PARAMETERS_NORMALIZED_DESCRIPTION,
)

@field_validator(
Expand All @@ -102,7 +155,6 @@ class InvokeWorkflowPayload(GetTargetHistoryPayload):
"ds_map",
"resource_params",
"replacement_params",
"step_parameters",
mode="before",
check_fields=False,
)
Expand All @@ -114,34 +166,22 @@ def inputs_string_to_json(cls, v):

parameters: Optional[Dict[str, Any]] = Field(
{},
title="Parameters",
description="The raw parameters for the workflow invocation.",
title=STEP_PARAMETERS_TITLE,
description=STEP_PARAMETERS_DESCRIPTION,
)
inputs: Optional[Dict[str, Any]] = Field(
None,
title="Inputs",
description="TODO",
description="Specify values for formal inputs to the workflow",
)
ds_map: Optional[Dict[str, Dict[str, Any]]] = Field(
{},
title="Dataset Map",
description="TODO",
)
resource_params: Optional[Dict[str, Any]] = Field(
{},
title="Resource Parameters",
description="TODO",
)
replacement_params: Optional[Dict[str, Any]] = Field(
{},
title="Replacement Parameters",
description="TODO",
)
step_parameters: Optional[Dict[str, Any]] = Field(
None,
title="Step Parameters",
description="TODO",
title="Legacy Dataset Map",
description="An older alternative to specifying inputs using database IDs, do not use this and use inputs instead",
deprecated=True,
)
resource_params: Optional[Dict[str, Any]] = ResourceParametersField
replacement_params: Optional[Dict[str, Any]] = ReplacementParametersField
no_add_to_history: Optional[bool] = Field(
False,
title="No Add to History",
Expand All @@ -152,29 +192,21 @@ def inputs_string_to_json(cls, v):
title="Legacy",
description="Indicating if to use legacy workflow invocation.",
)
inputs_by: Optional[str] = Field(
inputs_by: Annotated[Optional[str], InputsByValidator] = Field(
None,
title="Inputs By",
# lib/galaxy/workflow/run_request.py - see line 60
description="How inputs maps to inputs (datasets/collections) to workflows steps.",
description=INPUTS_BY_DESCRIPTION,
)
effective_outputs: Optional[Any] = Field(
None,
title="Effective Outputs",
# lib/galaxy/workflow/run_request.py - see line 455
description="TODO",
)
preferred_intermediate_object_store_id: Optional[str] = Field(
None,
title="Preferred Intermediate Object Store ID",
description="The ID of the ? object store that should be used to store ? datasets in this history.",
)
preferred_outputs_object_store_id: Optional[str] = Field(
None,
title="Preferred Outputs Object Store ID",
description="The ID of the object store that should be used to store ? datasets in this history.",
)
preferred_object_store_id: Optional[str] = PreferredObjectStoreIdField
preferred_intermediate_object_store_id: Optional[str] = PreferredIntermediateObjectStoreIdField
preferred_outputs_object_store_id: Optional[str] = PreferredOutputsObjectStoreIdField


class StoredWorkflowDetailed(StoredWorkflowSummary):
Expand Down
Loading

0 comments on commit 99be1fb

Please sign in to comment.