Skip to content

Commit

Permalink
Merge pull request #19215 from mvdbeek/24_0_fix_various_invocation_ex…
Browse files Browse the repository at this point in the history
…port_issues

[24.0] Fix various invocation export issues
  • Loading branch information
jmchilton authored Nov 29, 2024
2 parents f4ab432 + e301ec9 commit 1e6a5c0
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 27 deletions.
7 changes: 4 additions & 3 deletions lib/galaxy/managers/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ def read_workflow_from_path(self, app, user, path, allow_in_directory=None) -> m
import_options = ImportOptions()
import_options.deduplicate_subworkflows = True
as_dict = python_to_workflow(as_dict, galaxy_interface, workflow_directory=None, import_options=import_options)
raw_description = RawWorkflowDescription(as_dict, path)
raw_description = RawWorkflowDescription(as_dict)
created_workflow = self.build_workflow_from_raw_description(trans, raw_description, WorkflowCreateOptions())
return created_workflow.workflow

Expand Down Expand Up @@ -925,8 +925,9 @@ def to_format_2(wf_dict, **kwds):
return wf_dict

def _sync_stored_workflow(self, trans, stored_workflow):
workflow_path = stored_workflow.from_path
self.store_workflow_to_path(workflow_path, stored_workflow, stored_workflow.latest_workflow, trans=trans)
if trans.user_is_admin:
workflow_path = stored_workflow.from_path
self.store_workflow_to_path(workflow_path, stored_workflow, stored_workflow.latest_workflow, trans=trans)

def store_workflow_artifacts(self, directory, filename_base, workflow, **kwd):
modern_workflow_path = os.path.join(directory, f"{filename_base}.gxwf.yml")
Expand Down
19 changes: 13 additions & 6 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2489,7 +2489,7 @@ class PostJobAction(Base, RepresentById):
workflow_step_id = Column(Integer, ForeignKey("workflow_step.id"), index=True, nullable=True)
action_type = Column(String(255), nullable=False)
output_name = Column(String(255), nullable=True)
action_arguments = Column(MutableJSONType, nullable=True)
_action_arguments = Column("action_arguments", MutableJSONType, nullable=True)
workflow_step = relationship(
"WorkflowStep",
back_populates="post_job_actions",
Expand All @@ -2503,6 +2503,18 @@ def __init__(self, action_type, workflow_step=None, output_name=None, action_arg
self.workflow_step = workflow_step
ensure_object_added_to_session(self, object_in_session=workflow_step)

@property
def action_arguments(self):
if self.action_type in ("HideDatasetAction", "DeleteIntermediatesAction") and self._action_arguments is True:
# Fix up broken workflows resulting from imports with gxformat2 <= 0.20.0
return {}
else:
return self._action_arguments

@action_arguments.setter
def action_arguments(self, value: Dict[str, Any]):
self._action_arguments = value


class PostJobActionAssociation(Base, RepresentById):
__tablename__ = "post_job_action_association"
Expand Down Expand Up @@ -6701,11 +6713,6 @@ class HistoryDatasetCollectionAssociation(
primaryjoin=copied_from_history_dataset_collection_association_id == id,
remote_side=[id],
uselist=False,
back_populates="copied_to_history_dataset_collection_association",
)
copied_to_history_dataset_collection_association = relationship(
"HistoryDatasetCollectionAssociation",
back_populates="copied_from_history_dataset_collection_association",
)
implicit_input_collections = relationship(
"ImplicitlyCreatedDatasetCollectionInput",
Expand Down
40 changes: 22 additions & 18 deletions lib/galaxy/model/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,14 +996,16 @@ def _import_collection_copied_associations(
# sense.
hdca_copied_from_sinks = object_import_tracker.hdca_copied_from_sinks
if copied_from_object_key in object_import_tracker.hdcas_by_key:
hdca.copied_from_history_dataset_collection_association = object_import_tracker.hdcas_by_key[
copied_from_object_key
]
source_hdca = object_import_tracker.hdcas_by_key[copied_from_object_key]
if source_hdca is not hdca:
# We may not have the copied source, in which case the first included HDCA in the chain
# acts as the source, so here we make sure we don't create a cycle.
hdca.copied_from_history_dataset_collection_association = source_hdca
else:
if copied_from_object_key in hdca_copied_from_sinks:
hdca.copied_from_history_dataset_collection_association = object_import_tracker.hdcas_by_key[
hdca_copied_from_sinks[copied_from_object_key]
]
source_hdca = object_import_tracker.hdcas_by_key[hdca_copied_from_sinks[copied_from_object_key]]
if source_hdca is not hdca:
hdca.copied_from_history_dataset_collection_association = source_hdca
else:
hdca_copied_from_sinks[copied_from_object_key] = dataset_collection_key

Expand Down Expand Up @@ -1070,7 +1072,7 @@ def attach_workflow_step(imported_object, attrs):
for step_attrs in invocation_attrs["steps"]:
imported_invocation_step = model.WorkflowInvocationStep()
imported_invocation_step.workflow_invocation = imported_invocation
ensure_object_added_to_session(imported_invocation, session=self.sa_session)
ensure_object_added_to_session(imported_invocation_step, session=self.sa_session)
attach_workflow_step(imported_invocation_step, step_attrs)
restore_times(imported_invocation_step, step_attrs)
imported_invocation_step.action = step_attrs["action"]
Expand Down Expand Up @@ -1921,12 +1923,14 @@ def __init__(
self.export_files = export_files
self.included_datasets: Dict[model.DatasetInstance, Tuple[model.DatasetInstance, bool]] = {}
self.dataset_implicit_conversions: Dict[model.DatasetInstance, model.ImplicitlyConvertedDatasetAssociation] = {}
self.included_collections: List[Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]] = []
self.included_collections: Dict[
Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation],
Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation],
] = {}
self.included_libraries: List[model.Library] = []
self.included_library_folders: List[model.LibraryFolder] = []
self.included_invocations: List[model.WorkflowInvocation] = []
self.collection_datasets: Set[int] = set()
self.collections_attrs: List[Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]] = []
self.dataset_id_to_path: Dict[int, Tuple[Optional[str], Optional[str]]] = {}

self.job_output_dataset_associations: Dict[int, Dict[str, model.DatasetInstance]] = {}
Expand Down Expand Up @@ -2287,8 +2291,7 @@ def export_collection(
def add_dataset_collection(
self, collection: Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]
) -> None:
self.collections_attrs.append(collection)
self.included_collections.append(collection)
self.included_collections[collection] = collection

def add_implicit_conversion_dataset(
self,
Expand Down Expand Up @@ -2343,7 +2346,7 @@ def to_json(attributes):

collections_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_COLLECTIONS)
with open(collections_attrs_filename, "w") as collections_attrs_out:
collections_attrs_out.write(to_json(self.collections_attrs))
collections_attrs_out.write(to_json(self.included_collections.values()))

conversions_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_CONVERSIONS)
with open(conversions_attrs_filename, "w") as conversions_attrs_out:
Expand All @@ -2364,12 +2367,12 @@ def to_json(attributes):
#

