Skip to content

Commit

Permalink
Refactor channel closure handling (#118)
Browse files Browse the repository at this point in the history
* Ensure resources complete with Closing Error when client closes. New command submissions on closed clients now also return a future with a ClosingException Set. (Consistent with Py and Node clients)

* Added a private boolean that saves the state of channel handler.

* Changed Boolean to Atomic Boolean added IT tests and modified UT tests.

* Fix IT
  • Loading branch information
SanHalacogluImproving authored Mar 4, 2024
1 parent 5d5a36c commit be50490
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public void distributeClosingException(String message) {
}

public void shutdownGracefully() {
responses.values().forEach(future -> future.cancel(false));
String msg = "Operation terminated: The closing process has been initiated for the resource.";
responses.values().forEach(future -> future.completeExceptionally(new ClosingException(msg)));
responses.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.unix.DomainSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import redis_request.RedisRequestOuterClass.RedisRequest;
Expand All @@ -22,6 +23,11 @@ public class ChannelHandler {

protected final Channel channel;
protected final CallbackDispatcher callbackDispatcher;
private AtomicBoolean isClosed = new AtomicBoolean(false);

public boolean isClosed() {
return this.isClosed.get() || !this.channel.isOpen();
}

/**
* Open a new channel for a new client and running it on the provided EventLoopGroup.
Expand Down Expand Up @@ -84,6 +90,7 @@ public CompletableFuture<Response> connect(ConnectionRequest request) {

/** Closes the UDS connection and frees corresponding resources. */
public ChannelFuture close() {
this.isClosed.set(true);
callbackDispatcher.shutdownGracefully();
return channel.close();
}
Expand Down
7 changes: 7 additions & 0 deletions java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ public <T> CompletableFuture<T> submitNewCommand(
*/
protected <T> CompletableFuture<T> submitCommandToChannel(
RedisRequest.Builder command, RedisExceptionCheckedFunction<Response, T> responseHandler) {
if (channel.isClosed()) {
var errorFuture = new CompletableFuture<T>();
errorFuture.completeExceptionally(
new ClosingException("Channel closed: Unable to submit command."));
return errorFuture;
}

// write command request to channel
// when complete, convert the response to our expected type T using the given responseHandler
return channel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,8 @@ public void rethrow_error_if_UDS_channel_closed() {
stopRustCoreLibMock();
try {
var exception =
assertThrows(
ExecutionException.class, () -> client.customCommand(new String[0]).get(1, SECONDS));
assertTrue(exception.getCause() instanceof RuntimeException);

// Not a public class, can't import
assertEquals(
"io.netty.channel.StacklessClosedChannelException",
exception.getCause().getCause().getClass().getName());
assertThrows(ExecutionException.class, () -> client.customCommand(new String[0]).get());
assertTrue(exception.getCause() instanceof ClosingException);
} finally {
// restart mock to let other tests pass if this one failed
startRustCoreLibMock(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void submitNewCommand_return_Object_result() {
CompletableFuture<Response> future = new CompletableFuture<>();
future.complete(respPointerResponse);
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

// exercise
CompletableFuture<Object> result =
Expand All @@ -81,6 +82,7 @@ public void submitNewCommand_return_Null_result() {
CompletableFuture<Response> future = new CompletableFuture<>();
future.complete(respPointerResponse);
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

// exercise
CompletableFuture<Object> result =
Expand All @@ -107,6 +109,7 @@ public void submitNewCommand_return_String_result() {
CompletableFuture<Response> future = new CompletableFuture<>();
future.complete(respPointerResponse);
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

// exercise
CompletableFuture<Object> result =
Expand All @@ -126,6 +129,7 @@ public void submitNewCommand_return_String_result() {
public void prepare_request_with_simple_routes(SimpleRoute routeType) {
CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

ArgumentCaptor<RedisRequest.Builder> captor =
ArgumentCaptor.forClass(RedisRequest.Builder.class);
Expand Down Expand Up @@ -156,6 +160,7 @@ public void prepare_request_with_simple_routes(SimpleRoute routeType) {
public void prepare_request_with_slot_id_routes(SlotType slotType) {
CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

ArgumentCaptor<RedisRequest.Builder> captor =
ArgumentCaptor.forClass(RedisRequest.Builder.class);
Expand Down Expand Up @@ -188,6 +193,7 @@ public void prepare_request_with_slot_id_routes(SlotType slotType) {
public void prepare_request_with_slot_key_routes(SlotType slotType) {
CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

ArgumentCaptor<RedisRequest.Builder> captor =
ArgumentCaptor.forClass(RedisRequest.Builder.class);
Expand Down Expand Up @@ -239,6 +245,7 @@ public void submitNewCommand_with_Transaction_sends_protobuf_request() {

CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

ArgumentCaptor<RedisRequest.Builder> captor =
ArgumentCaptor.forClass(RedisRequest.Builder.class);
Expand Down Expand Up @@ -279,6 +286,7 @@ public void submitNewCommand_with_ClusterTransaction_with_route_sends_protobuf_r

CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

ArgumentCaptor<RedisRequest.Builder> captor =
ArgumentCaptor.forClass(RedisRequest.Builder.class);
Expand Down
32 changes: 32 additions & 0 deletions java/integTest/src/test/java/glide/cluster/ClientTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.cluster;

import static glide.TestConfiguration.CLUSTER_PORTS;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import glide.api.RedisClusterClient;
import glide.api.models.configuration.NodeAddress;
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.exceptions.ClosingException;
import java.util.concurrent.ExecutionException;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;

public class ClientTests {
@Test
@SneakyThrows
public void close_client_throws_ExecutionException_with_ClosingException_cause() {
RedisClusterClient client =
RedisClusterClient.CreateClient(
RedisClusterClientConfiguration.builder()
.address(NodeAddress.builder().port(CLUSTER_PORTS[0]).build())
.build())
.get();

client.close();
ExecutionException executionException =
assertThrows(ExecutionException.class, () -> client.set("foo", "bar").get());
assertTrue(executionException.getCause() instanceof ClosingException);
}
}
32 changes: 32 additions & 0 deletions java/integTest/src/test/java/glide/standalone/ClientTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.standalone;

import static glide.TestConfiguration.STANDALONE_PORTS;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import glide.api.RedisClient;
import glide.api.models.configuration.NodeAddress;
import glide.api.models.configuration.RedisClientConfiguration;
import glide.api.models.exceptions.ClosingException;
import java.util.concurrent.ExecutionException;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;

public class ClientTests {
@Test
@SneakyThrows
public void close_client_throws_ExecutionException_with_ClosingException_cause() {
RedisClient client =
RedisClient.CreateClient(
RedisClientConfiguration.builder()
.address(NodeAddress.builder().port(STANDALONE_PORTS[0]).build())
.build())
.get();

client.close();
ExecutionException executionException =
assertThrows(ExecutionException.class, () -> client.set("key", "value").get());
assertTrue(executionException.getCause() instanceof ClosingException);
}
}

0 comments on commit be50490

Please sign in to comment.