diff --git a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java index dfaf01bbe7..6c5e86e2d2 100644 --- a/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java +++ b/java/client/src/main/java/glide/connectors/handlers/CallbackDispatcher.java @@ -129,7 +129,8 @@ public void distributeClosingException(String message) { } public void shutdownGracefully() { - responses.values().forEach(future -> future.cancel(false)); + String msg = "Operation terminated: The closing process has been initiated for the resource."; + responses.values().forEach(future -> future.completeExceptionally(new ClosingException(msg))); responses.clear(); } } diff --git a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java index 32637219d1..4800316803 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java @@ -9,6 +9,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.unix.DomainSocketAddress; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.NonNull; import lombok.RequiredArgsConstructor; import redis_request.RedisRequestOuterClass.RedisRequest; @@ -22,6 +23,11 @@ public class ChannelHandler { protected final Channel channel; protected final CallbackDispatcher callbackDispatcher; + private AtomicBoolean isClosed = new AtomicBoolean(false); + + public boolean isClosed() { + return this.isClosed.get() || !this.channel.isOpen(); + } /** * Open a new channel for a new client and running it on the provided EventLoopGroup. @@ -84,6 +90,7 @@ public CompletableFuture connect(ConnectionRequest request) { /** Closes the UDS connection and frees corresponding resources. */ public ChannelFuture close() { + this.isClosed.set(true); callbackDispatcher.shutdownGracefully(); return channel.close(); } diff --git a/java/client/src/main/java/glide/managers/CommandManager.java b/java/client/src/main/java/glide/managers/CommandManager.java index 770830ce2f..253a83f317 100644 --- a/java/client/src/main/java/glide/managers/CommandManager.java +++ b/java/client/src/main/java/glide/managers/CommandManager.java @@ -109,6 +109,13 @@ public CompletableFuture submitNewCommand( */ protected CompletableFuture submitCommandToChannel( RedisRequest.Builder command, RedisExceptionCheckedFunction responseHandler) { + if (channel.isClosed()) { + var errorFuture = new CompletableFuture(); + errorFuture.completeExceptionally( + new ClosingException("Channel closed: Unable to submit command.")); + return errorFuture; + } + // write command request to channel // when complete, convert the response to our expected type T using the given responseHandler return channel diff --git a/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java b/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java index 52be39bacc..331af6fa39 100644 --- a/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java +++ b/java/client/src/test/java/glide/connection/ConnectionWithGlideMockTests.java @@ -173,14 +173,8 @@ public void rethrow_error_if_UDS_channel_closed() { stopRustCoreLibMock(); try { var exception = - assertThrows( - ExecutionException.class, () -> client.customCommand(new String[0]).get(1, SECONDS)); - assertTrue(exception.getCause() instanceof RuntimeException); - - // Not a public class, can't import - assertEquals( - "io.netty.channel.StacklessClosedChannelException", - exception.getCause().getCause().getClass().getName()); + assertThrows(ExecutionException.class, () -> client.customCommand(new String[0]).get()); + assertTrue(exception.getCause() instanceof ClosingException); } finally { // restart mock to let other tests pass if this one failed startRustCoreLibMock(null); diff --git a/java/client/src/test/java/glide/managers/CommandManagerTest.java b/java/client/src/test/java/glide/managers/CommandManagerTest.java index 0f7f539ebb..a64c6499ad 100644 --- a/java/client/src/test/java/glide/managers/CommandManagerTest.java +++ b/java/client/src/test/java/glide/managers/CommandManagerTest.java @@ -60,6 +60,7 @@ public void submitNewCommand_return_Object_result() { CompletableFuture future = new CompletableFuture<>(); future.complete(respPointerResponse); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); // exercise CompletableFuture result = @@ -81,6 +82,7 @@ public void submitNewCommand_return_Null_result() { CompletableFuture future = new CompletableFuture<>(); future.complete(respPointerResponse); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); // exercise CompletableFuture result = @@ -107,6 +109,7 @@ public void submitNewCommand_return_String_result() { CompletableFuture future = new CompletableFuture<>(); future.complete(respPointerResponse); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); // exercise CompletableFuture result = @@ -126,6 +129,7 @@ public void submitNewCommand_return_String_result() { public void prepare_request_with_simple_routes(SimpleRoute routeType) { CompletableFuture future = new CompletableFuture<>(); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); ArgumentCaptor captor = ArgumentCaptor.forClass(RedisRequest.Builder.class); @@ -156,6 +160,7 @@ public void prepare_request_with_simple_routes(SimpleRoute routeType) { public void prepare_request_with_slot_id_routes(SlotType slotType) { CompletableFuture future = new CompletableFuture<>(); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); ArgumentCaptor captor = ArgumentCaptor.forClass(RedisRequest.Builder.class); @@ -188,6 +193,7 @@ public void prepare_request_with_slot_id_routes(SlotType slotType) { public void prepare_request_with_slot_key_routes(SlotType slotType) { CompletableFuture future = new CompletableFuture<>(); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); ArgumentCaptor captor = ArgumentCaptor.forClass(RedisRequest.Builder.class); @@ -239,6 +245,7 @@ public void submitNewCommand_with_Transaction_sends_protobuf_request() { CompletableFuture future = new CompletableFuture<>(); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); ArgumentCaptor captor = ArgumentCaptor.forClass(RedisRequest.Builder.class); @@ -279,6 +286,7 @@ public void submitNewCommand_with_ClusterTransaction_with_route_sends_protobuf_r CompletableFuture future = new CompletableFuture<>(); when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + when(channelHandler.isClosed()).thenReturn(false); ArgumentCaptor captor = ArgumentCaptor.forClass(RedisRequest.Builder.class); diff --git a/java/integTest/src/test/java/glide/cluster/ClientTests.java b/java/integTest/src/test/java/glide/cluster/ClientTests.java new file mode 100644 index 0000000000..3ad92f59ce --- /dev/null +++ b/java/integTest/src/test/java/glide/cluster/ClientTests.java @@ -0,0 +1,32 @@ +/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.cluster; + +import static glide.TestConfiguration.CLUSTER_PORTS; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import glide.api.RedisClusterClient; +import glide.api.models.configuration.NodeAddress; +import glide.api.models.configuration.RedisClusterClientConfiguration; +import glide.api.models.exceptions.ClosingException; +import java.util.concurrent.ExecutionException; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + +public class ClientTests { + @Test + @SneakyThrows + public void close_client_throws_ExecutionException_with_ClosingException_cause() { + RedisClusterClient client = + RedisClusterClient.CreateClient( + RedisClusterClientConfiguration.builder() + .address(NodeAddress.builder().port(CLUSTER_PORTS[0]).build()) + .build()) + .get(); + + client.close(); + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.set("foo", "bar").get()); + assertTrue(executionException.getCause() instanceof ClosingException); + } +} diff --git a/java/integTest/src/test/java/glide/standalone/ClientTests.java b/java/integTest/src/test/java/glide/standalone/ClientTests.java new file mode 100644 index 0000000000..db248b2bce --- /dev/null +++ b/java/integTest/src/test/java/glide/standalone/ClientTests.java @@ -0,0 +1,32 @@ +/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.standalone; + +import static glide.TestConfiguration.STANDALONE_PORTS; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import glide.api.RedisClient; +import glide.api.models.configuration.NodeAddress; +import glide.api.models.configuration.RedisClientConfiguration; +import glide.api.models.exceptions.ClosingException; +import java.util.concurrent.ExecutionException; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + +public class ClientTests { + @Test + @SneakyThrows + public void close_client_throws_ExecutionException_with_ClosingException_cause() { + RedisClient client = + RedisClient.CreateClient( + RedisClientConfiguration.builder() + .address(NodeAddress.builder().port(STANDALONE_PORTS[0]).build()) + .build()) + .get(); + + client.close(); + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.set("key", "value").get()); + assertTrue(executionException.getCause() instanceof ClosingException); + } +}