Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend test coverage of policy check #1975

Merged
merged 7 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ explicit_package_bases = True
warn_redundant_casts = True
warn_unused_configs = True

[mypy-src.MCPClient.lib.client.*,src.MCPClient.*.normalize,src.MCPClient.*.validate_file]

[mypy-src.MCPClient.lib.client.*,src.MCPClient.*.normalize,src.MCPClient.*.validate_file, src.MCPClient.*.policy_check]
check_untyped_defs = True
disallow_any_generics = True
disallow_incomplete_defs = True
Expand All @@ -18,7 +19,8 @@ strict_equality = True
warn_return_any = True
warn_unused_ignores = True

[mypy-tests.MCPClient.conftest,tests.MCPClient.test_normalize,tests.MCPClient.test_validate_file]

[mypy-tests.MCPClient.conftest,tests.MCPClient.test_normalize,tests.MCPClient.test_validate_file,tests.MCPClient.test_policy_check]
check_untyped_defs = True
disallow_any_generics = True
disallow_incomplete_defs = True
Expand Down
87 changes: 46 additions & 41 deletions src/MCPClient/lib/clientScripts/policy_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
from custom_handlers import get_script_logger

django.setup()
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

import databaseFunctions
from client.job import Job
from dicts import replace_string_values
from django.conf import settings as mcpclient_settings
from django.core.exceptions import ValidationError
Expand All @@ -39,7 +45,14 @@
FAIL_CODE = 1


def main(job, file_path, file_uuid, sip_uuid, shared_path, file_type):
def main(
job: Job,
file_path: str,
file_uuid: str,
sip_uuid: str,
shared_path: str,
file_type: str,
) -> int:
"""Entry point for policy checker."""
setup_dicts(mcpclient_settings)

Expand All @@ -61,7 +74,15 @@ class PolicyChecker:
``check`` method.
"""

def __init__(self, job, file_path, file_uuid, sip_uuid, shared_path, file_type):
def __init__(
self,
job: Job,
file_path: str,
file_uuid: Optional[str],
sip_uuid: str,
shared_path: str,
file_type: str,
) -> None:
"""Initiate a new policy check."""
self.job = job
self.file_path = file_path
Expand All @@ -73,11 +94,11 @@ def __init__(self, job, file_path, file_uuid, sip_uuid, shared_path, file_type):
self.is_manually_normalized_access_derivative = (
self._get_is_manually_normalized_access_derivative()
)
self._sip_logs_dir = None
self._sip_subm_doc_dir = None
self._sip_policy_checks_dir = None
self._sip_logs_dir: Optional[str] = None
self._sip_subm_doc_dir: Optional[str] = None
self._sip_policy_checks_dir: Optional[str] = None

def check(self):
def check(self) -> int:
"""Check the file identified by ``self.file_uuid`` against any
policy-check FPR commands that are applicable. If any fail, return a
non-zero exit code; otherwise return ``0``.
Expand Down Expand Up @@ -110,7 +131,7 @@ def check(self):

purpose = "policy_check"

def _we_check_this_type_of_file(self):
def _we_check_this_type_of_file(self) -> bool:
"""Return ``True`` if this policy checker should perform a check on
this file; ``False`` otherwise. This will depend on ``self.file_type``
and on whether the file is an original or a preservation/access
Expand Down Expand Up @@ -143,17 +164,15 @@ def _we_check_this_type_of_file(self):
else:
return True

def _file_is_derivative(self, for_access=False):
def _file_is_derivative(self, for_access: bool = False) -> bool:
"""Return ``True`` if the target file is a derivative; ``False``
otherwise.
"""
if self.is_manually_normalized_access_derivative:
return True
# Access derivatives have Derivation rows with NULL event types (cf.
# normalize.py client script).
event_type = "normalization"
if for_access:
event_type = None
event_type = None if for_access else "normalization"
try:
Derivation.objects.get(
derived_file__uuid=self.file_uuid, event__event_type=event_type
Expand All @@ -162,12 +181,12 @@ def _file_is_derivative(self, for_access=False):
except (Derivation.DoesNotExist, ValidationError):
return False

def _get_policies_dir(self):
def _get_policies_dir(self) -> str:
return os.path.join(
self.shared_path, "sharedMicroServiceTasksConfigs", "policies"
)

def _get_is_manually_normalized_access_derivative(self):
def _get_is_manually_normalized_access_derivative(self) -> bool:
"""Manually normalized access derivatives are never given UUIDs.
Therefore, we need this heuristic for determining if that is what we
are dealing with. TODO/QUESTION: will this return false positives?
Expand All @@ -178,7 +197,7 @@ def _get_is_manually_normalized_access_derivative(self):
return True
return False

def _file_is_for_access(self):
def _file_is_for_access(self) -> bool:
"""Return ``True`` if the file with UUID ``self.file_uuid`` is "for"
access.
"""
Expand All @@ -189,7 +208,7 @@ def _file_is_for_access(self):
return True
return False

