Skip to content

Commit

Permalink
Merge branch 'main' into JBorrow/issue97
Browse files Browse the repository at this point in the history
  • Loading branch information
JBorrow authored Nov 14, 2024
2 parents a9e8da7 + 140fc77 commit 1a9118f
Show file tree
Hide file tree
Showing 17 changed files with 557 additions and 662 deletions.
63 changes: 38 additions & 25 deletions librarian_background/check_integrity.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@
"""

import datetime
import logging
import time

from loguru import logger
from schedule import CancelJob
from sqlalchemy.orm import Session

from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
from librarian_server.database import get_session
from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database
from librarian_server.orm import Instance, StoreMetadata

from .task import Task

logger = logging.getLogger("schedule")


class CheckIntegrity(Task):
"""
Expand Down Expand Up @@ -47,26 +45,35 @@ def core(self, session: Session):
Frame this out with the session so that it is automatically closed.
"""
try:
logger.info(
"Checking integrity of store {}, age_in_days={}",
self.store_name,
self.age_in_days,
)
store = self.get_store(session=session)
except ValueError:
# Store doesn't exist. Cancel this job.
log_to_database(
severity=ErrorSeverity.CRITICAL,
category=ErrorCategory.CONFIGURATION,
message=f"Store {self.store_name} does not exist. Cancelling job. Please update the configuration.",
session=session,
logger.error(
"Store {} does not exist, cancelling job: please update configuration",
self.store_name,
)
return CancelJob

# Now figure out what files were uploaded in the past age_in_days days.
start_time = datetime.datetime.now() - datetime.timedelta(days=self.age_in_days)

# Now we can query the database for all files that were uploaded in the past age_in_days days.
query_start = time.perf_counter()
files = (
session.query(Instance)
.filter(Instance.store == store and Instance.created_time > start_time)
.all()
)
query_end = time.perf_counter()
logger.info(
"Queried database for instances created since {} in {} seconds",
start_time,
query_end - query_start,
)

all_files_fine = True

Expand All @@ -79,40 +86,46 @@ def core(self, session: Session):
)
except FileNotFoundError:
all_files_fine = False
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_AVAILABILITY,
message=f"File {file.path} on store {store.name} is missing. (Instance: {file.id})",
session=session,
logger.error(
"Instance {} on store {} is missing. (Instance: {})",
file.path,
store.name,
file.id,
)
continue

# Compare checksum to database
expected_checksum = file.file.checksum

if compare_checksums(expected_checksum, path_info.checksum):
# File is fine.
logger.info(
f"File {file.path} on store {store.name} has been validated."
"Instance {} on store {} has been validated (Instance: {})",
file.path,
store.name,
file.id,
)
continue
else:
# File is not fine. Log it.
all_files_fine = False
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_INTEGRITY,
message=f"File {file.path} on store {store.name} has an incorrect checksum. Expected {expected_checksum}, got {path_info.checksum}. (Instance: {file.id})",
session=session,
logger.error(
"Instance {} on store {} has an incorrect checksum. Expected {}, got {}. (Instance: {})",
file.path,
store.name,
expected_checksum,
path_info.checksum,
file.id,
)

if all_files_fine:
logger.info(
f"All files uploaded since {start_time} on store {store.name} have been validated."
"All files uploaded since {} on store {} have been validated.",
start_time,
store.name,
)
else:
logger.error(
f"Some files uploaded since {start_time} on store {store.name} have not been validated. Please check the logs."
"Store {} has files with incorrect checksums.",
store.name,
)

return all_files_fine
Loading

0 comments on commit 1a9118f

Please sign in to comment.