[24.0] Fix various invocation export issues #19215

7 changes: 4 additions & 3 deletions lib/galaxy/managers/workflows.py
@@ -592,7 +592,7 @@ def read_workflow_from_path(self, app, user, path, allow_in_directory=None) -> m
         import_options = ImportOptions()
         import_options.deduplicate_subworkflows = True
         as_dict = python_to_workflow(as_dict, galaxy_interface, workflow_directory=None, import_options=import_options)
-        raw_description = RawWorkflowDescription(as_dict, path)
+        raw_description = RawWorkflowDescription(as_dict)
         created_workflow = self.build_workflow_from_raw_description(trans, raw_description, WorkflowCreateOptions())
         return created_workflow.workflow

@@ -925,8 +925,9 @@ def to_format_2(wf_dict, **kwds):
         return wf_dict
 
     def _sync_stored_workflow(self, trans, stored_workflow):
-        workflow_path = stored_workflow.from_path
-        self.store_workflow_to_path(workflow_path, stored_workflow, stored_workflow.latest_workflow, trans=trans)
+        if trans.user_is_admin:
+            workflow_path = stored_workflow.from_path
+            self.store_workflow_to_path(workflow_path, stored_workflow, stored_workflow.latest_workflow, trans=trans)
 
     def store_workflow_artifacts(self, directory, filename_base, workflow, **kwd):
         modern_workflow_path = os.path.join(directory, f"{filename_base}.gxwf.yml")
19 changes: 13 additions & 6 deletions lib/galaxy/model/__init__.py
@@ -2486,7 +2486,7 @@ class PostJobAction(Base, RepresentById):
     workflow_step_id = Column(Integer, ForeignKey("workflow_step.id"), index=True, nullable=True)
     action_type = Column(String(255), nullable=False)
     output_name = Column(String(255), nullable=True)
-    action_arguments = Column(MutableJSONType, nullable=True)
+    _action_arguments = Column("action_arguments", MutableJSONType, nullable=True)
     workflow_step = relationship(
         "WorkflowStep",
         back_populates="post_job_actions",
@@ -2500,6 +2500,18 @@ def __init__(self, action_type, workflow_step=None, output_name=None, action_arg
         self.workflow_step = workflow_step
         ensure_object_added_to_session(self, object_in_session=workflow_step)
 
+    @property
+    def action_arguments(self):
+        if self.action_type in ("HideDatasetAction", "DeleteIntermediatesAction") and self._action_arguments is True:
+            # Fix up broken workflows resulting from imports with gxformat2 <= 0.20.0
+            return {}
+        else:
+            return self._action_arguments
+
+    @action_arguments.setter
+    def action_arguments(self, value: Dict[str, Any]):
+        self._action_arguments = value
+
 
 class PostJobActionAssociation(Base, RepresentById):
     __tablename__ = "post_job_action_association"
@@ -6693,11 +6705,6 @@ class HistoryDatasetCollectionAssociation(
         primaryjoin=copied_from_history_dataset_collection_association_id == id,
         remote_side=[id],
         uselist=False,
-        back_populates="copied_to_history_dataset_collection_association",
-    )
-    copied_to_history_dataset_collection_association = relationship(
-        "HistoryDatasetCollectionAssociation",
-        back_populates="copied_from_history_dataset_collection_association",
     )
     implicit_input_collections = relationship(
         "ImplicitlyCreatedDatasetCollectionInput",
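For readers skimming the diff: the new getter/setter pair above behaves like an ordinary Python property that normalizes a legacy value on read, so callers keep using post_job_action.action_arguments unchanged. A minimal standalone sketch of the same pattern, assuming only plain Python (LegacyNormalizingAction is an illustrative stand-in, not Galaxy code):

    from typing import Any, Dict


    class LegacyNormalizingAction:
        """Toy stand-in for PostJobAction's property wrapper around the mapped column."""

        def __init__(self, action_type: str, action_arguments: Any = None):
            self.action_type = action_type
            # In Galaxy this is the "action_arguments" database column; here it is a plain attribute.
            self._action_arguments = action_arguments

        @property
        def action_arguments(self):
            if self.action_type in ("HideDatasetAction", "DeleteIntermediatesAction") and self._action_arguments is True:
                # Old gxformat2 (<= 0.20.0) imports could store a bare `true`; treat it as "no arguments".
                return {}
            return self._action_arguments

        @action_arguments.setter
        def action_arguments(self, value: Dict[str, Any]):
            self._action_arguments = value


    broken = LegacyNormalizingAction("HideDatasetAction", action_arguments=True)
    assert broken.action_arguments == {}  # the legacy `true` is normalized on read
    broken.action_arguments = {"output_name": "out_file1"}
    assert broken.action_arguments == {"output_name": "out_file1"}
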
40 changes: 22 additions & 18 deletions lib/galaxy/model/store/__init__.py
@@ -996,14 +996,16 @@ def _import_collection_copied_associations(
             # sense.
             hdca_copied_from_sinks = object_import_tracker.hdca_copied_from_sinks
             if copied_from_object_key in object_import_tracker.hdcas_by_key:
-                hdca.copied_from_history_dataset_collection_association = object_import_tracker.hdcas_by_key[
-                    copied_from_object_key
-                ]
+                source_hdca = object_import_tracker.hdcas_by_key[copied_from_object_key]
+                if source_hdca is not hdca:
+                    # We may not have the copied source, in which case the first included HDCA in the chain
+                    # acts as the source, so here we make sure we don't create a cycle.
+                    hdca.copied_from_history_dataset_collection_association = source_hdca
             else:
                 if copied_from_object_key in hdca_copied_from_sinks:
-                    hdca.copied_from_history_dataset_collection_association = object_import_tracker.hdcas_by_key[
-                        hdca_copied_from_sinks[copied_from_object_key]
-                    ]
+                    source_hdca = object_import_tracker.hdcas_by_key[hdca_copied_from_sinks[copied_from_object_key]]
+                    if source_hdca is not hdca:
+                        hdca.copied_from_history_dataset_collection_association = source_hdca
                 else:
                     hdca_copied_from_sinks[copied_from_object_key] = dataset_collection_key

@@ -1070,7 +1072,7 @@ def attach_workflow_step(imported_object, attrs):
             for step_attrs in invocation_attrs["steps"]:
                 imported_invocation_step = model.WorkflowInvocationStep()
                 imported_invocation_step.workflow_invocation = imported_invocation
-                ensure_object_added_to_session(imported_invocation, session=self.sa_session)
+                ensure_object_added_to_session(imported_invocation_step, session=self.sa_session)
                 attach_workflow_step(imported_invocation_step, step_attrs)
                 restore_times(imported_invocation_step, step_attrs)
                 imported_invocation_step.action = step_attrs["action"]
@@ -1921,12 +1923,14 @@ def __init__(
         self.export_files = export_files
         self.included_datasets: Dict[model.DatasetInstance, Tuple[model.DatasetInstance, bool]] = {}
         self.dataset_implicit_conversions: Dict[model.DatasetInstance, model.ImplicitlyConvertedDatasetAssociation] = {}
-        self.included_collections: List[Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]] = []
+        self.included_collections: Dict[
+            Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation],
+            Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation],
+        ] = {}
         self.included_libraries: List[model.Library] = []
         self.included_library_folders: List[model.LibraryFolder] = []
         self.included_invocations: List[model.WorkflowInvocation] = []
         self.collection_datasets: Set[int] = set()
-        self.collections_attrs: List[Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]] = []
         self.dataset_id_to_path: Dict[int, Tuple[Optional[str], Optional[str]]] = {}
 
         self.job_output_dataset_associations: Dict[int, Dict[str, model.DatasetInstance]] = {}
@@ -2287,8 +2291,7 @@ def export_collection(
     def add_dataset_collection(
         self, collection: Union[model.DatasetCollection, model.HistoryDatasetCollectionAssociation]
     ) -> None:
-        self.collections_attrs.append(collection)
-        self.included_collections.append(collection)
+        self.included_collections[collection] = collection
 
     def add_implicit_conversion_dataset(
         self,
@@ -2343,7 +2346,7 @@ def to_json(attributes):
 
         collections_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_COLLECTIONS)
         with open(collections_attrs_filename, "w") as collections_attrs_out:
-            collections_attrs_out.write(to_json(self.collections_attrs))
+            collections_attrs_out.write(to_json(self.included_collections.values()))
 
         conversions_attrs_filename = os.path.join(export_directory, ATTRS_FILENAME_CONVERSIONS)
         with open(conversions_attrs_filename, "w") as conversions_attrs_out:
@@ -2364,12 +2367,12 @@ def to_json(attributes):
         #
 
         # Get all jobs associated with included HDAs.
-        jobs_dict: Dict[str, model.Job] = {}
+        jobs_dict: Dict[int, model.Job] = {}
         implicit_collection_jobs_dict = {}
 
         def record_job(job):
-            if not job:
-                # No viable job.
+            if not job or job.id in jobs_dict:
+                # No viable job or job already recorded.
                 return
 
             jobs_dict[job.id] = job
@@ -2395,10 +2398,11 @@ def record_associated_jobs(obj):
             )
             job_hda = hda
             while job_hda.copied_from_history_dataset_association:  # should this check library datasets as well?
+                # record job (if one exists) even if dataset was copied
+                # copy could have been created manually through UI/API or using database operation tool,
+                # in which case we have a relevant job to export.
+                record_associated_jobs(job_hda)
                 job_hda = job_hda.copied_from_history_dataset_association
-            if not job_hda.creating_job_associations:
-                # No viable HDA found.
-                continue
 
             record_associated_jobs(job_hda)

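To see why record_job above now checks job.id: jobs are also recorded while walking the copied-from chain, so the same creating job can be reached more than once, and the id check keeps it from being recorded twice. A small runnable sketch of that pattern, using toy Job and Hda stand-ins rather than Galaxy models:

    from typing import Dict, Optional


    class Job:
        def __init__(self, job_id: int):
            self.id = job_id


    class Hda:
        """Toy dataset with a creating job and an optional copied-from link."""

        def __init__(self, creating_job: Optional[Job] = None, copied_from: Optional["Hda"] = None):
            self.creating_job = creating_job
            self.copied_from_history_dataset_association = copied_from


    jobs_dict: Dict[int, Job] = {}


    def record_job(job: Optional[Job]) -> None:
        if not job or job.id in jobs_dict:
            # No viable job or job already recorded.
            return
        jobs_dict[job.id] = job


    original = Hda(creating_job=Job(7))
    # A copy made with a database operation tool has its own creating job,
    # which the export now records while walking back to the original.
    tool_copy = Hda(creating_job=Job(9), copied_from=original)

    job_hda = tool_copy
    while job_hda.copied_from_history_dataset_association:
        record_job(job_hda.creating_job)  # records job 9 from the copy
        job_hda = job_hda.copied_from_history_dataset_association
    record_job(job_hda.creating_job)   # records job 7 from the original
    record_job(original.creating_job)  # already recorded: skipped by the id check

    assert sorted(jobs_dict) == [7, 9]
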
44 changes: 44 additions & 0 deletions test/integration/test_workflow_tasks.py
@@ -17,6 +17,7 @@
 from galaxy_test.base import api_asserts
 from galaxy_test.base.api import UsesCeleryTasks
 from galaxy_test.base.populators import (
+    DatasetCollectionPopulator,
     DatasetPopulator,
     RunJobsSummary,
     WorkflowPopulator,
@@ -27,6 +28,7 @@
 
 class TestWorkflowTasksIntegration(PosixFileSourceSetup, IntegrationTestCase, UsesCeleryTasks, RunsWorkflowFixtures):
     dataset_populator: DatasetPopulator
+    dataset_collection_populator: DatasetCollectionPopulator
     framework_tool_and_types = True
 
     @classmethod
@@ -37,6 +39,7 @@ def handle_galaxy_config_kwds(cls, config):
     def setUp(self):
         super().setUp()
         self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
+        self.dataset_collection_populator = DatasetCollectionPopulator(self.galaxy_interactor)
         self.workflow_populator = WorkflowPopulator(self.galaxy_interactor)
         self._write_file_fixtures()

@@ -124,6 +127,47 @@ def test_export_import_invocation_with_step_parameter(self):
         invocation_details = self._export_and_import_workflow_invocation(summary, use_uris)
         self._rerun_imported_workflow(summary, invocation_details)
 
+    def test_export_import_invocation_with_copied_hdca_and_database_operation_tool(self):
+        with self.dataset_populator.test_history() as history_id:
+            self.dataset_collection_populator.create_list_in_history(history_id=history_id, wait=True).json()
+            new_history = self.dataset_populator.copy_history(history_id=history_id).json()
+            copied_collection = self.dataset_populator.get_history_collection_details(new_history["id"])
+            workflow_id = self.workflow_populator.upload_yaml_workflow(
+                """class: GalaxyWorkflow
+inputs:
+  input:
+    type: collection
+    collection_type: list
+steps:
+  extract_dataset:
+    tool_id: __EXTRACT_DATASET__
+    in:
+      input:
+        source: input
+"""
+            )
+            inputs = {"input": {"src": "hdca", "id": copied_collection["id"]}}
+            workflow_request = {"history": f"hist_id={new_history['id']}", "inputs_by": "name", "inputs": inputs}
+            invocation = self.workflow_populator.invoke_workflow_raw(
+                workflow_id, workflow_request, assert_ok=True
+            ).json()
+            invocation_id = invocation["id"]
+            self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id)
+            jobs = self.workflow_populator.get_invocation_jobs(invocation_id)
+            summary = RunJobsSummary(
+                history_id=history_id,
+                workflow_id=workflow_id,
+                invocation_id=invocation["id"],
+                inputs=inputs,
+                jobs=jobs,
+                invocation=invocation,
+                workflow_request=workflow_request,
+            )
+            imported_invocation_details = self._export_and_import_workflow_invocation(summary)
+            original_contents = self.dataset_populator.get_history_contents(new_history["id"])
+            contents = self.dataset_populator.get_history_contents(imported_invocation_details["history_id"])
+            assert len(contents) == len(original_contents) == 5
+
     def _export_and_import_workflow_invocation(
         self, summary: RunJobsSummary, use_uris: bool = True, model_store_format="tgz"
     ) -> Dict[str, Any]: