diff --git a/lib/galaxy/celery/tasks.py b/lib/galaxy/celery/tasks.py
index e1ac7456ad41..3b2e4c6272a7 100644
--- a/lib/galaxy/celery/tasks.py
+++ b/lib/galaxy/celery/tasks.py
@@ -185,14 +185,19 @@ def set_metadata(
     dataset_id: int,
     model_class: str = "HistoryDatasetAssociation",
     overwrite: bool = True,
+    ensure_can_set_metadata: bool = True,
     task_user_id: Optional[int] = None,
 ):
+    """
+    ensure_can_set_metadata can be bypassed for new outputs.
+    """
     manager = _get_dataset_manager(hda_manager, ldda_manager, model_class)
     dataset_instance = manager.by_id(dataset_id)
-    can_set_metadata = manager.ensure_can_set_metadata(dataset_instance, raiseException=False)
-    if not can_set_metadata:
-        log.info(f"Setting metadata is not allowed for {model_class} {dataset_instance.id}")
-        return
+    if ensure_can_set_metadata:
+        can_set_metadata = manager.ensure_can_set_metadata(dataset_instance, raiseException=False)
+        if not can_set_metadata:
+            log.info(f"Setting metadata is not allowed for {model_class} {dataset_instance.id}")
+            return
     try:
         if overwrite:
             hda_manager.overwrite_metadata(dataset_instance)
diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index d63ba30916df..c930229df5e0 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -2020,8 +2020,9 @@ def fail(message=job.info, exception=None):
         # Certain tools require tasks to be completed after job execution
         # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ).
         param_dict = self.get_param_dict(job)
+        task_wrapper = None
         try:
-            self.tool.exec_after_process(
+            task_wrapper = self.tool.exec_after_process(
                 self.app, inp_data, out_data, param_dict, job=job, final_job_state=final_job_state
             )
         except Exception as e:
@@ -2063,6 +2064,10 @@ def fail(message=job.info, exception=None):
         self.sa_session.commit()
         if job.state == job.states.ERROR:
             self._report_error()
+        elif task_wrapper:
+            # Only task is setting metadata (if necessary) on expression tool output.
+            # The dataset state is SETTING_METADATA, which delays dependent jobs until the task completes.
+            task_wrapper.delay()
         cleanup_job = self.cleanup_job
         delete_files = cleanup_job == "always" or (job.state == job.states.OK and cleanup_job == "onsuccess")
         self.cleanup(delete_files=delete_files)
diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py
index 065033fe3029..570008c7c64b 100644
--- a/lib/galaxy/tools/__init__.py
+++ b/lib/galaxy/tools/__init__.py
@@ -2876,7 +2876,27 @@ def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kw
                         break
                 if copy_object is None:
                     raise exceptions.MessageException("Failed to find dataset output.")
-                out_data[key].copy_from(copy_object, include_metadata=True)
+                output = out_data[key]
+                # if change_datatype PJA is associated with expression tool output the new output already has
+                # the desired datatype, so we use it. If the extension is "data" there's no change_datatype PJA and
+                # we want to use the existing extension.
+                new_ext = output.extension if output.extension != "data" else copy_object.extension
+                require_metadata_regeneration = copy_object.extension != new_ext
+                output.copy_from(copy_object, include_metadata=not require_metadata_regeneration)
+                output.extension = new_ext
+                if require_metadata_regeneration:
+                    if app.config.enable_celery_tasks:
+                        from galaxy.celery.tasks import set_metadata
+
+                        output._state = model.Dataset.states.SETTING_METADATA
+                        return set_metadata.si(
+                            dataset_id=output.id, task_user_id=output.history.user_id, ensure_can_set_metadata=False
+                        )
+                    else:
+                        # TODO: move exec_after_process into metadata script so this doesn't run on the headnode ?
+                        output.init_meta()
+                        output.set_meta()
+                        output.set_metadata_success_state()
 
     def parse_environment_variables(self, tool_source):
         """Setup environment variable for inputs file."""