Skip to content

Commit

Permalink
chore: better error reporting for ConnectionWatchDog
Browse files Browse the repository at this point in the history
  • Loading branch information
okg-cxf committed Aug 8, 2024
1 parent 19e6463 commit 1817719
Showing 1 changed file with 36 additions and 29 deletions.
65 changes: 36 additions & 29 deletions src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionBuilder;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.RedisException;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ReconnectAttemptEvent;
import io.lettuce.core.event.connection.ReconnectFailedEvent;
Expand Down Expand Up @@ -215,16 +217,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
channel = null;

if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
return;
}

if (!isListenOnChannelInactive()) {
logger.debug("Skip reconnect scheduling, listener disabled");
return;
}

if (!useAutoBatchFlushEndpoint) {
this.scheduleReconnect();
}
Expand Down Expand Up @@ -261,14 +253,16 @@ public void scheduleReconnect() {
logger.debug("{} scheduleReconnect()", logPrefix());

if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
notifyEndpointFailedToConnectIfNeeded();
final String errMsg = "isEventLoopGroupActive() == false";
logger.debug(errMsg);
notifyEndpointFailedToConnectIfNeeded(errMsg);
return;
}

if (!isListenOnChannelInactive()) {
logger.debug("Skip reconnect scheduling, listener disabled");
notifyEndpointFailedToConnectIfNeeded();
final String errMsg = "Skip reconnect scheduling, listener disabled";
logger.debug(errMsg);
notifyEndpointFailedToConnectIfNeeded(errMsg);
return;
}

Expand All @@ -285,8 +279,9 @@ public void scheduleReconnect() {
reconnectScheduleTimeout = null;

if (!isEventLoopGroupActive()) {
logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");
notifyEndpointFailedToConnectIfNeeded();
final String errMsg = "Cannot execute scheduled reconnect timer, reconnect workers are terminated";
logger.warn(errMsg);
notifyEndpointFailedToConnectIfNeeded(errMsg);
return;
}

Expand All @@ -302,17 +297,25 @@ public void scheduleReconnect() {
}
} else {
logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
notifyEndpointFailedToConnectIfNeeded();
notifyEndpointFailedToConnectIfNeeded("Skipping scheduleReconnect() because I have an active channel");
}
}

private void notifyEndpointFailedToConnectIfNeeded(String msg) {
if (useAutoBatchFlushEndpoint) {
((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(new RedisException(msg));
}
}

private void notifyEndpointFailedToConnectIfNeeded() {
notifyEndpointFailedToConnectIfNeeded(new CancellationException());
private void notifyEndpointFailedToConnectIfNeeded(Throwable t) {
if (useAutoBatchFlushEndpoint) {
((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(t);
}
}

private void notifyEndpointFailedToConnectIfNeeded(Exception e) {
private void notifyEndpointFailedToConnectIfNeeded(Supplier<Throwable> throwableSupplier) {
if (useAutoBatchFlushEndpoint) {
((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(e);
((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(throwableSupplier.get());
}
}

Expand All @@ -335,26 +338,29 @@ public void run(int attempt) throws Exception {
* @param delay retry delay.
* @throws Exception when reconnection fails.
*/
private void run(int attempt, Duration delay) throws Exception {
private void run(int attempt, Duration delay) {

reconnectSchedulerSync.set(false);
reconnectScheduleTimeout = null;

if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
notifyEndpointFailedToConnectIfNeeded();
final String errMsg = "isEventLoopGroupActive() == false";
logger.debug(errMsg);
notifyEndpointFailedToConnectIfNeeded(errMsg);
return;
}

if (!isListenOnChannelInactive()) {
logger.debug("Skip reconnect scheduling, listener disabled");
notifyEndpointFailedToConnectIfNeeded();
final String errMsg = "Skip reconnect scheduling, listener disabled";
logger.debug(errMsg);
notifyEndpointFailedToConnectIfNeeded(errMsg);
return;
}

if (isReconnectSuspended()) {
logger.debug("Skip reconnect scheduling, reconnect is suspended");
notifyEndpointFailedToConnectIfNeeded();
final String msg = "Skip reconnect scheduling, reconnect is suspended";
logger.debug(msg);
notifyEndpointFailedToConnectIfNeeded(msg);
return;
}

Expand Down Expand Up @@ -411,7 +417,8 @@ private void run(int attempt, Duration delay) throws Exception {
if (!isReconnectSuspended()) {
scheduleReconnect();
} else {
notifyEndpointFailedToConnectIfNeeded();
notifyEndpointFailedToConnectIfNeeded(
() -> new RedisException("got error and then reconnect is suspended", t));
}
});
} catch (Exception e) {
Expand Down

0 comments on commit 1817719

Please sign in to comment.