Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Connectors workflow #140

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [1.16.0] - 2024-10-02
squioc marked this conversation as resolved.
Show resolved Hide resolved

### Changed

- Improvements for AsyncConnector.
- Improvements for the async HTTP workflow.
- Remove duplicated parts and make the async HTTP workflow code more uniform.


## [1.15.0] - 2024-09-28

### Changed
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "sekoia-automation-sdk"

version = "1.15.0"
version = "1.16.0"
squioc marked this conversation as resolved.
Show resolved Hide resolved
description = "SDK to create Sekoia.io playbook modules"
license = "MIT"
readme = "README.md"
Expand Down
139 changes: 98 additions & 41 deletions sekoia_automation/aio/connector.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Contains connector with async version."""

import asyncio
import time
from abc import ABC
from asyncio import AbstractEventLoop, get_event_loop
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Sequence
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
Expand All @@ -12,7 +13,11 @@
from aiolimiter import AsyncLimiter

from sekoia_automation.aio.helpers import limit_concurrency
from sekoia_automation.connector import Connector, DefaultConnectorConfiguration
from sekoia_automation.connector import (
Connector,
DefaultConnectorConfiguration,
EventType,
)
from sekoia_automation.module import Module


Expand All @@ -21,16 +26,13 @@

configuration: DefaultConnectorConfiguration

_event_loop: AbstractEventLoop

_session: ClientSession | None = None
_rate_limiter: AsyncLimiter | None = None

def __init__(
self,
module: Module | None = None,
data_path: Path | None = None,
event_loop: AbstractEventLoop | None = None,
*args,
**kwargs,
):
Expand All @@ -47,55 +49,31 @@
self.max_concurrency_tasks = kwargs.pop("max_concurrency_tasks", 1000)
super().__init__(module=module, data_path=data_path, *args, **kwargs)

self._event_loop = event_loop or get_event_loop()

@classmethod
def set_client_session(cls, session: ClientSession) -> None:
"""
Set client session.

Args:
session: ClientSession
"""
cls._session = session

@classmethod
def set_rate_limiter(cls, rate_limiter: AsyncLimiter) -> None:
"""
Set rate limiter.

Args:
rate_limiter:
"""
cls._rate_limiter = rate_limiter

@classmethod
def get_rate_limiter(cls) -> AsyncLimiter:
def get_rate_limiter(self) -> AsyncLimiter:
"""
Get or initialize rate limiter.

Returns:
AsyncLimiter:
"""
if cls._rate_limiter is None:
cls._rate_limiter = AsyncLimiter(1, 1)
if self._rate_limiter is None:
self._rate_limiter = AsyncLimiter(1, 1)

return cls._rate_limiter
return self._rate_limiter

@classmethod
@asynccontextmanager
async def session(cls) -> AsyncGenerator[ClientSession, None]: # pragma: no cover
async def session(self) -> AsyncGenerator[ClientSession, None]: # pragma: no cover
"""
Get or initialize client session if it is not initialized yet.

