diff --git a/librarian_background/check_integrity.py b/librarian_background/check_integrity.py index cccb5f7..1e4d691 100644 --- a/librarian_background/check_integrity.py +++ b/librarian_background/check_integrity.py @@ -3,20 +3,18 @@ """ import datetime -import logging +import time +from loguru import logger from schedule import CancelJob from sqlalchemy.orm import Session from hera_librarian.utils import compare_checksums, get_hash_function_from_hash from librarian_server.database import get_session -from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database from librarian_server.orm import Instance, StoreMetadata from .task import Task -logger = logging.getLogger("schedule") - class CheckIntegrity(Task): """ @@ -47,14 +45,16 @@ def core(self, session: Session): Frame this out with the session so that it is automatically closed. """ try: + logger.info( + "Checking integrity of store {}, age_in_days={}", + self.store_name, + self.age_in_days, + ) store = self.get_store(session=session) except ValueError: - # Store doesn't exist. Cancel this job. - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.CONFIGURATION, - message=f"Store {self.store_name} does not exist. Cancelling job. Please update the configuration.", - session=session, + logger.error( + "Store {} does not exist, cancelling job: please update configuration", + self.store_name, ) return CancelJob @@ -62,11 +62,18 @@ def core(self, session: Session): start_time = datetime.datetime.now() - datetime.timedelta(days=self.age_in_days) # Now we can query the database for all files that were uploaded in the past age_in_days days. + query_start = time.perf_counter() files = ( session.query(Instance) .filter(Instance.store == store and Instance.created_time > start_time) .all() ) + query_end = time.perf_counter() + logger.info( + "Queried database for instances created since {} in {} seconds", + start_time, + query_end - query_start, + ) all_files_fine = True @@ -79,11 +86,11 @@ def core(self, session: Session): ) except FileNotFoundError: all_files_fine = False - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.DATA_AVAILABILITY, - message=f"File {file.path} on store {store.name} is missing. (Instance: {file.id})", - session=session, + logger.error( + "Instance {} on store {} is missing. (Instance: {})", + file.path, + store.name, + file.id, ) continue @@ -91,28 +98,34 @@ def core(self, session: Session): expected_checksum = file.file.checksum if compare_checksums(expected_checksum, path_info.checksum): - # File is fine. logger.info( - f"File {file.path} on store {store.name} has been validated." + "Instance {} on store {} has been validated (Instance: {})", + file.path, + store.name, + file.id, ) continue else: # File is not fine. Log it. all_files_fine = False - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.DATA_INTEGRITY, - message=f"File {file.path} on store {store.name} has an incorrect checksum. Expected {expected_checksum}, got {path_info.checksum}. (Instance: {file.id})", - session=session, + logger.error( + "Instance {} on store {} has an incorrect checksum. Expected {}, got {}. (Instance: {})", + file.path, + store.name, + expected_checksum, + path_info.checksum, + file.id, ) - if all_files_fine: logger.info( - f"All files uploaded since {start_time} on store {store.name} have been validated." 
+ "All files uploaded since {} on store {} have been validated.", + start_time, + store.name, ) else: logger.error( - f"Some files uploaded since {start_time} on store {store.name} have not been validated. Please check the logs." + "Store {} has files with incorrect checksums.", + store.name, ) return all_files_fine diff --git a/librarian_background/create_clone.py b/librarian_background/create_clone.py index ff0aac6..e191acb 100644 --- a/librarian_background/create_clone.py +++ b/librarian_background/create_clone.py @@ -4,23 +4,21 @@ """ import datetime -import logging +import time from pathlib import Path from typing import Optional +from loguru import logger from schedule import CancelJob from sqlalchemy import select from sqlalchemy.orm import Session from hera_librarian.utils import compare_checksums, get_hash_function_from_hash from librarian_server.database import get_session -from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database from librarian_server.orm import CloneTransfer, Instance, StoreMetadata, TransferStatus from .task import Task -logger = logging.getLogger("schedule") - class CreateLocalClone(Task): """ @@ -59,11 +57,9 @@ def core(self, session: Session): store_from = self.get_store(self.clone_from, session) except ValueError: # Store doesn't exist. Cancel this job. - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.CONFIGURATION, - message=f"Store {self.clone_from} does not exist. Cancelling job. Please update the configuration.", - session=session, + logger.error( + "Store {} does not exist, cancelling job: please update configuration", + self.clone_from, ) return CancelJob @@ -76,11 +72,9 @@ def core(self, session: Session): stores_to = [self.get_store(self.clone_to, session)] except ValueError: # Store doesn't exist. Cancel this job. - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.CONFIGURATION, - message=f"Store {self.clone_to} does not exist. Cancelling job. Please update the configuration.", - session=session, + logger.error( + "Store {} does not exist, cancelling job: please update configuration", + self.clone_from, ) return CancelJob @@ -89,11 +83,9 @@ def core(self, session: Session): all_disabled = all_disabled and not store.enabled if all_disabled: - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.CONFIGURATION, - message=f"All stores in {self.clone_to} are disabled. It is likely that all stores are full.", - session=session, + logger.error( + "All stores in {} are disabled. It is likely that all stores are full.", + self.clone_to, ) return @@ -111,6 +103,8 @@ def core(self, session: Session): # 5. Get all instances on the source store that are in the difference and are younger than start_time. # 6. Clone all of these instances to the destination store. + query_start = time.perf_counter() + source_store_id = store_from.id destination_store_ids = [store.id for store in stores_to] @@ -138,6 +132,14 @@ def core(self, session: Session): instances: list[Instance] = session.execute(query).scalars().all() + query_end = time.perf_counter() + + logger.info( + "Queried database for local clone instances created since {} in {} seconds", + start_time, + query_end - query_start, + ) + successful_clones = 0 unnecessary_clones = 0 all_transfers_successful = True @@ -153,14 +155,16 @@ def core(self, session: Session): else False ): logger.info( - "CreateLocalClone task has gone over time. Will reschedule for later." 
+ "CreateLocalClone task has gone over time; will reschedule for later" ) break if successful_clones > self.files_per_run: logger.info( - f"CreateLocalClone task has cloned {successful_clones} files, which is over " - f"the limit of {self.files_per_run}. Will reschedule for later." + "CreateLocalClone has cloned {} files, which is over the limit of {}; " + "will reschedule for later", + successful_clones, + self.files_per_run, ) break @@ -170,7 +174,8 @@ def core(self, session: Session): if secondary_instance.store in stores_to: unnecessary_clones += 1 logger.debug( - f"File instance {instance} already exists on clone_to store. Skipping." + "File instance {} already exists on clone_to store. Skipping.", + instance, ) continue @@ -186,11 +191,9 @@ def core(self, session: Session): if self.disable_store_on_full: store.enabled = False session.commit() - log_to_database( - severity=ErrorSeverity.WARNING, - category=ErrorCategory.STORE_FULL, - message=f"Store {store} is full. Disabling; please replace the disk.", - session=session, + logger.warning( + "Store {} is full. Disabling; please replace the disk", + store, ) continue @@ -200,14 +203,10 @@ def core(self, session: Session): break if not store_available: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.STORE_FULL, - message=( - f"File {instance.file.name} is too large to fit on any store in " - f"{self.clone_to}. Skipping. (Instance {instance.id})" - ), - session=session, + logger.error( + "File {} is too large to fit on any store in {}; skipping", + instance.file.name, + self.clone_to, ) all_transfers_successful = False @@ -232,14 +231,11 @@ def core(self, session: Session): file_name=Path(instance.file.name).name, ) except ValueError: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.STORE_FULL, - message=( - f"File {instance.file.name} is too large to fit on store {store_to}. " - f"Skipping, but this should have already have been caught. (Instance {instance.id})" - ), - session=session, + logger.error( + "File {} is too large to fit on store {}; skipping, but should already have been" + "caught, check the logic", + instance.file.name, + store_to, ) transfer.fail_transfer(session=session) @@ -267,14 +263,16 @@ def core(self, session: Session): break logger.debug( - f"Failed to transfer file {instance.path} to store {store_to} using transfer manager {transfer_manager}." + "Failed to transfer file {} to store {} using transfer manager {}", + instance.path, + store_to, + tm_name, ) except FileNotFoundError as e: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.DATA_AVAILABILITY, - message=f"File {e.filename} does not exist when trying to clone from {store_from}. Skipping. (Instance {instance.id})", - session=session, + logger.error( + "File {} does not exist when trying to clone from {}, skipping", + e.filename, + store_from, ) transfer.fail_transfer(session=session) @@ -285,16 +283,13 @@ def core(self, session: Session): if not success: # Fail the transfer _here_, not after trying every transfer manager. - - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.DATA_AVAILABILITY, - message=f"Failed to transfer file {instance.path} to store {store_to}. Skipping. 
(Instance {instance.id})", - session=session, + logger.error( + "Failed to transfer file {} to store {}, skipping", + instance.path, + store_to, ) transfer.fail_transfer(session=session) - all_transfers_successful = False continue @@ -312,18 +307,19 @@ def core(self, session: Session): ) if not compare_checksums(path_info.checksum, instance.file.checksum): - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.DATA_INTEGRITY, - message=f"File {instance.path} on store {store_to} has an incorrect checksum. " - f"Expected {instance.file.checksum}, got {path_info.checksum}. (Instance {instance.id})", - session=session, + logger.error( + "File {} on store {} has an incorrect checksum. Expected {}, got {}. (Instance {})", + instance.path, + store_to, + instance.file.checksum, + path_info.checksum, + instance.id, ) - transfer.fail_transfer(session=session) + # TODO: Use the corrupt file table here. + transfer.fail_transfer(session=session) store_to.store_manager.unstage(staged_path) - all_transfers_successful = False continue @@ -336,35 +332,29 @@ def core(self, session: Session): staging_path=staged_path, store_path=resolved_store_path ) except FileExistsError: - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.PROGRAMMING, - message=f"File {instance.path} already exists on store {store_to}. Skipping. (Instance {instance.id})", - session=session, + logger.error( + "File {} already exists on store {}. Skipping. (Instance {})", + instance.path, + store_to, + instance.id, ) store_to.store_manager.unstage(staging_name) - transfer.fail_transfer(session=session) - all_transfers_successful = False continue except ValueError as e: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.DATA_INTEGRITY, - message=( - f"Failed to commit file {instance.path} to store {store_to}: {e}. " - f"Skipping. (Instance {instance.id})" - ), - session=session, + logger.error( + "Failed to commit file {} to store {}: {}. Skipping. (Instance {})", + instance.path, + store_to, + e, + instance.id, ) transfer.fail_transfer(session=session) - store_to.store_manager.unstage(staged_path) - all_transfers_successful = False continue diff --git a/librarian_background/hypervisor.py b/librarian_background/hypervisor.py index 4555076..56c4a23 100644 --- a/librarian_background/hypervisor.py +++ b/librarian_background/hypervisor.py @@ -9,14 +9,15 @@ """ import datetime +import time +from loguru import logger from sqlalchemy.orm import Session from hera_librarian.exceptions import LibrarianHTTPError, LibrarianTimeoutError from hera_librarian.models.checkin import CheckinStatusRequest, CheckinStatusResponse from hera_librarian.utils import compare_checksums from librarian_server.database import get_session -from librarian_server.logger import ErrorCategory, ErrorSeverity, log, log_to_database from librarian_server.orm import ( Librarian, OutgoingTransfer, @@ -33,6 +34,7 @@ def get_stale_of_type(session: Session, age_in_days: int, transfer_type: object) Get the stale transfers of a given type. 
""" + query_start = time.perf_counter() # Get the stale outgoing transfers stale_since = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta( days=age_in_days @@ -48,7 +50,17 @@ def get_stale_of_type(session: Session, age_in_days: int, transfer_type: object) ) ) - return session.execute(transfer_stmt).scalars().all() + result = session.execute(transfer_stmt).scalars().all() + query_end = time.perf_counter() + + logger.info( + "Queried database for stale transfers of type {} (age {}d) in {} seconds", + transfer_type, + age_in_days, + query_end - query_start, + ) + + return result def handle_stale_outgoing_transfer( @@ -68,11 +80,9 @@ def handle_stale_outgoing_transfer( ) if not downstream_librarian: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.DATA_INTEGRITY, - message=f"Downstream librarian {transfer.destination} not found", - session=session, + logger.error( + "Downstream librarian {} not found, cancelling transfer", + transfer.destination, ) transfer.fail_transfer(session=session, commit=False) @@ -84,31 +94,34 @@ def handle_stale_outgoing_transfer( expected_file_name = transfer.file.name expected_file_checksum = transfer.file.checksum + logger.info( + "Calling librarian {} to ask for information on file {} (outgoing transfer {})" + " with checksum {}", + transfer.destination, + expected_file_name, + transfer.id, + expected_file_checksum, + ) + try: potential_files = client.search_files( name=expected_file_name, ) except (LibrarianHTTPError, LibrarianTimeoutError) as e: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, - message=( - f"Unacceptable error when trying to check if librarian {transfer.destination}" - f"has file {expected_file_name} with exception {e}." - ), - session=session, + logger.error( + "Unacceptable error when trying to check if librarian {} has file {} with exception {}", + transfer.destination, + expected_file_name, + e, ) return False if not potential_files: # The downstream does not have the file. We should cancel the transfer. - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.DATA_INTEGRITY, - message=f"Downstream librarian {transfer.destination} does " - f"not have file {expected_file_name} and the transfer is stale. " - "Cancelling the transfer.", - session=session, + logger.error( + "Downstream librarian {} does not have file {}, cancelling transfer", + transfer.destination, + expected_file_name, ) # Must commit; need to save this cancel state. 
The file will never get there @@ -120,13 +133,11 @@ def handle_stale_outgoing_transfer( available_store_ids = {i.store_id for f in potential_files for i in f.instances} if len(available_checksums) != 1: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.DATA_INTEGRITY, - message=f"Multiple (or zero, actual {len(available_checksums)}) checksums " - f"found for file {expected_file_name} " - f"on downstream librarian {transfer.destination}.", - session=session, + logger.error( + "Multiple (or zero, actual {}) checksums found for file {} on downstream librarian {}", + len(available_checksums), + expected_file_name, + transfer.destination, ) transfer.fail_transfer(session=session, commit=True) @@ -137,14 +148,12 @@ def handle_stale_outgoing_transfer( available_store_id = available_store_ids.pop() if not compare_checksums(available_checksum, expected_file_checksum): - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.DATA_INTEGRITY, - message=f"Checksum mismatch for file {expected_file_name} " - f"on downstream librarian {transfer.destination}.", - session=session, + logger.error( + "Checksum mismatch for file {} on downstream librarian {}", ) + # TODO: Use corrupt files database here? + transfer.fail_transfer(session=session, commit=True) return False @@ -162,15 +171,12 @@ def handle_stale_outgoing_transfer( session.commit() - log_to_database( - severity=ErrorSeverity.INFO, - category=ErrorCategory.TRANSFER, - message=( - f"Successfully registered remote instance for {transfer.destination} and " - f"transfer {transfer.id} based upon stale transfer with " - f"status {transfer.status}." - ), - session=session, + logger.info( + "Successfully registered remote instance for {} and transfer {} based " + "upon stale transfer with status {}", + transfer.destination, + transfer.id, + transfer.status, ) return True @@ -186,11 +192,9 @@ def handle_stale_incoming_transfer( ) if not upstream_librarian: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.DATA_INTEGRITY, - message=f"Upstream librarian {transfer.source} not found", - session=session, + logger.error( + "Upstream librarian {} not found, cancelling transfer", + transfer.source, ) transfer.fail_transfer(session=session, commit=True) @@ -205,6 +209,12 @@ def handle_stale_incoming_transfer( destination_transfer_ids=[], ) + logger.info( + "Calling librarian {} to ask for information on incoming transfer {}", + transfer.source, + transfer.id, + ) + try: response: CheckinStatusResponse = client.post( "checkin/status", request=status_request, response=CheckinStatusResponse @@ -212,15 +222,14 @@ def handle_stale_incoming_transfer( source_status = response.source_transfer_status[transfer.source_transfer_id] except Exception as e: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.TRANSFER, - message=( - f"Unsuccessfully tried to contact {transfer.source} for information on " - f"transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}). " - f"Exception: {e}. We are failing {transfer.id}" - ), - session=session, + logger.error( + "Unsuccessfully tried to contact {} for information on transfer " + "(local: {}, remote: {}), exception {}. We are failing {}", + transfer.source, + transfer.id, + transfer.source_transfer_id, + e, + transfer.id, ) # This implies that the transfer doesn't exist on the remote. @@ -240,15 +249,13 @@ def handle_stale_incoming_transfer( # is an explicitly bad state as we are a push-based system. 
if source_status in [TransferStatus.COMPLETED]: - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.PROGRAMMING, - message=( - f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has " - f"COMPLETED status on remote but {transfer.status} on local. This is " - f"an unreachable state for file {transfer.upload_name}. Requires manual check" - ), - session=session, + logger.error( + "Transfer (local: {}, remote: {}) has COMPLETED status on remote but {} " + "on local. This is an unreachable state for file {}. Requires manual check", + transfer.id, + transfer.source_transfer_id, + transfer.status, + transfer.upload_name, ) transfer.fail_transfer(session=session, commit=True) @@ -256,7 +263,7 @@ def handle_stale_incoming_transfer( if source_status in [TransferStatus.CANCELLED, TransferStatus.FAILED]: # This one's a gimmie. - log.error( + logger.error( f"Found end status for incoming transfer {transfer.id} on remote, cancelling" ) transfer.fail_transfer(session=session, commit=True) @@ -264,7 +271,7 @@ def handle_stale_incoming_transfer( if source_status == transfer.status: # This is the remote's responsibility. - log.info( + logger.info( f"Found same for incoming transfer {transfer.id} on remote, continuing" ) return True @@ -272,15 +279,14 @@ def handle_stale_incoming_transfer( # We only get here in annoying scenarios. if transfer.status == TransferStatus.INITIATED: # Remote more advanced. - log_to_database( - severity=ErrorSeverity.INFO, - category=ErrorCategory.TRANSFER, - message=( - f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has " - f"more advanced state on remote ({source_status} > {transfer.status}). Catching" - f"up our transfer." - ), - session=session, + logger.info( + "Transfer (local: {}, remote: {}) has more advanced state on remote " + "({}, {} > {}). Catching up our transfer", + transfer.id, + transfer.source_transfer_id, + transfer.source, + source_status, + transfer.status, ) transfer.status = source_status @@ -289,15 +295,11 @@ def handle_stale_incoming_transfer( if transfer.status == TransferStatus.STAGED: # Uh, this should be picked up by a different task (recv_clone) - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.CONFIGURATION, - message=( - f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has " - "status STAGED and is being picked up by the hypervisor task. This should not " - "occur; recommend manual check" - ), - session=session, + logger.error( + "Transfer (local: {}, remote: {}) has status STAGED and is being picked up by " + "the hypervisor task. This should not occur; recommend manual check", + transfer.id, + transfer.source_transfer_id, ) return False @@ -308,29 +310,26 @@ def handle_stale_incoming_transfer( else: assert source_status == TransferStatus.STAGED # Remote more advanced (STAGED) - log_to_database( - severity=ErrorSeverity.INFO, - category=ErrorCategory.TRANSFER, - message=( - f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has " - f"more advanced state on remote ({source_status} > {transfer.status}). Catching" - f"up our transfer." - ), - session=session, + logger.info( + "Transfer (local: {}, remote: {}) has more advanced state on remote " + "({}, {} > {}). 
Catching up our transfer", + transfer.id, + transfer.source_transfer_id, + transfer.source, + source_status, + transfer.status, ) transfer.status = source_status session.commit() return True - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.PROGRAMMING, - message=( - f"Transfer (local: {transfer.id}, remote: {transfer.source_transfer_id}) has " - "fallen through the hypervisor. Recommend manual check" - ), - session=session, + # We should never get here. + logger.error( + "Transfer (local: {}, remote: {}) has fallen through the hypervisor. " + "Recommend manual check", + transfer.id, + transfer.source_transfer_id, ) return True @@ -358,6 +357,12 @@ def core(self, session): stale_transfers = get_stale_of_type(session, self.age_in_days, OutgoingTransfer) + logger.info( + "Found {} stale outgoing transfers of age {} days", + len(stale_transfers), + self.age_in_days, + ) + for transfer in stale_transfers: current_time = datetime.datetime.now(datetime.timezone.utc) @@ -391,6 +396,12 @@ def core(self, session): stale_transfers = get_stale_of_type(session, self.age_in_days, IncomingTransfer) + logger.info( + "Found {} stale incoming transfers of age {} days", + len(stale_transfers), + self.age_in_days, + ) + for transfer in stale_transfers: current_time = datetime.datetime.now(datetime.timezone.utc) diff --git a/librarian_background/queues.py b/librarian_background/queues.py index fb46c48..22f06d9 100644 --- a/librarian_background/queues.py +++ b/librarian_background/queues.py @@ -10,15 +10,16 @@ """ import datetime +import time from pathlib import Path from typing import TYPE_CHECKING, Callable +from loguru import logger from sqlalchemy import func, select from hera_librarian.exceptions import LibrarianError from hera_librarian.transfer import TransferStatus from librarian_server.database import get_session -from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database from librarian_server.orm.sendqueue import SendQueue from librarian_server.settings import server_settings @@ -122,10 +123,20 @@ def check_on_consumed( """ with session_maker() as session: + query_start = time.perf_counter() + stmt = select(SendQueue).with_for_update(skip_locked=True) stmt = stmt.filter_by(consumed=True).filter_by(completed=False) queue_items = session.execute(stmt).scalars().all() + query_end = time.perf_counter() + + logger.info( + "Queried database for {} consumed items in {} seconds", + len(queue_items), + query_end - query_start, + ) + if len(queue_items) == 0: return False @@ -134,42 +145,43 @@ def check_on_consumed( # We are out of time. return False + logger.info( + "Handling queue item {q.id} with {q.retries} retries", q=queue_item + ) + current_status = queue_item.async_transfer_manager.transfer_status( settings=server_settings ) if current_status == TransferStatus.INITIATED: + logger.info("Transfer for {q.id} is still ongoing", q=queue_item) continue elif current_status == TransferStatus.COMPLETED: if complete_status == TransferStatus.STAGED: + logger.info("Transfer for {q.id} is staged", q=queue_item) try: queue_item.update_transfer_status( new_status=complete_status, session=session, ) except LibrarianError as e: - log_to_database( - severity=ErrorSeverity.WARNING, - category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, - message=( - f"Librarian {queue_item.destination} was not available for " - f"contact, returning error {e}. We will try again later." 
- ), - session=session, + logger.info( + "Librarian {q.destination} was not available for contact, " + "returning error {e}, trying again later", + q=queue_item, + e=e, ) continue except AttributeError as e: # This is a larger problem; we are missing the associated # librarian in the database. Better ping! - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, - message=( - f"Librarian {queue_item.destination} was not found in " - f"the database, returning error {e}. Will try again later " - "to complete this transfer, but remedy is suggested." - ), + logger.info( + "Librarian {q.destination} was not found in the database, " + "returning error {e}, trying again later, but likely this requires " + "manual remedy", + q=queue_item, + e=e, ) continue @@ -178,21 +190,20 @@ def check_on_consumed( "No other status than STAGED is supported for checking on consumed" ) elif current_status == TransferStatus.FAILED: + logger.info("Transfer for {q.id} has failed", q=queue_item) for transfer in queue_item.transfers: transfer.fail_transfer(session=session, commit=False) else: - log_to_database( - severity=ErrorSeverity.WARNING, - category=ErrorCategory.TRANSFER, - message=( - f"Incompatible return value for transfer status from " - f"SendQueue item {queue_item.id} ({current_status})." - ), - session=session, + logger.error( + "Incompatible return value for transfer status from " + "SendQueue item {queue_item.id} ({current_status})", + queue_item=queue_item, + current_status=current_status, ) continue # If we got down here, we can mark the transfer as consumed. + logger.info("Marking {q.id} as completed", q=queue_item) queue_item.completed = True queue_item.completed_time = datetime.datetime.now(datetime.timezone.utc) @@ -210,24 +221,40 @@ def consume_queue_item(session_maker: Callable[[], "Session"]) -> bool: """ with session_maker() as session: + query_start = time.perf_counter() stmt = select(SendQueue).with_for_update(skip_locked=True) stmt = stmt.filter_by(completed=False).filter_by(consumed=False) stmt = stmt.order_by(SendQueue.priority.desc(), SendQueue.created_time) queue_item = session.execute(stmt).scalar() + query_end = time.perf_counter() + + logger.info( + "Queried database for next queue item in {} seconds", + query_end - query_start, + ) if queue_item is None: + logger.info("Found no new queue item, returning") # Nothing to do! return False # Now, check we don't have too much going on. + query_start = time.perf_counter() stmt = ( select(func.count(SendQueue.id)) .filter_by(consumed=True) .filter_by(completed=False) ) in_flight = session.execute(stmt).scalar() + query_end = time.perf_counter() + + logger.info( + "Queried database for in-flight items in {} seconds", + query_end - query_start, + ) if in_flight > server_settings.max_async_inflight_transfers: + logger.info("Too many in-flight items, returning") # Too much to do! return False @@ -235,6 +262,11 @@ def consume_queue_item(session_maker: Callable[[], "Session"]) -> bool: transfer_list = [ (Path(x.source_path), Path(x.dest_path)) for x in queue_item.transfers ] + logger.info( + "Consuming queue item {q.id} ({n} transfers)", + q=queue_item, + n=len(transfer_list), + ) # Need to create a copy here in case there is an internal state # change. Otherwise SQLAlchemy won't write it back. 
transfer_manager = queue_item.async_transfer_manager.model_copy() @@ -249,10 +281,22 @@ def consume_queue_item(session_maker: Callable[[], "Session"]) -> bool: # Be careful, the internal state of the async transfer manager # may have changed. Send it back. queue_item.async_transfer_manager = transfer_manager + + logger.info("Successfully consumed queue item {q.id}", q=queue_item) else: queue_item.retries += 1 + logger.warning( + "Failed to consume queue item {q.id} ({r}/{m_r} retries)", + q=queue_item, + r=queue_item.retries, + m_r=server_settings.max_async_send_retries, + ) if queue_item.retries > server_settings.max_async_send_retries: + logger.error( + "Queue item {q.id} has exceeded maximum retries, failing", + q=queue_item, + ) queue_item.fail(session=session) session.commit() diff --git a/librarian_background/recieve_clone.py b/librarian_background/recieve_clone.py index c1d3319..cc45ecb 100644 --- a/librarian_background/recieve_clone.py +++ b/librarian_background/recieve_clone.py @@ -6,17 +6,18 @@ import datetime import logging +import time import traceback from pathlib import Path from typing import TYPE_CHECKING, Optional +from loguru import logger from sqlalchemy.orm import Session from hera_librarian.deletion import DeletionPolicy from hera_librarian.exceptions import LibrarianHTTPError from hera_librarian.models.clone import CloneCompleteRequest, CloneCompleteResponse from librarian_server.database import get_session -from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database from librarian_server.orm import ( File, IncomingTransfer, @@ -28,8 +29,6 @@ from .task import Task -logger = logging.getLogger("schedule") - class RecieveClone(Task): """ @@ -52,17 +51,25 @@ def core(self, session: Session): core_begin = datetime.datetime.now(datetime.timezone.utc) + query_start = time.perf_counter() # Find incoming transfers that are STAGED ongoing_transfers: list[IncomingTransfer] = ( session.query(IncomingTransfer) .filter_by(status=TransferStatus.STAGED) .all() ) + query_end = time.perf_counter() + + logger.info( + "Query for {n} incoming transfers took {t} seconds", + n=len(ongoing_transfers), + t=query_end - query_start, + ) all_transfers_succeeded = True if len(ongoing_transfers) == 0: - logger.info("No ongoing transfers to process.") + logger.info("No ongoing transfers to process") transfers_processed = 0 @@ -76,13 +83,14 @@ def core(self, session: Session): else False ): logger.info( - "RecieveClone task has gone over time. Will reschedule for later." + "RecieveClone task has gone over time. Will reschedule for later" ) break if transfers_processed >= self.files_per_run: logger.info( - f"Processed {transfers_processed} transfers, which is the maximum for this run." + "Processed {} transfers, which is the maximum for this run", + transfers_processed, ) break @@ -90,14 +98,10 @@ def core(self, session: Session): store: StoreMetadata = transfer.store if store is None: - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.PROGRAMMING, - message=( - f"Transfer {transfer.id} has no store associated with it. " - "Skipping for now, but this should never happen." - ), - session=session, + logger.error( + "Transfer {} has no store associated with it." 
+ "Skipping for now, but this should never happen", + transfer.id, ) all_transfers_succeeded = False @@ -105,17 +109,20 @@ def core(self, session: Session): continue try: + logger.info( + "Attempting to ingest file {t.upload_name} from transfer {t.id}", + t=transfer, + ) store.ingest_staged_file( transfer=transfer, session=session, deletion_policy=self.deletion_policy, ) except (FileNotFoundError, FileExistsError, ValueError) as e: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.PROGRAMMING, - message=traceback.format_exc(), - session=session, + logger.error( + "Failed to ingest file {t.upload_name} from transfer {t.id} with exception {e}", + t=transfer, + e=e, ) all_transfers_succeeded = False @@ -137,7 +144,7 @@ def core(self, session: Session): if librarian: # Need to call back logger.info( - f"Transfer {transfer.id} has completed. Calling back to librarian {librarian.name}." + f"Transfer {transfer.id} has completed. Calling back to librarian {librarian.name}" ) request = CloneCompleteRequest( @@ -151,26 +158,22 @@ def core(self, session: Session): downstream_client = librarian.client() try: - logger.info("Sending clone complete request.") + logger.info("Sending clone complete request") response: CloneCompleteResponse = downstream_client.post( endpoint="clone/complete", request=request, response=CloneCompleteResponse, ) except LibrarianHTTPError as e: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, - message=( - f"Failed to call back to librarian {librarian.name} " - f"with exception {e}." - ), - session=session, + logger.error( + "Failed to call back to librarian {name} with exception {e}", + name=librarian.name, + e=e, ) else: logger.error( f"Transfer {transfer.id} has no source librarian " - f"(source is {transfer.source}) - cannot callback." + f"(source is {transfer.source}) - cannot callback" ) transfers_processed += 1 diff --git a/librarian_background/send_clone.py b/librarian_background/send_clone.py index ca0eacf..8cdbac0 100644 --- a/librarian_background/send_clone.py +++ b/librarian_background/send_clone.py @@ -3,14 +3,12 @@ """ import datetime -import logging from typing import TYPE_CHECKING, Any, Optional from schedule import CancelJob from sqlalchemy import select from hera_librarian.client import LibrarianClient -from hera_librarian.errors import ErrorCategory, ErrorSeverity from hera_librarian.exceptions import ( LibrarianError, LibrarianHTTPError, @@ -22,7 +20,6 @@ CloneBatchInitiationResponse, ) from librarian_server.database import get_session -from librarian_server.logger import log_to_database from librarian_server.orm import ( File, Instance, @@ -41,9 +38,10 @@ if TYPE_CHECKING: from hera_librarian import LibrarianClient -from sqlalchemy.orm import Session +import time -logger = logging.getLogger("schedule") +from loguru import logger +from sqlalchemy.orm import Session def process_batch( @@ -62,6 +60,11 @@ def process_batch( # be inaccessable, or not be able to use certain transfer methods, # and we want the most uniform batch possible. + logger.info( + "Processing batch of {n} files to send", + n=len(files), + ) + valid_stores = set() if store_preference is not None: @@ -109,6 +112,11 @@ def process_batch( } ) + logger.info( + "Batch of {n} files to send prepared", + n=len(files), + ) + return outgoing_transfers, outgoing_information @@ -144,6 +152,12 @@ def use_batch_to_call_librarian( Truthy if the call was successful, False otherwise. 
""" + + logger.info( + "Using batch of {n} prepared files to call {lib} for egress", + n=len(outgoing_transfers), + lib=librarian.name if librarian is not None else None, + ) # Now the outgoing transfers all have IDs! We can create the batch # items. batch_items = [ @@ -171,14 +185,9 @@ def use_batch_to_call_librarian( potential_ids = e.full_response.get("source_transfer_ids", None) if potential_ids is None: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.PROGRAMMING, - message=( - "Librarian told us that they have a file, but did not provide " - "the source transfer ID." - ), - session=session, + logger.error( + "Librarian told us that they have a file, but did not provide a " + "source transfer ID" ) else: for id in potential_ids: @@ -190,18 +199,18 @@ def use_batch_to_call_librarian( # Oh no, we can't call up the librarian! if not remedy_success: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, - message=( - f"Unable to communicate with remote librarian for batch " - f"to stage clone with exception {e}." - ), - session=session, + logger.warning( + "Unable to communicate with remote librarian for batch to " + "stage clone with exception {e}", + e=e, ) # What a waste... Even if we did remedy the problem with the # already-existent file, we need to fail this over. + logger.warning( + "Failing existing {n} transfers due to failure to communicate with librarian", + n=len(outgoing_transfers), + ) for transfer in outgoing_transfers: transfer.fail_transfer(session=session, commit=False) @@ -210,15 +219,10 @@ def use_batch_to_call_librarian( return False except LibrarianTimeoutError as e: # Can't connect to the librarian. Log and move on... - - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, - message=( - f"Timeout when trying to communicate with remote librarian for batch " - f"to stage clone with exception {e}." - ), - session=session, + logger.warning( + "Timeout when trying to communicate with remote librarian for batch " + "to stage clone with exception {e}, failing transfers", + e=e, ) for transfer in outgoing_transfers: @@ -228,6 +232,14 @@ def use_batch_to_call_librarian( return False + logger.info( + "Successfully staged batch of {n} files for egress to {lib} with {b} " + "bytes available on store", + n=len(outgoing_transfers), + lib=librarian.name, + b=response.available_bytes_on_store, + ) + return response @@ -261,6 +273,8 @@ def create_send_queue_item( Mapping of source transfer IDs to the remote transfer information. """ + logger.info("Creating send queue item for {lib}", lib=librarian.name) + transfer_map: dict[int:CloneBatchInitiationRequestFileItem] = { x.source_transfer_id: x for x in response.uploads } @@ -277,15 +291,11 @@ def create_send_queue_item( # In all liklehood, this loop will never run. If it does, that's probably a bug. for tid in not_accepted_transfers: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.TRANSFER, - message=( - f"Transfer ID {tid} was not returned from the batch upload process. " - "Failing this transfer internally, and continuing, but this " - "should not happen." - ), - session=session, + logger.error( + "Transfer ID {} was not returned from the batch upload process. " + "Failing this transfer internally, and continuing, but this " + "should not happen", + tid, ) # Because we want to re-use the list, need to iterate through it. 
@@ -304,14 +314,10 @@ def create_send_queue_item( if len(response.async_transfer_providers) == 0: # No transfer providers are available at all for that librarian. - log_to_database( - severity=ErrorSeverity.WARNING, - category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, - message=( - f"No transfer providers to send to {librarian}, " - f"were provided. Failing all associated transfers." - ), - session=session, + logger.error( + "No transfer providers to send to {librarian}, were provided." + "Failing all associated transfers", + librarian=librarian.name, ) for transfer in outgoing_transfers: @@ -328,15 +334,11 @@ def create_send_queue_item( if not transfer_provider.valid: # We couldn't find a valid transfer manager. We will have to fail it all. - log_to_database( - severity=ErrorSeverity.WARNING, - category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, - message=( - f"No valid transfer manager found for transfer to {librarian}, " - f"was provided {list(response.async_transfer_providers.keys())}. Failing " - "all associated transfers." - ), - session=session, + logger.error( + "No valid transfer manager found for transfer to {librarian}, " + "was provided {providers}. Failing all associated transfers", + librarian=librarian.name, + providers=list(response.async_transfer_providers.keys()), ) for transfer in outgoing_transfers: @@ -347,6 +349,12 @@ def create_send_queue_item( # Break out of the loop. return False, None, None + logger.info( + "Successfully found transfer provider {provider} for {librarian}", + provider=transfer_provider, + librarian=librarian.name, + ) + send = SendQueue.new_item( priority=0, destination=librarian.name, @@ -357,6 +365,8 @@ def create_send_queue_item( session.add(send) session.commit() + logger.info("Successfully added new send queue item {send} to database", send=send) + return send, transfer_provider, transfer_map @@ -365,6 +375,12 @@ def call_destination_and_state_ongoing(send: SendQueue, session: Session): Call the destination librarian and state the transfer as ongoing. """ + logger.info( + "Calling destination librarian {lib} for send queue item {send} to set ONGOING", + lib=send.destination, + send=send.id, + ) + try: send.update_transfer_status( new_status=TransferStatus.ONGOING, @@ -374,16 +390,21 @@ def call_destination_and_state_ongoing(send: SendQueue, session: Session): # Incorrect downstream librarian. This is a weird programming error, # that is only reachable if someone deleted the librarian in the # database between this process starting and ending. - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.PROGRAMMING, - message=e.message, - session=session, + logger.error( + "Incorrect downstream librarian for send queue item {send}, {e}", + send=send.id, + e=e, ) except LibrarianError as e: # Can't call up downstream librarian. Already been called in. pass + logger.info( + "Successfully updated to ONGOING librarian {lib} for send queue item {send}", + lib=send.destination, + send=send.id, + ) + def handle_existing_file( session: Session, @@ -402,30 +423,22 @@ def handle_existing_file( up later by the hypervisor task """ - log_to_database( - severity=ErrorSeverity.INFO, - category=ErrorCategory.TRANSFER, - message=( - f"Librarian {librarian.name} told us that they already have the file " - f"from transfer {source_transfer_id}, attempting to handle and create " - "remote instance." 
- ), - session=session, + logger.info( + "Librarian {lib} told us that they already have the file from transfer " + "{source_transfer_id}, attempting to handle and create a remote instance", + lib=librarian.name, + source_transfer_id=source_transfer_id, ) transfer: OutgoingTransfer = session.get(OutgoingTransfer, source_transfer_id) if transfer is None: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.PROGRAMMING, - message=( - f"Transfer {source_transfer_id} does not exist, but we were told " - "by the downstream librarian that it does. There must be another " - "librarian that sent them the file, and the DAG nature of the " - "librarian is being violated." - ), - session=session, + logger.error( + "Transfer {source_transfer_id} does not exist, but we were told " + "by the downstream librarian that it does. There must be another " + "librarian that sent them the file, and the DAG nature of the " + "librarian is being violated.", + source_transfer_id=source_transfer_id, ) return False @@ -468,14 +481,10 @@ def core(self, session: Session): # Only used when there is a botched config. if librarian is None: # pragma: no cover - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, - message=( - f"Librarian {self.destination_librarian} does not exist within database. " - "Cancelling job. Please update the configuration (and re-start the librarian)." - ), - session=session, + logger.error( + "Librarian {dest} does not existi within the database. Cancelling job, " + "please update the configuration", + dest=self.destination_librarian, ) return CancelJob @@ -484,18 +493,15 @@ def core(self, session: Session): try: client.ping() except Exception as e: - log_to_database( - severity=ErrorSeverity.ERROR, - category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY, - message=( - f"Librarian {self.destination_librarian} is unreachable. Skipping sending clones." - ), - session=session, + logger.warning( + "Librarian {dest} is unreachable. Skipping sending clones for now", + dest=self.destination_librarian, ) # No point canceling job, our freind could just be down for a while. return + query_start = time.perf_counter() current_time = datetime.datetime.now(datetime.timezone.utc) age_in_days = datetime.timedelta(days=self.age_in_days) oldest_file_age = current_time - age_in_days @@ -525,10 +531,13 @@ def core(self, session: Session): files_without_remote_instances: list[File] = ( session.execute(file_stmt).scalars().all() ) + query_end = time.perf_counter() logger.info( - f"Found {len(files_without_remote_instances)} files without remote instances, " - "and without ongoing transfers." + "Found {n} files without remote instances, " + "and without ongoing transfers in {t} seconds; preparing to send clones", + n=len(files_without_remote_instances), + t=query_end - query_start, ) if self.store_preference is not None: @@ -540,14 +549,9 @@ def core(self, session: Session): # Botched configuration! if use_store is None: # pragma: no cover - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.CONFIGURATION, - message=( - f"Store {self.store_preference} does not exist. Cancelling job. " - "Please update the configuration." - ), - session=session, + logger.error( + "Store {store} does not exist. Cancelling job. 
Please update the configuration", + store=self.store_preference, ) return CancelJob @@ -573,6 +577,13 @@ def core(self, session: Session): files_tried : files_tried + this_batch_size ] + logger.info( + "Sending batch of files to {dest} with {n}/{left} files", + dest=self.destination_librarian, + n=this_batch_size, + left=left_to_send, + ) + files_tried += this_batch_size outgoing_transfers, outgoing_information = process_batch( @@ -628,16 +639,10 @@ def core(self, session: Session): if remote_transfer_info is None: # pragma: no cover # This is an unreachable state; we already purged these # scenarios. - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.PROGRAMMING, - message=( - "Trying to set parameters of a transfer that should not " - "exist; this should be an unreachable state." - ), - session=session, + logger.error( + "Trying to set parameters of a transfer that should not exist; " + "this should be an unreachable state." ) - # In this case, the best thing that we can do is fail this individual # transfer and pick it up later. transfer.fail_transfer(session=session, commit=False) diff --git a/librarian_server/__init__.py b/librarian_server/__init__.py index 326192d..d6bad1b 100644 --- a/librarian_server/__init__.py +++ b/librarian_server/__init__.py @@ -18,18 +18,18 @@ async def slack_post_at_startup_shutdown(app: FastAPI): Lifespan event that posts to the slack hook once the FastAPI server starts up and shuts down. """ - from .logger import post_text_event_to_slack + from loguru import logger - post_text_event_to_slack("Librarian server starting up") + logger.info("Librarian server starting up") yield - post_text_event_to_slack("Librarian server shutting down") + logger.info("Librarian server shutting down") def main() -> FastAPI: - from .logger import log + from loguru import logger - log.info("Starting Librarian v2.0 server.") - log.debug("Creating FastAPI app instance.") + logger.info("Starting Librarian v2.0 server.") + logger.debug("Creating FastAPI app instance.") app = FastAPI( title=server_settings.displayed_site_name, @@ -38,7 +38,7 @@ def main() -> FastAPI: lifespan=slack_post_at_startup_shutdown, ) - log.debug("Adding API router.") + logger.debug("Adding API router.") from .api import ( admin_router, diff --git a/librarian_server/api/checkin.py b/librarian_server/api/checkin.py index 600daf9..36dc8e1 100644 --- a/librarian_server/api/checkin.py +++ b/librarian_server/api/checkin.py @@ -17,7 +17,7 @@ from librarian_server.orm.transfer import IncomingTransfer, OutgoingTransfer from ..database import yield_session -from ..logger import log, log_to_database +from ..logger import log from .auth import CallbackUserDependency, ReadappendUserDependency, User router = APIRouter(prefix="/api/v2/checkin") diff --git a/librarian_server/api/clone.py b/librarian_server/api/clone.py index e57ea4c..6df1769 100644 --- a/librarian_server/api/clone.py +++ b/librarian_server/api/clone.py @@ -378,7 +378,7 @@ def batch_stage( upload_name=upload.upload_name, ) except HTTPException as e: - log.warn(f"Error in batch staging: {e}") + log.warning(f"Error in batch staging: {e}") if e.status_code == status.HTTP_409_CONFLICT: bad_ids_exist.append(upload.source_transfer_id) diff --git a/librarian_server/api/validate.py b/librarian_server/api/validate.py index 8b40c6e..28d0ea1 100644 --- a/librarian_server/api/validate.py +++ b/librarian_server/api/validate.py @@ -24,7 +24,7 @@ from hera_librarian.utils import compare_checksums, get_hash_function_from_hash from ..database 
import yield_session -from ..logger import log, log_to_database +from ..logger import log from ..orm.file import File from ..orm.librarian import Librarian from ..settings import server_settings @@ -193,14 +193,11 @@ async def validate_file( for info in checksum_info: if not info.computed_same_checksum and info.librarian == server_settings.name: - log_to_database( - severity=ErrorSeverity.CRITICAL, - category=ErrorCategory.DATA_INTEGRITY, - message=( - "File validation failed The checksums do not match for " - f"file {file.name} in store {instance.store.id}." - ), - session=session, + log.error( + "File validation failed, the checksums do not match for file " + "{} in store {}.", + request.file_name, + info.store, ) return FileValidationResponse(checksum_info) diff --git a/librarian_server/logger.py b/librarian_server/logger.py index 3b5976b..49d215c 100644 --- a/librarian_server/logger.py +++ b/librarian_server/logger.py @@ -2,120 +2,13 @@ Logging setup. Use this as 'from logger import log' """ -import inspect -import logging as log - -import requests -from sqlalchemy.orm import Session - -from hera_librarian.errors import ErrorCategory, ErrorSeverity +import loguru from .settings import server_settings -logging_level = log.getLevelName(server_settings.log_level) - -log.basicConfig( - encoding="utf-8", - level=logging_level, - format="(%(module)s:%(funcName)s) [%(asctime)s] {%(levelname)s}:%(message)s", -) - -error_severity_to_logging_level = { - ErrorSeverity.CRITICAL: log.CRITICAL, - ErrorSeverity.ERROR: log.ERROR, - ErrorSeverity.WARNING: log.WARNING, - ErrorSeverity.INFO: log.INFO, -} - -log.debug("Logging set up.") - - -def post_text_event_to_slack(text: str) -> None: - log.info(text) - - if not server_settings.slack_webhook_enable: - return - - requests.post( - server_settings.slack_webhook_url, - json={ - "username": server_settings.displayed_site_name, - "icon_emoji": ":ledger:", - "text": text, - }, - ) - - -def post_error_to_slack(error: "Error") -> None: - if not server_settings.slack_webhook_enable: - return - - if error.severity not in server_settings.slack_webhook_post_error_severity: - return - - if error.category not in server_settings.slack_webhook_post_error_category: - return - - requests.post( - server_settings.slack_webhook_url, - json={ - "username": server_settings.displayed_site_name, - "icon_emoji": ":ledger:", - "text": ( - f"*New Librarian Error at {server_settings.name}*\n" - f"> _Error Severity_: {error.severity.name}\n" - f"> _Error Category_: {error.category.name}\n" - f"> _Error Message_: {error.message}\n" - f"> _Error ID_: {error.id}\n" - f"> _Error Raised Time_: {error.raised_time}\n" - f"`{error.caller}`" - ), - }, - ) - - -def log_to_database( - severity: ErrorSeverity, category: ErrorCategory, message: str, session: Session -) -> None: - """ - Log an error to the database. - - Parameters - ---------- - - severity : ErrorSeverity - The severity of this error. - category : ErrorCategory - The category of this error. - message : str - The message describing this error. - session : Session - The database session to use. - - Notes - ----- - - Automatically stores the above frame's file name, function, and line number in - the 'caller' field of the error. - """ - - # Avoid circular imports. 
-    from .orm.errors import Error
-
-    log_level = error_severity_to_logging_level[severity]
-    log.log(log_level, message)
-
-    caller = (
-        inspect.stack()[1].filename
-        + ":"
-        + inspect.stack()[1].function
-        + ":"
-        + str(inspect.stack()[1].lineno)
-    )
-
-    error = Error.new_error(severity, category, message, caller=caller)
+log_settings = server_settings.log_settings
-    session.add(error)
-    session.commit()
+log_settings.setup_logs(server_settings.displayed_site_name)
+loguru.logger.debug("Logging set up.")
-    post_error_to_slack(error)
+log = loguru.logger
diff --git a/librarian_server/orm/sendqueue.py b/librarian_server/orm/sendqueue.py
index 505c281..ad4cbe8 100644
--- a/librarian_server/orm/sendqueue.py
+++ b/librarian_server/orm/sendqueue.py
@@ -19,6 +19,7 @@
 import datetime
 from typing import TYPE_CHECKING
 
+from loguru import logger
 from sqlalchemy.orm import Session
 
 from hera_librarian.async_transfers import CoreAsyncTransferManager
@@ -28,7 +29,6 @@
 from hera_librarian.transfer import TransferStatus
 
 from .. import database as db
-from ..logger import log, log_to_database
 from .librarian import Librarian
 
 if TYPE_CHECKING:
@@ -188,14 +188,9 @@ def update_transfer_status(
             )
         except Exception as e:
             # Oh no, we can't call up the librarian!
-            log_to_database(
-                severity=ErrorSeverity.ERROR,
-                category=ErrorCategory.LIBRARIAN_NETWORK_AVAILABILITY,
-                message=(
-                    f"Unable to communicate with remote librarian for batch "
-                    f"status update, recieved response {e}."
-                ),
-                session=session,
+            logger.error(
+                f"Unable to communicate with remote librarian for batch "
+                f"status update, received response {e}."
             )
 
             raise LibrarianError(
diff --git a/librarian_server/settings.py b/librarian_server/settings.py
index ddcd9b5..d31848f 100644
--- a/librarian_server/settings.py
+++ b/librarian_server/settings.py
@@ -7,12 +7,12 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, Optional
 
-from pydantic import BaseModel, ValidationError, field_validator
+import loguru
+from notifiers.logging import NotificationHandler
+from pydantic import BaseModel, Field, ValidationError, field_validator
 from pydantic_settings import BaseSettings, SettingsConfigDict
 from sqlalchemy import URL
 
-from hera_librarian.errors import ErrorCategory, ErrorSeverity
-
 from .stores import StoreNames
 
 if TYPE_CHECKING:
@@ -50,6 +50,55 @@ def store_type_is_valid(cls, v: str) -> int:
         return StoreNames[v]
 
 
+class LogSettings(BaseModel):
+    """
+    Settings for the loguru logger.
+    """
+
+    files: dict[Path, str] = {}
+    "Files to write logs to. The value is the loguru rotation for that file (e.g. 500 MB, 1 week)."
+
+    # Slack integration; disabled by default. You will need a Slack webhook
+    # URL, and by default all ERROR-level alerts are also raised to Slack.
+    slack_webhook_enable: bool = False
+    slack_webhook_url: Optional[str] = None
+    slack_webhook_url_file: Optional[Path] = None
+    slack_webhook_level: str = "ERROR"
+
+    def model_post_init(__context, *args, **kwargs):
+        """
+        Post initialization for the model.
+        """
+        if __context.slack_webhook_url_file is not None:
+            with open(__context.slack_webhook_url_file, "r") as handle:
+                __context.slack_webhook_url = handle.read().strip()
+
+    def setup_logs(self, username: str):
+        for file_name, rotation in self.files.items():
+            loguru.logger.add(file_name, rotation=rotation, enqueue=True)
+
+        if self.slack_webhook_enable:
+            params = {
+                "username": username,
+                "icon_emoji": ":ledger:",
+                "webhook_url": self.slack_webhook_url,
+            }
+
+            handler = NotificationHandler("slack", defaults=params)
+
+            format = (
+                "Librarian *{level: <8}*\n"
+                "> _Module_ `{name}`\n"
+                "> _Function_ `{function}:{line}`\n"
+                "> *{message}*\n"
+                "> _Raised at_ {time: YYYY-MM-DD HH:mm:ss.SSS}\n"
+            )
+
+            loguru.logger.add(handler, level=self.slack_webhook_level, format=format)
+
+        return
+
+
 class ServerSettings(BaseSettings):
     """
     Settings for the librarian server. Note that because this is a BaseSettings
@@ -106,13 +155,8 @@ class ServerSettings(BaseSettings):
     # a specific destination.
     max_async_inflight_transfers: int = 64
 
-    # Slack integration; by default disable this. You will need a slack
-    # webhook url, and by default we raise all log_to_database alerts to slack too.
-    slack_webhook_enable: bool = False
-    slack_webhook_url: Optional[str] = None
-    slack_webhook_url_file: Optional[Path] = None
-    slack_webhook_post_error_severity: list[ErrorSeverity] = list(ErrorSeverity)
-    slack_webhook_post_error_category: list[ErrorCategory] = list(ErrorCategory)
+    # Log settings
+    log_settings: LogSettings = Field(default_factory=LogSettings)
 
     # Globus integration; by default disable this. This contains a client ID and
     # login secret (for authenticating with Globus as a service), whether this
@@ -144,10 +188,6 @@ def model_post_init(__context, *args, **kwargs):
         with open(__context.encryption_key_file, "r") as handle:
             __context.encryption_key = handle.read().strip()
 
-        if __context.slack_webhook_url_file is not None:
-            with open(__context.slack_webhook_url_file, "r") as handle:
-                __context.slack_webhook_url = handle.read().strip()
-
         if __context.globus_client_secret_file is not None:
             with open(__context.globus_client_secret_file, "r") as handle:
                 __context.globus_client_secret = handle.read().strip()
diff --git a/librarian_server/stores/local.py b/librarian_server/stores/local.py
index db017fa..fe801a2 100644
--- a/librarian_server/stores/local.py
+++ b/librarian_server/stores/local.py
@@ -9,11 +9,12 @@
 import uuid
 from pathlib import Path
 
+from loguru import logger
+
 from hera_librarian.transfers.core import CoreTransferManager
 from hera_librarian.utils import (
-    get_checksum_from_path,
-    get_hash_function_from_hash,
     compare_checksums,
+    get_checksum_from_path,
     get_size_from_path,
     get_type_from_path,
 )
@@ -99,6 +100,11 @@ def resolve_path_staging(self, path: Path | str) -> Path:
 
     def stage(self, file_size: int, file_name: Path) -> tuple[Path]:
         if file_size > self.free_space:
+            logger.error(
+                "Not enough free space on store. Requested: {}, Available: {}",
+                file_size,
+                self.free_space,
+            )
             raise ValueError("Not enough free space on store")
 
         # TODO: Do we want to actually keep track of files we have staged?
@@ -127,7 +133,10 @@ def unstage(self, path: Path):
                 # It's not a directory. Delete it.
                 os.remove(complete_path)
             except OSError:
-                # Directory is not empty. Delete it and all its contents. Unfortunately we can't log this..
+                # Directory is not empty. Delete it and all its contents.
+                logger.warning(
+                    f"Directory {complete_path} is not empty. Deleting all contents."
+                )
                 shutil.rmtree(complete_path)
 
         # Check if the parent is still in the staging area. We don't want
@@ -157,7 +166,10 @@ def delete(self, path: Path):
                 # It's not a directory. Delete it.
                 os.remove(complete_path)
             except OSError:
-                # Directory is not empty. Delete it and all its contents. Unfortunately we can't log this..
+                # Directory is not empty. Delete it and all its contents.
+                logger.warning(
+                    f"Directory {complete_path} is not empty. Deleting all contents."
+                )
                 shutil.rmtree(complete_path)
 
         # Check if the parent is empty. We don't want to leave dregs!
@@ -214,12 +226,19 @@ def commit(self, staging_path: Path, store_path: Path):
         if not copy_success:
             # We need to clean up
             self.delete(store_path)
-
+
+            logger.error(
+                "Could not copy {} to {} after {} attempts.",
+                resolved_path_staging,
+                resolved_path_store,
+                retries,
+            )
+
             raise ValueError(
                 f"Could not copy {resolved_path_staging} to {resolved_path_store} "
                 f"after {retries} attempts."
             )
-
+
         try:
             # Set permissions and ownership.
             def set_for_file(file: Path):
@@ -249,6 +268,10 @@ def set_for_file(file: Path):
                 for x in dirs + files:
                     set_for_file(Path(root) / x)
         except ValueError:
+            logger.error(
+                "Cannot set permissions on {}",
+                resolved_path_store,
+            )
             raise PermissionError(f"Could not set permissions on {resolved_path_store}")
 
         return
diff --git a/pyproject.toml b/pyproject.toml
index 1e6094d..9321158 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,7 +25,9 @@ dependencies = [
     "sqlalchemy >= 2",
     "sysrsync == 1.1.1",
     "uvicorn",
-    "asyncer==0.0.8"
+    "asyncer==0.0.8",
+    "loguru",
+    "notifiers"
 ]
 authors = [
     {name = "Josh Borrow"},
diff --git a/tests/server_unit_test/test_error.py b/tests/server_unit_test/test_error.py
index 71ee8e7..69d27cc 100644
--- a/tests/server_unit_test/test_error.py
+++ b/tests/server_unit_test/test_error.py
@@ -10,83 +10,6 @@
 )
 
 
-def test_error_to_db(test_server, test_orm):
-    # Don't import until we've set up server settings for logging.
-    from librarian_server.logger import log_to_database
-
-    _, session_maker, _ = test_server
-
-    with session_maker() as session:
-        starting_errors = session.query(test_orm.Error).count()
-
-        log_to_database(
-            ErrorSeverity.CRITICAL, ErrorCategory.DATA_AVAILABILITY, "test", session
-        )
-        log_to_database(
-            ErrorSeverity.INFO, ErrorCategory.DATA_AVAILABILITY, "test", session
-        )
-        log_to_database(
-            ErrorSeverity.WARNING, ErrorCategory.DATA_AVAILABILITY, "test", session
-        )
-        log_to_database(
-            ErrorSeverity.CRITICAL, ErrorCategory.DATA_INTEGRITY, "test", session
-        )
-
-    # Check that they were logged correctly
-    with session_maker() as session:
-        errors = session.query(test_orm.Error).all()
-
-        assert len(errors) == 4 + starting_errors
-
-        for error in errors[starting_errors:]:
-            assert error.message == "test"
-            assert error.cleared is False
-            assert error.cleared_time is None
-
-        assert errors[0].severity == ErrorSeverity.CRITICAL
-        assert errors[0].category == ErrorCategory.DATA_AVAILABILITY
-
-        assert errors[1].severity == ErrorSeverity.INFO
-        assert errors[1].category == ErrorCategory.DATA_AVAILABILITY
-
-        assert errors[2].severity == ErrorSeverity.WARNING
-        assert errors[2].category == ErrorCategory.DATA_AVAILABILITY
-
-        assert errors[3].severity == ErrorSeverity.CRITICAL
-        assert errors[3].category == ErrorCategory.DATA_INTEGRITY
-
-    # Check we can clear them
-
-    with session_maker() as session:
-        errors = session.query(test_orm.Error).all()
-
-        for error in errors:
-            error.clear(session)
-
-    # Check that they were cleared correctly
-    with session_maker() as session:
-        errors = session.query(test_orm.Error).all()
-
-        assert len(errors) == 4
-
-        for error in errors:
-            assert error.message == "test"
-            assert error.cleared is True
-            assert error.cleared_time is not None
-
-        assert errors[0].severity == ErrorSeverity.CRITICAL
-        assert errors[0].category == ErrorCategory.DATA_AVAILABILITY
-
-        assert errors[1].severity == ErrorSeverity.INFO
-        assert errors[1].category == ErrorCategory.DATA_AVAILABILITY
-
-        assert errors[2].severity == ErrorSeverity.WARNING
-        assert errors[2].category == ErrorCategory.DATA_AVAILABILITY
-
-        assert errors[3].severity == ErrorSeverity.CRITICAL
-        assert errors[3].category == ErrorCategory.DATA_INTEGRITY
-
-
 def test_clear_endpoint(test_server_with_many_files_and_errors, test_client, test_orm):
     """
     Test the clear endpoint.
diff --git a/tests/server_unit_test/test_post_to_slack.py b/tests/server_unit_test/test_post_to_slack.py
index e8344c9..abf5546 100644
--- a/tests/server_unit_test/test_post_to_slack.py
+++ b/tests/server_unit_test/test_post_to_slack.py
@@ -4,47 +4,3 @@
 
 import datetime
 import inspect
-
-
-def test_post_to_slack():
-    # This is a manual-only test. You will need to have a valid install.
-    # If you want to run this test, you will need to set the following
-    # env vars: LIBRARIAN_SERVER_SLACK_WEBHOOK_ENABLE,
-    # LIBRARIAN_SERVER_SLACK_WEBHOOK_URL
-    try:
-        from librarian_server.settings import server_settings
-    except:
-        return
-
-    if not server_settings.slack_webhook_enable:
-        return
-
-    from librarian_server.logger import (
-        ErrorCategory,
-        ErrorSeverity,
-        post_error_to_slack,
-    )
-
-    class MockError:
-        def __init__(self, severity, category, message, id):
-            self.message = message
-            self.category = category
-            self.severity = severity
-            self.id = id
-            self.raised_time = datetime.datetime.now(datetime.timezone.utc)
-            self.caller = (
-                inspect.stack()[1].filename
-                + ":"
-                + inspect.stack()[1].function
-                + ":"
-                + str(inspect.stack()[1].lineno)
-            )
-
-    post_error_to_slack(
-        MockError(
-            ErrorSeverity.CRITICAL,
-            ErrorCategory.DATA_AVAILABILITY,
-            "This is a test message, please ignore.",
-            12345,
-        )
-    )
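
Usage note (not part of the patch): a minimal sketch of how the new LogSettings model added above can be driven directly, for example from a local script or test. The file name, rotation value, and username below are illustrative only.

    from pathlib import Path

    from librarian_server.settings import LogSettings

    # Configure one rotating file sink; Slack notifications stay disabled by default.
    log_settings = LogSettings(
        files={Path("librarian.log"): "500 MB"},
        slack_webhook_enable=False,
    )

    # setup_logs() adds a loguru sink per configured file; the username is only
    # used when the optional Slack handler is enabled.
    log_settings.setup_logs(username="librarian-dev")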