Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Connectors workflow #140

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Improvements for AsyncConnector.
- Improvements for Async Http workflow
- Remove duplicated parts and make the code more uniform for async http workflow

### Fixed

- Fix tests for async version of connector.
Expand Down
104 changes: 76 additions & 28 deletions sekoia_automation/aio/connector.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Contains connector with async version."""

import asyncio
import time
from abc import ABC
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Sequence
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
Expand All @@ -12,7 +13,11 @@
from aiolimiter import AsyncLimiter

from sekoia_automation.aio.helpers import limit_concurrency
from sekoia_automation.connector import Connector, DefaultConnectorConfiguration
from sekoia_automation.connector import (
Connector,
DefaultConnectorConfiguration,
EventType,
)
from sekoia_automation.module import Module


Expand Down Expand Up @@ -44,24 +49,6 @@
self.max_concurrency_tasks = kwargs.pop("max_concurrency_tasks", 1000)
super().__init__(module=module, data_path=data_path, *args, **kwargs)

def set_client_session(self, session: ClientSession) -> None:
"""
Set client session.

Args:
session: ClientSession
"""
self._session = session

def set_rate_limiter(self, rate_limiter: AsyncLimiter) -> None:
"""
Set rate limiter.

Args:
rate_limiter:
"""
self._rate_limiter = rate_limiter

def get_rate_limiter(self) -> AsyncLimiter:
"""
Get or initialize rate limiter.
Expand Down Expand Up @@ -130,7 +117,7 @@
return events_ids

async def push_data_to_intakes(
self, events: list[str]
self, events: Sequence[EventType]
) -> list[str]: # pragma: no cover
"""
Custom method to push events to intakes.
Expand All @@ -141,7 +128,6 @@
Returns:
list[str]:
"""
self._last_events_time = datetime.utcnow()
if intake_server := self.configuration.intake_server:
batch_api = urljoin(intake_server, "batch")
else:
Expand All @@ -161,13 +147,75 @@

return result_ids

def stop(self, *args, **kwargs):
"""
Stop the connector
"""
async def async_iterate(
self,
) -> AsyncGenerator[tuple[list[EventType], datetime | None], None]:
"""Iterate over events."""
yield [], None # To avoid type checking error

async def async_next_run(self) -> None:
processing_start = time.time()

result_last_event_date: datetime | None = None
total_number_of_events = 0
async for data in self.async_iterate():
events, last_event_date = data
if last_event_date:
if (

Check warning on line 164 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L164

Added line #L164 was not covered by tests
not result_last_event_date
or last_event_date > result_last_event_date
):
result_last_event_date = last_event_date

Check warning on line 168 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L168

Added line #L168 was not covered by tests

if events:
total_number_of_events += len(events)
await self.push_data_to_intakes(events)

Check warning on line 172 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L171-L172

Added lines #L171 - L172 were not covered by tests

processing_end = time.time()
processing_time = processing_end - processing_start

# Metric about processing time
self._forward_events_duration.labels(
intake_key=self.configuration.intake_key
).observe(processing_time)

# Metric about processing count
self._outcoming_events.labels(intake_key=self.configuration.intake_key).inc(
total_number_of_events
)

# Metric about events lag
if result_last_event_date:
lag = (datetime.utcnow() - result_last_event_date).total_seconds()
self._events_lag.labels(intake_key=self.configuration.intake_key).set(lag)

Check warning on line 190 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L189-L190

Added lines #L189 - L190 were not covered by tests

# Compute the remaining sleeping time.
# If greater than 0 and no messages where fetched, pause the connector
delta_sleep = (self.frequency or 0) - processing_time
if total_number_of_events == 0 and delta_sleep > 0:
self.log(message=f"Next batch in the future. Waiting {delta_sleep} seconds")

Check warning on line 196 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L196

Added line #L196 was not covered by tests

await asyncio.sleep(delta_sleep)

Check warning on line 198 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L198

Added line #L198 was not covered by tests

# Put infinite arg only to have testing easier
async def async_run(self) -> None: # pragma: no cover
"""Runs Connector."""
while self.running:
try:
await self.async_next_run()
except Exception as e:
self.log_exception(
e,
message=f"Error while running connector {self.connector_name}",
)

if self.frequency:
await asyncio.sleep(self.frequency)

def run(self) -> None: # pragma: no cover
"""Runs Connector."""
loop = asyncio.get_event_loop()
loop.run_until_complete(self.async_run())

if self._session:
loop.run_until_complete(self._session.close())

super().stop(*args, **kwargs)
89 changes: 0 additions & 89 deletions sekoia_automation/aio/helpers/http/http_client.py

This file was deleted.

Loading
Loading