Skip to content

Commit

Permalink
Test case WIP...
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Aug 8, 2024
1 parent d4401d1 commit 4b7377f
Show file tree
Hide file tree
Showing 14 changed files with 423 additions and 54 deletions.
185 changes: 185 additions & 0 deletions lib/galaxy/tool_util/parameters/case.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from dataclasses import dataclass
from re import compile
from typing import (
Any,
cast,
Dict,
List,
Optional,
Set,
)

from galaxy.tool_util.parser.interface import (
ToolSourceTest,
ToolSourceTestInput,
ToolSourceTestInputs,
)
from galaxy.tool_util.workflow_state._validation_util import validate_explicit_conditional_test_value
from galaxy.util import asbool
from .models import (
BooleanParameterModel,
ConditionalParameterModel,
ConditionalWhen,
DataCollectionParameterModel,
DataColumnParameterModel,
DataParameterModel,
FloatParameterModel,
IntegerParameterModel,
parameters_by_name,
ToolParameterBundle,
ToolParameterT,
)
from .state import TestCaseToolState
from .visitor import flat_state_path

INTEGER_STR_PATTERN = compile(r"(\d+)")
COLUMN_NAME_STR_PATTERN = compile(r"c(\d+): .*")


@dataclass
class TestCaseStateAndWarnings:
tool_state: TestCaseToolState
warnings: List[str]


def legacy_from_string(parameter: ToolParameterT, value: str, warnings: List[str], profile: str) -> Any:
"""Convert string values in XML test cases into typed variants.
This should only be used when parsing XML test cases into a TestCaseToolState object.
We have to maintain backward compatibility on these for older Galaxy tool profile versions.
"""
is_string = isinstance(value, str)
result_value: Any = value
if is_string and isinstance(parameter, (IntegerParameterModel,)):
warnings.append(
f"Implicitly converted {parameter.name} to an integer from a string value, please use 'value_json' to define this test input parameter value instead."
)
result_value = int(value)
elif is_string and isinstance(parameter, (FloatParameterModel,)):
warnings.append(
f"Implicitly converted {parameter.name} to a floating point number from a string value, please use 'value_json' to define this test input parameter value instead."
)
result_value = float(value)
elif is_string and isinstance(parameter, (BooleanParameterModel,)):
warnings.append(
f"Implicitly converted {parameter.name} to a boolean from a string value, please use 'value_json' to define this test input parameter value instead."
)
result_value = asbool(value)
elif is_string and isinstance(parameter, (DataColumnParameterModel,)):
integer_match = INTEGER_STR_PATTERN.match(value)
if integer_match:
warnings.append(
f"Implicitly converted {parameter.name} to a column index integer from a string value, please use 'value_json' to define this test input parameter value instead."
)
result_value = int(value)
else:
warnings.append(
f"Using column names as test case values is deprecated, please adjust {parameter.name} to just use an integer column index."
)
column_name_value_match = COLUMN_NAME_STR_PATTERN.match(value)
column_part = column_name_value_match.group(1)
result_value = int(column_part)
return result_value


def test_case_state(
test_dict: ToolSourceTest, tool_parameter_bundle: List[ToolParameterT], profile: str
) -> TestCaseStateAndWarnings:
warnings: List[str] = []
inputs: ToolSourceTestInputs = test_dict["inputs"]
state = {}

handled_inputs = _merge_level_into_state(tool_parameter_bundle, inputs, state, profile, warnings, None)

for test_input in inputs:
input_name = test_input["name"]
if input_name not in handled_inputs:
raise Exception(f"Invalid parameter name found {input_name}")

tool_state = TestCaseToolState(state)
tool_state.validate(tool_parameter_bundle)
return TestCaseStateAndWarnings(tool_state, warnings)


def _merge_level_into_state(
tool_inputs: List[ToolParameterT],
inputs: ToolSourceTestInputs,
state_at_level: dict,
profile: str,
warnings: List[str],
prefix: Optional[str],
) -> Set[str]:
handled_inputs: Set[str] = set()
for tool_input in tool_inputs:
handled_inputs.update(_merge_into_state(tool_input, inputs, state_at_level, profile, warnings, prefix))

return handled_inputs


def _merge_into_state(
tool_input: ToolParameterT,
inputs: ToolSourceTestInputs,
state_at_level: dict,
profile: str,
warnings: List[str],
prefix: Optional[str],
) -> Set[str]:
handled_inputs = set()

input_name = tool_input.name
state_path = flat_state_path(input_name, prefix)
handled_inputs.add(state_path)

if isinstance(tool_input, (ConditionalParameterModel,)):
conditional_state = state_at_level.get(input_name, {})
if input_name not in state_at_level:
state_at_level[input_name] = conditional_state

