Skip to content

Commit

Permalink
Redis subscriber get_one tests
Browse files Browse the repository at this point in the history
  • Loading branch information
KrySeyt committed Aug 28, 2024
1 parent 6d6b139 commit f17ce3d
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 13 deletions.
2 changes: 1 addition & 1 deletion faststream/confluent/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ async def close(self) -> None:

self.task = None

async def get_one(self, timeout: float = 5.0) -> "Optional[KafkaMessage]":
async def get_one(self, *, timeout: float = 5.0) -> "Optional[KafkaMessage]":
assert self.consumer, "You should start subscriber at first."
assert ( # nosec B101
not self.calls
Expand Down
2 changes: 1 addition & 1 deletion faststream/kafka/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ async def close(self) -> None:

self.task = None

async def get_one(self, timeout: float = 5) -> "Optional[KafkaMessage]":
async def get_one(self, *, timeout: float = 5.0,) -> "Optional[KafkaMessage]":
assert self.consumer, "You should start subscriber at first."
assert ( # nosec B101
not self.calls
Expand Down
8 changes: 4 additions & 4 deletions faststream/redis/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,9 +801,9 @@ async def get_one(
not self.calls
), "You can't use `get_one` method if subscriber has registered handlers."

stream_message = await self._client.xread(
stream_message = await self._client.xread(
{self.stream_sub.name: self.last_id},
block=timeout * 100000,
block=timeout * 1000,
count=1,
)

Expand Down Expand Up @@ -914,9 +914,9 @@ async def get_one(
not self.calls
), "You can't use `get_one` method if subscriber has registered handlers."

stream_message = await self._client.xread(
stream_message = await self._client.xread(
{self.stream_sub.name: self.last_id},
block=timeout * 100000,
block=timeout * 1000,
count=1,
)

Expand Down
7 changes: 4 additions & 3 deletions tests/brokers/confluent/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,18 @@ async def subscriber_with_auto_commit(m):
async def test_get_one(
self,
queue: str,
event: asyncio.Event,
):
broker = self.get_broker(apply_types=True)
subscriber = broker.subscriber(queue)

async with self.patch_broker(broker) as br:
await broker.start()
await br.start()

message = None
async def consume():
nonlocal message
message = await subscriber.get_one(5)
message = await subscriber.get_one(timeout=5)

async def publish():
await asyncio.sleep(3)
Expand All @@ -364,7 +365,7 @@ async def test_get_one_timeout(
subscriber = broker.subscriber(queue)

async with self.patch_broker(broker) as br:
await broker.start()
await br.start()

message = object()
async def coro():
Expand Down
4 changes: 2 additions & 2 deletions tests/brokers/kafka/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ async def test_get_one(
subscriber = broker.subscriber(queue)

async with self.patch_broker(broker) as br:
await broker.start()
await br.start()

message = None
async def set_msg():
Expand All @@ -349,7 +349,7 @@ async def test_get_one_timeout(
subscriber = broker.subscriber(queue)

async with self.patch_broker(broker) as br:
await broker.start()
await br.start()

message = object()
async def coro():
Expand Down
4 changes: 2 additions & 2 deletions tests/brokers/rabbit/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ async def test_get_one(
subscriber = broker.subscriber(queue, exchange=exchange)

async with self.patch_broker(broker) as br:
await broker.start()
await br.start()

message = None
async def set_msg():
Expand Down Expand Up @@ -422,7 +422,7 @@ async def test_get_one_timeout(
subscriber = broker.subscriber(queue, exchange=exchange)

async with self.patch_broker(broker) as br:
await broker.start()
await br.start()

message = object()
async def coro():
Expand Down
170 changes: 170 additions & 0 deletions tests/brokers/redis/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,63 @@ async def handler(msg):

mock.assert_called_once_with("hello")

@pytest.mark.asyncio
async def test_get_one(
self,
queue: str,
event: asyncio.Event,
):
broker = self.get_broker(apply_types=True)
subscriber = broker.subscriber(queue)

async with self.patch_broker(broker) as br:
await br.start()

message = None
async def consume():
nonlocal message
message = await subscriber.get_one(timeout=5)

async def publish():
await asyncio.sleep(0.5)
await br.publish("test_message", queue)

await asyncio.wait(
(
asyncio.create_task(consume()),
asyncio.create_task(publish()),
),
timeout=10
)

assert message is not None
assert await message.decode() == "test_message"

@pytest.mark.asyncio
async def test_get_one_timeout(
self,
queue: str,
):
broker = self.get_broker(apply_types=True)
subscriber = broker.subscriber(queue)

async with self.patch_broker(broker) as br:
await br.start()

message = object()
async def coro():
nonlocal message
message = await subscriber.get_one(timeout=1)

await asyncio.wait(
(
asyncio.create_task(coro()),
),
timeout=3
)

assert message is None


@pytest.mark.redis
@pytest.mark.asyncio
Expand Down Expand Up @@ -311,6 +368,63 @@ async def handler(msg):

assert [{1, "hi"}] == [set(r.result()) for r in result]

@pytest.mark.asyncio
async def test_get_one(
self,
queue: str,
event: asyncio.Event,
):
broker = self.get_broker(apply_types=True)
subscriber = broker.subscriber(list=queue)

async with self.patch_broker(broker) as br:
await br.start()

message = None
async def consume():
nonlocal message
message = await subscriber.get_one(timeout=5)

async def publish():
await asyncio.sleep(0.5)
await br.publish("test_message", list=queue)

await asyncio.wait(
(
asyncio.create_task(consume()),
asyncio.create_task(publish()),
),
timeout=10
)

assert message is not None
assert await message.decode() == "test_message"

@pytest.mark.asyncio
async def test_get_one_timeout(
self,
queue: str,
):
broker = self.get_broker(apply_types=True)
subscriber = broker.subscriber(list=queue)

async with self.patch_broker(broker) as br:
await br.start()

message = object()
async def coro():
nonlocal message
message = await subscriber.get_one(timeout=1)

await asyncio.wait(
(
asyncio.create_task(coro()),
),
timeout=3
)

assert message is None


@pytest.mark.redis
@pytest.mark.asyncio
Expand Down Expand Up @@ -592,3 +706,59 @@ async def handler(msg: RedisMessage):
m.mock.assert_called_once()

assert event.is_set()

@pytest.mark.asyncio
async def test_get_one(
self,
queue: str,
):
broker = self.get_broker(apply_types=True)
subscriber = broker.subscriber(stream=queue)

async with self.patch_broker(broker) as br:
await br.start()

message = None
async def consume():
nonlocal message
message = await subscriber.get_one(timeout=3)

async def publish():
await asyncio.sleep(0.5)
await br.publish("test_message", stream=queue)

await asyncio.wait(
(
asyncio.create_task(consume()),
asyncio.create_task(publish()),
),
timeout=10
)

assert message is not None
assert await message.decode() == "test_message"

@pytest.mark.asyncio
async def test_get_one_timeout(
self,
queue: str,
):
broker = self.get_broker(apply_types=True)
subscriber = broker.subscriber(stream=queue)

async with self.patch_broker(broker) as br:
await br.start()

message = object()
async def coro():
nonlocal message
message = await subscriber.get_one(timeout=1)

await asyncio.wait(
(
asyncio.create_task(coro()),
),
timeout=3
)

assert message is None

0 comments on commit f17ce3d

Please sign in to comment.