From 9cdd788cf4cc4113f11fd7fa7d4c2b5ef40b2886 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Tue, 23 Jan 2024 13:42:12 -0800 Subject: [PATCH] Java client: Fix how UDS connection established (#838) * Fix * `sync` instead of `syncUninterruptibly`. Signed-off-by: Yury-Fridlyand --- .../src/main/java/glide/api/RedisClient.java | 23 ++++++++++++------- .../connectors/handlers/ChannelHandler.java | 8 ++++--- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/java/client/src/main/java/glide/api/RedisClient.java b/java/client/src/main/java/glide/api/RedisClient.java index 4fa1b14c51..6b40292bc8 100644 --- a/java/client/src/main/java/glide/api/RedisClient.java +++ b/java/client/src/main/java/glide/api/RedisClient.java @@ -22,16 +22,23 @@ public class RedisClient extends BaseClient { * @return a Future to connect and return a RedisClient */ public static CompletableFuture CreateClient(RedisClientConfiguration config) { - ChannelHandler channelHandler = buildChannelHandler(); - ConnectionManager connectionManager = buildConnectionManager(channelHandler); - CommandManager commandManager = buildCommandManager(channelHandler); - // TODO: Support exception throwing, including interrupted exceptions - return connectionManager - .connectToRedis(config) - .thenApply(ignore -> new RedisClient(connectionManager, commandManager)); + try { + ChannelHandler channelHandler = buildChannelHandler(); + ConnectionManager connectionManager = buildConnectionManager(channelHandler); + CommandManager commandManager = buildCommandManager(channelHandler); + // TODO: Support exception throwing, including interrupted exceptions + return connectionManager + .connectToRedis(config) + .thenApply(ignore -> new RedisClient(connectionManager, commandManager)); + } catch (InterruptedException e) { + // Something bad happened while we were establishing netty connection to UDS + var future = new CompletableFuture(); + future.completeExceptionally(e); + return future; + } } - protected static ChannelHandler buildChannelHandler() { + protected static ChannelHandler buildChannelHandler() throws InterruptedException { CallbackDispatcher callbackDispatcher = new CallbackDispatcher(); return new ChannelHandler(callbackDispatcher, getSocket()); } diff --git a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java index f91bbed3db..bce035ea19 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java @@ -28,7 +28,8 @@ public class ChannelHandler { private final CallbackDispatcher callbackDispatcher; /** Open a new channel for a new client. */ - public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) { + public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) + throws InterruptedException { this( ThreadPoolAllocator.createOrGetNettyThreadPool(THREAD_POOL_NAME, Optional.empty()), Platform.getClientUdsNettyChannelType(), @@ -51,14 +52,15 @@ public ChannelHandler( Class domainSocketChannelClass, ChannelInitializer channelInitializer, DomainSocketAddress domainSocketAddress, - CallbackDispatcher callbackDispatcher) { + CallbackDispatcher callbackDispatcher) + throws InterruptedException { channel = new Bootstrap() .group(eventLoopGroup) .channel(domainSocketChannelClass) .handler(channelInitializer) .connect(domainSocketAddress) - // TODO call here .sync() if needed or remove this comment + .sync() .channel(); this.callbackDispatcher = callbackDispatcher; }