Skip to content

Commit

Permalink
Added initial deletion code
Browse files Browse the repository at this point in the history
  • Loading branch information
JBorrow committed Nov 14, 2024
1 parent 5bed8cb commit 81bc391
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 2 deletions.
165 changes: 164 additions & 1 deletion librarian_background/rolling_deletion.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@
b) The checksums of those files must match the original checksum
"""

import time
from datetime import datetime, timedelta, timezone

from loguru import logger
from schedule import CancelJob
from sqlalchemy import select
from sqlalchemy.orm import Session

from librarian_server.api.validate import calculate_checksum_of_remote_copies
from librarian_server.database import get_session
from librarian_server.orm import Instance, Librarian, StoreMetadata

from .task import Task


Expand All @@ -15,4 +27,155 @@ class RollingDeletion(Task):
a certain age.
"""

# Configuration fields for the rolling-deletion task. The bare strings that
# follow each field are the original attribute docstrings.
# NOTE(review): this `pass` looks like a leftover placeholder from before the
# fields were added — confirm it can be dropped.
pass
# Looked up via get_store(); the job is cancelled if no store has this name.
store_name: str
"Name of the store to delete instances from"
# Subtracted (as days) from the current UTC time to form the age cut-off.
age_in_days: int
"Age of the instances to delete, in days; older files will be deleted if they pass the checks"

# Compared against the number of downstream checksum results gathered for an
# instance; fewer results than this means the instance is skipped.
number_of_remote_copies: int = 3
"Number of remote copies that must be available to delete the file"
# Gates the per-copy `computed_same_checksum` inspection in core().
verify_downstream_checksums: bool = True
"Whether to verify the checksums of the remote copies"
# Forwarded to Instance.delete(mark_unavailable=...).
mark_unavailable: bool = True
"Whether to mark the instances as unavailable after deletion, or to delete them (False)"
# Forwarded to Instance.delete(force=...).
# NOTE(review): defaults to True, i.e. deletion policies are over-ridden
# unless the task is configured otherwise — confirm this is intended.
force_deletion: bool = True
"Whether to over-ride the deletion policy of instances"

def get_store(self, name: str, session: Session) -> StoreMetadata:
    """
    Look up the metadata row for the store called ``name``.

    Parameters
    ----------
    name : str
        Name of the store to search for.
    session : Session
        Database session used to run the query.

    Returns
    -------
    StoreMetadata
        The first store whose name matches.

    Raises
    ------
    ValueError
        If no store with that name is found.
    """
    store = session.query(StoreMetadata).filter_by(name=name).first()

    if store:
        return store

    raise ValueError(f"Store {name} does not exist.")

def on_call(self):
    """
    Task entry point: open a database session and run ``core`` with it.

    Returns whatever ``core`` returns.
    """
    with get_session() as db_session:
        outcome = self.core(session=db_session)

    return outcome

