Skip to content

Commit

Permalink
refactor: change publisher fake subscriber generation logic (#1729)
Browse files Browse the repository at this point in the history
* refactor: change publisher fake subscriber generation logic

* docs: generate API References

* lint: allow return Any in exception middleware

* lint: fix precommit

* lint: fix precommit

---------

Co-authored-by: Lancetnik <[email protected]>
Co-authored-by: Kumaran Rajendhiran <[email protected]>
  • Loading branch information
3 people authored Aug 26, 2024
1 parent 65ebc3c commit 72de4ec
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 187 deletions.
2 changes: 1 addition & 1 deletion .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,5 @@
}
]
},
"generated_at": "2024-08-25T10:14:51Z"
"generated_at": "2024-08-25T12:22:17Z"
}
8 changes: 3 additions & 5 deletions faststream/broker/middlewares/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@
from types import TracebackType

from faststream.broker.message import StreamMessage
from faststream.types import AsyncFuncAny, SendableMessage
from faststream.types import AsyncFuncAny


GeneralExceptionHandler: TypeAlias = Callable[..., None]
PublishingExceptionHandler: TypeAlias = Callable[..., "SendableMessage"]
PublishingExceptionHandler: TypeAlias = Callable[..., "Any"]

CastedGeneralExceptionHandler: TypeAlias = Callable[..., Awaitable[None]]
CastedPublishingExceptionHandler: TypeAlias = Callable[
..., Awaitable["SendableMessage"]
]
CastedPublishingExceptionHandler: TypeAlias = Callable[..., Awaitable["Any"]]
CastedHandlers: TypeAlias = Dict[Type[Exception], CastedGeneralExceptionHandler]
CastedPublishingHandlers: TypeAlias = Dict[
Type[Exception], CastedPublishingExceptionHandler
Expand Down
25 changes: 7 additions & 18 deletions faststream/confluent/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from faststream.utils.functions import timeout_scope

if TYPE_CHECKING:
from faststream.broker.wrapper.call import HandlerCallWrapper
from faststream.confluent.publisher.asyncapi import AsyncAPIPublisher
from faststream.confluent.subscriber.usecase import LogicSubscriber
from faststream.types import SendableMessage
Expand All @@ -42,8 +41,8 @@ async def _fake_connect( # type: ignore[override]
def create_publisher_fake_subscriber(
broker: KafkaBroker,
publisher: "AsyncAPIPublisher[Any]",
) -> "HandlerCallWrapper[Any, Any, Any]":
sub: Optional[Any] = None
) -> Tuple["LogicSubscriber[Any]", bool]:
sub: Optional[LogicSubscriber[Any]] = None
for handler in broker._subscribers.values():
if _is_handler_matches(
handler, topic=publisher.topic, partition=publisher.partition
Expand All @@ -52,6 +51,8 @@ def create_publisher_fake_subscriber(
break

if sub is None:
is_real = False

if publisher.partition:
tp = TopicPartition(
topic=publisher.topic, partition=publisher.partition
Expand All @@ -68,22 +69,10 @@ def create_publisher_fake_subscriber(
auto_offset_reset="earliest",
)

if not sub.calls:

@sub # type: ignore[misc]
async def publisher_response_subscriber(msg: Any) -> None:
pass

broker.setup_subscriber(sub)

return sub.calls[0].handler
else:
is_real = True

@staticmethod
def remove_publisher_fake_subscriber(
broker: KafkaBroker,
publisher: "AsyncAPIPublisher[Any]",
) -> None:
broker._subscribers.pop(hash(publisher), None)
return sub, is_real


class FakeProducer(AsyncConfluentFastProducer):
Expand Down
55 changes: 26 additions & 29 deletions faststream/kafka/testing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple
from unittest.mock import AsyncMock, MagicMock

import anyio
Expand All @@ -21,7 +21,6 @@
from faststream.utils.functions import timeout_scope

if TYPE_CHECKING:
from faststream.broker.wrapper.call import HandlerCallWrapper
from faststream.kafka.publisher.asyncapi import AsyncAPIPublisher
from faststream.kafka.subscriber.usecase import LogicSubscriber
from faststream.types import SendableMessage
Expand All @@ -45,35 +44,33 @@ async def _fake_connect( # type: ignore[override]
def create_publisher_fake_subscriber(
broker: KafkaBroker,
publisher: "AsyncAPIPublisher[Any]",
) -> "HandlerCallWrapper[Any, Any, Any]":
if publisher.partition:
tp = TopicPartition(topic=publisher.topic, partition=publisher.partition)
sub = broker.subscriber(
partitions=[tp],
batch=isinstance(publisher, AsyncAPIBatchPublisher),
)
) -> Tuple["LogicSubscriber[Any]", bool]:
sub: Optional[LogicSubscriber[Any]] = None
for handler in broker._subscribers.values():
if _is_handler_matches(handler, publisher.topic, publisher.partition):
sub = handler
break

if sub is None:
is_real = False

if publisher.partition:
tp = TopicPartition(
topic=publisher.topic, partition=publisher.partition
)
sub = broker.subscriber(
partitions=[tp],
batch=isinstance(publisher, AsyncAPIBatchPublisher),
)
else:
sub = broker.subscriber(
publisher.topic,
batch=isinstance(publisher, AsyncAPIBatchPublisher),
)
else:
sub = broker.subscriber(
publisher.topic,
batch=isinstance(publisher, AsyncAPIBatchPublisher),
)

if not sub.calls:

@sub # type: ignore[misc]
async def publisher_response_subscriber(msg: Any) -> None:
pass

broker.setup_subscriber(sub)
is_real = True

return sub.calls[0].handler

@staticmethod
def remove_publisher_fake_subscriber(
broker: KafkaBroker,
publisher: "AsyncAPIPublisher[Any]",
) -> None:
broker._subscribers.pop(hash(publisher), None)
return sub, is_real


class FakeProducer(AioKafkaFastProducer):
Expand Down
40 changes: 17 additions & 23 deletions faststream/nats/testing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from unittest.mock import AsyncMock

import anyio
Expand All @@ -12,13 +12,12 @@
from faststream.nats.parser import NatsParser
from faststream.nats.publisher.producer import NatsFastProducer
from faststream.nats.schemas.js_stream import is_subject_match_wildcard
from faststream.nats.subscriber.usecase import LogicSubscriber
from faststream.testing.broker import TestBroker
from faststream.utils.functions import timeout_scope

if TYPE_CHECKING:
from faststream.broker.wrapper.call import HandlerCallWrapper
from faststream.nats.publisher.asyncapi import AsyncAPIPublisher
from faststream.nats.subscriber.usecase import LogicSubscriber
from faststream.types import AnyDict, SendableMessage

__all__ = ("TestNatsBroker",)
Expand All @@ -31,18 +30,21 @@ class TestNatsBroker(TestBroker[NatsBroker]):
def create_publisher_fake_subscriber(
broker: NatsBroker,
publisher: "AsyncAPIPublisher",
) -> "HandlerCallWrapper[Any, Any, Any]":
sub = broker.subscriber(publisher.subject)

if not sub.calls:

@sub
async def publisher_response_subscriber(msg: Any) -> None:
pass

broker.setup_subscriber(sub)

return sub.calls[0].handler
) -> Tuple["LogicSubscriber[Any]", bool]:
sub: Optional[LogicSubscriber[Any]] = None
publisher_stream = publisher.stream.name if publisher.stream else None
for handler in broker._subscribers.values():
if _is_handler_suitable(handler, publisher.subject, publisher_stream):
sub = handler
break

if sub is None:
is_real = False
sub = broker.subscriber(publisher.subject)
else:
is_real = True

return sub, is_real

@staticmethod
async def _fake_connect( # type: ignore[override]
Expand All @@ -56,14 +58,6 @@ async def _fake_connect( # type: ignore[override]
)
return AsyncMock()

@staticmethod
def remove_publisher_fake_subscriber(
broker: NatsBroker, publisher: "AsyncAPIPublisher"
) -> None:
broker._subscribers.pop(
LogicSubscriber.get_routing_hash(publisher.subject), None
)


class FakeProducer(NatsFastProducer):
def __init__(self, broker: NatsBroker) -> None:
Expand Down
56 changes: 24 additions & 32 deletions faststream/rabbit/testing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Generator, Mapping, Optional, Union
from typing import TYPE_CHECKING, Any, Generator, Mapping, Optional, Tuple, Union
from unittest import mock
from unittest.mock import AsyncMock

Expand All @@ -22,14 +22,13 @@
RabbitExchange,
RabbitQueue,
)
from faststream.rabbit.subscriber.usecase import LogicSubscriber
from faststream.testing.broker import TestBroker
from faststream.utils.functions import timeout_scope

if TYPE_CHECKING:
from aio_pika.abc import DateType, HeadersType, TimeoutType

from faststream.broker.wrapper.call import HandlerCallWrapper
from faststream.rabbit.subscriber.usecase import LogicSubscriber
from faststream.rabbit.types import AioPikaSendableMessage


Expand All @@ -39,9 +38,8 @@
class TestRabbitBroker(TestBroker[RabbitBroker]):
"""A class to test RabbitMQ brokers."""

@classmethod
@contextmanager
def _patch_broker(cls, broker: RabbitBroker) -> Generator[None, None, None]:
def _patch_broker(self, broker: RabbitBroker) -> Generator[None, None, None]:
with mock.patch.object(
broker,
"_channel",
Expand All @@ -61,34 +59,28 @@ async def _fake_connect(broker: RabbitBroker, *args: Any, **kwargs: Any) -> None
def create_publisher_fake_subscriber(
broker: RabbitBroker,
publisher: AsyncAPIPublisher,
) -> "HandlerCallWrapper[Any, Any, Any]":
sub = broker.subscriber(
queue=publisher.routing,
exchange=publisher.exchange,
)

if not sub.calls:

@sub
async def publisher_response_subscriber(msg: Any) -> None:
pass

broker.setup_subscriber(sub)

return sub.calls[0].handler
) -> Tuple["LogicSubscriber", bool]:
sub: Optional[LogicSubscriber] = None
for handler in broker._subscribers.values():
if _is_handler_suitable(
handler,
publisher.routing,
{},
publisher.exchange,
):
sub = handler
break

if sub is None:
is_real = False
sub = broker.subscriber(
queue=publisher.routing,
exchange=publisher.exchange,
)
else:
is_real = True

@staticmethod
def remove_publisher_fake_subscriber(
broker: RabbitBroker,
publisher: AsyncAPIPublisher,
) -> None:
broker._subscribers.pop(
LogicSubscriber.get_routing_hash(
queue=RabbitQueue.validate(publisher.routing),
exchange=RabbitExchange.validate(publisher.exchange),
),
None,
)
return sub, is_real


class PatchedMessage(IncomingMessage):
Expand Down
25 changes: 14 additions & 11 deletions faststream/redis/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ def __init__(

self._producer = None

@property
@abstractmethod
def subscriber_property(self) -> "AnyDict":
def subscriber_property(self, *, name_only: bool) -> "AnyDict":
raise NotImplementedError()


Expand Down Expand Up @@ -93,10 +92,10 @@ def __init__(
def __hash__(self) -> int:
return hash(f"publisher:pubsub:{self.channel.name}")

@property
def subscriber_property(self) -> "AnyDict":
@override
def subscriber_property(self, *, name_only: bool) -> "AnyDict":
return {
"channel": self.channel,
"channel": self.channel.name if name_only else self.channel,
"list": None,
"stream": None,
}
Expand Down Expand Up @@ -306,11 +305,11 @@ def __init__(
def __hash__(self) -> int:
return hash(f"publisher:list:{self.list.name}")

@property
def subscriber_property(self) -> "AnyDict":
@override
def subscriber_property(self, *, name_only: bool) -> "AnyDict":
return {
"channel": None,
"list": self.list,
"list": self.list.name if name_only else self.list,
"stream": None,
}

Expand Down Expand Up @@ -571,9 +570,13 @@ def __init__(
def __hash__(self) -> int:
return hash(f"publisher:stream:{self.stream.name}")

@property
def subscriber_property(self) -> "AnyDict":
return {"channel": None, "list": None, "stream": self.stream}
@override
def subscriber_property(self, *, name_only: bool) -> "AnyDict":
return {
"channel": None,
"list": None,
"stream": self.stream.name if name_only else self.stream,
}

def add_prefix(self, prefix: str) -> None:
stream_sub = deepcopy(self.stream)
Expand Down
Loading

0 comments on commit 72de4ec

Please sign in to comment.