Skip to content

Commit

Permalink
Update async connector to use configurable rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
vladyslav-huriev committed Feb 13, 2024
1 parent 3421902 commit 4be107b
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 5 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [1.11.2] - 2023-02-13

### Fixed

- Fixes rate limiter in async connector. Make it more configurable

## [1.11.1] - 2023-01-29

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "sekoia-automation-sdk"

version = "1.11.1"
version = "1.11.2"
description = "SDK to create Sekoia.io playbook modules"
license = "MIT"
readme = "README.md"
Expand Down
18 changes: 15 additions & 3 deletions sekoia_automation/aio/connector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Contains connector with async version."""

import os
from abc import ABC
from asyncio import AbstractEventLoop, get_event_loop
from collections.abc import AsyncGenerator
Expand Down Expand Up @@ -68,15 +69,21 @@ def set_rate_limiter(cls, rate_limiter: AsyncLimiter) -> None:
cls._rate_limiter = rate_limiter

@classmethod
def get_rate_limiter(cls) -> AsyncLimiter:
def get_rate_limiter(cls) -> AsyncLimiter | None:
"""
Get or initialize rate limiter.
Returns:
AsyncLimiter:
"""
if cls._rate_limiter is None:
cls._rate_limiter = AsyncLimiter(1, 1)
requests_limit = os.getenv("REQUESTS_PER_SECOND_TO_INTAKE")

if requests_limit is not None and int(requests_limit) > 0:
cls._rate_limiter = AsyncLimiter(int(requests_limit), 1)

if requests_limit is None:
cls._rate_limiter = AsyncLimiter(1, 1)

return cls._rate_limiter

Expand All @@ -92,7 +99,12 @@ async def session(cls) -> AsyncGenerator[ClientSession, None]: # pragma: no cov
if cls._session is None:
cls._session = ClientSession()

async with cls.get_rate_limiter():
rate_limiter = cls.get_rate_limiter()

if rate_limiter:
async with rate_limiter:
yield cls._session
else:
yield cls._session

async def push_data_to_intakes(
Expand Down
57 changes: 56 additions & 1 deletion tests/aio/test_connector.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Test async connector."""

from unittest.mock import Mock, patch
import os
from unittest.mock import AsyncMock, Mock, patch
from urllib.parse import urljoin

import pytest
Expand Down Expand Up @@ -221,3 +222,57 @@ async def test_async_connector_raise_error(
except Exception as e:
assert isinstance(e, RuntimeError)
assert str(e) == expected_error


@pytest.mark.asyncio
async def test_session():
async with AsyncConnector.session() as session:
assert session is not None


@pytest.mark.asyncio
async def test_session_reuses_existing_session():
session_mock = Mock()
AsyncConnector._session = session_mock

async with AsyncConnector.session() as session:
assert session == session_mock


@pytest.mark.asyncio
async def test_session_with_rate_limiter():
mock_rate_limiter = AsyncMock()
AsyncConnector._rate_limiter = mock_rate_limiter

async with AsyncConnector.session() as session:
assert session is not None
mock_rate_limiter.__aenter__.assert_called_once()


@pytest.mark.asyncio
async def test_session_with_rate_limiter_none():
AsyncConnector._rate_limiter = None

async with AsyncConnector.session() as session:
assert session is not None
assert AsyncConnector._rate_limiter.max_rate == 1


@pytest.mark.asyncio
async def test_session_with_rate_limiter_from_env_variable():
os.environ["REQUESTS_PER_SECOND_TO_INTAKE"] = str(100)
AsyncConnector._rate_limiter = None

async with AsyncConnector.session() as session:
assert session is not None
assert AsyncConnector._rate_limiter.max_rate == 100


@pytest.mark.asyncio
async def test_session_with_rate_limiter_from_env_variable_with_zero():
os.environ["REQUESTS_PER_SECOND_TO_INTAKE"] = str(0)
AsyncConnector._rate_limiter = None

async with AsyncConnector.session() as session:
assert session is not None
assert AsyncConnector._rate_limiter is None

0 comments on commit 4be107b

Please sign in to comment.