Skip to content

Commit

Permalink
Feat: extend response api (airtai#1607)
Browse files Browse the repository at this point in the history
* Feat; init

* Feat: add redis response and fix nats, redis response

* Fix: unificate code

* Fix: remove some args

* Fix: rewrite init method, override as_publish_kwargs

* chore: add DeprecationWarning for ReplyConfig

* tests: check RMQ DeprecationWarning

* lint: fix mypy

* Feat: stage 1 add response to all brokers, need specify to "Broker"Response class

* fix: correct Response subclass usage

* docs: generate API

* fix: wait for connect in Redis ping

* docs: generate API

* tests: confluent big timeout

* fix: confluent test_response

---------

Co-authored-by: Daniil Dumchenko <[email protected]>
Co-authored-by: Nikita Pastukhov <[email protected]>
  • Loading branch information
3 people authored Jul 31, 2024
1 parent 2dc9f45 commit 36e5a17
Show file tree
Hide file tree
Showing 22 changed files with 464 additions and 41 deletions.
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ search:
- [PublisherUsecase](api/faststream/broker/publisher/usecase/PublisherUsecase.md)
- response
- [Response](api/faststream/broker/response/Response.md)
- [ensure_response](api/faststream/broker/response/ensure_response.md)
- router
- [ArgsContainer](api/faststream/broker/router/ArgsContainer.md)
- [BrokerRouter](api/faststream/broker/router/BrokerRouter.md)
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/broker/response/ensure_response.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.response.ensure_response
33 changes: 12 additions & 21 deletions faststream/broker/response.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,21 @@
from typing import TYPE_CHECKING, Any, Optional, Union

if TYPE_CHECKING:
from faststream.types import AnyDict, SendableMessage
from faststream.types import AnyDict


class Response:
def __new__(
cls,
body: Union[
"SendableMessage",
"Response",
],
**kwargs: Any,
) -> "Response":
"""Create a new instance of the class."""
if isinstance(body, cls):
return body

else:
return super().__new__(cls)

def __init__(
self,
body: "SendableMessage",
body: "Any",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
) -> None:
"""Initialize a handler."""
if not isinstance(body, Response):
self.body = body
self.headers = headers or {}
self.correlation_id = correlation_id
self.body = body
self.headers = headers or {}
self.correlation_id = correlation_id

def add_headers(
self,
Expand All @@ -50,3 +34,10 @@ def as_publish_kwargs(self) -> "AnyDict":
"correlation_id": self.correlation_id,
}
return publish_options


def ensure_response(response: Union["Response", "Any"]) -> "Response":
if isinstance(response, Response):
return response

return Response(response)
4 changes: 2 additions & 2 deletions faststream/broker/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from faststream.asyncapi.abc import AsyncAPIOperation
from faststream.asyncapi.message import parse_handler_params
from faststream.asyncapi.utils import to_camelcase
from faststream.broker.response import Response
from faststream.broker.response import ensure_response
from faststream.broker.subscriber.call_item import HandlerItem
from faststream.broker.subscriber.proto import SubscriberProto
from faststream.broker.types import (
Expand Down Expand Up @@ -351,7 +351,7 @@ async def process_message(self, msg: MsgType) -> Any:
for m in middlewares:
stack.push_async_exit(m.__aexit__)

result_msg = Response(
result_msg = ensure_response(
await h.call(
message=message,
# consumer middlewares
Expand Down
34 changes: 33 additions & 1 deletion faststream/confluent/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
from faststream.types import AnyDict, SendableMessage


class KafkaResponse(Response):
pass
def __init__(
self,
body: "SendableMessage",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
timestamp_ms: Optional[int] = None,
key: Optional[bytes] = None,
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)

self.timestamp_ms = timestamp_ms
self.key = key

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
**super().as_publish_kwargs(),
"timestamp_ms": self.timestamp_ms,
"key": self.key,
}
return publish_options
34 changes: 33 additions & 1 deletion faststream/kafka/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
from faststream.types import AnyDict, SendableMessage


class KafkaResponse(Response):
pass
def __init__(
self,
body: "SendableMessage",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
timestamp_ms: Optional[int] = None,
key: Optional[bytes] = None,
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)

self.timestamp_ms = timestamp_ms
self.key = key

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
**super().as_publish_kwargs(),
"timestamp_ms": self.timestamp_ms,
"key": self.key,
}
return publish_options
30 changes: 29 additions & 1 deletion faststream/nats/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
from faststream.types import AnyDict, SendableMessage


class NatsResponse(Response):
pass
def __init__(
self,
body: "SendableMessage",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
stream: Optional[str] = None,
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)
self.stream = stream

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
**super().as_publish_kwargs(),
"stream": self.stream,
}
return publish_options
5 changes: 5 additions & 0 deletions faststream/rabbit/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ def subscriber( # type: ignore[override]
reply_config: Annotated[
Optional["ReplyConfig"],
Doc("Extra options to use at replies publishing."),
deprecated(
"Deprecated in **FastStream 0.5.16**. "
"Please, use `RabbitResponse` object as a handler return instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = None,
# broker arguments
dependencies: Annotated[
Expand Down
5 changes: 5 additions & 0 deletions faststream/rabbit/fastapi/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,11 @@ def subscriber( # type: ignore[override]
reply_config: Annotated[
Optional["ReplyConfig"],
Doc("Extra options to use at replies publishing."),
deprecated(
"Deprecated in **FastStream 0.5.16**. "
"Please, use `RabbitResponse` object as a handler return instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = None,
# broker arguments
dependencies: Annotated[
Expand Down
61 changes: 60 additions & 1 deletion faststream/rabbit/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,64 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
from aio_pika.abc import DateType, TimeoutType

from faststream.rabbit.types import AioPikaSendableMessage
from faststream.types import AnyDict


class RabbitResponse(Response):
pass
def __init__(
self,
body: "AioPikaSendableMessage",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
message_id: Optional[str] = None,
mandatory: bool = True,
immediate: bool = False,
timeout: "TimeoutType" = None,
persist: Optional[bool] = None,
priority: Optional[int] = None,
message_type: Optional[str] = None,
content_type: Optional[str] = None,
expiration: Optional["DateType"] = None,
content_encoding: Optional[str] = None,
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)

self.message_id = message_id
self.mandatory = mandatory
self.immediate = immediate
self.timeout = timeout
self.persist = persist
self.priority = priority
self.message_type = message_type
self.content_type = content_type
self.expiration = expiration
self.content_encoding = content_encoding

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
**super().as_publish_kwargs(),
"message_id": self.message_id,
"mandatory": self.mandatory,
"immediate": self.immediate,
"timeout": self.timeout,
"persist": self.persist,
"priority": self.priority,
"message_type": self.message_type,
"content_type": self.content_type,
"expiration": self.expiration,
"content_encoding": self.content_encoding,
}
return publish_options
5 changes: 5 additions & 0 deletions faststream/rabbit/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ def __init__(
reply_config: Annotated[
Optional["ReplyConfig"],
Doc("Extra options to use at replies publishing."),
deprecated(
"Deprecated in **FastStream 0.5.16**. "
"Please, use `RabbitResponse` object as a handler return instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = None,
# broker arguments
dependencies: Annotated[
Expand Down
12 changes: 12 additions & 0 deletions faststream/rabbit/subscriber/factory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from typing import TYPE_CHECKING, Iterable, Optional, Union

from faststream.rabbit.subscriber.asyncapi import AsyncAPISubscriber
Expand Down Expand Up @@ -28,6 +29,17 @@ def create_subscriber(
description_: Optional[str],
include_in_schema: bool,
) -> AsyncAPISubscriber:
if reply_config: # pragma: no cover
warnings.warn(
(
"\n`reply_config` was deprecated in **FastStream 0.5.16**."
"\nPlease, use `RabbitResponse` object as a handler return instead."
"\nArgument will be removed in **FastStream 0.6.0**."
),
DeprecationWarning,
stacklevel=2,
)

return AsyncAPISubscriber(
queue=queue,
exchange=exchange,
Expand Down
8 changes: 7 additions & 1 deletion faststream/redis/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from urllib.parse import urlparse

import anyio
from anyio import move_on_after
from redis.asyncio.client import Redis
from redis.asyncio.connection import (
Expand All @@ -22,6 +23,7 @@
Encoder,
parse_url,
)
from redis.exceptions import ConnectionError
from typing_extensions import Annotated, Doc, TypeAlias, override

from faststream.__about__ import __version__
Expand Down Expand Up @@ -488,4 +490,8 @@ async def ping(self, timeout: Optional[float]) -> bool:
if self._connection is None:
return False

return await self._connection.ping()
while True:
try:
return await self._connection.ping()
except ConnectionError: # noqa: PERF203
await anyio.sleep(0.1)
3 changes: 3 additions & 0 deletions faststream/redis/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ async def publish( # type: ignore[override]
Iterable["PublisherMiddleware"],
Doc("Extra middlewares to wrap publishing process."),
] = (),
**kwargs: Any, # option to suppress maxlen
) -> Optional[Any]:
assert self._producer, NOT_CONNECTED_YET # nosec B101

Expand Down Expand Up @@ -277,6 +278,7 @@ async def publish( # type: ignore[override]
Iterable["PublisherMiddleware"],
Doc("Extra middlewares to wrap publishing process."),
] = (),
**kwargs: Any, # option to suppress maxlen
) -> Any:
assert self._producer, NOT_CONNECTED_YET # nosec B101

Expand Down Expand Up @@ -335,6 +337,7 @@ async def publish( # type: ignore[override]
Iterable["PublisherMiddleware"],
Doc("Extra middlewares to wrap publishing process."),
] = (),
**kwargs: Any, # option to suppress maxlen
) -> None:
assert self._producer, NOT_CONNECTED_YET # nosec B101

Expand Down
Loading

0 comments on commit 36e5a17

Please sign in to comment.