-
Notifications
You must be signed in to change notification settings - Fork 157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
maxPingOut effectively limits the number of concurrent flush calls. This is not the case in other clients #291
Comments
While the go client is considered the gold standard - i don't see how this is logically wrong :-) If flush uses a ping, shouldn't flushes be limited the same as other pings? |
The connection timeout is essentially maxPingOut * pingInterval. If I have more threads concurrently calling flush than what I need to set maxPingOut to to get a desired connection timeout, I'm out of luck. |
@sasbury This doesn't seem right: https://github.com/nats-io/nats.java/blob/main/src/main/java/io/nats/client/impl/NatsConnection.java#L1275 handleCommunicationIssue closes the socket. I can definitely see throwing the IllegalStateException here but closing the socket b/c it was asked to ping and we're over max doesn't not seem right. Also, I'm wondering if maybe on flushes, if there is something in the pong queue already and we are over maxPingsOut, then just attach to the last future in the pong queue? If there is a legit connection issue, then it's going to timeout anyway. If it's slow then not sure. |
I got into the same issue, which is very unpleasant in my case. Case: my service tries to unsubscribe from multiple subjects in runtime simultaneously I could contribute to the fix, but I want to discuss how to do it. I'm not sure that I understand what is the case of using a queue for pongs future (and max ping limit as well). I see the comment in the code which says
But it's not clear to me why one wants to wait for a specific pong. How about replacing the queue with an atomic reference and reusing a completable future instance? // Define pong reference
private final AtomicReference<CompletableFuture<Boolean>> pongReference;
// Use it as follows
CompletableFuture<Boolean> pongFuture =
pongReference.updateAndGet(existing -> existing == null ? new CompletableFuture<>() : existing); In that case, to control connection liveliness, we can move the Replace: nats.java/src/main/java/io/nats/client/impl/NatsConnection.java Lines 471 to 480 in 73c6039
with if (isConnected()) {
try {
// The timer always uses the standard queue
softPing().get(pingMillis, TimeUnit.MILLISECONDS);
} catch (TimeoutException timeoutException) {
handleCommunicationIssue(timeoutException);
} catch (ExecutionException | InterruptedException e) {
handleCommunicationIssue(new TimeoutException(e.getMessage()));
}
} @sasbury @scottf, what do you think? p.s. I'm not sure that my proposal is correct. I see under the Sorry if this is completely wrong. I don't have enough context in the code base. |
For now, you might consider using multiple connections. I'll look into this when I have some time. |
flush calls sendPing(true) which causes the limit check.
In other clients, maxPingOut only limits the number of pings without a pong as sent due to the pingInterval. There pingInterval * maxPingOut defines the duration after which the connection becomes stale. Her it also limits how many parallel calls to flush are allowed.
nats.java/src/main/java/io/nats/client/impl/NatsConnection.java
Lines 1189 to 1192 in 7f7fe82
The text was updated successfully, but these errors were encountered: