diff --git a/sekoia_automation/aio/connector.py b/sekoia_automation/aio/connector.py index 9f9586f..61030dd 100644 --- a/sekoia_automation/aio/connector.py +++ b/sekoia_automation/aio/connector.py @@ -102,7 +102,7 @@ async def push_data_to_intakes( result_ids = [] - chunks = self._chunk_events(events, self.configuration.chunk_size) + chunks = self._chunk_events(events) async with self.session() as session: for chunk_index, chunk in enumerate(chunks): diff --git a/sekoia_automation/connector/__init__.py b/sekoia_automation/connector/__init__.py index 26cd84f..7ed356c 100644 --- a/sekoia_automation/connector/__init__.py +++ b/sekoia_automation/connector/__init__.py @@ -25,7 +25,6 @@ class DefaultConnectorConfiguration(BaseModel): intake_server: str = "https://intake.sekoia.io" intake_key: str - chunk_size: int = 1000 class Connector(Trigger, ABC): @@ -108,7 +107,7 @@ def push_events_to_intakes( collect_ids: dict[int, list] = {} # pushing the events - chunks = self._chunk_events(events, self.configuration.chunk_size) + chunks = self._chunk_events(events) # if requested, or if the executor is down if sync or not self.running: @@ -176,17 +175,12 @@ def send_records( remove_directory=True, ) - def _chunk_events( - self, - events: Sequence, - chunk_size: int, - ) -> Generator[list[Any], None, None]: + def _chunk_events(self, events: Sequence) -> Generator[list[Any], None, None]: """ Group events by chunk. Args: sequence events: Sequence: The events to group - chunk_size: int: The size of the chunk Returns: Generator[list[Any], None, None]: @@ -202,10 +196,7 @@ def _chunk_events( continue # if the chunk is full - if ( - len(chunk) >= chunk_size - or chunk_bytes + len(event) > CHUNK_BYTES_MAX_SIZE - ): + if chunk_bytes + len(event) > CHUNK_BYTES_MAX_SIZE: # yield the current chunk and create a new one yield chunk chunk = [] @@ -229,7 +220,7 @@ def _chunk_events( def forward_events(self, events) -> None: try: - chunks = self._chunk_events(events, self.configuration.chunk_size) + chunks = self._chunk_events(events) _name = self.name or "" # mypy complains about NoneType in annotation for records in chunks: self.log(message=f"Forwarding {len(records)} records", level="info") diff --git a/tests/aio/test_connector.py b/tests/aio/test_connector.py index 1d2aa77..b9001cf 100644 --- a/tests/aio/test_connector.py +++ b/tests/aio/test_connector.py @@ -140,8 +140,6 @@ async def test_async_connector_push_multiple_events( async_connector: DummyAsyncConnector faker: Faker """ - async_connector.configuration.chunk_size = 1 - events = [ faker.json( data_columns={ @@ -158,7 +156,9 @@ async def test_async_connector_push_multiple_events( request_url = urljoin(async_connector.configuration.intake_server, "/batch") - with aioresponses() as mocked_responses: + with aioresponses() as mocked_responses, patch( + "sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 128 + ): for _ in range(100): mocked_responses.post( request_url, @@ -182,8 +182,6 @@ async def test_async_connector_raise_error( async_connector: DummyAsyncConnector faker: Faker """ - async_connector.configuration.chunk_size = 1 - events = [ faker.json( data_columns={ @@ -202,7 +200,9 @@ async def test_async_connector_raise_error( request_url = urljoin(async_connector.configuration.intake_server, "/batch") - with aioresponses() as mocked_responses: + with aioresponses() as mocked_responses, patch( + "sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 128 + ): for _ in range(2): mocked_responses.post( request_url, diff --git a/tests/connectors/test_connector.py b/tests/connectors/test_connector.py index c877c1e..0e04c6a 100644 --- a/tests/connectors/test_connector.py +++ b/tests/connectors/test_connector.py @@ -57,11 +57,13 @@ def test_send_records(test_connector): def test_chunk_events(test_connector): - chunks = test_connector._chunk_events(events=EVENTS, chunk_size=1) - chunk_number = 0 - for chunk in chunks: - assert "".join(chunk) in EVENTS - chunk_number += 1 + with patch("sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 4): # len("foo") + 1 + chunks = test_connector._chunk_events(events=EVENTS) + chunk_number = 0 + + for chunk in chunks: + assert "".join(chunk) in EVENTS + chunk_number += 1 assert chunk_number == 2 @@ -72,7 +74,7 @@ def test_chunk_events_exceed_size(test_connector): ) events_b = ["b"] events = events_a + events_b - chunks = list(test_connector._chunk_events(events=events, chunk_size=10000)) + chunks = list(test_connector._chunk_events(events=events)) assert len(chunks) == 2 assert chunks == [events_a, events_b] @@ -82,7 +84,7 @@ def test_chunk_events_discard_too_long_message(test_connector): event_b = "b" * (EVENT_BYTES_MAX_SIZE + 1) event_c = "c" events = [event_a, event_b, event_c] - chunks = list(test_connector._chunk_events(events=events, chunk_size=10000)) + chunks = list(test_connector._chunk_events(events=events)) assert len(chunks) == 1 assert chunks == [[event_a, event_c]] assert test_connector.log.called @@ -99,7 +101,6 @@ def test_push_event_to_intake_with_2_events(test_connector, mocked_trigger_logs) def test_push_event_to_intake_with_chunks(test_connector, mocked_trigger_logs): url = "https://intake.sekoia.io/batch" - test_connector.configuration.chunk_size = 1 mocked_trigger_logs.post( url, json={"event_ids": ["001"]}, additional_matcher=match_events("foo") ) @@ -112,7 +113,9 @@ def test_push_event_to_intake_with_chunks(test_connector, mocked_trigger_logs): mocked_trigger_logs.post( url, json={"event_ids": ["004"]}, additional_matcher=match_events("oof") ) - result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"]) + with patch("sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 4): # len("foo") + 1 + result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"]) + assert result is not None assert len(result) == 4 assert mocked_trigger_logs.call_count == 4 @@ -124,7 +127,6 @@ def test_push_event_to_intake_with_chunks_executor_stopped( ): test_connector.stop() url = "https://intake.sekoia.io/batch" - test_connector.configuration.chunk_size = 1 mocked_trigger_logs.post( url, json={"event_ids": ["001"]}, additional_matcher=match_events("foo") ) @@ -137,7 +139,8 @@ def test_push_event_to_intake_with_chunks_executor_stopped( mocked_trigger_logs.post( url, json={"event_ids": ["004"]}, additional_matcher=match_events("oof") ) - result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"]) + with patch("sekoia_automation.connector.CHUNK_BYTES_MAX_SIZE", 4): # len("foo") + 1 + result = test_connector.push_events_to_intakes(["foo", "bar", "baz", "oof"]) assert result is not None assert len(result) == 4 assert mocked_trigger_logs.call_count == 4