[24.0] Do not copy purged outputs to object store #18342

Merged: 16 commits, Jun 11, 2024
12 changes: 10 additions & 2 deletions lib/galaxy/job_execution/output_collect.py
@@ -521,7 +521,8 @@ def collect_primary_datasets(job_context: Union[JobContext, SessionlessJobContex
outdata.designation = designation
outdata.dataset.external_filename = None # resets filename_override
# Move data from temp location to dataset location
job_context.object_store.update_from_file(outdata.dataset, file_name=filename, create=True)
if not outdata.dataset.purged:
job_context.object_store.update_from_file(outdata.dataset, file_name=filename, create=True)
primary_output_assigned = True
continue
if name not in primary_datasets:
@@ -554,6 +555,7 @@ def collect_primary_datasets(job_context: Union[JobContext, SessionlessJobContex
dataset_attributes=new_primary_datasets_attributes,
creating_job_id=job_context.get_job_id() if job_context else None,
storage_callbacks=storage_callbacks,
purged=outdata.dataset.purged,
)
# Associate new dataset with job
job_context.add_output_dataset_association(f"__new_primary_file_{name}|{designation}__", primary_data)
@@ -563,7 +565,13 @@ def collect_primary_datasets(job_context: Union[JobContext, SessionlessJobContex
if primary_output_assigned:
outdata.name = new_outdata_name
outdata.init_meta()
outdata.set_meta()
if not outdata.dataset.purged:
try:
outdata.set_meta()
except Exception:
# We don't want to fail here on a single "bad" discovered dataset
log.debug("set meta failed for %s", outdata, exc_info=True)
outdata.state = HistoryDatasetAssociation.states.FAILED_METADATA
outdata.set_peek()
outdata.discovered = True
sa_session = job_context.sa_session
10 changes: 7 additions & 3 deletions lib/galaxy/jobs/__init__.py
@@ -2000,10 +2000,14 @@ def fail(message=job.info, exception=None):
quota_source_info = None
# Once datasets are collected, set the total dataset size (includes extra files)
for dataset_assoc in job.output_datasets:
if not dataset_assoc.dataset.dataset.purged:
dataset = dataset_assoc.dataset.dataset
if not dataset.purged:
# assume all datasets in a job get written to the same objectstore
quota_source_info = dataset_assoc.dataset.dataset.quota_source_info
collected_bytes += dataset_assoc.dataset.set_total_size()
quota_source_info = dataset.quota_source_info
collected_bytes += dataset.set_total_size()
else:
# Purge, in case job wrote directly to object store
dataset.full_delete()

user = job.user
if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use:
6 changes: 5 additions & 1 deletion lib/galaxy/jobs/command_factory.py
@@ -293,7 +293,11 @@ def __copy_if_exists_command(work_dir_output):
source_file, destination = work_dir_output
if "?" in source_file or "*" in source_file:
source_file = source_file.replace("*", '"*"').replace("?", '"?"')
return f'\nif [ -f "{source_file}" ] ; then cp "{source_file}" "{destination}" ; fi'
# Check if source and destination exist.
# Users can purge outputs before the job completes,
# in that case we don't want to copy the output to a purged path.
# Static, non work_dir_output files are handled in job_finish code.
return f'\nif [ -f "{source_file}" -a -f "{destination}" ] ; then cp "{source_file}" "{destination}" ; fi'
Member commented:
This seems to be a wild check 😭. I hate it but I don't know the alternative. I know people love the disk object store and jobs writing directly to the object store 🤔.

Member (author) replied:
It's a bit of an "optimization"... I'm happy to restore the original version; the important thing is that we don't store data in purged datasets.
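
For illustration, here is a minimal, self-contained sketch of the check under discussion. It mirrors the __copy_if_exists_command behaviour shown in the diff above, with hypothetical paths; it is not the actual Galaxy helper.

# Standalone sketch mirroring __copy_if_exists_command above; the example paths are hypothetical.
def copy_if_exists_command(work_dir_output):
    source_file, destination = work_dir_output
    if "?" in source_file or "*" in source_file:
        # Close the surrounding quotes around glob characters so the shell can expand them.
        source_file = source_file.replace("*", '"*"').replace("?", '"?"')
    # Copy only when both the work-dir source and the destination path still exist;
    # a purged output's destination path may already be gone.
    return f'\nif [ -f "{source_file}" -a -f "{destination}" ] ; then cp "{source_file}" "{destination}" ; fi'

print(copy_if_exists_command(("output.txt", "/data/objects/dataset_42.dat")))
# -> if [ -f "output.txt" -a -f "/data/objects/dataset_42.dat" ] ; then cp "output.txt" "/data/objects/dataset_42.dat" ; fi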



class CommandsBuilder:
28 changes: 24 additions & 4 deletions lib/galaxy/jobs/runners/__init__.py
@@ -380,6 +380,13 @@ def get_work_dir_outputs(
job_tool = job_wrapper.tool
for joda, dataset in self._walk_dataset_outputs(job):
if joda and job_tool:
if dataset.dataset.purged:
log.info(
"Output dataset %s for job %s purged before job completed, skipping output collection.",
joda.name,
job.id,
)
continue
hda_tool_output = job_tool.find_output_def(joda.name)
if hda_tool_output and hda_tool_output.from_work_dir:
# Copy from working dir to HDA.
@@ -618,10 +625,23 @@ def _finish_or_resubmit_job(self, job_state: "JobState", job_stdout, job_stderr,

tool_stdout_path = os.path.join(outputs_directory, "tool_stdout")
tool_stderr_path = os.path.join(outputs_directory, "tool_stderr")
with open(tool_stdout_path, "rb") as stdout_file:
tool_stdout = self._job_io_for_db(stdout_file)
with open(tool_stderr_path, "rb") as stderr_file:
tool_stderr = self._job_io_for_db(stderr_file)
try:
with open(tool_stdout_path, "rb") as stdout_file:
tool_stdout = self._job_io_for_db(stdout_file)
with open(tool_stderr_path, "rb") as stderr_file:
tool_stderr = self._job_io_for_db(stderr_file)
except FileNotFoundError:
if job.state in (model.Job.states.DELETING, model.Job.states.DELETED):
# We killed the job, so we may not even have the tool stdout / tool stderr
tool_stdout = ""
tool_stderr = "Job cancelled"
else:
# Should we instead just move on ?
# In the end the only consequence here is that we won't be able to determine
# if the job failed for known tool reasons (check_tool_output).
# OTOH I don't know if this can even be reached
# Deal with it if we ever get reports about this.
raise

check_output_detected_state = job_wrapper.check_tool_output(
tool_stdout,
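The fallback above reduces to the following pattern (a hedged sketch with simplified names, not the actual runner code): read the tool's stdout/stderr files, and tolerate their absence only when the job was deliberately cancelled.

import os

# Sketch of the cancelled-job fallback; CANCELLED_STATES stands in for
# model.Job.states.DELETING / DELETED in the real code.
CANCELLED_STATES = ("deleting", "deleted")

def read_tool_outputs(outputs_directory, job_state):
    stdout_path = os.path.join(outputs_directory, "tool_stdout")
    stderr_path = os.path.join(outputs_directory, "tool_stderr")
    try:
        with open(stdout_path, "rb") as stdout_file:
            tool_stdout = stdout_file.read().decode(errors="replace")
        with open(stderr_path, "rb") as stderr_file:
            tool_stderr = stderr_file.read().decode(errors="replace")
    except FileNotFoundError:
        if job_state in CANCELLED_STATES:
            # The job was killed, so the tool may never have written these files.
            return "", "Job cancelled"
        # Anything else is unexpected; surface it.
        raise
    return tool_stdout, tool_stderr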
2 changes: 1 addition & 1 deletion lib/galaxy/metadata/__init__.py
@@ -118,7 +118,7 @@ def _metadata_results_from_file(self, dataset, filename_results_code):
rstring = f"Metadata results could not be read from '{filename_results_code}'"

if not rval:
log.debug(f"setting metadata externally failed for {dataset.__class__.__name__} {dataset.id}: {rstring}")
log.warning(f"setting metadata externally failed for {dataset.__class__.__name__} {dataset.id}: {rstring}")
return rval


13 changes: 9 additions & 4 deletions lib/galaxy/metadata/set_metadata.py
@@ -96,7 +96,7 @@ def push_if_necessary(object_store: ObjectStore, dataset: DatasetInstance, exter
# or a remote object store from its cache path.
# empty files could happen when outputs are discovered from working dir,
# empty file check needed for e.g. test/integration/test_extended_metadata_outputs_to_working_directory.py::test_tools[multi_output_assign_primary]
if os.path.getsize(external_filename):
if not dataset.dataset.purged and os.path.getsize(external_filename):
object_store.update_from_file(dataset.dataset, file_name=external_filename, create=True)


@@ -426,6 +426,10 @@ def set_meta(new_dataset_instance, file_dict):
# as opposed to perhaps a storage issue.
with open(external_filename, "wb"):
pass
elif not os.path.exists(dataset_filename_override):
# purged output ?
dataset.purged = True
dataset.dataset.purged = True
else:
raise Exception(f"Output file '{external_filename}' not found")

@@ -477,15 +481,16 @@ def set_meta(new_dataset_instance, file_dict):
object_store_update_actions.append(partial(reset_external_filename, dataset))
object_store_update_actions.append(partial(dataset.set_total_size))
object_store_update_actions.append(partial(export_store.add_dataset, dataset))
if dataset_instance_id not in unnamed_id_to_path:
if dataset_instance_id not in unnamed_id_to_path and not dataset.dataset.purged:
object_store_update_actions.append(partial(collect_extra_files, object_store, dataset, "."))
dataset_state = "deferred" if (is_deferred and final_job_state == "ok") else final_job_state
if not dataset.state == dataset.states.ERROR:
# Don't overwrite failed state (for invalid content) here
dataset.state = dataset.dataset.state = dataset_state
# We're going to run through set_metadata in collect_dynamic_outputs with more contextual metadata,
# so only run set_meta for fixed outputs
set_meta(dataset, file_dict)
if not dataset.dataset.purged:
set_meta(dataset, file_dict)
# TODO: merge expression_context into tool_provided_metadata so we don't have to special case this (here and in _finish_dataset)
meta = tool_provided_metadata.get_dataset_meta(output_name, dataset.dataset.id, dataset.dataset.uuid)
if meta:
@@ -512,7 +517,7 @@ def set_meta(new_dataset_instance, file_dict):
context_value = context[context_key]
setattr(dataset, context_key, context_value)
else:
if dataset_instance_id not in unnamed_id_to_path:
if dataset_instance_id not in unnamed_id_to_path and not dataset.dataset.purged:
# We're going to run through set_metadata in collect_dynamic_outputs with more contextual metadata,
# so only run set_meta for fixed outputs
set_meta(dataset, file_dict)
17 changes: 10 additions & 7 deletions lib/galaxy/model/__init__.py
@@ -4211,6 +4211,8 @@ def full_delete(self):
# TODO: purge metadata files
self.deleted = True
self.purged = True
self.file_size = 0
self.total_size = 0

def get_access_roles(self, security_agent):
roles = []
@@ -9485,13 +9487,14 @@ def dataset(self) -> Optional[Dataset]:
def update_from_file(self, file_name):
if not self.dataset:
raise Exception("Attempted to write MetadataFile, but no DatasetAssociation set")
self.dataset.object_store.update_from_file(
self,
file_name=file_name,
extra_dir="_metadata_files",
extra_dir_at_root=True,
alt_name=os.path.basename(self.get_file_name()),
)
if not self.dataset.purged:
self.dataset.object_store.update_from_file(
self,
file_name=file_name,
extra_dir="_metadata_files",
extra_dir_at_root=True,
alt_name=os.path.basename(self.get_file_name()),
)

def get_file_name(self, sync_cache=True):
# Ensure the directory structure and the metadata file object exist
25 changes: 13 additions & 12 deletions lib/galaxy/model/store/__init__.py
@@ -183,8 +183,6 @@ class ImportDiscardedDataType(Enum):

class DatasetAttributeImportModel(BaseModel):
state: Optional[DatasetStateField] = None
deleted: Optional[bool] = None
purged: Optional[bool] = None
external_filename: Optional[str] = None
_extra_files_path: Optional[str] = None
file_size: Optional[int] = None
@@ -470,6 +468,8 @@ def handle_dataset_object_edit(dataset_instance, dataset_attrs):
)
for attribute, value in dataset_attributes.items():
setattr(dataset_instance.dataset, attribute, value)
if dataset_instance.dataset.purged:
dataset_instance.dataset.full_delete()
self._attach_dataset_hashes(dataset_attrs["dataset"], dataset_instance)
self._attach_dataset_sources(dataset_attrs["dataset"], dataset_instance)
if "id" in dataset_attrs["dataset"] and self.import_options.allow_edit:
@@ -654,17 +654,18 @@ def handle_dataset_object_edit(dataset_instance, dataset_attrs):
dataset_instance.state = dataset_state
if not self.object_store:
raise Exception(f"self.object_store is missing from {self}.")
self.object_store.update_from_file(
dataset_instance.dataset, file_name=temp_dataset_file_name, create=True
)
if not dataset_instance.dataset.purged:
self.object_store.update_from_file(
dataset_instance.dataset, file_name=temp_dataset_file_name, create=True
)

# Import additional files if present. Histories exported previously might not have this attribute set.
dataset_extra_files_path = dataset_attrs.get("extra_files_path", None)
if dataset_extra_files_path:
assert file_source_root
dataset_extra_files_path = os.path.join(file_source_root, dataset_extra_files_path)
persist_extra_files(self.object_store, dataset_extra_files_path, dataset_instance)
# Don't trust serialized file size
dataset_instance.dataset.file_size = None
dataset_instance.dataset.set_total_size() # update the filesize record in the database

12 changes: 11 additions & 1 deletion lib/galaxy/model/store/discover.py
@@ -91,6 +91,7 @@ def create_dataset(
creating_job_id=None,
output_name=None,
storage_callbacks=None,
purged=False,
):
tag_list = tag_list or []
sources = sources or []
@@ -190,7 +191,11 @@

if info is not None:
primary_data.info = info
if filename:

if purged:
primary_data.dataset.purged = True
primary_data.purged = True
if filename and not purged:
if storage_callbacks is None:
self.finalize_storage(
primary_data=primary_data,
@@ -214,6 +219,11 @@
return primary_data

def finalize_storage(self, primary_data, dataset_attributes, extra_files, filename, link_data, output_name):
if primary_data.dataset.purged:
# metadata won't be set, maybe we should do that, then purge ?
primary_data.dataset.file_size = 0
primary_data.dataset.total_size = 0
return
# Move data from temp location to dataset location
if not link_data:
dataset = primary_data.dataset
2 changes: 1 addition & 1 deletion lib/galaxy/objectstore/__init__.py
@@ -1670,7 +1670,7 @@ def persist_extra_files(
primary_data: "DatasetInstance",
extra_files_path_name: Optional[str] = None,
) -> None:
if os.path.exists(src_extra_files_path):
if not primary_data.dataset.purged and os.path.exists(src_extra_files_path):
assert primary_data.dataset
if not extra_files_path_name:
extra_files_path_name = primary_data.dataset.extra_files_path_name_from(object_store)
2 changes: 2 additions & 0 deletions lib/galaxy/webapps/galaxy/api/job_files.py
@@ -99,6 +99,8 @@ def create(self, trans, job_id, payload, **kwargs):
"""
job = self.__authorize_job_access(trans, job_id, **payload)
path = payload.get("path")
if not path:
raise exceptions.RequestParameterInvalidException("'path' parameter not provided or empty.")
self.__check_job_can_write_to_path(trans, job, path)

# Is this writing an unneeded file? Should this just copy in Python?
90 changes: 90 additions & 0 deletions test/functional/tools/all_output_types.xml
@@ -0,0 +1,90 @@
<tool id="all_output_types" name="all_output_types" version="1.0.0" profile="24.0">
<command><![CDATA[
sleep $sleep_param &&
echo hi > output.txt &&
echo hi > '$static_output' &&
echo hi > '$static_output_2' &&
cp '$c1' galaxy.json
]]>
</command>
<configfiles>
<configfile name="c1">{"output_tool_supplied_metadata": {
"name": "my dynamic name",
"ext": "txt",
"info": "my dynamic info"
}}
</configfile>
</configfiles>
<inputs>
<param name="sleep_param" type="integer" value="0" />
</inputs>
<outputs>
<data name="static_output" format="txt" />
<data name="static_output_2" format="txt" />
<data name="output_workdir" from_work_dir="output.txt" format="txt" />
<data name="output_tool_supplied_metadata" from_work_dir="output.txt" format="auto" />
<data format="txt" name="discovered_output">
<discover_datasets pattern="(?P&lt;designation&gt;.+)\.txt" ext="txt" visible="true"/>
</data>
<data format="txt" name="discovered_output_replaced">
<discover_datasets pattern="(?P&lt;designation&gt;.+)\.txt" ext="txt" visible="true" assign_primary_output="true" />
</data>
<collection type="paired" name="static_pair" format="txt">
<data name="forward" from_work_dir="output.txt"></data>
<data name="reverse" from_work_dir="output.txt"></data>
</collection>
<collection type="list" name="discovered_list" format="txt">
<discover_datasets pattern="(?P&lt;designation&gt;.+)\.txt" ext="txt" visible="true" />
</collection>
</outputs>
<tests>
<test>
<output name="static_output">
<assert_contents>
<has_text text="hi"/>
</assert_contents>
</output>
<output name="output_workdir">
<assert_contents>
<has_text text="hi"/>
</assert_contents>
</output>
<output name="output_tool_supplied_metadata">
<assert_contents>
<has_text text="hi"/>
</assert_contents>
</output>
<output name="discovered_output">
<discovered_dataset designation="output" ftype="txt">
<assert_contents>
<has_text text="hi"/>
</assert_contents>
</discovered_dataset>
</output>
<output name="discovered_output_replaced" count="1">
<assert_contents>
<has_text text="hi"/>
</assert_contents>
</output>
<output_collection name="static_pair" type="paired">
<element name="forward" ftype="txt">
<assert_contents>
<has_text text="hi"></has_text>
</assert_contents>
</element>
<element name="reverse" ftype="txt">
<assert_contents>
<has_text text="hi"></has_text>
</assert_contents>
</element>
</output_collection>
<output_collection name="discovered_list">
<element name="output" ftype="txt">
<assert_contents>
<has_text text="hi"></has_text>
</assert_contents>
</element>
</output_collection>
</test>
</tests>
</tool>