-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Workflow tool state validation plumbing.
- Loading branch information
Showing
20 changed files
with
873 additions
and
0 deletions.
There are no files selected for viewing
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
"""Abstractions for reasoning about tool state within Galaxy workflows. | ||
Like everything else in galaxy-tool-util, this package should be independent of | ||
Galaxy's runtime. It is meant to provide utilities for reasonsing about tool state | ||
(largely building on the abstractions in galaxy.tool_util.parameters) within the | ||
context of workflows. | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from typing import ( | ||
Any, | ||
Dict, | ||
Union, | ||
) | ||
|
||
from typing_extensions import ( | ||
Literal, | ||
Protocol, | ||
) | ||
|
||
from galaxy.tool_util.models import ParsedTool | ||
|
||
NativeWorkflowDict = Dict[str, Any] | ||
Format2WorkflowDict = Dict[str, Any] | ||
AnyWorkflowDict = Union[NativeWorkflowDict, Format2WorkflowDict] | ||
WorkflowFormat = Literal["gxformat2", "native"] | ||
NativeStepDict = Dict[str, Any] | ||
Format2StepDict = Dict[str, Any] | ||
NativeToolStateDict = Dict[str, Any] | ||
Format2StateDict = Dict[str, Any] | ||
|
||
|
||
class GetToolInfo(Protocol): | ||
"""An interface for fetching tool information for steps in a workflow.""" | ||
|
||
def get_tool_info(self, tool_id: str, tool_version: str) -> ParsedTool: ... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from typing import ( | ||
Any, | ||
cast, | ||
Optional, | ||
Union, | ||
) | ||
|
||
|
||
def validate_explicit_conditional_test_value(test_parameter_name: str, value: Any) -> Optional[Union[str, bool]]: | ||
if value is not None and not isinstance(value, (str, bool)): | ||
raise Exception(f"Invalid conditional test value ({value}) for parameter ({test_parameter_name})") | ||
return cast(Optional[Union[str, bool]], value) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
from typing import ( | ||
Dict, | ||
List, | ||
Optional, | ||
) | ||
|
||
from pydantic import ( | ||
BaseModel, | ||
Field, | ||
) | ||
|
||
from galaxy.tool_util.models import ParsedTool | ||
from galaxy.tool_util.parameters import ToolParameterT | ||
from ._types import ( | ||
Format2StateDict, | ||
GetToolInfo, | ||
NativeStepDict, | ||
) | ||
from .validation_format2 import validate_step_against | ||
from .validation_native import ( | ||
get_parsed_tool_for_native_step, | ||
native_tool_state, | ||
validate_native_step_against, | ||
) | ||
|
||
Format2InputsDictT = Dict[str, str] | ||
|
||
|
||
class Format2State(BaseModel): | ||
state: Format2StateDict | ||
inputs: Format2InputsDictT = Field(alias="in") | ||
|
||
|
||
class ConversionValidationFailure(Exception): | ||
pass | ||
|
||
|
||
def convert_state_to_format2(native_step_dict: NativeStepDict, get_tool_info: GetToolInfo) -> Format2State: | ||
parsed_tool = get_parsed_tool_for_native_step(native_step_dict, get_tool_info) | ||
return convert_state_to_format2_using(native_step_dict, parsed_tool) | ||
|
||
|
||
def convert_state_to_format2_using(native_step_dict: NativeStepDict, parsed_tool: Optional[ParsedTool]) -> Format2State: | ||
"""Create a "clean" gxformat2 workflow tool state from a native workflow step. | ||
gxformat2 does not know about tool specifications so it cannot reason about the native | ||
tool state attribute and just copies it as is. This native state can be pretty ugly. The purpose | ||
of this function is to build a cleaned up state to replace the gxformat2 copied native tool_state | ||
with that is more readable and has stronger typing by using the tool's inputs to guide | ||
the conversion (the parsed_tool parameter). | ||
This method validates both the native tool state and the resulting gxformat2 tool state | ||
so that we can be more confident the conversion doesn't corrupt the workflow. If no meta | ||
model to validate against is supplied or if either validation fails this method throws | ||
ConversionValidationFailure to signal the caller to just use the native tool state as is | ||
instead of trying to convert it to a cleaner gxformat2 tool state - under the assumption | ||
it is better to have an "ugly" workflow than a corrupted one during conversion. | ||
""" | ||
if parsed_tool is None: | ||
raise ConversionValidationFailure("Could not resolve tool inputs") | ||
try: | ||
validate_native_step_against(native_step_dict, parsed_tool) | ||
except Exception: | ||
raise ConversionValidationFailure( | ||
"Failed to validate native step - not going to convert a tool state that isn't understood" | ||
) | ||
result = _convert_valid_state_to_format2(native_step_dict, parsed_tool) | ||
print(result.dict()) | ||
try: | ||
validate_step_against(result.dict(), parsed_tool) | ||
except Exception: | ||
raise ConversionValidationFailure( | ||
"Failed to validate resulting cleaned step - not going to convert to an unvalidated tool state" | ||
) | ||
return result | ||
|
||
|
||
def _convert_valid_state_to_format2(native_step_dict: NativeStepDict, parsed_tool: ParsedTool) -> Format2State: | ||
format2_state: Format2StateDict = {} | ||
format2_in: Format2InputsDictT = {} | ||
|
||
root_tool_state = native_tool_state(native_step_dict) | ||
tool_inputs = parsed_tool.inputs | ||
_convert_state_level(native_step_dict, tool_inputs, root_tool_state, format2_state, format2_in) | ||
return Format2State( | ||
**{ | ||
"state": format2_state, | ||
"in": format2_in, | ||
} | ||
) | ||
|
||
|
||
def _convert_state_level( | ||
step: NativeStepDict, | ||
tool_inputs: List[ToolParameterT], | ||
native_state: dict, | ||
format2_state_at_level: dict, | ||
format2_in: Format2InputsDictT, | ||
prefix: Optional[str] = None, | ||
) -> None: | ||
for tool_input in tool_inputs: | ||
_convert_state_at_level(step, tool_input, native_state, format2_state_at_level, format2_in, prefix) | ||
|
||
|
||
def _convert_state_at_level( | ||
step: NativeStepDict, | ||
tool_input: ToolParameterT, | ||
native_state_at_level: dict, | ||
format2_state_at_level: dict, | ||
format2_in: Format2InputsDictT, | ||
prefix: str, | ||
) -> None: | ||
parameter_type = tool_input.parameter_type | ||
parameter_name = tool_input.name | ||
value = native_state_at_level.get(parameter_name, None) | ||
state_path = parameter_name if prefix is None else f"{prefix}|{parameter_name}" | ||
if parameter_type == "gx_integer": | ||
# check for runtime input | ||
format2_value = int(value) | ||
format2_state_at_level[parameter_name] = format2_value | ||
elif parameter_type == "gx_data": | ||
input_connections = step.get("input_connections", {}) | ||
print(state_path) | ||
print(input_connections) | ||
if state_path in input_connections: | ||
format2_in[state_path] = "placeholder" | ||
else: | ||
pass | ||
# raise NotImplementedError(f"Unhandled parameter type {parameter_type}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from ._types import ( | ||
AnyWorkflowDict, | ||
GetToolInfo, | ||
WorkflowFormat, | ||
) | ||
from .validation_format2 import validate_workflow_format2 | ||
from .validation_native import validate_workflow_native | ||
|
||
|
||
def validate_workflow(workflow_dict: AnyWorkflowDict, get_tool_info: GetToolInfo): | ||
if _format(workflow_dict) == "gxformat2": | ||
validate_workflow_format2(workflow_dict, get_tool_info) | ||
else: | ||
validate_workflow_native(workflow_dict, get_tool_info) | ||
|
||
|
||
def _format(workflow_dict: AnyWorkflowDict) -> WorkflowFormat: | ||
if workflow_dict.get("a_galaxy_workflow") == "true": | ||
return "native" | ||
else: | ||
return "gxformat2" |
Oops, something went wrong.