Skip to content

Commit

Permalink
Confirm that the final copy worked
Browse files Browse the repository at this point in the history
Fixes #94
  • Loading branch information
JBorrow committed Nov 11, 2024
1 parent 8a77c2e commit a0a20f7
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 10 deletions.
18 changes: 18 additions & 0 deletions librarian_background/create_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,24 @@ def core(self, session: Session):

all_transfers_successful = False

continue
except ValueError as e:
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_INTEGRITY,
message=(
f"Failed to commit file {instance.path} to store {store_to}: {e}. "
f"Skipping. (Instance {instance.id})"
),
session=session,
)

transfer.fail_transfer(session=session)

store_to.store_manager.unstage(staged_path)

all_transfers_successful = False

continue

store_to.store_manager.unstage(staging_name)
Expand Down
12 changes: 8 additions & 4 deletions librarian_server/orm/storemetadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,22 +225,26 @@ def ingest_staged_file(
# Commit our change to the transfer, file, and instance simultaneously.

try:
session.commit()

# We're good to go and move the file to where it needs to be.
self.store_manager.commit(
staging_path=staged_path, store_path=resolved_store_path
)
self.store_manager.unstage(staging_directory)
except SQLAlchemyError as e:
# Need to rollback everything. The upload failed...

# SQLAlchemy is likely to be the least of our issues!
session.commit()
except (SQLAlchemyError, ValueError) as e:
# Need to rollback everything. The upload failed.
# In the case of ValueError, we have a problem with the file itself.
self.store_manager.unstage(staging_directory)

session.rollback()

transfer.status = TransferStatus.FAILED
session.commit()

raise e

return instance

def __repr__(self) -> str:
Expand Down
45 changes: 39 additions & 6 deletions librarian_server/stores/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from hera_librarian.utils import (
get_checksum_from_path,
get_hash_function_from_hash,
compare_checksums,
get_size_from_path,
get_type_from_path,
)
Expand All @@ -35,6 +36,8 @@ class LocalStore(CoreStore):
readonly_after_commit: bool = False
"If true, the user running the server will chmod the files to 444 and folders to 555 after commit."

max_copy_retries: int = 3

@property
def available(self) -> bool:
try:
Expand Down Expand Up @@ -133,6 +136,10 @@ def unstage(self, path: Path):
if os.path.exists(complete_path.parent):
try:
resolved_path = self._resolved_path_staging(complete_path.parent)

if any(resolved_path.iterdir()):
raise ValueError("Parent directory is not empty.")

resolved_path.rmdir()
except ValueError:
# The parent is not in the staging area. We can't delete it.
Expand All @@ -157,6 +164,10 @@ def delete(self, path: Path):
if os.path.exists(complete_path.parent):
try:
resolved_path = self._resolved_path_store(complete_path.parent)

if any(resolved_path.iterdir()):
raise ValueError("Parent directory is not empty.")

resolved_path.rmdir()
except (ValueError, OSError):
# The parent is not in the store area. We can't delete it, or
Expand All @@ -181,12 +192,34 @@ def commit(self, staging_path: Path, store_path: Path):

return
else:
# We need to copy the file and then set the permissions.
if resolved_path_staging.is_dir():
shutil.copytree(resolved_path_staging, resolved_path_store)
else:
shutil.copy2(resolved_path_staging, resolved_path_store)

# We need to copy the file and then set the permissions. Copying can inherrently
# introduce issues with corruption, so we need to be careful and make sure
# we have a good copy.
retries = 0
copy_success = False

original_checksum = get_checksum_from_path(resolved_path_staging)

while not copy_success and retries < self.max_copy_retries:
if resolved_path_staging.is_dir():
shutil.copytree(resolved_path_staging, resolved_path_store)
else:
shutil.copy2(resolved_path_staging, resolved_path_store)

new_checksum = get_checksum_from_path(resolved_path_store)

copy_success = compare_checksums(original_checksum, new_checksum)
retries += 1

if not copy_success:
# We need to clean up
self.delete(store_path)

raise ValueError(
f"Could not copy {resolved_path_staging} to {resolved_path_store} "
f"after {retries} attempts."
)

try:
# Set permissions and ownership.
def set_for_file(file: Path):
Expand Down

0 comments on commit a0a20f7

Please sign in to comment.