conditional = cast(ConditionalParameterModel, tool_input)
when: ConditionalWhen = _select_which_when(conditional, conditional_state, inputs, state_path)
test_parameter = conditional.test_parameter
handled_inputs.update(
_merge_into_state(test_parameter, inputs, conditional_state, profile, warnings, state_path)
)
handled_inputs.update(
_merge_level_into_state(when.parameters, inputs, conditional_state, profile, warnings, state_path)
)
else:
test_input = _input_for(state_path, inputs)
if test_input is not None:
if isinstance(tool_input, (DataCollectionParameterModel,)):
input_value = test_input.get("attributes", {}).get("collection")
else:
input_value = test_input["value"]
input_value = legacy_from_string(tool_input, input_value, warnings, profile)

state_at_level[input_name] = input_value

return handled_inputs


def _select_which_when(
conditional: ConditionalParameterModel, state: dict, inputs: ToolSourceTestInputs, prefix: str
) -> ConditionalWhen:
test_parameter = conditional.test_parameter
test_parameter_name = test_parameter.name
test_parameter_flat_path = flat_state_path(test_parameter_name, prefix)

test_input = _input_for(test_parameter_flat_path, inputs)
explicit_test_value = test_input["value"] if test_input else None
test_value = validate_explicit_conditional_test_value(test_parameter_name, explicit_test_value)
for when in conditional.whens:
if test_value is None and when.is_default_when:
return when
elif test_value == when.discriminator:
return when
else:
raise Exception(f"Invalid conditional test value ({explicit_test_value}) for parameter ({test_parameter_name})")


def _input_for(flat_state_path: str, inputs: ToolSourceTestInputs) -> Optional[ToolSourceTestInput]:
for input in inputs:
if input["name"] == flat_state_path:
return input
else:
return None
53 changes: 41 additions & 12 deletions lib/galaxy/tool_util/parameters/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@
)

