Skip to content

Commit

Permalink
Added full initial version of background task
Browse files Browse the repository at this point in the history
  • Loading branch information
JBorrow committed Dec 13, 2024
1 parent e81819f commit 2c002eb
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 67 deletions.
24 changes: 24 additions & 0 deletions hera_librarian/models/corrupt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""
Models for the corruption fixing endpoints.
"""

from pydantic import BaseModel


class CorruptionPreparationRequest(BaseModel):
file_name: str
librarian_name: str


class CorruptionPreparationResponse(BaseModel):
ready: bool


class CorruptionResendRequest(BaseModel):
librarian_name: str
file_name: str


class CorruptionResendResponse(BaseModel):
success: bool
destination_transfer_id: int
204 changes: 169 additions & 35 deletions librarian_background/corruption_fixer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,19 @@
from sqlalchemy.orm import Session

from hera_librarian.errors import LibrarianError, LibrarianHTTPError
from hera_librarian.models.corrupt import (
CorruptionPreparationRequest,
CorruptionPreparationResponse,
CorruptionResendRequest,
CorruptionResendResponse,
)
from hera_librarian.transfer import TransferStatus
from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
from librarian_server.database import get_session
from librarian_server.orm.file import CorruptFile, File
from librarian_server.orm.instance import Instance
from librarian_server.orm.librarian import Librarian
from librarian_server.orm.transfer import IncomingTransfer

from .task import Task

Expand Down Expand Up @@ -50,56 +59,63 @@ def core(self, session: Session) -> bool:
"Attempting to fix {id} ({name})", id=corrupt.id, name=corrupt.file_name
)

# First: query the file table to see if we still have the file. We do not store
# a foreign key in the corrupt table because we may have deleted the file and
# failed to contact the upstream.
stmt = select(File).filter_by(name=corrupt.file_name)
potential_file = session.execute(stmt).scalar_one_or_none()

stmt = select(Instance).filter_by(id=corrupt.instance_id)
potential_instance = session.execute(stmt).scalar_one_or_none()

# Step A: Check that the file is actually corrupt
try:
hash_function = get_hash_function_from_hash(corrupt.file.checksum)
instance = corrupt.instance
store = instance.store
hash_function = get_hash_function_from_hash(potential_file.checksum)
store = potential_instance.store
path_info = store.store_manager.path_info(
instance.path, hash_function=hash_function
potential_instance.path, hash_function=hash_function
)

if compare_checksums(corrupt.file.checksum, path_info.checksum):
if compare_checksums(potential_file.checksum, path_info.checksum):
logger.info(
"CorruptFile {id} stated that file {name} was corrupt in instance {inst_id} "
"but we just checked the checksums: {chk_a}=={chk_b} and the file is fine "
"or was fixed behind our back; removing CorruptFile row",
id=corrupt.id,
name=corrupt.file_name,
inst_id=corrupt.instance_id,
chk_a=corrupt.file.checksum,
chk_a=potential_file.checksum,
chk_b=path_info.checksum,
)
session.delete(corrupt)
session.commit()
continue
except FileNotFoundError:
logger.error(
"Instance {} on store {} is missing, but we will continue with recovery (Instance: {})",
instance.path,
store.name,
instance.id,
)

# Ok, so the file _really is corrupt_ or it is missing

# Remedy A: We have another local copy of the file!
# TODO: Implement this; it is not relevant for SO.
if len(corrupt.file.instances) > 1:
# Uhhh there is more than one instance here, we don't know what to do.
# Remedy A: We have another local copy of the file!
# TODO: Implement this; it is not relevant for SO.
if len(potential_file.instances) > 1:
# Uhhh there is more than one instance here, we don't know what to do.
logger.error(
"File {name} has a corrupt instance {id} but there is {n} > 1 "
"instances of the file on this librarian; entered block that was "
"never completed and need manual remedy",
name=corrupt.file_name,
id=corrupt.instance_id,
n=len(potential_file.instances),
)
continue
except (FileNotFoundError, AttributeError):
logger.error(
"File {name} has a corrupt instance {id} but there is {n} > 1 "
"instances of the file on this librarian; entered block that was "
"never completed and need manual remedy",
name=corrupt.file_name,
id=corrupt.instance_id,
n=len(corrupt.file.instances),
"Instance {} is missing, but we will continue with recovery (File: {})",
corrupt.instance_id,
corrupt.file_name,
)
continue

# Ok, so the file _really is corrupt_ or it is missing and we only have one instance

# Remedy B: the origin of this file is another librarian. Ask for a new copy.
stmt = select(Librarian).filter_by(name=corrupt.file.source)
result = session.execute(stmt).scalars().one_or_none()
stmt = select(Librarian).filter_by(name=corrupt.file_source)
result = session.execute(stmt).scalar_one_or_none()

if result is None:
logger.error(
Expand All @@ -108,7 +124,7 @@ def core(self, session: Session) -> bool:
"request a new valid copy of the file",
name=corrupt.file_name,
id=corrupt.instance_id,
lib=corrupt.file.source,
lib=corrupt.file_source,
)
continue

Expand All @@ -126,14 +142,132 @@ def core(self, session: Session) -> bool:
)
continue

# TODO: CALL PREPARE ENDPOINT
prepare_request = CorruptionPreparationRequest(
file_name=corrupt.file_name, librarian_name=result.name
)

# TODO: Deal with the fact that we would have broken remote instances..?
corrupt.file.delete(session=session, commit=False, force=True)
try:
prepare_response: CorruptionPreparationResponse = client.post(
endpoint="corrupt/prepare",
request=prepare_request,
response=CorruptionPreparationResponse,
)

# TODO: CALL RE-SEND ENDPOINT; DO NOT COMMIT UNTIL WE HEAR BACK; NOTE THAT WE WILL
# HAVE DELETED THE DATA EVEN IF WE FAIL (THAT IS NON-RECOVERABLE) BUT HAVING
# THE ROWS SIMPLIFIES THE LOGIC ABOVE.
if not prepare_response.ready:
raise ValueError("Preparation endpoint returned False")
except (LibrarianError, LibrarianHTTPError, ValueError) as e:
logger.error(
"Librarian {lib} contact during preparation for corruption fix to restore "
"{name} did not succeed: {e}",
lib=result.name,
name=corrupt.file_name,
e=e,
)
continue

# This also deletes remote instances which will need to be repaired. However
# it is unlikely that we will be in that situation. Unfortunately we _must_ commit
# this as the files table must be accessed from a different table.
corrupt.file.delete(session=session, commit=True, force=True)

resend_request = CorruptionResendRequest(
file_name=corrupt.file_name,
librarian_name=result.name,
)

try:
resend_response: CorruptionResendResponse = client.post(
"corrupt/resend",
request=resend_request,
response=CorruptionResendResponse,
)

if not resend_response.success:
raise ValueError("Failure during resend")
except (LibrarianError, LibrarianHTTPError):
logger.error(
"Failed during the resend request flow for librarian {lib}, "
"corrupt {id} for file {name} with {e}",
lib=result.name,
id=corrupt.id,
name=corrupt.file_name,
e=e,
)
continue

corrupt.incoming_transfer_id = resend_response.destination_transfer_id
corrupt.replacement_requested = True
session.commit()

# Now check in on files that we already requested new copies of.
query_start = perf_counter()

stmt = (
select(CorruptFile)
.filter(CorruptFile.replacement_requested == True)
.with_for_update(skip_locked=True)
)

results = session.execute(stmt).scalars().all()

query_end = perf_counter()

logger.info(
"Took {} s to query for {} corrupt files already in progress",
query_end - query_start,
len(results),
)

for result in results:
stmt = select(IncomingTransfer).filter_by(id=result.incoming_transfer_id)
transfer = session.execute(stmt).scalar_one_or_none()

file_is_fixed = False

if transfer.status in [TransferStatus.FAILED, TransferStatus.CANCELLED]:
logger.warning(
"Transfer for corrupt file {id} ({name}) is in status {status}",
id=result.id,
name=result.file_name,
status=transfer.status,
)
# That's no good. We should check to see if we got the file anyway:
stmt = select(File).filter_by(name=result.file_name)
file = session.execute(stmt).scalar_one_or_none()

if file is not None:
# Oh, we're good. Phew, we successfully ingested it.
logger.info(
"Though transfer is in status {status}, file {name} was successfully "
"ingested anyway",
status=transfer.status,
name=result.file_name,
)
file_is_fixed = True
else:
# We actually need to re-download it.
logger.warning(
"Re-setting corrupt file {id} ({name}) to not having a replacement requested "
"as the transfer failed. It will be re-downloaded at the next invocation ",
id=result.id,
name=result.file_name,
)
result.replacement_requested = False
elif transfer.status in [TransferStatus.COMPLETED]:
# That's good, we got the file!
file_is_fixed = True
else:
file_is_fixed = False

if file_is_fixed:
logger.info(
"Confirmed that corrupt file {id} ({name}) has been replaced with a new copy; "
"deleting the CorruptFile row",
id=result.id,
name=result.file_name,
)
session.delete(result)

session.commit()

return
2 changes: 1 addition & 1 deletion librarian_background/send_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ def send_file_batch(

call_destination_and_state_ongoing(send=send, session=session)

return True
return list(transfer_map.values())


class SendClone(Task):
Expand Down
38 changes: 16 additions & 22 deletions librarian_server/api/corrupt.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,32 @@
from sqlalchemy.orm import Session

from hera_librarian.exceptions import LibrarianError, LibrarianHTTPError
from hera_librarian.models.corrupt import (
CorruptionPreparationRequest,
CorruptionPreparationResponse,
CorruptionResendRequest,
CorruptionResendResponse,
)
from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
from librarian_server.orm.instance import Instance, RemoteInstance
from librarian_server.orm.librarian import Librarian

router = APIRouter(prefix="/api/v2/corrupt")

from loguru import logger
from pydantic import BaseModel

from ..database import yield_session
from .auth import CallbackUserDependency, ReadappendUserDependency


class CorruptionPreparationRequest(BaseModel):
file_name: str
librarian_name: str


class CorruptionPreparationResponse(BaseModel):
ready: bool
from .auth import CallbackUserDependency, User


def user_and_librarian_validation_flow(
user, librarian_name, file_name, session
user: User, librarian_name: str, file_name: str, session: Session
) -> tuple[Librarian, File, Instance, list[RemoteInstance]]:
"""
Figure out if this user is a librarian and that we can make file transfers
to that librarian for this file. Also validates the file on our librarian to make
sure it is not corrupt and is present.
"""
user_is_librarian = user.username == librarian_name

stmt = select(Librarian).filter_by(name=librarian_name)
Expand Down Expand Up @@ -163,15 +164,6 @@ def prepare(
return CorruptionPreparationResponse(ready=True)


class CorruptionResendRequest(BaseModel):
librarian_name: str
file_name: str


class CorruptionResendResponse(BaseModel):
success: bool


@router.post("/resend")
def resend(
request: CorruptionResendRequest,
Expand Down Expand Up @@ -227,4 +219,6 @@ def resend(
request.file_name,
)

return CorruptionResendResponse(success=success)
return CorruptionResendResponse(
success=bool(success), destination_transfer_id=success[0]
)
Loading

0 comments on commit 2c002eb

Please sign in to comment.