diff --git a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java index 43005eea8..20923b1fe 100644 --- a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java +++ b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java @@ -25,10 +25,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import io.lettuce.core.ClientOptions; import io.lettuce.core.ConnectionBuilder; import io.lettuce.core.ConnectionEvents; +import io.lettuce.core.RedisException; import io.lettuce.core.event.EventBus; import io.lettuce.core.event.connection.ReconnectAttemptEvent; import io.lettuce.core.event.connection.ReconnectFailedEvent; @@ -215,16 +217,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { channel = null; if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) { - if (!isEventLoopGroupActive()) { - logger.debug("isEventLoopGroupActive() == false"); - return; - } - - if (!isListenOnChannelInactive()) { - logger.debug("Skip reconnect scheduling, listener disabled"); - return; - } - if (!useAutoBatchFlushEndpoint) { this.scheduleReconnect(); } @@ -261,14 +253,16 @@ public void scheduleReconnect() { logger.debug("{} scheduleReconnect()", logPrefix()); if (!isEventLoopGroupActive()) { - logger.debug("isEventLoopGroupActive() == false"); - notifyEndpointFailedToConnectIfNeeded(); + final String errMsg = "isEventLoopGroupActive() == false"; + logger.debug(errMsg); + notifyEndpointFailedToConnectIfNeeded(errMsg); return; } if (!isListenOnChannelInactive()) { - logger.debug("Skip reconnect scheduling, listener disabled"); - notifyEndpointFailedToConnectIfNeeded(); + final String errMsg = "Skip reconnect scheduling, listener disabled"; + logger.debug(errMsg); + notifyEndpointFailedToConnectIfNeeded(errMsg); return; } @@ -285,8 +279,9 @@ public void scheduleReconnect() { reconnectScheduleTimeout = null; if (!isEventLoopGroupActive()) { - logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated"); - notifyEndpointFailedToConnectIfNeeded(); + final String errMsg = "Cannot execute scheduled reconnect timer, reconnect workers are terminated"; + logger.warn(errMsg); + notifyEndpointFailedToConnectIfNeeded(errMsg); return; } @@ -302,17 +297,25 @@ public void scheduleReconnect() { } } else { logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix()); - notifyEndpointFailedToConnectIfNeeded(); + notifyEndpointFailedToConnectIfNeeded("Skipping scheduleReconnect() because I have an active channel"); + } + } + + private void notifyEndpointFailedToConnectIfNeeded(String msg) { + if (useAutoBatchFlushEndpoint) { + ((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(new RedisException(msg)); } } - private void notifyEndpointFailedToConnectIfNeeded() { - notifyEndpointFailedToConnectIfNeeded(new CancellationException()); + private void notifyEndpointFailedToConnectIfNeeded(Throwable t) { + if (useAutoBatchFlushEndpoint) { + ((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(t); + } } - private void notifyEndpointFailedToConnectIfNeeded(Exception e) { + private void notifyEndpointFailedToConnectIfNeeded(Supplier throwableSupplier) { if (useAutoBatchFlushEndpoint) { - ((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(e); + ((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(throwableSupplier.get()); } } @@ -335,26 +338,29 @@ public void run(int attempt) throws Exception { * @param delay retry delay. * @throws Exception when reconnection fails. */ - private void run(int attempt, Duration delay) throws Exception { + private void run(int attempt, Duration delay) { reconnectSchedulerSync.set(false); reconnectScheduleTimeout = null; if (!isEventLoopGroupActive()) { - logger.debug("isEventLoopGroupActive() == false"); - notifyEndpointFailedToConnectIfNeeded(); + final String errMsg = "isEventLoopGroupActive() == false"; + logger.debug(errMsg); + notifyEndpointFailedToConnectIfNeeded(errMsg); return; } if (!isListenOnChannelInactive()) { - logger.debug("Skip reconnect scheduling, listener disabled"); - notifyEndpointFailedToConnectIfNeeded(); + final String errMsg = "Skip reconnect scheduling, listener disabled"; + logger.debug(errMsg); + notifyEndpointFailedToConnectIfNeeded(errMsg); return; } if (isReconnectSuspended()) { - logger.debug("Skip reconnect scheduling, reconnect is suspended"); - notifyEndpointFailedToConnectIfNeeded(); + final String msg = "Skip reconnect scheduling, reconnect is suspended"; + logger.debug(msg); + notifyEndpointFailedToConnectIfNeeded(msg); return; } @@ -411,7 +417,8 @@ private void run(int attempt, Duration delay) throws Exception { if (!isReconnectSuspended()) { scheduleReconnect(); } else { - notifyEndpointFailedToConnectIfNeeded(); + notifyEndpointFailedToConnectIfNeeded( + () -> new RedisException("got error and then reconnect is suspended", t)); } }); } catch (Exception e) {