From 81bc3915aaf1916b06084f4470098cf1f1ffed4d Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Thu, 14 Nov 2024 13:53:24 -0500 Subject: [PATCH] Added initial deletion code --- librarian_background/rolling_deletion.py | 165 ++++++++++++++++++++++- librarian_server/orm/instance.py | 8 +- 2 files changed, 171 insertions(+), 2 deletions(-) diff --git a/librarian_background/rolling_deletion.py b/librarian_background/rolling_deletion.py index bd8a964..dbf4656 100644 --- a/librarian_background/rolling_deletion.py +++ b/librarian_background/rolling_deletion.py @@ -6,6 +6,18 @@ b) The checksums of those files must match the original checksum """ +import time +from datetime import datetime, timedelta, timezone + +from loguru import logger +from schedule import CancelJob +from sqlalchemy import select +from sqlalchemy.orm import Session + +from librarian_server.api.validate import calculate_checksum_of_remote_copies +from librarian_server.database import get_session +from librarian_server.orm import Instance, Librarian, StoreMetadata + from .task import Task @@ -15,4 +27,155 @@ class RollingDeletion(Task): a certain age. 
""" - pass + store_name: str + "Name of the store to delete instances from" + age_in_days: int + "Age of the instances to delete, in days; older files will be deleted if they pass the checks" + + number_of_remote_copies: int = 3 + "Number of remote copies that must be available to delete the file" + verify_downstream_checksums: bool = True + "Whether to verify the checksums of the remote copies" + mark_unavailable: bool = True + "Whether to mark the instances as unavailable after deletion, or to delete them (False)" + force_deletion: bool = True + "Whether to over-ride the deletion policy of instances" + + def get_store(self, name: str, session: Session) -> StoreMetadata: + possible_metadata = session.query(StoreMetadata).filter_by(name=name).first() + + if not possible_metadata: + raise ValueError(f"Store {name} does not exist.") + + return possible_metadata + + def on_call(self): + with get_session() as session: + return self.core(session=session) + + def core(self, session: Session): + core_begin = datetime.now(timezone.utc) + age_cutoff = core_begin - timedelta(days=self.age_in_days) + + try: + store = self.get_store(self.store_name, session) + except ValueError as e: + logger.error("Error getting store: {}, cannot continue; cancelling job", e) + return CancelJob + + logger.info( + "Beginning rolling deletion for store {} (ID: {})", store.name, store.id + ) + + # Get the instances that are older than the age + + query_begin = time.perf_counter() + stmt = select(Instance).filter( + Instance.store_id == store.id, + Instance.created_time < age_cutoff, + Instance.available == True, + ) + + instances = session.execute(stmt).scalars().all() + query_end = time.perf_counter() + + logger.info("Queried for old instances in {} seconds", query_end - query_begin) + + logger.info( + "Found {} instances that are older than {} days", + len(instances), + self.age_in_days, + ) + + deleted = 0 + for instance in instances: + # Check that we got what we wanted. 
+            if not (
+                instance.created_time < age_cutoff
+                and instance.store_id == store.id
+                and instance.available
+            ):
+                logger.error(
+                    "Instance {} does not meet the criteria, skipping", instance.id
+                )
+                continue
+
+            # Check if the file associated with the instance has enough copies.
+            remote_librarian_ids = {
+                remote_instance.librarian_id
+                for remote_instance in instance.remote_instances
+            }
+
+            logger.info(
+                "Calling up {} remote librarians to check for copies",
+                len(remote_librarian_ids),
+            )
+
+            downstream = []
+
+            for librarian_id in remote_librarian_ids:
+                stmt = select(Librarian).filter(Librarian.id == librarian_id)
+                librarian = session.execute(stmt).scalar()
+
+                if not librarian:
+                    continue
+
+                downstream += calculate_checksum_of_remote_copies(
+                    librarian=librarian, file_name=instance.file_name
+                )
+
+            # Now check if we have enough!
+            if len(downstream) < self.number_of_remote_copies:
+                logger.warning(
+                    "Instance {} does not have enough remote copies, skipping",
+                    instance.id,
+                )
+                continue
+
+            # Now check if the checksums match
+            if self.verify_downstream_checksums:
+                mismatched = [x for x in downstream if not x.computed_same_checksum]
+                if mismatched:
+                    logger.warning(
+                        "Instance {} has a mismatched checksum on {}, skipping",
+                        instance.id,
+                        mismatched[0].librarian,
+                    )
+                    continue
+
+            # If we're here, we can delete the instance.
+            logger.info(
+                "Verified that we have the correct number of copies of "
+                "{instance.id} ({instance.file_name}): {n}/{m}, proceeding to deletion",
+                instance=instance,
+                n=len(downstream),
+                m=self.number_of_remote_copies,
+            )
+
+            try:
+                instance.delete(
+                    session=session,
+                    commit=True,
+                    force=self.force_deletion,
+                    mark_unavailable=self.mark_unavailable,
+                )
+                logger.info("Deleted instance {} successfully", instance.id)
+                deleted += 1
+            except FileNotFoundError:
+                logger.error(
+                    "Instance {} does not exist on disk, skipping", instance.id
+                )
+                continue
+
+        core_end = datetime.now(timezone.utc)
+
+        logger.info(
+            "Finished rolling deletion for store {} (ID: {}) in {} seconds, deleted {}/{} instances",
+            store.name,
+            store.id,
+            (core_end - core_begin).total_seconds(),
+            deleted,
+            len(instances),
+        )
+
+        return deleted == len(instances)
diff --git a/librarian_server/orm/instance.py b/librarian_server/orm/instance.py
index 918d8e8..655e676 100644
--- a/librarian_server/orm/instance.py
+++ b/librarian_server/orm/instance.py
@@ -91,6 +91,7 @@ def delete(
         session: Session,
         commit: bool = True,
         force: bool = False,
+        mark_unavailable: bool = False,
     ):
         """
         Delete this instance.
@@ -103,12 +104,17 @@ def delete(
             Whether or not to commit the deletion.
         force : bool
             Whether or not to force the deletion (i.e. ignore DeletionPolicy)
+        mark_unavailable: bool
+            If true, only mark this as unavailable, don't delete the metadata
         """
 
         if self.deletion_policy == DeletionPolicy.ALLOWED or force:
             self.store.store_manager.delete(Path(self.path))
 
-            session.delete(self)
+            if mark_unavailable:
+                self.available = False
+            else:
+                session.delete(self)
 
         if commit:
             session.commit()