Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Connectors workflow #140

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
26 changes: 4 additions & 22 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## 1.18.0 - 2024-11-26
## 1.19.0 - 2024-11-28

### Changed

- Add additional values to log events sent to the API
- In Generic actions, in case of error use the message from the response if available

## 1.17.2 - 2024-11-06

### Fixed

- Fix callback URL file for account validation

## 1.17.1 - 2024-11-04

### Fixed

- Change the way to handle docker image information when publishing a module
- Fix the module synchronization script

## 1.17.0 - 2024-11-04

### Added

- Add account validation (beta)
- Improvements for AsyncConnector.
- Improvements for Async Http workflow
- Remove duplicated parts and make the code more uniform for async http workflow

## 1.16.1 - 2024-10-30

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "sekoia-automation-sdk"

version = "1.18.0"
version = "1.19.0"
description = "SDK to create Sekoia.io playbook modules"
license = "MIT"
readme = "README.md"
Expand Down
117 changes: 89 additions & 28 deletions sekoia_automation/aio/connector.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Contains connector with async version."""

import asyncio
import time
from abc import ABC
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Sequence
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
Expand All @@ -12,7 +13,11 @@
from aiolimiter import AsyncLimiter

from sekoia_automation.aio.helpers import limit_concurrency
from sekoia_automation.connector import Connector, DefaultConnectorConfiguration
from sekoia_automation.connector import (
Connector,
DefaultConnectorConfiguration,
EventType,
)
from sekoia_automation.module import Module


Expand Down Expand Up @@ -44,24 +49,6 @@
self.max_concurrency_tasks = kwargs.pop("max_concurrency_tasks", 1000)
super().__init__(module=module, data_path=data_path, *args, **kwargs)

def set_client_session(self, session: ClientSession) -> None:
"""
Set client session.

Args:
session: ClientSession
"""
self._session = session

def set_rate_limiter(self, rate_limiter: AsyncLimiter) -> None:
"""
Set rate limiter.

Args:
rate_limiter:
"""
self._rate_limiter = rate_limiter

def get_rate_limiter(self) -> AsyncLimiter:
"""
Get or initialize rate limiter.
Expand Down Expand Up @@ -137,7 +124,7 @@
return urljoin(self.intake_url, "batch")

async def push_data_to_intakes(
self, events: list[str]
self, events: Sequence[EventType]
) -> list[str]: # pragma: no cover
"""
Custom method to push events to intakes.
Expand All @@ -164,13 +151,87 @@

return result_ids

def stop(self, *args, **kwargs):
"""
Stop the connector
"""
loop = asyncio.get_event_loop()
async def async_iterate(
self,
) -> AsyncGenerator[tuple[list[EventType], datetime | None], None]:
"""Iterate over events."""
raise NotImplementedError # To avoid type checking error

Check warning on line 158 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L158

Added line #L158 was not covered by tests

async def async_next_run(self) -> None:
processing_start = time.time()

result_last_event_date: datetime | None = None
total_number_of_events = 0
async for data in self.async_iterate(): # type: ignore
events, last_event_date = data
if last_event_date:
if (

Check warning on line 168 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L168

Added line #L168 was not covered by tests
not result_last_event_date
or last_event_date > result_last_event_date
):
result_last_event_date = last_event_date

Check warning on line 172 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L172

Added line #L172 was not covered by tests

if events:
total_number_of_events += len(events)
await self.push_data_to_intakes(events)

processing_end = time.time()
processing_time = processing_end - processing_start

# Metric about processing time
self.put_forward_events_duration(
intake_key=self.configuration.intake_key,
duration=processing_time,
)

# Metric about processing count
self.put_forwarded_events(
intake_key=self.configuration.intake_key, count=total_number_of_events
)

# Metric about events lag
if result_last_event_date:
lag = (datetime.utcnow() - result_last_event_date).total_seconds()
self.put_events_lag(intake_key=self.configuration.intake_key, lag=lag)

Check warning on line 195 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L194-L195

Added lines #L194 - L195 were not covered by tests

# Compute the remaining sleeping time.
# If greater than 0 and no messages where fetched, pause the connector
delta_sleep = (self.frequency or 0) - processing_time
if total_number_of_events == 0 and delta_sleep > 0:
self.log(message=f"Next batch in the future. Waiting {delta_sleep} seconds")

Check warning on line 201 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L201

Added line #L201 was not covered by tests

await asyncio.sleep(delta_sleep)

Check warning on line 203 in sekoia_automation/aio/connector.py

View check run for this annotation

Codecov / codecov/patch

sekoia_automation/aio/connector.py#L203

Added line #L203 was not covered by tests

async def on_shutdown(self) -> None:
"""
Called when connector is finishing processing.

Can be used for some resources cleanup.

Basically it emits shutdown event.
"""

# Put infinite arg only to have testing easier
async def async_run(self) -> None: # pragma: no cover
"""Runs Connector."""
while self.running:
try:
await self.async_next_run()
except Exception as e:
self.log_exception(
e,
message=f"Error while running connector {self.connector_name}",
)

if self.frequency:
await asyncio.sleep(self.frequency)

if self._session:
loop.run_until_complete(self._session.close())
await self._session.close()

super().stop(*args, **kwargs)
await self.on_shutdown()

def run(self) -> None: # pragma: no cover
"""Runs Connector."""
loop = asyncio.get_event_loop()
loop.run_until_complete(self.async_run())
89 changes: 0 additions & 89 deletions sekoia_automation/aio/helpers/http/http_client.py

This file was deleted.

Loading
Loading