diff --git a/faststream/nats/parser.py b/faststream/nats/parser.py index 25d61d4901..c3a1c32928 100644 --- a/faststream/nats/parser.py +++ b/faststream/nats/parser.py @@ -51,6 +51,10 @@ async def decode_message( class NatsParser(NatsBaseParser): """A class to parse NATS core messages.""" + def __init__(self, *, pattern: str, no_ack: bool) -> None: + super().__init__(pattern=pattern) + self.no_ack = no_ack + async def parse_message( self, message: "Msg", @@ -62,7 +66,8 @@ async def parse_message( headers = message.header or {} - message._ackd = True # prevent message from acking + if not self.no_ack: + message._ackd = True # prevent message from acking return NatsMessage( raw_message=message, diff --git a/faststream/nats/publisher/producer.py b/faststream/nats/publisher/producer.py index ffc4aedb70..eedb27932f 100644 --- a/faststream/nats/publisher/producer.py +++ b/faststream/nats/publisher/producer.py @@ -39,7 +39,7 @@ def __init__( ) -> None: self._connection = connection - default = NatsParser(pattern="") + default = NatsParser(pattern="", no_ack=False) self._parser = resolve_custom_func(parser, default.parse_message) self._decoder = resolve_custom_func(decoder, default.decode_message) @@ -141,7 +141,7 @@ def __init__( ) -> None: self._connection = connection - default = NatsParser(pattern="") + default = NatsParser(pattern="", no_ack=False) self._parser = resolve_custom_func(parser, default.parse_message) self._decoder = resolve_custom_func(decoder, default.decode_message) diff --git a/faststream/nats/subscriber/usecase.py b/faststream/nats/subscriber/usecase.py index 85c133770c..90074526e9 100644 --- a/faststream/nats/subscriber/usecase.py +++ b/faststream/nats/subscriber/usecase.py @@ -395,7 +395,7 @@ def __init__( description_: Optional[str], include_in_schema: bool, ) -> None: - parser_ = NatsParser(pattern=subject) + parser_ = NatsParser(pattern=subject, no_ack=no_ack) self.queue = queue diff --git a/tests/brokers/nats/test_consume.py b/tests/brokers/nats/test_consume.py index c742fb9e48..9b38e5ddc4 100644 --- a/tests/brokers/nats/test_consume.py +++ b/tests/brokers/nats/test_consume.py @@ -160,6 +160,32 @@ async def handler(msg: NatsMessage): assert event.is_set() + async def test_core_consume_no_ack( + self, + queue: str, + event: asyncio.Event, + stream: JStream, + ): + consume_broker = self.get_broker(apply_types=True) + + @consume_broker.subscriber(queue, no_ack=True) + async def handler(msg: NatsMessage): + if not msg.raw_message._ackd: + event.set() + + async with self.patch_broker(consume_broker) as br: + await br.start() + + await asyncio.wait( + ( + asyncio.create_task(br.publish("hello", queue)), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + assert event.is_set() + async def test_consume_ack_manual( self, queue: str,