Skip to content

Commit

Permalink
Add new corrupt files table (#110)
Browse files Browse the repository at this point in the history
* Add client ability to shut down transfers without configuration edits

* Actually add the sub-parser to CLI

* Add docs

* Add corrupt file ORM and integration

* Add table migration for corrupt files

* Check for correct corrupt file creation in test

* Remove incorrect type annotations

* Import CorruptFile into orm

* Integrity failure may have already happened

* Formatting

* Fix minor issues caused by merge conflict
  • Loading branch information
JBorrow authored Nov 14, 2024
1 parent 3f7fec0 commit e15f99b
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 7 deletions.
16 changes: 15 additions & 1 deletion alembic/versions/1def8c988372_add_librarian_transfer_toggling.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Copyright 2017 the HERA Collaboration
# Licensed under the 2-clause BSD License.

"""Add librarian transfer toggling

"""Add librarian transfer toggling and corruption
Revision ID: 1def8c988372
Revises: 42f29c26ab0f
Expand All @@ -24,6 +25,19 @@ def upgrade():
sa.Column("transfers_enabled", sa.Boolean(), nullable=False, default=True)
)

op.create_table(
"corrupt_files",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("file_name", sa.String(), nullable=False),
sa.Column("instance_id", sa.Integer(), nullable=False),
sa.Column("corrupt_time", sa.DateTime(), nullable=False),
sa.Column("size", sa.BigInteger(), nullable=False),
sa.Column("checksum", sa.String(), nullable=False),
sa.Column("count", sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)


def downgrade():
    """
    Revert this revision.

    Removes the ``transfers_enabled`` column from the ``librarians`` table
    and drops the ``corrupt_files`` table.
    """

    # Undo the transfer-toggling column.
    op.drop_column("librarians", "transfers_enabled")

    # Undo the corrupt file tracking table.
    op.drop_table("corrupt_files")
1 change: 0 additions & 1 deletion hera_librarian/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,6 @@ def config_set_librarian_transfer_subparser(sub_parsers):
sp.add_argument(
"--name", help="The name of the librarian to set the transfer state of."
)

grp = sp.add_mutually_exclusive_group()
grp.add_argument(
"--enabled",
Expand Down
21 changes: 21 additions & 0 deletions librarian_background/check_integrity.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
from librarian_server.database import get_session
from librarian_server.orm import Instance, StoreMetadata
from librarian_server.orm.file import CorruptFile, File

from .task import Task

Expand Down Expand Up @@ -108,6 +109,26 @@ def core(self, session: Session):
else:
# File is not fine. Log it.
all_files_fine = False

# Do we already have a corrupt file?
corrupt_file = (
session.query(CorruptFile)
.filter_by(instance_id=file.id)
.one_or_none()
)

if corrupt_file is None:
corrupt_file = CorruptFile.new_corrupt_file(
instance=file,
size=path_info.size,
checksum=path_info.checksum,
)
session.add(corrupt_file)
session.commit()
else:
corrupt_file.count += 1
session.commit()

logger.error(
"Instance {} on store {} has an incorrect checksum. Expected {}, got {}. (Instance: {})",
file.path,
Expand Down
4 changes: 2 additions & 2 deletions librarian_background/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,11 @@ def check_on_consumed(
# We can't do anything with this librarian, but there may be other
# librarians that are enabled.
continue

logger.info(
"Handling queue item {q.id} with {q.retries} retries", q=queue_item
)

current_status = queue_item.async_transfer_manager.transfer_status(
settings=server_settings
)
Expand Down
25 changes: 23 additions & 2 deletions librarian_server/api/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

from ..database import yield_session
from ..logger import log
from ..orm.file import File
from ..orm.file import CorruptFile, File
from ..orm.instance import Instance
from ..orm.librarian import Librarian
from ..settings import server_settings
from .auth import ReadonlyUserDependency
Expand Down Expand Up @@ -193,11 +194,31 @@ async def validate_file(

for info in checksum_info:
if not info.computed_same_checksum and info.librarian == server_settings.name:
# Add the corrupt file to the database, though check if we already have
# it first.
query = select(CorruptFile).filter(CorruptFile.file_name == file.name)

corrupt_file = session.execute(query).one_or_none()

if corrupt_file is not None:
corrupt_file.count += 1
session.commit()
continue
else:
corrupt_file = CorruptFile.new_corrupt_file(
instance=session.get(Instance, info.instance_id),
size=info.current_size,
checksum=info.current_checksum,
)
session.add(corrupt_file)
session.commit()

log.error(
"File validation failed, the checksums do not match for file "
"{} in store {}.",
"{} in store {}. CorruptFile: {}",
request.file_name,
info.store,
corrupt_file.id,
)

return FileValidationResponse(checksum_info)
2 changes: 1 addition & 1 deletion librarian_server/orm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from .errors import Error
from .file import File
from .file import CorruptFile, File
from .instance import Instance, RemoteInstance
from .librarian import Librarian
from .sendqueue import SendQueue
Expand Down
65 changes: 65 additions & 0 deletions librarian_server/orm/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,68 @@ def delete(

if commit:
session.commit()


class CorruptFile(db.Base):
    """
    An ORM object for a file that has been marked as (potentially) corrupt
    during a check. This will need to be verified and fixed.

    A row stores the name of the affected file, the ID of the instance that
    failed the check, the size and checksum that were observed at that time,
    and a counter of how many times the problem has been seen.
    """

    __tablename__ = "corrupt_files"

    # NOTE(review): the alembic migration that creates this table declares
    # every column as NOT NULL and defines no foreign keys, whereas the
    # columns below are mostly nullable and do carry foreign keys. Confirm
    # which of the two definitions is the intended schema.

    id: int = db.Column(db.Integer, primary_key=True)
    "The ID of the corrupt file."
    file_name: str = db.Column(
        db.String(256), db.ForeignKey("files.name"), nullable=False
    )
    "The name of the file."
    file = db.relationship("File", primaryjoin="CorruptFile.file_name == File.name")
    "The file object associated with this."
    instance_id: int = db.Column(db.Integer, db.ForeignKey("instances.id"))
    "The instance ID of the corrupt file."
    instance = db.relationship(
        "Instance", primaryjoin="CorruptFile.instance_id == Instance.id"
    )
    "The instance object associated with this."
    corrupt_time: datetime = db.Column(db.DateTime)
    "The time at which the file was marked as corrupt."
    size: int = db.Column(db.BigInteger)
    "The size of the file in bytes."
    checksum: str = db.Column(db.String(256))
    "The checksum of the file that was re-computed and found to be incorrect."
    count: int = db.Column(db.Integer)
    "The number of times this file has been marked as corrupt."

    @classmethod
    def new_corrupt_file(
        cls, instance: Instance, size: int, checksum: str
    ) -> "CorruptFile":
        """
        Create a new corrupt file object.

        The object is only constructed here; it is not added to a session
        or committed, which is left to the caller.

        Parameters
        ----------
        instance : Instance
            The instance that was found to be corrupt. Its ``file``
            relationship provides the file (and file name) for the record.
        size : int
            The size of the file in bytes.
        checksum : str
            The checksum of the file that was re-computed and found to be incorrect.

        Returns
        -------
        CorruptFile
            The new corrupt file object, with ``count`` set to 1 and
            ``corrupt_time`` set to the current UTC time.
        """

        # Build via ``cls`` so that this alternate constructor also works
        # for subclasses rather than always producing the base class.
        return cls(
            file_name=instance.file.name,
            file=instance.file,
            instance_id=instance.id,
            instance=instance,
            corrupt_time=datetime.now(timezone.utc),
            size=size,
            checksum=checksum,
            count=1,
        )
6 changes: 6 additions & 0 deletions tests/background_unit_test/test_check_integrity.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ def test_check_integrity_failure(test_client, test_server_with_invalid_file, tes
)
assert integrity_task() == False

# Go check the database!
with get_session() as session:
corrupt_file = session.query(test_orm.CorruptFile).first()
assert corrupt_file is not None
assert corrupt_file.count >= 1


def test_check_integrity_invalid_store(test_client, test_server, test_orm):
"""
Expand Down

0 comments on commit e15f99b

Please sign in to comment.