from galaxy.exceptions import RequestParameterInvalidException
from galaxy.tool_util.parser.interface import DrillDownOptionsDict
from galaxy.tool_util.parser.interface import (
DrillDownOptionsDict,
TestCollectionDict,
)
from ._types import (
cast_as_type,
is_optional,
Expand All @@ -57,7 +60,7 @@
# + request_internal: This is a pydantic model to validate what Galaxy expects to find in the database,
# in particular dataset and collection references should be decoded integers.
StateRepresentationT = Literal[
"request", "request_internal", "job_internal", "test_case", "workflow_step", "workflow_step_linked"
"request", "request_internal", "job_internal", "test_case_xml", "workflow_step", "workflow_step_linked"
]


Expand Down Expand Up @@ -310,9 +313,9 @@ def py_type_internal(self) -> Type:
def py_type_test_case(self) -> Type:
base_model: Type
if self.multiple:
base_model = MultiDataRequestInternal
base_model = str
else:
base_model = DataTestCaseValue
base_model = str
return optional_if_needed(base_model, self.optional)

def pydantic_template(self, state_representation: StateRepresentationT) -> DynamicModelInformation:
Expand All @@ -324,7 +327,7 @@ def pydantic_template(self, state_representation: StateRepresentationT) -> Dynam
)
elif state_representation == "job_internal":
return dynamic_model_information_from_py_type(self, self.py_type_internal)
elif state_representation == "test_case":
elif state_representation == "test_case_xml":
return dynamic_model_information_from_py_type(self, self.py_type_test_case)
elif state_representation == "workflow_step":
return dynamic_model_information_from_py_type(self, type(None), requires_value=False)
Expand Down Expand Up @@ -368,6 +371,8 @@ def pydantic_template(self, state_representation: StateRepresentationT) -> Dynam
return dynamic_model_information_from_py_type(self, type(None), requires_value=False)
elif state_representation == "workflow_step_linked":
return dynamic_model_information_from_py_type(self, ConnectedValue)
elif state_representation == "test_case_xml":
return dynamic_model_information_from_py_type(self, TestCollectionDict)
else:
raise NotImplementedError(
f"Have not implemented data collection parameter models for state representation {state_representation}"
Expand Down Expand Up @@ -535,7 +540,7 @@ class SelectParameterModel(BaseGalaxyToolParameterModelDefinition):
options: Optional[List[LabelValue]] = None
multiple: bool

def py_type_if_required(self, allow_connections=False) -> Type:
def py_type_if_required(self, allow_connections: bool = False, expect_list: bool = True) -> Type:
if self.options is not None:
if len(self.options) > 0:
literal_options: List[Type] = [cast_as_type(Literal[o.value]) for o in self.options]
Expand All @@ -546,9 +551,15 @@ def py_type_if_required(self, allow_connections=False) -> Type:
py_type = StrictStr
if self.multiple:
if allow_connections:
py_type = list_type(allow_connected_value(py_type))
if expect_list:
py_type = list_type(allow_connected_value(py_type))
else:
py_type = allow_connected_value(py_type)
else:
py_type = list_type(py_type)
if expect_list:
py_type = list_type(py_type)
else:
py_type = py_type
elif allow_connections:
py_type = allow_connected_value(py_type)
return py_type
Expand All @@ -568,6 +579,10 @@ def pydantic_template(self, state_representation: StateRepresentationT) -> Dynam
elif state_representation == "workflow_step_linked":
py_type = self.py_type_if_required(allow_connections=True)
return dynamic_model_information_from_py_type(self, optional_if_needed(py_type, self.optional))
elif state_representation == "test_case_xml":
# in a YAML test case representation this can be string, in XML we are still expecting a comma separated string
py_type = self.py_type_if_required(allow_connections=False, expect_list=False)
return dynamic_model_information_from_py_type(self, optional_if_needed(py_type, self.optional))
else:
return dynamic_model_information_from_py_type(self, self.py_type)

Expand Down Expand Up @@ -661,8 +676,16 @@ def py_type(self) -> Type:

return py_type

@property
def py_type_test_case_xml(self) -> Type:
base_model = str
return optional_if_needed(base_model, not self.request_requires_value)

def pydantic_template(self, state_representation: StateRepresentationT) -> DynamicModelInformation:
return dynamic_model_information_from_py_type(self, self.py_type)
if state_representation == "test_case_xml":
return dynamic_model_information_from_py_type(self, self.py_type_test_case_xml)
else:
return dynamic_model_information_from_py_type(self, self.py_type)

@property
def request_requires_value(self) -> bool:
Expand Down Expand Up @@ -1116,9 +1139,15 @@ class ToolParameterBundleModel(BaseModel):
input_models: List[ToolParameterT]


def parameters_by_name(tool_parameter_bundle: ToolParameterBundle) -> Dict[str, ToolParameterT]:
def parameters_by_name(
inputs: Union[Iterable[ToolParameterModel], Iterable[ToolParameterT], ToolParameterBundle]
) -> Dict[str, ToolParameterT]:
as_dict = {}
for input_model in simple_input_models(tool_parameter_bundle.input_models):
if hasattr(inputs, "input_models"):
inputs_list = simple_input_models(cast(ToolParameterBundle, inputs.input_models))
else:
inputs_list = cast(Union[Iterable[ToolParameterModel], Iterable[ToolParameterT]], inputs)
for input_model in inputs_list:
as_dict[input_model.name] = input_model
return as_dict

Expand Down Expand Up @@ -1156,7 +1185,7 @@ def create_job_internal_model(tool: ToolParameterBundle, name: str = "DynamicMod


def create_test_case_model(tool: ToolParameterBundle, name: str = "DynamicModelForTool") -> Type[BaseModel]:
return create_field_model(tool.input_models, name, "test_case")
return create_field_model(tool.input_models, name, "test_case_xml")


def create_workflow_step_model(tool: ToolParameterBundle, name: str = "DynamicModelForTool") -> Type[BaseModel]:
Expand Down
5 changes: 3 additions & 2 deletions lib/galaxy/tool_util/parameters/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
create_job_internal_model,
create_request_internal_model,
create_request_model,
create_test_case_model,
create_workflow_step_linked_model,
create_workflow_step_model,
StateRepresentationT,
Expand Down Expand Up @@ -91,12 +92,12 @@ def _parameter_model_for(cls, input_models: ToolParameterBundle) -> Type[BaseMod


class TestCaseToolState(ToolState):
state_representation: Literal["test_case"] = "test_case"
state_representation: Literal["test_case_xml"] = "test_case_xml"

@classmethod
def _parameter_model_for(cls, input_models: ToolParameterBundle) -> Type[BaseModel]:
# implement a test case model...
return create_request_internal_model(input_models)
return create_test_case_model(input_models)


class WorkflowStepToolState(ToolState):
Expand Down
20 changes: 19 additions & 1 deletion lib/galaxy/tool_util/parser/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import packaging.version
from pydantic import BaseModel
from typing_extensions import (
Literal,
NotRequired,
TypedDict,
)
Expand Down Expand Up @@ -376,7 +377,7 @@ def paths_and_modtimes(self):
paths_and_modtimes[self.source_path] = os.path.getmtime(self.source_path)
return paths_and_modtimes

def parse_tests_to_dict(self) -> ToolSourceTests:
def parse_tests_to_dict(self, for_json: bool = False) -> ToolSourceTests:
return {"tests": []}

def __str__(self):
Expand Down Expand Up @@ -549,6 +550,23 @@ def parse_input_sources(self) -> List[InputSource]:
"""Return a list of InputSource objects."""


TestCollectionAttributeDict = Dict[str, Any]
CollectionType = str


class TestCollectionDictElement(TypedDict):
element_identifier: str
element_definition: Union["TestCollectionDict", "ToolSourceTestInput"]


class TestCollectionDict(TypedDict):
model_class: Literal["TestCollectionDef"] = "TestCollectionDef"
attributes: TestCollectionAttributeDict
collection_type: CollectionType
elements: List[TestCollectionDictElement]
name: str


class TestCollectionDef:
__test__ = False # Prevent pytest from discovering this class (issue #12071)

Expand Down
Loading

0 comments on commit 4b7377f

Please sign in to comment.