Skip to content

Commit

Permalink
Add new batch failure style
Browse files Browse the repository at this point in the history
  • Loading branch information
JBorrow committed Jul 28, 2024
1 parent 177a04f commit 4135c04
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 33 deletions.
16 changes: 16 additions & 0 deletions hera_librarian/models/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,22 @@ class CloneFailedResponse(BaseModel):
"The ID of the transfer. Note that this is the IncomingTransfer ID."


class CloneBatchFailedResponse(BaseModel):
    """
    (Generic) model for a response when a batch clone has failed. Allows
    you to communicate information about the failure of a batch.

    Returned (alongside a non-2xx status code) when one or more uploads in
    a batch clone request cannot be accepted. The two transfer-ID lists are
    parallel, one entry per affected upload; the batch staging endpoint
    fills ``destination_transfer_ids`` with ``-1`` placeholders when no
    incoming transfer was ever created on the destination side.
    """

    reason: str
    "Reason for failure."
    suggested_remedy: str = "Please try again later."
    "Suggested remedy for failure."
    source_transfer_ids: list[int]
    "The IDs of the transfers. Note that these are the OutgoingTransfer IDs."
    destination_transfer_ids: list[int]
    "The IDs of the transfers. Note that these are the IncomingTransfer IDs."


class CloneFailRequest(BaseModel):
"""
In a librarian A -> librarian B transfer, this is the request
Expand Down
15 changes: 8 additions & 7 deletions librarian_background/send_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ def use_batch_to_call_librarian(

if e.status_code == 409:
# The librarian already has the file... Potentially.
potential_id = e.full_response.get("source_transfer_id", None)
potential_ids = e.full_response.get("source_transfer_ids", None)

if potential_id is None:
if potential_ids is None:
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.PROGRAMMING,
Expand All @@ -181,11 +181,12 @@ def use_batch_to_call_librarian(
session=session,
)
else:
remedy_success = handle_existing_file(
session=session,
source_transfer_id=potential_id,
librarian=librarian,
)
for id in potential_ids:
remedy_success = handle_existing_file(
session=session,
source_transfer_id=id,
librarian=librarian,
)

# Oh no, we can't call up the librarian!
if not remedy_success:
Expand Down
109 changes: 91 additions & 18 deletions librarian_server/api/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from sqlalchemy.orm import Session

from hera_librarian.models.clone import (
CloneBatchFailedResponse,
CloneBatchInitiationRequest,
CloneBatchInitiationResponse,
CloneBatchInitiationResponseFileItem,
Expand Down Expand Up @@ -221,9 +222,6 @@ def de_duplicate_file_and_transfer(

transfer.source_transfer_id = source_transfer_id

session.add(transfer)
session.commit()

return transfer


Expand Down Expand Up @@ -277,6 +275,9 @@ def stage(
upload_name=request.upload_name,
)

session.add(transfer)
session.commit()

# We have a store! Create the staging area.

file_name, file_location = store.store_manager.stage(
Expand Down Expand Up @@ -311,7 +312,10 @@ def stage(
return model_response


@router.post("/batch_stage", response_model=CloneBatchInitiationResponse)
@router.post(
"/batch_stage",
response_model=CloneBatchInitiationResponse | CloneBatchFailedResponse,
)
def batch_stage(
request: CloneBatchInitiationRequest,
response: Response,
Expand Down Expand Up @@ -356,23 +360,92 @@ def batch_stage(
)

clones = []
bad_ids_exist = []
bad_ids_transfer_exist = []
bad_ids_ongoing = []
transfers = []

for upload in request.uploads:
# This may raise a HTTPException. But that's ok; if we fail out after having
# created other 'incoming' transfers, they can just be cancelled by this same
# function later down the line when the offending file has been removed
# from the batch.
transfer = de_duplicate_file_and_transfer(
session=session,
source_transfer_id=upload.source_transfer_id,
source=request.source,
uploader=user.username,
upload_size=upload.upload_size,
upload_checksum=upload.upload_checksum,
destination_location=upload.destination_location,
upload_name=upload.upload_name,
)
try:
transfer = de_duplicate_file_and_transfer(
session=session,
source_transfer_id=upload.source_transfer_id,
source=request.source,
uploader=user.username,
upload_size=upload.upload_size,
upload_checksum=upload.upload_checksum,
destination_location=upload.destination_location,
upload_name=upload.upload_name,
)
except HTTPException as e:
log.warn(f"Error in batch staging: {e}")

if e.status_code == status.HTTP_409_CONFLICT:
bad_ids_exist.append(upload.source_transfer_id)
elif e.status_code == status.HTTP_406_NOT_ACCEPTABLE:
bad_ids_transfer_exist.append(upload.source_transfer_id)
elif e.status_code == status.HTTP_425_TOO_EARLY:
bad_ids_ongoing.append(upload.source_transfer_id)
else:
log.error(f"Unknown error in batch staging: {e}")

continue

transfers.append(transfer)

n_bad = len(bad_ids_exist) + len(bad_ids_transfer_exist) + len(bad_ids_ongoing)

if n_bad > 0:
# A bad batch. No worries, though, we never actually added
# those transfer objects. We can only truly handle a single
# error at once, though, so we will prioritize.
if len(bad_ids_ongoing) > 0:
response.status_code = status.HTTP_425_TOO_EARLY
return CloneBatchFailedResponse(
reason="Transfer is ongoing.",
suggested_remedy=(
"Error in sharing logic. Your librarian is trying to send us a copy of "
"a file with an ONGOING transfer. Check the background task ordering."
),
source_transfer_ids=bad_ids_ongoing,
destination_transfer_ids=[-1] * len(bad_ids_ongoing),
)
elif len(bad_ids_exist) > 0:
response.status_code = status.HTTP_409_CONFLICT
return CloneBatchFailedResponse(
reason="File already exists on librarian.",
suggested_remedy=(
"Error in sharing logic. Your librarain should "
"never have tried to copy this. Check the background task ordering."
),
source_transfer_ids=bad_ids_exist,
destination_transfer_ids=[-1] * len(bad_ids_exist),
)
elif len(bad_ids_transfer_exist) > 0:
response.status_code = status.HTTP_406_NOT_ACCEPTABLE
return CloneBatchFailedResponse(
reason="Transfer was already initiated or staged.",
suggested_remedy=(
"Your librarian tried to upload a file again that we thought was already "
"coming to us. You should fail your outgoing transfer, we have failed ours."
),
source_transfer_ids=bad_ids_transfer_exist,
destination_transfer_ids=[-1] * len(bad_ids_transfer_exist),
)
else:
log.error("Unknown error in batch staging.")
response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
return CloneBatchFailedResponse(
reason="Unknown error.",
suggested_remedy="Check the logs.",
source_transfer_ids=[-1],
destination_transfer_ids=[-1],
)

session.add_all(transfers)
session.commit()

for upload, transfer in zip(request.uploads, transfers):
# Now we have a handle on the transfer, let's stage it.
file_name, file_location = store.store_manager.stage(
file_size=upload.upload_size,
Expand Down
3 changes: 2 additions & 1 deletion tests/integration_test/test_database_repair.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ def test_recover_from_disaster(
generate_task.core(session=session)

# Check we correctly registered remote instances on the source.
# There will be only one...
# They should all be there (tested in test_send_queue) but we
# care about one that we will re-create
found_remote_instanace = False
with source_session_maker() as session:
for file_name in copied_files:
Expand Down
10 changes: 3 additions & 7 deletions tests/integration_test/test_send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,16 +357,12 @@ def test_send_from_existing_file_row(
generate_task.core(session=session)

# Check we correctly registered remote instances on the source.
# There will be only one...
found_remote_instanace = False
# We should correctly register all files.
with source_session_maker() as session:
for file_name in copied_files:
file = session.get(test_orm.File, file_name)
if len(file.remote_instances) > 0:
found_remote_instanace = True
break

assert found_remote_instanace
if len(file.remote_instances) == 0:
raise FileNotFoundError

# Remove the librarians we added.
assert mocked_admin_client.remove_librarian(name="live_server")
Expand Down

0 comments on commit 4135c04

Please sign in to comment.