Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor file upload code and handle errors #73

Merged
merged 12 commits into from
Jul 8, 2024
18 changes: 11 additions & 7 deletions hera_librarian/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from .authlevel import AuthLevel
from .deletion import DeletionPolicy
from .errors import ErrorCategory, ErrorSeverity
from .exceptions import LibrarianError, LibrarianHTTPError
from .exceptions import LibrarianError, LibrarianHTTPError, LibrarianTimeoutError
from .models.admin import (
AdminAddLibrarianRequest,
AdminAddLibrarianResponse,
Expand Down Expand Up @@ -195,12 +195,15 @@ def post(

data = None if request is None else request.model_dump_json()

r = requests.post(
self.resolve(endpoint),
data=data,
headers={"Content-Type": "application/json"},
auth=(self.user, self.password),
)
try:
r = requests.post(
self.resolve(endpoint),
data=data,
headers={"Content-Type": "application/json"},
auth=(self.user, self.password),
)
except (TimeoutError, requests.exceptions.ConnectionError):
raise LibrarianTimeoutError(url=self.resolve(endpoint))

if str(r.status_code)[0] != "2":
try:
Expand All @@ -222,6 +225,7 @@ def post(
suggested_remedy=response_json.get(
"suggested_remedy", "<no suggested remedy provided>"
),
full_response=response_json,
)

if response is None:
Expand Down
11 changes: 10 additions & 1 deletion hera_librarian/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,23 @@


class LibrarianHTTPError(Exception):
def __init__(self, url, status_code, reason, suggested_remedy):
def __init__(self, url, status_code, reason, suggested_remedy, full_response=None):
super(LibrarianHTTPError, self).__init__(
f"HTTP request to {url} failed with status code {status_code} and reason {reason}."
)
self.url = url
self.status_code = status_code
self.reason = reason
self.suggested_remedy = suggested_remedy
self.full_response = full_response


class LibrarianTimeoutError(Exception):
def __init__(self, url):
super(LibrarianTimeoutError, self).__init__(
f"HTTP request to {url} timed out or took too many retries."
)
self.url = url


class LibrarianError(Exception):
Expand Down
1 change: 1 addition & 0 deletions librarian_background/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def background(run_once: bool = False):
+ background_settings.recieve_clone
+ background_settings.consume_queue
+ background_settings.check_consumed_queue
+ background_settings.outgoing_transfer_hypervisor
)

for task in all_tasks:
Expand Down
205 changes: 205 additions & 0 deletions librarian_background/hypervisor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
"""
The hypervisor task that checks on the status of outgoing transfers.

If they are stale, we call up the downstream librarian to ask for an
update on their status. This can lead to a:

a) Failure of the outgoing transfer
b) Successful transfer, if the file is found on the downstream
"""

import datetime

from sqlalchemy.orm import Session

from hera_librarian.exceptions import LibrarianHTTPError, LibrarianTimeoutError
from librarian_server.database import get_session
from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database
from librarian_server.orm import (
Librarian,
OutgoingTransfer,
RemoteInstance,
TransferStatus,
)

from .task import Task


def get_stale_of_type(session: Session, age_in_days: int, transfer_type: object):
"""
Get the stale transfers of a given type.
"""

# Get the stale outgoing transfers
stale_since = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
days=age_in_days
)

transfer_stmt = session.query(transfer_type)

transfer_stmt = transfer_stmt.where(transfer_type.start_time < stale_since)

transfer_stmt = transfer_stmt.where(
transfer_type.status.in_(
[TransferStatus.INITIATED, TransferStatus.ONGOING, TransferStatus.STAGED]
)
)

return session.execute(transfer_stmt).scalars().all()


def handle_stale_outgoing_transfer(
session: Session, transfer: OutgoingTransfer
) -> bool:
"""
In all cases, we ask if the downstream has the file. If it does, we mark our
transfer as completed as if we just completed a transfer, and mark our
OutgoingTransfer as complete.

If the downstream does not have the file, we will ask the downstream to cancel
its incoming transfer.
"""

downstream_librarian = (
session.query(Librarian).filter_by(name=transfer.destination).one_or_none()
)

if not downstream_librarian:
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_INTEGRITY,
message=f"Downstream librarian {transfer.destination} not found",
session=session,
)

transfer.fail_transfer(session=session, commit=False)

return False

client = downstream_librarian.client()

expected_file_name = transfer.file.name
expected_file_checksum = transfer.file.checksum

try:
potential_files = client.search_files(
name=expected_file_name,
)
except (LibrarianHTTPError, LibrarianTimeoutError) as e:
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY,
message=(
f"Unacceptable error when trying to check if librarian {transfer.destination}"
f"has file {expected_file_name} with exception {e}."
),
session=session,
)
return False

if not potential_files:
# The downstream does not have the file. We should cancel the transfer.
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_INTEGRITY,
message=f"Downstream librarian {transfer.destination} does "
f"not have file {expected_file_name} and the transfer is stale. "
"Cancelling the transfer.",
session=session,
)

transfer.fail_transfer(session=session, commit=False)

return False

available_checksums = {f.checksum for f in potential_files}
available_store_ids = {i.store_id for f in potential_files for i in f.instances}

if len(available_checksums) != 1:
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_INTEGRITY,
message=f"Multiple (or zero, actual {len(available_checksums)}) checksums "
f"found for file {expected_file_name} "
f"on downstream librarian {transfer.destination}.",
session=session,
)

transfer.fail_transfer(session=session, commit=False)

return False

available_checksum = available_checksums.pop()
available_store_id = available_store_ids.pop()

if available_checksum != expected_file_checksum:
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_INTEGRITY,
message=f"Checksum mismatch for file {expected_file_name} "
f"on downstream librarian {transfer.destination}.",
session=session,
)

transfer.fail_transfer(session=session, commit=False)

return False

# If we made it here, we succeeded, we just never heard back!

remote_instance = RemoteInstance.new_instance(
file=transfer.file,
store_id=available_store_id,
librarian=downstream_librarian,
)

session.add(remote_instance)
transfer.status = TransferStatus.COMPLETED

session.commit()

log_to_database(
severity=ErrorSeverity.INFO,
category=ErrorCategory.TRANSFER,
message=(
f"Successfully registered remote instance for {transfer.destination} and "
f"transfer {transfer.id} based upon stale transfer with "
f"status {transfer.status}."
),
session=session,
)

return True


class OutgoingTransferHypervisor(Task):
"""
Checks in on stale outgoing transfers.
"""

age_in_days: int
"The age in days of the outgoing transfer before we consider it stale."

def on_call(self):
with get_session() as session:
return self.core(session=session)

def core(self, session):
"""
Checks for stale outgoing transfers and updates their status.
"""

start_time = datetime.datetime.now(datetime.timezone.utc)
end_time = start_time + self.soft_timeout

stale_transfers = get_stale_of_type(session, self.age_in_days, OutgoingTransfer)

for transfer in stale_transfers:
current_time = datetime.datetime.now(datetime.timezone.utc)

if current_time > end_time:
return False

handle_stale_outgoing_transfer(session, transfer)

return True
Loading
Loading