Skip to content

Commit

Permalink
Added outgoing transfer hypervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
JBorrow committed Jun 18, 2024
1 parent 5ab4f2b commit 5688895
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 86 deletions.
1 change: 1 addition & 0 deletions librarian_background/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def background(run_once: bool = False):
+ background_settings.recieve_clone
+ background_settings.consume_queue
+ background_settings.check_consumed_queue
+ background_settings.outgoing_transfer_hypervisor
)

for task in all_tasks:
Expand Down
205 changes: 205 additions & 0 deletions librarian_background/hypervisor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
"""
The hypervisor task that checks on the status of outgoing transfers.
If they are stale, we call up the downstream librarian to ask for an
update on their status. This can lead to either:
a) Failure of the outgoing transfer, or
b) A successful transfer, if the file is found on the downstream.
"""

import datetime

from sqlalchemy.orm import Session

from hera_librarian.exceptions import LibrarianHTTPError, LibrarianTimeoutError
from librarian_server.database import get_session
from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database
from librarian_server.orm import (
Librarian,
OutgoingTransfer,
RemoteInstance,
TransferStatus,
)

from .task import Task


def get_stale_of_type(session: Session, age_in_days: int, transfer_type: object):
    """
    Fetch all transfers of ``transfer_type`` that are considered stale.

    A transfer is selected when its ``start_time`` is earlier than
    ``age_in_days`` days before the current UTC time and its ``status`` is
    still one of INITIATED, ONGOING, or STAGED.

    ``transfer_type`` is the ORM class to query; it must expose ``start_time``
    and ``status`` columns. The query is built and executed on ``session`` and
    the result of ``.scalars().all()`` is returned.
    """

    # Statuses that mean the transfer has not reached a final state yet.
    unfinished_statuses = [
        TransferStatus.INITIATED,
        TransferStatus.ONGOING,
        TransferStatus.STAGED,
    ]

    # Anything started before this instant is old enough to be stale.
    now = datetime.datetime.now(datetime.timezone.utc)
    cutoff = now - datetime.timedelta(days=age_in_days)

    query = (
        session.query(transfer_type)
        .where(transfer_type.start_time < cutoff)
        .where(transfer_type.status.in_(unfinished_statuses))
    )

    return session.execute(query).scalars().all()


