Merge pull request #72 from SEKOIA-IO/feat/logs_by_batch
feat: Send logs by batch
Darkheir authored Sep 8, 2023
2 parents 089c025 + 7200a82 commit d69dbe0
Showing 3 changed files with 102 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- Increase time to wait without events before restarting the pod
- Send logs by batch

## [1.3.9] - 2023-08-21

77 changes: 51 additions & 26 deletions sekoia_automation/trigger.py
@@ -44,6 +44,9 @@ class Trigger(ModuleItem):
LIVENESS_PORT_FILE_NAME = "liveness_port"
METRICS_PORT_FILE_NAME = "metrics_port"

LOGS_MAX_BATCH_SIZE = 50
LOGS_MAX_DELTA = timedelta(seconds=5)

# Time to wait for stop event to be received
_STOP_EVENT_WAIT = 120

@@ -57,6 +60,8 @@ def __init__(self, module: Module | None = None, data_path: Path | None = None):
self._secrets: dict[str, Any] = {}
self._stop_event = Event()
self._critical_log_sent = False
self._logs: list[dict] = []
self._first_log_time: datetime | None = None

# Register signal to terminate thread
signal.signal(signal.SIGINT, self.stop)
@@ -151,14 +156,18 @@ def execute(self) -> None:
# Always restart the trigger, except if the error seems to be unrecoverable
self._secrets = self._get_secrets_from_server()
self.module.set_secrets(self._secrets)
while not self._stop_event.is_set():
try:
self._execute_once()
except Exception: # pragma: no cover
# Exception are handled in `_execute_once` but in case
# an error occurred while handling an error we catch everything
# i.e. An error occurred while sending logs to Sekoia.io
pass
try:
while not self._stop_event.is_set():
try:
self._execute_once()
except Exception: # pragma: no cover
# Exception are handled in `_execute_once` but in case
# an error occurred while handling an error we catch everything
# i.e. An error occurred while sending logs to Sekoia.io
pass
finally:
# Send remaining logs if any
self._send_logs_to_api()

def _rm_tree(self, path: Path):
"""Delete a directory and its children.
@@ -240,34 +249,50 @@ def _log_url(self):

# Try to send the log record to the API
# If it can't be done, give up after 10 attempts and capture the logging error
@retry(
wait=wait_exponential(max=10),
stop=stop_after_attempt(10),
retry_error_callback=capture_retry_error,
)

def log(self, message: str, level: str = "info", *args, **kwargs) -> None:
if level == "critical" and self._critical_log_sent:
# Prevent sending multiple critical errors
level = "error"
data = {
"logs": [
{
"date": datetime.utcnow().isoformat(),
"level": level,
"message": message,
}
]
}
response = requests.request(
"POST", self._log_url, json=data, headers=self._headers, timeout=30
)
response.raise_for_status()

super().log(message, level, *args, **kwargs)

self._logs.append(
{
"date": datetime.utcnow().isoformat(),
"level": level,
"message": message,
}
)
if self._first_log_time is None:
self._first_log_time = datetime.utcnow()

if (
level in ["error", "critical"] # Don't wait for error or critical logs
or len(self._logs) >= self.LOGS_MAX_BATCH_SIZE # batch is full
or datetime.utcnow() - self._first_log_time >= self.LOGS_MAX_DELTA
):
self._send_logs_to_api()

if level == "critical":
self._critical_log_sent = True

@retry(
wait=wait_exponential(max=10),
stop=stop_after_attempt(10),
retry_error_callback=capture_retry_error,
)
def _send_logs_to_api(self):
if not self._logs:
return
data = {"logs": self._logs}
response = requests.request(
"POST", self._log_url, json=data, headers=self._headers, timeout=30
)
response.raise_for_status()
self._logs = []
self._first_log_time = None

@abstractmethod
def run(self) -> None:
"""Method that each trigger should implement to contain its logic.
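
The diff above replaces the per-message POST in log() with an in-memory buffer that _send_logs_to_api() flushes when an error or critical entry arrives, when the buffer reaches LOGS_MAX_BATCH_SIZE, when LOGS_MAX_DELTA has elapsed since the first buffered entry, or, via the finally block in execute(), when the trigger stops. Below is a minimal, self-contained sketch of that batching pattern; the LogBatcher name and the send callback are illustrative only, not part of the SDK.

# Illustrative sketch of the batching pattern above (not the SDK implementation).
from datetime import datetime, timedelta
from typing import Callable


class LogBatcher:
    MAX_BATCH_SIZE = 50               # mirrors Trigger.LOGS_MAX_BATCH_SIZE
    MAX_DELTA = timedelta(seconds=5)  # mirrors Trigger.LOGS_MAX_DELTA

    def __init__(self, send: Callable[[list[dict]], None]):
        self._send = send             # e.g. a callable that POSTs {"logs": [...]} to the API
        self._logs: list[dict] = []
        self._first_log_time: datetime | None = None

    def log(self, message: str, level: str = "info") -> None:
        self._logs.append(
            {"date": datetime.utcnow().isoformat(), "level": level, "message": message}
        )
        if self._first_log_time is None:
            self._first_log_time = datetime.utcnow()
        if (
            level in ("error", "critical")             # never delay errors
            or len(self._logs) >= self.MAX_BATCH_SIZE  # batch is full
            or datetime.utcnow() - self._first_log_time >= self.MAX_DELTA  # batch is old
        ):
            self.flush()

    def flush(self) -> None:
        # In the SDK this is also called from execute()'s finally block,
        # so buffered entries are not lost on shutdown.
        if not self._logs:
            return
        self._send(self._logs)
        self._logs = []
        self._first_log_time = None

The net effect is that error and critical logs still surface immediately, while routine info logs are grouped into far fewer API calls.
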
53 changes: 50 additions & 3 deletions tests/test_trigger.py
@@ -1,4 +1,5 @@
import datetime
import time
from datetime import timedelta
from pathlib import Path
from typing import ClassVar
@@ -254,6 +255,7 @@ class TestTrigger(DummyTrigger):

def test_trigger_log(mocked_trigger_logs):
trigger = DummyTrigger()
trigger.LOGS_MAX_BATCH_SIZE = 0

assert mocked_trigger_logs.call_count == 0

@@ -265,6 +267,51 @@ def test_trigger_log(mocked_trigger_logs):
assert log_request["message"] == "test message"


def test_trigger_log_severity(mocked_trigger_logs):
trigger = DummyTrigger()

assert mocked_trigger_logs.call_count == 0

trigger.log("test message", "info")
assert mocked_trigger_logs.call_count == 0
trigger.log("error message", "error")
assert mocked_trigger_logs.call_count == 1

log = mocked_trigger_logs.last_request.json()["logs"][0]
assert log["date"] is not None
assert log["level"] == "info"
assert log["message"] == "test message"

log = mocked_trigger_logs.last_request.json()["logs"][1]
assert log["level"] == "error"


def test_trigger_log_batch_full(mocked_trigger_logs):
trigger = DummyTrigger()

for _ in range(trigger.LOGS_MAX_BATCH_SIZE):
assert mocked_trigger_logs.call_count == 0
trigger.log("test message", "info")

assert mocked_trigger_logs.call_count == 1
logs = mocked_trigger_logs.last_request.json()["logs"]
assert len(logs) == trigger.LOGS_MAX_BATCH_SIZE


def test_trigger_log_time_elapsed(mocked_trigger_logs):
trigger = DummyTrigger()
trigger.LOGS_MAX_DELTA = timedelta(milliseconds=1)

trigger.log("test message", "info")
assert mocked_trigger_logs.call_count == 0
time.sleep(trigger.LOGS_MAX_DELTA.total_seconds())
trigger.log("error message", "info")
assert mocked_trigger_logs.call_count == 1

logs = mocked_trigger_logs.last_request.json()["logs"]
assert len(logs) == 2


def test_trigger_log_retry(mocked_trigger_logs):
trigger = DummyTrigger()

@@ -277,7 +324,7 @@ def test_trigger_log_retry(mocked_trigger_logs):

# Make sure we are retrying log registrations
assert mocked_trigger_logs.call_count == 0
trigger.log.retry.wait = wait_none()
trigger._send_logs_to_api.retry.wait = wait_none()
trigger.log("test message", "error")

assert mocked_trigger_logs.call_count == 2
@@ -296,7 +343,7 @@ def run(self):
raise TriggerConfigurationError

trigger = TestTrigger()
trigger._STOP_EVENT_WAIT = 0.1
trigger._STOP_EVENT_WAIT = 0.001
with pytest.raises(SystemExit), patch.object(
Module, "load_config", return_value={}
):
@@ -323,7 +370,7 @@ def run(self):

trigger = TestTrigger()
trigger._error_count = 4
trigger._STOP_EVENT_WAIT = 0.1
trigger._STOP_EVENT_WAIT = 0.001
with pytest.raises(SystemExit), patch.object(
Module, "load_config", return_value={}
):
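
Not covered by the tests in this diff, but as a complementary illustration: a hedged sketch of a test for the shutdown flush path (execute()'s finally block delegates to _send_logs_to_api()), reusing the DummyTrigger class and the mocked_trigger_logs request-mocking fixture from the existing test suite. The test name and assertions below are assumptions, not code from this PR.

def test_trigger_log_flush_on_stop(mocked_trigger_logs):
    # Hypothetical test: with the default batch size, a single info log stays
    # buffered until _send_logs_to_api() is called, as execute() does on exit.
    trigger = DummyTrigger()

    trigger.log("buffered message", "info")
    assert mocked_trigger_logs.call_count == 0

    trigger._send_logs_to_api()
    assert mocked_trigger_logs.call_count == 1
    assert len(mocked_trigger_logs.last_request.json()["logs"]) == 1
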