def core(self, session: Session):
    """
    Run one rolling-deletion pass over the configured store.

    Every available instance on the store that is older than
    ``age_in_days`` is considered. An instance is only deleted when at
    least ``number_of_remote_copies`` downstream copies were reported and,
    if ``verify_downstream_checksums`` is set, every one of those copies
    reports a matching checksum.

    Parameters
    ----------
    session : Session
        Database session used for all queries and for the deletion itself.

    Returns
    -------
    CancelJob
        If the configured store does not exist (the job cannot ever succeed).
    bool
        Otherwise, True when every candidate instance was deleted.
    """
    core_begin = datetime.now(timezone.utc)
    age_cutoff = core_begin - timedelta(days=self.age_in_days)

    try:
        store = self.get_store(self.store_name, session)
    except ValueError as e:
        logger.error("Error getting store: {}, cannot continue; cancelling job", e)
        return CancelJob

    logger.info(
        "Beginning rolling deletion for store {} (ID: {})", store.name, store.id
    )

    # Get the instances that are older than the age

    query_begin = time.perf_counter()
    stmt = select(Instance).filter(
        Instance.store_id == store.id,
        Instance.created_time < age_cutoff,
        Instance.available == True,
    )

    instances = session.execute(stmt).scalars().all()
    query_end = time.perf_counter()

    logger.info("Queried for old instances in {} seconds", query_end - query_begin)

    logger.info(
        "Found {} instances that are older than {} days",
        len(instances),
        self.age_in_days,
    )

    deleted = 0
    for instance in instances:
        # Capture the ID up front: after a delete + commit the ORM object may
        # no longer be safe to read attributes from.
        instance_id = instance.id

        # Check that we got what we wanted. These are explicit checks rather
        # than `assert` statements so that they still protect us when Python
        # runs with -O (which strips asserts).
        created = instance.created_time
        if created.tzinfo is None:
            # Naive timestamps cannot be compared with the aware cut-off.
            # Assumes naive database timestamps are stored in UTC — TODO confirm.
            created = created.replace(tzinfo=timezone.utc)

        meets_criteria = (
            created < age_cutoff
            and instance.store_id == store.id
            and instance.available
        )

        if not meets_criteria:
            logger.error(
                "Instance {} does not meet the criteria, skipping", instance_id
            )
            continue

        # Check if the file associated with the instance has enough copies.
        # NOTE(review): confirm that Instance exposes `remote_instances`
        # directly (as opposed to via its associated file).
        remote_librarian_ids = {
            remote_instance.librarian_id
            for remote_instance in instance.remote_instances
        }

        logger.info(
            "Calling up {} remote librarians to check for copies",
            len(remote_librarian_ids),
        )

        downstream = []

        for librarian_id in remote_librarian_ids:
            librarian_stmt = select(Librarian).filter(Librarian.id == librarian_id)
            librarian = session.execute(librarian_stmt).scalar()

            if not librarian:
                continue

            downstream += calculate_checksum_of_remote_copies(
                librarian=librarian, file_name=instance.file_name
            )

        # Now check if we have enough!
        if len(downstream) < self.number_of_remote_copies:
            logger.warning(
                "Instance {} does not have enough remote copies, skipping",
                instance_id,
            )
            continue

        # Now check if the checksums match. Any mismatch must skip the whole
        # instance, so the mismatches are collected first and the `continue`
        # is issued at the level of the instance loop.
        if self.verify_downstream_checksums:
            mismatched = [
                info for info in downstream if not info.computed_same_checksum
            ]

            if mismatched:
                for info in mismatched:
                    logger.warning(
                        "Instance {} has a mismatched checksum on {}, skipping",
                        instance_id,
                        info.librarian,
                    )
                continue

        # If we're here, we can delete the instance.
        logger.info(
            "Verified that we have the correct number of copies of "
            "{instance.id} ({instance.file_name}): {n}/{m}, proceeding to deletion",
            instance=instance,
            n=len(downstream),
            m=self.number_of_remote_copies,
        )

        try:
            instance.delete(
                session=session,
                commit=True,
                force=self.force_deletion,
                mark_unavailable=self.mark_unavailable,
            )
        except FileNotFoundError:
            logger.error(
                "Instance {} does not exist on disk, skipping", instance_id
            )
            continue

        logger.info("Deleted instance {} successfully", instance_id)
        deleted += 1

    core_end = datetime.now(timezone.utc)

    logger.info(
        "Finished rolling deletion for store {} (ID: {}) in {} seconds, deleted {}/{} instances",
        store.name,
        store.id,
        (core_end - core_begin).total_seconds(),
        deleted,
        len(instances),
    )

    return deleted == len(instances)
8 changes: 7 additions & 1 deletion librarian_server/orm/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def delete(
session: Session,
commit: bool = True,
force: bool = False,
mark_unavailable: bool = False,
):
"""
Delete this instance.
Expand All @@ -103,12 +104,17 @@ def delete(
Whether or not to commit the deletion.
force : bool
Whether or not to force the deletion (i.e. ignore DeletionPolicy)
mark_unavailable: bool
If true, only mark this as unavailable, don't delete the metadata
"""

if self.deletion_policy == DeletionPolicy.ALLOWED or force:
self.store.store_manager.delete(Path(self.path))

session.delete(self)
if mark_unavailable:
self.available = False
else:
session.delete(self)

if commit:
session.commit()
Expand Down

0 comments on commit 81bc391

Please sign in to comment.