Skip to content

Commit

Permalink
Merge pull request #139 from SEKOIA-IO/fix/AsyncConnectorURLJoin
Browse files Browse the repository at this point in the history
AsyncConnector: replace urllib.parse.urljoin with posixpath.join
  • Loading branch information
squioc authored Oct 30, 2024
2 parents 29cd2b8 + 2d8dc1b commit b710358
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Replace ulrllib.parse.urljoin by posixpath.join in AsyncConnector
- Fix tests for async version of connector.

## 1.16.0 - 2024-10-16
Expand Down
15 changes: 9 additions & 6 deletions sekoia_automation/aio/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from urllib.parse import urljoin
from posixpath import join as urljoin

from aiohttp import ClientSession
from aiolimiter import AsyncLimiter
Expand Down Expand Up @@ -129,6 +129,13 @@ async def _async_send_chunk(

return events_ids

@property
def _batchapi_url(self):
if intake_server := self.configuration.intake_server:
return urljoin(intake_server, "batch")
else:
return urljoin(self.intake_url, "batch")

async def push_data_to_intakes(
self, events: list[str]
) -> list[str]: # pragma: no cover
Expand All @@ -142,18 +149,14 @@ async def push_data_to_intakes(
list[str]:
"""
self._last_events_time = datetime.utcnow()
if intake_server := self.configuration.intake_server:
batch_api = urljoin(intake_server, "batch")
else:
batch_api = urljoin(self.intake_url, "batch")

result_ids = []

chunks = self._chunk_events(events)

async with self.session() as session:
forwarders = [
self._async_send_chunk(session, batch_api, chunk_index, chunk)
self._async_send_chunk(session, self._batchapi_url, chunk_index, chunk)
for chunk_index, chunk in enumerate(chunks)
]
async for ids in limit_concurrency(forwarders, self.max_concurrency_tasks):
Expand Down
23 changes: 22 additions & 1 deletion tests/aio/test_connector.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Test async connector."""

from unittest.mock import Mock, patch
from urllib.parse import urljoin
from posixpath import join as urljoin

import pytest
from aiolimiter import AsyncLimiter
Expand Down Expand Up @@ -215,3 +215,24 @@ async def test_async_connector_raise_error(
except Exception as e:
assert isinstance(e, RuntimeError)
assert str(e) == expected_error


@pytest.mark.parametrize(
'base_url,expected_batchapi_url',
[
('http://intake.fake.url/', 'http://intake.fake.url/batch'),
('http://fake.url/intake/', 'http://fake.url/intake/batch'),
('http://fake.url/intake', 'http://fake.url/intake/batch'),
]
)
def test_async_connector_batchapi_url(storage, mocked_trigger_logs, base_url: str, expected_batchapi_url: str):
with patch("sentry_sdk.set_tag"):
async_connector = DummyAsyncConnector(data_path=storage)

async_connector.trigger_activation = "2022-03-14T11:16:14.236930Z"
async_connector.configuration = {
"intake_key": "",
"intake_server": base_url,
}

assert async_connector._batchapi_url == expected_batchapi_url

0 comments on commit b710358

Please sign in to comment.