# Get all jobs associated with included HDAs.
jobs_dict: Dict[str, model.Job] = {}
jobs_dict: Dict[int, model.Job] = {}
implicit_collection_jobs_dict = {}

def record_job(job):
if not job:
# No viable job.
if not job or job.id in jobs_dict:
# No viable job or job already recorded.
return

jobs_dict[job.id] = job
Expand All @@ -2395,10 +2398,11 @@ def record_associated_jobs(obj):
)
job_hda = hda
while job_hda.copied_from_history_dataset_association: # should this check library datasets as well?
# record job (if one exists) even if dataset was copied
# copy could have been created manually through UI/API or using database operation tool,
# in which case we have a relevant job to export.
record_associated_jobs(job_hda)
job_hda = job_hda.copied_from_history_dataset_association
if not job_hda.creating_job_associations:
# No viable HDA found.
continue

record_associated_jobs(job_hda)

Expand Down
44 changes: 44 additions & 0 deletions test/integration/test_workflow_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from galaxy_test.base import api_asserts
from galaxy_test.base.api import UsesCeleryTasks
from galaxy_test.base.populators import (
DatasetCollectionPopulator,
DatasetPopulator,
RunJobsSummary,
WorkflowPopulator,
Expand All @@ -27,6 +28,7 @@

class TestWorkflowTasksIntegration(PosixFileSourceSetup, IntegrationTestCase, UsesCeleryTasks, RunsWorkflowFixtures):
dataset_populator: DatasetPopulator
dataset_collection_populator: DatasetCollectionPopulator
framework_tool_and_types = True

@classmethod
Expand All @@ -37,6 +39,7 @@ def handle_galaxy_config_kwds(cls, config):
def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
self.workflow_populator = WorkflowPopulator(self.galaxy_interactor)
self._write_file_fixtures()

Expand Down Expand Up @@ -124,6 +127,47 @@ def test_export_import_invocation_with_step_parameter(self):
invocation_details = self._export_and_import_workflow_invocation(summary, use_uris)
self._rerun_imported_workflow(summary, invocation_details)

def test_export_import_invocation_with_copied_hdca_and_database_operation_tool(self):
with self.dataset_populator.test_history() as history_id:
self.dataset_collection_populator.create_list_in_history(history_id=history_id, wait=True).json()
new_history = self.dataset_populator.copy_history(history_id=history_id).json()
copied_collection = self.dataset_populator.get_history_collection_details(new_history["id"])
workflow_id = self.workflow_populator.upload_yaml_workflow(
"""class: GalaxyWorkflow
inputs:
input:
type: collection
collection_type: list
steps:
extract_dataset:
tool_id: __EXTRACT_DATASET__
in:
input:
source: input
"""
)
inputs = {"input": {"src": "hdca", "id": copied_collection["id"]}}
workflow_request = {"history": f"hist_id={new_history['id']}", "inputs_by": "name", "inputs": inputs}
invocation = self.workflow_populator.invoke_workflow_raw(
workflow_id, workflow_request, assert_ok=True
).json()
invocation_id = invocation["id"]
self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
jobs = self.workflow_populator.get_invocation_jobs(invocation_id)
summary = RunJobsSummary(
history_id=history_id,
workflow_id=workflow_id,
invocation_id=invocation["id"],
inputs=inputs,
jobs=jobs,
invocation=invocation,
workflow_request=workflow_request,
)
imported_invocation_details = self._export_and_import_workflow_invocation(summary)
original_contents = self.dataset_populator.get_history_contents(new_history["id"])
contents = self.dataset_populator.get_history_contents(imported_invocation_details["history_id"])
assert len(contents) == len(original_contents) == 5

def _export_and_import_workflow_invocation(
self, summary: RunJobsSummary, use_uris: bool = True, model_store_format="tgz"
) -> Dict[str, Any]:
Expand Down

0 comments on commit 1e6a5c0

Please sign in to comment.