Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for identify_file_format script #1974

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ warn_redundant_casts = True
warn_unused_configs = True


[mypy-src.MCPClient.lib.client.*,src.MCPClient.*.normalize,src.MCPClient.*.validate_file, src.MCPClient.*.policy_check]
[mypy-src.MCPClient.lib.client.*,src.MCPClient.*.identify_file_format,src.MCPClient.*.normalize,src.MCPClient.*.validate_file, src.MCPClient.*.policy_check]

check_untyped_defs = True
disallow_any_generics = True
disallow_incomplete_defs = True
Expand All @@ -20,7 +21,8 @@ warn_return_any = True
warn_unused_ignores = True


[mypy-tests.MCPClient.conftest,tests.MCPClient.test_normalize,tests.MCPClient.test_validate_file,tests.MCPClient.test_policy_check]
[mypy-tests.MCPClient.conftest,tests.MCPClient.test_identify_file_format,tests.MCPClient.test_normalize,tests.MCPClient.test_validate_file,tests.MCPClient.test_policy_check]

check_untyped_defs = True
disallow_any_generics = True
disallow_incomplete_defs = True
Expand Down
94 changes: 62 additions & 32 deletions src/MCPClient/lib/clientScripts/identify_file_format.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
#!/usr/bin/env python
import argparse
import dataclasses
import multiprocessing
import uuid
from typing import List
from typing import Optional

import django

django.setup()
# dashboard

from client.job import Job
from databaseFunctions import insertIntoEvents
from django.db import transaction
from django.utils import timezone

# archivematicaCommon
from executeOrRunSubProcess import executeOrRun
from fpr.models import FormatVersion
from fpr.models import IDCommand
Expand All @@ -21,12 +23,23 @@
from main.models import FileID
from main.models import UnitVariable

SUCCESS = 0
ERROR = 255


def concurrent_instances():
@dataclasses.dataclass
class IdentifyFileFormatArgs:
    """Typed container for the command-line arguments of this script.

    ``parse_args`` builds an instance directly from the ``argparse``
    namespace (``IdentifyFileFormatArgs(**vars(namespace))``), so the field
    names here must match the argument destinations declared by the parser.
    """

    # NOTE(review): presumably the "True"/"False" string that ``main``
    # receives as ``enabled`` -- confirm against the parser definition.
    idcommand: str
    file_path: str
    file_uuid: str
    disable_reidentify: bool


def concurrent_instances() -> int:
    """Return the CPU count reported by ``multiprocessing.cpu_count()``."""
    return multiprocessing.cpu_count()


def _save_id_preference(file_, value):
def _save_id_preference(file_: File, value: bool) -> None:
"""
Saves whether file format identification is being used.

Expand All @@ -35,22 +48,25 @@ def _save_id_preference(file_, value):
variable, which will be transformed back into a passVar when a new chain in
the same unit is begun.
"""
value = str(value)

# The unit_uuid foreign key can point to a transfer or SIP, and this tool
# runs in both.
# Check the SIP first - if it hasn't been assigned yet, then this is being
# run during the transfer.
unit = file_.sip or file_.transfer

rd = {"%IDCommand%": value}
rd = {"%IDCommand%": str(value)}

UnitVariable.objects.create(
unituuid=unit.pk, variable="replacementDict", variablevalue=str(rd)
)


def write_identification_event(file_uuid, command, format=None, success=True):
def write_identification_event(
file_uuid: str,
command: IDCommand,
format: Optional[str] = None,
success: bool = True,
) -> None:
event_detail_text = (
f'program="{command.tool.description}"; version="{command.tool.version}"'
)
Expand All @@ -75,7 +91,7 @@ def write_identification_event(file_uuid, command, format=None, success=True):
)


