Skip to content

Commit

Permalink
Add background task to de-duplicate remote instances table (#118)
Browse files Browse the repository at this point in the history
* Add hypervisor task for de-duplication

* Formatting

* Added settings models for bg tasks
  • Loading branch information
JBorrow authored Dec 6, 2024
1 parent 4e97175 commit 523e836
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 0 deletions.
2 changes: 2 additions & 0 deletions librarian_background/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def background(run_once: bool = False):
+ background_settings.check_consumed_queue
+ background_settings.outgoing_transfer_hypervisor
+ background_settings.incoming_transfer_hypervisor
+ background_settings.duplicate_remote_instance_hypervisor
+ background_settings.rolling_deletion
)

for task in all_tasks:
Expand Down
108 changes: 108 additions & 0 deletions librarian_background/hypervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

import datetime
import time
from time import perf_counter

from loguru import logger
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from hera_librarian.exceptions import LibrarianHTTPError, LibrarianTimeoutError
Expand Down Expand Up @@ -411,3 +413,109 @@ def core(self, session):
handle_stale_incoming_transfer(session, transfer)

return True


class DuplicateRemoteInstanceHypervisor(Task):
    """
    Checks for duplicate remote instances in the table and handles them.

    Two remote instances are duplicates when they share the same
    (file_name, store_id, librarian_id) triple. For each duplicated group,
    the earliest row (by copy_time) is kept and the later rows are deleted.
    """

    def on_call(self):
        with get_session() as session:
            return self.core(session=session)

    def core(self, session):
        """
        Run the de-duplication pass against the given session.

        Returns True on completion; deletions are committed before returning.
        """
        query_start = perf_counter()

        # Find the names of files that have at least one duplicated
        # (file_name, store_id, librarian_id) combination. Only the file
        # name is needed here: the re-query below pulls the full rows and
        # re-checks store/librarian equality pairwise.
        stmt = (
            select(RemoteInstance.file_name)
            .group_by(
                RemoteInstance.file_name,
                RemoteInstance.store_id,
                RemoteInstance.librarian_id,
            )
            .having(func.count() > 1)
        )

        # A file duplicated under more than one (store, librarian) pair
        # appears once per pair; collapse to unique names so each file is
        # only processed a single time (dict.fromkeys preserves order).
        results = list(dict.fromkeys(session.execute(stmt).scalars().all()))

        query_end = perf_counter()

        logger.info(
            "Took {}s to query for {} duplicate remote instance files",
            query_end - query_start,
            len(results),
        )

        deleted = 0

        for file_name in results:
            # Pull every remote instance for this file, oldest first, so
            # the earliest copy in each group is the one we keep.
            stmt = (
                select(RemoteInstance)
                .filter_by(file_name=file_name)
                .order_by(RemoteInstance.copy_time)
            )

            potential_duplicates = session.execute(stmt).scalars().all()

            if len(potential_duplicates) < 2:
                # Uhh, but there aren't any?
                logger.warning(
                    "Initial query suggested that remote instance table contains "
                    "duplicates for {fn} but we only found {n} results when re-querying",
                    fn=file_name,
                    n=len(potential_duplicates),
                )

                continue

            for_removal = []
            # Some may be duplicates, some might not (the same file on two
            # different librarians or stores is legitimate)!
            for i, potential_original in enumerate(potential_duplicates[:-1]):
                if potential_original in for_removal:
                    continue
                for potential_duplicate in potential_duplicates[i + 1 :]:
                    if potential_duplicate in for_removal:
                        continue

                    is_duplicate = (
                        potential_original.file_name == potential_duplicate.file_name
                        and potential_original.librarian_id
                        == potential_duplicate.librarian_id
                        and potential_original.store_id == potential_duplicate.store_id
                    )

                    if is_duplicate:
                        for_removal.append(potential_duplicate)

            logger.info(
                "Found {n_dupe} duplicates out of a total {n_pot} remote instances; "
                "deleting the duplicates",
                n_dupe=len(for_removal),
                n_pot=len(potential_duplicates),
            )

            if len(for_removal) >= len(potential_duplicates):
                # This is definitely bad... we must always retain at least
                # one row per group, so never delete everything we queried.
                logger.warning(
                    "Found more duplicates than original rows; skipping (see logs)"
                )
                continue

            for duplicate in for_removal:
                deleted += 1
                session.delete(duplicate)

        session.commit()

        logger.info(
            "Successfully completed the duplicate hypervisor task and removed "
            "a total of {n} duplicate remote instances",
            n=deleted,
        )

        return True
47 changes: 47 additions & 0 deletions librarian_background/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@

from hera_librarian.deletion import DeletionPolicy
from librarian_background.hypervisor import (
DuplicateRemoteInstanceHypervisor,
IncomingTransferHypervisor,
OutgoingTransferHypervisor,
)
from librarian_background.rolling_deletion import RollingDeletion

