diff --git a/librarian_background/__init__.py b/librarian_background/__init__.py
index 0df4d71..13ad94e 100644
--- a/librarian_background/__init__.py
+++ b/librarian_background/__init__.py
@@ -27,6 +27,8 @@ def background(run_once: bool = False):
         + background_settings.check_consumed_queue
         + background_settings.outgoing_transfer_hypervisor
         + background_settings.incoming_transfer_hypervisor
+        + background_settings.duplicate_remote_instance_hypervisor
+        + background_settings.rolling_deletion
     )
 
     for task in all_tasks:
diff --git a/librarian_background/hypervisor.py b/librarian_background/hypervisor.py
index 56c4a23..648d4bf 100644
--- a/librarian_background/hypervisor.py
+++ b/librarian_background/hypervisor.py
@@ -10,8 +10,10 @@
 
 import datetime
 import time
+from time import perf_counter
 
 from loguru import logger
+from sqlalchemy import func, select
 from sqlalchemy.orm import Session
 
 from hera_librarian.exceptions import LibrarianHTTPError, LibrarianTimeoutError
@@ -411,3 +413,109 @@ def core(self, session):
             handle_stale_incoming_transfer(session, transfer)
 
         return True
+
+
+class DuplicateRemoteInstanceHypervisor(Task):
+    """
+    Checks for duplicate remote instances in the table and handles them.
+    """
+
+    def on_call(self):
+        with get_session() as session:
+            return self.core(session=session)
+
+    def core(self, session):
+        query_start = perf_counter()
+
+        # Group on all three columns so duplicates are identified per
+        # (file, store, librarian) triple; only the distinct file names
+        # are needed for the per-file re-query below.
+        stmt = (
+            select(RemoteInstance.file_name)
+            .group_by(
+                RemoteInstance.file_name,
+                RemoteInstance.store_id,
+                RemoteInstance.librarian_id,
+            )
+            .having(func.count() > 1)
+            .distinct()
+        )
+
+        results = session.execute(stmt).scalars().all()
+
+        query_end = perf_counter()
+
+        logger.info(
+            "Took {}s to query for {} duplicate remote instance files",
+            query_end - query_start,
+            len(results),
+        )
+
+        deleted = 0
+
+        for file_name in results:
+            stmt = (
+                select(RemoteInstance)
+                .filter_by(file_name=file_name)
+                .order_by(RemoteInstance.copy_time)
+            )
+
+            potential_duplicates = session.execute(stmt).scalars().all()
+
+            if len(potential_duplicates) < 2:
+                # Uhh, but there aren't any?
+                logger.warning(
+                    "Initial query suggested that the remote instance table contains "
+                    "duplicates for {fn} but we only found {n} results when re-querying",
+                    fn=file_name,
+                    n=len(potential_duplicates),
+                )
+
+                continue
+
+            for_removal = []
+            # Some may be duplicates, some might not!
+            for i, potential_original in enumerate(potential_duplicates[:-1]):
+                if potential_original in for_removal:
+                    continue
+                for potential_duplicate in potential_duplicates[i + 1 :]:
+                    if potential_duplicate in for_removal:
+                        continue
+
+                    is_duplicate = (
+                        potential_original.file_name == potential_duplicate.file_name
+                        and potential_original.librarian_id
+                        == potential_duplicate.librarian_id
+                        and potential_original.store_id == potential_duplicate.store_id
+                    )
+
+                    if is_duplicate:
+                        for_removal.append(potential_duplicate)
+
+            logger.info(
+                "Found {n_dupe} duplicates out of a total {n_pot} remote instances; "
+                "deleting the duplicates",
+                n_dupe=len(for_removal),
+                n_pot=len(potential_duplicates),
+            )
+
+            if len(for_removal) >= len(potential_duplicates):
+                # This is definitely bad...
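+                # (Defensive guard: the matching loop above never adds the
+                # oldest copy to for_removal, so this branch should be
+                # unreachable; if it trips, we would be deleting every copy.)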
+                logger.warning(
+                    "Found as many duplicates as total remote instances for {fn}; "
+                    "refusing to delete every copy and skipping this file",
+                    fn=file_name,
+                )
+                continue
+
+            for duplicate in for_removal:
+                deleted += 1
+                session.delete(duplicate)
+
+        session.commit()
+
+        logger.info(
+            "Successfully completed the duplicate hypervisor task and removed "
+            "a total of {n} duplicate remote instances",
+            n=deleted,
+        )
+
+        return True
diff --git a/librarian_background/settings.py b/librarian_background/settings.py
index 4ec0bb9..3ee9e10 100644
--- a/librarian_background/settings.py
+++ b/librarian_background/settings.py
@@ -13,9 +13,11 @@
 from hera_librarian.deletion import DeletionPolicy
 
 from librarian_background.hypervisor import (
+    DuplicateRemoteInstanceHypervisor,
     IncomingTransferHypervisor,
     OutgoingTransferHypervisor,
 )
+from librarian_background.rolling_deletion import RollingDeletion
 
 from .check_integrity import CheckIntegrity
 from .create_clone import CreateLocalClone
@@ -213,6 +215,46 @@ def task(self) -> IncomingTransferHypervisor:
         )
 
 
+class DuplicateRemoteInstanceHypervisorSettings(BackgroundTaskSettings):
+    """
+    Settings for the duplicate remote instance hypervisor task.
+    """
+
+    @property
+    def task(self) -> DuplicateRemoteInstanceHypervisor:
+        return DuplicateRemoteInstanceHypervisor(
+            name=self.task_name,
+            soft_timeout=self.soft_timeout,
+        )
+
+
+class RollingDeletionSettings(BackgroundTaskSettings):
+    """
+    Settings for the rolling deletion task.
+    """
+
+    store_name: str
+    age_in_days: float
+
+    number_of_remote_copies: int = 3
+    verify_downstream_checksums: bool = True
+    mark_unavailable: bool = True
+    force_deletion: bool = True
+
+    @property
+    def task(self) -> RollingDeletion:
+        return RollingDeletion(
+            name=self.task_name,
+            soft_timeout=self.soft_timeout,
+            store_name=self.store_name,
+            age_in_days=self.age_in_days,
+            number_of_remote_copies=self.number_of_remote_copies,
+            verify_downstream_checksums=self.verify_downstream_checksums,
+            mark_unavailable=self.mark_unavailable,
+            force_deletion=self.force_deletion,
+        )
+
+
 class BackgroundSettings(BaseSettings):
     """
     Background task settings, configurable.
@@ -240,6 +282,11 @@ class BackgroundSettings(BaseSettings):
     outgoing_transfer_hypervisor: list[OutgoingTransferHypervisorSettings] = []
     incoming_transfer_hypervisor: list[IncomingTransferHypervisorSettings] = []
 
+    duplicate_remote_instance_hypervisor: list[
+        DuplicateRemoteInstanceHypervisorSettings
+    ] = []
+
+    rolling_deletion: list[RollingDeletionSettings] = []
 
     # Global settings:
diff --git a/tests/integration_test/test_hypervisors.py b/tests/integration_test/test_hypervisors.py
index 485c184..c104eac 100644
--- a/tests/integration_test/test_hypervisors.py
+++ b/tests/integration_test/test_hypervisors.py
@@ -176,3 +176,66 @@ def delete_transfers(source_transfer: int, destination_transfer: int):
 
     assert mocked_admin_client.remove_librarian(name="live_server")
     assert admin_client.remove_librarian(name="test_server")
+
+
+def test_remote_instance_duplicate(
+    test_server_with_many_files_and_errors,
+    test_orm,
+    mocked_admin_client,
+    server,
+):
+    from librarian_background.hypervisor import DuplicateRemoteInstanceHypervisor
+
+    assert mocked_admin_client.add_librarian(
+        name="live_server",
+        url="http://localhost",
+        authenticator="admin:password",  # This is the default authenticator.
+        port=server.id,
+    )
+
+    used_file_names = []
+    ids_to_keep = []
+    ids_to_delete = []
+
+    # Get a bunch of files
+    with test_server_with_many_files_and_errors[1]() as session:
+        librarian = (
+            session.query(test_orm.Librarian).filter_by(name="live_server").one()
+        )
+
+        files = session.query(test_orm.File).limit(10).all()
+
+        # Create two remote instances for each
+        for file in files:
+            used_file_names.append(file.name)
+
+            ri_a = test_orm.RemoteInstance.new_instance(
+                file=file, store_id=2, librarian=librarian
+            )
+            ri_b = test_orm.RemoteInstance.new_instance(
+                file=file, store_id=2, librarian=librarian
+            )
+
+            session.add_all((ri_a, ri_b))
+            session.commit()
+
+            ids_to_keep.append(ri_a.id)
+            ids_to_delete.append(ri_b.id)
+
+    # Now we can run the hypervisor
+    with test_server_with_many_files_and_errors[1]() as session:
+        DuplicateRemoteInstanceHypervisor(name="").core(session)
+
+    with test_server_with_many_files_and_errors[1]() as session:
+        for file_name in used_file_names:
+            file = session.query(test_orm.File).filter_by(name=file_name).one()
+
+            for ri in file.remote_instances:
+                assert ri.id in ids_to_keep
+                assert ri.id not in ids_to_delete
+
+                session.delete(ri)
+
+        session.commit()
+
+    assert mocked_admin_client.remove_librarian(name="live_server")
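
Both new task lists default to empty, so neither task runs until it is configured. Below is a minimal sketch of the corresponding entries in a background-settings file, assuming the JSON layout implied by the pydantic models above: the two task-list keys and the `RollingDeletionSettings` fields come straight from this diff, `task_name` and `soft_timeout` are inferred from their use in the `task` properties, and all concrete values (store name, the timeout string format, the thirty-day age) are illustrative assumptions rather than defaults defined here.

    {
        "duplicate_remote_instance_hypervisor": [
            {
                "task_name": "duplicate_remote_instance_hypervisor",
                "soft_timeout": "01:00:00"
            }
        ],
        "rolling_deletion": [
            {
                "task_name": "rolling_deletion_local",
                "soft_timeout": "01:00:00",
                "store_name": "local_store",
                "age_in_days": 30.0,
                "number_of_remote_copies": 3,
                "verify_downstream_checksums": true,
                "mark_unavailable": true,
                "force_deletion": true
            }
        ]
    }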