-
Notifications
You must be signed in to change notification settings - Fork 3
/
federation_info.py
120 lines (102 loc) · 4.03 KB
/
federation_info.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import json
import re
from contextlib import contextmanager
from sys import stdin
import click
import pymonetdb
# Connection settings handed to pymonetdb.connect() by db_cursor().
# NOTE(review): the credentials are hard-coded in source; presumably these are
# deployment defaults -- confirm whether they should come from configuration.
DB_USERNAME = "admin"
DB_PASSWORD = "executor"
# Passed as the ``database`` argument of the connection.
DB_FARM = "db"
@contextmanager
def db_cursor(ip, port):
    """Yield a cursor on an autocommit connection to the database at ip:port.

    The connection is opened with the module-level DB_USERNAME, DB_PASSWORD
    and DB_FARM settings. Both the cursor and the connection are closed when
    the ``with`` block exits, including when the block raises.

    Args:
        ip: Hostname or address passed to ``pymonetdb.connect`` as ``hostname``.
        port: Port passed to ``pymonetdb.connect``.

    Yields:
        The cursor created from the new connection.
    """
    connection = pymonetdb.connect(
        hostname=ip,
        port=port,
        username=DB_USERNAME,
        password=DB_PASSWORD,
        database=DB_FARM,
        autocommit=True,
    )
    # try/finally guarantees cleanup: without it, an exception raised inside
    # the caller's ``with`` body would skip both close() calls and leak the
    # connection.
    try:
        cursor = connection.cursor()
        try:
            yield cursor
        finally:
            cursor.close()
    finally:
        connection.close()
# Root command group; the sub-commands below attach themselves via @cli.command().
# The docstring is what click shows as the group's --help text, so it is user-facing.
@click.group()
def cli():
    """
    This is a log aggregation script.
    It can be used either in a local hospital worker to show database actions or in the federation master worker
    to show information for all the federation workers.
    """
# Size argument given to logfile.read() on each iteration, so the log is
# processed in chunks instead of being loaded in one go.
LOG_FILE_CHUNK_SIZE = 1024  # Will read the logfile in chunks
# Matches the timestamp that starts every log entry.
TIMESTAMP_REGEX = (
    r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}"  # 2022-04-13 18:25:22,875
)
# Audit-event patterns used by print_audit_entry(). In every pattern group 1 is
# the timestamp; the remaining groups are the quoted values in message order.
# NOTE(review): the "." before "$" is an unescaped regex dot, so it matches any
# single character, not only a literal full stop -- confirm this is acceptable.

# Groups: (1) timestamp, (2) worker id.
WORKER_JOINED_PATTERN = (
    rf"({TIMESTAMP_REGEX}) .* Worker with id '(.*)' joined the federation.$"
)
# Groups: (1) timestamp, (2) worker id.
WORKER_LEFT_PATTERN = (
    rf"({TIMESTAMP_REGEX}) .* Worker with id '(.*)' left the federation.$"
)
# Groups: (1) timestamp, (2) datamodel.
DATA_MODEL_ADDED_PATTERN = rf"({TIMESTAMP_REGEX}) .* Datamodel '(.*)' was added.$"
# Groups: (1) timestamp, (2) datamodel.
DATA_MODEL_REMOVED_PATTERN = rf"({TIMESTAMP_REGEX}) .* Datamodel '(.*)' was removed.$"
# Groups: (1) timestamp, (2) dataset, (3) datamodel, (4) worker.
DATASET_ADDED_PATTERN = (
    rf"({TIMESTAMP_REGEX}) .* Dataset '(.*)' of datamodel '(.*)' was "
    r"added in worker '(.*)'.$"
)
# Groups: (1) timestamp, (2) dataset, (3) datamodel, (4) worker.
DATASET_REMOVED_PATTERN = (
    rf"({TIMESTAMP_REGEX}) .* Dataset '(.*)' of datamodel '(.*)' "
    r"was removed from worker '(.*)'.$"
)
# Groups: (1) timestamp, (2) request id, (3) context id, (4) algorithm,
# (5) datasets, (6) local workers, (7) parameters.
EXPERIMENT_EXECUTION_PATTERN = (
    rf"({TIMESTAMP_REGEX}) .* Experiment with request id '(.*)' "
    r"and context id '(.*)' is starting algorithm '(.*)', touching datasets '(.*)' on local "
    r"workers '(.*)' with parameters '(.*)'.$"
)
def print_audit_entry(log_line):
    """Print a one-line audit summary if log_line is a recognised audit event.

    The line is tried against the audit patterns in a fixed order; the first
    pattern that matches decides the output, formatted as
    ``<timestamp> - <EVENT> - <value> [- <value> ...]``. Lines that match no
    pattern produce no output.

    Args:
        log_line: A single log entry (text) to inspect.
    """
    # (pattern, event label, capture groups to print after the label, in order).
    # Group 1 is always the timestamp and is printed first.
    audit_events = (
        (WORKER_JOINED_PATTERN, "WORKER_JOINED", (2,)),
        (WORKER_LEFT_PATTERN, "WORKER_LEFT", (2,)),
        (DATA_MODEL_ADDED_PATTERN, "DATAMODEL_ADDED", (2,)),
        (DATA_MODEL_REMOVED_PATTERN, "DATAMODEL_REMOVED", (2,)),
        # Dataset events are reported as worker - datamodel - dataset.
        (DATASET_ADDED_PATTERN, "DATASET_ADDED", (4, 3, 2)),
        (DATASET_REMOVED_PATTERN, "DATASET_REMOVED", (4, 3, 2)),
        # Request id, algorithm, datasets and parameters; context id (3) and
        # local workers (6) are not printed.
        (EXPERIMENT_EXECUTION_PATTERN, "EXPERIMENT_STARTED", (2, 4, 5, 7)),
    )
    for pattern, event_label, value_groups in audit_events:
        found = re.search(pattern, log_line)
        if found is None:
            continue
        fields = [found.group(1), event_label]
        fields.extend(found.group(index) for index in value_groups)
        print(" - ".join(fields))
        return
# Reads the controller log in fixed-size chunks, splits it into log entries and
# prints the audit summary for each entry recognised by print_audit_entry().
# (Kept as a comment rather than a docstring: click would expose a docstring
# as the command's help text.)
@cli.command()
@click.option(
    "--logfile",
    help="The logfile to get the audit entries from. Will use stdin if not provided.",
    type=click.File("r"),
    default=stdin,
)
def show_controller_audit_entries(logfile):
    # An entry starts where a newline is followed by "<timestamp> -",
    # e.g. "\n2022-04-13 18:25:22,875 -".
    entry_boundary = rf"\n(?={TIMESTAMP_REGEX} -)"
    unfinished_entry = ""
    while True:
        chunk = logfile.read(LOG_FILE_CHUNK_SIZE)
        if not chunk:
            break
        # The last piece may be cut off by the chunk boundary, so it is held
        # back and prepended to the next chunk instead of being processed now.
        *finished_entries, unfinished_entry = re.split(
            entry_boundary, unfinished_entry + chunk
        )
        for entry in finished_entries:
            print_audit_entry(entry)
    # Input exhausted: whatever is left over is the final entry.
    print_audit_entry(unfinished_entry)
# Invoke the click command group only when the file is run as a script.
if __name__ == "__main__":
    cli()