Skip to content

Commit

Permalink
refactor: remove RMQ publish rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Sep 13, 2024
1 parent fba8e91 commit 1662627
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 147 deletions.
3 changes: 1 addition & 2 deletions faststream/nats/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@

from faststream._internal.basic_types import (
AnyDict,
DecodedMessage,
Decorator,
LoggerProto,
SendableMessage,
Expand Down Expand Up @@ -712,7 +711,7 @@ async def publish( # type: ignore[override]
Optional[float],
Doc("Timeout to send message to NATS."),
] = None,
) -> Optional["DecodedMessage"]:
) -> None:
"""Publish message directly.
This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks
Expand Down
2 changes: 1 addition & 1 deletion faststream/nats/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async def publish( # type: ignore[override]
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
**kwargs: Any, # suprress stream option
) -> Optional[Any]:
) -> None:
payload, content_type = encode_message(message)

headers_to_send = {
Expand Down
2 changes: 1 addition & 1 deletion faststream/nats/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async def publish(
timeout: Optional[float] = None,
# publisher specific
_extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> Optional[Any]:
) -> None:
"""Publish message directly.
Args:
Expand Down
2 changes: 1 addition & 1 deletion faststream/nats/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def publish( # type: ignore[override]
# NatsJSFastProducer compatibility
timeout: Optional[float] = None,
stream: Optional[str] = None,
) -> Any:
) -> None:
incoming = build_message(
message=message,
subject=subject,
Expand Down
38 changes: 3 additions & 35 deletions faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import anyio
from aio_pika import connect_robust
from typing_extensions import Annotated, Doc, deprecated, override
from typing_extensions import Annotated, Doc, override

from faststream.__about__ import SERVICE_NAME
from faststream._internal.constants import EMPTY
Expand All @@ -36,6 +36,7 @@
from ssl import SSLContext
from types import TracebackType

import aiormq
from aio_pika import (
IncomingMessage,
RobustChannel,
Expand Down Expand Up @@ -570,36 +571,6 @@ async def publish( # type: ignore[override]
"Reply message routing key to send with (always sending to default exchange)."
),
] = None,
rpc: Annotated[
bool,
Doc("Whether to wait for reply in blocking mode."),
deprecated(
"Deprecated in **FastStream 0.5.17**. "
"Please, use `request` method instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = False,
rpc_timeout: Annotated[
Optional[float],
Doc("RPC reply waiting time."),
deprecated(
"Deprecated in **FastStream 0.5.17**. "
"Please, use `request` method with `timeout` instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = 30.0,
raise_timeout: Annotated[
bool,
Doc(
"Whetever to raise `TimeoutError` or return `None` at **rpc_timeout**. "
"RPC request returns `None` at timeout by default."
),
deprecated(
"Deprecated in **FastStream 0.5.17**. "
"`request` always raises TimeoutError instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = False,
# message args
correlation_id: Annotated[
Optional[str],
Expand Down Expand Up @@ -648,7 +619,7 @@ async def publish( # type: ignore[override]
Optional[int],
Doc("The message priority (0 by default)."),
] = None,
) -> Optional[Any]:
) -> Optional["aiormq.abc.ConfirmationFrameType"]:
"""Publish message directly.
This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks
Expand Down Expand Up @@ -680,9 +651,6 @@ async def publish( # type: ignore[override]
user_id=user_id,
timeout=timeout,
priority=priority,
rpc=rpc,
rpc_timeout=rpc_timeout,
raise_timeout=raise_timeout,
)

@override
Expand Down
82 changes: 23 additions & 59 deletions faststream/rabbit/publisher/producer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from typing import (
TYPE_CHECKING,
Any,
AsyncContextManager,
Optional,
Type,
Union,
Expand All @@ -13,8 +11,6 @@

from faststream._internal.publisher.proto import ProducerProto
from faststream._internal.subscriber.utils import resolve_custom_func
from faststream._internal.utils.functions import fake_context, timeout_scope
from faststream.exceptions import WRONG_PUBLISH_ARGS
from faststream.rabbit.parser import AioPikaParser
from faststream.rabbit.schemas import RABBIT_REPLY, RabbitExchange

Expand All @@ -26,7 +22,6 @@
from aio_pika.abc import AbstractIncomingMessage, DateType, HeadersType, TimeoutType
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

from faststream._internal.basic_types import SendableMessage
from faststream._internal.types import (
AsyncCallable,
CustomCallable,
Expand Down Expand Up @@ -67,9 +62,6 @@ async def publish( # type: ignore[override]
mandatory: bool = True,
immediate: bool = False,
timeout: "TimeoutType" = None,
rpc: bool = False,
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False,
persist: bool = False,
reply_to: Optional[str] = None,
headers: Optional["HeadersType"] = None,
Expand All @@ -82,57 +74,29 @@ async def publish( # type: ignore[override]
message_type: Optional[str] = None,
user_id: Optional[str] = None,
app_id: Optional[str] = None,
) -> Optional[Any]:
) -> Optional["aiormq.abc.ConfirmationFrameType"]:
"""Publish a message to a RabbitMQ queue."""
context: AsyncContextManager[
Optional[MemoryObjectReceiveStream[IncomingMessage]]
]
if rpc:
if reply_to is not None:
raise WRONG_PUBLISH_ARGS

context = _RPCCallback(
self._rpc_lock,
await self.declarer.declare_queue(RABBIT_REPLY),
)
else:
context = fake_context()

async with context as response_queue:
r = await self._publish(
message=message,
exchange=exchange,
routing_key=routing_key,
mandatory=mandatory,
immediate=immediate,
timeout=timeout,
persist=persist,
reply_to=reply_to if response_queue is None else RABBIT_REPLY.name,
headers=headers,
content_type=content_type,
content_encoding=content_encoding,
priority=priority,
correlation_id=correlation_id,
expiration=expiration,
message_id=message_id,
timestamp=timestamp,
message_type=message_type,
user_id=user_id,
app_id=app_id,
)

if response_queue is None:
return r

else:
msg: Optional[IncomingMessage] = None
with timeout_scope(rpc_timeout, raise_timeout):
msg = await response_queue.receive()

if msg: # pragma: no branch
return await self._decoder(await self._parser(msg))

return None
return await self._publish(
message=message,
exchange=exchange,
routing_key=routing_key,
mandatory=mandatory,
immediate=immediate,
timeout=timeout,
persist=persist,
reply_to=reply_to,
headers=headers,
content_type=content_type,
content_encoding=content_encoding,
priority=priority,
correlation_id=correlation_id,
expiration=expiration,
message_id=message_id,
timestamp=timestamp,
message_type=message_type,
user_id=user_id,
app_id=app_id,
)

@override
async def request( # type: ignore[override]
Expand Down Expand Up @@ -208,7 +172,7 @@ async def _publish(
message_type: Optional[str],
user_id: Optional[str],
app_id: Optional[str],
) -> Union["aiormq.abc.ConfirmationFrameType", "SendableMessage"]:
) -> Optional["aiormq.abc.ConfirmationFrameType"]:
"""Publish a message to a RabbitMQ exchange."""
message = AioPikaParser.encode_message(
message=message,
Expand Down
39 changes: 3 additions & 36 deletions faststream/rabbit/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
)

from aio_pika import IncomingMessage
from typing_extensions import Annotated, Doc, TypedDict, Unpack, deprecated, override
from typing_extensions import Annotated, Doc, TypedDict, Unpack, override

from faststream._internal.publisher.usecase import PublisherUsecase
from faststream._internal.utils.functions import return_input
Expand All @@ -23,6 +23,7 @@
from faststream.rabbit.subscriber.usecase import LogicSubscriber

if TYPE_CHECKING:
import aiormq
from aio_pika.abc import DateType, HeadersType, TimeoutType

from faststream._internal.basic_types import AnyDict, AsyncFunc
Expand Down Expand Up @@ -216,44 +217,13 @@ async def publish( # type: ignore[override]
Optional["DateType"],
Doc("Message publish timestamp. Generated automatically if not presented."),
] = None,
# rpc args
rpc: Annotated[
bool,
Doc("Whether to wait for reply in blocking mode."),
deprecated(
"Deprecated in **FastStream 0.5.17**. "
"Please, use `request` method instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = False,
rpc_timeout: Annotated[
Optional[float],
Doc("RPC reply waiting time."),
deprecated(
"Deprecated in **FastStream 0.5.17**. "
"Please, use `request` method with `timeout` instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = 30.0,
raise_timeout: Annotated[
bool,
Doc(
"Whetever to raise `TimeoutError` or return `None` at **rpc_timeout**. "
"RPC request returns `None` at timeout by default."
),
deprecated(
"Deprecated in **FastStream 0.5.17**. "
"`request` always raises TimeoutError instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = False,
# publisher specific
_extra_middlewares: Annotated[
Iterable["PublisherMiddleware"],
Doc("Extra middlewares to wrap publishing process."),
] = (),
**publish_kwargs: "Unpack[PublishKwargs]",
) -> Optional[Any]:
) -> Optional["aiormq.abc.ConfirmationFrameType"]:
assert self._producer, NOT_CONNECTED_YET # nosec B101

kwargs: AnyDict = {
Expand All @@ -266,9 +236,6 @@ async def publish( # type: ignore[override]
"message_id": message_id,
"timestamp": timestamp,
# specific args
"rpc": rpc,
"rpc_timeout": rpc_timeout,
"raise_timeout": raise_timeout,
"reply_to": self.reply_to,
**self.message_kwargs,
**publish_kwargs,
Expand Down
14 changes: 2 additions & 12 deletions faststream/rabbit/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@

from faststream._internal.subscriber.utils import resolve_custom_func
from faststream._internal.testing.broker import TestBroker
from faststream._internal.utils.functions import timeout_scope
from faststream.exceptions import WRONG_PUBLISH_ARGS, SubscriberNotFound
from faststream.exceptions import SubscriberNotFound
from faststream.message import gen_cor_id
from faststream.rabbit.broker.broker import RabbitBroker
from faststream.rabbit.parser import AioPikaParser
Expand Down Expand Up @@ -200,9 +199,6 @@ async def publish( # type: ignore[override]
mandatory: bool = True,
immediate: bool = False,
timeout: "TimeoutType" = None,
rpc: bool = False,
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False,
persist: bool = False,
reply_to: Optional[str] = None,
headers: Optional["HeadersType"] = None,
Expand All @@ -219,9 +215,6 @@ async def publish( # type: ignore[override]
"""Publish a message to a RabbitMQ queue or exchange."""
exch = RabbitExchange.validate(exchange)

if rpc and reply_to:
raise WRONG_PUBLISH_ARGS

incoming = build_message(
message=message,
exchange=exch,
Expand All @@ -245,10 +238,7 @@ async def publish( # type: ignore[override]
if _is_handler_suitable(
handler, incoming.routing_key, incoming.headers, exch
):
with timeout_scope(rpc_timeout, raise_timeout):
response = await self._execute_handler(incoming, handler)
if rpc:
return await self._decoder(await self._parser(response))
await self._execute_handler(incoming, handler)

return None

Expand Down

0 comments on commit 1662627

Please sign in to comment.