From aa5aa527c91c8bb2708f2d5fedcf1b72d6d6d8a5 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 30 Nov 2023 10:51:58 -0800 Subject: [PATCH] Optimize a bit. Signed-off-by: Yury-Fridlyand --- .../client/src/main/java/babushka/Client.java | 31 ++---------- .../babushka/connection/SocketManager.java | 50 ++++++------------- .../src/test/java/babushka/ClientTest.java | 6 +-- 3 files changed, 21 insertions(+), 66 deletions(-) diff --git a/java/client/src/main/java/babushka/Client.java b/java/client/src/main/java/babushka/Client.java index 0d61efaa79..e8b6099a73 100644 --- a/java/client/src/main/java/babushka/Client.java +++ b/java/client/src/main/java/babushka/Client.java @@ -1,6 +1,5 @@ package babushka; -import babushka.connection.ReadHandler; import babushka.connection.SocketManager; import com.google.common.annotations.VisibleForTesting; import connection_request.ConnectionRequestOuterClass.ConnectionRequest; @@ -11,7 +10,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.tuple.Pair; import redis_request.RedisRequestOuterClass.Command; import redis_request.RedisRequestOuterClass.Command.ArgsArray; import redis_request.RedisRequestOuterClass.RedisRequest; @@ -71,20 +69,6 @@ public void connectToRedis(String host, int port, boolean useSsl, boolean cluste waitForResult(asyncConnectToRedis(host, port, useSsl, clusterMode)); } - /** - * Create a unique callback ID (request ID) and a corresponding registered future for the - * response.
- * Should be used for every request submitted to ensure that it can be tracked by {@link - * SocketManager} and {@link ReadHandler}. - * - * @return New callback ID and new future to be returned to user. - */ - private synchronized Pair> getNextCallback() { - var future = new CompletableFuture(); - int callbackId = socketManager.registerRequest(future); - return Pair.of(callbackId, future); - } - /** Build a protobuf connection request. See {@link #connectToRedis}. */ // TODO support more parameters and/or configuration object @VisibleForTesting @@ -112,14 +96,10 @@ static ConnectionRequest getConnectionRequest( public CompletableFuture asyncConnectToRedis( String host, int port, boolean useSsl, boolean clusterMode) { var request = getConnectionRequest(host, port, useSsl, clusterMode); - var future = new CompletableFuture(); - socketManager.registerConnection(future); - socketManager.writeAndFlush(request); - return future; + return socketManager.connect(request); } private CompletableFuture submitNewCommand(RequestType command, List args) { - var commandId = getNextCallback(); // TODO this explicitly uses ForkJoin thread pool. May be we should use another one. return CompletableFuture.supplyAsync( () -> { @@ -128,18 +108,15 @@ private CompletableFuture submitNewCommand(RequestType command, List f); } diff --git a/java/client/src/main/java/babushka/connection/SocketManager.java b/java/client/src/main/java/babushka/connection/SocketManager.java index e134d83b7e..df2a6cf7c2 100644 --- a/java/client/src/main/java/babushka/connection/SocketManager.java +++ b/java/client/src/main/java/babushka/connection/SocketManager.java @@ -4,7 +4,7 @@ import babushka.BabushkaCoreNativeDefinitions; import babushka.Client; -import com.google.protobuf.GeneratedMessageV3; +import connection_request.ConnectionRequestOuterClass.ConnectionRequest; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; @@ -20,6 +20,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import redis_request.RedisRequestOuterClass.RedisRequest; /** * A UDS connection manager. This class is responsible for: @@ -91,7 +92,7 @@ public static synchronized SocketManager getInstance() { } /** The singleton instance. */ - private static SocketManager INSTANCE = null; + private static SocketManager INSTANCE = new SocketManager(); /** Constructor for the single instance. */ private SocketManager() { @@ -122,44 +123,23 @@ cpuCount, new DefaultThreadFactory("NettyWrapper-kqueue-elg", true)) } } - /** - * Write a protobuf message to the socket.
- * Always {@link #registerRequest} before submitting! - */ - public void write(GeneratedMessageV3 message) { - channel.write(message.toByteArray()); - } - - /** - * Write a protobuf message to the socket and flush it.
- * Always {@link #registerRequest} before submitting! - */ - public void writeAndFlush(GeneratedMessageV3 message) { - channel.writeAndFlush(message.toByteArray()); - } - - /** - * Register a new request to be sent. Socket Manager takes responsibility for tracking the - * returned callback ID in all incoming responses. Once response received, the given future - * completes with it. - * - * @param future A client promise for response. - * @return Unique callback ID which should set into request. - */ - public int registerRequest(CompletableFuture future) { + /** Write a protobuf message to the socket. */ + public CompletableFuture write(RedisRequest.Builder request, boolean flush) { + var future = new CompletableFuture(); int callbackId = requestId.incrementAndGet(); + request.setCallbackIdx(callbackId); SocketManagerResources.responses.put(callbackId, future); - return callbackId; + if (flush) channel.writeAndFlush(request.build().toByteArray()); + else channel.write(request.build().toByteArray()); + return future; } - /** - * Register a new connection request similar to {@link #registerRequest}.
- * No callback ID returned, because connection request/response pair have no such field (subject - * to change). Track issue #600 for more - * details. - */ - public void registerConnection(CompletableFuture future) { + /** Write a protobuf message to the socket. */ + public CompletableFuture connect(ConnectionRequest request) { + var future = new CompletableFuture(); SocketManagerResources.connectionRequests.add(future); + channel.writeAndFlush(request.toByteArray()); + return future; } /** diff --git a/java/client/src/test/java/babushka/ClientTest.java b/java/client/src/test/java/babushka/ClientTest.java index af03243416..5014d20c98 100644 --- a/java/client/src/test/java/babushka/ClientTest.java +++ b/java/client/src/test/java/babushka/ClientTest.java @@ -2,14 +2,13 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import babushka.connection.SocketManager; import connection_request.ConnectionRequestOuterClass.ConnectionRequest; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import response.ResponseOuterClass; public class ClientTest { @@ -41,8 +40,7 @@ public void test_asyncConnectToRedis_success() { // verify // assertTrue(connectionResponse instanceof CompletableFuture); - Mockito.verify(socketManager, times(1)).registerConnection(eq(connectionResponse)); - Mockito.verify(socketManager, times(1)).writeAndFlush(eq(connectionRequest)); + verify(socketManager).connect(eq(connectionRequest)); // teardown }