Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: extend response api #1607

Merged
merged 17 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ search:
- [PublisherUsecase](api/faststream/broker/publisher/usecase/PublisherUsecase.md)
- response
- [Response](api/faststream/broker/response/Response.md)
- [ensure_response](api/faststream/broker/response/ensure_response.md)
- router
- [ArgsContainer](api/faststream/broker/router/ArgsContainer.md)
- [BrokerRouter](api/faststream/broker/router/BrokerRouter.md)
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/broker/response/ensure_response.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.response.ensure_response
33 changes: 12 additions & 21 deletions faststream/broker/response.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,21 @@
from typing import TYPE_CHECKING, Any, Optional, Union

if TYPE_CHECKING:
from faststream.types import AnyDict, SendableMessage
from faststream.types import AnyDict


class Response:
def __new__(
cls,
body: Union[
"SendableMessage",
"Response",
],
**kwargs: Any,
) -> "Response":
"""Create a new instance of the class."""
if isinstance(body, cls):
return body

else:
return super().__new__(cls)

def __init__(
self,
body: "SendableMessage",
body: "Any",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
) -> None:
"""Initialize a handler."""
if not isinstance(body, Response):
self.body = body
self.headers = headers or {}
self.correlation_id = correlation_id
self.body = body
self.headers = headers or {}
self.correlation_id = correlation_id

def add_headers(
self,
Expand All @@ -50,3 +34,10 @@ def as_publish_kwargs(self) -> "AnyDict":
"correlation_id": self.correlation_id,
}
return publish_options


def ensure_response(response: Union["Response", "Any"]) -> "Response":
if isinstance(response, Response):
return response

