Skip to content

Commit

Permalink
Get rid of chunk_size
Browse files (browse the repository at this point in the history)
  • Loading branch information
lvoloshyn-sekoia committed Oct 2, 2023
1 parent 1616964 commit 3954121
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 31 deletions.
2 changes: 1 addition & 1 deletion sekoia_automation/aio/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async def push_data_to_intakes(

result_ids = []

chunks = self._chunk_events(events, self.configuration.chunk_size)
chunks = self._chunk_events(events)

async with self.session() as session:
for chunk_index, chunk in enumerate(chunks):
Expand Down
17 changes: 4 additions & 13 deletions sekoia_automation/connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
class DefaultConnectorConfiguration(BaseModel):
intake_server: str = "https://intake.sekoia.io"
intake_key: str
chunk_size: int = 1000


class Connector(Trigger, ABC):
Expand Down Expand Up @@ -108,7 +107,7 @@ def push_events_to_intakes(
collect_ids: dict[int, list] = {}

# pushing the events
chunks = self._chunk_events(events, self.configuration.chunk_size)
chunks = self._chunk_events(events)

# if requested, or if the executor is down
if sync or not self.running:
Expand Down Expand Up @@ -176,17 +175,12 @@ def send_records(
remove_directory=True,
)

def _chunk_events(
self,
events: Sequence,
chunk_size: int,
) -> Generator[list[Any], None, None]:
def _chunk_events(self, events: Sequence) -> Generator[list[Any], None, None]:
"""
Group events by chunk.
Args:
sequence events: Sequence: The events to group
chunk_size: int: The size of the chunk
Returns:
Generator[list[Any], None, None]:
Expand All @@ -202,10 +196,7 @@ def _chunk_events(
continue

# if the chunk is full
if (
len(chunk) >= chunk_size
or chunk_bytes + len(event) > CHUNK_BYTES_MAX_SIZE
):
if chunk_bytes + len(event) > CHUNK_BYTES_MAX_SIZE:
# yield the current chunk and create a new one
yield chunk
chunk = []
Expand All @@ -229,7 +220,7 @@ def _chunk_events(

def forward_events(self, events) -> None:
try:
chunks = self._chunk_events(events, self.configuration.chunk_size)
chunks = self._chunk_events(events)
_name = self.name or "" # mypy complains about NoneType in annotation
for records in chunks:
self.log(message=f"Forwarding {len(records)} records", level="info")
Expand Down
12 changes: 6 additions & 6 deletions tests/aio/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ async def test_async_connector_push_multiple_events(
async_connector: DummyAsyncConnector
faker: Faker
"""
async_connector.configuration.chunk_size = 1

events = [
faker.json(
data_columns={
Expand All @@ -158,7 +156,9 @@ async def test_async_connector_push_multiple_events(

request_url = urljoin(async_connector.configuration.intake_server, "/batch")

with aioresponses() as mocked_responses:
with aioresponses() as mocked_responses, patch(
"sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 128
):
for _ in range(100):
mocked_responses.post(
request_url,
Expand All @@ -182,8 +182,6 @@ async def test_async_connector_raise_error(
async_connector: DummyAsyncConnector
faker: Faker
"""
async_connector.configuration.chunk_size = 1

events = [
faker.json(
data_columns={
Expand All @@ -202,7 +200,9 @@ async def test_async_connector_raise_error(

request_url = urljoin(async_connector.configuration.intake_server, "/batch")

with aioresponses() as mocked_responses:
with aioresponses() as mocked_responses, patch(
"sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 128
):
for _ in range(2):
mocked_responses.post(
request_url,
Expand Down
25 changes: 14 additions & 11 deletions tests/connectors/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ def test_send_records(test_connector):


def test_chunk_events(test_connector):
chunks = test_connector._chunk_events(events=EVENTS, chunk_size=1)
chunk_number = 0
for chunk in chunks:
assert "".join(chunk) in EVENTS
chunk_number += 1
with patch("sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 4): # len("foo") + 1
chunks = test_connector._chunk_events(events=EVENTS)
chunk_number = 0

for chunk in chunks:
assert "".join(chunk) in EVENTS
chunk_number += 1

assert chunk_number == 2

Expand All @@ -72,7 +74,7 @@ def test_chunk_events_exceed_size(test_connector):
)
events_b = ["b"]
events = events_a + events_b
chunks = list(test_connector._chunk_events(events=events, chunk_size=10000))
chunks = list(test_connector._chunk_events(events=events))
assert len(chunks) == 2
assert chunks == [events_a, events_b]

Expand All @@ -82,7 +84,7 @@ def test_chunk_events_discard_too_long_message(test_connector):
event_b = "b" * (EVENT_BYTES_MAX_SIZE + 1)
event_c = "c"
events = [event_a, event_b, event_c]
chunks = list(test_connector._chunk_events(events=events, chunk_size=10000))
chunks = list(test_connector._chunk_events(events=events))
assert len(chunks) == 1
assert chunks == [[event_a, event_c]]
assert test_connector.log.called
Expand All @@ -99,7 +101,6 @@ def test_push_event_to_intake_with_2_events(test_connector, mocked_trigger_logs)

def test_push_event_to_intake_with_chunks(test_connector, mocked_trigger_logs):
url = "https://intake.sekoia.io/batch"
test_connector.configuration.chunk_size = 1
mocked_trigger_logs.post(
url, json={"event_ids": ["001"]}, additional_matcher=match_events("foo")
)
Expand All @@ -112,7 +113,9 @@ def test_push_event_to_intake_with_chunks(test_connector, mocked_trigger_logs):
mocked_trigger_logs.post(
url, json={"event_ids": ["004"]}, additional_matcher=match_events("oof")
)
result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"])
with patch("sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 4): # len("foo") + 1
result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"])

assert result is not None
assert len(result) == 4
assert mocked_trigger_logs.call_count == 4
Expand All @@ -124,7 +127,6 @@ def test_push_event_to_intake_with_chunks_executor_stopped(
):
test_connector.stop()
url = "https://intake.sekoia.io/batch"
test_connector.configuration.chunk_size = 1
mocked_trigger_logs.post(
url, json={"event_ids": ["001"]}, additional_matcher=match_events("foo")
)
Expand All @@ -137,7 +139,8 @@ def test_push_event_to_intake_with_chunks_executor_stopped(
mocked_trigger_logs.post(
url, json={"event_ids": ["004"]}, additional_matcher=match_events("oof")
)
result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"])
with patch("sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 4): # len("foo") + 1
result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"])
assert result is not None
assert len(result) == 4
assert mocked_trigger_logs.call_count == 4
Expand Down

0 comments on commit 3954121

Please sign in to comment.