diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 92a04da868..c8efe27247 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -359,6 +359,7 @@ search: - [PublisherUsecase](api/faststream/broker/publisher/usecase/PublisherUsecase.md) - response - [Response](api/faststream/broker/response/Response.md) + - [ensure_response](api/faststream/broker/response/ensure_response.md) - router - [ArgsContainer](api/faststream/broker/router/ArgsContainer.md) - [BrokerRouter](api/faststream/broker/router/BrokerRouter.md) diff --git a/docs/docs/en/api/faststream/broker/response/ensure_response.md b/docs/docs/en/api/faststream/broker/response/ensure_response.md new file mode 100644 index 0000000000..b4a98bd4a4 --- /dev/null +++ b/docs/docs/en/api/faststream/broker/response/ensure_response.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.broker.response.ensure_response diff --git a/faststream/broker/response.py b/faststream/broker/response.py index 5a811a77e0..fb08993251 100644 --- a/faststream/broker/response.py +++ b/faststream/broker/response.py @@ -1,37 +1,21 @@ from typing import TYPE_CHECKING, Any, Optional, Union if TYPE_CHECKING: - from faststream.types import AnyDict, SendableMessage + from faststream.types import AnyDict class Response: - def __new__( - cls, - body: Union[ - "SendableMessage", - "Response", - ], - **kwargs: Any, - ) -> "Response": - """Create a new instance of the class.""" - if isinstance(body, cls): - return body - - else: - return super().__new__(cls) - def __init__( self, - body: "SendableMessage", + body: "Any", *, headers: Optional["AnyDict"] = None, correlation_id: Optional[str] = None, ) -> None: """Initialize a handler.""" - if not isinstance(body, Response): - self.body = body - self.headers = headers or {} - self.correlation_id = correlation_id + self.body = body + self.headers = headers or {} + self.correlation_id = correlation_id def add_headers( self, @@ -50,3 +34,10 @@ def as_publish_kwargs(self) -> "AnyDict": "correlation_id": self.correlation_id, } return publish_options + + +def ensure_response(response: Union["Response", "Any"]) -> "Response": + if isinstance(response, Response): + return response + + return Response(response) diff --git a/faststream/broker/subscriber/usecase.py b/faststream/broker/subscriber/usecase.py index 661524e8b1..c5eab13fb2 100644 --- a/faststream/broker/subscriber/usecase.py +++ b/faststream/broker/subscriber/usecase.py @@ -20,7 +20,7 @@ from faststream.asyncapi.abc import AsyncAPIOperation from faststream.asyncapi.message import parse_handler_params from faststream.asyncapi.utils import to_camelcase -from faststream.broker.response import Response +from faststream.broker.response import ensure_response from faststream.broker.subscriber.call_item import HandlerItem from faststream.broker.subscriber.proto import SubscriberProto from faststream.broker.types import ( @@ -351,7 +351,7 @@ async def process_message(self, msg: MsgType) -> Any: for m in middlewares: stack.push_async_exit(m.__aexit__) - result_msg = Response( + result_msg = ensure_response( await h.call( message=message, # consumer middlewares diff --git a/faststream/confluent/response.py b/faststream/confluent/response.py index dc36bb6932..da420aa286 100644 --- a/faststream/confluent/response.py +++ b/faststream/confluent/response.py @@ -1,5 +1,37 @@ +from typing import TYPE_CHECKING, Optional + +from typing_extensions import override + from faststream.broker.response import Response +if TYPE_CHECKING: + from faststream.types import AnyDict, SendableMessage + class KafkaResponse(Response): - pass + def __init__( + self, + body: "SendableMessage", + *, + headers: Optional["AnyDict"] = None, + correlation_id: Optional[str] = None, + timestamp_ms: Optional[int] = None, + key: Optional[bytes] = None, + ) -> None: + super().__init__( + body=body, + headers=headers, + correlation_id=correlation_id, + ) + + self.timestamp_ms = timestamp_ms + self.key = key + + @override + def as_publish_kwargs(self) -> "AnyDict": + publish_options = { + **super().as_publish_kwargs(), + "timestamp_ms": self.timestamp_ms, + "key": self.key, + } + return publish_options diff --git a/faststream/kafka/response.py b/faststream/kafka/response.py index dc36bb6932..da420aa286 100644 --- a/faststream/kafka/response.py +++ b/faststream/kafka/response.py @@ -1,5 +1,37 @@ +from typing import TYPE_CHECKING, Optional + +from typing_extensions import override + from faststream.broker.response import Response +if TYPE_CHECKING: + from faststream.types import AnyDict, SendableMessage + class KafkaResponse(Response): - pass + def __init__( + self, + body: "SendableMessage", + *, + headers: Optional["AnyDict"] = None, + correlation_id: Optional[str] = None, + timestamp_ms: Optional[int] = None, + key: Optional[bytes] = None, + ) -> None: + super().__init__( + body=body, + headers=headers, + correlation_id=correlation_id, + ) + + self.timestamp_ms = timestamp_ms + self.key = key + + @override + def as_publish_kwargs(self) -> "AnyDict": + publish_options = { + **super().as_publish_kwargs(), + "timestamp_ms": self.timestamp_ms, + "key": self.key, + } + return publish_options diff --git a/faststream/nats/response.py b/faststream/nats/response.py index 6b77c7da25..b15bc97277 100644 --- a/faststream/nats/response.py +++ b/faststream/nats/response.py @@ -1,5 +1,33 @@ +from typing import TYPE_CHECKING, Optional + +from typing_extensions import override + from faststream.broker.response import Response +if TYPE_CHECKING: + from faststream.types import AnyDict, SendableMessage + class NatsResponse(Response): - pass + def __init__( + self, + body: "SendableMessage", + *, + headers: Optional["AnyDict"] = None, + correlation_id: Optional[str] = None, + stream: Optional[str] = None, + ) -> None: + super().__init__( + body=body, + headers=headers, + correlation_id=correlation_id, + ) + self.stream = stream + + @override + def as_publish_kwargs(self) -> "AnyDict": + publish_options = { + **super().as_publish_kwargs(), + "stream": self.stream, + } + return publish_options diff --git a/faststream/rabbit/broker/registrator.py b/faststream/rabbit/broker/registrator.py index 0c0f99df70..8c6b0ba99c 100644 --- a/faststream/rabbit/broker/registrator.py +++ b/faststream/rabbit/broker/registrator.py @@ -61,6 +61,11 @@ def subscriber( # type: ignore[override] reply_config: Annotated[ Optional["ReplyConfig"], Doc("Extra options to use at replies publishing."), + deprecated( + "Deprecated in **FastStream 0.5.16**. " + "Please, use `RabbitResponse` object as a handler return instead. " + "Argument will be removed in **FastStream 0.6.0**." + ), ] = None, # broker arguments dependencies: Annotated[ diff --git a/faststream/rabbit/fastapi/router.py b/faststream/rabbit/fastapi/router.py index d0445badfb..71342b8787 100644 --- a/faststream/rabbit/fastapi/router.py +++ b/faststream/rabbit/fastapi/router.py @@ -489,6 +489,11 @@ def subscriber( # type: ignore[override] reply_config: Annotated[ Optional["ReplyConfig"], Doc("Extra options to use at replies publishing."), + deprecated( + "Deprecated in **FastStream 0.5.16**. " + "Please, use `RabbitResponse` object as a handler return instead. " + "Argument will be removed in **FastStream 0.6.0**." + ), ] = None, # broker arguments dependencies: Annotated[ diff --git a/faststream/rabbit/response.py b/faststream/rabbit/response.py index aea4457337..c145f295dd 100644 --- a/faststream/rabbit/response.py +++ b/faststream/rabbit/response.py @@ -1,5 +1,64 @@ +from typing import TYPE_CHECKING, Optional + +from typing_extensions import override + from faststream.broker.response import Response +if TYPE_CHECKING: + from aio_pika.abc import DateType, TimeoutType + + from faststream.rabbit.types import AioPikaSendableMessage + from faststream.types import AnyDict + class RabbitResponse(Response): - pass + def __init__( + self, + body: "AioPikaSendableMessage", + *, + headers: Optional["AnyDict"] = None, + correlation_id: Optional[str] = None, + message_id: Optional[str] = None, + mandatory: bool = True, + immediate: bool = False, + timeout: "TimeoutType" = None, + persist: Optional[bool] = None, + priority: Optional[int] = None, + message_type: Optional[str] = None, + content_type: Optional[str] = None, + expiration: Optional["DateType"] = None, + content_encoding: Optional[str] = None, + ) -> None: + super().__init__( + body=body, + headers=headers, + correlation_id=correlation_id, + ) + + self.message_id = message_id + self.mandatory = mandatory + self.immediate = immediate + self.timeout = timeout + self.persist = persist + self.priority = priority + self.message_type = message_type + self.content_type = content_type + self.expiration = expiration + self.content_encoding = content_encoding + + @override + def as_publish_kwargs(self) -> "AnyDict": + publish_options = { + **super().as_publish_kwargs(), + "message_id": self.message_id, + "mandatory": self.mandatory, + "immediate": self.immediate, + "timeout": self.timeout, + "persist": self.persist, + "priority": self.priority, + "message_type": self.message_type, + "content_type": self.content_type, + "expiration": self.expiration, + "content_encoding": self.content_encoding, + } + return publish_options diff --git a/faststream/rabbit/router.py b/faststream/rabbit/router.py index 18f6747b9d..7e1986c8c6 100644 --- a/faststream/rabbit/router.py +++ b/faststream/rabbit/router.py @@ -213,6 +213,11 @@ def __init__( reply_config: Annotated[ Optional["ReplyConfig"], Doc("Extra options to use at replies publishing."), + deprecated( + "Deprecated in **FastStream 0.5.16**. " + "Please, use `RabbitResponse` object as a handler return instead. " + "Argument will be removed in **FastStream 0.6.0**." + ), ] = None, # broker arguments dependencies: Annotated[ diff --git a/faststream/rabbit/subscriber/factory.py b/faststream/rabbit/subscriber/factory.py index 1a185cafe6..e3a79bb3a7 100644 --- a/faststream/rabbit/subscriber/factory.py +++ b/faststream/rabbit/subscriber/factory.py @@ -1,3 +1,4 @@ +import warnings from typing import TYPE_CHECKING, Iterable, Optional, Union from faststream.rabbit.subscriber.asyncapi import AsyncAPISubscriber @@ -28,6 +29,17 @@ def create_subscriber( description_: Optional[str], include_in_schema: bool, ) -> AsyncAPISubscriber: + if reply_config: # pragma: no cover + warnings.warn( + ( + "\n`reply_config` was deprecated in **FastStream 0.5.16**." + "\nPlease, use `RabbitResponse` object as a handler return instead." + "\nArgument will be removed in **FastStream 0.6.0**." + ), + DeprecationWarning, + stacklevel=2, + ) + return AsyncAPISubscriber( queue=queue, exchange=exchange, diff --git a/faststream/redis/broker/broker.py b/faststream/redis/broker/broker.py index aebfc1add9..0eba0f8f5f 100644 --- a/faststream/redis/broker/broker.py +++ b/faststream/redis/broker/broker.py @@ -13,6 +13,7 @@ ) from urllib.parse import urlparse +import anyio from anyio import move_on_after from redis.asyncio.client import Redis from redis.asyncio.connection import ( @@ -22,6 +23,7 @@ Encoder, parse_url, ) +from redis.exceptions import ConnectionError from typing_extensions import Annotated, Doc, TypeAlias, override from faststream.__about__ import __version__ @@ -488,4 +490,8 @@ async def ping(self, timeout: Optional[float]) -> bool: if self._connection is None: return False - return await self._connection.ping() + while True: + try: + return await self._connection.ping() + except ConnectionError: # noqa: PERF203 + await anyio.sleep(0.1) diff --git a/faststream/redis/publisher/usecase.py b/faststream/redis/publisher/usecase.py index 726506985d..90cef8b883 100644 --- a/faststream/redis/publisher/usecase.py +++ b/faststream/redis/publisher/usecase.py @@ -151,6 +151,7 @@ async def publish( # type: ignore[override] Iterable["PublisherMiddleware"], Doc("Extra middlewares to wrap publishing process."), ] = (), + **kwargs: Any, # option to suppress maxlen ) -> Optional[Any]: assert self._producer, NOT_CONNECTED_YET # nosec B101 @@ -277,6 +278,7 @@ async def publish( # type: ignore[override] Iterable["PublisherMiddleware"], Doc("Extra middlewares to wrap publishing process."), ] = (), + **kwargs: Any, # option to suppress maxlen ) -> Any: assert self._producer, NOT_CONNECTED_YET # nosec B101 @@ -335,6 +337,7 @@ async def publish( # type: ignore[override] Iterable["PublisherMiddleware"], Doc("Extra middlewares to wrap publishing process."), ] = (), + **kwargs: Any, # option to suppress maxlen ) -> None: assert self._producer, NOT_CONNECTED_YET # nosec B101 diff --git a/faststream/redis/response.py b/faststream/redis/response.py index d414b0ebe4..9656fbc7b3 100644 --- a/faststream/redis/response.py +++ b/faststream/redis/response.py @@ -1,5 +1,33 @@ +from typing import TYPE_CHECKING, Optional + +from typing_extensions import override + from faststream.broker.response import Response +if TYPE_CHECKING: + from faststream.types import AnyDict, SendableMessage + class RedisResponse(Response): - pass + def __init__( + self, + body: Optional["SendableMessage"] = None, + *, + headers: Optional["AnyDict"] = None, + correlation_id: Optional[str] = None, + maxlen: Optional[int] = None, + ) -> None: + super().__init__( + body=body, + headers=headers, + correlation_id=correlation_id, + ) + self.maxlen = maxlen + + @override + def as_publish_kwargs(self) -> "AnyDict": + publish_options = { + **super().as_publish_kwargs(), + "maxlen": self.maxlen, + } + return publish_options diff --git a/tests/brokers/confluent/test_publish.py b/tests/brokers/confluent/test_publish.py index 0fed589efb..71a48b94d2 100644 --- a/tests/brokers/confluent/test_publish.py +++ b/tests/brokers/confluent/test_publish.py @@ -1,9 +1,11 @@ import asyncio from typing import Any, ClassVar, Dict +from unittest.mock import Mock import pytest -from faststream.confluent import KafkaBroker +from faststream import Context +from faststream.confluent import KafkaBroker, KafkaResponse from tests.brokers.base.publish import BrokerPublishTestcase @@ -96,3 +98,36 @@ async def pub(m): ) assert {1, "hi"} == {r.result() for r in result} + + @pytest.mark.asyncio() + async def test_response( + self, + queue: str, + event: asyncio.Event, + mock: Mock, + ): + pub_broker = self.get_broker(apply_types=True) + + @pub_broker.subscriber(queue, **self.subscriber_kwargs) + @pub_broker.publisher(topic=queue + "1") + async def handle(): + return KafkaResponse(1) + + @pub_broker.subscriber(queue + "1", **self.subscriber_kwargs) + async def handle_next(msg=Context("message")): + mock(body=msg.body) + event.set() + + async with self.patch_broker(pub_broker) as br: + await br.start() + + await asyncio.wait( + ( + asyncio.create_task(br.publish("", queue)), + asyncio.create_task(event.wait()), + ), + timeout=self.timeout * 1.5, + ) + + assert event.is_set() + mock.assert_called_once_with(body=b"1") diff --git a/tests/brokers/kafka/test_publish.py b/tests/brokers/kafka/test_publish.py index e913e3c638..86280125a5 100644 --- a/tests/brokers/kafka/test_publish.py +++ b/tests/brokers/kafka/test_publish.py @@ -1,8 +1,10 @@ import asyncio +from unittest.mock import Mock import pytest -from faststream.kafka import KafkaBroker +from faststream import Context +from faststream.kafka import KafkaBroker, KafkaResponse from tests.brokers.base.publish import BrokerPublishTestcase @@ -92,3 +94,42 @@ async def pub(m): ) assert {1, "hi"} == {r.result() for r in result} + + @pytest.mark.asyncio() + async def test_response( + self, + queue: str, + event: asyncio.Event, + mock: Mock, + ): + pub_broker = self.get_broker(apply_types=True) + + @pub_broker.subscriber(queue) + @pub_broker.publisher(queue + "1") + async def handle(): + return KafkaResponse(1, key=b"1") + + @pub_broker.subscriber(queue + "1") + async def handle_next(msg=Context("message")): + mock( + body=msg.body, + key=msg.raw_message.key, + ) + event.set() + + async with self.patch_broker(pub_broker) as br: + await br.start() + + await asyncio.wait( + ( + asyncio.create_task(br.publish("", queue)), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + assert event.is_set() + mock.assert_called_once_with( + body=b"1", + key=b"1", + ) diff --git a/tests/brokers/nats/test_publish.py b/tests/brokers/nats/test_publish.py index 0f4aa8581d..ed9b272dcb 100644 --- a/tests/brokers/nats/test_publish.py +++ b/tests/brokers/nats/test_publish.py @@ -1,6 +1,10 @@ +import asyncio +from unittest.mock import Mock + import pytest -from faststream.nats import NatsBroker +from faststream import Context +from faststream.nats import NatsBroker, NatsResponse from tests.brokers.base.publish import BrokerPublishTestcase @@ -10,3 +14,42 @@ class TestPublish(BrokerPublishTestcase): def get_broker(self, apply_types: bool = False) -> NatsBroker: return NatsBroker(apply_types=apply_types) + + @pytest.mark.asyncio() + async def test_response( + self, + queue: str, + event: asyncio.Event, + mock: Mock, + ): + pub_broker = self.get_broker(apply_types=True) + + @pub_broker.subscriber(queue) + @pub_broker.publisher(queue + "1") + async def handle(): + return NatsResponse(1, correlation_id="1") + + @pub_broker.subscriber(queue + "1") + async def handle_next(msg=Context("message")): + mock( + body=msg.body, + correlation_id=msg.correlation_id, + ) + event.set() + + async with self.patch_broker(pub_broker) as br: + await br.start() + + await asyncio.wait( + ( + asyncio.create_task(br.publish("", queue)), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + assert event.is_set() + mock.assert_called_once_with( + body=b"1", + correlation_id="1", + ) diff --git a/tests/brokers/rabbit/test_publish.py b/tests/brokers/rabbit/test_publish.py index 7e1986246e..b43129433f 100644 --- a/tests/brokers/rabbit/test_publish.py +++ b/tests/brokers/rabbit/test_publish.py @@ -3,7 +3,8 @@ import pytest -from faststream.rabbit import RabbitBroker, ReplyConfig +from faststream import Context +from faststream.rabbit import RabbitBroker, RabbitResponse, ReplyConfig from faststream.rabbit.publisher.producer import AioPikaFastProducer from tests.brokers.base.publish import BrokerPublishTestcase from tests.tools import spy_decorator @@ -30,9 +31,11 @@ async def reply_handler(m): event.set() mock(m) - @pub_broker.subscriber(queue, reply_config=ReplyConfig(persist=True)) - async def handler(m): - return m + with pytest.warns(DeprecationWarning): + + @pub_broker.subscriber(queue, reply_config=ReplyConfig(persist=True)) + async def handler(m): + return m async with self.patch_broker(pub_broker) as br: with patch.object( @@ -57,3 +60,47 @@ async def handler(m): assert event.is_set() mock.assert_called_with("Hello!") + + @pytest.mark.asyncio() + async def test_response( + self, + queue: str, + event: asyncio.Event, + mock: Mock, + ): + pub_broker = self.get_broker(apply_types=True) + + @pub_broker.subscriber(queue) + @pub_broker.publisher(queue + "1") + async def handle(): + return RabbitResponse( + 1, + persist=True, + ) + + @pub_broker.subscriber(queue + "1") + async def handle_next(msg=Context("message")): + mock(body=msg.body) + event.set() + + async with self.patch_broker(pub_broker) as br: + with patch.object( + AioPikaFastProducer, + "publish", + spy_decorator(AioPikaFastProducer.publish), + ) as m: + await br.start() + + await asyncio.wait( + ( + asyncio.create_task(br.publish("", queue)), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + assert event.is_set() + + assert m.mock.call_args.kwargs.get("persist") + + mock.assert_called_once_with(body=b"1") diff --git a/tests/brokers/redis/test_publish.py b/tests/brokers/redis/test_publish.py index 2c1f2b96ff..6c8557af2a 100644 --- a/tests/brokers/redis/test_publish.py +++ b/tests/brokers/redis/test_publish.py @@ -4,7 +4,8 @@ import pytest from redis.asyncio import Redis -from faststream.redis import ListSub, RedisBroker, StreamSub +from faststream import Context +from faststream.redis import ListSub, RedisBroker, RedisResponse, StreamSub from tests.brokers.base.publish import BrokerPublishTestcase from tests.tools import spy_decorator @@ -144,3 +145,41 @@ async def resp(msg): mock.assert_called_once_with("hi") assert m.mock.call_args_list[-1].kwargs["maxlen"] == 1 + + async def test_response( + self, + queue: str, + event: asyncio.Event, + mock: MagicMock, + ): + pub_broker = self.get_broker(apply_types=True) + + @pub_broker.subscriber(list=queue) + @pub_broker.publisher(list=queue + "resp") + async def m(): + return RedisResponse(1, correlation_id="1") + + @pub_broker.subscriber(list=queue + "resp") + async def resp(msg=Context("message")): + mock( + body=msg.body, + correlation_id=msg.correlation_id, + ) + event.set() + + async with self.patch_broker(pub_broker) as br: + await br.start() + + await asyncio.wait( + ( + asyncio.create_task(br.publish("", list=queue)), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + assert event.is_set() + mock.assert_called_once_with( + body=b"1", + correlation_id="1", + ) diff --git a/tests/brokers/test_response.py b/tests/brokers/test_response.py index 3766b28f2b..785e0a21cf 100644 --- a/tests/brokers/test_response.py +++ b/tests/brokers/test_response.py @@ -1,14 +1,14 @@ -from faststream.broker.response import Response +from faststream.broker.response import Response, ensure_response def test_raw_data(): - resp = Response(1) + resp = ensure_response(1) assert resp.body == 1 assert resp.headers == {} def test_response_with_response_instance(): - resp = Response(Response(1, headers={"some": 1})) + resp = ensure_response(Response(1, headers={"some": 1})) assert resp.body == 1 assert resp.headers == {"some": 1} diff --git a/tests/docs/redis/test_security.py b/tests/docs/redis/test_security.py index f12a0cfc93..02963fbae8 100644 --- a/tests/docs/redis/test_security.py +++ b/tests/docs/redis/test_security.py @@ -35,7 +35,7 @@ async def test_base_security(): from docs.docs_src.redis.security.basic import broker async with broker: - await broker._connection.ping() + await broker.ping(3.0) assert connection.call_args.kwargs["ssl"]