From a6e58e00b74e976462d34dc47425f2dc7d134e2c Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Tue, 24 Sep 2024 13:52:51 -0400
Subject: [PATCH] Allow dataset and source checksum validation during
 materialization.

---
 lib/galaxy/managers/hdas.py                    |  4 +-
 lib/galaxy/model/__init__.py                   | 24 ++++++-
 lib/galaxy/model/deferred.py                   | 70 +++++++++++++++----
 .../model/unittest_utils/store_fixtures.py     |  2 +-
 lib/galaxy/schema/schema.py                    | 10 ++-
 lib/galaxy/schema/tasks.py                     |  5 ++
 lib/galaxy/tools/data_fetch.py                 |  8 +--
 lib/galaxy/util/hash_util.py                   |  8 +++
 .../webapps/galaxy/api/history_contents.py     |  4 ++
 .../galaxy/services/history_contents.py        |  1 +
 lib/galaxy_test/base/populators.py             |  6 +-
 ...test_materialize_dataset_instance_tasks.py  | 20 ++++++
 .../unit/data/test_dataset_materialization.py  | 63 +++++++++++++++++
 13 files changed, 200 insertions(+), 25 deletions(-)

diff --git a/lib/galaxy/managers/hdas.py b/lib/galaxy/managers/hdas.py
index 2b6d20388398..9f848c412618 100644
--- a/lib/galaxy/managers/hdas.py
+++ b/lib/galaxy/managers/hdas.py
@@ -190,7 +190,9 @@ def materialize(self, request: MaterializeDatasetInstanceTaskRequest, in_place:
         else:
             dataset_instance = self.ldda_manager.get_accessible(request.content, user)
         history = self.app.history_manager.by_id(request.history_id)
-        new_hda = materializer.ensure_materialized(dataset_instance, target_history=history, in_place=in_place)
+        new_hda = materializer.ensure_materialized(
+            dataset_instance, target_history=history, validate_hashes=request.validate_hashes, in_place=in_place
+        )
         if not in_place:
             history.add_dataset(new_hda, set_hid=True)
         session = self.session()
diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py
index 13a183212562..9b59c616e7ac 100644
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -214,6 +214,7 @@
     WorkflowMappingField,
 )
 from galaxy.util.hash_util import (
+    HashFunctionNameEnum,
     md5_hash_str,
     new_insecure_hash,
 )
@@ -4493,7 +4494,17 @@ def copy(self) -> "DatasetSource":
         return new_source
 
 
-class DatasetSourceHash(Base, Serializable):
+class HasHashFunctionName:
+    hash_function: Mapped[Optional[str]]
+
+    @property
+    def hash_func_name(self) -> HashFunctionNameEnum:
+        as_str = self.hash_function
+        assert as_str
+        return HashFunctionNameEnum(self.hash_function)
+
+
+class DatasetSourceHash(Base, Serializable, HasHashFunctionName):
     __tablename__ = "dataset_source_hash"
 
     id: Mapped[int] = mapped_column(primary_key=True)
@@ -4518,7 +4529,7 @@ def copy(self) -> "DatasetSourceHash":
         return new_hash
 
 
-class DatasetHash(Base, Dictifiable, Serializable):
+class DatasetHash(Base, Dictifiable, Serializable, HasHashFunctionName):
     __tablename__ = "dataset_hash"
 
     id: Mapped[int] = mapped_column(primary_key=True)
@@ -4547,6 +4558,15 @@ def copy(self) -> "DatasetHash":
         new_hash.extra_files_path = self.extra_files_path
         return new_hash
 
+    @property
+    def hash_func_name(self) -> HashFunctionNameEnum:
+        as_str = self.hash_function
+        assert as_str
+        return HashFunctionNameEnum(self.hash_function)
+
+
+DescribesHash = Union[DatasetSourceHash, DatasetHash]
+
 
 def datatype_for_extension(extension, datatypes_registry=None) -> "Data":
     if extension is not None:
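The HasHashFunctionName mixin only lifts the stored hash_function string into the HashFunctionNameEnum used by galaxy.util.hash_util, and DescribesHash is simply the union of the two ORM records that can describe a checksum. A minimal self-contained sketch of the same pattern, with the SQLAlchemy mapping stripped out and a one-member stand-in enum (everything here is illustrative except the "MD5" value, which matches TEST_HASH_FUNCTION used later in this patch):

    from enum import Enum
    from typing import Optional


    class HashFunctionNameEnum(str, Enum):
        # stand-in for galaxy.util.hash_util.HashFunctionNameEnum; only MD5 shown
        md5 = "MD5"


    class HasHashFunctionName:
        hash_function: Optional[str] = None

        @property
        def hash_func_name(self) -> HashFunctionNameEnum:
            assert self.hash_function
            return HashFunctionNameEnum(self.hash_function)


    class FakeDatasetHash(HasHashFunctionName):
        def __init__(self, hash_function: str, hash_value: str):
            self.hash_function = hash_function
            self.hash_value = hash_value


    print(FakeDatasetHash("MD5", "f568c29421792b1b1df4474dafae01f1").hash_func_name)  # HashFunctionNameEnum.md5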
diff --git a/lib/galaxy/model/deferred.py b/lib/galaxy/model/deferred.py
index 32ee9f954dda..f9b5f1414d3c 100644
--- a/lib/galaxy/model/deferred.py
+++ b/lib/galaxy/model/deferred.py
@@ -4,6 +4,7 @@
 import shutil
 from typing import (
     cast,
+    List,
     NamedTuple,
     Optional,
     Union,
@@ -25,7 +26,9 @@
     Dataset,
     DatasetCollection,
     DatasetCollectionElement,
+    DatasetHash,
     DatasetSource,
+    DescribesHash,
     History,
     HistoryDatasetAssociation,
     HistoryDatasetCollectionAssociation,
@@ -36,6 +39,7 @@
     ObjectStore,
     ObjectStorePopulator,
 )
+from galaxy.util.hash_util import verify_hash
 
 log = logging.getLogger(__name__)
 
@@ -95,6 +99,7 @@ def ensure_materialized(
         self,
         dataset_instance: Union[HistoryDatasetAssociation, LibraryDatasetDatasetAssociation],
         target_history: Optional[History] = None,
+        validate_hashes: bool = False,
         in_place: bool = False,
     ) -> HistoryDatasetAssociation:
         """Create a new detached dataset instance from the supplied instance.
@@ -107,15 +112,21 @@ def ensure_materialized(
         if dataset.state != Dataset.states.DEFERRED and isinstance(dataset_instance, HistoryDatasetAssociation):
             return dataset_instance
 
-        materialized_dataset = Dataset()
-        materialized_dataset.state = Dataset.states.OK
-        materialized_dataset.deleted = False
-        materialized_dataset.purged = False
-        materialized_dataset.sources = [s.copy() for s in dataset.sources]
-        materialized_dataset.hashes = [h.copy() for h in dataset.hashes]
-
+        materialized_dataset_hashes = [h.copy() for h in dataset.hashes]
+        if in_place:
+            materialized_dataset = dataset_instance.dataset
+            materialized_dataset.state = Dataset.states.OK
+        else:
+            materialized_dataset = Dataset()
+            materialized_dataset.state = Dataset.states.OK
+            materialized_dataset.deleted = False
+            materialized_dataset.purged = False
+            materialized_dataset.sources = [s.copy() for s in dataset.sources]
+            materialized_dataset.hashes = materialized_dataset_hashes
         target_source = self._find_closest_dataset_source(dataset)
         transient_paths = None
+
+        exception_materializing: Optional[Exception] = None
         if attached:
             object_store_populator = self._object_store_populator
             assert object_store_populator
@@ -131,17 +142,28 @@ def ensure_materialized(
                 with transaction(sa_session):
                     sa_session.commit()
             object_store_populator.set_dataset_object_store_id(materialized_dataset)
-            path = self._stream_source(target_source, datatype=dataset_instance.datatype)
-            object_store.update_from_file(materialized_dataset, file_name=path)
+            try:
+                path = self._stream_source(
+                    target_source, dataset_instance.datatype, validate_hashes, materialized_dataset_hashes
+                )
+                object_store.update_from_file(materialized_dataset, file_name=path)
+                materialized_dataset.set_size()
+            except Exception as e:
+                exception_materializing = e
         else:
             transient_path_mapper = self._transient_path_mapper
             assert transient_path_mapper
             transient_paths = transient_path_mapper.transient_paths_for(dataset)
             # TODO: optimize this by streaming right to this path...
             # TODO: take into acount transform and ensure we are and are not modifying the file as appropriate.
-            path = self._stream_source(target_source, datatype=dataset_instance.datatype)
-            shutil.move(path, transient_paths.external_filename)
-            materialized_dataset.external_filename = transient_paths.external_filename
+            try:
+                path = self._stream_source(
+                    target_source, dataset_instance.datatype, validate_hashes, materialized_dataset_hashes
+                )
+                shutil.move(path, transient_paths.external_filename)
+                materialized_dataset.external_filename = transient_paths.external_filename
+            except Exception as e:
+                exception_materializing = e
 
         history = target_history
         if history is None and isinstance(dataset_instance, HistoryDatasetAssociation):
@@ -159,6 +181,11 @@ def ensure_materialized(
         else:
             assert isinstance(dataset_instance, HistoryDatasetAssociation)
             materialized_dataset_instance = cast(HistoryDatasetAssociation, dataset_instance)
+        if exception_materializing is not None:
+            materialized_dataset.state = Dataset.states.ERROR
+            materialized_dataset_instance.info = (
+                f"Failed to materialize deferred dataset with exception: {exception_materializing}"
+            )
         if attached:
             sa_session = self._sa_session
             if sa_session is None:
@@ -184,11 +211,17 @@ def ensure_materialized(
         materialized_dataset_instance.metadata_deferred = False
         return materialized_dataset_instance
 
-    def _stream_source(self, target_source: DatasetSource, datatype) -> str:
+    def _stream_source(
+        self, target_source: DatasetSource, datatype, validate_hashes: bool, dataset_hashes: List[DatasetHash]
+    ) -> str:
         source_uri = target_source.source_uri
         if source_uri is None:
             raise Exception("Cannot stream from dataset source without specified source_uri")
         path = stream_url_to_file(source_uri, file_sources=self._file_sources)
+        if validate_hashes and target_source.hashes:
+            for source_hash in target_source.hashes:
+                _validate_hash(path, source_hash, "downloaded file")
+
         transform = target_source.transform or []
         to_posix_lines = False
         spaces_to_tabs = False
@@ -210,6 +243,11 @@ def _stream_source(self, target_source: DatasetSource, datatype) -> str:
             path = convert_result.converted_path
         if datatype_groom:
             datatype.groom_dataset_content(path)
+
+        if validate_hashes and dataset_hashes:
+            for dataset_hash in dataset_hashes:
+                _validate_hash(path, dataset_hash, "dataset contents")
+
         return path
 
     def _find_closest_dataset_source(self, dataset: Dataset) -> DatasetSource:
@@ -306,3 +344,9 @@ def materializer_factory(
         file_sources=file_sources,
         sa_session=sa_session,
     )
+
+
+def _validate_hash(path: str, describes_hash: DescribesHash, what: str) -> None:
+    hash_value = describes_hash.hash_value
+    if hash_value is not None:
+        verify_hash(path, hash_func_name=describes_hash.hash_func_name, hash_value=hash_value)
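With validate_hashes enabled, materialization now checks integrity at two points: the source hashes right after the download in _stream_source, and the dataset hashes after any transforms and datatype grooming; any failure is caught in ensure_materialized and surfaced as an ERROR dataset state plus an info message instead of an unhandled exception. A condensed, self-contained sketch of that control flow, using hashlib directly in place of memory_bound_hexdigest and placeholder callables (download, transform) that are not Galaxy APIs:

    import hashlib
    from typing import Callable, List, Optional, Tuple

    Hash = Tuple[str, str]  # (hash_function, hash_value), e.g. ("MD5", "f568c2...")


    def _md5(path: str) -> str:
        digest = hashlib.md5()
        with open(path, "rb") as fh:
            for chunk in iter(lambda: fh.read(65536), b""):
                digest.update(chunk)
        return digest.hexdigest()


    def _validate(path: str, hashes: List[Hash], what: str) -> None:
        for hash_function, expected in hashes:
            assert hash_function == "MD5", "sketch only handles MD5"
            actual = _md5(path)
            if actual != expected:
                raise Exception(f"Failed to validate {what} with [{hash_function}] - expected [{expected}] got [{actual}]")


    def materialize(
        download: Callable[[], str],
        source_hashes: List[Hash],
        dataset_hashes: List[Hash],
        transform: Callable[[str], str],
        validate_hashes: bool,
    ) -> Tuple[str, Optional[str]]:
        """Return (state, info) roughly the way ensure_materialized records failures."""
        try:
            path = download()
            if validate_hashes and source_hashes:
                _validate(path, source_hashes, "downloaded file")  # hashes describing the raw download
            path = transform(path)  # to_posix_lines, spaces_to_tabs, datatype grooming...
            if validate_hashes and dataset_hashes:
                _validate(path, dataset_hashes, "dataset contents")  # hashes describing the final dataset
            return "ok", None
        except Exception as e:
            return "error", f"Failed to materialize deferred dataset with exception: {e}"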
diff --git a/lib/galaxy/model/unittest_utils/store_fixtures.py b/lib/galaxy/model/unittest_utils/store_fixtures.py
index d779e79d698e..c05f0660fcf1 100644
--- a/lib/galaxy/model/unittest_utils/store_fixtures.py
+++ b/lib/galaxy/model/unittest_utils/store_fixtures.py
@@ -11,7 +11,7 @@
 TEST_SOURCE_URI = "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/2.bed"
 TEST_SOURCE_URI_BAM = "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam"
 TEST_HASH_FUNCTION = "MD5"
-TEST_HASH_VALUE = "moocowpretendthisisahas"
+TEST_HASH_VALUE = "f568c29421792b1b1df4474dafae01f1"
 TEST_HISTORY_NAME = "My history in a model store"
 TEST_EXTENSION = "bed"
 TEST_LIBRARY_NAME = "My cool library"
diff --git a/lib/galaxy/schema/schema.py b/lib/galaxy/schema/schema.py
index 9ed9c4ba3444..a2cb9ef72272 100644
--- a/lib/galaxy/schema/schema.py
+++ b/lib/galaxy/schema/schema.py
@@ -3681,7 +3681,15 @@ class PageSummaryBase(Model):
     )
 
 
-class MaterializeDatasetInstanceAPIRequest(Model):
+class MaterializeDatasetOptions(Model):
+    validate_hashes: bool = Field(
+        False,
+        title="Validate hashes",
+        description="Set to true to enable dataset validation during materialization.",
+    )
+
+
+class MaterializeDatasetInstanceAPIRequest(MaterializeDatasetOptions):
     source: DatasetSourceType = Field(
         title="Source",
         description="The source of the content. Can be other history element to be copied or library elements.",
diff --git a/lib/galaxy/schema/tasks.py b/lib/galaxy/schema/tasks.py
index 313ec9f1dad0..d83640f52ffa 100644
--- a/lib/galaxy/schema/tasks.py
+++ b/lib/galaxy/schema/tasks.py
@@ -108,6 +108,11 @@ class MaterializeDatasetInstanceTaskRequest(Model):
             "- The decoded id of the HDA\n"
         ),
     )
+    validate_hashes: bool = Field(
+        False,
+        title="Validate hashes",
+        description="Set to true to enable dataset validation during materialization.",
+    )
 
 
 class ComputeDatasetHashTaskRequest(Model):
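Because MaterializeDatasetInstanceAPIRequest now inherits from MaterializeDatasetOptions, validate_hashes is an optional field defaulting to False on both the API payload and the task request, so existing clients keep the old behaviour. A small sketch of that default/override behaviour using a stand-alone pydantic model with the same field definition (pydantic v2 assumed, consistent with the model_construct call elsewhere in this patch; BaseModel stands in for Galaxy's Model base class):

    from pydantic import BaseModel, Field


    class MaterializeDatasetOptions(BaseModel):
        validate_hashes: bool = Field(
            False,
            title="Validate hashes",
            description="Set to true to enable dataset validation during materialization.",
        )


    print(MaterializeDatasetOptions().validate_hashes)                   # False - omitted payloads keep old behaviour
    print(MaterializeDatasetOptions(validate_hashes=True).model_dump())  # {'validate_hashes': True}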
diff --git a/lib/galaxy/tools/data_fetch.py b/lib/galaxy/tools/data_fetch.py
index 540baacc2d91..fe1419aa1434 100644
--- a/lib/galaxy/tools/data_fetch.py
+++ b/lib/galaxy/tools/data_fetch.py
@@ -34,7 +34,7 @@
 from galaxy.util.compression_utils import CompressedFile
 from galaxy.util.hash_util import (
     HASH_NAMES,
-    memory_bound_hexdigest,
+    verify_hash,
 )
 
 DESCRIPTION = """Data Import Script"""
@@ -515,11 +515,7 @@ def _has_src_to_path(upload_config, item, is_dataset=False) -> Tuple[str, str]:
 
 def _handle_hash_validation(upload_config, hash_function, hash_value, path):
     if upload_config.validate_hashes:
-        calculated_hash_value = memory_bound_hexdigest(hash_func_name=hash_function, path=path)
-        if calculated_hash_value != hash_value:
-            raise Exception(
-                f"Failed to validate upload with [{hash_function}] - expected [{hash_value}] got [{calculated_hash_value}]"
-            )
+        verify_hash(path, hash_func_name=hash_function, hash_value=hash_value, what="upload")
 
 
 def _arg_parser():
diff --git a/lib/galaxy/util/hash_util.py b/lib/galaxy/util/hash_util.py
index 100adc23bcc4..7c685d2fd9f1 100644
--- a/lib/galaxy/util/hash_util.py
+++ b/lib/galaxy/util/hash_util.py
@@ -153,6 +153,14 @@ def parse_checksum_hash(checksum: str) -> Tuple[HashFunctionNameEnum, str]:
     return HashFunctionNameEnum(hash_name), hash_value
 
 
+def verify_hash(path: str, hash_func_name: HashFunctionNameEnum, hash_value: str, what: str = "path"):
+    calculated_hash_value = memory_bound_hexdigest(hash_func_name=hash_func_name, path=path)
+    if calculated_hash_value != hash_value:
+        raise Exception(
+            f"Failed to validate {what} with [{hash_func_name}] - expected [{hash_value}] got [{calculated_hash_value}]"
+        )
+
+
 __all__ = (
     "md5",
     "hashlib",
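verify_hash centralizes the check that data_fetch.py previously inlined, so deferred materialization and the fetch tool now raise the same error message. A hedged usage sketch; only verify_hash and HashFunctionNameEnum come from galaxy.util.hash_util as touched by this patch, the temporary-file setup is purely illustrative:

    import hashlib
    import tempfile

    from galaxy.util.hash_util import HashFunctionNameEnum, verify_hash

    with tempfile.NamedTemporaryFile(delete=False, suffix=".bed") as tmp:
        tmp.write(b"chr1\t10\t20\n")
        path = tmp.name

    expected = hashlib.md5(b"chr1\t10\t20\n").hexdigest()
    verify_hash(path, hash_func_name=HashFunctionNameEnum("MD5"), hash_value=expected, what="example file")  # passes
    try:
        verify_hash(path, hash_func_name=HashFunctionNameEnum("MD5"), hash_value="invalidhash", what="example file")
    except Exception as e:
        print(e)  # Failed to validate example file with [...] - expected [invalidhash] got [<actual md5>]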
diff --git a/lib/galaxy/webapps/galaxy/api/history_contents.py b/lib/galaxy/webapps/galaxy/api/history_contents.py
index 2386807211ba..52dae4cc12b0 100644
--- a/lib/galaxy/webapps/galaxy/api/history_contents.py
+++ b/lib/galaxy/webapps/galaxy/api/history_contents.py
@@ -54,6 +54,7 @@
     HistoryContentType,
     MaterializeDatasetInstanceAPIRequest,
     MaterializeDatasetInstanceRequest,
+    MaterializeDatasetOptions,
     StoreExportPayload,
     UpdateHistoryContentsBatchPayload,
     UpdateHistoryContentsPayload,
@@ -1072,12 +1073,15 @@ def materialize_dataset(
         history_id: HistoryIDPathParam,
         id: HistoryItemIDPathParam,
         trans: ProvidesHistoryContext = DependsOnTrans,
+        materialize_api_payload: Optional[MaterializeDatasetOptions] = Body(None),
     ) -> AsyncTaskResultSummary:
+        validate_hashes: bool = materialize_api_payload.validate_hashes if materialize_api_payload is not None else False
         # values are already validated, use model_construct
         materialize_request = MaterializeDatasetInstanceRequest.model_construct(
             history_id=history_id,
             source=DatasetSourceType.hda,
             content=id,
+            validate_hashes=validate_hashes,
         )
         rval = self.service.materialize(trans, materialize_request)
         return rval
diff --git a/lib/galaxy/webapps/galaxy/services/history_contents.py b/lib/galaxy/webapps/galaxy/services/history_contents.py
index b1edebd3e897..42913b7f2b42 100644
--- a/lib/galaxy/webapps/galaxy/services/history_contents.py
+++ b/lib/galaxy/webapps/galaxy/services/history_contents.py
@@ -573,6 +573,7 @@ def materialize(
             history_id=request.history_id,
             source=request.source,
             content=request.content,
+            validate_hashes=request.validate_hashes,
             user=trans.async_request_user,
         )
         results = materialize_task.delay(request=task_request)
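From a client's perspective the only change is an optional JSON body on the existing materialize endpoint; sending no body at all keeps the previous behaviour. A sketch with requests, assuming a standard Galaxy API-key setup (the x-api-key header, server URL, and ids are placeholders and the usual Galaxy conventions rather than anything introduced by this patch):

    import requests

    galaxy_url = "https://usegalaxy.example.org"  # placeholder server
    api_key = "..."                               # placeholder API key
    history_id = "abc123"                         # encoded history id
    dataset_id = "def456"                         # encoded id of the deferred HDA

    response = requests.post(
        f"{galaxy_url}/api/histories/{history_id}/contents/datasets/{dataset_id}/materialize",
        headers={"x-api-key": api_key},
        json={"validate_hashes": True},  # new MaterializeDatasetOptions payload; defaults to False if omitted
    )
    response.raise_for_status()
    print(response.json())  # summary of the queued materialization task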
diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py
index 8343f05d1b24..ccf5bf0d7d22 100644
--- a/lib/galaxy_test/base/populators.py
+++ b/lib/galaxy_test/base/populators.py
@@ -981,7 +981,9 @@ def tools_post(self, payload: dict, url="tools") -> Response:
         tool_response = self._post(url, data=payload)
         return tool_response
 
-    def materialize_dataset_instance(self, history_id: str, id: str, source: str = "hda"):
+    def materialize_dataset_instance(
+        self, history_id: str, id: str, source: str = "hda", validate_hashes: bool = False
+    ):
         payload: Dict[str, Any]
         if source == "ldda":
             url = f"histories/{history_id}/materialize"
@@ -992,6 +994,8 @@ def materialize_dataset_instance(self, history_id: str, id: str, source: str = "hda"):
         else:
             url = f"histories/{history_id}/contents/datasets/{id}/materialize"
             payload = {}
+        if validate_hashes:
+            payload["validate_hashes"] = True
         create_response = self._post(url, payload, json=True)
         api_asserts.assert_status_code_is_ok(create_response)
         create_response_json = create_response.json()
diff --git a/test/integration/test_materialize_dataset_instance_tasks.py b/test/integration/test_materialize_dataset_instance_tasks.py
index 7bd606cd390a..5d3d209e7221 100644
--- a/test/integration/test_materialize_dataset_instance_tasks.py
+++ b/test/integration/test_materialize_dataset_instance_tasks.py
@@ -78,6 +78,26 @@ def test_materialize_gxfiles_uri(self, history_id: str):
         assert new_hda_details["state"] == "ok"
         assert not new_hda_details["deleted"]
 
+    @pytest.mark.require_new_history
+    def test_materialize_hash_failure(self, history_id: str):
+        store_dict = deferred_hda_model_store_dict(source_uri="gxfiles://testdatafiles/2.bed")
+        store_dict["datasets"][0]["file_metadata"]["hashes"][0]["hash_value"] = "invalidhash"
+        as_list = self.dataset_populator.create_contents_from_store(history_id, store_dict=store_dict)
+        assert len(as_list) == 1
+        deferred_hda = as_list[0]
+        assert deferred_hda["model_class"] == "HistoryDatasetAssociation"
+        assert deferred_hda["state"] == "deferred"
+        assert not deferred_hda["deleted"]
+
+        self.dataset_populator.materialize_dataset_instance(history_id, deferred_hda["id"], validate_hashes=True)
+        self.dataset_populator.wait_on_history_length(history_id, 2)
+        new_hda_details = self.dataset_populator.get_history_dataset_details(
+            history_id, hid=2, assert_ok=False, wait=False
+        )
+        assert new_hda_details["model_class"] == "HistoryDatasetAssociation"
+        assert new_hda_details["state"] == "error"
+        assert not new_hda_details["deleted"]
+
     @pytest.mark.require_new_history
     def test_materialize_history_dataset_bam(self, history_id: str):
         as_list = self.dataset_populator.create_contents_from_store(
diff --git a/test/unit/data/test_dataset_materialization.py b/test/unit/data/test_dataset_materialization.py
index 9015b107539f..e002b439726d 100644
--- a/test/unit/data/test_dataset_materialization.py
+++ b/test/unit/data/test_dataset_materialization.py
@@ -62,6 +62,69 @@ def test_deferred_hdas_basic_attached():
     _assert_2_bed_metadata(materialized_hda)
 
 
+def test_hash_validate():
+    fixture_context = setup_fixture_context_with_history()
+    store_dict = deferred_hda_model_store_dict()
+    perform_import_from_store_dict(fixture_context, store_dict)
+    deferred_hda = fixture_context.history.datasets[0]
+    assert deferred_hda
+    _assert_2_bed_metadata(deferred_hda)
+    assert deferred_hda.dataset.state == "deferred"
+    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
+    materialized_hda = materializer.ensure_materialized(deferred_hda, validate_hashes=True)
+    materialized_dataset = materialized_hda.dataset
+    assert materialized_dataset.state == "ok"
+
+
+def test_hash_invalid():
+    fixture_context = setup_fixture_context_with_history()
+    store_dict = deferred_hda_model_store_dict()
+    store_dict["datasets"][0]["file_metadata"]["hashes"][0]["hash_value"] = "invalidhash"
+    perform_import_from_store_dict(fixture_context, store_dict)
+    deferred_hda = fixture_context.history.datasets[0]
+    assert deferred_hda
+    _assert_2_bed_metadata(deferred_hda)
+    assert deferred_hda.dataset.state == "deferred"
+    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
+    materialized_hda = materializer.ensure_materialized(deferred_hda, validate_hashes=True)
+    materialized_dataset = materialized_hda.dataset
+    assert materialized_dataset.state == "error"
+
+
+def test_hash_validate_source_of_download():
+    fixture_context = setup_fixture_context_with_history()
+    store_dict = deferred_hda_model_store_dict()
+    store_dict["datasets"][0]["file_metadata"]["sources"][0]["hashes"] = [
+        {"model_class": "DatasetSourceHash", "hash_function": "MD5", "hash_value": "f568c29421792b1b1df4474dafae01f1"}
+    ]
+    perform_import_from_store_dict(fixture_context, store_dict)
+    deferred_hda = fixture_context.history.datasets[0]
+    assert deferred_hda
+    _assert_2_bed_metadata(deferred_hda)
+    assert deferred_hda.dataset.state == "deferred"
+    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
+    materialized_hda = materializer.ensure_materialized(deferred_hda, validate_hashes=True)
+    materialized_dataset = materialized_hda.dataset
+    assert materialized_dataset.state == "ok", materialized_hda.info
+
+
+def test_hash_invalid_source_of_download():
+    fixture_context = setup_fixture_context_with_history()
+    store_dict = deferred_hda_model_store_dict()
+    store_dict["datasets"][0]["file_metadata"]["sources"][0]["hashes"] = [
+        {"model_class": "DatasetSourceHash", "hash_function": "MD5", "hash_value": "invalidhash"}
+    ]
+    perform_import_from_store_dict(fixture_context, store_dict)
+    deferred_hda = fixture_context.history.datasets[0]
+    assert deferred_hda
+    _assert_2_bed_metadata(deferred_hda)
+    assert deferred_hda.dataset.state == "deferred"
+    materializer = materializer_factory(True, object_store=fixture_context.app.object_store)
+    materialized_hda = materializer.ensure_materialized(deferred_hda, validate_hashes=True)
+    materialized_dataset = materialized_hda.dataset
+    assert materialized_dataset.state == "error", materialized_hda.info
+
+
 def test_deferred_hdas_basic_attached_store_by_uuid():
     # skip a flush here so this is a different path...
     fixture_context = setup_fixture_context_with_history(store_by="uuid")
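Taken together with the integration test, these unit tests pin down the observable contract of validate_hashes=True: a mismatched source or dataset hash does not raise out of ensure_materialized, it yields an HDA whose dataset state is "error" and whose info carries the underlying exception text. A tiny helper one might write against that contract (illustrative only; it assumes an HDA produced the same way as in the tests above):

    def assert_hash_failure_surfaced(materialized_hda) -> None:
        # hash mismatches are converted into dataset state + info rather than raised
        assert materialized_hda.dataset.state == "error"
        assert materialized_hda.info.startswith("Failed to materialize deferred dataset with exception:")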