diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index 4b9030fbde0b..a9e0bc281c02 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -891,24 +891,27 @@ def execute( self, trans, progress, invocation, step ): # Connect up def callback( input, prefixed_name, **kwargs ): replacement = NO_REPLACEMENT - if isinstance( input, DataToolParameter ) or isinstance( input, DataCollectionToolParameter ): - if iteration_elements and prefixed_name in iteration_elements: - if isinstance( input, DataToolParameter ): - # Pull out dataset instance from element. - replacement = iteration_elements[ prefixed_name ].dataset_instance - if hasattr(iteration_elements[ prefixed_name ], u'element_identifier') and iteration_elements[ prefixed_name ].element_identifier: - replacement.element_identifier = iteration_elements[ prefixed_name ].element_identifier - else: - # If collection - just use element model object. - replacement = iteration_elements[ prefixed_name ] + if iteration_elements and prefixed_name in iteration_elements: + if not isinstance( input, DataCollectionToolParameter ): + # Pull out dataset instance from element. + replacement = iteration_elements[ prefixed_name ].dataset_instance + if hasattr(iteration_elements[ prefixed_name ], u'element_identifier') and iteration_elements[ prefixed_name ].element_identifier: + replacement.element_identifier = iteration_elements[ prefixed_name ].element_identifier else: - replacement = progress.replacement_for_tool_input( step, input, prefixed_name ) + # If collection - just use element model object. + replacement = iteration_elements[ prefixed_name ] else: - replacement = progress.replacement_for_tool_input( step, input, prefixed_name ) + # TODO: I guess never exapnd in progress - only exapnd up here! + replacement = progress.replacement_for_tool_input( step, input, prefixed_name, expand_expressions=True ) if replacement is not NO_REPLACEMENT: found_replacement_keys.add(prefixed_name) + is_data = isinstance( input, DataToolParameter ) or isinstance( input, DataCollectionToolParameter ) + if not is_data and getattr( replacement, "history_content_type", None ) == "dataset": + with open( replacement.file_name, "r" ) as f: + replacement = loads(f.read()) + return replacement try: @@ -987,7 +990,7 @@ def callback( input, prefixed_name, **kwargs ): data = progress.replacement_for_tool_input( step, input, prefixed_name ) if hasattr( data, "collection" ): collections_to_match.add( prefixed_name, data ) - + return is_data_collection_param = isinstance( input, DataCollectionToolParameter ) if is_data_collection_param and not input.multiple: data = progress.replacement_for_tool_input( step, input, prefixed_name ) @@ -996,6 +999,12 @@ def callback( input, prefixed_name, **kwargs ): subcollection_type_description = history_query.can_map_over( data ) if subcollection_type_description: collections_to_match.add( prefixed_name, data, subcollection_type=subcollection_type_description.collection_type ) + return + + data = progress.replacement_for_tool_input( step, input, prefixed_name ) + if data is not NO_REPLACEMENT: + if hasattr( data, "collection" ): + collections_to_match.add( prefixed_name, data ) visit_input_values( tool.inputs, step.state.inputs, callback ) return collections_to_match diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index 643a3ece43e7..78b2a96b5620 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -422,14 +422,19 @@ def replacement_for_connection( self, connection, is_data=True ): raise modules.DelayedWorkflowEvaluation(why=delayed_why) is_hda = isinstance( replacement, model.HistoryDatasetAssociation ) - if not is_data and is_hda: - if replacement.is_ok: - with open( replacement.file_name, 'r' ) as f: - replacement = json.load( f ) - elif replacement.is_pending: - raise modules.DelayedWorkflowEvaluation() + is_hdca = isinstance( replacement, model.HistoryDatasetCollectionAssociation ) + if not is_data and (is_hda or is_hdca): + dataset_instances = [] + if is_hda: + dataset_instances = [ replacement ] else: - raise modules.CancelWorkflowEvaluation() + dataset_instances = replacement.dataset_instances + + for dataset_instance in dataset_instances: + if dataset_instance.is_pending: + raise modules.DelayedWorkflowEvaluation() + elif not dataset_instance.is_ok: + raise modules.CancelWorkflowEvaluation() return replacement