Get rid of chunk_size and set intake_url from env #77

Merged: 3 commits, Oct 2, 2023
Changes from 2 commits
CHANGELOG.md (7 additions, 0 deletions)
@@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [1.6.0] - 2023-10-02

### Changed

- Remove the `chunk_size` parameter from the connector configuration
- Take the Intake URL from the `INTAKE_URL` environment variable first, falling back to the configured `intake_server`
- Improve the error message shown when the data storage cannot be accessed
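For context, the net effect of the Intake URL change is easiest to see outside the diff. A minimal sketch of the new resolution order, assuming only what this PR shows (`resolve_intake_host` is a hypothetical helper name, not part of the SDK; the default server and the `INTAKE_URL` variable are taken from the diff below):

```python
import os

# Default taken from DefaultConnectorConfiguration in this PR.
DEFAULT_INTAKE_SERVER = "https://intake.sekoia.io"


def resolve_intake_host(configured_server: str = DEFAULT_INTAKE_SERVER) -> str:
    # The INTAKE_URL environment variable now takes precedence;
    # the configured intake_server is only the fallback.
    return os.getenv("INTAKE_URL", configured_server)


if __name__ == "__main__":
    # Example: INTAKE_URL=https://intake.example.test python resolve.py
    print(resolve_intake_host())
```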
sekoia_automation/aio/connector.py (1 addition, 1 deletion)
@@ -102,7 +102,7 @@ async def push_data_to_intakes(

        result_ids = []

-        chunks = self._chunk_events(events, self.configuration.chunk_size)
+        chunks = self._chunk_events(events)

        async with self.session() as session:
            for chunk_index, chunk in enumerate(chunks):
sekoia_automation/connector/__init__.py (6 additions, 14 deletions)
@@ -1,3 +1,4 @@
+import os
import uuid
from abc import ABC
from collections.abc import Generator, Sequence
@@ -25,7 +26,6 @@
class DefaultConnectorConfiguration(BaseModel):
    intake_server: str = "https://intake.sekoia.io"
    intake_key: str
-    chunk_size: int = 1000


class Connector(Trigger, ABC):
@@ -101,14 +101,14 @@ def push_events_to_intakes(
        # Reset the consecutive error count
        self._error_count = 0
        self._last_events_time = datetime.utcnow()
-        intake_host = self.configuration.intake_server
+        intake_host = os.getenv("INTAKE_URL", self.configuration.intake_server)
        batch_api = urljoin(intake_host, "/batch")

        # Dict to collect event_ids for the API
        collect_ids: dict[int, list] = {}

        # pushing the events
-        chunks = self._chunk_events(events, self.configuration.chunk_size)
+        chunks = self._chunk_events(events)

        # if requested, or if the executor is down
        if sync or not self.running:
@@ -176,17 +176,12 @@ def send_records(
            remove_directory=True,
        )

-    def _chunk_events(
-        self,
-        events: Sequence,
-        chunk_size: int,
-    ) -> Generator[list[Any], None, None]:
+    def _chunk_events(self, events: Sequence) -> Generator[list[Any], None, None]:
        """
        Group events by chunk.

        Args:
            sequence events: Sequence: The events to group
-            chunk_size: int: The size of the chunk

        Returns:
            Generator[list[Any], None, None]:
@@ -202,10 +197,7 @@
                continue

            # if the chunk is full
-            if (
-                len(chunk) >= chunk_size
-                or chunk_bytes + len(event) > CHUNK_BYTES_MAX_SIZE
-            ):
+            if chunk_bytes + len(event) > CHUNK_BYTES_MAX_SIZE:
                # yield the current chunk and create a new one
                yield chunk
                chunk = []
@@ -229,7 +221,7 @@

    def forward_events(self, events) -> None:
        try:
-            chunks = self._chunk_events(events, self.configuration.chunk_size)
+            chunks = self._chunk_events(events)
            _name = self.name or ""  # mypy complains about NoneType in annotation
            for records in chunks:
                self.log(message=f"Forwarding {len(records)} records", level="info")
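To make the new chunking rule concrete: a chunk now closes only when adding the next event would exceed the byte budget, which is why the separate `chunk_size` count became redundant. A self-contained sketch of that logic as it appears above (the constant values here are illustrative; the SDK defines its own `CHUNK_BYTES_MAX_SIZE` and `EVENT_BYTES_MAX_SIZE` in `sekoia_automation.connector`):

```python
from collections.abc import Generator, Sequence
from typing import Any

# Illustrative limits; the real values live in sekoia_automation.connector.
CHUNK_BYTES_MAX_SIZE = 64 * 1024
EVENT_BYTES_MAX_SIZE = 16 * 1024


def chunk_events(events: Sequence[str]) -> Generator[list[Any], None, None]:
    """Group events into chunks bounded only by their total byte size."""
    chunk: list[Any] = []
    chunk_bytes = 0
    for event in events:
        # Events too large to ever fit in a chunk are skipped
        # (the SDK logs a message in this case).
        if len(event) > EVENT_BYTES_MAX_SIZE:
            continue
        # The old `len(chunk) >= chunk_size` test is gone: a chunk is
        # full only when the byte budget would be exceeded.
        if chunk_bytes + len(event) > CHUNK_BYTES_MAX_SIZE:
            yield chunk
            chunk = []
            chunk_bytes = 0
        chunk.append(event)
        chunk_bytes += len(event)
    if chunk:
        yield chunk
```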
tests/aio/test_connector.py (6 additions, 6 deletions)
@@ -140,8 +140,6 @@ async def test_async_connector_push_multiple_events(
    async_connector: DummyAsyncConnector
    faker: Faker
    """
-    async_connector.configuration.chunk_size = 1
-
    events = [
        faker.json(
            data_columns={
@@ -158,7 +156,9 @@

    request_url = urljoin(async_connector.configuration.intake_server, "/batch")

-    with aioresponses() as mocked_responses:
+    with aioresponses() as mocked_responses, patch(
+        "sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 128
+    ):
        for _ in range(100):
            mocked_responses.post(
                request_url,
@@ -182,8 +182,6 @@ async def test_async_connector_raise_error(
    async_connector: DummyAsyncConnector
    faker: Faker
    """
-    async_connector.configuration.chunk_size = 1
-
    events = [
        faker.json(
            data_columns={
@@ -202,7 +200,9 @@

    request_url = urljoin(async_connector.configuration.intake_server, "/batch")

-    with aioresponses() as mocked_responses:
+    with aioresponses() as mocked_responses, patch(
+        "sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 128
+    ):
        for _ in range(2):
            mocked_responses.post(
                request_url,
tests/connectors/test_connector.py (14 additions, 11 deletions)
@@ -57,11 +57,13 @@ def test_send_records(test_connector):


def test_chunk_events(test_connector):
-    chunks = test_connector._chunk_events(events=EVENTS, chunk_size=1)
-    chunk_number = 0
-    for chunk in chunks:
-        assert "".join(chunk) in EVENTS
-        chunk_number += 1
+    with patch("sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 4):  # len("foo") + 1
+        chunks = test_connector._chunk_events(events=EVENTS)
+        chunk_number = 0
+
+        for chunk in chunks:
+            assert "".join(chunk) in EVENTS
+            chunk_number += 1

    assert chunk_number == 2

@@ -72,7 +74,7 @@ def test_chunk_events_exceed_size(test_connector):
    )
    events_b = ["b"]
    events = events_a + events_b
-    chunks = list(test_connector._chunk_events(events=events, chunk_size=10000))
+    chunks = list(test_connector._chunk_events(events=events))
    assert len(chunks) == 2
    assert chunks == [events_a, events_b]

@@ -82,7 +84,7 @@ def test_chunk_events_discard_too_long_message(test_connector):
    event_b = "b" * (EVENT_BYTES_MAX_SIZE + 1)
    event_c = "c"
    events = [event_a, event_b, event_c]
-    chunks = list(test_connector._chunk_events(events=events, chunk_size=10000))
+    chunks = list(test_connector._chunk_events(events=events))
    assert len(chunks) == 1
    assert chunks == [[event_a, event_c]]
    assert test_connector.log.called
@@ -99,7 +101,6 @@ def test_push_event_to_intake_with_2_events(test_connector, mocked_trigger_logs)

def test_push_event_to_intake_with_chunks(test_connector, mocked_trigger_logs):
    url = "https://intake.sekoia.io/batch"
-    test_connector.configuration.chunk_size = 1
    mocked_trigger_logs.post(
        url, json={"event_ids": ["001"]}, additional_matcher=match_events("foo")
    )
@@ -112,7 +113,9 @@ def test_push_event_to_intake_with_chunks(test_connector, mocked_trigger_logs):
    mocked_trigger_logs.post(
        url, json={"event_ids": ["004"]}, additional_matcher=match_events("oof")
    )
-    result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"])
+    with patch("sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 4):  # len("foo") + 1
+        result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"])
+
    assert result is not None
    assert len(result) == 4
    assert mocked_trigger_logs.call_count == 4
@@ -124,7 +127,6 @@ def test_push_event_to_intake_with_chunks_executor_stopped(
):
    test_connector.stop()
    url = "https://intake.sekoia.io/batch"
-    test_connector.configuration.chunk_size = 1
    mocked_trigger_logs.post(
        url, json={"event_ids": ["001"]}, additional_matcher=match_events("foo")
    )
@@ -137,7 +139,8 @@
    mocked_trigger_logs.post(
        url, json={"event_ids": ["004"]}, additional_matcher=match_events("oof")
    )
-    result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"])
+    with patch("sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 4):  # len("foo") + 1
+        result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"])
    assert result is not None
    assert len(result) == 4
    assert mocked_trigger_logs.call_count == 4
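Since per-event chunks are now produced by shrinking the byte limit rather than setting `chunk_size = 1`, the patching pattern used throughout these tests generalizes as below (a sketch reusing this file's `test_connector` fixture; the expected chunks follow the byte-counting shown in the connector diff above):

```python
from unittest.mock import patch


def test_each_event_gets_its_own_chunk(test_connector):
    # With the limit just above len("foo"), a second 3-byte event no
    # longer fits, so each event lands in its own chunk.
    with patch("sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 4):
        chunks = list(test_connector._chunk_events(events=["foo", "bar"]))
    assert chunks == [["foo"], ["bar"]]
```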