Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[24.0] Do not copy purged outputs to object store #18342

Merged
merged 16 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/galaxy/jobs/command_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,11 @@ def __copy_if_exists_command(work_dir_output):
source_file, destination = work_dir_output
if "?" in source_file or "*" in source_file:
source_file = source_file.replace("*", '"*"').replace("?", '"?"')
return f'\nif [ -f "{source_file}" ] ; then cp "{source_file}" "{destination}" ; fi'
# Check if source and destination exist.
# Users can purge outputs before the job completes,
# in that case we don't want to copy the output to a purged path.
# Static, non work_dir_output files are handled in job_finish code.
return f'\nif [ -f "{source_file}" -a -f "{destination}" ] ; then cp "{source_file}" "{destination}" ; fi'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a wild check 😭. I hate it but I don't know the alternative. I know people love the disk object store and jobs writing directly to the object store 🤔.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit of an "optimization". I'm happy to restore the original version; the important thing is that we don't store data in purged datasets.



class CommandsBuilder:
Expand Down
7 changes: 7 additions & 0 deletions lib/galaxy/jobs/runners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,13 @@ def get_work_dir_outputs(
job_tool = job_wrapper.tool
for joda, dataset in self._walk_dataset_outputs(job):
if joda and job_tool:
if dataset.dataset.purged:
log.info(
"Output dataset %s for job %s purged before job completed, skipping output collection.",
joda.name,
job.id,
)
continue
hda_tool_output = job_tool.find_output_def(joda.name)
if hda_tool_output and hda_tool_output.from_work_dir:
# Copy from working dir to HDA.
Expand Down
4 changes: 2 additions & 2 deletions lib/galaxy/metadata/set_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def push_if_necessary(object_store: ObjectStore, dataset: DatasetInstance, exter
# or a remote object store from its cache path.
# empty files could happen when outputs are discovered from working dir,
# empty file check needed for e.g. test/integration/test_extended_metadata_outputs_to_working_directory.py::test_tools[multi_output_assign_primary]
if os.path.getsize(external_filename):
if not dataset.dataset.purged and os.path.getsize(external_filename):
object_store.update_from_file(dataset.dataset, file_name=external_filename, create=True)


Expand Down Expand Up @@ -477,7 +477,7 @@ def set_meta(new_dataset_instance, file_dict):
object_store_update_actions.append(partial(reset_external_filename, dataset))
object_store_update_actions.append(partial(dataset.set_total_size))
object_store_update_actions.append(partial(export_store.add_dataset, dataset))
if dataset_instance_id not in unnamed_id_to_path:
if dataset_instance_id not in unnamed_id_to_path and not dataset.dataset.purged:
object_store_update_actions.append(partial(collect_extra_files, object_store, dataset, "."))
dataset_state = "deferred" if (is_deferred and final_job_state == "ok") else final_job_state
if not dataset.state == dataset.states.ERROR:
Expand Down
15 changes: 8 additions & 7 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9485,13 +9485,14 @@ def dataset(self) -> Optional[Dataset]:
def update_from_file(self, file_name):
if not self.dataset:
raise Exception("Attempted to write MetadataFile, but no DatasetAssociation set")
self.dataset.object_store.update_from_file(
self,
file_name=file_name,
extra_dir="_metadata_files",
extra_dir_at_root=True,
alt_name=os.path.basename(self.get_file_name()),
)
if not self.dataset.purged:
self.dataset.object_store.update_from_file(
self,
file_name=file_name,
extra_dir="_metadata_files",
extra_dir_at_root=True,
alt_name=os.path.basename(self.get_file_name()),
)

def get_file_name(self, sync_cache=True):
# Ensure the directory structure and the metadata file object exist
Expand Down
21 changes: 11 additions & 10 deletions lib/galaxy/model/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,17 +654,18 @@ def handle_dataset_object_edit(dataset_instance, dataset_attrs):
dataset_instance.state = dataset_state
if not self.object_store:
raise Exception(f"self.object_store is missing from {self}.")
self.object_store.update_from_file(
dataset_instance.dataset, file_name=temp_dataset_file_name, create=True
)
if not dataset_instance.dataset.purged:
self.object_store.update_from_file(
dataset_instance.dataset, file_name=temp_dataset_file_name, create=True
)

# Import additional files if present. Histories exported previously might not have this attribute set.
dataset_extra_files_path = dataset_attrs.get("extra_files_path", None)
if dataset_extra_files_path:
assert file_source_root
dataset_extra_files_path = os.path.join(file_source_root, dataset_extra_files_path)
persist_extra_files(self.object_store, dataset_extra_files_path, dataset_instance)
# Don't trust serialized file size
# Import additional files if present. Histories exported previously might not have this attribute set.
dataset_extra_files_path = dataset_attrs.get("extra_files_path", None)
if dataset_extra_files_path:
assert file_source_root
dataset_extra_files_path = os.path.join(file_source_root, dataset_extra_files_path)
persist_extra_files(self.object_store, dataset_extra_files_path, dataset_instance)
# Don't trust serialized file size
dataset_instance.dataset.file_size = None
dataset_instance.dataset.set_total_size() # update the filesize record in the database

Expand Down
3 changes: 3 additions & 0 deletions lib/galaxy/model/store/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ def create_dataset(
return primary_data

def finalize_storage(self, primary_data, dataset_attributes, extra_files, filename, link_data, output_name):
if primary_data.dataset.purged:
# metadata won't be set, maybe we should do that, then purge ?
return
# Move data from temp location to dataset location
if not link_data:
dataset = primary_data.dataset
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/objectstore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1670,7 +1670,7 @@ def persist_extra_files(
primary_data: "DatasetInstance",
extra_files_path_name: Optional[str] = None,
) -> None:
if os.path.exists(src_extra_files_path):
if not primary_data.dataset.purged and os.path.exists(src_extra_files_path):
assert primary_data.dataset
if not extra_files_path_name:
extra_files_path_name = primary_data.dataset.extra_files_path_name_from(object_store)
Expand Down
2 changes: 2 additions & 0 deletions lib/galaxy/webapps/galaxy/api/job_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def create(self, trans, job_id, payload, **kwargs):
"""
job = self.__authorize_job_access(trans, job_id, **payload)
path = payload.get("path")
if not path:
raise exceptions.RequestParameterInvalidException("'path' parameter not provided or empty.")
self.__check_job_can_write_to_path(trans, job, path)

# Is this writing an unneeded file? Should this just copy in Python?
Expand Down
4 changes: 2 additions & 2 deletions test/unit/app/jobs/test_command_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
TEST_FILES_PATH = "file_path"
TEE_REDIRECT = '> "$__out" 2> "$__err"'
RETURN_CODE_CAPTURE = "; return_code=$?; echo $return_code > galaxy_1.ec"
CP_WORK_DIR_OUTPUTS = '; \nif [ -f "foo" ] ; then cp "foo" "bar" ; fi'
CP_WORK_DIR_OUTPUTS = '; \nif [ -f "foo" -a -f "bar" ] ; then cp "foo" "bar" ; fi'


class TestCommandFactory(TestCase):
Expand Down Expand Up @@ -105,7 +105,7 @@ def test_workdir_outputs_with_glob(self):
self.workdir_outputs = [("foo*bar", "foo_x_bar")]
self._assert_command_is(
self._surround_command(
MOCK_COMMAND_LINE, '; \nif [ -f "foo"*"bar" ] ; then cp "foo"*"bar" "foo_x_bar" ; fi'
MOCK_COMMAND_LINE, '; \nif [ -f "foo"*"bar" -a -f "foo_x_bar" ] ; then cp "foo"*"bar" "foo_x_bar" ; fi'
)
)

Expand Down
Loading