def handle_stale_outgoing_transfer(
    session: Session, transfer: OutgoingTransfer
) -> bool:
    """
    Resolve a stale outgoing transfer by asking the downstream librarian
    whether it already has the file.

    The downstream librarian is looked up by ``transfer.destination`` and
    searched for a file named ``transfer.file.name``:

    - If the downstream has exactly one checksum for that file name, it matches
      ``transfer.file.checksum``, and at least one instance carries a store ID,
      a RemoteInstance is registered, the transfer is marked COMPLETED, the
      session is committed, and True is returned.
    - If the downstream librarian is unknown, the file is missing, or the
      checksums are ambiguous or mismatched, the problem is logged, the
      transfer is failed via ``transfer.fail_transfer``, and False is returned.
    - If the downstream cannot be reached (LibrarianHTTPError or
      LibrarianTimeoutError), or the file exists but has no usable instance,
      the problem is logged and False is returned with the transfer left
      untouched.
    """

    # Remember the status that made this transfer stale. It is overwritten
    # with COMPLETED on success, but we still want to report it in the log.
    stale_status = transfer.status

    downstream_librarian = (
        session.query(Librarian).filter_by(name=transfer.destination).one_or_none()
    )

    if not downstream_librarian:
        log_to_database(
            severity=ErrorSeverity.ERROR,
            category=ErrorCategory.DATA_INTEGRITY,
            message=f"Downstream librarian {transfer.destination} not found",
            session=session,
        )

        # NOTE(review): commit=False here (and in the branches below) leaves
        # persisting the failure to whoever owns the session -- confirm that
        # the session is committed after this function returns.
        transfer.fail_transfer(session=session, commit=False)

        return False

    client = downstream_librarian.client()

    expected_file_name = transfer.file.name
    expected_file_checksum = transfer.file.checksum

    try:
        potential_files = client.search_files(
            name=expected_file_name,
        )
    except (LibrarianHTTPError, LibrarianTimeoutError) as e:
        # We could not reach the downstream, so we know nothing new about the
        # transfer; leave it as-is so it can be checked again later.
        log_to_database(
            severity=ErrorSeverity.ERROR,
            category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY,
            message=(
                f"Unacceptable error when trying to check if librarian {transfer.destination} "
                f"has file {expected_file_name} with exception {e}."
            ),
            session=session,
        )
        return False

    if not potential_files:
        # The downstream does not have the file. We should cancel the transfer.
        log_to_database(
            severity=ErrorSeverity.ERROR,
            category=ErrorCategory.DATA_INTEGRITY,
            message=f"Downstream librarian {transfer.destination} does "
            f"not have file {expected_file_name} and the transfer is stale. "
            "Cancelling the transfer.",
            session=session,
        )

        transfer.fail_transfer(session=session, commit=False)

        return False

    available_checksums = {f.checksum for f in potential_files}

    if len(available_checksums) != 1:
        log_to_database(
            severity=ErrorSeverity.ERROR,
            category=ErrorCategory.DATA_INTEGRITY,
            message=f"Multiple (or zero, actual {len(available_checksums)}) checksums "
            f"found for file {expected_file_name} "
            f"on downstream librarian {transfer.destination}.",
            session=session,
        )

        transfer.fail_transfer(session=session, commit=False)

        return False

    available_checksum = available_checksums.pop()

    if available_checksum != expected_file_checksum:
        log_to_database(
            severity=ErrorSeverity.ERROR,
            category=ErrorCategory.DATA_INTEGRITY,
            message=f"Checksum mismatch for file {expected_file_name} "
            f"on downstream librarian {transfer.destination}.",
            session=session,
        )

        transfer.fail_transfer(session=session, commit=False)

        return False

    # Pick the first instance that actually carries a store ID. A file may be
    # reported with no instances at all (or with empty store IDs), in which
    # case there is nothing to register a remote instance against.
    available_store_id = next(
        (
            instance.store_id
            for remote_file in potential_files
            for instance in remote_file.instances
            if instance.store_id is not None
        ),
        None,
    )

    if available_store_id is None:
        # Log and bail out without failing the transfer, so the transfer can
        # be re-examined on a later run.
        log_to_database(
            severity=ErrorSeverity.ERROR,
            category=ErrorCategory.DATA_INTEGRITY,
            message=f"Downstream librarian {transfer.destination} has file "
            f"{expected_file_name}, but it has no instances with a store ID.",
            session=session,
        )

        return False

    # If we made it here, we succeeded, we just never heard back!

    remote_instance = RemoteInstance.new_instance(
        file=transfer.file,
        store_id=available_store_id,
        librarian=downstream_librarian,
    )

    session.add(remote_instance)
    transfer.status = TransferStatus.COMPLETED

    session.commit()

    log_to_database(
        severity=ErrorSeverity.INFO,
        category=ErrorCategory.TRANSFER,
        message=(
            f"Successfully registered remote instance for {transfer.destination} and "
            f"transfer {transfer.id} based upon stale transfer with "
            f"status {stale_status}."
        ),
        session=session,
    )

    return True


class OutgoingTransferHypervisor(Task):
    """
    Background task that finds stale outgoing transfers and resolves each of
    them through handle_stale_outgoing_transfer.
    """

    age_in_days: int
    "The age in days of the outgoing transfer before we consider it stale."

    def on_call(self):
        # One session is opened per invocation of the task.
        with get_session() as session:
            return self.core(session=session)

    def core(self, session):
        """
        Walk over all stale outgoing transfers and handle them one by one.

        Returns True when every stale transfer was visited, and False when
        the soft timeout elapsed before the list was exhausted.
        """

        # The run must stop picking up new transfers once this instant passes.
        deadline = datetime.datetime.now(datetime.timezone.utc) + self.soft_timeout

        for stale_transfer in get_stale_of_type(
            session, self.age_in_days, OutgoingTransfer
        ):
            if datetime.datetime.now(datetime.timezone.utc) > deadline:
                return False

            handle_stale_outgoing_transfer(session, stale_transfer)

        return True
102 changes: 16 additions & 86 deletions librarian_background/send_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
)
from librarian_server.settings import server_settings

from .hypervisor import handle_stale_outgoing_transfer
from .task import Task

