diff --git a/librarian_background/__init__.py b/librarian_background/__init__.py index 06d68c7..991caca 100644 --- a/librarian_background/__init__.py +++ b/librarian_background/__init__.py @@ -25,6 +25,7 @@ def background(run_once: bool = False): + background_settings.recieve_clone + background_settings.consume_queue + background_settings.check_consumed_queue + + background_settings.outgoing_transfer_hypervisor ) for task in all_tasks: diff --git a/librarian_background/hypervisor.py b/librarian_background/hypervisor.py new file mode 100644 index 0000000..c684fed --- /dev/null +++ b/librarian_background/hypervisor.py @@ -0,0 +1,205 @@ +""" +The hypervisor task that checks on the status of outgoing transfers. + +If they are stale, we call up the downstream librarian to ask for an +update on their status. This can lead to a: + +a) Failure of the outgoing transfer +b) Successful transfer, if the file is found on the downstream +""" + +import datetime + +from sqlalchemy.orm import Session + +from hera_librarian.exceptions import LibrarianHTTPError, LibrarianTimeoutError +from librarian_server.database import get_session +from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database +from librarian_server.orm import ( + Librarian, + OutgoingTransfer, + RemoteInstance, + TransferStatus, +) + +from .task import Task + + +def get_stale_of_type(session: Session, age_in_days: int, transfer_type: object): + """ + Get the stale transfers of a given type. + """ + + # Get the stale outgoing transfers + stale_since = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta( + days=age_in_days + ) + + transfer_stmt = session.query(transfer_type) + + transfer_stmt = transfer_stmt.where(transfer_type.start_time < stale_since) + + transfer_stmt = transfer_stmt.where( + transfer_type.status.in_( + [TransferStatus.INITIATED, TransferStatus.ONGOING, TransferStatus.STAGED] + ) + ) + + return session.execute(transfer_stmt).scalars().all() + + +def handle_stale_outgoing_transfer( + session: Session, transfer: OutgoingTransfer +) -> bool: + """ + In all cases, we ask if the downstream has the file. If it does, we mark our + transfer as completed as if we just completed a transfer, and mark our + OutgoingTransfer as complete. + + If the downstream does not have the file, we will ask the downstream to cancel + its incoming transfer. + """ + + downstream_librarian = ( + session.query(Librarian).filter_by(name=transfer.destination).one_or_none() + ) + + if not downstream_librarian: + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_INTEGRITY, + message=f"Downstream librarian {transfer.destination} not found", + session=session, + ) + + transfer.fail_transfer(session=session, commit=False) + + return False + + client = downstream_librarian.client() + + expected_file_name = transfer.file.name + expected_file_checksum = transfer.file.checksum + + try: + potential_files = client.search_files( + name=expected_file_name, + ) + except (LibrarianHTTPError, LibrarianTimeoutError) as e: + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, + message=( + f"Unacceptable error when trying to check if librarian {transfer.destination}" + f"has file {expected_file_name} with exception {e}." + ), + session=session, + ) + return False + + if not potential_files: + # The downstream does not have the file. We should cancel the transfer. + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_INTEGRITY, + message=f"Downstream librarian {transfer.destination} does " + f"not have file {expected_file_name} and the transfer is stale. " + "Cancelling the transfer.", + session=session, + ) + + transfer.fail_transfer(session=session, commit=False) + + return False + + available_checksums = {f.checksum for f in potential_files} + available_store_ids = {i.store_id for f in potential_files for i in f.instances} + + if len(available_checksums) != 1: + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_INTEGRITY, + message=f"Multiple (or zero, actual {len(available_checksums)}) checksums " + f"found for file {expected_file_name} " + f"on downstream librarian {transfer.destination}.", + session=session, + ) + + transfer.fail_transfer(session=session, commit=False) + + return False + + available_checksum = available_checksums.pop() + available_store_id = available_store_ids.pop() + + if available_checksum != expected_file_checksum: + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_INTEGRITY, + message=f"Checksum mismatch for file {expected_file_name} " + f"on downstream librarian {transfer.destination}.", + session=session, + ) + + transfer.fail_transfer(session=session, commit=False) + + return False + + # If we made it here, we succeeded, we just never heard back! + + remote_instance = RemoteInstance.new_instance( + file=transfer.file, + store_id=available_store_id, + librarian=downstream_librarian, + ) + + session.add(remote_instance) + transfer.status = TransferStatus.COMPLETED + + session.commit() + + log_to_database( + severity=ErrorSeverity.INFO, + category=ErrorCategory.TRANSFER, + message=( + f"Successfully registered remote instance for {transfer.destination} and " + f"transfer {transfer.id} based upon stale transfer with " + f"status {transfer.status}." + ), + session=session, + ) + + return True + + +class OutgoingTransferHypervisor(Task): + """ + Checks in on stale outgoing transfers. + """ + + age_in_days: int + "The age in days of the outgoing transfer before we consider it stale." + + def on_call(self): + with get_session() as session: + return self.core(session=session) + + def core(self, session): + """ + Checks for stale outgoing transfers and updates their status. + """ + + start_time = datetime.datetime.now(datetime.timezone.utc) + end_time = start_time + self.soft_timeout + + stale_transfers = get_stale_of_type(session, self.age_in_days, OutgoingTransfer) + + for transfer in stale_transfers: + current_time = datetime.datetime.now(datetime.timezone.utc) + + if current_time > end_time: + return False + + handle_stale_outgoing_transfer(session, transfer) + + return True diff --git a/librarian_background/send_clone.py b/librarian_background/send_clone.py index 27b233b..802d14a 100644 --- a/librarian_background/send_clone.py +++ b/librarian_background/send_clone.py @@ -35,6 +35,7 @@ ) from librarian_server.settings import server_settings +from .hypervisor import handle_stale_outgoing_transfer from .task import Task if TYPE_CHECKING: @@ -396,7 +397,8 @@ def handle_existing_file( the checksum. If it has the file, and the checksum matches, we register a remote instance. - NOTE: This may leave dangling STAGED files. + NOTE: This may leave dangling STAGED files, but those can be cleaned + up later by the hypervisor task """ log_to_database( @@ -410,99 +412,27 @@ def handle_existing_file( session=session, ) - client = librarian.client() + transfer: OutgoingTransfer = session.get(OutgoingTransfer, source_transfer_id) - # Extract name and checksum from ongoing transfer. - transfer = session.get(OutgoingTransfer, source_transfer_id) - - file_name = transfer.file.name - file_checksum = transfer.file.checksum - - try: - potential_files = client.search_files(name=file_name) - - if len(potential_files) == 0: - # They don't have the file; this _really_ doesn't make sense. - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.PROGRAMMING, - message=( - f"Librarian {librarian.name} told us that they have file {file_name}, " - "but we can't find it in their database." - ), - session=session, - ) - return False - else: - # We have the file; we need to check the checksum. - for potential_file in potential_files: - if potential_file.checksum == file_checksum: - # We have a match; we can register the remote instance. - try: - potential_store_id = potential_file.instances[0].store_id - - if potential_store_id is None: - raise ValueError - except (IndexError, ValueError): - # So, you're telling me, that you have a file but it doesn't - # have any instances..? - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.PROGRAMMING, - message=( - f"Librarian {librarian.name} told us that they have file {file_name}, " - "but it has no instances." - ), - session=session, - ) - return False - - remote_instance = RemoteInstance.new_instance( - file=transfer.file, - store_id=potential_store_id, - librarian=librarian, - ) - - session.add(remote_instance) - session.commit() - - log_to_database( - severity=ErrorSeverity.INFO, - category=ErrorCategory.TRANSFER, - message=( - f"Successfully registered remote instance for {librarian.name} and " - f"transfer {source_transfer_id}." - ), - session=session, - ) - - return True - else: - # Checksums do not match; this is a problem. - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.TRANSFER, - message=( - f"Librarian {librarian.name} told us that they have file {file_name}, " - "but the checksums do not match." - ), - session=session, - ) - - return False - except Exception as e: + if transfer is None: log_to_database( severity=ErrorSeverity.ERROR, - category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, + category=ErrorCategory.PROGRAMMING, message=( - f"Unacceptable error when trying to check if librarian {librarian.name} " - f"has file {file_name} with exception {e}." + f"Transfer {source_transfer_id} does not exist, but we were told " + "by the downstream librarian that it does. There must be another " + "librarian that sent them the file, and the DAG nature of the " + "librarian is being violated." ), session=session, ) - return False - return True + return False + else: + return handle_stale_outgoing_transfer( + session=session, + transfer=transfer, + ) class SendClone(Task): diff --git a/librarian_background/settings.py b/librarian_background/settings.py index bb03e60..1f50870 100644 --- a/librarian_background/settings.py +++ b/librarian_background/settings.py @@ -12,6 +12,7 @@ from pydantic_settings import BaseSettings, SettingsConfigDict from hera_librarian.deletion import DeletionPolicy +from librarian_background.hypervisor import OutgoingTransferHypervisor from .check_integrity import CheckIntegrity from .create_clone import CreateLocalClone @@ -175,6 +176,23 @@ def task(self) -> CheckConsumedQueue: ) +class OutgoingTransferHypevrisorSettings(BackgroundTaskSettings): + """ + Settings for the hypervisor task. + """ + + age_in_days: int + "The age of the items to check, in days." + + @property + def task(self) -> OutgoingTransferHypervisor: + return OutgoingTransferHypervisor( + name=self.task_name, + age_in_days=self.age_in_days, + soft_timeout=self.soft_timeout, + ) + + class BackgroundSettings(BaseSettings): """ Background task settings, configurable. @@ -200,6 +218,8 @@ class BackgroundSettings(BaseSettings): check_consumed_queue: list[CheckConsumedQueueSettings] = [] "Settings for the check consumed queue task." + outgoing_transfer_hypervisor: list[OutgoingTransferHypevrisorSettings] = [] + # Global settings: max_rsync_retries: int = 8