From f17ce3db2dd75989946c617ac86e01c94d614015 Mon Sep 17 00:00:00 2001 From: Vladimir Kibisov Date: Wed, 28 Aug 2024 23:56:44 +0300 Subject: [PATCH] Redis subscriber get_one tests --- faststream/confluent/subscriber/usecase.py | 2 +- faststream/kafka/subscriber/usecase.py | 2 +- faststream/redis/subscriber/usecase.py | 8 +- tests/brokers/confluent/test_consume.py | 7 +- tests/brokers/kafka/test_consume.py | 4 +- tests/brokers/rabbit/test_consume.py | 4 +- tests/brokers/redis/test_consume.py | 170 +++++++++++++++++++++ 7 files changed, 184 insertions(+), 13 deletions(-) diff --git a/faststream/confluent/subscriber/usecase.py b/faststream/confluent/subscriber/usecase.py index c35c4df014..24ab4165d5 100644 --- a/faststream/confluent/subscriber/usecase.py +++ b/faststream/confluent/subscriber/usecase.py @@ -171,7 +171,7 @@ async def close(self) -> None: self.task = None - async def get_one(self, timeout: float = 5.0) -> "Optional[KafkaMessage]": + async def get_one(self, *, timeout: float = 5.0) -> "Optional[KafkaMessage]": assert self.consumer, "You should start subscriber at first." assert ( # nosec B101 not self.calls diff --git a/faststream/kafka/subscriber/usecase.py b/faststream/kafka/subscriber/usecase.py index 30cb52f460..569f535d4c 100644 --- a/faststream/kafka/subscriber/usecase.py +++ b/faststream/kafka/subscriber/usecase.py @@ -182,7 +182,7 @@ async def close(self) -> None: self.task = None - async def get_one(self, timeout: float = 5) -> "Optional[KafkaMessage]": + async def get_one(self, *, timeout: float = 5.0,) -> "Optional[KafkaMessage]": assert self.consumer, "You should start subscriber at first." assert ( # nosec B101 not self.calls diff --git a/faststream/redis/subscriber/usecase.py b/faststream/redis/subscriber/usecase.py index bdcfcb580d..071d533797 100644 --- a/faststream/redis/subscriber/usecase.py +++ b/faststream/redis/subscriber/usecase.py @@ -801,9 +801,9 @@ async def get_one( not self.calls ), "You can't use `get_one` method if subscriber has registered handlers." - stream_message = await self._client.xread( + stream_message = await self._client.xread( {self.stream_sub.name: self.last_id}, - block=timeout * 100000, + block=timeout * 1000, count=1, ) @@ -914,9 +914,9 @@ async def get_one( not self.calls ), "You can't use `get_one` method if subscriber has registered handlers." - stream_message = await self._client.xread( + stream_message = await self._client.xread( {self.stream_sub.name: self.last_id}, - block=timeout * 100000, + block=timeout * 1000, count=1, ) diff --git a/tests/brokers/confluent/test_consume.py b/tests/brokers/confluent/test_consume.py index 5e6ced37e0..fa07ff323c 100644 --- a/tests/brokers/confluent/test_consume.py +++ b/tests/brokers/confluent/test_consume.py @@ -328,17 +328,18 @@ async def subscriber_with_auto_commit(m): async def test_get_one( self, queue: str, + event: asyncio.Event, ): broker = self.get_broker(apply_types=True) subscriber = broker.subscriber(queue) async with self.patch_broker(broker) as br: - await broker.start() + await br.start() message = None async def consume(): nonlocal message - message = await subscriber.get_one(5) + message = await subscriber.get_one(timeout=5) async def publish(): await asyncio.sleep(3) @@ -364,7 +365,7 @@ async def test_get_one_timeout( subscriber = broker.subscriber(queue) async with self.patch_broker(broker) as br: - await broker.start() + await br.start() message = object() async def coro(): diff --git a/tests/brokers/kafka/test_consume.py b/tests/brokers/kafka/test_consume.py index d354c966cd..12cd5101d1 100644 --- a/tests/brokers/kafka/test_consume.py +++ b/tests/brokers/kafka/test_consume.py @@ -322,7 +322,7 @@ async def test_get_one( subscriber = broker.subscriber(queue) async with self.patch_broker(broker) as br: - await broker.start() + await br.start() message = None async def set_msg(): @@ -349,7 +349,7 @@ async def test_get_one_timeout( subscriber = broker.subscriber(queue) async with self.patch_broker(broker) as br: - await broker.start() + await br.start() message = object() async def coro(): diff --git a/tests/brokers/rabbit/test_consume.py b/tests/brokers/rabbit/test_consume.py index c75e2aaeac..2c3cf23987 100644 --- a/tests/brokers/rabbit/test_consume.py +++ b/tests/brokers/rabbit/test_consume.py @@ -394,7 +394,7 @@ async def test_get_one( subscriber = broker.subscriber(queue, exchange=exchange) async with self.patch_broker(broker) as br: - await broker.start() + await br.start() message = None async def set_msg(): @@ -422,7 +422,7 @@ async def test_get_one_timeout( subscriber = broker.subscriber(queue, exchange=exchange) async with self.patch_broker(broker) as br: - await broker.start() + await br.start() message = object() async def coro(): diff --git a/tests/brokers/redis/test_consume.py b/tests/brokers/redis/test_consume.py index bda2af699e..2e1be6ff43 100644 --- a/tests/brokers/redis/test_consume.py +++ b/tests/brokers/redis/test_consume.py @@ -92,6 +92,63 @@ async def handler(msg): mock.assert_called_once_with("hello") + @pytest.mark.asyncio + async def test_get_one( + self, + queue: str, + event: asyncio.Event, + ): + broker = self.get_broker(apply_types=True) + subscriber = broker.subscriber(queue) + + async with self.patch_broker(broker) as br: + await br.start() + + message = None + async def consume(): + nonlocal message + message = await subscriber.get_one(timeout=5) + + async def publish(): + await asyncio.sleep(0.5) + await br.publish("test_message", queue) + + await asyncio.wait( + ( + asyncio.create_task(consume()), + asyncio.create_task(publish()), + ), + timeout=10 + ) + + assert message is not None + assert await message.decode() == "test_message" + + @pytest.mark.asyncio + async def test_get_one_timeout( + self, + queue: str, + ): + broker = self.get_broker(apply_types=True) + subscriber = broker.subscriber(queue) + + async with self.patch_broker(broker) as br: + await br.start() + + message = object() + async def coro(): + nonlocal message + message = await subscriber.get_one(timeout=1) + + await asyncio.wait( + ( + asyncio.create_task(coro()), + ), + timeout=3 + ) + + assert message is None + @pytest.mark.redis @pytest.mark.asyncio @@ -311,6 +368,63 @@ async def handler(msg): assert [{1, "hi"}] == [set(r.result()) for r in result] + @pytest.mark.asyncio + async def test_get_one( + self, + queue: str, + event: asyncio.Event, + ): + broker = self.get_broker(apply_types=True) + subscriber = broker.subscriber(list=queue) + + async with self.patch_broker(broker) as br: + await br.start() + + message = None + async def consume(): + nonlocal message + message = await subscriber.get_one(timeout=5) + + async def publish(): + await asyncio.sleep(0.5) + await br.publish("test_message", list=queue) + + await asyncio.wait( + ( + asyncio.create_task(consume()), + asyncio.create_task(publish()), + ), + timeout=10 + ) + + assert message is not None + assert await message.decode() == "test_message" + + @pytest.mark.asyncio + async def test_get_one_timeout( + self, + queue: str, + ): + broker = self.get_broker(apply_types=True) + subscriber = broker.subscriber(list=queue) + + async with self.patch_broker(broker) as br: + await br.start() + + message = object() + async def coro(): + nonlocal message + message = await subscriber.get_one(timeout=1) + + await asyncio.wait( + ( + asyncio.create_task(coro()), + ), + timeout=3 + ) + + assert message is None + @pytest.mark.redis @pytest.mark.asyncio @@ -592,3 +706,59 @@ async def handler(msg: RedisMessage): m.mock.assert_called_once() assert event.is_set() + + @pytest.mark.asyncio + async def test_get_one( + self, + queue: str, + ): + broker = self.get_broker(apply_types=True) + subscriber = broker.subscriber(stream=queue) + + async with self.patch_broker(broker) as br: + await br.start() + + message = None + async def consume(): + nonlocal message + message = await subscriber.get_one(timeout=3) + + async def publish(): + await asyncio.sleep(0.5) + await br.publish("test_message", stream=queue) + + await asyncio.wait( + ( + asyncio.create_task(consume()), + asyncio.create_task(publish()), + ), + timeout=10 + ) + + assert message is not None + assert await message.decode() == "test_message" + + @pytest.mark.asyncio + async def test_get_one_timeout( + self, + queue: str, + ): + broker = self.get_broker(apply_types=True) + subscriber = broker.subscriber(stream=queue) + + async with self.patch_broker(broker) as br: + await br.start() + + message = object() + async def coro(): + nonlocal message + message = await subscriber.get_one(timeout=1) + + await asyncio.wait( + ( + asyncio.create_task(coro()), + ), + timeout=3 + ) + + assert message is None