if TYPE_CHECKING:
Expand Down Expand Up @@ -396,7 +397,8 @@ def handle_existing_file(
the checksum. If it has the file, and the checksum matches, we
register a remote instance.
NOTE: This may leave dangling STAGED files.
NOTE: This may leave dangling STAGED files, but those can be cleaned
up later by the hypervisor task
"""

log_to_database(
Expand All @@ -410,99 +412,27 @@ def handle_existing_file(
session=session,
)

client = librarian.client()
transfer: OutgoingTransfer = session.get(OutgoingTransfer, source_transfer_id)

# Extract name and checksum from ongoing transfer.
transfer = session.get(OutgoingTransfer, source_transfer_id)

file_name = transfer.file.name
file_checksum = transfer.file.checksum

try:
potential_files = client.search_files(name=file_name)

if len(potential_files) == 0:
# They don't have the file; this _really_ doesn't make sense.
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.PROGRAMMING,
message=(
f"Librarian {librarian.name} told us that they have file {file_name}, "
"but we can't find it in their database."
),
session=session,
)
return False
else:
# We have the file; we need to check the checksum.
for potential_file in potential_files:
if potential_file.checksum == file_checksum:
# We have a match; we can register the remote instance.
try:
potential_store_id = potential_file.instances[0].store_id

if potential_store_id is None:
raise ValueError
except (IndexError, ValueError):
# So, you're telling me, that you have a file but it doesn't
# have any instances..?
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.PROGRAMMING,
message=(
f"Librarian {librarian.name} told us that they have file {file_name}, "
"but it has no instances."
),
session=session,
)
return False

remote_instance = RemoteInstance.new_instance(
file=transfer.file,
store_id=potential_store_id,
librarian=librarian,
)

session.add(remote_instance)
session.commit()

log_to_database(
severity=ErrorSeverity.INFO,
category=ErrorCategory.TRANSFER,
message=(
f"Successfully registered remote instance for {librarian.name} and "
f"transfer {source_transfer_id}."
),
session=session,
)

return True
else:
# Checksums do not match; this is a problem.
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.TRANSFER,
message=(
f"Librarian {librarian.name} told us that they have file {file_name}, "
"but the checksums do not match."
),
session=session,
)

return False
except Exception as e:
if transfer is None:
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY,
category=ErrorCategory.PROGRAMMING,
message=(
f"Unacceptable error when trying to check if librarian {librarian.name} "
f"has file {file_name} with exception {e}."
f"Transfer {source_transfer_id} does not exist, but we were told "
"by the downstream librarian that it does. There must be another "
"librarian that sent them the file, and the DAG nature of the "
"librarian is being violated."
),
session=session,
)
return False

return True
return False
else:
return handle_stale_outgoing_transfer(
session=session,
transfer=transfer,
)


class SendClone(Task):
Expand Down
20 changes: 20 additions & 0 deletions librarian_background/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pydantic_settings import BaseSettings, SettingsConfigDict

from hera_librarian.deletion import DeletionPolicy
from librarian_background.hypervisor import OutgoingTransferHypervisor

from .check_integrity import CheckIntegrity
from .create_clone import CreateLocalClone
Expand Down Expand Up @@ -175,6 +176,23 @@ def task(self) -> CheckConsumedQueue:
)


class OutgoingTransferHypervisorSettings(BackgroundTaskSettings):
    """
    Settings for the hypervisor task that checks on stale outgoing transfers.
    """

    age_in_days: int
    "The age of the items to check, in days."

    @property
    def task(self) -> OutgoingTransferHypervisor:
        # Build the task from these settings; task_name and soft_timeout are
        # read from this settings object alongside age_in_days.
        return OutgoingTransferHypervisor(
            name=self.task_name,
            age_in_days=self.age_in_days,
            soft_timeout=self.soft_timeout,
        )


# Backward-compatible alias: the class was originally introduced under this
# misspelled name, which existing references still use.
OutgoingTransferHypevrisorSettings = OutgoingTransferHypervisorSettings


class BackgroundSettings(BaseSettings):
"""
Background task settings, configurable.
Expand All @@ -200,6 +218,8 @@ class BackgroundSettings(BaseSettings):
check_consumed_queue: list[CheckConsumedQueueSettings] = []
"Settings for the check consumed queue task."

outgoing_transfer_hypervisor: list[OutgoingTransferHypevrisorSettings] = []

# Global settings:

max_rsync_retries: int = 8
Expand Down

0 comments on commit 5688895

Please sign in to comment.