diff --git a/lib/galaxy/celery/tasks.py b/lib/galaxy/celery/tasks.py
index e1ac7456ad41..3b2e4c6272a7 100644
--- a/lib/galaxy/celery/tasks.py
+++ b/lib/galaxy/celery/tasks.py
@@ -185,14 +185,19 @@ def set_metadata(
     dataset_id: int,
     model_class: str = "HistoryDatasetAssociation",
     overwrite: bool = True,
+    ensure_can_set_metadata: bool = True,
     task_user_id: Optional[int] = None,
 ):
+    """
+    The ensure_can_set_metadata check can be skipped for newly created outputs.
+    """
     manager = _get_dataset_manager(hda_manager, ldda_manager, model_class)
     dataset_instance = manager.by_id(dataset_id)
-    can_set_metadata = manager.ensure_can_set_metadata(dataset_instance, raiseException=False)
-    if not can_set_metadata:
-        log.info(f"Setting metadata is not allowed for {model_class} {dataset_instance.id}")
-        return
+    if ensure_can_set_metadata:
+        can_set_metadata = manager.ensure_can_set_metadata(dataset_instance, raiseException=False)
+        if not can_set_metadata:
+            log.info(f"Setting metadata is not allowed for {model_class} {dataset_instance.id}")
+            return
     try:
         if overwrite:
             hda_manager.overwrite_metadata(dataset_instance)
diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index d63ba30916df..c930229df5e0 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -2020,8 +2020,9 @@ def fail(message=job.info, exception=None):
         # Certain tools require tasks to be completed after job execution
         # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ).
         param_dict = self.get_param_dict(job)
+        task_wrapper = None
         try:
-            self.tool.exec_after_process(
+            task_wrapper = self.tool.exec_after_process(
                 self.app, inp_data, out_data, param_dict, job=job, final_job_state=final_job_state
             )
         except Exception as e:
@@ -2063,6 +2064,10 @@
             self.sa_session.commit()
         if job.state == job.states.ERROR:
             self._report_error()
+        elif task_wrapper:
+            # The only task currently returned is setting metadata (if necessary) on an expression tool output.
+            # The dataset state is SETTING_METADATA, which delays dependent jobs until the task completes.
+            task_wrapper.delay()
         cleanup_job = self.cleanup_job
         delete_files = cleanup_job == "always" or (job.state == job.states.OK and cleanup_job == "onsuccess")
         self.cleanup(delete_files=delete_files)
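The job-wrapper change above leans on a standard Celery idiom: exec_after_process (see the tools change below) builds an immutable signature with .si(...), and the wrapper dispatches it with .delay() only after sa_session.commit(), so the task can never observe uncommitted state. A minimal, self-contained sketch of that idiom, with example_task as a hypothetical stand-in for set_metadata:

from celery import Celery

app = Celery("sketch", broker="memory://")

@app.task
def example_task(dataset_id: int, ensure_can_set_metadata: bool = True) -> int:
    # Hypothetical stand-in; the real work happens in galaxy.celery.tasks.set_metadata.
    return dataset_id

# .si() freezes the arguments into an immutable signature without sending anything...
sig = example_task.si(dataset_id=42, ensure_can_set_metadata=False)
# ...so dispatch can be deferred until after the database commit.
sig.delay()

Using .si rather than .s matters here: an immutable signature ignores any parent-task result, so the deferred call runs with exactly the arguments captured at job-finish time.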
diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py
index 065033fe3029..570008c7c64b 100644
--- a/lib/galaxy/tools/__init__.py
+++ b/lib/galaxy/tools/__init__.py
@@ -2876,7 +2876,27 @@ def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kw
                     break
             if copy_object is None:
                 raise exceptions.MessageException("Failed to find dataset output.")
-            out_data[key].copy_from(copy_object, include_metadata=True)
+            output = out_data[key]
+            # If a change_datatype PJA is associated with the expression tool output, the new output
+            # already has the desired datatype, so we use it. If the extension is "data" there is no
+            # change_datatype PJA and we want to use the existing extension.
+            new_ext = output.extension if output.extension != "data" else copy_object.extension
+            require_metadata_regeneration = copy_object.extension != new_ext
+            output.copy_from(copy_object, include_metadata=not require_metadata_regeneration)
+            output.extension = new_ext
+            if require_metadata_regeneration:
+                if app.config.enable_celery_tasks:
+                    from galaxy.celery.tasks import set_metadata
+
+                    output._state = model.Dataset.states.SETTING_METADATA
+                    return set_metadata.si(
+                        dataset_id=output.id, task_user_id=output.history.user_id, ensure_can_set_metadata=False
+                    )
+                else:
+                    # TODO: move exec_after_process into the metadata script so this doesn't run on the head node?
+                    output.init_meta()
+                    output.set_meta()
+                    output.set_metadata_success_state()

     def parse_environment_variables(self, tool_source):
         """Setup environment variable for inputs file."""
diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py
index c42da227984e..cfb723048934 100644
--- a/lib/galaxy_test/api/test_workflows.py
+++ b/lib/galaxy_test/api/test_workflows.py
@@ -2086,14 +2086,25 @@ def test_run_workflow_pick_value_bam_pja(self):
           pick_from:
           - value:
               __class__: RuntimeValue
+  consume_index:
+    tool_id: metadata_bam
+    in:
+      input_bam: pick_value/data_param
+    tool_state:
+      ref_names:
+      - chr10_random
+      - chr11
+      - chrM
+      - chrX
+      - chr16
 outputs:
   pick_out:
     outputSource: pick_value/data_param
 """,
             test_data="""
 some_file:
-  value: 1.bam
-  file_type: bam
+  value: 3.bam
+  file_type: unsorted.bam
   type: File
 """,
             history_id=history_id,
@@ -2106,6 +2117,7 @@
         )
         assert dataset_details["metadata_reference_names"]
         assert dataset_details["metadata_bam_index"]
+        assert dataset_details["file_ext"] == "bam"

     def test_run_workflow_simple_conditional_step(self):
         with self.dataset_populator.test_history() as history_id:
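The core of the tools change is a two-line decision that is easy to state in isolation: keep the extension a change_datatype PJA has already stamped on the output, fall back to the copied object's extension while the output is still the generic "data" type, and regenerate metadata only when the effective extension differs from the source's. A standalone sketch of that rule (decide_extension is illustrative, not Galaxy API):

def decide_extension(output_ext, copy_ext):
    """Return (new_ext, require_metadata_regeneration), mirroring exec_after_process."""
    new_ext = output_ext if output_ext != "data" else copy_ext
    return new_ext, copy_ext != new_ext

# No change_datatype PJA: adopt the source extension and copy its metadata as-is.
assert decide_extension("data", "bam") == ("bam", False)
# A PJA requested "bam" but the source is "unsorted.bam" (the test case above):
# metadata must be regenerated, which is what the SETTING_METADATA state guards.
assert decide_extension("bam", "unsorted.bam") == ("bam", True)

This is also why the test now uploads 3.bam as "unsorted.bam": it forces the regeneration branch and then asserts that the final file_ext comes out as "bam".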