return Response(response)
4 changes: 2 additions & 2 deletions faststream/broker/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from faststream.asyncapi.abc import AsyncAPIOperation
from faststream.asyncapi.message import parse_handler_params
from faststream.asyncapi.utils import to_camelcase
from faststream.broker.response import Response
from faststream.broker.response import ensure_response
from faststream.broker.subscriber.call_item import HandlerItem
from faststream.broker.subscriber.proto import SubscriberProto
from faststream.broker.types import (
Expand Down Expand Up @@ -351,7 +351,7 @@ async def process_message(self, msg: MsgType) -> Any:
for m in middlewares:
stack.push_async_exit(m.__aexit__)

result_msg = Response(
result_msg = ensure_response(
await h.call(
message=message,
# consumer middlewares
Expand Down
34 changes: 33 additions & 1 deletion faststream/confluent/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
from faststream.types import AnyDict, SendableMessage


class KafkaResponse(Response):
pass
def __init__(
self,
body: "SendableMessage",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
timestamp_ms: Optional[int] = None,
key: Optional[bytes] = None,
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)

self.timestamp_ms = timestamp_ms
self.key = key

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
**super().as_publish_kwargs(),
"timestamp_ms": self.timestamp_ms,
"key": self.key,
}
return publish_options
34 changes: 33 additions & 1 deletion faststream/kafka/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
from faststream.types import AnyDict, SendableMessage


class KafkaResponse(Response):
pass
def __init__(
self,
body: "SendableMessage",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
timestamp_ms: Optional[int] = None,
key: Optional[bytes] = None,
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)

self.timestamp_ms = timestamp_ms
self.key = key

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
**super().as_publish_kwargs(),
"timestamp_ms": self.timestamp_ms,
"key": self.key,
}
return publish_options
30 changes: 29 additions & 1 deletion faststream/nats/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
from faststream.types import AnyDict, SendableMessage


class NatsResponse(Response):
pass
def __init__(
self,
body: "SendableMessage",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
stream: Optional[str] = None,
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)
self.stream = stream

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
**super().as_publish_kwargs(),
"stream": self.stream,
}
return publish_options
5 changes: 5 additions & 0 deletions faststream/rabbit/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ def subscriber( # type: ignore[override]
reply_config: Annotated[
Optional["ReplyConfig"],
Doc("Extra options to use at replies publishing."),
deprecated(
"Deprecated in **FastStream 0.5.16**. "
"Please, use `RabbitResponse` object as a handler return instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = None,
# broker arguments
dependencies: Annotated[
Expand Down
5 changes: 5 additions & 0 deletions faststream/rabbit/fastapi/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,11 @@ def subscriber( # type: ignore[override]
reply_config: Annotated[
Optional["ReplyConfig"],
Doc("Extra options to use at replies publishing."),
deprecated(
"Deprecated in **FastStream 0.5.16**. "
"Please, use `RabbitResponse` object as a handler return instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = None,
# broker arguments
dependencies: Annotated[
Expand Down
61 changes: 60 additions & 1 deletion faststream/rabbit/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,64 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
from aio_pika.abc import DateType, TimeoutType

from faststream.rabbit.types import AioPikaSendableMessage
from faststream.types import AnyDict


class RabbitResponse(Response):
pass
def __init__(
self,
body: "AioPikaSendableMessage",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
message_id: Optional[str] = None,
mandatory: bool = True,
immediate: bool = False,
timeout: "TimeoutType" = None,
persist: Optional[bool] = None,
priority: Optional[int] = None,
message_type: Optional[str] = None,
content_type: Optional[str] = None,
expiration: Optional["DateType"] = None,
content_encoding: Optional[str] = None,
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)

self.message_id = message_id
self.mandatory = mandatory
self.immediate = immediate
self.timeout = timeout
self.persist = persist
self.priority = priority
self.message_type = message_type
self.content_type = content_type
self.expiration = expiration
self.content_encoding = content_encoding

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
**super().as_publish_kwargs(),
"message_id": self.message_id,
"mandatory": self.mandatory,
"immediate": self.immediate,
"timeout": self.timeout,
"persist": self.persist,
"priority": self.priority,
"message_type": self.message_type,
"content_type": self.content_type,
"expiration": self.expiration,
"content_encoding": self.content_encoding,
}
return publish_options
5 changes: 5 additions & 0 deletions faststream/rabbit/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ def __init__(
reply_config: Annotated[
Optional["ReplyConfig"],
Doc("Extra options to use at replies publishing."),
deprecated(
"Deprecated in **FastStream 0.5.16**. "
"Please, use `RabbitResponse` object as a handler return instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = None,
# broker arguments
dependencies: Annotated[
Expand Down
12 changes: 12 additions & 0 deletions faststream/rabbit/subscriber/factory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from typing import TYPE_CHECKING, Iterable, Optional, Union

from faststream.rabbit.subscriber.asyncapi import AsyncAPISubscriber
Expand Down Expand Up @@ -28,6 +29,17 @@ def create_subscriber(
description_: Optional[str],
include_in_schema: bool,
) -> AsyncAPISubscriber:
if reply_config: # pragma: no cover
warnings.warn(
(
"\n`reply_config` was deprecated in **FastStream 0.5.16**."
"\nPlease, use `RabbitResponse` object as a handler return instead."
"\nArgument will be removed in **FastStream 0.6.0**."
),
DeprecationWarning,
stacklevel=2,
)

return AsyncAPISubscriber(
queue=queue,
exchange=exchange,
Expand Down
8 changes: 7 additions & 1 deletion faststream/redis/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from urllib.parse import urlparse

import anyio
from anyio import move_on_after
from redis.asyncio.client import Redis
from redis.asyncio.connection import (
Expand All @@ -22,6 +23,7 @@
Encoder,
parse_url,
)
from redis.exceptions import ConnectionError
from typing_extensions import Annotated, Doc, TypeAlias, override

from faststream.__about__ import __version__
Expand Down Expand Up @@ -488,4 +490,8 @@ async def ping(self, timeout: Optional[float]) -> bool:
if self._connection is None:
return False

return await self._connection.ping()
while True:
try:
return await self._connection.ping()
except ConnectionError: # noqa: PERF203
await anyio.sleep(0.1)
3 changes: 3 additions & 0 deletions faststream/redis/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ async def publish( # type: ignore[override]
Iterable["PublisherMiddleware"],
Doc("Extra middlewares to wrap publishing process."),
] = (),
**kwargs: Any, # option to suppress maxlen
) -> Optional[Any]:
assert self._producer, NOT_CONNECTED_YET # nosec B101

Expand Down Expand Up @@ -277,6 +278,7 @@ async def publish( # type: ignore[override]
Iterable["PublisherMiddleware"],
Doc("Extra middlewares to wrap publishing process."),
] = (),
**kwargs: Any, # option to suppress maxlen
) -> Any:
assert self._producer, NOT_CONNECTED_YET # nosec B101

Expand Down Expand Up @@ -335,6 +337,7 @@ async def publish( # type: ignore[override]
Iterable["PublisherMiddleware"],
Doc("Extra middlewares to wrap publishing process."),
] = (),
**kwargs: Any, # option to suppress maxlen
) -> None:
assert self._producer, NOT_CONNECTED_YET # nosec B101

Expand Down
Loading
Loading