Skip to content

Commit

Permalink
Added filtering in log read
Browse files Browse the repository at this point in the history
  • Loading branch information
sgeulette committed Oct 11, 2024
1 parent a841e98 commit feb66f8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
23 changes: 21 additions & 2 deletions src/imio/fpaudit/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ def test_fplog(self):
logs = get_logrotate_filenames(LOG_DIR, "test_utils.log", r"\.\d+$")
self.assertListEqual(logs, [log_file_path])
lines = [ln for ln in get_lines_of(log_file_path)]
self.assertEqual(len(list(lines)), 1)
self.assertEqual(len(lines), 1)
self.assertTrue(lines[0].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 1"))
fplog("test", "AUDIT", "extra 2")
lines = [ln for ln in get_lines_of(log_file_path)]
self.assertEqual(len(list(lines)), 2)
self.assertEqual(len(lines), 2)
self.assertTrue(lines[0].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 2"))
self.assertTrue(lines[1].endswith(" - user=test_user_1_ ip=None action=AUDIT extra 1"))
# check with logrotated files
Expand All @@ -57,6 +57,25 @@ def test_fplog(self):
for fil in get_logrotate_filenames(LOG_DIR, "test_utils.log", r".+$"):
os.remove(fil)

def test_get_lines_of(self):
log_file_path = os.path.join(LOG_DIR, "test_utils.log")
for fil in get_logrotate_filenames(LOG_DIR, "test_utils.log", r".+$"):
os.remove(fil)
fplog("test", "AUDIT", "extra")
fplog("test", "CONTACTS", "extra")
fplog("test", "CONTACTS", "extra")
fplog("test", "AUDIT", "extra")
fplog("test", "GROUPS", "extra")
lines = [ln for ln in get_lines_of(log_file_path)]
self.assertEqual(len(lines), 5)
lines = [ln for ln in get_lines_of(log_file_path, actions=("GROUPS",))]
self.assertEqual(len(lines), 1)
self.assertTrue(all("GROUPS" in ln for ln in lines))
lines = [ln for ln in get_lines_of(log_file_path, actions=("AUDIT", "CONTACTS"))]
self.assertEqual(len(lines), 4)
self.assertEqual(len([ln for ln in lines if "AUDIT" in ln]), 2)
self.assertEqual(len([ln for ln in lines if "CONTACTS" in ln]), 2)

def test_get_logrotate_filenames(self):
temp_dir = tempfile.mkdtemp()
try:
Expand Down
11 changes: 7 additions & 4 deletions src/imio/fpaudit/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@ def get_all_lines_of(logfiles):
yield line


def get_lines_of(logfile):
def get_lines_of(logfile, actions=()):
"""Generator for reversed log lines of a log file.
:param logfile: The path to the log file"""
# with open(logfile, "r") as file:
:param logfile: The path to the log file
:param actions: An action list to search_on"""
acts = [" action={}".format(act) for act in actions]
with FileReadBackwards(logfile, encoding="utf-8") as file:
for line in file:
yield line.strip("\n")
s_line = line.strip("\n")
if not actions or any(act in s_line for act in acts):
yield s_line


def get_logrotate_filenames(directory, base_filename, suffix_regex=r"\.\d+$", full=True):
Expand Down

0 comments on commit feb66f8

Please sign in to comment.