From 6310d85a53a6cd03aa1a7c6f2956be318d2d6489 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Thu, 31 Oct 2024 18:00:47 +0000 Subject: [PATCH 1/2] Defer FPRule counter updates during normalization --- src/MCPClient/lib/clientScripts/normalize.py | 76 ++++++++++++++++++- src/MCPClient/lib/clientScripts/transcoder.py | 15 ++-- tests/MCPClient/test_normalize.py | 27 ++++--- 3 files changed, 97 insertions(+), 21 deletions(-) diff --git a/src/MCPClient/lib/clientScripts/normalize.py b/src/MCPClient/lib/clientScripts/normalize.py index 6e240b6b00..67d20858ae 100755 --- a/src/MCPClient/lib/clientScripts/normalize.py +++ b/src/MCPClient/lib/clientScripts/normalize.py @@ -8,9 +8,14 @@ import shutil import traceback import uuid +from collections import defaultdict +from types import TracebackType from typing import Callable +from typing import DefaultDict +from typing import Dict from typing import List from typing import Optional +from typing import Type import django import transcoder @@ -20,10 +25,12 @@ import databaseFunctions import fileOperations from client.job import Job +from custom_handlers import get_script_logger from dicts import ReplacementDict from django.conf import settings as mcpclient_settings from django.core.exceptions import ValidationError from django.db import transaction +from django.db.models import F from fpr.models import FPRule from lib import setup_dicts from main.models import Derivation @@ -31,6 +38,8 @@ from main.models import FileFormatVersion from main.models import FileID +logger = get_script_logger("archivematica.mcp.client.normalize") + # Return codes SUCCESS = 0 RULE_FAILED = 1 @@ -355,7 +364,59 @@ def get_default_rule(purpose: str) -> FPRule: return FPRule.active.get(purpose="default_" + purpose) -def main(job: Job, opts: NormalizeArgs) -> int: +class DeferredFPRuleCounter: + """Deferred counter for FPRule attempts, successes, and failures. + + This class postpones database writes to aggregate updates and minimize the + duration of transactions, which is beneficial when dealing with long-running + batches. + """ + + def __init__(self) -> None: + self._counters: DefaultDict[uuid.UUID, Dict[str, int]] = defaultdict( + lambda: {"count_attempts": 0, "count_okay": 0, "count_not_okay": 0} + ) + + def __enter__(self) -> "DeferredFPRuleCounter": + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + try: + self.save() + except Exception as err: + logger.error("Failed to save counters: %s", err, exc_info=True) + + def record_attempt(self, fprule: FPRule) -> None: + self._counters[fprule.uuid]["count_attempts"] += 1 + + def record_success(self, fprule: FPRule) -> None: + self._counters[fprule.uuid]["count_okay"] += 1 + + def record_failure(self, fprule: FPRule) -> None: + self._counters[fprule.uuid]["count_not_okay"] += 1 + + def save(self) -> None: + """Persist all aggregated FPRule counters in a single transaction. + + This method updates the success and failure rates of FPRules by + incrementing their respective counters. It uses Django's F() expressions + to ensure atomic updates and prevent race conditions. 
+ """ + with transaction.atomic(): + for fprule_id, increments in self._counters.items(): + FPRule.objects.filter(uuid=fprule_id).update( + count_attempts=F("count_attempts") + increments["count_attempts"], + count_okay=F("count_okay") + increments["count_okay"], + count_not_okay=F("count_not_okay") + increments["count_not_okay"], + ) + + +def main(job: Job, opts: NormalizeArgs, counter: DeferredFPRuleCounter) -> int: """Find and execute normalization commands on input file.""" # TODO fix for maildir working only on attachments @@ -489,7 +550,13 @@ def main(job: Job, opts: NormalizeArgs) -> int: replacement_dict = get_replacement_dict(job, opts) cl = transcoder.CommandLinker( - job, rule, command, replacement_dict, opts, once_normalized_callback(job) + job, + rule, + command, + replacement_dict, + opts, + once_normalized_callback(job), + counter, ) exitstatus = cl.execute() @@ -540,6 +607,7 @@ def main(job: Job, opts: NormalizeArgs) -> int: replacement_dict, opts, once_normalized_callback(job), + counter, ) exitstatus = cl.execute() @@ -611,7 +679,7 @@ def parse_args(parser: argparse.ArgumentParser, job: Job) -> NormalizeArgs: def call(jobs: List[Job]) -> None: parser = get_parser() - with transaction.atomic(): + with DeferredFPRuleCounter() as counter, transaction.atomic(): for job in jobs: with job.JobContext(): opts = parse_args(parser, job) @@ -625,7 +693,7 @@ def call(jobs: List[Job]) -> None: continue try: - job.set_status(main(job, opts)) + job.set_status(main(job, opts, counter)) except Exception as e: job.print_error(str(e)) job.set_status(1) diff --git a/src/MCPClient/lib/clientScripts/transcoder.py b/src/MCPClient/lib/clientScripts/transcoder.py index 72822f90f8..d4ad2621bc 100755 --- a/src/MCPClient/lib/clientScripts/transcoder.py +++ b/src/MCPClient/lib/clientScripts/transcoder.py @@ -15,7 +15,6 @@ # # You should have received a copy of the GNU General Public License # along with Archivematica. If not, see . -from django.db.models import F from executeOrRunSubProcess import executeOrRun @@ -109,7 +108,9 @@ def execute(self, skip_on_success=False): class CommandLinker: - def __init__(self, job, fprule, command, replacement_dict, opts, on_success): + def __init__( + self, job, fprule, command, replacement_dict, opts, on_success, counter + ): self.fprule = fprule self.command = command self.replacement_dict = replacement_dict @@ -118,6 +119,7 @@ def __init__(self, job, fprule, command, replacement_dict, opts, on_success): self.commandObject = Command( job, self.command, replacement_dict, self.on_success, opts ) + self.counter = counter def __str__(self): return ( @@ -128,13 +130,10 @@ def execute(self): """Execute the command, and track the success statistics. 
Returns 0 on success, non-0 on failure.""" - # Track success/failure rates of FP Rules - # Use Django's F() to prevent race condition updating the counts - self.fprule.count_attempts = F("count_attempts") + 1 + self.counter.record_attempt(self.fprule) ret = self.commandObject.execute() if ret: - self.fprule.count_not_okay = F("count_not_okay") + 1 + self.counter.record_failure(self.fprule) else: - self.fprule.count_okay = F("count_okay") + 1 - self.fprule.save() + self.counter.record_success(self.fprule) return ret diff --git a/tests/MCPClient/test_normalize.py b/tests/MCPClient/test_normalize.py index 025a34d5a0..6bf986f2cb 100644 --- a/tests/MCPClient/test_normalize.py +++ b/tests/MCPClient/test_normalize.py @@ -45,7 +45,8 @@ def test_normalization_fails_if_original_file_does_not_exist() -> None: job = mock.Mock(spec=Job) opts = mock.Mock(file_uuid=file_uuid) - result = normalize.main(job, opts) + with normalize.DeferredFPRuleCounter() as counter: + result = normalize.main(job, opts, counter) assert result == normalize.NO_RULE_FOUND job.print_error.assert_called_once_with( @@ -67,7 +68,8 @@ def test_normalization_skips_submission_documentation_file_if_group_use_does_not normalize_file_grp_use="original", ) - result = normalize.main(job, opts) + with normalize.DeferredFPRuleCounter() as counter: + result = normalize.main(job, opts, counter) assert result == normalize.SUCCESS assert job.print_output.mock_calls == [ @@ -93,7 +95,8 @@ def test_normalization_skips_file_if_group_use_does_not_match( normalize_file_grp_use="access", ) - result = normalize.main(job, opts) + with normalize.DeferredFPRuleCounter() as counter: + result = normalize.main(job, opts, counter) assert result == normalize.SUCCESS assert job.print_output.mock_calls == [ @@ -178,7 +181,8 @@ def test_manual_normalization_creates_event_and_derivation( normalize_file_grp_use="original", ) - result = normalize.main(job, opts) + with normalize.DeferredFPRuleCounter() as counter: + result = normalize.main(job, opts, counter) assert result == normalize.SUCCESS assert job.print_output.mock_calls == [ @@ -250,7 +254,8 @@ def test_manual_normalization_fails_with_invalid_normalization_csv( normalize_file_grp_use="original", ) - result = normalize.main(job, opts) + with normalize.DeferredFPRuleCounter() as counter: + result = normalize.main(job, opts, counter) assert result == normalize.NO_RULE_FOUND assert job.print_error.mock_calls == [ @@ -297,7 +302,8 @@ def test_manual_normalization_matches_by_filename_instead_of_normalization_csv( normalize_file_grp_use="original", ) - result = normalize.main(job, opts) + with normalize.DeferredFPRuleCounter() as counter: + result = normalize.main(job, opts, counter) assert result == normalize.SUCCESS assert job.print_error.mock_calls == [] @@ -350,7 +356,8 @@ def test_manual_normalization_matches_from_multiple_filenames( normalize_file_grp_use="original", ) - result = normalize.main(job, opts) + with normalize.DeferredFPRuleCounter() as counter: + result = normalize.main(job, opts, counter) assert result == normalize.SUCCESS assert job.print_error.mock_calls == [] @@ -413,7 +420,8 @@ def test_normalization_falls_back_to_default_rule( normalize_file_grp_use="original", ) - result = normalize.main(job, opts) + with normalize.DeferredFPRuleCounter() as counter: + result = normalize.main(job, opts, counter) assert result == normalize.SUCCESS command_linker.assert_called_once() @@ -474,7 +482,8 @@ def test_normalization_finds_rule_by_file_format_version( normalize_file_grp_use="original", ) - result = 
normalize.main(job, opts) + with normalize.DeferredFPRuleCounter() as counter: + result = normalize.main(job, opts, counter) assert result == normalize.SUCCESS command_linker.assert_called_once() From 09ed00eb11da29574d1345f63c61068f250a7bf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Fri, 1 Nov 2024 06:12:58 +0000 Subject: [PATCH 2/2] Remove unnecessary fields from CommandLinker --- src/MCPClient/lib/clientScripts/normalize.py | 6 +++--- src/MCPClient/lib/clientScripts/transcoder.py | 18 +++++++----------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/src/MCPClient/lib/clientScripts/normalize.py b/src/MCPClient/lib/clientScripts/normalize.py index 67d20858ae..b3c9f61358 100755 --- a/src/MCPClient/lib/clientScripts/normalize.py +++ b/src/MCPClient/lib/clientScripts/normalize.py @@ -573,8 +573,8 @@ def main(job: Job, opts: NormalizeArgs, counter: DeferredFPRuleCounter) -> int: if ( exitstatus != 0 and opts.purpose in ("access", "thumbnail") - and cl.commandObject.output_location - and (not os.path.isfile(cl.commandObject.output_location)) + and cl.output_location + and (not os.path.isfile(cl.output_location)) ): # Fall back to default rule try: @@ -614,7 +614,7 @@ def main(job: Job, opts: NormalizeArgs, counter: DeferredFPRuleCounter) -> int: # Store thumbnails locally for use during AIP searches # TODO is this still needed, with the storage service? if "thumbnail" in opts.purpose: - thumbnail_filepath = cl.commandObject.output_location + thumbnail_filepath = cl.output_location thumbnail_storage_dir = os.path.join( mcpclient_settings.SHARED_DIRECTORY, "www", "thumbnails", opts.sip_uuid ) diff --git a/src/MCPClient/lib/clientScripts/transcoder.py b/src/MCPClient/lib/clientScripts/transcoder.py index d4ad2621bc..354f270e27 100755 --- a/src/MCPClient/lib/clientScripts/transcoder.py +++ b/src/MCPClient/lib/clientScripts/transcoder.py @@ -112,26 +112,22 @@ def __init__( self, job, fprule, command, replacement_dict, opts, on_success, counter ): self.fprule = fprule - self.command = command - self.replacement_dict = replacement_dict - self.opts = opts - self.on_success = on_success - self.commandObject = Command( - job, self.command, replacement_dict, self.on_success, opts - ) + self.cmd = Command(job, command, replacement_dict, on_success, opts) self.counter = counter def __str__(self): - return ( - f"[Command Linker] FPRule: {self.fprule.uuid} Command: {self.commandObject}" - ) + return f"[Command Linker] FPRule: {self.fprule.uuid} Command: {self.cmd}" + + @property + def output_location(self): + return self.cmd.output_location def execute(self): """Execute the command, and track the success statistics. Returns 0 on success, non-0 on failure.""" self.counter.record_attempt(self.fprule) - ret = self.commandObject.execute() + ret = self.cmd.execute() if ret: self.counter.record_failure(self.fprule) else:
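
The pattern these two patches introduce, in isolation: FPRule statistics are
accumulated in memory and flushed in a single transaction, so each rule row
receives one UPDATE per batch instead of one save per processed file. Below is
a minimal standalone sketch of that idea, assuming a configured Django project
with fpr.models.FPRule importable; flush_fprule_counters and the results
iterable are illustrative names, not part of the patches.

    from collections import defaultdict

    from django.db import transaction
    from django.db.models import F
    from fpr.models import FPRule


    def flush_fprule_counters(results):
        """Aggregate (FPRule, succeeded) pairs in memory, then persist once."""
        counts = defaultdict(
            lambda: {"count_attempts": 0, "count_okay": 0, "count_not_okay": 0}
        )
        for rule, succeeded in results:
            counts[rule.uuid]["count_attempts"] += 1
            counts[rule.uuid]["count_okay" if succeeded else "count_not_okay"] += 1
        # One UPDATE per rule; F() pushes the increment into SQL, so concurrent
        # workers cannot overwrite each other's counts.
        with transaction.atomic():
            for rule_uuid, inc in counts.items():
                FPRule.objects.filter(uuid=rule_uuid).update(
                    count_attempts=F("count_attempts") + inc["count_attempts"],
                    count_okay=F("count_okay") + inc["count_okay"],
                    count_not_okay=F("count_not_okay") + inc["count_not_okay"],
                )

Wrapping the real counter in a context manager, as PATCH 1/2 does, guarantees
the flush runs even when a job raises, and confining the writes to one atomic
block holds row locks only for the duration of the final UPDATE statements.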