-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Useful wrappers and helpers #67
Changes from all commits
529334d
bb3a1d3
a062e5a
21200f0
1dc6d66
60828b2
7c695f3
9af574b
798c1a8
27b2730
ca377c7
9c95b04
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
[run] | ||
relative_files = True | ||
omit = | ||
sekoia_automation/scripts/new_module/template/* | ||
sekoia_automation/scripts/new_module/template/* |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Darkheir Do you agree to this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fine by me if it works. It will allow to run checks with the same version we will have in the CI pipeline. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,25 @@ | ||
# See https://pre-commit.com for more information | ||
# See https://pre-commit.com/hooks.html for more hooks | ||
repos: | ||
- repo: https://github.com/charliermarsh/ruff-pre-commit | ||
# Ruff version. | ||
rev: 'v0.0.285' | ||
- repo: local | ||
hooks: | ||
- id: ruff | ||
args: [ --fix, --exit-non-zero-on-fix ] | ||
- id: black | ||
name: Format with Black | ||
entry: poetry run black | ||
language: system | ||
types: [python] | ||
|
||
- repo: https://github.com/ambv/black | ||
rev: 23.3.0 | ||
hooks: | ||
- id: black | ||
language_version: python3 | ||
- id: ruff | ||
name: Format with Ruff | ||
entry: poetry run ruff | ||
language: system | ||
types: [ python ] | ||
args: [ --fix, --exit-non-zero-on-fix, . ] | ||
|
||
- repo: https://github.com/pre-commit/mirrors-mypy | ||
rev: 'v1.3.0' # Use the sha / tag you want to point at | ||
hooks: | ||
- id: mypy | ||
args: [--install-types, --non-interactive] | ||
Darkheir marked this conversation as resolved.
Show resolved
Hide resolved
|
||
exclude: sekoia_automation/scripts/new_module/template/ | ||
name: Validate types with MyPy | ||
entry: poetry run mypy | ||
language: system | ||
types: [ python ] | ||
pass_filenames: false | ||
args: [ . ] |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Package contains all utilities and wrappers for asynchronous mode.""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
"""Contains connector with async version.""" | ||
from abc import ABC | ||
from asyncio import AbstractEventLoop, get_event_loop | ||
from collections.abc import AsyncGenerator | ||
from contextlib import asynccontextmanager | ||
from datetime import datetime | ||
from urllib.parse import urljoin | ||
|
||
from aiohttp import ClientSession | ||
from aiolimiter import AsyncLimiter | ||
|
||
from sekoia_automation.connector import Connector, DefaultConnectorConfiguration | ||
|
||
|
||
class AsyncConnector(Connector, ABC): | ||
"""Async version of Connector.""" | ||
|
||
configuration: DefaultConnectorConfiguration | ||
|
||
_event_loop: AbstractEventLoop | ||
|
||
_session: ClientSession | None = None | ||
_rate_limiter: AsyncLimiter | None = None | ||
|
||
def __init__(self, event_loop: AbstractEventLoop | None = None, *args, **kwargs): | ||
""" | ||
Initialize AsyncConnector. | ||
|
||
Optionally accepts event_loop to use, otherwise will use default event loop. | ||
|
||
Args: | ||
event_loop: AbstractEventLoop | None | ||
""" | ||
super().__init__(*args, **kwargs) | ||
|
||
self._event_loop = event_loop or get_event_loop() | ||
|
||
@classmethod | ||
def set_client_session(cls, session: ClientSession) -> None: | ||
""" | ||
Set client session. | ||
|
||
Args: | ||
session: ClientSession | ||
""" | ||
cls._session = session | ||
|
||
@classmethod | ||
def set_rate_limiter(cls, rate_limiter: AsyncLimiter) -> None: | ||
""" | ||
Set rate limiter. | ||
|
||
Args: | ||
rate_limiter: | ||
""" | ||
cls._rate_limiter = rate_limiter | ||
|
||
@classmethod | ||
def get_rate_limiter(cls) -> AsyncLimiter: | ||
""" | ||
Get or initialize rate limiter. | ||
|
||
Returns: | ||
AsyncLimiter: | ||
""" | ||
if cls._rate_limiter is None: | ||
cls._rate_limiter = AsyncLimiter(1, 1) | ||
|
||
return cls._rate_limiter | ||
|
||
@classmethod | ||
@asynccontextmanager | ||
async def session(cls) -> AsyncGenerator[ClientSession, None]: # pragma: no cover | ||
""" | ||
Get or initialize client session if it is not initialized yet. | ||
|
||
Returns: | ||
ClientSession: | ||
""" | ||
if cls._session is None: | ||
cls._session = ClientSession() | ||
|
||
async with cls.get_rate_limiter(): | ||
yield cls._session | ||
|
||
async def push_data_to_intakes( | ||
self, events: list[str] | ||
) -> list[str]: # pragma: no cover | ||
""" | ||
Custom method to push events to intakes. | ||
|
||
Args: | ||
events: list[str] | ||
|
||
Returns: | ||
list[str]: | ||
""" | ||
self._last_events_time = datetime.utcnow() | ||
batch_api = urljoin(self.configuration.intake_server, "/batch") | ||
|
||
self.log(f"Push {len(events)} events to intakes") | ||
|
||
result_ids = [] | ||
|
||
chunks = self._chunk_events(events, self.configuration.chunk_size) | ||
|
||
async with self.session() as session: | ||
for chunk_index, chunk in enumerate(chunks): | ||
self.log( | ||
"Start to push chunk {} with data count {} to intakes".format( | ||
chunk_index, | ||
len(chunk), | ||
) | ||
) | ||
|
||
request_body = { | ||
"intake_key": self.configuration.intake_key, | ||
"jsons": chunk, | ||
} | ||
|
||
for attempt in self._retry(): | ||
with attempt: | ||
async with session.post( | ||
batch_api, | ||
headers={"User-Agent": self._connector_user_agent}, | ||
json=request_body, | ||
) as response: | ||
if response.status >= 300: | ||
error = await response.text() | ||
error_message = f"Chunk {chunk_index} error: {error}" | ||
exception = RuntimeError(error_message) | ||
|
||
self.log(message=error_message, level="error") | ||
self.log_exception(exception) | ||
|
||
raise exception | ||
|
||
result = await response.json() | ||
|
||
self.log( | ||
f"Successfully pushed chunk {chunk_index} to intakes" | ||
) | ||
|
||
result_ids.extend(result.get("event_ids", [])) | ||
|
||
return result_ids |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
""" | ||
Package contains all utilities and useful helpers. | ||
|
||
NOTE!!!: each package inside requires additional libraries to be installed. | ||
""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
""" | ||
Utilities and wrappers to work with aws. | ||
|
||
To use this package you need to install additional libraries: | ||
|
||
* aiobotocore (https://github.com/aio-libs/aiobotocore) | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you remove the directory
tests
in the command line?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed it because of incorrect configuration between local execution and gitlab ci.
For some reason it does not resolve paths with
tests
on local env