diff --git a/lib/galaxy/tool_util/verify/parse.py b/lib/galaxy/tool_util/verify/parse.py index bb1cdbcbed6e..c06929a876f4 100644 --- a/lib/galaxy/tool_util/verify/parse.py +++ b/lib/galaxy/tool_util/verify/parse.py @@ -151,7 +151,6 @@ def _process_raw_inputs( ) if case_input_sources: for input_source in case_input_sources.parse_input_sources(): - input_name = input_source.get("name") case_inputs = _process_raw_inputs( tool_source, [input_source], diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index 91bf85374dc2..3979eb569510 100644 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -97,6 +97,7 @@ ) from galaxy.tool_util.toolbox.views.sources import StaticToolBoxViewSources from galaxy.tool_util.verify.interactor import ToolTestDescription +from galaxy.tool_util.verify.parse import parse_tool_test_descriptions from galaxy.tool_util.verify.test_data import TestDataNotFoundError from galaxy.tool_util.version import ( LegacyVersion, @@ -149,7 +150,6 @@ from galaxy.tools.parameters.meta import expand_meta_parameters from galaxy.tools.parameters.workflow_utils import workflow_building_modes from galaxy.tools.parameters.wrapped_json import json_wrap -from galaxy.tools.test import parse_tests from galaxy.util import ( in_directory, listify, @@ -1306,9 +1306,10 @@ def __parse_trackster_conf(self, tool_source): self.trackster_conf = TracksterConfig.parse(trackster_conf) def parse_tests(self): - if tests_source := self.tool_source: + if self.tool_source: + test_descriptions = parse_tool_test_descriptions(self.tool_source, self.id) try: - self.__tests = json.dumps([t.to_dict() for t in parse_tests(self, tests_source)], indent=None) + self.__tests = json.dumps([t.to_dict() for t in test_descriptions], indent=None) except Exception: self.__tests = None log.exception("Failed to parse tool tests for tool '%s'", self.id) diff --git a/lib/galaxy/tools/test.py b/lib/galaxy/tools/test.py deleted file mode 100644 index 7447c17bd1d9..000000000000 --- a/lib/galaxy/tools/test.py +++ /dev/null @@ -1,350 +0,0 @@ -import logging -import os -import os.path -from typing import ( - Iterable, - List, - Tuple, - Union, -) - -import galaxy.tools.parameters.basic -import galaxy.tools.parameters.grouping -from galaxy.tool_util.verify.interactor import ( - InvalidToolTestDict, - ToolTestDescription, - ValidToolTestDict, -) -from galaxy.tool_util.verify.parse import ( - ParamContext, - process_bool_param_value, - RequiredDataTablesT, - RequiredFilesT, - RequiredLocFileT, - RootParamContext, - split_if_str, -) -from galaxy.util import ( - string_as_bool, - string_as_bool_or_none, - unicodify, -) - -log = logging.getLogger(__name__) - - -def parse_tests(tool, tests_source) -> Iterable[ToolTestDescription]: - """ - Build ToolTestDescription objects for each "" elements and - return default interactor (if any). - """ - raw_tests_dict = tests_source.parse_tests_to_dict() - tests: List[ToolTestDescription] = [] - for i, raw_test_dict in enumerate(raw_tests_dict.get("tests", [])): - test = description_from_tool_object(tool, i, raw_test_dict) - tests.append(test) - return tests - - -def description_from_tool_object(tool, test_index, raw_test_dict) -> ToolTestDescription: - required_files: RequiredFilesT = [] - required_data_tables: RequiredDataTablesT = [] - required_loc_files: RequiredLocFileT = [] - - num_outputs = raw_test_dict.get("expect_num_outputs", None) - if num_outputs: - num_outputs = int(num_outputs) - maxseconds = raw_test_dict.get("maxseconds", None) - if maxseconds is not None: - maxseconds = int(maxseconds) - - processed_test_dict: Union[ValidToolTestDict, InvalidToolTestDict] - try: - processed_inputs = _process_raw_inputs( - tool, tool.inputs, raw_test_dict["inputs"], required_files, required_data_tables, required_loc_files - ) - - processed_test_dict = ValidToolTestDict( - { - "inputs": processed_inputs, - "outputs": raw_test_dict["outputs"], - "output_collections": raw_test_dict["output_collections"], - "num_outputs": num_outputs, - "command_line": raw_test_dict.get("command", None), - "command_version": raw_test_dict.get("command_version", None), - "stdout": raw_test_dict.get("stdout", None), - "stderr": raw_test_dict.get("stderr", None), - "expect_exit_code": raw_test_dict.get("expect_exit_code", None), - "expect_failure": raw_test_dict.get("expect_failure", False), - "expect_test_failure": raw_test_dict.get("expect_test_failure", False), - "required_files": required_files, - "required_data_tables": required_data_tables, - "required_loc_files": required_loc_files, - "tool_id": tool.id, - "tool_version": tool.version, - "test_index": test_index, - "maxseconds": maxseconds, - "error": False, - } - ) - except Exception as e: - log.exception("Failed to load tool test number [%d] for %s" % (test_index, tool.id)) - processed_test_dict = InvalidToolTestDict( - { - "tool_id": tool.id, - "tool_version": tool.version, - "test_index": test_index, - "inputs": {}, - "error": True, - "exception": unicodify(e), - "maxseconds": maxseconds, - } - ) - - return ToolTestDescription(processed_test_dict) - - -def _process_raw_inputs( - tool, tool_inputs, raw_inputs, required_files, required_data_tables, required_loc_files, parent_context=None -): - """ - Recursively expand flat list of inputs into "tree" form of flat list - (| using to nest to new levels) structure and expand dataset - information as proceeding to populate self.required_files. - """ - parent_context = parent_context or RootParamContext() - expanded_inputs = {} - for value in tool_inputs.values(): - if isinstance(value, galaxy.tools.parameters.grouping.Conditional): - cond_context = ParamContext(name=value.name, parent_context=parent_context) - assert value.test_param - case_context = ParamContext(name=value.test_param.name, parent_context=cond_context) - raw_input_dict = case_context.extract_value(raw_inputs) - case_value = raw_input_dict["value"] if raw_input_dict else None - case = _matching_case_for_value(tool, value, case_value) - if case: - for input_name, input_value in case.inputs.items(): - case_inputs = _process_raw_inputs( - tool, - {input_name: input_value}, - raw_inputs, - required_files, - required_data_tables, - required_loc_files, - parent_context=cond_context, - ) - expanded_inputs.update(case_inputs) - if not value.type == "text": - expanded_case_value = split_if_str(case.value) - if case_value is not None: - # A bit tricky here - we are growing inputs with value - # that may be implicit (i.e. not defined by user just - # a default defined in tool). So we do not want to grow - # expanded_inputs and risk repeat block viewing this - # as a new instance with value defined and hence enter - # an infinite loop - hence the "case_value is not None" - # check. - processed_value = _process_simple_value( - value.test_param, expanded_case_value, required_data_tables, required_loc_files - ) - expanded_inputs[case_context.for_state()] = processed_value - elif isinstance(value, galaxy.tools.parameters.grouping.Section): - context = ParamContext(name=value.name, parent_context=parent_context) - assert value.inputs - for r_value in value.inputs.values(): - expanded_input = _process_raw_inputs( - tool, - {context.for_state(): r_value}, - raw_inputs, - required_files, - required_data_tables, - required_loc_files, - parent_context=context, - ) - if expanded_input: - expanded_inputs.update(expanded_input) - elif isinstance(value, galaxy.tools.parameters.grouping.Repeat): - repeat_index = 0 - while True: - context = ParamContext(name=value.name, index=repeat_index, parent_context=parent_context) - updated = False - assert value.inputs - for r_value in value.inputs.values(): - expanded_input = _process_raw_inputs( - tool, - {context.for_state(): r_value}, - raw_inputs, - required_files, - required_data_tables, - required_loc_files, - parent_context=context, - ) - if expanded_input: - expanded_inputs.update(expanded_input) - updated = True - if not updated: - break - repeat_index += 1 - else: - context = ParamContext(name=value.name, parent_context=parent_context) - raw_input_dict = context.extract_value(raw_inputs) - if raw_input_dict: - name = raw_input_dict["name"] - param_value = raw_input_dict["value"] - param_extra = raw_input_dict["attributes"] - location = param_extra.get("location") - if not value.type == "text": - param_value = split_if_str(param_value) - if isinstance(value, galaxy.tools.parameters.basic.DataToolParameter): - if location and value.multiple: - # We get the input/s from the location which can be a list of urls separated by commas - locations = split_if_str(location) - param_value = [] - for location in locations: - v = os.path.basename(location) - param_value.append(v) - # param_extra should contain only the corresponding location - extra = dict(param_extra) - extra["location"] = location - _add_uploaded_dataset(context.for_state(), v, extra, value, required_files) - else: - if not isinstance(param_value, list): - param_value = [param_value] - for v in param_value: - _add_uploaded_dataset(context.for_state(), v, param_extra, value, required_files) - processed_value = param_value - elif isinstance(value, galaxy.tools.parameters.basic.DataCollectionToolParameter): - assert "collection" in param_extra - collection_def = param_extra["collection"] - for input_dict in collection_def.collect_inputs(): - name = input_dict["name"] - value = input_dict["value"] - attributes = input_dict["attributes"] - require_file(name, value, attributes, required_files) - processed_value = collection_def - else: - processed_value = _process_simple_value( - value, param_value, required_data_tables, required_loc_files - ) - expanded_inputs[context.for_state()] = processed_value - return expanded_inputs - - -def _process_simple_value(param, param_value, required_data_tables, required_loc_files): - if isinstance(param, galaxy.tools.parameters.basic.SelectToolParameter): - # Tests may specify values as either raw value or the value - # as they appear in the list - the API doesn't and shouldn't - # accept the text value - so we need to convert the text - # into the form value. - def process_param_value(param_value): - found_value = False - value_for_text = None - for text, opt_value, _ in getattr(param, "static_options", []): - if param_value == opt_value: - found_value = True - if value_for_text is None and param_value == text: - value_for_text = opt_value - if param.options and not isinstance(param, galaxy.tools.parameters.basic.DrillDownSelectToolParameter): - if param.options.tool_data_table_name: - required_data_tables.append(param.options.tool_data_table_name) - elif param.options.index_file: - required_loc_files.append(param.options.index_file) - if not found_value and value_for_text is not None: - processed_value = value_for_text - else: - processed_value = param_value - return processed_value - - # Do replacement described above for lists or singleton - # values. - if isinstance(param_value, list): - processed_value = list(map(process_param_value, param_value)) - else: - processed_value = process_param_value(param_value) - elif isinstance(param, galaxy.tools.parameters.basic.BooleanToolParameter): - # Like above, tests may use the tool define values of simply - # true/false. - processed_value = _process_bool_param_value(param, param_value) - else: - processed_value = param_value - return processed_value - - -def _matching_case_for_value(tool, cond, declared_value): - test_param = cond.test_param - if isinstance(test_param, galaxy.tools.parameters.basic.BooleanToolParameter): - if declared_value is None: - # No explicit value for param in test case, determine from default - query_value = test_param.checked - else: - query_value = _process_bool_param_value(test_param, declared_value) - - def matches_declared_value(case_value): - return _process_bool_param_value(test_param, case_value) == query_value - - elif isinstance(test_param, galaxy.tools.parameters.basic.SelectToolParameter): - if declared_value is not None: - # Test case supplied explicit value to check against. - - def matches_declared_value(case_value): - return case_value == declared_value - - elif test_param.static_options: - # No explicit value in test case, not much to do if options are dynamic but - # if static options are available can find the one specified as default or - # fallback on top most option (like GUI). - for name, _, selected in test_param.static_options: - if selected: - default_option = name - else: - first_option = test_param.static_options[0] - first_option_value = first_option[1] - default_option = first_option_value - - def matches_declared_value(case_value): - return case_value == default_option - - else: - # No explicit value for this param and cannot determine a - # default - give up. Previously this would just result in a key - # error exception. - msg = f"Failed to find test parameter value specification required for conditional {cond.name}" - raise Exception(msg) - - # Check the tool's defined cases against predicate to determine - # selected or default. - for case in cond.cases: - if matches_declared_value(case.value): - return case - else: - msg_template = "%s - Failed to find case matching value (%s) for test parameter specification for conditional %s. Remainder of test behavior is unspecified." - msg = msg_template % (tool.id, declared_value, cond.name) - log.info(msg) - - -def _add_uploaded_dataset(name, value, extra, input_parameter, required_files): - if value is None: - assert input_parameter.optional, f"{name} is not optional. You must provide a valid filename." - return value - return require_file(name, value, extra, required_files) - - -def _process_bool_param_value(param, param_value): - assert isinstance(param, galaxy.tools.parameters.basic.BooleanToolParameter) - return process_bool_param_value(param.truevalue, param.falsevalue, param.optional, param_value) - - -def require_file(name, value, extra, required_files): - if (value, extra) not in required_files: - required_files.append((value, extra)) # these files will be uploaded - name_change = [att for att in extra.get("edit_attributes", []) if att.get("type") == "name"] - if name_change: - name_change = name_change[-1].get("value") # only the last name change really matters - value = name_change # change value for select to renamed uploaded file for e.g. composite dataset - else: - for end in [".zip", ".gz"]: - if value.endswith(end): - value = value[: -len(end)] - break - value = os.path.basename(value) # if uploading a file in a path other than root of test-data - return value diff --git a/test/unit/app/tools/test_test_parsing.py b/test/unit/tool_util/test_test_definition_parsing.py similarity index 86% rename from test/unit/app/tools/test_test_parsing.py rename to test/unit/tool_util/test_test_definition_parsing.py index 300f3b9b5745..25223385cc4e 100644 --- a/test/unit/app/tools/test_test_parsing.py +++ b/test/unit/tool_util/test_test_definition_parsing.py @@ -1,18 +1,13 @@ -""" Test Tool testing logic. - -I am going to migrate from using galaxy.tools.parameters and Galaxy Tool internals to -tool sources and I want to ensure the results do not change. -""" +"""Tool test parsing to dicts logic.""" from typing import ( Any, List, ) -from galaxy.app_unittest_utils import tools_support +from galaxy.tool_util.parser.factory import get_tool_source from galaxy.tool_util.unittest_utils import functional_test_tool_path from galaxy.tool_util.verify.parse import parse_tool_test_descriptions -from galaxy.tools.test import parse_tests from galaxy.util.unittest import TestCase # Not the whole response, just some keys and such to test... @@ -54,17 +49,13 @@ ] -class TestTestParsing(TestCase, tools_support.UsesTools): - tool_action: "MockAction" - - def setUp(self): - self.setup_app() - - def tearDown(self): - self.tear_down_app() - +class TestTestParsing(TestCase): def _parse_tests(self): - return parse_tests(self.tool, self.tool_source) + return parse_tool_test_descriptions(self.tool_source) + + def _init_tool_for_path(self, path): + tool_source = get_tool_source(path) + self.tool_source = tool_source def test_simple_state_parsing(self): self._init_tool_for_path(functional_test_tool_path("simple_constructs.xml")) @@ -106,9 +97,3 @@ def _verify(self, target_dict: dict, expectation_path: List[str], expectation: A for path_part in expectation_path: rest = rest[path_part] assert rest == expectation, f"{rest} != {expectation} for {expectation_path}" - - -class TestToolSourceTestParsing(TestTestParsing): - - def _parse_tests(self): - return parse_tool_test_descriptions(self.tool_source)