jetstream pull_subscribe() not reconnecting after server reboot #504
Comments
I tried to create a new test:

```python
@async_long_test
async def test_pull_subscribe_reconnect(self):
    srv = self.server_pool[0]
    nc = NATS()
    await nc.connect()
    js = nc.jetstream()
    await js.add_stream(name="TEST1", subjects=["foo.1", "bar"])
    for i in range(10):
        ack = await js.publish("foo.1", "Hello from NATS!".encode())
        assert ack.stream == "TEST1"
        assert ack.seq == i + 1
    consumer = await js.pull_subscribe("foo.1", "dur")
    for i in range(10):
        # Test fails if we uncomment the following lines
        # srv.stop()
        # srv.start()
        msg, *rest = await consumer.fetch(1, timeout=10)
        assert msg.data == b"Hello from NATS!"
        assert msg.metadata.stream == "TEST1"
        await msg.ack()
```

This test passes, but it fails if I uncomment the `srv.stop()` / `srv.start()` lines. @wallyqs does this test correctly describe the auto-reconnect behavior that we should expect from pull subscriptions?

Edit: The following test passes successfully (the call to `fetch` is retried with a short timeout):
```python
@async_long_test
async def test_pull_subscribe_reconnect(self):
    srv = self.server_pool[0]
    nc = NATS()
    await nc.connect(
        max_reconnect_attempts=-1,
        allow_reconnect=True,
        reconnect_time_wait=1,
    )
    js = nc.jetstream()
    await js.add_stream(name="TEST1", subjects=["foo.1", "bar"])
    for i in range(3):
        ack = await js.publish("foo.1", "Hello from NATS!".encode())
        assert ack.stream == "TEST1"
        assert ack.seq == i + 1
    consumer = await js.pull_subscribe("foo.1", "dur")
    for i in range(3):
        srv.stop()
        srv.start()
        while True:
            try:
                msg, *rest = await consumer.fetch(1, timeout=0.5)
            except TimeoutError:
                continue
            break
        assert msg.data == b"Hello from NATS!"
        assert msg.metadata.stream == "TEST1"
        await msg.ack()
```

It seems that using a big or infinite timeout leads to the error in the first test, because the fetch request is never sent to the server and so never succeeds (regardless of the timeout value). For now, it seems preferable to use a lower timeout and retry the fetch operation.

I'd say that the following lines are "responsible" for this behavior:

```python
await self._nc.publish(
    self._nms,
    json.dumps(next_req).encode(),
    self._deliver,
)
# Wait for the response or raise timeout.
msg = await self._sub.next_msg(timeout)
```

This publish operation never fails, whether the message is effectively sent to NATS or not, so we end up waiting for the whole timeout (at least that seems to be the case for the code sample provided in this issue). I'm wondering whether it is normal that the NATS client does not send the publish once it is reconnected though: naively, I would think that the message to be published stays in the pending queue, is sent on reconnect, NATS then publishes it, and the coroutine waiting on the subscription queue (e.g. in `next_msg`) eventually gets the response.

It may be worth knowing that setting the connect option `pending_size=0` makes the publish fail with `OutboundBufferLimitError` while the client is disconnected.

This last test illustrates very well what is happening:
```python
@async_long_test
async def test_pull_subscribe_reconnect(self):
    srv = self.server_pool[0]
    nc = NATS()
    await nc.connect(
        max_reconnect_attempts=-1,
        allow_reconnect=True,
        reconnect_time_wait=1,
        pending_size=0,
    )
    js = nc.jetstream()
    await js.add_stream(name="TEST1", subjects=["foo.1", "bar"])
    for i in range(3):
        ack = await js.publish("foo.1", "Hello from NATS!".encode())
        assert ack.stream == "TEST1"
        assert ack.seq == i + 1
    consumer = await js.pull_subscribe("foo.1", "dur")
    for i in range(3):
        srv.stop()
        srv.start()
        await asyncio.sleep(0)
        while True:
            try:
                msg, *rest = await consumer.fetch(1, timeout=0.5)
            except TimeoutError:
                if nc.is_connected:
                    print("TIMEOUT - Retrying immediately", file=sys.stderr)
                else:
                    print(
                        "TIMEOUT - Retrying in 250ms (currently disconnected)",
                        file=sys.stderr,
                    )
                    await asyncio.sleep(0.250)
            except OutboundBufferLimitError:
                print("NOT CONNECTED - Retrying in 250ms", file=sys.stderr)
                await asyncio.sleep(0.250)
            else:
                print("OK - Got message", file=sys.stderr)
                break
        assert msg.data == b"Hello from NATS!"
        assert msg.metadata.stream == "TEST1"
        await msg.ack()
```

Captured Output:
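As a side note, the short-timeout-plus-retry workaround used in the tests above can be factored into a small helper. This is only a sketch: `fetch_with_retry`, its parameters, and its default values are mine and not part of nats-py; it only relies on `fetch`, `is_connected`, and the `TimeoutError` / `OutboundBufferLimitError` exceptions already used in the tests.

```python
import asyncio

from nats.errors import OutboundBufferLimitError, TimeoutError


async def fetch_with_retry(nc, psub, batch=1, timeout=0.5, retry_delay=0.25):
    # Retry fetch with a short timeout instead of a single long one, so that a
    # pull request lost around a server restart is simply issued again.
    while True:
        try:
            return await psub.fetch(batch, timeout=timeout)
        except TimeoutError:
            # The pull request may have been dropped while the server was down;
            # back off a little if still disconnected, otherwise retry right away.
            if not nc.is_connected:
                await asyncio.sleep(retry_delay)
        except OutboundBufferLimitError:
            # Only raised when connecting with pending_size=0: the request could
            # not even be buffered, so wait a bit before retrying.
            await asyncio.sleep(retry_delay)
```

With such a helper, the retry loops in the tests above reduce to something like `msgs = await fetch_with_retry(nc, consumer)`.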
Second edit: I'm pretty sure that the request for the next message is not published due to an error encountered when flushing (broken pipe). When an error occurs within the flusher task, the pending buffer is lost. This has a much broader impact than just pull subscriptions, but if we change the flusher code along these lines:

```diff
 try:
     if self._pending_data_size > 0:
-        self._transport.writelines(self._pending[:])
+        pending = self._pending[:]
         self._pending = []
         self._pending_data_size = 0
+        self._transport.writelines(pending)
-        await self._transport.drain()
+        try:
+            await self._transport.drain()
+        except BaseException:
+            if pending:
+                self._pending = pending + self._pending
+                self._pending_data_size += len(pending)
+            raise
 except OSError as e:
+    if pending:
+        self._pending_data_size += len(pending)
+        self._pending = pending + self._pending
```

then messages are indeed published on reconnect and the test passes. However, there is a caveat here that I don't really understand 😅; I take it as "the caller doesn't know how much of the pending data has actually been written", so I guess the code above may send messages twice if an error occurred after some data had effectively been written but before the drain completed?

In the end, it seems that users should always consider the possibility that a publish is never received by the server. Regarding the specific case of pull subscriptions: on the nats-py side, it may be worth changing the `fetch` behavior accordingly.
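On the publishing side, one way to cope with "the publish may never reach the server" is to retry the JetStream publish with a fixed `Nats-Msg-Id` header, so that the stream's duplicate-detection window discards any copy that did get through on an earlier attempt. A rough sketch; the helper name, retry count, and the assumption that the duplicate window covers the whole retry period are mine:

```python
import uuid

from nats.errors import TimeoutError


async def publish_with_retry(js, subject, payload, attempts=5):
    # Use the same Nats-Msg-Id for every attempt: JetStream de-duplicates on
    # this header within the stream's duplicate window, so a retried publish
    # whose first attempt actually reached the server is not stored twice.
    msg_id = str(uuid.uuid4())
    last_err = None
    for _ in range(attempts):
        try:
            return await js.publish(subject, payload, headers={"Nats-Msg-Id": msg_id})
        except TimeoutError as e:
            # No PubAck received: the message may or may not have been stored;
            # retry with the same id and let the server drop duplicates.
            last_err = e
    raise last_err
```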
I can second this bug, it hit us in production when we last did an update. Some notes:
I've also been hitting this in production and had to restart services with pull subscriptions so that they would receive messages again. @wallyqs Would you mind taking a look at this please? This bug has a pretty severe impact on production systems relying on JetStream.
I was actually doing some thinking on this yesterday,
hi,
We are facing similar issues. Any updates on this?
Generally it's not advised to set very high timeouts. Pull requests are not persisted across server reboots, and clients need to send new ones. The new JS API provides an out-of-the-box solution for this challenge by offering a kind of subscription that handles timeouts, reconnects, etc.
What version were you using?
v2.9.6
What environment was the server running in?
docker
Is this defect reproducible?
When the NATS server becomes unavailable for a short while, any pull subscription is lost (i.e. it stops receiving messages), whereas push subscribers work as expected.
Reproduce
Run the following script. Reboot the NATS server. Emit messages on the subject `testnats.reconnec`. Nothing is received. It works as expected if I comment out the pull_subscribe logic and replace it with the commented-out `js.subscribe()` line.
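The original reproduction script is not included in the issue; the following is only an illustrative sketch of the setup described above, with made-up stream, durable, and subject names:

```python
import asyncio

import nats
from nats.errors import TimeoutError


async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    await js.add_stream(name="RECONNECT_TEST", subjects=["test.reconnect"])

    # Pull subscription: stops receiving messages after a server restart.
    psub = await js.pull_subscribe("test.reconnect", durable="dur")
    # Push subscription variant that keeps working across a restart:
    # sub = await js.subscribe("test.reconnect", durable="dur-push")

    while True:
        try:
            msgs = await psub.fetch(1, timeout=30)
        except TimeoutError:
            continue
        for msg in msgs:
            print("received:", msg.data)
            await msg.ack()


if __name__ == "__main__":
    asyncio.run(main())
```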
Given the capability you are leveraging, describe your expectation?
Pull subscribers should reconnect after reboot
Given the expectation, what is the defect you are observing?
Pull subscribers are not resubscribing.