def write_file_id(file_uuid, format, output):
def write_file_id(file_uuid: str, format: FormatVersion, output: str) -> None:
"""
Write the identified format to the DB.

Expand All @@ -102,24 +118,26 @@ def write_file_id(file_uuid, format, output):
)


def _default_idcommand() -> Optional[IDCommand]:
    """Retrieve the default ``fpr.IDCommand``.

    We only expect to find one command enabled/active.

    Returns ``None`` when no active command exists (``first()`` yields
    ``None`` on an empty result); ``main`` treats that as an error.
    """
    return IDCommand.active.first()


def main(job, enabled, file_path, file_uuid, disable_reidentify):
enabled = True if enabled == "True" else False
if not enabled:
def main(
job: Job, enabled: str, file_path: str, file_uuid: str, disable_reidentify: bool
) -> int:
enabled_bool = True if enabled == "True" else False
if not enabled_bool:
job.print_output("Skipping file format identification")
return 0
return SUCCESS

command = _default_idcommand()
if command is None:
job.write_error("Unable to determine IDCommand.\n")
return 255
return ERROR

command_uuid = command.uuid
job.print_output("IDCommand:", command.description)
Expand All @@ -138,11 +156,11 @@ def main(job, enabled, file_path, file_uuid, disable_reidentify):
job.print_output(
"This file has already been identified, and re-identification is disabled. Skipping."
)
return 0
return SUCCESS

# Save whether identification was enabled by the user for use in a later
# chain.
_save_id_preference(file_, enabled)
_save_id_preference(file_, enabled_bool)

exitcode, output, err = executeOrRun(
command.script_type,
Expand All @@ -156,49 +174,49 @@ def main(job, enabled, file_path, file_uuid, disable_reidentify):
if exitcode != 0:
job.print_error(f"Error: IDCommand with UUID {command_uuid} exited non-zero.")
job.print_error(f"Error: {err}")
return 255
return ERROR

job.print_output("Command output:", output)
# PUIDs are the same regardless of tool, so PUID-producing tools don't have "rules" per se - we just
# go straight to the FormatVersion table to see if there's a matching PUID
try:
if command.config == "PUID":
version = FormatVersion.active.get(pronom_id=output)
format_version = FormatVersion.active.get(pronom_id=output)
else:
rule = IDRule.active.get(command_output=output, command=command)
version = rule.format
format_version = rule.format
except IDRule.DoesNotExist:
job.print_error(
f'Error: No FPR identification rule for tool output "{output}" found'
)
write_identification_event(file_uuid, command, success=False)
return 255
return ERROR
except IDRule.MultipleObjectsReturned:
job.print_error(
f'Error: Multiple FPR identification rules for tool output "{output}" found'
)
write_identification_event(file_uuid, command, success=False)
return 255
return ERROR
except FormatVersion.DoesNotExist:
job.print_error(f"Error: No FPR format record found for PUID {output}")
write_identification_event(file_uuid, command, success=False)
return 255
return ERROR

(ffv, created) = FileFormatVersion.objects.get_or_create(
file_uuid=file_, defaults={"format_version": version}
file_uuid=file_, defaults={"format_version": format_version}
)
if not created: # Update the version if it wasn't created new
ffv.format_version = version
ffv.format_version = format_version
ffv.save()
job.print_output(f"{file_path} identified as a {version.description}")
job.print_output(f"{file_path} identified as a {format_version.description}")

write_identification_event(file_uuid, command, format=version.pronom_id)
write_file_id(file_uuid=file_uuid, format=version, output=output)
write_identification_event(file_uuid, command, format=format_version.pronom_id)
write_file_id(file_uuid=file_uuid, format=format_version, output=output)

return 0
return SUCCESS


def call(jobs):
def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Identify file formats.")

# Since AM19 the accepted values are "True" or "False" since the ability to
Expand All @@ -215,10 +233,22 @@ def call(jobs):
help="Disable identification if it has already happened for this file.",
)

return parser


def parse_args(parser: argparse.ArgumentParser, job: Job) -> IdentifyFileFormatArgs:
    """Parse a job's arguments into an ``IdentifyFileFormatArgs`` instance.

    The first element of ``job.args`` is skipped before parsing, and the
    resulting namespace is unpacked as keyword arguments into the dataclass.
    """
    namespace = parser.parse_args(job.args[1:])

    return IdentifyFileFormatArgs(**vars(namespace))


def call(jobs: List[Job]) -> None:
parser = get_parser()

with transaction.atomic():
for job in jobs:
with job.JobContext():
args = parser.parse_args(job.args[1:])
args = parse_args(parser, job)
job.set_status(
main(
job,
Expand Down
17 changes: 17 additions & 0 deletions tests/MCPClient/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,35 @@ def fptool() -> fprmodels.FPTool:
return fprmodels.FPTool.objects.create()


@pytest.fixture
def idtool() -> fprmodels.IDTool:
    """Create and return an ``IDTool`` record with default field values."""
    return fprmodels.IDTool.objects.create()


@pytest.fixture
def fpcommand(fptool: fprmodels.FPTool) -> fprmodels.FPCommand:
    """Create and return an ``FPCommand`` attached to the ``fptool`` fixture."""
    return fprmodels.FPCommand.objects.create(tool=fptool)


@pytest.fixture
def idcommand(idtool: fprmodels.IDTool) -> fprmodels.IDCommand:
    """Create and return an ``IDCommand`` for ``idtool`` with ``config="PUID"``."""
    return fprmodels.IDCommand.objects.create(tool=idtool, config="PUID")


@pytest.fixture
def fprule(
    fpcommand: fprmodels.FPCommand, format_version: fprmodels.FormatVersion
) -> fprmodels.FPRule:
    """Create and return an ``FPRule`` linking ``fpcommand`` to ``format_version``."""
    return fprmodels.FPRule.objects.create(command=fpcommand, format=format_version)


@pytest.fixture
def idrule(
    idcommand: fprmodels.IDCommand, format_version: fprmodels.FormatVersion
) -> fprmodels.IDRule:
    """Create and return an ``IDRule`` linking ``idcommand`` to ``format_version``."""
    return fprmodels.IDRule.objects.create(command=idcommand, format=format_version)


@pytest.fixture()
def fprule_characterization(fprule: fprmodels.FPRule) -> fprmodels.FPRule:
fprule.purpose = fprmodels.FPRule.CHARACTERIZATION
Expand Down
Loading