From feb66f855b28c3bf3f4fb87834bdd0a33ea3b754 Mon Sep 17 00:00:00 2001 From: Stephan Geulette Date: Fri, 11 Oct 2024 08:21:06 +0200 Subject: [PATCH] Added filtering in log read --- src/imio/fpaudit/tests/test_utils.py | 23 +++++++++++++++++++++-- src/imio/fpaudit/utils.py | 11 +++++++---- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/imio/fpaudit/tests/test_utils.py b/src/imio/fpaudit/tests/test_utils.py index 8bdfbed..f3d859d 100644 --- a/src/imio/fpaudit/tests/test_utils.py +++ b/src/imio/fpaudit/tests/test_utils.py @@ -34,11 +34,11 @@ def test_fplog(self): logs = get_logrotate_filenames(LOG_DIR, "test_utils.log", r"\.\d+$") self.assertListEqual(logs, [log_file_path]) lines = [ln for ln in get_lines_of(log_file_path)] - self.assertEqual(len(list(lines)), 1) + self.assertEqual(len(lines), 1) self.assertTrue(lines[0].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 1")) fplog("test", "AUDIT", "extra 2") lines = [ln for ln in get_lines_of(log_file_path)] - self.assertEqual(len(list(lines)), 2) + self.assertEqual(len(lines), 2) self.assertTrue(lines[0].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 2")) self.assertTrue(lines[1].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 1")) # check with logrotated files @@ -57,6 +57,25 @@ def test_fplog(self): for fil in get_logrotate_filenames(LOG_DIR, "test_utils.log", r".+$"): os.remove(fil) + def test_get_lines_of(self): + log_file_path = os.path.join(LOG_DIR, "test_utils.log") + for fil in get_logrotate_filenames(LOG_DIR, "test_utils.log", r".+$"): + os.remove(fil) + fplog("test", "AUDIT", "extra") + fplog("test", "CONTACTS", "extra") + fplog("test", "CONTACTS", "extra") + fplog("test", "AUDIT", "extra") + fplog("test", "GROUPS", "extra") + lines = [ln for ln in get_lines_of(log_file_path)] + self.assertEqual(len(lines), 5) + lines = [ln for ln in get_lines_of(log_file_path, actions=("GROUPS",))] + self.assertEqual(len(lines), 1) + self.assertTrue(all("GROUPS" in ln for ln in lines)) + lines = [ln for ln in get_lines_of(log_file_path, actions=("AUDIT", "CONTACTS"))] + self.assertEqual(len(lines), 4) + self.assertEqual(len([ln for ln in lines if "AUDIT" in ln]), 2) + self.assertEqual(len([ln for ln in lines if "CONTACTS" in ln]), 2) + def test_get_logrotate_filenames(self): temp_dir = tempfile.mkdtemp() try: diff --git a/src/imio/fpaudit/utils.py b/src/imio/fpaudit/utils.py index 8e4d612..f8446a4 100644 --- a/src/imio/fpaudit/utils.py +++ b/src/imio/fpaudit/utils.py @@ -38,14 +38,17 @@ def get_all_lines_of(logfiles): yield line -def get_lines_of(logfile): +def get_lines_of(logfile, actions=()): """Generator for reversed log lines of a log file. - :param logfile: The path to the log file""" - # with open(logfile, "r") as file: + :param logfile: The path to the log file + :param actions: An action list to search_on""" + acts = [" action={}".format(act) for act in actions] with FileReadBackwards(logfile, encoding="utf-8") as file: for line in file: - yield line.strip("\n") + s_line = line.strip("\n") + if not actions or any(act in s_line for act in acts): + yield s_line def get_logrotate_filenames(directory, base_filename, suffix_regex=r"\.\d+$", full=True):