from .check_integrity import CheckIntegrity
from .create_clone import CreateLocalClone
Expand Down Expand Up @@ -213,6 +215,46 @@ def task(self) -> IncomingTransferHypervisor:
)


class DuplicateRemoteInstanceHypervisorSettings(BackgroundTaskSettings):
    """
    Settings for the duplicate instance hypervisor task.
    """

    @property
    def task(self) -> DuplicateRemoteInstanceHypervisor:
        """Construct the configured de-duplication hypervisor task."""
        # Gather the constructor arguments explicitly before building the
        # task, keeping the settings-to-task mapping in one obvious place.
        task_kwargs = {
            "name": self.task_name,
            "soft_timeout": self.soft_timeout,
        }
        return DuplicateRemoteInstanceHypervisor(**task_kwargs)


class RollingDeletionSettings(BackgroundTaskSettings):
    """
    Settings for the rolling deletion task
    """

    store_name: str
    age_in_days: float

    number_of_remote_copies: int = 3
    verify_downstream_checksums: bool = True
    mark_unavailable: bool = True
    force_deletion: bool = True

    @property
    def task(self) -> RollingDeletion:
        """Build a RollingDeletion task from these settings."""
        # Collect all constructor arguments in one mapping so the
        # settings-field to task-parameter correspondence is easy to scan.
        task_kwargs = {
            "name": self.task_name,
            "soft_timeout": self.soft_timeout,
            "store_name": self.store_name,
            "age_in_days": self.age_in_days,
            "number_of_remote_copies": self.number_of_remote_copies,
            "verify_downstream_checksums": self.verify_downstream_checksums,
            "mark_unavailable": self.mark_unavailable,
            "force_deletion": self.force_deletion,
        }
        return RollingDeletion(**task_kwargs)


class BackgroundSettings(BaseSettings):
"""
Background task settings, configurable.
Expand Down Expand Up @@ -240,6 +282,11 @@ class BackgroundSettings(BaseSettings):

outgoing_transfer_hypervisor: list[OutgoingTransferHypervisorSettings] = []
incoming_transfer_hypervisor: list[IncomingTransferHypervisorSettings] = []
duplicate_remote_instance_hypervisor: list[
DuplicateRemoteInstanceHypervisorSettings
] = []

rolling_deletion: list[RollingDeletionSettings] = []

# Global settings:

Expand Down
63 changes: 63 additions & 0 deletions tests/integration_test/test_hypervisors.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,66 @@ def delete_transfers(source_transfer: int, destination_transfer: int):

assert mocked_admin_client.remove_librarian(name="live_server")
assert admin_client.remove_librarian(name="test_server")


def test_remote_instance_duplicate(
    test_server_with_many_files_and_errors,
    test_orm,
    mocked_admin_client,
    server,
):
    """
    End-to-end check of the duplicate remote instance hypervisor: create two
    identical remote instances per file, run the task, and verify that
    exactly one (the first-created) survives for each file.
    """
    from librarian_background.hypervisor import DuplicateRemoteInstanceHypervisor

    assert mocked_admin_client.add_librarian(
        name="live_server",
        url="http://localhost",
        authenticator="admin:password",  # This is the default authenticator.
        port=server.id,
    )

    used_file_names = []
    ids_to_keep = []
    ids_to_delete = []

    # Get a bunch of files
    with test_server_with_many_files_and_errors[1]() as session:
        librarian = (
            session.query(test_orm.Librarian).filter_by(name="live_server").one()
        )

        files = session.query(test_orm.File).limit(10).all()

        # Create two remote instances for each; the first one created has
        # the earlier copy_time, so it is the one the hypervisor must keep.
        for file in files:
            used_file_names.append(file.name)

            ri_a = test_orm.RemoteInstance.new_instance(
                file=file, store_id=2, librarian=librarian
            )
            ri_b = test_orm.RemoteInstance.new_instance(
                file=file, store_id=2, librarian=librarian
            )

            session.add_all((ri_a, ri_b))
            session.commit()

            ids_to_keep.append(ri_a.id)
            ids_to_delete.append(ri_b.id)

    # Now can run the hypervisor
    with test_server_with_many_files_and_errors[1]() as session:
        DuplicateRemoteInstanceHypervisor(name="").core(session)

    # Every surviving remote instance must be an original, never a duplicate.
    with test_server_with_many_files_and_errors[1]() as session:
        for file_name in used_file_names:
            file = session.query(test_orm.File).filter_by(name=file_name).one()

            for ri in file.remote_instances:
                assert ri.id in ids_to_keep
                assert ri.id not in ids_to_delete

                # Clean up so this test leaves no remote instances behind.
                session.delete(ri)

        session.commit()

    assert mocked_admin_client.remove_librarian(name="live_server")

0 comments on commit 523e836

Please sign in to comment.