diff --git a/librarian_background/__init__.py b/librarian_background/__init__.py index 991caca..0df4d71 100644 --- a/librarian_background/__init__.py +++ b/librarian_background/__init__.py @@ -26,6 +26,7 @@ def background(run_once: bool = False): + background_settings.consume_queue + background_settings.check_consumed_queue + background_settings.outgoing_transfer_hypervisor + + background_settings.incoming_transfer_hypervisor ) for task in all_tasks: diff --git a/librarian_background/hypervisor.py b/librarian_background/hypervisor.py index d75357f..2b153f7 100644 --- a/librarian_background/hypervisor.py +++ b/librarian_background/hypervisor.py @@ -13,6 +13,7 @@ from sqlalchemy.orm import Session from hera_librarian.exceptions import LibrarianHTTPError, LibrarianTimeoutError +from hera_librarian.models.checkin import CheckinStatusRequest, CheckinStatusResponse from hera_librarian.utils import compare_checksums from librarian_server.database import get_session from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database @@ -22,6 +23,7 @@ RemoteInstance, TransferStatus, ) +from librarian_server.orm.transfer import IncomingTransfer from .task import Task @@ -174,6 +176,160 @@ def handle_stale_outgoing_transfer( return True +def handle_stale_incoming_transfer( + session: Session, + transfer: IncomingTransfer, +) -> bool: + + upstream_librarian = ( + session.query(Librarian).filter_by(name=transfer.source).one_or_none() + ) + + if not upstream_librarian: + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.DATA_INTEGRITY, + message=f"Upstream librarian {transfer.source} not found", + session=session, + ) + + transfer.fail_transfer(session=session, commit=True) + return False + + # We have an upstream librarian. We can ask it about the status of its + # own OutgoingTransfer. + client = upstream_librarian.client() + + status_request = CheckinStatusRequest( + source_transfer_ids=[transfer.source_transfer_id], + destination_transfer_ids=[], + ) + + try: + response: CheckinStatusResponse = client.post( + "checkin/status", request=status_request, response=CheckinStatusResponse + ) + + source_status = response.source_transfer_status[transfer.source_transfer_id] + except Exception as e: + log_to_database( + severity=ErrorSeverity.ERROR, + category=ErrorCategory.TRANSFER, + message=( + f"Unsuccessfully tried to contact {transfer.source} for information on " + f"transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}). " + f"Exception: {e}. We are failing {transfer.id}" + ), + session=session, + ) + + # This implies that the transfer doesn't exist on the remote. + transfer.fail_transfer(session=session, commit=True) + return False + + # Now we need to do some state matching. + # If remote transfer is 'dead', we can just cancel. + # If remote transfer is 'alive' and in same state as ours, we can + # just leave it... For now. It is upstream's job to cancel these kind + # of stale transfers. + # If remote transfer is 'alive' and in a different state to ours, there + # are two possibilities: + # a) Remote is more advanced than us; we should update ourselves to be + # aligned with this to try to progress with the transfer. + # b) Remote is less advanced than us; we should cancel the transfer - this + # is an explicitly bad state as we are a push-based system. + + if source_status in [TransferStatus.COMPLETED]: + log_to_database( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.PROGRAMMING, + message=( + f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has " + f"COMPLETED status on remote but {transfer.status} on local. This is " + f"an unreachable state for file {transfer.upload_name}. Requires manual check" + ), + session=session, + ) + + transfer.fail_transfer(session=session, commit=True) + return False + + if source_status in [TransferStatus.CANCELLED, TransferStatus.FAILED]: + # This one's a gimmie. + transfer.fail_transfer(session=session, commit=True) + return False + + if source_status == transfer.status: + # This is the remote's responsibility. + return True + + # We only get here in annoying scenarios. + if transfer.status == TransferStatus.INITIATED: + # Remote more advanced. + log_to_database( + severity=ErrorSeverity.INFO, + category=ErrorCategory.TRANSFER, + message=( + f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has " + f"more advanced state on remote ({source_status} > {transfer.status}). Catching" + f"up our transfer." + ), + session=session, + ) + + transfer.status = source_status + session.commit() + return True + + if transfer.status == TransferStatus.STAGED: + # Uh, this should be picked up by a different task (recv_clone) + log_to_database( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.CONFIGURATION, + message=( + f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has " + "status STAGED and is being picked up by the hypervisor task. This should not " + "occur; recommend manual check" + ), + session=session, + ) + return False + + if transfer.status == TransferStatus.ONGOING: + if source_status == TransferStatus.INITIATED: + transfer.fail_transfer(session=session, commit=True) + return False + else: + assert source_status == TransferStatus.STAGED + # Remote more advanced (STAGED) + log_to_database( + severity=ErrorSeverity.INFO, + category=ErrorCategory.TRANSFER, + message=( + f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has " + f"more advanced state on remote ({source_status} > {transfer.status}). Catching" + f"up our transfer." + ), + session=session, + ) + + transfer.status = source_status + session.commit() + return True + + log_to_database( + severity=ErrorSeverity.CRITICAL, + category=ErrorCategory.PROGRAMMING, + message=( + f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has " + "fallen through the hypervisor. Recommend manual check" + ), + session=session, + ) + + return True + + class OutgoingTransferHypervisor(Task): """ Checks in on stale outgoing transfers. @@ -205,3 +361,36 @@ def core(self, session): handle_stale_outgoing_transfer(session, transfer) return True + + +class IncomingTransferHypervisor(Task): + """ + Checks on stale incoming transfers. + """ + + age_in_days: int + "The age in days of the incoming transfer before we consider it stale." + + def on_call(self): + with get_session() as session: + return self.core(session=session) + + def core(self, session): + """ + Checks for stale incoming transfers and updates their status. + """ + + start_time = datetime.datetime.now(datetime.timezone.utc) + end_time = start_time + self.soft_timeout + + stale_transfers = get_stale_of_type(session, self.age_in_days, IncomingTransfer) + + for transfer in stale_transfers: + current_time = datetime.datetime.now(datetime.timezone.utc) + + if current_time > end_time: + return False + + handle_stale_incoming_transfer(session, transfer) + + return True diff --git a/librarian_background/settings.py b/librarian_background/settings.py index 1f50870..4ec0bb9 100644 --- a/librarian_background/settings.py +++ b/librarian_background/settings.py @@ -12,7 +12,10 @@ from pydantic_settings import BaseSettings, SettingsConfigDict from hera_librarian.deletion import DeletionPolicy -from librarian_background.hypervisor import OutgoingTransferHypervisor +from librarian_background.hypervisor import ( + IncomingTransferHypervisor, + OutgoingTransferHypervisor, +) from .check_integrity import CheckIntegrity from .create_clone import CreateLocalClone @@ -176,7 +179,7 @@ def task(self) -> CheckConsumedQueue: ) -class OutgoingTransferHypevrisorSettings(BackgroundTaskSettings): +class OutgoingTransferHypervisorSettings(BackgroundTaskSettings): """ Settings for the hypervisor task. """ @@ -193,6 +196,23 @@ def task(self) -> OutgoingTransferHypervisor: ) +class IncomingTransferHypervisorSettings(BackgroundTaskSettings): + """ + Settings for the hypervisor task. + """ + + age_in_days: int + "The age of the items to check, in days." + + @property + def task(self) -> IncomingTransferHypervisor: + return IncomingTransferHypervisor( + name=self.task_name, + age_in_days=self.age_in_days, + soft_timeout=self.soft_timeout, + ) + + class BackgroundSettings(BaseSettings): """ Background task settings, configurable. @@ -218,7 +238,8 @@ class BackgroundSettings(BaseSettings): check_consumed_queue: list[CheckConsumedQueueSettings] = [] "Settings for the check consumed queue task." - outgoing_transfer_hypervisor: list[OutgoingTransferHypevrisorSettings] = [] + outgoing_transfer_hypervisor: list[OutgoingTransferHypervisorSettings] = [] + incoming_transfer_hypervisor: list[IncomingTransferHypervisorSettings] = [] # Global settings: diff --git a/librarian_server/api/checkin.py b/librarian_server/api/checkin.py index 452b8dc..b7325fe 100644 --- a/librarian_server/api/checkin.py +++ b/librarian_server/api/checkin.py @@ -83,6 +83,12 @@ def modify_transfers_by_id( or (transfer.uploader == user.username) ) + # OutgoingTransfers + try: + authorized = authorized or (transfer.destination == user.username) + except AttributeError: + pass + if not authorized: unprocessed.append(transfer_id) reasons.add("You are not authorized to modify the transfer.") diff --git a/librarian_server/orm/transfer.py b/librarian_server/orm/transfer.py index eb01a35..a46cabe 100644 --- a/librarian_server/orm/transfer.py +++ b/librarian_server/orm/transfer.py @@ -3,8 +3,10 @@ """ import datetime +from pathlib import Path from typing import TYPE_CHECKING +from hera_librarian.models.checkin import CheckinUpdateRequest, CheckinUpdateResponse from hera_librarian.models.clone import ( CloneFailRequest, CloneFailResponse, @@ -104,6 +106,68 @@ def new_transfer( start_time=datetime.datetime.utcnow(), ) + def fail_transfer(self, session: "Session", commit: bool = True): + """ + Fail the transfer and (optionally) commit to the database. + """ + + self.status = TransferStatus.FAILED + self.end_time = datetime.datetime.now(datetime.timezone.utc) + + try: + self.store.store_manager.unstage(Path(self.staging_path)) + except (FileNotFoundError, OSError, AttributeError): + pass + + if commit: + session.commit() + + if self.source_transfer_id is None: + # No remote transfer ID, so we can't do anything. + return + + # Now here's the interesting part - we need to communicate to the + # remote librarian that the transfer failed! + librarian: Librarian = ( + session.query(Librarian).filter_by(name=self.source).first() + ) + + if not librarian: + # Librarian doesn't exist. We can't do anything. + log.error( + "Remote librarian does not exist when trying to fail transfer. " + "This state should be entirely unreachable." + ) + return + + client = librarian.client() + + request = CheckinUpdateRequest( + source_transfer_ids=[self.source_transfer_id], + destination_transfer_ids=[], + new_status=TransferStatus.FAILED, + ) + + try: + response: CheckinUpdateResponse = client.post( + "checkin/update", + request=request, + response=CheckinUpdateResponse, + ) + + if not response.success: + raise Exception( + "Remote librarian refused or failed to set transfer status to FAILED." + ) + except Exception as e: + log.error( + f"Failed to communicate to remote librarian that transfer {self.id} " + f"failed with exception {e}. It is likely that there is a stale transfer " + f"on remote librarian {self.source} with id {self.source_transfer_id}." + ) + + return + class OutgoingTransfer(db.Base): """ @@ -224,7 +288,7 @@ def fail_transfer(self, session: "Session", commit: bool = True): try: response: CloneFailResponse = client.post( - path="/api/v2/clone/fail", + "clone/fail", request=request, response=CloneFailResponse, ) @@ -275,7 +339,7 @@ def staged_transfer(self, session: "Session", commit: bool = True): try: response: CloneStagedResponse = client.post( - path="/api/v2/clone/staged", + "clone/staged", request=request, response=CloneStagedResponse, ) diff --git a/tests/conftest.py b/tests/conftest.py index a182af3..a79ce0a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -143,7 +143,7 @@ def mocked_admin_client(test_client): # Now need to replace post - def new_post(endpoint: str, request, response): + def new_post(endpoint: str, request, response=None): endpoint = f"/api/v2/{endpoint}" r = test_client.post_with_auth(endpoint, content=request.model_dump_json()) diff --git a/tests/integration_test/test_hypervisors.py b/tests/integration_test/test_hypervisors.py new file mode 100644 index 0000000..485c184 --- /dev/null +++ b/tests/integration_test/test_hypervisors.py @@ -0,0 +1,178 @@ +""" +Tests for the transfer hypervisors. +""" + +from pathlib import Path + +from hera_librarian.transfer import TransferStatus + + +def test_handle_stale_incoming_transfer( + test_server_with_many_files_and_errors, + test_orm, + mocked_admin_client, + server, + admin_client, + librarian_database_session_maker, + garbage_file, +): + from librarian_background.hypervisor import handle_stale_incoming_transfer + + assert mocked_admin_client.add_librarian( + name="live_server", + url="http://localhost", + authenticator="admin:password", # This is the default authenticator. + port=server.id, + ) + + admin_client.upload(garbage_file, Path("name/of/file/to/use/for/transfers")) + + assert admin_client.add_librarian( + name="test_server", + url="http://localhost", + authenticator="admin:password", # This is the default authenticator. + port=test_server_with_many_files_and_errors[2].id, + check_connection=False, + ) + + source_session_maker = test_server_with_many_files_and_errors[1] + + def make_source_and_destination( + source_status: TransferStatus, destination_status: TransferStatus + ) -> tuple[int]: + with librarian_database_session_maker() as session: + file = session.get(test_orm.File, "name/of/file/to/use/for/transfers") + instance = file.instances[0] + + source = test_orm.OutgoingTransfer.new_transfer( + destination="test_server", + instance=instance, + file=file, + ) + + source.status = source_status + + session.add(source) + session.commit() + + source_id = source.id + + with source_session_maker() as session: + destination = test_orm.IncomingTransfer.new_transfer( + uploader="live_server", + upload_name="name/of/file/to/use/for/transfers", + source="live_server", + transfer_size=1024, + transfer_checksum="Never_Mind", + ) + + destination.status = destination_status + destination.source_transfer_id = source_id + + session.add(destination) + session.commit() + + destination_id = destination.id + + return source_id, destination_id + + def delete_transfers(source_transfer: int, destination_transfer: int): + with librarian_database_session_maker() as session: + transfer = session.get(test_orm.OutgoingTransfer, source_transfer) + session.delete(transfer) + session.commit() + + with source_session_maker() as session: + transfer = session.get(test_orm.IncomingTransfer, destination_transfer) + session.delete(transfer) + session.commit() + + # Let's cover all the cases. + # a) Remote transfer is cancelled. + # b) Remote transfer has the same status as us + # c) Remote transfer is STAGED when we are INITIATED + # d) Remote transfer is INITIATED when are we ONGOING + # e) Remote transfer is STAGED when we are ONGOING + + # --- a --- + source, destination = make_source_and_destination( + TransferStatus.CANCELLED, TransferStatus.INITIATED + ) + + with source_session_maker() as session: + transfer = session.get(test_orm.IncomingTransfer, destination) + handle_stale_incoming_transfer(session, transfer=transfer) + assert ( + session.get(test_orm.IncomingTransfer, destination).status + == TransferStatus.FAILED + ) + + delete_transfers(source, destination) + + # --- b --- + source, destination = make_source_and_destination( + TransferStatus.INITIATED, TransferStatus.INITIATED + ) + + with source_session_maker() as session: + transfer = session.get(test_orm.IncomingTransfer, destination) + assert handle_stale_incoming_transfer(session, transfer=transfer) + assert ( + session.get(test_orm.IncomingTransfer, destination).status + == TransferStatus.INITIATED + ) + + delete_transfers(source, destination) + + # --- c --- + source, destination = make_source_and_destination( + TransferStatus.STAGED, TransferStatus.INITIATED + ) + + with source_session_maker() as session: + transfer = session.get(test_orm.IncomingTransfer, destination) + handle_stale_incoming_transfer(session, transfer=transfer) + assert ( + session.get(test_orm.IncomingTransfer, destination).status + == TransferStatus.STAGED + ) + + delete_transfers(source, destination) + + # --- d --- + source, destination = make_source_and_destination( + TransferStatus.INITIATED, TransferStatus.ONGOING + ) + + with source_session_maker() as session: + transfer = session.get(test_orm.IncomingTransfer, destination) + handle_stale_incoming_transfer(session, transfer=transfer) + assert ( + session.get(test_orm.IncomingTransfer, destination).status + == TransferStatus.FAILED + ) + + delete_transfers(source, destination) + + # --- e --- + source, destination = make_source_and_destination( + TransferStatus.STAGED, TransferStatus.ONGOING + ) + + with source_session_maker() as session: + transfer = session.get(test_orm.IncomingTransfer, destination) + handle_stale_incoming_transfer(session, transfer=transfer) + assert ( + session.get(test_orm.IncomingTransfer, destination).status + == TransferStatus.STAGED + ) + + delete_transfers(source, destination) + + # Remove the librarians we added. + with librarian_database_session_maker() as session: + file = session.get(test_orm.File, "name/of/file/to/use/for/transfers") + file.delete(session=session, commit=True, force=True) + + assert mocked_admin_client.remove_librarian(name="live_server") + assert admin_client.remove_librarian(name="test_server")