From bcdfba0f09d3d68f550bc844e00243be947742b9 Mon Sep 17 00:00:00 2001 From: John Davis Date: Tue, 5 Sep 2023 19:50:49 -0400 Subject: [PATCH 1/6] Ensure Job belongs to current SA session --- lib/galaxy/jobs/runners/__init__.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/lib/galaxy/jobs/runners/__init__.py b/lib/galaxy/jobs/runners/__init__.py index c79d1926d0e9..6c95de6312b3 100644 --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -15,6 +15,8 @@ Queue, ) +from sqlalchemy.orm import object_session + import galaxy.jobs from galaxy import model from galaxy.exceptions import ConfigurationError @@ -154,6 +156,10 @@ def run_next(self): name = method.__name__ except Exception: name = UNKNOWN + + # Ensure a Job object belongs to a session + self._ensure_db_session(arg) + try: action_str = f"galaxy.jobs.runners.{self.__class__.__name__.lower()}.{name}" action_timer = self.app.execution_timer_factory.get_timer( @@ -171,6 +177,18 @@ def run_next(self): # Prevent fail_job cycle in the work_queue self.work_queue.put((self.fail_job, job_state)) + def _ensure_db_session(self, arg: typing.Union["JobWrapper", "JobState"]) -> None: + """Ensure Job object belongs to current session.""" + try: + job_wrapper = arg.job_wrapper # type: ignore[union-attr] + except AttributeError: + job_wrapper = arg + + if job_wrapper._job_io: + job = job_wrapper._job_io.job + if object_session(job) is None: + self.app.model.session().add(job) + # Causes a runner's `queue_job` method to be called from a worker thread def put(self, job_wrapper: "MinimalJobWrapper"): """Add a job to the queue (by job identifier), indicate that the job is ready to run.""" From df54cb6ee19d16700babff8713c9c7ce94d3203b Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 8 Sep 2023 10:31:30 +0200 Subject: [PATCH 2/6] Try to always push outputs back to object store --- lib/galaxy/metadata/set_metadata.py | 48 +++++++++++++++++++---------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/lib/galaxy/metadata/set_metadata.py b/lib/galaxy/metadata/set_metadata.py index e25910c4af95..361cd0ed135d 100644 --- a/lib/galaxy/metadata/set_metadata.py +++ b/lib/galaxy/metadata/set_metadata.py @@ -16,6 +16,7 @@ import os import sys import traceback +from functools import partial from pathlib import Path from typing import Optional @@ -42,6 +43,7 @@ from galaxy.job_execution.setup import TOOL_PROVIDED_JOB_METADATA_KEYS from galaxy.model import ( Dataset, + DatasetInstance, HistoryDatasetAssociation, Job, store, @@ -75,6 +77,12 @@ MAX_STDIO_READ_BYTES = 100 * 10**6 # 100 MB +def reset_external_filename(dataset_instance: DatasetInstance): + assert dataset_instance.dataset + dataset_instance.dataset.external_filename = None + dataset_instance.dataset.extra_files_path = None + + def set_validated_state(dataset_instance): datatype_validation = validate(dataset_instance) @@ -371,6 +379,7 @@ def set_meta(new_dataset_instance, file_dict): set_meta_kwds = stringify_dictionary_keys( json.load(open(filename_kwds)) ) # load kwds; need to ensure our keywords are not unicode + object_store_update_actions = [] try: is_deferred = bool(unnamed_is_deferred.get(dataset_instance_id)) dataset.metadata_deferred = is_deferred @@ -423,16 +432,6 @@ def set_meta(new_dataset_instance, file_dict): setattr(dataset.metadata, metadata_name, metadata_file_override) if output_dict.get("validate", False): set_validated_state(dataset) - if dataset_instance_id not in unnamed_id_to_path: - # We're going to run through 
set_metadata in collect_dynamic_outputs with more contextual metadata, - # so skip set_meta here. - set_meta(dataset, file_dict) - if extended_metadata_collection: - collect_extra_files(object_store, dataset, ".") - dataset_state = "deferred" if (is_deferred and final_job_state == "ok") else final_job_state - if not dataset.state == dataset.states.ERROR: - # Don't overwrite failed state (for invalid content) here - dataset.state = dataset.dataset.state = dataset_state if extended_metadata_collection: if not object_store or not export_store: @@ -441,7 +440,22 @@ def set_meta(new_dataset_instance, file_dict): if not is_deferred and not link_data_only and os.path.getsize(external_filename): # Here we might be updating a disk based objectstore when outputs_to_working_directory is used, # or a remote object store from its cache path. - object_store.update_from_file(dataset.dataset, file_name=external_filename, create=True) + object_store_update_actions.append( + partial( + object_store.update_from_file, dataset.dataset, file_name=external_filename, create=True + ) + ) + object_store_update_actions.append(partial(reset_external_filename, dataset)) + object_store_update_actions.append(partial(export_store.add_dataset, dataset)) + if dataset_instance_id not in unnamed_id_to_path: + object_store_update_actions.append(partial(collect_extra_files, object_store, dataset, ".")) + dataset_state = "deferred" if (is_deferred and final_job_state == "ok") else final_job_state + if not dataset.state == dataset.states.ERROR: + # Don't overwrite failed state (for invalid content) here + dataset.state = dataset.dataset.state = dataset_state + # We're going to run through set_metadata in collect_dynamic_outputs with more contextual metadata, + # so only run set_meta for fixed outputs + set_meta(dataset, file_dict) # TODO: merge expression_context into tool_provided_metadata so we don't have to special case this (here and in _finish_dataset) meta = tool_provided_metadata.get_dataset_meta(output_name, dataset.dataset.id, dataset.dataset.uuid) if meta: @@ -472,12 +486,11 @@ def set_meta(new_dataset_instance, file_dict): if context_key in context: context_value = context[context_key] setattr(dataset, context_key, context_value) - # We only want to persist the external_filename if the dataset has been linked in. 
- if not is_deferred and not link_data_only: - dataset.dataset.external_filename = None - dataset.dataset.extra_files_path = None - export_store.add_dataset(dataset) else: + if dataset_instance_id not in unnamed_id_to_path: + # We're going to run through set_metadata in collect_dynamic_outputs with more contextual metadata, + # so only run set_meta for fixed outputs + set_meta(dataset, file_dict) dataset.metadata.to_JSON_dict(filename_out) # write out results of set_meta with open(filename_results_code, "w+") as tf: @@ -485,6 +498,9 @@ def set_meta(new_dataset_instance, file_dict): except Exception: with open(filename_results_code, "w+") as tf: json.dump((False, traceback.format_exc()), tf) # setting metadata has failed somehow + finally: + for action in object_store_update_actions: + action() if export_store: export_store.push_metadata_files() From d35414e5466701a5dd6bd8bd38e319c8e77ca73f Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 8 Sep 2023 10:47:49 +0200 Subject: [PATCH 3/6] Add test case for expression tool data output with bam pja --- lib/galaxy_test/api/test_workflows.py | 39 +++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index c744a4e7c585..f4fcd72dab21 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -1938,6 +1938,45 @@ def test_workflow_metadata_validation_0(self): history_id=history_id, ) + def test_run_workflow_pick_value_bam_pja(self): + # Makes sure that setting metadata on expression tool data outputs + # doesn't break result evaluation. + with self.dataset_populator.test_history() as history_id: + self._run_workflow( + """class: GalaxyWorkflow +inputs: + some_file: + type: data +steps: + pick_value: + tool_id: pick_value + in: + style_cond|type_cond|pick_from_0|value: + source: some_file + out: + data_param: + change_datatype: bam + tool_state: + style_cond: + __current_case__: 2 + pick_style: first_or_error + type_cond: + __current_case__: 4 + param_type: data + pick_from: + - __index__: 0 + value: + __class__: RuntimeValue +""", + test_data=""" +some_file: + value: 1.bam + file_type: bam + type: File +""", + history_id=history_id, + ) + def test_run_workflow_simple_conditional_step(self): with self.dataset_populator.test_history() as history_id: summary = self._run_workflow( From 5e26987d4fb4263daebfd37525c072439c93931b Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 8 Sep 2023 11:42:28 +0200 Subject: [PATCH 4/6] Ignore failed metadata for expression.json outputs --- lib/galaxy/jobs/__init__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index d167f4cdda00..4cb8aa6095b6 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1765,7 +1765,7 @@ def _finish_dataset(self, output_name, dataset, job, context, final_job_state, r metadata_set_successfully = self.external_output_metadata.external_metadata_set_successfully( dataset, output_name, self.sa_session, working_directory=self.working_directory ) - if retry_internally and not metadata_set_successfully: + if retry_internally and not metadata_set_successfully and not self.tool.tool_type == "expression": # If Galaxy was expected to sniff type and didn't - do so. 
if dataset.ext == "_sniff_": extension = sniff.handle_uploaded_dataset_file( @@ -1776,7 +1776,10 @@ def _finish_dataset(self, output_name, dataset, job, context, final_job_state, r # call datatype.set_meta directly for the initial set_meta call during dataset creation dataset.datatype.set_meta(dataset, overwrite=False) elif job.states.ERROR != final_job_state and not metadata_set_successfully: - dataset._state = model.Dataset.states.FAILED_METADATA + if self.tool.tool_type == "expression": + dataset._state = model.Dataset.states.OK + else: + dataset._state = model.Dataset.states.FAILED_METADATA else: self.external_output_metadata.load_metadata( dataset, From e6578937a2eee74ed7f7e77b4ab09942528b61b2 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Fri, 8 Sep 2023 13:05:59 +0200 Subject: [PATCH 5/6] Simplify ``if not metadata_set_successfully:`` branch --- lib/galaxy/jobs/__init__.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 4cb8aa6095b6..59d6d74fd603 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1765,19 +1765,19 @@ def _finish_dataset(self, output_name, dataset, job, context, final_job_state, r metadata_set_successfully = self.external_output_metadata.external_metadata_set_successfully( dataset, output_name, self.sa_session, working_directory=self.working_directory ) - if retry_internally and not metadata_set_successfully and not self.tool.tool_type == "expression": - # If Galaxy was expected to sniff type and didn't - do so. - if dataset.ext == "_sniff_": - extension = sniff.handle_uploaded_dataset_file( - dataset.dataset.file_name, self.app.datatypes_registry - ) - dataset.extension = extension - - # call datatype.set_meta directly for the initial set_meta call during dataset creation - dataset.datatype.set_meta(dataset, overwrite=False) - elif job.states.ERROR != final_job_state and not metadata_set_successfully: + if not metadata_set_successfully: if self.tool.tool_type == "expression": dataset._state = model.Dataset.states.OK + elif retry_internally: + # If Galaxy was expected to sniff type and didn't - do so. 
+ if dataset.ext == "_sniff_": + extension = sniff.handle_uploaded_dataset_file( + dataset.dataset.file_name, self.app.datatypes_registry + ) + dataset.extension = extension + + # call datatype.set_meta directly for the initial set_meta call during dataset creation + dataset.datatype.set_meta(dataset, overwrite=False) else: dataset._state = model.Dataset.states.FAILED_METADATA else: From 3f76e1d81c764b31f1a32480f70fcb6bd1d714c9 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Sat, 9 Sep 2023 12:02:31 +0200 Subject: [PATCH 6/6] Fix pulsar_embedded_metadata_extended_metadata tests --- lib/galaxy/metadata/set_metadata.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/galaxy/metadata/set_metadata.py b/lib/galaxy/metadata/set_metadata.py index 361cd0ed135d..e4ce53173652 100644 --- a/lib/galaxy/metadata/set_metadata.py +++ b/lib/galaxy/metadata/set_metadata.py @@ -109,9 +109,7 @@ def set_meta_with_tool_provided( extension = dataset_instance.extension if extension == "_sniff_": try: - extension = sniff.handle_uploaded_dataset_file( - dataset_instance.dataset.external_filename, datatypes_registry - ) + extension = sniff.handle_uploaded_dataset_file(dataset_instance.dataset.file_name, datatypes_registry) # We need to both set the extension so it is available to set_meta # and record it in the metadata so it can be reloaded on the server # side and the model updated (see MetadataCollection.{from,to}_JSON_dict) @@ -401,7 +399,9 @@ def set_meta(new_dataset_instance, file_dict): if not link_data_only: # Only set external filename if we're dealing with files in job working directory. # Fixes link_data_only uploads - dataset.dataset.external_filename = external_filename + if not object_store: + # overriding the external filename would break pushing to object stores + dataset.dataset.external_filename = external_filename # We derive extra_files_dir_name from external_filename, because OutputsToWorkingDirectoryPathRewriter # always rewrites the path to include the uuid, even if store_by is set to id, and the extra files # rewrite is derived from the dataset path (since https://github.com/galaxyproject/galaxy/pull/16541).
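
For readers following the set_metadata.py changes above, here is a minimal, self-contained sketch (an illustration under assumptions, not Galaxy code) of the deferred-update pattern PATCH 2 introduces: object store writes are queued as functools.partial callbacks while metadata is being set and are then executed in a finally block, so outputs are still pushed back to the object store even when setting metadata raises. The names FakeObjectStore, set_metadata_for and process_outputs are illustrative stand-ins rather than real Galaxy APIs.

    # Illustrative sketch of the deferred object-store-update pattern.
    import traceback
    from functools import partial


    class FakeObjectStore:
        # Stand-in for an object store backend; only logs the push.
        def update_from_file(self, dataset, file_name, create=False):
            print(f"pushing {file_name} for dataset {dataset}")


    def set_metadata_for(dataset):
        # Stand-in for the real metadata work, which may raise.
        if dataset.endswith("broken"):
            raise ValueError("metadata failed")


    def process_outputs(datasets, object_store):
        results = {}
        for dataset in datasets:
            deferred_actions = []
            try:
                # Queue the side-effecting push before doing work that can fail.
                deferred_actions.append(
                    partial(
                        object_store.update_from_file,
                        dataset,
                        file_name=f"{dataset}.dat",
                        create=True,
                    )
                )
                set_metadata_for(dataset)
                results[dataset] = (True, "ok")
            except Exception:
                results[dataset] = (False, traceback.format_exc())
            finally:
                # Run the queued pushes whether or not metadata succeeded.
                for action in deferred_actions:
                    action()
        return results


    if __name__ == "__main__":
        store = FakeObjectStore()
        print(process_outputs(["sample1", "sample2_broken"], store))

Deferring the pushes this way keeps the error handling in one place and mirrors how the patch arranges for the object store update, reset_external_filename and export_store.add_dataset to run regardless of whether metadata collection fails.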