Skip to content

Commit

Permalink
Merge pull request #99 from SEKOIA-IO/feat/push_logs_regular_interval
Browse files Browse the repository at this point in the history
feat: Thread to push logs at regular interval
  • Loading branch information
Darkheir authored Nov 29, 2023
2 parents 996e5a5 + 7739243 commit 5b6cd16
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## Changed

- Thread to push logs at regular interval

## [1.8.2] - 2023-11-28

## Changed
Expand Down
31 changes: 31 additions & 0 deletions sekoia_automation/timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from threading import Event, Timer


class RepeatedTimer:
"""
Execute the given function every `interval` seconds.
"""

def __init__(self, interval, function):
self._timer = None
self.interval = interval
self.function = function
self.is_running = False
self._stop = Event()

def _run(self):
self.function()
self.is_running = False
self.start()

def start(self):
if not self._stop.is_set() and not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True

def stop(self):
self._stop.set()
if self._timer:
self._timer.cancel()
self.is_running = False
30 changes: 18 additions & 12 deletions sekoia_automation/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from sekoia_automation.metrics import PrometheusExporterThread, make_exporter
from sekoia_automation.module import Module, ModuleItem
from sekoia_automation.timer import RepeatedTimer
from sekoia_automation.utils import (
capture_retry_error,
get_annotation_for,
Expand All @@ -45,7 +46,7 @@ class Trigger(ModuleItem):
METRICS_PORT_FILE_NAME = "metrics_port"

LOGS_MAX_BATCH_SIZE = 50
LOGS_MAX_DELTA = timedelta(seconds=5)
LOGS_MAX_DELTA = 5 # seconds

# Time to wait for stop event to be received
_STOP_EVENT_WAIT = 120
Expand All @@ -61,7 +62,8 @@ def __init__(self, module: Module | None = None, data_path: Path | None = None):
self._stop_event = Event()
self._critical_log_sent = False
self._logs: list[dict] = []
self._first_log_time: datetime | None = None

self._logs_timer = RepeatedTimer(self.LOGS_MAX_DELTA, self._send_logs_to_api)

# Register signal to terminate thread
signal.signal(signal.SIGINT, self.stop)
Expand Down Expand Up @@ -104,6 +106,7 @@ def stop(self, *args, **kwargs) -> None: # noqa: ARG002
Engage the trigger exit
"""
self._stop_event.set()
self._logs_timer.stop()

@property
def running(self) -> bool:
Expand Down Expand Up @@ -168,6 +171,7 @@ def execute(self) -> None:
# Always restart the trigger, except if the error seems to be unrecoverable
self._secrets = self._get_secrets_from_server()
self.module.set_secrets(self._secrets)
self._logs_timer.start()
try:
while not self._stop_event.is_set():
try:
Expand Down Expand Up @@ -307,13 +311,9 @@ def log(self, message: str, level: str = "info", *args, **kwargs) -> None:
"message": message,
}
)
if self._first_log_time is None:
self._first_log_time = datetime.utcnow()

if (
level in ["error", "critical"] # Don't wait for error or critical logs
or len(self._logs) >= self.LOGS_MAX_BATCH_SIZE # batch is full
or datetime.utcnow() - self._first_log_time >= self.LOGS_MAX_DELTA
):
self._send_logs_to_api()

Expand All @@ -329,13 +329,19 @@ def log(self, message: str, level: str = "info", *args, **kwargs) -> None:
def _send_logs_to_api(self):
if not self._logs:
return
data = {"logs": self._logs}
response = requests.request(
"POST", self.logs_url, json=data, headers=self._headers, timeout=30
)
response.raise_for_status()
# Clear self._logs, so we won't lose logs that are added while sending
logs = self._logs
self._logs = []
self._first_log_time = None
try:
data = {"logs": logs}
response = requests.request(
"POST", self.logs_url, json=data, headers=self._headers, timeout=30
)
response.raise_for_status()
except Exception:
# If the request failed, we add the logs back to the list
self._logs.extend(logs)
raise

@abstractmethod
def run(self) -> None:
Expand Down
22 changes: 22 additions & 0 deletions tests/test_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from time import sleep

from sekoia_automation.timer import RepeatedTimer


def test_timer():
a = 0

def timed():
nonlocal a
a += 1

timer = RepeatedTimer(0.1, timed)
timer.start()

assert timer.is_running is True
sleep(0.15)
assert a == 1

# Check stop
timer.stop()
assert timer.is_running is False
12 changes: 8 additions & 4 deletions tests/test_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,17 +314,18 @@ def test_trigger_log_batch_full(mocked_trigger_logs):


def test_trigger_log_time_elapsed(mocked_trigger_logs):
DummyTrigger.LOGS_MAX_DELTA = 0.01
trigger = DummyTrigger()
trigger.LOGS_MAX_DELTA = timedelta(milliseconds=1)
trigger._logs_timer.start()

trigger.log("test message", "info")
assert mocked_trigger_logs.call_count == 0
time.sleep(trigger.LOGS_MAX_DELTA.total_seconds())
trigger.log("error message", "info")
time.sleep(trigger.LOGS_MAX_DELTA * 1.5)
assert mocked_trigger_logs.call_count == 1

logs = mocked_trigger_logs.last_request.json()["logs"]
assert len(logs) == 2
assert len(logs) == 1
trigger.stop()


def test_trigger_log_retry(mocked_trigger_logs):
Expand Down Expand Up @@ -370,6 +371,7 @@ def run(self):
assert (
mocked_trigger_logs.request_history[1].json()["logs"][0]["level"] == "critical"
)
trigger.stop()


@patch.object(Trigger, "_get_secrets_from_server")
Expand Down Expand Up @@ -400,6 +402,7 @@ def run(self):
assert (
mocked_trigger_logs.request_history[1].json()["logs"][0]["level"] == "critical"
)
trigger.stop()


def test_trigger_log_critical_only_once(mocked_trigger_logs):
Expand Down Expand Up @@ -443,6 +446,7 @@ def test_get_secrets(_, __, ___):

assert rmock.call_count == 1
assert trigger._secrets == TRIGGER_SECRETS
trigger.stop()


@pytest.fixture()
Expand Down

0 comments on commit 5b6cd16

Please sign in to comment.