diff --git a/src/imio/fpaudit/testing.py b/src/imio/fpaudit/testing.py index 9882341..abd52ae 100644 --- a/src/imio/fpaudit/testing.py +++ b/src/imio/fpaudit/testing.py @@ -8,6 +8,7 @@ from plone.app.testing import TEST_USER_ID import imio.fpaudit +import os class ImioFPAuditLayer(PloneSandboxLayer): @@ -35,3 +36,16 @@ def setUpPloneSite(self, portal): IMIO_FPAUDIT_FUNCTIONAL_TESTING = FunctionalTesting( bases=(IMIO_FPAUDIT_FIXTURE,), name="ImioFPAuditLayer:FunctionalTesting" ) + + +def write_temp_files(temp_dir, filenames): + for filename in filenames: + with open(os.path.join(temp_dir, filename), "w") as f: + f.write("test") + + +def clear_temp_dir(temp_dir): + for filename in os.listdir(temp_dir): + file_path = os.path.join(temp_dir, filename) + if os.path.isfile(file_path): + os.remove(file_path) diff --git a/src/imio/fpaudit/tests/test_utils.py b/src/imio/fpaudit/tests/test_utils.py new file mode 100644 index 0000000..8bdfbed --- /dev/null +++ b/src/imio/fpaudit/tests/test_utils.py @@ -0,0 +1,90 @@ +from imio.fpaudit import LOG_DIR +from imio.fpaudit.storage import store_config +from imio.fpaudit.testing import clear_temp_dir +from imio.fpaudit.testing import IMIO_FPAUDIT_INTEGRATION_TESTING +from imio.fpaudit.testing import write_temp_files +from imio.fpaudit.utils import fplog +from imio.fpaudit.utils import get_all_lines_of +from imio.fpaudit.utils import get_lines_of +from imio.fpaudit.utils import get_logrotate_filenames +from plone import api + +import os +import shutil +import tempfile +import unittest + + +class TestUtils(unittest.TestCase): + + layer = IMIO_FPAUDIT_INTEGRATION_TESTING + + def setUp(self): + self.portal = self.layer["portal"] + api.portal.set_registry_record( + "imio.fpaudit.settings.log_entries", + [{"log_id": u"test", "audit_log": u"test_utils.log", "log_format": u"%(asctime)s - %(message)s"}], + ) + + def test_fplog(self): + log_file_path = os.path.join(LOG_DIR, "test_utils.log") + for fil in get_logrotate_filenames(LOG_DIR, "test_utils.log", r".+$"): + os.remove(fil) + fplog("test", "AUDIT", "extra 1") + logs = get_logrotate_filenames(LOG_DIR, "test_utils.log", r"\.\d+$") + self.assertListEqual(logs, [log_file_path]) + lines = [ln for ln in get_lines_of(log_file_path)] + self.assertEqual(len(list(lines)), 1) + self.assertTrue(lines[0].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 1")) + fplog("test", "AUDIT", "extra 2") + lines = [ln for ln in get_lines_of(log_file_path)] + self.assertEqual(len(list(lines)), 2) + self.assertTrue(lines[0].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 2")) + self.assertTrue(lines[1].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 1")) + # check with logrotated files + log_file_path1 = log_file_path + ".1" + os.rename(log_file_path, log_file_path1) + # changed id to stop writing in rotated here + store_config([{"log_id": u"test1", "audit_log": u"test_utils.log", "log_format": u"%(asctime)s - %(message)s"}]) + fplog("test1", "AUDIT", "extra 3") + fplog("test1", "AUDIT", "extra 4") + logs = get_logrotate_filenames(LOG_DIR, "test_utils.log", r"\.\d+$") + lines = [ln for ln in get_all_lines_of(logs)] + self.assertTrue(lines[0].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 4")) + self.assertTrue(lines[1].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 3")) + self.assertTrue(lines[2].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 2")) + self.assertTrue(lines[3].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 1")) + for fil in get_logrotate_filenames(LOG_DIR, "test_utils.log", r".+$"): + os.remove(fil) + + def test_get_logrotate_filenames(self): + temp_dir = tempfile.mkdtemp() + try: + # check filter + write_temp_files(temp_dir, ["test.log", "other.log", "test.log.1", "test.log.2", "test.log.lock"]) + expected_files = ["test.log", "test.log.1", "test.log.2"] + result_files = get_logrotate_filenames(temp_dir, "test.log", r"\.\d+$", full=False) + self.assertListEqual(result_files, expected_files) + clear_temp_dir(temp_dir) + # check order + write_temp_files(temp_dir, ["test.log", "test.log.1", "test.log.2", "test.log.10"]) + expected_files = ["test.log", "test.log.1", "test.log.2", "test.log.10"] + result_files = get_logrotate_filenames(temp_dir, "test.log", r"\.\d+$", full=False) + self.assertListEqual(result_files, expected_files) + clear_temp_dir(temp_dir) + # check full path + write_temp_files(temp_dir, ["test.log", "test.log.1", "test.log.2", "test.log.10"]) + expected_files = ["test.log", "test.log.1", "test.log.2", "test.log.10"] + expected_files = [os.path.join(temp_dir, f) for f in expected_files] + result_files = get_logrotate_filenames(temp_dir, "test.log", r"\.\d+$") + self.assertListEqual(result_files, expected_files) + clear_temp_dir(temp_dir) + # checl another filter + write_temp_files( + temp_dir, ["test.log", "other.log", "test.log-20240825", "test.log-20240901", "test.log-20240908"] + ) + expected_files = ["test.log", "test.log-20240825", "test.log-20240901", "test.log-20240908"] + result_files = get_logrotate_filenames(temp_dir, "test.log", r"-\d{8}$", full=False) + self.assertListEqual(result_files, expected_files) + finally: + shutil.rmtree(temp_dir) diff --git a/src/imio/fpaudit/utils.py b/src/imio/fpaudit/utils.py index 892838c..8e4d612 100644 --- a/src/imio/fpaudit/utils.py +++ b/src/imio/fpaudit/utils.py @@ -1,17 +1,25 @@ # -*- coding: utf-8 -*- from collective.fingerpointing.config import AUDIT_MESSAGE from collective.fingerpointing.utils import get_request_information +from file_read_backwards import FileReadBackwards from imio.fpaudit.interfaces import ILogsStorage +from natsort import natsorted from zope.component import getUtility import logging +import os +import re logger = logging.getLogger("imio.fpaudit") def fplog(log_id, action, extras): - """collective.fingerpointing add log message.""" + """collective.fingerpointing add log message. + + :param log_id: The log id as defined in the configuration + :param action: The action string + :param extras: The extras""" user, ip = get_request_information() storage = getUtility(ILogsStorage) log_i = storage.get(log_id) @@ -19,3 +27,37 @@ def fplog(log_id, action, extras): log_i(AUDIT_MESSAGE.format(user, ip, action, extras)) else: logger.info(AUDIT_MESSAGE.format(user, ip, action, extras)) + + +def get_all_lines_of(logfiles): + """Get all lines of a list of log files. + + :param logfiles: The list of log files""" + for logfile in logfiles: + for line in get_lines_of(logfile): + yield line + + +def get_lines_of(logfile): + """Generator for reversed log lines of a log file. + + :param logfile: The path to the log file""" + # with open(logfile, "r") as file: + with FileReadBackwards(logfile, encoding="utf-8") as file: + for line in file: + yield line.strip("\n") + + +def get_logrotate_filenames(directory, base_filename, suffix_regex=r"\.\d+$", full=True): + """Get all logrotate files matching the base filename in the directory. + + :param directory: The directory where the logrotate files are stored + :param base_filename: The base filename of the logrotate files + :param suffix_regex: The regex pattern to match the suffix of the logrotate files + :param full: If True, return the full path of the logrotate files + """ + pattern = re.compile("^{}(?:{})*$".format(re.escape(base_filename), suffix_regex)) + res = natsorted([f for f in os.listdir(directory) if pattern.match(f)]) + if full: + res = [os.path.join(directory, f) for f in res] + return res