def _get_manually_normalized_access_derivative_file_uuid(self):
def _get_manually_normalized_access_derivative_file_uuid(self) -> Optional[File]:
"""If the file-to-be-policy-checked is a manually normalized access
derivative it will have no file UUID in the database. We therefore have
to retrieve the UUID of the original file that was format-identified,
Expand All @@ -209,7 +228,7 @@ def _get_manually_normalized_access_derivative_file_uuid(self):
except (File.DoesNotExist, File.MultipleObjectsReturned, ValidationError):
return None

def _get_rules(self):
def _get_rules(self) -> FPRule:
"""Return the FPR rules with purpose ``self.purpose`` and that apply to
the type/format of file given as input.
"""
Expand All @@ -227,7 +246,7 @@ def _get_rules(self):
rules = FPRule.active.filter(purpose=f"default_{self.purpose}")
return rules

def _execute_rule_command(self, rule):
def _execute_rule_command(self, rule: FPRule) -> str:
"""Execute the FPR command of FPR rule ``rule`` against the file passed
in to this client script. The output of that command determines what we
print to stdout and stderr, and the nature of the validation event that
Expand Down Expand Up @@ -296,7 +315,7 @@ def _execute_rule_command(self, rule):
)
return result

def _get_command_to_execute(self, rule):
def _get_command_to_execute(self, rule: FPRule) -> Tuple[str, List[str]]:
"""Return a 2-tuple consisting of a) the FPR rule ``rule``'s command
and b) a list of arguments to pass to it.
"""
Expand All @@ -313,14 +332,14 @@ def _get_command_to_execute(self, rule):
else:
return (rule.command.command, [self.file_path, self.policies_dir])

def _save_to_logs_dir(self, output):
def _save_to_logs_dir(self, output: Dict[str, str]) -> None:
replaceafill marked this conversation as resolved.
Show resolved Hide resolved
"""Save the MediaConch policy file as well as the raw MediaConch stdout
for the target file to the logs/ directory of the SIP.
"""
self._save_stdout_to_logs_dir(output)
self._save_policy_to_subm_doc_dir(output)

def _save_stdout_to_logs_dir(self, output):
def _save_stdout_to_logs_dir(self, output: Dict[str, str]) -> None:
"""Save the output of running MediaConch's policy checker against the
input file to
logs/policyChecks/<policy_filename>/<input_filename>.xml in the SIP.
Expand All @@ -337,7 +356,7 @@ def _save_stdout_to_logs_dir(self, output):
with open(stdout_path, "w") as f:
f.write(mc_stdout)

def _save_policy_to_subm_doc_dir(self, output):
def _save_policy_to_subm_doc_dir(self, output: Dict[str, str]) -> None:
"""Save the policy file text in ``output['policy']`` to a file named
``output['policyFileName']`` in
metadata/submissionDocumentation/policies/ in the SIP, if it is not
Expand All @@ -355,7 +374,7 @@ def _save_policy_to_subm_doc_dir(self, output):
fileo.write(policy)

@property
def sip_logs_dir(self):
def sip_logs_dir(self) -> Optional[str]:
"""Return the absolute path the logs/ directory of the SIP (or
Transfer) that the target file is a part of.
"""
Expand Down Expand Up @@ -398,7 +417,7 @@ def sip_logs_dir(self):
return None

@property
def sip_subm_doc_dir(self):
def sip_subm_doc_dir(self) -> Optional[str]:
"""Return the absolute path the metadata/submissionDocumentation/
directory of the SIP that the target file is a part of.
"""
Expand Down Expand Up @@ -428,7 +447,7 @@ def sip_subm_doc_dir(self):
return None

@property
def sip_policy_checks_dir(self):
def sip_policy_checks_dir(self) -> Optional[str]:
if self._sip_policy_checks_dir:
return self._sip_policy_checks_dir
if self.sip_logs_dir:
Expand All @@ -445,29 +464,15 @@ def sip_policy_checks_dir(self):
return self._sip_policy_checks_dir


def _get_shared_path(argv):
try:
return argv[4]
except IndexError:
return None


def _get_file_type(argv):
try:
return argv[5]
except IndexError:
return "original"


def call(jobs):
def call(jobs: List[Job]) -> None:
with transaction.atomic():
for job in jobs:
with job.JobContext(logger=logger):
file_path = job.args[1]
file_uuid = job.args[2]
sip_uuid = job.args[3]
shared_path = _get_shared_path(job.args)
file_type = _get_file_type(job.args)
shared_path = job.args[4]
file_type = job.args[5]
replaceafill marked this conversation as resolved.
Show resolved Hide resolved

try:
job.set_status(
Expand Down
8 changes: 8 additions & 0 deletions tests/MCPClient/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ def fprule_preservation(fprule: fprmodels.FPRule) -> fprmodels.FPRule:
return fprule


@pytest.fixture
def fprule_policy_check(fprule: fprmodels.FPRule) -> fprmodels.FPRule:
fprule.purpose = fprmodels.FPRule.POLICY
fprule.save()

return fprule


@pytest.fixture
def fprule_thumbnail(fprule: fprmodels.FPRule) -> fprmodels.FPRule:
fprule.purpose = fprmodels.FPRule.THUMBNAIL
Expand Down
Loading