Returns:
ClientSession:
"""
if cls._session is None:
cls._session = ClientSession()
if self._session is None:
self._session = ClientSession()

async with cls.get_rate_limiter():
yield cls._session
async with self.get_rate_limiter():
yield self._session

async def _async_send_chunk(
self, session: ClientSession, url: str, chunk_index: int, chunk: list[str]
Expand Down Expand Up @@ -139,7 +117,7 @@
return events_ids

async def push_data_to_intakes(
self, events: list[str]
self, events: Sequence[EventType]
) -> list[str]: # pragma: no cover
"""
Custom method to push events to intakes.
Expand All @@ -150,7 +128,6 @@
Returns:
list[str]:
"""
self._last_events_time = datetime.utcnow()
if intake_server := self.configuration.intake_server:
batch_api = urljoin(intake_server, "batch")
else:
Expand All @@ -169,3 +146,83 @@
result_ids.extend(ids)

return result_ids

async def async_iterate(
    self,
) -> AsyncGenerator[tuple[list[EventType], datetime | None], None]:
    """
    Produce batches of events to forward.

    This base implementation yields a single empty batch with no date.
    Presumably concrete connectors override it -- confirm against subclasses.

    Yields:
        tuple[list[EventType], datetime | None]: a batch of events and a
            datetime (or None). `async_next_run` uses the datetime as the
            date of the most recent event of the batch.
    """
    no_events: list[EventType] = []
    # The yield makes this an async generator, which avoids a type checking error.
    yield no_events, None

async def async_next_run(self) -> None:
    """
    Run one collection cycle.

    Consumes every batch produced by `async_iterate`, pushes non-empty
    batches with `push_data_to_intakes`, then records the processing
    duration, the number of forwarded events and the events lag. When no
    event was collected and the cycle was shorter than `self.frequency`,
    sleeps for the remaining time.
    """
    # Local import: only this method needs `timezone`.
    from datetime import timezone

    processing_start = time.time()

    result_last_event_date: datetime | None = None
    total_number_of_events = 0
    async for events, last_event_date in self.async_iterate():
        # Keep the most recent event date seen across all batches.
        if last_event_date and (
            not result_last_event_date or last_event_date > result_last_event_date
        ):
            result_last_event_date = last_event_date

        if events:
            total_number_of_events += len(events)
            await self.push_data_to_intakes(events)

    processing_time = time.time() - processing_start

    # Metric about processing time
    self._forward_events_duration.labels(
        intake_key=self.configuration.intake_key
    ).observe(processing_time)

    # Metric about processing count
    self._outcoming_events.labels(intake_key=self.configuration.intake_key).inc(
        total_number_of_events
    )

    # Metric about events lag
    if result_last_event_date:
        now = datetime.now(timezone.utc)
        if result_last_event_date.utcoffset() is None:
            # Naive event dates are compared against naive UTC, as before.
            # Subtracting a naive datetime from an aware one raises TypeError.
            now = now.replace(tzinfo=None)
        lag = (now - result_last_event_date).total_seconds()
        self._events_lag.labels(intake_key=self.configuration.intake_key).set(lag)

    # Compute the remaining sleeping time.
    # If greater than 0 and no messages were fetched, pause the connector
    delta_sleep = (self.frequency or 0) - processing_time
    if total_number_of_events == 0 and delta_sleep > 0:
        self.log(message=f"Next batch in the future. Waiting {delta_sleep} seconds")

        await asyncio.sleep(delta_sleep)

Check warning on line 198 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L198

Added line #L198 was not covered by tests

# Main loop of the connector: repeats for as long as `self.running` is truthy.
async def async_run(self) -> None: # pragma: no cover
"""
Run the connector until `self.running` becomes falsy.

Each iteration awaits `async_next_run`. Any `Exception` it raises is
reported through `log_exception` and the loop carries on.
"""
while self.running:
try:
await self.async_next_run()
except Exception as e:
# Report the failure instead of letting it end the loop.
self.log_exception(
e,
message=f"Error while running connector {self.connector_name}",
)

# NOTE(review): indentation is not visible here. Confirm whether this pause
# belongs to the `except` branch (back-off after a failure) or runs after
# every iteration; `async_next_run` already sleeps when no event was fetched.
if self.frequency:
await asyncio.sleep(self.frequency)

def stop(self, *args, **kwargs):
    """
    Stop the connector and close the HTTP session if one was opened.

    Args:
        *args: forwarded unchanged to the parent `stop`.
        **kwargs: forwarded unchanged to the parent `stop`.
    """
    super().stop(*args, **kwargs)

    session = self._session
    if not session:
        return

    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is not None:
        # `run_until_complete` raises RuntimeError on a loop that is already
        # running (e.g. when `stop` is called while `run` is blocking), so
        # hand the close coroutine to that loop instead.
        asyncio.run_coroutine_threadsafe(session.close(), running_loop)
    else:
        asyncio.get_event_loop().run_until_complete(session.close())
squioc marked this conversation as resolved.
Show resolved Hide resolved

def run(self) -> None:  # pragma: no cover
    """Run the connector by driving `async_run` to completion on the current event loop."""
    asyncio.get_event_loop().run_until_complete(self.async_run())
91 changes: 0 additions & 91 deletions sekoia_automation/aio/helpers/http/http_client.py

This file was deleted.

Loading
Loading