diff --git a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py index c993cc0..74e1a6c 100644 --- a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py +++ b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py @@ -1,7 +1,8 @@ # Copyright 2017 the HERA Collaboration # Licensed under the 2-clause BSD License. -"""Add librarian transfer toggling + +"""Add librarian transfer toggling and corruption Revision ID: 1def8c988372 Revises: 42f29c26ab0f @@ -24,6 +25,19 @@ def upgrade(): sa.Column("transfers_enabled", sa.Boolean(), nullable=False, default=True) ) + op.create_table( + "corrupt_files", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("file_name", sa.String(), nullable=False), + sa.Column("instance_id", sa.Integer(), nullable=False), + sa.Column("corrupt_time", sa.DateTime(), nullable=False), + sa.Column("size", sa.BigInteger(), nullable=False), + sa.Column("checksum", sa.String(), nullable=False), + sa.Column("count", sa.Integer(), nullable=False), + sa.PrimaryKeyConstraint("id"), + ) + def downgrade(): op.drop_column("librarians", "transfers_enabled") + op.drop_table("corrupt_files") diff --git a/hera_librarian/cli.py b/hera_librarian/cli.py index 4590f0d..d9ae81c 100644 --- a/hera_librarian/cli.py +++ b/hera_librarian/cli.py @@ -1749,7 +1749,6 @@ def config_set_librarian_transfer_subparser(sub_parsers): sp.add_argument( "--name", help="The name of the librarian to set the transfer state of." 
) - grp = sp.add_mutually_exclusive_group() grp.add_argument( "--enabled", diff --git a/librarian_background/check_integrity.py b/librarian_background/check_integrity.py index 1e4d691..db01585 100644 --- a/librarian_background/check_integrity.py +++ b/librarian_background/check_integrity.py @@ -12,6 +12,7 @@ from hera_librarian.utils import compare_checksums, get_hash_function_from_hash from librarian_server.database import get_session from librarian_server.orm import Instance, StoreMetadata +from librarian_server.orm.file import CorruptFile, File from .task import Task @@ -108,6 +109,26 @@ def core(self, session: Session): else: # File is not fine. Log it. all_files_fine = False + + # Do we already have a corrupt file? + corrupt_file = ( + session.query(CorruptFile) + .filter_by(instance_id=file.id) + .one_or_none() + ) + + if corrupt_file is None: + corrupt_file = CorruptFile.new_corrupt_file( + instance=file, + size=path_info.size, + checksum=path_info.checksum, + ) + session.add(corrupt_file) + session.commit() + else: + corrupt_file.count += 1 + session.commit() + logger.error( "Instance {} on store {} has an incorrect checksum. Expected {}, got {}. (Instance: {})", file.path, diff --git a/librarian_background/queues.py b/librarian_background/queues.py index 9b34cfc..8fff403 100644 --- a/librarian_background/queues.py +++ b/librarian_background/queues.py @@ -157,11 +157,11 @@ def check_on_consumed( # We can't do anything with this librarian, but there may be other # librarians that are enabled. 
continue - + logger.info( "Handling queue item {q.id} with {q.retries} retries", q=queue_item ) - + current_status = queue_item.async_transfer_manager.transfer_status( settings=server_settings ) diff --git a/librarian_server/api/validate.py b/librarian_server/api/validate.py index 28d0ea1..6781e81 100644 --- a/librarian_server/api/validate.py +++ b/librarian_server/api/validate.py @@ -25,7 +25,8 @@ from ..database import yield_session from ..logger import log -from ..orm.file import File +from ..orm.file import CorruptFile, File +from ..orm.instance import Instance from ..orm.librarian import Librarian from ..settings import server_settings from .auth import ReadonlyUserDependency @@ -193,11 +194,31 @@ async def validate_file( for info in checksum_info: if not info.computed_same_checksum and info.librarian == server_settings.name: + # Add the corrupt file to the database, though check if we already have + # it first. + query = select(CorruptFile).filter(CorruptFile.file_name == file.name) + + corrupt_file = session.execute(query).one_or_none() + + if corrupt_file is not None: + corrupt_file.count += 1 + session.commit() + continue + else: + corrupt_file = CorruptFile.new_corrupt_file( + instance=session.get(Instance, info.instance_id), + size=info.current_size, + checksum=info.current_checksum, + ) + session.add(corrupt_file) + session.commit() + log.error( "File validation failed, the checksums do not match for file " - "{} in store {}.", + "{} in store {}. 
CorruptFile: {}", request.file_name, info.store, + corrupt_file.id, ) return FileValidationResponse(checksum_info) diff --git a/librarian_server/orm/__init__.py b/librarian_server/orm/__init__.py index 8fd1beb..91b7fc4 100644 --- a/librarian_server/orm/__init__.py +++ b/librarian_server/orm/__init__.py @@ -3,7 +3,7 @@ """ from .errors import Error -from .file import File +from .file import CorruptFile, File from .instance import Instance, RemoteInstance from .librarian import Librarian from .sendqueue import SendQueue diff --git a/librarian_server/orm/file.py b/librarian_server/orm/file.py index fad87c3..a7d04e5 100644 --- a/librarian_server/orm/file.py +++ b/librarian_server/orm/file.py @@ -145,3 +145,68 @@ def delete( if commit: session.commit() + + +class CorruptFile(db.Base): + """ + An ORM object for a file that has been marked as (potentially) corrupt + during a check. This will need to be verified and fixed. + """ + + __tablename__ = "corrupt_files" + + id: int = db.Column(db.Integer, primary_key=True) + "The ID of the corrupt file." + file_name: str = db.Column( + db.String(256), db.ForeignKey("files.name"), nullable=False + ) + "The name of the file." + file = db.relationship("File", primaryjoin="CorruptFile.file_name == File.name") + "The file object associated with this." + instance_id: int = db.Column(db.Integer, db.ForeignKey("instances.id")) + "The instance ID of the corrupt file." + instance = db.relationship( + "Instance", primaryjoin="CorruptFile.instance_id == Instance.id" + ) + "The instance object associated with this." + corrupt_time: datetime = db.Column(db.DateTime) + "The time at which the file was marked as corrupt." + size: int = db.Column(db.BigInteger) + "The size of the file in bytes." + checksum: str = db.Column(db.String(256)) + "The checksum of the file that was re-computed and found to be incorrect." + count: int = db.Column(db.Integer) + "The number of times this file has been marked as corrupt." 
+ + @classmethod + def new_corrupt_file( + cls, instance: Instance, size: int, checksum: str + ) -> "CorruptFile": + """ + Create a new corrupt file object. + + Parameters + ---------- + instance : Instance + The instance that is corrupt. + size : int + The size of the file in bytes. + checksum : str + The checksum of the file that was re-computed and found to be incorrect. + + Returns + ------- + CorruptFile + The new corrupt file object. + """ + + return CorruptFile( + file_name=instance.file.name, + file=instance.file, + instance_id=instance.id, + instance=instance, + corrupt_time=datetime.now(timezone.utc), + size=size, + checksum=checksum, + count=1, + ) diff --git a/tests/background_unit_test/test_check_integrity.py b/tests/background_unit_test/test_check_integrity.py index f248693..1ed8a28 100644 --- a/tests/background_unit_test/test_check_integrity.py +++ b/tests/background_unit_test/test_check_integrity.py @@ -41,6 +41,12 @@ def test_check_integrity_failure(test_client, test_server_with_invalid_file, tes ) assert integrity_task() == False + # Go check the database! + with get_session() as session: + corrupt_file = session.query(test_orm.CorruptFile).first() + assert corrupt_file is not None + assert corrupt_file.count >= 1 + def test_check_integrity_invalid_store(test_client, test_server, test_orm): """