Merge pull request #18342 from mvdbeek/fix_copying_purged_files
[24.0] Do not copy purged outputs to object store
mvdbeek authored Jun 11, 2024
2 parents d7dad3a + f0e09cc commit 96c9be3
Showing 17 changed files with 332 additions and 66 deletions.
12 changes: 10 additions & 2 deletions lib/galaxy/job_execution/output_collect.py
@@ -521,7 +521,8 @@ def collect_primary_datasets(job_context: Union[JobContext, SessionlessJobContex
outdata.designation = designation
outdata.dataset.external_filename = None # resets filename_override
# Move data from temp location to dataset location
job_context.object_store.update_from_file(outdata.dataset, file_name=filename, create=True)
if not outdata.dataset.purged:
job_context.object_store.update_from_file(outdata.dataset, file_name=filename, create=True)
primary_output_assigned = True
continue
if name not in primary_datasets:
@@ -554,6 +555,7 @@ def collect_primary_datasets(job_context: Union[JobContext, SessionlessJobContex
dataset_attributes=new_primary_datasets_attributes,
creating_job_id=job_context.get_job_id() if job_context else None,
storage_callbacks=storage_callbacks,
purged=outdata.dataset.purged,
)
# Associate new dataset with job
job_context.add_output_dataset_association(f"__new_primary_file_{name}|{designation}__", primary_data)
@@ -563,7 +565,13 @@ def collect_primary_datasets(job_context: Union[JobContext, SessionlessJobContex
if primary_output_assigned:
outdata.name = new_outdata_name
outdata.init_meta()
outdata.set_meta()
if not outdata.dataset.purged:
try:
outdata.set_meta()
except Exception:
# We don't want to fail here on a single "bad" discovered dataset
log.debug("set meta failed for %s", outdata, exc_info=True)
outdata.state = HistoryDatasetAssociation.states.FAILED_METADATA
outdata.set_peek()
outdata.discovered = True
sa_session = job_context.sa_session
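The two guards above (skip the object-store copy, and skip or soften set_meta) reduce to one small pattern. A minimal runnable sketch, using stand-in classes rather than Galaxy's real JobContext and HistoryDatasetAssociation (all names below are illustrative):

from dataclasses import dataclass, field


@dataclass
class FakeDataset:
    purged: bool = False


@dataclass
class FakeOutput:
    dataset: FakeDataset = field(default_factory=FakeDataset)
    state: str = "ok"

    def set_meta(self):
        # Stand-in for Galaxy's metadata detection; may raise on bad data.
        raise ValueError("unreadable output")


def finalize_discovered_output(outdata: FakeOutput, update_from_file) -> None:
    """Only touch the object store and metadata when the dataset still exists."""
    if not outdata.dataset.purged:
        update_from_file(outdata.dataset)
        try:
            outdata.set_meta()
        except Exception:
            # As in the hunk above: one bad discovered dataset should not
            # fail the whole collection step.
            outdata.state = "failed_metadata"


purged = FakeOutput(dataset=FakeDataset(purged=True))
finalize_discovered_output(purged, update_from_file=lambda d: None)
assert purged.state == "ok"  # purged output: nothing copied, nothing inspected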
10 changes: 7 additions & 3 deletions lib/galaxy/jobs/__init__.py
@@ -2000,10 +2000,14 @@ def fail(message=job.info, exception=None):
quota_source_info = None
# Once datasets are collected, set the total dataset size (includes extra files)
for dataset_assoc in job.output_datasets:
if not dataset_assoc.dataset.dataset.purged:
dataset = dataset_assoc.dataset.dataset
if not dataset.purged:
# assume all datasets in a job get written to the same objectstore
quota_source_info = dataset_assoc.dataset.dataset.quota_source_info
collected_bytes += dataset_assoc.dataset.set_total_size()
quota_source_info = dataset.quota_source_info
collected_bytes += dataset.set_total_size()
else:
# Purge, in case job wrote directly to object store
dataset.full_delete()

user = job.user
if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use:
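The quota loop above now counts bytes only for surviving outputs and fully deletes purged ones, in case the job wrote straight into the object store. A small self-contained sketch of that accounting, with a toy dataset class standing in for Galaxy's model:

from dataclasses import dataclass


@dataclass
class ToyDataset:
    size: int
    purged: bool = False

    def set_total_size(self) -> int:
        return self.size

    def full_delete(self) -> None:
        self.purged = True
        self.size = 0


def collect_output_bytes(datasets: list[ToyDataset]) -> int:
    """Sum sizes of surviving outputs; purge the rest (as in the hunk above)."""
    collected_bytes = 0
    for dataset in datasets:
        if not dataset.purged:
            collected_bytes += dataset.set_total_size()
        else:
            # Purge for real, in case the job wrote directly to the object store.
            dataset.full_delete()
    return collected_bytes


outputs = [ToyDataset(100), ToyDataset(50, purged=True)]
assert collect_output_bytes(outputs) == 100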
6 changes: 5 additions & 1 deletion lib/galaxy/jobs/command_factory.py
@@ -293,7 +293,11 @@ def __copy_if_exists_command(work_dir_output):
source_file, destination = work_dir_output
if "?" in source_file or "*" in source_file:
source_file = source_file.replace("*", '"*"').replace("?", '"?"')
return f'\nif [ -f "{source_file}" ] ; then cp "{source_file}" "{destination}" ; fi'
# Check if source and destination exist.
# Users can purge outputs before the job completes,
# in that case we don't want to copy the output to a purged path.
# Static, non work_dir_output files are handled in job_finish code.
return f'\nif [ -f "{source_file}" -a -f "{destination}" ] ; then cp "{source_file}" "{destination}" ; fi'


class CommandsBuilder:
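The generated shell guard can be reproduced in isolation. This sketch mirrors the function above (the helper name is illustrative); the point is that cp only runs when both the working-directory source and the destination path still exist, since a user may purge the output, and with it its destination file, while the job is running:

def copy_if_both_exist_command(source_file: str, destination: str) -> str:
    """Emit a shell snippet that copies only when source AND destination exist."""
    if "?" in source_file or "*" in source_file:
        source_file = source_file.replace("*", '"*"').replace("?", '"?"')
    return f'\nif [ -f "{source_file}" -a -f "{destination}" ] ; then cp "{source_file}" "{destination}" ; fi'


print(copy_if_both_exist_command("output.txt", "/data/dataset_1.dat"))
# -> if [ -f "output.txt" -a -f "/data/dataset_1.dat" ] ; then cp "output.txt" "/data/dataset_1.dat" ; fi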
28 changes: 24 additions & 4 deletions lib/galaxy/jobs/runners/__init__.py
@@ -380,6 +380,13 @@ def get_work_dir_outputs(
job_tool = job_wrapper.tool
for joda, dataset in self._walk_dataset_outputs(job):
if joda and job_tool:
if dataset.dataset.purged:
log.info(
"Output dataset %s for job %s purged before job completed, skipping output collection.",
joda.name,
job.id,
)
continue
hda_tool_output = job_tool.find_output_def(joda.name)
if hda_tool_output and hda_tool_output.from_work_dir:
# Copy from working dir to HDA.
@@ -618,10 +625,23 @@ def _finish_or_resubmit_job(self, job_state: "JobState", job_stdout, job_stderr,

tool_stdout_path = os.path.join(outputs_directory, "tool_stdout")
tool_stderr_path = os.path.join(outputs_directory, "tool_stderr")
with open(tool_stdout_path, "rb") as stdout_file:
tool_stdout = self._job_io_for_db(stdout_file)
with open(tool_stderr_path, "rb") as stderr_file:
tool_stderr = self._job_io_for_db(stderr_file)
try:
with open(tool_stdout_path, "rb") as stdout_file:
tool_stdout = self._job_io_for_db(stdout_file)
with open(tool_stderr_path, "rb") as stderr_file:
tool_stderr = self._job_io_for_db(stderr_file)
except FileNotFoundError:
if job.state in (model.Job.states.DELETING, model.Job.states.DELETED):
# We killed the job, so we may not even have the tool stdout / tool stderr
tool_stdout = ""
tool_stderr = "Job cancelled"
else:
# Should we instead just move on ?
# In the end the only consequence here is that we won't be able to determine
# if the job failed for known tool reasons (check_tool_output).
# OTOH I don't know if this can even be reached
# Deal with it if we ever get reports about this.
raise

check_output_detected_state = job_wrapper.check_tool_output(
tool_stdout,
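The try/except around the stdout/stderr reads tolerates a killed job whose working directory never received those files. A minimal sketch of that control flow, independent of the runner classes (function and parameter names here are illustrative):

import os


def read_tool_streams(outputs_directory: str, job_cancelled: bool) -> tuple[str, str]:
    """Read captured tool output, tolerating missing files for cancelled jobs."""
    stdout_path = os.path.join(outputs_directory, "tool_stdout")
    stderr_path = os.path.join(outputs_directory, "tool_stderr")
    try:
        with open(stdout_path) as stdout_file, open(stderr_path) as stderr_file:
            return stdout_file.read(), stderr_file.read()
    except FileNotFoundError:
        if job_cancelled:
            # The job was killed before the wrapper wrote these files.
            return "", "Job cancelled"
        raise


assert read_tool_streams("/nonexistent", job_cancelled=True) == ("", "Job cancelled")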
2 changes: 1 addition & 1 deletion lib/galaxy/metadata/__init__.py
@@ -118,7 +118,7 @@ def _metadata_results_from_file(self, dataset, filename_results_code):
rstring = f"Metadata results could not be read from '{filename_results_code}'"

if not rval:
log.debug(f"setting metadata externally failed for {dataset.__class__.__name__} {dataset.id}: {rstring}")
log.warning(f"setting metadata externally failed for {dataset.__class__.__name__} {dataset.id}: {rstring}")
return rval


13 changes: 9 additions & 4 deletions lib/galaxy/metadata/set_metadata.py
@@ -96,7 +96,7 @@ def push_if_necessary(object_store: ObjectStore, dataset: DatasetInstance, exter
# or a remote object store from its cache path.
# empty files could happen when outputs are discovered from working dir,
# empty file check needed for e.g. test/integration/test_extended_metadata_outputs_to_working_directory.py::test_tools[multi_output_assign_primary]
if os.path.getsize(external_filename):
if not dataset.dataset.purged and os.path.getsize(external_filename):
object_store.update_from_file(dataset.dataset, file_name=external_filename, create=True)


@@ -426,6 +426,10 @@ def set_meta(new_dataset_instance, file_dict):
# as opposed to perhaps a storage issue.
with open(external_filename, "wb"):
pass
elif not os.path.exists(dataset_filename_override):
# purged output ?
dataset.purged = True
dataset.dataset.purged = True
else:
raise Exception(f"Output file '{external_filename}' not found")

@@ -477,15 +481,16 @@ def set_meta(new_dataset_instance, file_dict):
object_store_update_actions.append(partial(reset_external_filename, dataset))
object_store_update_actions.append(partial(dataset.set_total_size))
object_store_update_actions.append(partial(export_store.add_dataset, dataset))
if dataset_instance_id not in unnamed_id_to_path:
if dataset_instance_id not in unnamed_id_to_path and not dataset.dataset.purged:
object_store_update_actions.append(partial(collect_extra_files, object_store, dataset, "."))
dataset_state = "deferred" if (is_deferred and final_job_state == "ok") else final_job_state
if not dataset.state == dataset.states.ERROR:
# Don't overwrite failed state (for invalid content) here
dataset.state = dataset.dataset.state = dataset_state
# We're going to run through set_metadata in collect_dynamic_outputs with more contextual metadata,
# so only run set_meta for fixed outputs
set_meta(dataset, file_dict)
if not dataset.dataset.purged:
set_meta(dataset, file_dict)
# TODO: merge expression_context into tool_provided_metadata so we don't have to special case this (here and in _finish_dataset)
meta = tool_provided_metadata.get_dataset_meta(output_name, dataset.dataset.id, dataset.dataset.uuid)
if meta:
@@ -512,7 +517,7 @@ def set_meta(new_dataset_instance, file_dict):
context_value = context[context_key]
setattr(dataset, context_key, context_value)
else:
if dataset_instance_id not in unnamed_id_to_path:
if dataset_instance_id not in unnamed_id_to_path and not dataset.dataset.purged:
# We're going to run through set_metadata in collect_dynamic_outputs with more contextual metadata,
# so only run set_meta for fixed outputs
set_meta(dataset, file_dict)
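Taken together, the set_metadata.py hunks implement two rules: never push a purged dataset (or an empty working-directory file) to the object store, and treat a missing override file as a purge that happened mid-job. A compact sketch folding both guards into one illustrative helper (stand-in class, not Galaxy's DatasetInstance):

import os


class StubDataset:
    def __init__(self, purged: bool = False):
        self.purged = purged


def push_if_necessary(push, dataset: StubDataset, external_filename: str) -> bool:
    """Push the working-directory copy only for live, non-empty outputs."""
    if not dataset.purged and os.path.exists(external_filename) and os.path.getsize(external_filename):
        push(dataset, external_filename)
        return True
    if not os.path.exists(external_filename):
        # The file vanished while the job ran: record the output as purged.
        dataset.purged = True
    return False


dataset = StubDataset()
assert push_if_necessary(lambda *args: None, dataset, "/no/such/file") is False
assert dataset.purged is True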
17 changes: 10 additions & 7 deletions lib/galaxy/model/__init__.py
@@ -4211,6 +4211,8 @@ def full_delete(self):
# TODO: purge metadata files
self.deleted = True
self.purged = True
self.file_size = 0
self.total_size = 0

def get_access_roles(self, security_agent):
roles = []
@@ -9485,13 +9487,14 @@ def dataset(self) -> Optional[Dataset]:
def update_from_file(self, file_name):
if not self.dataset:
raise Exception("Attempted to write MetadataFile, but no DatasetAssociation set")
self.dataset.object_store.update_from_file(
self,
file_name=file_name,
extra_dir="_metadata_files",
extra_dir_at_root=True,
alt_name=os.path.basename(self.get_file_name()),
)
if not self.dataset.purged:
self.dataset.object_store.update_from_file(
self,
file_name=file_name,
extra_dir="_metadata_files",
extra_dir_at_root=True,
alt_name=os.path.basename(self.get_file_name()),
)

def get_file_name(self, sync_cache=True):
# Ensure the directory structure and the metadata file object exist
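full_delete() now also zeroes the recorded sizes, so disk accounting follows the purge, and MetadataFile writes are skipped for purged datasets. The size bookkeeping in isolation (an illustrative class, not the real Dataset model):

class SketchDataset:
    """Illustrative stand-in for the size bookkeeping in full_delete()."""

    def __init__(self, file_size: int):
        self.deleted = False
        self.purged = False
        self.file_size = file_size
        self.total_size = file_size

    def full_delete(self) -> None:
        self.deleted = True
        self.purged = True
        # Zero the recorded sizes so quota usage drops with the data.
        self.file_size = 0
        self.total_size = 0


ds = SketchDataset(file_size=1024)
ds.full_delete()
assert (ds.purged, ds.file_size, ds.total_size) == (True, 0, 0)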
25 changes: 13 additions & 12 deletions lib/galaxy/model/store/__init__.py
@@ -183,8 +183,6 @@ class ImportDiscardedDataType(Enum):

class DatasetAttributeImportModel(BaseModel):
state: Optional[DatasetStateField] = None
deleted: Optional[bool] = None
purged: Optional[bool] = None
external_filename: Optional[str] = None
_extra_files_path: Optional[str] = None
file_size: Optional[int] = None
@@ -470,6 +468,8 @@ def handle_dataset_object_edit(dataset_instance, dataset_attrs):
)
for attribute, value in dataset_attributes.items():
setattr(dataset_instance.dataset, attribute, value)
if dataset_instance.dataset.purged:
dataset_instance.dataset.full_delete()
self._attach_dataset_hashes(dataset_attrs["dataset"], dataset_instance)
self._attach_dataset_sources(dataset_attrs["dataset"], dataset_instance)
if "id" in dataset_attrs["dataset"] and self.import_options.allow_edit:
@@ -654,17 +654,18 @@ def handle_dataset_object_edit(dataset_instance, dataset_attrs):
dataset_instance.state = dataset_state
if not self.object_store:
raise Exception(f"self.object_store is missing from {self}.")
self.object_store.update_from_file(
dataset_instance.dataset, file_name=temp_dataset_file_name, create=True
)
if not dataset_instance.dataset.purged:
self.object_store.update_from_file(
dataset_instance.dataset, file_name=temp_dataset_file_name, create=True
)

# Import additional files if present. Histories exported previously might not have this attribute set.
dataset_extra_files_path = dataset_attrs.get("extra_files_path", None)
if dataset_extra_files_path:
assert file_source_root
dataset_extra_files_path = os.path.join(file_source_root, dataset_extra_files_path)
persist_extra_files(self.object_store, dataset_extra_files_path, dataset_instance)
# Don't trust serialized file size
# Import additional files if present. Histories exported previously might not have this attribute set.
dataset_extra_files_path = dataset_attrs.get("extra_files_path", None)
if dataset_extra_files_path:
assert file_source_root
dataset_extra_files_path = os.path.join(file_source_root, dataset_extra_files_path)
persist_extra_files(self.object_store, dataset_extra_files_path, dataset_instance)
# Don't trust serialized file size
dataset_instance.dataset.file_size = None
dataset_instance.dataset.set_total_size() # update the filesize record in the database

12 changes: 11 additions & 1 deletion lib/galaxy/model/store/discover.py
@@ -91,6 +91,7 @@ def create_dataset(
creating_job_id=None,
output_name=None,
storage_callbacks=None,
purged=False,
):
tag_list = tag_list or []
sources = sources or []
@@ -190,7 +191,11 @@

if info is not None:
primary_data.info = info
if filename:

if purged:
primary_data.dataset.purged = True
primary_data.purged = True
if filename and not purged:
if storage_callbacks is None:
self.finalize_storage(
primary_data=primary_data,
@@ -214,6 +219,11 @@
return primary_data

def finalize_storage(self, primary_data, dataset_attributes, extra_files, filename, link_data, output_name):
if primary_data.dataset.purged:
# metadata won't be set, maybe we should do that, then purge ?
primary_data.dataset.file_size = 0
primary_data.dataset.total_size = 0
return
# Move data from temp location to dataset location
if not link_data:
dataset = primary_data.dataset
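finalize_storage() now returns early for purged discovered outputs, recording zero sizes instead of moving anything into the object store. The early-return pattern in isolation (illustrative names only):

class DiscoveredDataset:
    def __init__(self, purged: bool):
        self.purged = purged
        self.file_size = None
        self.total_size = None
        self.stored = False


def finalize_storage(primary_data: DiscoveredDataset, move_into_object_store) -> None:
    """Skip storage for purged discovered outputs; just record empty sizes."""
    if primary_data.purged:
        primary_data.file_size = 0
        primary_data.total_size = 0
        return
    move_into_object_store(primary_data)
    primary_data.stored = True


purged = DiscoveredDataset(purged=True)
finalize_storage(purged, move_into_object_store=lambda d: None)
assert purged.stored is False and purged.file_size == 0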
2 changes: 1 addition & 1 deletion lib/galaxy/objectstore/__init__.py
@@ -1670,7 +1670,7 @@ def persist_extra_files(
primary_data: "DatasetInstance",
extra_files_path_name: Optional[str] = None,
) -> None:
if os.path.exists(src_extra_files_path):
if not primary_data.dataset.purged and os.path.exists(src_extra_files_path):
assert primary_data.dataset
if not extra_files_path_name:
extra_files_path_name = primary_data.dataset.extra_files_path_name_from(object_store)
2 changes: 2 additions & 0 deletions lib/galaxy/webapps/galaxy/api/job_files.py
@@ -99,6 +99,8 @@ def create(self, trans, job_id, payload, **kwargs):
"""
job = self.__authorize_job_access(trans, job_id, **payload)
path = payload.get("path")
if not path:
raise exceptions.RequestParameterInvalidException("'path' parameter not provided or empty.")
self.__check_job_can_write_to_path(trans, job, path)

# Is this writing an unneeded file? Should this just copy in Python?
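The new check rejects a missing or empty "path" before any filesystem access is attempted. A minimal sketch of the validation, with a stand-in exception class in place of galaxy.exceptions:

class RequestParameterInvalidException(Exception):
    """Stand-in for the API error Galaxy raises for bad request parameters."""


def validate_job_file_path(payload: dict) -> str:
    path = payload.get("path")
    if not path:
        raise RequestParameterInvalidException("'path' parameter not provided or empty.")
    return path


try:
    validate_job_file_path({})
except RequestParameterInvalidException as err:
    print(err)  # 'path' parameter not provided or empty.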
90 changes: 90 additions & 0 deletions test/functional/tools/all_output_types.xml
@@ -0,0 +1,90 @@
<tool id="all_output_types" name="all_output_types" version="1.0.0" profile="24.0">
<command><![CDATA[
sleep $sleep_param &&
echo hi > output.txt &&
echo hi > '$static_output' &&
echo hi > '$static_output_2' &&
cp '$c1' galaxy.json
]]>
</command>
<configfiles>
<configfile name="c1">{"output_tool_supplied_metadata": {
"name": "my dynamic name",
"ext": "txt",
"info": "my dynamic info"
}}
</configfile>
</configfiles>
<inputs>
<param name="sleep_param" type="integer" value="0" />
</inputs>
<outputs>
<data name="static_output" format="txt" />
<data name="static_output_2" format="txt" />
<data name="output_workdir" from_work_dir="output.txt" format="txt" />
<data name="output_tool_supplied_metadata" from_work_dir="output.txt" format="auto" />
<data format="txt" name="discovered_output">
<discover_datasets pattern="(?P&lt;designation&gt;.+)\.txt" ext="txt" visible="true"/>
</data>
<data format="txt" name="discovered_output_replaced">
<discover_datasets pattern="(?P&lt;designation&gt;.+)\.txt" ext="txt" visible="true" assign_primary_output="true" />
</data>
<collection type="paired" name="static_pair" format="txt">
<data name="forward" from_work_dir="output.txt"></data>
<data name="reverse" from_work_dir="output.txt"></data>
</collection>
<collection type="list" name="discovered_list" format="txt">
<discover_datasets pattern="(?P&lt;designation&gt;.+)\.txt" ext="txt" visible="true" />
</collection>
</outputs>
<tests>
<test>
<output name="static_output">
<assert_contents>
<has_text text="hi"/>
</assert_contents>
</output>
<output name="output_workdir">
<assert_contents>
<has_text text="hi"/>
</assert_contents>
</output>
<output name="output_tool_supplied_metadata">
<assert_contents>
<has_text text="hi"/>
</assert_contents>
</output>
<output name="discovered_output">
<discovered_dataset designation="output" ftype="txt">
<assert_contents>
<has_text text="hi"/>
</assert_contents>
</discovered_dataset>
</output>
<output name="discovered_output_replaced" count="1">
<assert_contents>
<has_text text="hi"/>
</assert_contents>
</output>
<output_collection name="static_pair" type="paired">
<element name="forward" ftype="txt">
<assert_contents>
<has_text text="hi"></has_text>
</assert_contents>
</element>
<element name="reverse" ftype="txt">
<assert_contents>
<has_text text="hi"></has_text>
</assert_contents>
</element>
</output_collection>
<output_collection name="discovered_list">
<element name="output" ftype="txt">
<assert_contents>
<has_text text="hi"></has_text>
</assert_contents>
</element>
</output_collection>
</test>
</tests>
</tool>
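The configfile above is copied to galaxy.json, and Galaxy applies the values it finds there (name, ext, info) to the output named output_tool_supplied_metadata. A rough sketch of that mechanism, assuming only the JSON shape shown in the configfile (this is not the real galaxy.json parser, which handles more record types):

import json

GALAXY_JSON = """{"output_tool_supplied_metadata": {
 "name": "my dynamic name",
 "ext": "txt",
 "info": "my dynamic info"
}}"""


def apply_tool_supplied_metadata(galaxy_json_text: str, outputs: dict) -> None:
    """Apply per-output metadata from a galaxy.json-style document."""
    for output_name, meta in json.loads(galaxy_json_text).items():
        record = outputs.get(output_name, {})
        record.update({k: meta[k] for k in ("name", "ext", "info") if k in meta})
        outputs[output_name] = record


outputs = {"output_tool_supplied_metadata": {}}
apply_tool_supplied_metadata(GALAXY_JSON, outputs)
assert outputs["output_tool_supplied_metadata"]["ext"] == "txt"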