Merge branch 'main' into fix/connector_workflow

squioc committed Oct 30, 2024
2 parents d234c52 + b710358 commit 9d5cc0a
Showing 5 changed files with 38 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -9,12 +9,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- Specify the Docker image when publishing a module
- Improvements for AsyncConnector
- Improvements for the async HTTP workflow
- Remove duplicated code and make the async HTTP workflow more uniform

### Fixed

- Replace urllib.parse.urljoin with posixpath.join in AsyncConnector
- Fix tests for the async version of the connector

## 1.16.0 - 2024-10-16
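For context on the Fixed entry: urllib.parse.urljoin resolves its second argument as a relative reference, discarding the last path segment of the base URL when it lacks a trailing slash, while posixpath.join simply appends a new segment. A minimal sketch (the URLs are placeholders):

    from urllib.parse import urljoin
    from posixpath import join

    # urljoin treats "batch" as a relative reference and replaces the
    # final path segment of the base URL:
    urljoin("http://fake.url/intake", "batch")  # -> "http://fake.url/batch"

    # posixpath.join appends "batch" as a new path segment, keeping the
    # base path intact:
    join("http://fake.url/intake", "batch")  # -> "http://fake.url/intake/batch"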
16 changes: 10 additions & 6 deletions sekoia_automation/aio/connector.py
@@ -7,7 +7,7 @@
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from urllib.parse import urljoin
from posixpath import join as urljoin

from aiohttp import ClientSession
from aiolimiter import AsyncLimiter
@@ -116,6 +116,13 @@ async def _async_send_chunk(

return events_ids

@property
def _batchapi_url(self):
if intake_server := self.configuration.intake_server:
return urljoin(intake_server, "batch")
else:
return urljoin(self.intake_url, "batch")

async def push_data_to_intakes(
self, events: Sequence[EventType]
) -> list[str]: # pragma: no cover
@@ -128,18 +135,15 @@ async def push_data_to_intakes
Returns:
list[str]:
"""
if intake_server := self.configuration.intake_server:
batch_api = urljoin(intake_server, "batch")
else:
batch_api = urljoin(self.intake_url, "batch")
self._last_events_time = datetime.utcnow()

result_ids = []

chunks = self._chunk_events(events)

async with self.session() as session:
forwarders = [
self._async_send_chunk(session, batch_api, chunk_index, chunk)
self._async_send_chunk(session, self._batchapi_url, chunk_index, chunk)
for chunk_index, chunk in enumerate(chunks)
]
async for ids in limit_concurrency(forwarders, self.max_concurrency_tasks):
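The diff above extracts the endpoint construction into a _batchapi_url property so push_data_to_intakes no longer builds the URL inline. A standalone sketch of the same fallback logic, assuming an optional intake_server and a default intake_url (the class and values here are illustrative, not part of the library):

    from posixpath import join as urljoin

    class ConnectorSketch:
        def __init__(self, intake_server: str | None, intake_url: str):
            self.intake_server = intake_server  # optional override
            self.intake_url = intake_url        # default endpoint

        @property
        def _batchapi_url(self) -> str:
            # Prefer the configured intake server; otherwise fall back
            # to the default intake URL.
            if intake_server := self.intake_server:
                return urljoin(intake_server, "batch")
            return urljoin(self.intake_url, "batch")

    sketch = ConnectorSketch("http://fake.url/intake", "http://intake.fake.url")
    assert sketch._batchapi_url == "http://fake.url/intake/batch"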
1 change: 1 addition & 0 deletions sekoia_automation/scripts/sync_library.py
@@ -363,6 +363,7 @@ def load_module(self, module_path: Path):
module_info = json.load(fd)

docker_name = self._get_module_docker_name(module_info)
module_info["docker"] = f"{docker_name}:{module_info['version']}"
if self.registry_check and not self.check_image_on_registry(
docker_name, module_info["version"]
):
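The added line records which Docker image a module is published with, tagging the image name with the module version. A sketch of the resulting value (the module name and docker name below are invented for illustration):

    module_info = {"name": "My Module", "version": "1.2.3"}
    docker_name = "sekoia-automation-module-my-module"  # hypothetical
    module_info["docker"] = f"{docker_name}:{module_info['version']}"
    # module_info["docker"] == "sekoia-automation-module-my-module:1.2.3"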
23 changes: 22 additions & 1 deletion tests/aio/test_connector.py
@@ -3,7 +3,7 @@
from collections.abc import AsyncGenerator
from datetime import datetime
from unittest.mock import Mock, patch
from urllib.parse import urljoin
from posixpath import join as urljoin

import pytest
from aiolimiter import AsyncLimiter
@@ -206,6 +206,27 @@ async def test_async_connector_raise_error(
assert str(e) == expected_error


@pytest.mark.parametrize(
'base_url,expected_batchapi_url',
[
('http://intake.fake.url/', 'http://intake.fake.url/batch'),
('http://fake.url/intake/', 'http://fake.url/intake/batch'),
('http://fake.url/intake', 'http://fake.url/intake/batch'),
]
)
def test_async_connector_batchapi_url(storage, mocked_trigger_logs, base_url: str, expected_batchapi_url: str):
with patch("sentry_sdk.set_tag"):
async_connector = DummyAsyncConnector(data_path=storage)

async_connector.trigger_activation = "2022-03-14T11:16:14.236930Z"
async_connector.configuration = {
"intake_key": "",
"intake_server": base_url,
}

assert async_connector._batchapi_url == expected_batchapi_url


@pytest.mark.asyncio
async def test_async_connector_async_next_run(
async_connector: DummyAsyncConnector, faker: Faker
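The new parametrized test covers the trailing-slash cases directly; it can be run in isolation with standard pytest filtering (a usage note, not part of the commit):

    pytest tests/aio/test_connector.py -k test_async_connector_batchapi_url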
3 changes: 3 additions & 0 deletions tests/scripts/test_sync_library.py
@@ -68,6 +68,7 @@ def test_no_module_success(tmp_module, module, action, trigger, connector, **kwa
assert history[0].headers["Authorization"] == f"Bearer {API_KEY}"
assert history[1].method == "PATCH"
assert history[1].url == f"{SYMPOHNY_URL}/modules/{module['uuid']}"
assert "docker" in history[1].json()
assert history[1].headers["Authorization"] == f"Bearer {API_KEY}"
assert history[2].method == "GET"
assert history[2].url == f"{SYMPOHNY_URL}/triggers/{trigger['uuid']}"
@@ -105,6 +106,7 @@ def test_no_module_404(tmp_module, module, action, trigger, connector, **kwargs)
assert history[0].headers["Authorization"] == f"Bearer {API_KEY}"
assert history[1].method == "POST"
assert history[1].url == f"{SYMPOHNY_URL}/modules"
assert "docker" in history[1].json()
assert history[1].headers["Authorization"] == f"Bearer {API_KEY}"
assert history[2].method == "GET"
assert history[2].url == f"{SYMPOHNY_URL}/triggers/{trigger['uuid']}"
@@ -168,6 +170,7 @@ def test_with_module(tmp_module, module, action, trigger, connector, **kwargs):
assert history[1].method == "PATCH"
assert history[1].url == f"{SYMPOHNY_URL}/modules/{module['uuid']}"
assert history[1].headers["Authorization"] == f"Bearer {API_KEY}"
assert "docker" in history[1].json()
assert history[2].method == "GET"
assert history[2].url == f"{SYMPOHNY_URL}/triggers/{trigger['uuid']}"
assert history[2].headers["Authorization"] == f"Bearer {API_KEY}"