Skip to content

Commit

Permalink
Nats core and JS get_one tests
Browse files Browse the repository at this point in the history
  • Loading branch information
KrySeyt committed Aug 30, 2024
1 parent f17ce3d commit cdc5b49
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 7 deletions.
1 change: 0 additions & 1 deletion faststream/nats/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,6 @@ async def get_one(self, *, timeout: float = 5) -> Optional[NatsMessage]:
self.subscription = await self._connection.pull_subscribe(
subject=self.clear_subject,
config=self.config,
**self.extra_options,
)

try:
Expand Down
113 changes: 113 additions & 0 deletions tests/brokers/nats/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,116 @@ async def handler(filename: str):

assert event.is_set()
mock.assert_called_once_with("hello")

async def test_get_one(
self,
queue: str,
event: asyncio.Event,
):
broker = self.get_broker(apply_types=True)
subscriber = broker.subscriber(queue)

async with self.patch_broker(broker) as br:
await br.start()

message = None
async def consume():
nonlocal message
message = await subscriber.get_one(timeout=5)

async def publish():
await asyncio.sleep(0.5)
await br.publish("test_message", queue)

await asyncio.wait(
(
asyncio.create_task(consume()),
asyncio.create_task(publish()),
),
timeout=10
)

assert message is not None
assert await message.decode() == "test_message"

async def test_get_one_timeout(
self,
queue: str,
):
broker = self.get_broker(apply_types=True)
subscriber = broker.subscriber(queue)

async with self.patch_broker(broker) as br:
await br.start()

message = object()
async def coro():
nonlocal message
message = await subscriber.get_one(timeout=1)

await asyncio.wait(
(
asyncio.create_task(coro()),
),
timeout=3
)

assert message is None

async def test_get_one_js(
self,
queue: str,
event: asyncio.Event,
stream: JStream,
):
broker = self.get_broker(apply_types=True)
subscriber = broker.subscriber(queue, stream=stream)

async with self.patch_broker(broker) as br:
await br.start()

message = None
async def consume():
nonlocal message
message = await subscriber.get_one(timeout=5)

async def publish():
await asyncio.sleep(0.5)
await br.publish("test_message", queue, stream=stream.name)

await asyncio.wait(
(
asyncio.create_task(consume()),
asyncio.create_task(publish()),
),
timeout=10
)

assert message is not None
assert await message.decode() == "test_message"

async def test_get_one_timeout_js(
self,
queue: str,
stream: JStream,
):
broker = self.get_broker(apply_types=True)
subscriber = broker.subscriber(queue, stream=stream)

async with self.patch_broker(broker) as br:
await br.start()

message = object()
async def coro():
nonlocal message
message = await subscriber.get_one(timeout=1)

await asyncio.wait(
(
asyncio.create_task(coro()),
),
timeout=3
)

assert message is None

6 changes: 0 additions & 6 deletions tests/brokers/redis/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ async def handler(msg):

mock.assert_called_once_with("hello")

@pytest.mark.asyncio
async def test_get_one(
self,
queue: str,
Expand Down Expand Up @@ -124,7 +123,6 @@ async def publish():
assert message is not None
assert await message.decode() == "test_message"

@pytest.mark.asyncio
async def test_get_one_timeout(
self,
queue: str,
Expand Down Expand Up @@ -368,7 +366,6 @@ async def handler(msg):

assert [{1, "hi"}] == [set(r.result()) for r in result]

@pytest.mark.asyncio
async def test_get_one(
self,
queue: str,
Expand Down Expand Up @@ -400,7 +397,6 @@ async def publish():
assert message is not None
assert await message.decode() == "test_message"

@pytest.mark.asyncio
async def test_get_one_timeout(
self,
queue: str,
Expand Down Expand Up @@ -707,7 +703,6 @@ async def handler(msg: RedisMessage):

assert event.is_set()

@pytest.mark.asyncio
async def test_get_one(
self,
queue: str,
Expand Down Expand Up @@ -738,7 +733,6 @@ async def publish():
assert message is not None
assert await message.decode() == "test_message"

@pytest.mark.asyncio
async def test_get_one_timeout(
self,
queue: str,
Expand Down

0 comments on commit cdc5b49

Please sign in to comment.