Skip to content

Commit

Permalink
Use xxh3 throughout
Browse files Browse the repository at this point in the history
  • Loading branch information
JBorrow committed Jul 16, 2024
1 parent 534feda commit 629fba0
Show file tree
Hide file tree
Showing 19 changed files with 221 additions and 37 deletions.
2 changes: 1 addition & 1 deletion hera_librarian/async_transfers/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def _get_transfer_data(self, label: str, settings: "ServerSettings"):
destination_endpoint=self.destination_endpoint,
label=label,
sync_level="exists",
verify_checksum=False, # we do this ourselves
verify_checksum=True, # We do this ourselves, but globus will auto-retry if it files failed files
preserve_timestamp=True,
notify_on_succeeded=False,
)
Expand Down
8 changes: 6 additions & 2 deletions hera_librarian/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@
UserAdministrationUpdateRequest,
)
from .settings import ClientInfo
from .utils import get_md5_from_path, get_size_from_path
from .utils import (
get_checksum_from_path,
get_hash_function_from_hash,
get_size_from_path,
)

if TYPE_CHECKING:
from .transfers import CoreTransferManager
Expand Down Expand Up @@ -388,7 +392,7 @@ def upload(
endpoint="upload/stage",
request=UploadInitiationRequest(
upload_size=get_size_from_path(local_path),
upload_checksum=get_md5_from_path(local_path),
upload_checksum=get_checksum_from_path(local_path),
upload_name=dest_path.name,
destination_location=dest_path,
uploader=self.user,
Expand Down
162 changes: 161 additions & 1 deletion hera_librarian/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,110 @@
Useful utilities for files.
"""

import hashlib
import os
import os.path
import re
from pathlib import Path

from checksumdir import HASH_FUNCS, _filehash, dirhash
import pkg_resources
import xxhash

# Here we bundle the source code from checksumdir rather than relying on it as
# a dependency. Maintenance seems to have ended for checksumdir, and we want access
# to faster hashing functions

# --- Begin MIT Licensed checksumdir ---


# Version marker for the bundled checksumdir code. Since checksumdir is now
# vendored (see comment above) rather than installed as a dependency,
# pkg_resources.require() can fail with DistributionNotFound on clean
# installs — fall back to the version we bundled from in that case.
try:
    __version__ = pkg_resources.require("checksumdir")[0].version
except Exception:
    __version__ = "1.2.0"  # version of checksumdir this code was vendored from

# Mapping from hash-function name (the name embedded in our "name:::digest"
# checksum strings) to its constructor. "xxh3" (128-bit) is the fast,
# non-cryptographic option this commit standardises on; "md5" remains the
# legacy default assumed for un-prefixed checksums.
HASH_FUNCS = {
    "md5": hashlib.md5,
    "xxh3": xxhash.xxh3_128,
    "sha1": hashlib.sha1,
    "sha256": hashlib.sha256,
    "sha512": hashlib.sha512,
}


def dirhash(
    dirname,
    hashfunc="md5",
    excluded_files=None,
    ignore_hidden=False,
    followlinks=False,
    excluded_extensions=None,
    include_paths=False,
):
    """
    Compute a single deterministic checksum for an entire directory tree.

    Vendored from the MIT-licensed ``checksumdir`` package (see module
    header); kept behaviourally identical so checksums already stored in
    the database remain valid.

    Parameters
    ----------
    dirname : str or Path
        Directory to hash.
    hashfunc : str
        Name of the hash function to use; must be a key of ``HASH_FUNCS``.
    excluded_files : list of str, optional
        Bare file names (not paths) to skip.
    ignore_hidden : bool
        If True, skip dot-files and any directory whose path contains
        a ``/.`` component.
    followlinks : bool
        Passed straight through to ``os.walk``.
    excluded_extensions : list of str, optional
        Final (``.``-separated) extensions to skip.
    include_paths : bool
        If True, additionally fold each file's relative path into the
        checksum, so renames/moves change the result.

    Returns
    -------
    str
        Hex digest combining all per-file hashes in sorted order
        (see ``_reduce_hash``), so the result is walk-order independent.

    Raises
    ------
    NotImplementedError
        If ``hashfunc`` is not a key of ``HASH_FUNCS``.
    TypeError
        If ``dirname`` is not a directory.
    """
    hash_func = HASH_FUNCS.get(hashfunc)
    if not hash_func:
        raise NotImplementedError("{} not implemented.".format(hashfunc))

    if not excluded_files:
        excluded_files = []

    if not excluded_extensions:
        excluded_extensions = []

    if not os.path.isdir(dirname):
        raise TypeError("{} is not a directory.".format(dirname))

    hashvalues = []
    for root, dirs, files in os.walk(dirname, topdown=True, followlinks=followlinks):
        if ignore_hidden and re.search(r"/\.", root):
            continue

        # In-place sort so os.walk visits children in a deterministic
        # order across platforms and filesystems.
        dirs.sort()
        files.sort()

        for fname in files:
            if ignore_hidden and fname.startswith("."):
                continue

            if fname.split(".")[-1:][0] in excluded_extensions:
                continue

            if fname in excluded_files:
                continue

            hashvalues.append(_filehash(os.path.join(root, fname), hash_func))

            if include_paths:
                hasher = hash_func()
                # get the resulting relative path into array of elements
                # NOTE(review): os.path.relpath with one argument is relative
                # to the *current working directory*, not to ``dirname`` — so
                # include_paths checksums depend on where the process runs.
                # Confirm this is intended before relying on it.
                path_list = os.path.relpath(os.path.join(root, fname)).split(os.sep)
                # compute the hash on joined list, removes all os specific separators
                hasher.update("".join(path_list).encode("utf-8"))
                hashvalues.append(hasher.hexdigest())

    return _reduce_hash(hashvalues, hash_func)


def _filehash(filepath, hashfunc):
hasher = hashfunc()
blocksize = 64 * 1024

if not os.path.exists(filepath):
return hasher.hexdigest()

with open(filepath, "rb") as fp:
while True:
data = fp.read(blocksize)
if not data:
break
hasher.update(data)
return hasher.hexdigest()


def _reduce_hash(hashlist, hashfunc):
hasher = hashfunc()
for hashvalue in sorted(hashlist):
hasher.update(hashvalue.encode("utf-8"))
return hasher.hexdigest()


# --- end checksumdir ---


def get_type_from_path(path):
Expand Down Expand Up @@ -33,6 +133,66 @@ def get_md5_from_path(path):
return _filehash(path, HASH_FUNCS["md5"])


def get_checksum_from_path(path: str | Path, hash_function: str = "xxh3") -> str:
    """
    Compute the checksum of a file or directory from a path.

    This allows you to select the underlying checksum function, which is by
    default the very fast xxh3. The hashing function's name is always
    pre-pended to the hash itself as ``"<name>:::"`` so that it can be
    recovered later (see ``get_hash_function_from_hash``).

    Parameters
    ----------
    path : str | Path
        File or directory to checksum.
    hash_function : str
        Name of the hash function; must be a key of ``HASH_FUNCS``.

    Returns
    -------
    str
        The prefixed checksum, e.g. ``"xxh3:::<hexdigest>"``.

    Raises
    ------
    NotImplementedError
        If ``hash_function`` is not supported. (Previously a bare file
        path raised ``KeyError`` here while a directory raised
        ``NotImplementedError`` from ``dirhash``; validating up front
        makes both cases fail identically.)
    """

    hash_func = HASH_FUNCS.get(hash_function)
    if hash_func is None:
        raise NotImplementedError("{} not implemented.".format(hash_function))

    path = Path(path).resolve()

    if path.is_dir():
        return hash_function + ":::" + dirhash(path, hash_function)

    # Just a single file. That's fine!
    return hash_function + ":::" + _filehash(path, hash_func)


def get_hash_function_from_hash(hash: str) -> str:
    """
    Determine which hash function produced ``hash``.

    Checksums created by this module carry a ``"<name>:::"`` prefix; if no
    known prefix is present we fall back to the old default (md5).
    """

    matches = (name for name in HASH_FUNCS if hash.startswith(name + ":::"))

    return next(matches, "md5")


def get_base_hash_from_hash(hash: str) -> str:
    """
    Gets the 'base' hash without our ``hashfunc:::`` prefix prepended.

    If no known prefix is present, the string is returned unchanged.

    Parameters
    ----------
    hash : str
        A checksum, optionally prefixed with ``"<name>:::"``.

    Returns
    -------
    str
        The bare hex digest.
    """

    for hash_func_name in HASH_FUNCS.keys():
        prefix = hash_func_name + ":::"
        if hash.startswith(prefix):
            # Strip only the leading prefix. The previous str.replace()
            # removed *every* occurrence of the prefix substring, which
            # would corrupt a digest that happened to contain it.
            return hash[len(prefix):]

    return hash


def compare_checksums(a: str, b: str) -> bool:
    """
    Compares two checksums to see if they match.

    Parameters
    ----------
    a, b : str
        Checksums, optionally prefixed with ``"<name>:::"``; un-prefixed
        checksums are assumed to be md5 (the legacy default).

    Returns
    -------
    bool
        True when the base hashes are equal.

    Raises
    ------
    ValueError
        If a and b were checksummed with differing algorithms — their base
        hashes could never legitimately agree, so comparing them would be
        meaningless.
    """

    hf_a = get_hash_function_from_hash(a)
    hf_b = get_hash_function_from_hash(b)

    if hf_a != hf_b:
        # Fixed typo in the user-facing error message ("has" -> "hash").
        raise ValueError(
            f"Checksums {a} and {b} were created with differing hash functions!"
        )

    return get_base_hash_from_hash(a) == get_base_hash_from_hash(b)


def get_size_from_path(path):
"""Get the number of bytes occupied within `path`.
Expand Down
10 changes: 7 additions & 3 deletions librarian_background/check_integrity.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from schedule import CancelJob
from sqlalchemy.orm import Session

from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
from librarian_server.database import get_session
from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database
from librarian_server.orm import Instance, StoreMetadata
Expand Down Expand Up @@ -72,7 +73,10 @@ def core(self, session: Session):
for file in files:
# Now we can check the integrity of each file.
try:
path_info = store.store_manager.path_info(file.path)
hash_function = get_hash_function_from_hash(file.file.checksum)
path_info = store.store_manager.path_info(
file.path, hash_function=hash_function
)
except FileNotFoundError:
all_files_fine = False
log_to_database(
Expand All @@ -86,7 +90,7 @@ def core(self, session: Session):
# Compare checksum to database
expected_checksum = file.file.checksum

if path_info.md5 == expected_checksum:
if compare_checksums(expected_checksum, path_info.checksum):
# File is fine.
logger.info(
f"File {file.path} on store {store.name} has been validated."
Expand All @@ -98,7 +102,7 @@ def core(self, session: Session):
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_INTEGRITY,
message=f"File {file.path} on store {store.name} has an incorrect checksum. Expected {expected_checksum}, got {path_info.md5}. (Instance: {file.id})",
message=f"File {file.path} on store {store.name} has an incorrect checksum. Expected {expected_checksum}, got {path_info.checksum}. (Instance: {file.id})",
session=session,
)

Expand Down
10 changes: 7 additions & 3 deletions librarian_background/create_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from sqlalchemy import select
from sqlalchemy.orm import Session

from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
from librarian_server.database import get_session
from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database
from librarian_server.orm import CloneTransfer, Instance, StoreMetadata, TransferStatus
Expand Down Expand Up @@ -302,14 +303,17 @@ def core(self, session: Session):

# Now we can commit the file to the store.
try:
path_info = store_to.store_manager.path_info(staged_path)
hash_function = get_hash_function_from_hash(instance.file.checksum)
path_info = store_to.store_manager.path_info(
staged_path, hash_function=hash_function
)

if path_info.md5 != instance.file.checksum:
if not compare_checksums(path_info.checksum, instance.file.checksum):
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_INTEGRITY,
message=f"File {instance.path} on store {store_to} has an incorrect checksum. "
f"Expected {instance.file.checksum}, got {path_info.md5}. (Instance {instance.id})",
f"Expected {instance.file.checksum}, got {path_info.checksum}. (Instance {instance.id})",
session=session,
)

Expand Down
3 changes: 2 additions & 1 deletion librarian_background/hypervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from sqlalchemy.orm import Session

from hera_librarian.exceptions import LibrarianHTTPError, LibrarianTimeoutError
from hera_librarian.utils import compare_checksums
from librarian_server.database import get_session
from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database
from librarian_server.orm import (
Expand Down Expand Up @@ -132,7 +133,7 @@ def handle_stale_outgoing_transfer(
available_checksum = available_checksums.pop()
available_store_id = available_store_ids.pop()

if available_checksum != expected_file_checksum:
if not compare_checksums(available_checksum, expected_file_checksum):
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_INTEGRITY,
Expand Down
2 changes: 1 addition & 1 deletion librarian_server/api/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def commit(
)
response.status_code = status.HTTP_406_NOT_ACCEPTABLE
return UploadFailedResponse(
reason="File does not have a valid checksum or size.",
reason=f"File does not have a valid checksum or size",
suggested_remedy="Try to transfer the file again. If the problem persists, "
"contact the administrator of this librarian instance.",
)
Expand Down
18 changes: 9 additions & 9 deletions librarian_server/orm/storemetadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
"""

import datetime
from enum import Enum
from pathlib import Path
from typing import Optional

from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session, reconstructor
Expand All @@ -17,8 +14,8 @@
async_transfer_manager_from_name,
)
from hera_librarian.deletion import DeletionPolicy
from hera_librarian.models.uploads import UploadCompletionRequest
from hera_librarian.transfers import CoreTransferManager, transfer_manager_from_name
from hera_librarian.utils import compare_checksums, get_hash_function_from_hash

from .. import database as db
from ..stores import CoreStore, Stores
Expand Down Expand Up @@ -158,9 +155,13 @@ def ingest_staged_file(
staged_path = staging_directory / upload_name
store_path = self.store_manager.resolve_path_store(transfer.store_path)

hash_function = get_hash_function_from_hash(transfer.transfer_checksum)

# First up, check that we got what we expected!
try:
info = self.store_manager.path_info(staged_path)
info = self.store_manager.path_info(
staged_path, hash_function=hash_function
)
except FileNotFoundError:
transfer.status = TransferStatus.FAILED
session.commit()
Expand All @@ -169,9 +170,8 @@ def ingest_staged_file(
f"File {staged_path} not found in staging area. "
"It is likely there was a problem with the file upload. "
)
if (
info.size != transfer.transfer_size
or info.md5 != transfer.transfer_checksum
if info.size != transfer.transfer_size or (
not compare_checksums(info.checksum, transfer.transfer_checksum)
):
# We have a problem! The file is not what we expected. Delete it quickly!
self.store_manager.unstage(staging_directory)
Expand All @@ -182,7 +182,7 @@ def ingest_staged_file(
raise ValueError(
f"File {staged_path} does not match expected size/checksum; "
f"expected {transfer.transfer_size}/{transfer.transfer_checksum}, "
f"got {info.size}/{info.md5}."
f"got {info.size}/{info.checksum}."
)

# If we got here, we got what we expected. Let's try to commit the file to the store.
Expand Down
4 changes: 3 additions & 1 deletion librarian_server/stores/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,16 @@ def store(self, path: Path) -> Path:
raise NotImplementedError

@abc.abstractmethod
def path_info(self, path: Path) -> PathInfo:
def path_info(self, path: Path, hash_function: str = "xxh3") -> PathInfo:
"""
Get information about a file or directory at a path.
Parameters
----------
path : Path
Path to do this at.
hash_function: str
The hashing function chosen for checksuming this data.
Returns
-------
Expand Down
Loading

0 comments on commit 629fba0

Please sign in to comment.