Skip to content

Commit

Permalink
Optimize a bit.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Nov 30, 2023
1 parent 540120e commit aa5aa52
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 66 deletions.
31 changes: 4 additions & 27 deletions java/client/src/main/java/babushka/Client.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package babushka;

import babushka.connection.ReadHandler;
import babushka.connection.SocketManager;
import com.google.common.annotations.VisibleForTesting;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
Expand All @@ -11,7 +10,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import redis_request.RedisRequestOuterClass.Command;
import redis_request.RedisRequestOuterClass.Command.ArgsArray;
import redis_request.RedisRequestOuterClass.RedisRequest;
Expand Down Expand Up @@ -71,20 +69,6 @@ public void connectToRedis(String host, int port, boolean useSsl, boolean cluste
waitForResult(asyncConnectToRedis(host, port, useSsl, clusterMode));
}

/**
* Create a unique callback ID (request ID) and a corresponding registered future for the
* response.<br>
* Should be used for every request submitted to ensure that it can be tracked by {@link
* SocketManager} and {@link ReadHandler}.
*
* @return New callback ID and new future to be returned to user.
*/
private synchronized Pair<Integer, CompletableFuture<Response>> getNextCallback() {
var future = new CompletableFuture<Response>();
int callbackId = socketManager.registerRequest(future);
return Pair.of(callbackId, future);
}

/** Build a protobuf connection request. See {@link #connectToRedis}. */
// TODO support more parameters and/or configuration object
@VisibleForTesting
Expand Down Expand Up @@ -112,14 +96,10 @@ static ConnectionRequest getConnectionRequest(
public CompletableFuture<Response> asyncConnectToRedis(
String host, int port, boolean useSsl, boolean clusterMode) {
var request = getConnectionRequest(host, port, useSsl, clusterMode);
var future = new CompletableFuture<Response>();
socketManager.registerConnection(future);
socketManager.writeAndFlush(request);
return future;
return socketManager.connect(request);
}

private CompletableFuture<Response> submitNewCommand(RequestType command, List<String> args) {
var commandId = getNextCallback();
// TODO this explicitly uses ForkJoin thread pool. May be we should use another one.
return CompletableFuture.supplyAsync(
() -> {
Expand All @@ -128,18 +108,15 @@ private CompletableFuture<Response> submitNewCommand(RequestType command, List<S
commandArgs.addArgs(arg);
}

RedisRequest request =
RedisRequest.Builder builder =
RedisRequest.newBuilder()
.setCallbackIdx(commandId.getKey())
.setSingleCommand(
Command.newBuilder()
.setRequestType(command)
.setArgsArray(commandArgs.build())
.build())
.setRoute(Routes.newBuilder().setSimpleRoutes(SimpleRoutes.AllNodes).build())
.build();
socketManager.writeAndFlush(request);
return commandId.getValue();
.setRoute(Routes.newBuilder().setSimpleRoutes(SimpleRoutes.AllNodes).build());
return socketManager.write(builder, true);
})
.thenCompose(f -> f);
}
Expand Down
50 changes: 15 additions & 35 deletions java/client/src/main/java/babushka/connection/SocketManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import babushka.BabushkaCoreNativeDefinitions;
import babushka.Client;
import com.google.protobuf.GeneratedMessageV3;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
Expand All @@ -20,6 +20,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import redis_request.RedisRequestOuterClass.RedisRequest;

/**
* A UDS connection manager. This class is responsible for:
Expand Down Expand Up @@ -91,7 +92,7 @@ public static synchronized SocketManager getInstance() {
}

/** The singleton instance. */
private static SocketManager INSTANCE = null;
private static SocketManager INSTANCE = new SocketManager();

/** Constructor for the single instance. */
private SocketManager() {
Expand Down Expand Up @@ -122,44 +123,23 @@ cpuCount, new DefaultThreadFactory("NettyWrapper-kqueue-elg", true))
}
}

/**
* Write a protobuf message to the socket.<br>
* Always {@link #registerRequest} before submitting!
*/
public void write(GeneratedMessageV3 message) {
channel.write(message.toByteArray());
}

/**
* Write a protobuf message to the socket and flush it.<br>
* Always {@link #registerRequest} before submitting!
*/
public void writeAndFlush(GeneratedMessageV3 message) {
channel.writeAndFlush(message.toByteArray());
}

/**
* Register a new request to be sent. Socket Manager takes responsibility for tracking the
* returned callback ID in all incoming responses. Once response received, the given future
* completes with it.
*
* @param future A client promise for response.
* @return Unique callback ID which should set into request.
*/
public int registerRequest(CompletableFuture<Response> future) {
/** Write a protobuf message to the socket. */
public CompletableFuture<Response> write(RedisRequest.Builder request, boolean flush) {
var future = new CompletableFuture<Response>();
int callbackId = requestId.incrementAndGet();
request.setCallbackIdx(callbackId);
SocketManagerResources.responses.put(callbackId, future);
return callbackId;
if (flush) channel.writeAndFlush(request.build().toByteArray());
else channel.write(request.build().toByteArray());
return future;
}

/**
* Register a new connection request similar to {@link #registerRequest}.<br>
* No callback ID returned, because connection request/response pair have no such field (subject
* to change). Track <a href="https://github.com/aws/babushka/issues/600">issue #600</a> for more
* details.
*/
public void registerConnection(CompletableFuture<Response> future) {
/** Write a protobuf message to the socket. */
public CompletableFuture<Response> connect(ConnectionRequest request) {
var future = new CompletableFuture<Response>();
SocketManagerResources.connectionRequests.add(future);
channel.writeAndFlush(request.toByteArray());
return future;
}

/**
Expand Down
6 changes: 2 additions & 4 deletions java/client/src/test/java/babushka/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import babushka.connection.SocketManager;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import response.ResponseOuterClass;

public class ClientTest {
Expand Down Expand Up @@ -41,8 +40,7 @@ public void test_asyncConnectToRedis_success() {

// verify
// assertTrue(connectionResponse instanceof CompletableFuture);
Mockito.verify(socketManager, times(1)).registerConnection(eq(connectionResponse));
Mockito.verify(socketManager, times(1)).writeAndFlush(eq(connectionRequest));
verify(socketManager).connect(eq(connectionRequest));

// teardown
}
Expand Down

0 comments on commit aa5aa52

Please sign in to comment.