Skip to content

Commit

Permalink
Async and sync clients
Browse files Browse the repository at this point in the history
  • Loading branch information
vladyslav-huriev committed May 22, 2024
1 parent 94fab9e commit 3a13bb0
Show file tree
Hide file tree
Showing 27 changed files with 2,366 additions and 583 deletions.
1,224 changes: 651 additions & 573 deletions poetry.lock

Large diffs are not rendered by default.

25 changes: 19 additions & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ sekoia-automation = 'sekoia_automation.cli:app'
[tool.poetry.dependencies]
python = ">=3.10,<3.12"
requests = "^2.25"
requests-ratelimiter = "^0.4.2"
sentry-sdk = "*"
tenacity = "*"
boto3 = "^1.28"
Expand Down Expand Up @@ -69,6 +70,14 @@ types-pyyaml = "^6.0.12.10"
types-python-slugify = "^8.0.0.2"
pre-commit = "^3.3.3"

[tool.poetry.group.lint.dependencies]
ruff = "*"
black = "*"
mypy = "*"
isort = "*"
autoflake = "^1.4"
poethepoet = { version = "^0.16.5", extras = ["poetry_plugin"] }

[tool.poetry.extras]
all = [
"aiohttp",
Expand All @@ -94,16 +103,13 @@ logging = [
"loguru"
]


[tool.poetry.group.lint.dependencies]
ruff = "*"
black = "*"
mypy = "*"

[tool.black]
target-version = ["py311"]
force-exclude = "tests/expectations/sample_module/main.py|sekoia_automation/scripts/new_module/template/"

[tool.autoflake]
exclude = "sekoia_automation/scripts/**/*"

[tool.pytest.ini_options]
minversion = "6.0"
addopts = '''
Expand Down Expand Up @@ -139,3 +145,10 @@ exclude = [
"tests/"
]
disable_error_code = "annotation-unchecked"

[tool.poe.tasks.lint]
sequence = [
{ cmd = "poetry run autoflake ." },
{ cmd = "poetry run black ." },
]
help = "Make code linting and formatting."
2 changes: 1 addition & 1 deletion sekoia_automation/aio/helpers/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class HttpClient:
Http client with optional rate limiting.
Example:
>>> from sekoia_automation.helpers.aio.http.http_client import HttpClient
>>> from sekoia_automation.http.aio.http_client import HttpClient
>>> class CustomHttpClient(HttpClient):
>>> def __init__(self):
>>> super().__init__()
Expand Down
Empty file.
Empty file.
210 changes: 210 additions & 0 deletions sekoia_automation/http/aio/http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
"""AsyncHttpClient."""

import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Any, Optional

from aiohttp import ClientResponse, ClientResponseError, ClientSession
from aiohttp.web_response import Response
from aiolimiter import AsyncLimiter

from sekoia_automation.http.http_client import AbstractHttpClient
from sekoia_automation.http.rate_limiter import RateLimiterConfig
from sekoia_automation.http.retry import RetryPolicy


class AsyncHttpClient(AbstractHttpClient[Response]):
def __init__(
self,
retry_policy: RetryPolicy | None = None,
rate_limiter_config: RateLimiterConfig | None = None,
):
"""
Initialize AsyncHttpClient.
Args:
retry_policy: RetryPolicy | None
rate_limiter_config: AsyncLimiter | None
"""
super().__init__(retry_policy, rate_limiter_config)
self._session: ClientSession | None = None

self._rate_limiter: AsyncLimiter | None = None
if rate_limiter_config:
self._rate_limiter = AsyncLimiter(
max_rate=rate_limiter_config.max_rate,
time_period=rate_limiter_config.time_period,
)

@asynccontextmanager
async def session(self) -> AsyncGenerator[ClientSession, None]:
"""
Get properly configured session with retry and async limiter.
Yields:
AsyncGenerator[ClientSession, None]:
"""
if self._session is None:
self._session = ClientSession()

if self._rate_limiter:
async with self._rate_limiter:
yield self._session
else:
yield self._session

@asynccontextmanager
async def get_retry(
self, url: str, *args: Any, **kwargs: Optional[Any]
) -> AsyncGenerator[ClientResponse, None]:
"""
Get callable.
Args:
url: str
args: Any
kwargs: Optional[Any]
Returns:
ClientResponse:
"""
async with self.request_retry("GET", url, *args, **kwargs) as result:
yield result

@asynccontextmanager
async def post_retry(
self, url: str, *args: Any, **kwargs: Optional[Any]
) -> AsyncGenerator[ClientResponse, None]:
"""
Post callable.
Args:
url: str
args: Any
kwargs: Optional[Any]
Returns:
ClientResponse:
"""
async with self.request_retry("POST", url, *args, **kwargs) as result:
yield result

@asynccontextmanager
async def put_retry(
self, url: str, *args: Any, **kwargs: Optional[Any]
) -> AsyncGenerator[ClientResponse, None]:
"""
Put callable.
Args:
url: str
args: Any
kwargs: Optional[Any]
Returns:
ClientResponse:
"""
async with self.request_retry("PUT", url, *args, **kwargs) as response:
yield response

@asynccontextmanager
async def delete_retry(
self, url: str, *args: Any, **kwargs: Optional[Any]
) -> AsyncGenerator[ClientResponse, None]:
"""
Delete callable.
Args:
url: str
args: Any
kwargs: Optional[Any]
Returns:
ClientResponse:
"""
async with self.request_retry("DELETE", url, *args, **kwargs) as response:
yield response

@asynccontextmanager
async def patch_retry(
self, url: str, *args: Any, **kwargs: Optional[Any]
) -> AsyncGenerator[ClientResponse, None]:
"""
Patch callable.
Args:
url: str
args: Any
kwargs: Optional[Any]
Returns:
ClientResponse:
"""
async with self.request_retry("PATCH", url, *args, **kwargs) as response:
yield response

@asynccontextmanager
async def head_retry(
self, url: str, *args: Any, **kwargs: Optional[Any]
) -> AsyncGenerator[ClientResponse, None]:
"""
Head callable.
Args:
url: str
args: Any
kwargs: Optional[Any]
Returns:
ClientResponse:
"""
async with self.request_retry("HEAD", url, *args, **kwargs) as response:
yield response

@asynccontextmanager
async def request_retry(
self, method: str, url: str, *args: Any, **kwargs: Optional[Any]
) -> AsyncGenerator[ClientResponse, None]:
"""
Request callable.
Args:
method: str
url: str
args: Any
kwargs: Optional[Any]
Returns:
ClientResponse:
"""
attempts = 1
backoff_factor = 0.1
if self._retry_policy is not None and self._retry_policy.max_retries > 0:
attempts = self._retry_policy.max_retries
backoff_factor = self._retry_policy.backoff_factor

for attempt in range(attempts):
try:
async with self.session() as session:
async with session.request(
method, url, *args, **kwargs
) as response:
if (
self._retry_policy is not None
and response.status in self._retry_policy.status_forcelist
and attempt < self._retry_policy.max_retries - 1
):
message = f"Status {response.status} is in forcelist"
raise ClientResponseError(
response.request_info,
response.history,
status=response.status,
message=message,
)

yield response

break
except ClientResponseError:
await asyncio.sleep(backoff_factor * (2**attempt))
Loading

0 comments on commit 3a13bb0

Please sign in to comment.