Feature: Connectors workflow #140

Open

wants to merge 14 commits into base: main
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0)

## [Unreleased]

### Changed

- Improvements for `AsyncConnector`.
- Improvements for the async HTTP workflow.
- Removed duplicated parts and made the code more uniform in the async HTTP workflow.

## 1.16.1 - 2024-10-30

### Changed
103 changes: 76 additions & 27 deletions sekoia_automation/aio/connector.py
@@ -1,8 +1,9 @@
"""Contains connector with async version."""

import asyncio
import time
from abc import ABC
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Sequence
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
@@ -12,7 +13,11 @@
from aiolimiter import AsyncLimiter

from sekoia_automation.aio.helpers import limit_concurrency
from sekoia_automation.connector import Connector, DefaultConnectorConfiguration
from sekoia_automation.connector import (
Connector,
DefaultConnectorConfiguration,
EventType,
)
from sekoia_automation.module import Module


@@ -44,24 +49,6 @@ def __init__(
self.max_concurrency_tasks = kwargs.pop("max_concurrency_tasks", 1000)
super().__init__(module=module, data_path=data_path, *args, **kwargs)

def set_client_session(self, session: ClientSession) -> None:
"""
Set client session.

Args:
session: ClientSession
"""
self._session = session

def set_rate_limiter(self, rate_limiter: AsyncLimiter) -> None:
"""
Set rate limiter.

Args:
rate_limiter:
"""
self._rate_limiter = rate_limiter

def get_rate_limiter(self) -> AsyncLimiter:
"""
Get or initialize rate limiter.
@@ -137,7 +124,7 @@ def _batchapi_url(self):
return urljoin(self.intake_url, "batch")

async def push_data_to_intakes(
self, events: list[str]
self, events: Sequence[EventType]
) -> list[str]: # pragma: no cover
"""
Custom method to push events to intakes.
@@ -164,13 +151,75 @@

return result_ids
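
Since `push_data_to_intakes` now accepts a `Sequence[EventType]` instead of only `list[str]`, existing string payloads keep working. A minimal, hypothetical call from inside a connector coroutine (the payload below is illustrative, not from this PR):

```python
# Hypothetical event payload; any Sequence of EventType is accepted.
ids = await self.push_data_to_intakes(['{"message": "hello"}'])
```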

def stop(self, *args, **kwargs):
"""
Stop the connector
"""
async def async_iterate(
self,
) -> AsyncGenerator[tuple[list[EventType], datetime | None], None]:
"""Iterate over events."""
yield [], None # To avoid type checking error
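
`async_iterate` is the extension point subclasses override: each yield is a batch of events plus the newest event timestamp seen so far, which `async_next_run` uses to track lag. A minimal sketch of a hypothetical subclass, assuming an upstream fetch helper that is not part of this diff:

```python
from datetime import datetime

from sekoia_automation.aio.connector import AsyncConnector


class MyAsyncConnector(AsyncConnector):
    """Hypothetical subclass illustrating the async_iterate contract."""

    async def async_iterate(self):
        # Assumed helper standing in for a real HTTP call; not in the SDK.
        events = await self._fetch_page()
        yield events, (datetime.utcnow() if events else None)

    async def _fetch_page(self) -> list[str]:
        return ['{"message": "hello"}']
```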

async def async_next_run(self) -> None:
processing_start = time.time()

result_last_event_date: datetime | None = None
total_number_of_events = 0
async for data in self.async_iterate():
events, last_event_date = data
if last_event_date:
if (
not result_last_event_date
or last_event_date > result_last_event_date
):
result_last_event_date = last_event_date

if events:
total_number_of_events += len(events)
await self.push_data_to_intakes(events)

processing_end = time.time()
processing_time = processing_end - processing_start

# Metric about processing time
self._forward_events_duration.labels(
intake_key=self.configuration.intake_key
).observe(processing_time)

# Metric about processing count
self._outcoming_events.labels(intake_key=self.configuration.intake_key).inc(
total_number_of_events
)

# Metric about events lag
if result_last_event_date:
lag = (datetime.utcnow() - result_last_event_date).total_seconds()
self._events_lag.labels(intake_key=self.configuration.intake_key).set(lag)

# Compute the remaining sleeping time.
# If greater than 0 and no messages were fetched, pause the connector
delta_sleep = (self.frequency or 0) - processing_time
if total_number_of_events == 0 and delta_sleep > 0:
self.log(message=f"Next batch in the future. Waiting {delta_sleep} seconds")

await asyncio.sleep(delta_sleep)
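
For example, with `frequency` set to 60 and a run that takes 12 seconds, `delta_sleep` is 48: if no events were fetched, the connector pauses for 48 seconds, while a run that pushed events moves on immediately.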

# The infinite loop lives here so async_next_run stays easy to test
async def async_run(self) -> None: # pragma: no cover
"""Runs Connector."""
while self.running:
try:
await self.async_next_run()
except Exception as e:
self.log_exception(
e,
message=f"Error while running connector {self.connector_name}",
)

if self.frequency:
await asyncio.sleep(self.frequency)
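
Note that this `frequency` sleep applies after every iteration, successful or not, on top of any `delta_sleep` pause already taken inside `async_next_run`; when `frequency` is unset, a repeatedly failing `async_next_run` is retried without delay.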

def run(self) -> None: # pragma: no cover
"""Runs Connector."""
loop = asyncio.get_event_loop()
loop.run_until_complete(self.async_run())

if self._session:
loop.run_until_complete(self._session.close())

super().stop(*args, **kwargs)
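
Wiring it together, `run()` simply drives `async_run` to completion on the event loop. A hedged bootstrap sketch (the connector class and paths are assumptions, not from this PR):

```python
from pathlib import Path

from sekoia_automation.module import Module

# Hypothetical bootstrap mirroring __init__(module=..., data_path=...).
connector = MyAsyncConnector(module=Module(), data_path=Path("/tmp/connector-data"))
connector.run()  # blocks, running async_run() until self.running is False
```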
89 changes: 0 additions & 89 deletions sekoia_automation/aio/helpers/http/http_client.py

This file was deleted.

150 changes: 0 additions & 150 deletions sekoia_automation/aio/helpers/http/token_refresher.py

This file was deleted.
