From eeacd6cde80c3f1c7de025af12d69d8789ae9019 Mon Sep 17 00:00:00 2001
From: mvdbeek
Date: Fri, 20 Dec 2024 21:18:56 +0100
Subject: [PATCH] Check if we can determine extension as soon as possible

Ideally we'd use session.begin_nested() and session.rollback() to undo
staged changes, but I couldn't get that to work without changing the
entire commit strategy.
---
 lib/galaxy/tools/actions/__init__.py | 182 +++++++++++++++------------
 1 file changed, 101 insertions(+), 81 deletions(-)

diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py
index 91bb60202d3d..e9b83bd73bce 100644
--- a/lib/galaxy/tools/actions/__init__.py
+++ b/lib/galaxy/tools/actions/__init__.py
@@ -496,7 +496,7 @@ def execute(
 
         async_tool = tool.tool_type == "data_source_async"
 
-        def handle_output(name, output, hidden=None):
+        def handle_output(name, output, hidden=None, ext: Optional[str] = None):
             if async_tool and name in incoming:
                 # HACK: output data has already been created as a result of the async controller
                 dataid = incoming[name]
@@ -504,15 +504,16 @@ def handle_output(name, output, hidden=None):
                 assert data is not None
                 out_data[name] = data
             else:
-                ext = determine_output_format(
-                    output,
-                    wrapped_params.params,
-                    inp_data,
-                    inp_dataset_collections,
-                    input_ext,
-                    python_template_version=tool.python_template_version,
-                    execution_cache=execution_cache,
-                )
+                if ext is None:
+                    ext = determine_output_format(
+                        output,
+                        wrapped_params.params,
+                        inp_data,
+                        inp_dataset_collections,
+                        input_ext,
+                        python_template_version=tool.python_template_version,
+                        execution_cache=execution_cache,
+                    )
 
             create_datasets = True
             dataset = None
@@ -586,82 +587,99 @@ def handle_output(name, output, hidden=None):
             return data
 
         child_dataset_names = set()
-
-        for name, output in tool.outputs.items():
-            if not filter_output(tool, output, incoming):
-                handle_output_timer = ExecutionTimer()
-                if output.collection:
-                    if completed_job and dataset_collection_elements and name in dataset_collection_elements:
-                        # Output collection is mapped over and has already been copied from original job
-                        continue
-                    collections_manager = app.dataset_collection_manager
-                    element_identifiers: List[Dict[str, Union[str, List[Dict[str, Union[str, List[Any]]]]]]] = []
-                    # mypy doesn't yet support recursive type definitions
-                    known_outputs = output.known_outputs(input_collections, collections_manager.type_registry)
-                    # Just to echo TODO elsewhere - this should be restructured to allow
-                    # nested collections.
-                    for output_part_def in known_outputs:
-                        # Add elements to top-level collection, unless nested...
-                        current_element_identifiers = element_identifiers
-                        current_collection_type = output.structure.collection_type
-
-                        for parent_id in output_part_def.parent_ids or []:
-                            # TODO: replace following line with formal abstractions for doing this.
-                            current_collection_type = ":".join(current_collection_type.split(":")[1:])
-                            name_to_index = {
-                                value["name"]: index for (index, value) in enumerate(current_element_identifiers)
-                            }
-                            if parent_id not in name_to_index:
-                                if parent_id not in current_element_identifiers:
-                                    index = len(current_element_identifiers)
-                                    current_element_identifiers.append(
-                                        dict(
-                                            name=parent_id,
-                                            collection_type=current_collection_type,
-                                            src="new_collection",
-                                            element_identifiers=[],
-                                        )
+        outputs = {name: output for name, output in tool.outputs.items() if not filter_output(tool, output, incoming)}
+        name_output_extension = [
+            (
+                name,
+                output,
+                determine_output_format(
+                    output,
+                    wrapped_params.params,
+                    inp_data,
+                    inp_dataset_collections,
+                    input_ext,
+                    python_template_version=tool.python_template_version,
+                    execution_cache=execution_cache,
+                ),
+            )
+            for name, output in outputs.items()
+        ]
+
+        for name, output, ext in name_output_extension:
+            handle_output_timer = ExecutionTimer()
+            handle_output(name, output, ext=ext)
+            if output.collection:
+                if completed_job and dataset_collection_elements and name in dataset_collection_elements:
+                    # Output collection is mapped over and has already been copied from original job
+                    continue
+                collections_manager = app.dataset_collection_manager
+                element_identifiers: List[Dict[str, Union[str, List[Dict[str, Union[str, List[Any]]]]]]] = []
+                # mypy doesn't yet support recursive type definitions
+                known_outputs = output.known_outputs(input_collections, collections_manager.type_registry)
+                # Just to echo TODO elsewhere - this should be restructured to allow
+                # nested collections.
+                for output_part_def in known_outputs:
+                    # Add elements to top-level collection, unless nested...
+                    current_element_identifiers = element_identifiers
+                    current_collection_type = output.structure.collection_type
+
+                    for parent_id in output_part_def.parent_ids or []:
+                        # TODO: replace following line with formal abstractions for doing this.
+                        current_collection_type = ":".join(current_collection_type.split(":")[1:])
+                        name_to_index = {
+                            value["name"]: index for (index, value) in enumerate(current_element_identifiers)
+                        }
+                        if parent_id not in name_to_index:
+                            if parent_id not in current_element_identifiers:
+                                index = len(current_element_identifiers)
+                                current_element_identifiers.append(
+                                    dict(
+                                        name=parent_id,
+                                        collection_type=current_collection_type,
+                                        src="new_collection",
+                                        element_identifiers=[],
                                     )
-                                else:
-                                    index = name_to_index[parent_id]
-                            current_element_identifiers = cast(
-                                List[
-                                    Dict[
-                                        str,
-                                        Union[str, List[Dict[str, Union[str, List[Any]]]]],
-                                    ]
-                                ],
-                                current_element_identifiers[index]["element_identifiers"],
-                            )
-
-                        effective_output_name = output_part_def.effective_output_name
-                        child_dataset_names.add(effective_output_name)
-                        element = handle_output(effective_output_name, output_part_def.output_def, hidden=True)
-                        history.stage_addition(element)
-                        # TODO: this shouldn't exist in the top-level of the history at all
-                        # but for now we are still working around that by hiding the contents
-                        # there.
-                        # Following hack causes dataset to no be added to history...
-                        trans.sa_session.add(element)
-                        current_element_identifiers.append(
-                            {
-                                "__object__": element,
-                                "name": output_part_def.element_identifier,
-                            }
+                                )
+                            else:
+                                index = name_to_index[parent_id]
+                        current_element_identifiers = cast(
+                            List[
+                                Dict[
+                                    str,
+                                    Union[str, List[Dict[str, Union[str, List[Any]]]]],
+                                ]
+                            ],
+                            current_element_identifiers[index]["element_identifiers"],
                         )
 
-                    if output.dynamic_structure:
-                        assert not element_identifiers  # known_outputs must have been empty
-                        element_kwds = dict(elements=collections_manager.ELEMENTS_UNINITIALIZED)
-                    else:
-                        element_kwds = dict(element_identifiers=element_identifiers)
-                    output_collections.create_collection(
-                        output=output, name=name, completed_job=completed_job, **element_kwds
+                    effective_output_name = output_part_def.effective_output_name
+                    child_dataset_names.add(effective_output_name)
+                    element = handle_output(effective_output_name, output_part_def.output_def, hidden=True)
+                    history.stage_addition(element)
+                    # TODO: this shouldn't exist in the top-level of the history at all
+                    # but for now we are still working around that by hiding the contents
+                    # there.
+                    # Following hack causes dataset to no be added to history...
+                    trans.sa_session.add(element)
+                    current_element_identifiers.append(
+                        {
+                            "__object__": element,
+                            "name": output_part_def.element_identifier,
+                        }
                     )
-                    log.info(f"Handled collection output named {name} for tool {tool.id} {handle_output_timer}")
+
+                if output.dynamic_structure:
+                    assert not element_identifiers  # known_outputs must have been empty
+                    element_kwds = dict(elements=collections_manager.ELEMENTS_UNINITIALIZED)
                 else:
-                    handle_output(name, output)
-                    log.info(f"Handled output named {name} for tool {tool.id} {handle_output_timer}")
+                    element_kwds = dict(element_identifiers=element_identifiers)
+                output_collections.create_collection(
+                    output=output, name=name, completed_job=completed_job, **element_kwds
+                )
+                log.info(f"Handled collection output named {name} for tool {tool.id} {handle_output_timer}")
+            else:
+                handle_output(name, output)
+                log.info(f"Handled output named {name} for tool {tool.id} {handle_output_timer}")
 
         add_datasets_timer = tool.app.execution_timer_factory.get_timer(
             "internals.galaxy.tools.actions.add_datasets",
@@ -1240,6 +1258,8 @@ def determine_output_format(
                     break
                 input_dataset = input_element.element_object
             ext = get_ext_or_implicit_ext(input_dataset)
+        except ToolInputsNotReadyException:
+            raise
         except Exception as e:
             log.debug("Exception while trying to determine format_source: %s", e)
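
A note on the pattern in the second and third hunks: determine_output_format now runs for every surviving output before handle_output creates anything, so a failure surfaces while nothing has been staged yet. A minimal sketch of that compute-first, create-second split, with invented stand-in names (resolve_extension and create_dataset are illustrations, not Galaxy's API):

from typing import Dict, List, Tuple


class ToolInputsNotReadyException(Exception):
    """Stand-in for Galaxy's exception of the same name."""


def resolve_extension(output_name: str) -> str:
    # Stand-in for determine_output_format(); fails while an input is
    # still being materialized.
    if output_name.endswith("_deferred"):
        raise ToolInputsNotReadyException(output_name)
    return "tabular"


def create_dataset(name: str, ext: str) -> Dict[str, str]:
    # Stand-in for the stateful part (dataset creation, history staging).
    return {"name": name, "ext": ext}


def execute(output_names: List[str]) -> List[Dict[str, str]]:
    # Phase 1: pure computation; any exception escapes here, before
    # phase 2 has touched any state.
    name_ext: List[Tuple[str, str]] = [(n, resolve_extension(n)) for n in output_names]
    # Phase 2: stateful creation, reached only if phase 1 succeeded.
    return [create_dataset(name, ext) for name, ext in name_ext]

In this sketch, execute(["out1", "out2_deferred"]) raises during phase 1, before any create_dataset call runs, which is the property the commit is after: no half-created outputs to clean up.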
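The last hunk depends on Python matching except clauses top to bottom: re-raising ToolInputsNotReadyException ahead of the catch-all lets that one error propagate (so the job can wait for its inputs) while every other failure is still logged and swallowed as before. A tiny self-contained illustration of the ordering, again with a stand-in exception class:

import logging

log = logging.getLogger(__name__)


class ToolInputsNotReadyException(Exception):
    """Stand-in for Galaxy's exception of the same name."""


def probe(error: Exception) -> None:
    try:
        raise error
    except ToolInputsNotReadyException:
        # Listed first, so it wins over the catch-all below.
        raise
    except Exception as e:
        log.debug("Exception while trying to determine format_source: %s", e)


probe(ValueError("unresolvable format_source"))  # logged and swallowed
# probe(ToolInputsNotReadyException()) would escape to the caller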
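As for the alternative the commit message rejects: session.begin_nested() is SQLAlchemy's SAVEPOINT mechanism, which discards everything staged inside the block as a unit if it raises. A self-contained sketch against a throwaway in-memory SQLite database (the Dataset model here is invented for illustration and is unrelated to Galaxy's schema):

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Dataset(Base):
    __tablename__ = "dataset"
    id = Column(Integer, primary_key=True)
    ext = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    try:
        with session.begin_nested():  # SAVEPOINT
            session.add(Dataset(ext="tabular"))
            raise RuntimeError("could not determine extension")
    except RuntimeError:
        pass  # the savepoint rollback already discarded the staged row
    session.commit()
    print(session.query(Dataset).count())  # 0

This only pays off if nothing commits mid-flight; per the commit message, Galaxy's commit strategy made that guarantee impractical, hence the fail-before-staging approach above.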