Skip to content

Commit

Permalink
Open channel (connection) per client, share thread pools.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Dec 4, 2023
1 parent fbff211 commit 701da44
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 118 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
package babushka.benchmarks.clients.babushka;

import babushka.Client;
import babushka.benchmarks.clients.AsyncClient;
import babushka.benchmarks.clients.SyncClient;
import babushka.benchmarks.utils.ConnectionSettings;
import babushka.client.Commands;
import babushka.client.Connection;
import java.util.concurrent.Future;

public class JniNettyClient implements SyncClient, AsyncClient {

private final Connection connection;
private Commands commands = null;
private final Client testClient = new Client();
private String name = "JNI Netty";

public JniNettyClient(boolean async) {
name += async ? " async" : " sync";
connection = new Connection();
}

@Override
Expand All @@ -25,36 +22,37 @@ public String getName() {

@Override
public void closeConnection() {
connection.closeConnection();
testClient.getConnection().closeConnection();
}

@Override
public void connectToRedis(ConnectionSettings connectionSettings) {
connection.connectToRedis(
connectionSettings.host,
connectionSettings.port,
connectionSettings.useSsl,
connectionSettings.clusterMode);
commands = connection.getCommands();
testClient
.getConnection()
.connectToRedis(
connectionSettings.host,
connectionSettings.port,
connectionSettings.useSsl,
connectionSettings.clusterMode);
}

@Override
public Future<Boolean> asyncSet(String key, String value) {
return commands.asyncSet(key, value);
return testClient.getCommands().asyncSet(key, value);
}

@Override
public Future<String> asyncGet(String key) {
return commands.asyncGet(key);
return testClient.getCommands().asyncGet(key);
}

@Override
public void set(String key, String value) {
commands.set(key, value);
testClient.getCommands().set(key, value);
}

@Override
public String get(String key) {
return commands.get(key);
return testClient.getCommands().get(key);
}
}
32 changes: 32 additions & 0 deletions java/client/src/main/java/babushka/Client.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package babushka;

import babushka.client.ChannelHolder;
import babushka.client.Commands;
import babushka.client.Connection;
import babushka.connection.CallbackManager;
import babushka.connection.SocketManager;
import java.nio.channels.NotYetConnectedException;
import lombok.Getter;

public class Client {

private final ChannelHolder channelHolder;
private final Commands commands;
@Getter private final Connection connection;

public Client() {
var callBackManager = new CallbackManager();
channelHolder =
new ChannelHolder(
SocketManager.getInstance().openNewChannel(callBackManager), callBackManager);
connection = new Connection(channelHolder);
commands = new Commands(channelHolder);
}

public Commands getCommands() {
if (!connection.isConnected()) {
throw new NotYetConnectedException();
}
return commands;
}
}
40 changes: 40 additions & 0 deletions java/client/src/main/java/babushka/client/ChannelHolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package babushka.client;

import babushka.connection.CallbackManager;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import io.netty.channel.Channel;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import redis_request.RedisRequestOuterClass.RedisRequest;
import response.ResponseOuterClass.Response;

@RequiredArgsConstructor
public class ChannelHolder {
private final Channel channel;
private final CallbackManager callbackManager;

/** Write a protobuf message to the socket. */
public CompletableFuture<Response> write(RedisRequest.Builder request, boolean flush) {
var commandId = callbackManager.registerRequest();
request.setCallbackIdx(commandId.getKey());

if (flush) {
channel.writeAndFlush(request.build().toByteArray());
} else {
channel.write(request.build().toByteArray());
}
return commandId.getValue();
}

/** Write a protobuf message to the socket. */
public CompletableFuture<Response> connect(ConnectionRequest request) {
channel.writeAndFlush(request.toByteArray());
return callbackManager.getConnectionPromise();
}

/** Closes the UDS connection and frees corresponding resources. */
public void close() {
channel.close();
callbackManager.clean();
}
}
10 changes: 4 additions & 6 deletions java/client/src/main/java/babushka/client/Commands.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
package babushka.client;

import babushka.FFI.BabushkaCoreNativeDefinitions;
import babushka.connection.SocketManager;
import babushka.tools.Awaiter;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import lombok.RequiredArgsConstructor;
import redis_request.RedisRequestOuterClass.RedisRequest;
import redis_request.RedisRequestOuterClass.RequestType;
import response.ResponseOuterClass.ConstantResponse;
import response.ResponseOuterClass.Response;

@RequiredArgsConstructor
public class Commands {
private final SocketManager socketManager;

public Commands(SocketManager socketManager) {
this.socketManager = socketManager;
}
private final ChannelHolder channel;

/**
* Sync (blocking) set. See async option in {@link #asyncSet}.<br>
Expand Down Expand Up @@ -71,7 +69,7 @@ public Future<String> asyncGet(String key) {

private CompletableFuture<Response> submitRequest(RedisRequest.Builder builder) {
// TODO this explicitly uses ForkJoin thread pool. May be we should use another one.
return CompletableFuture.supplyAsync(() -> socketManager.write(builder, true))
return CompletableFuture.supplyAsync(() -> channel.write(builder, true))
.thenComposeAsync(f -> f);
}
}
27 changes: 8 additions & 19 deletions java/client/src/main/java/babushka/client/Connection.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
package babushka.client;

import babushka.connection.SocketManager;
import babushka.tools.Awaiter;
import java.nio.channels.NotYetConnectedException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.RequiredArgsConstructor;
import response.ResponseOuterClass.ConstantResponse;

@RequiredArgsConstructor
public class Connection {

// TODO: not used yet, not implemented on rust side
// https://github.com/aws/babushka/issues/635
private final int connectionId = 0;

private final AtomicBoolean isConnected = new AtomicBoolean(false);

private final SocketManager socketManager;

public Connection() {
socketManager = SocketManager.getInstance();
}
private final ChannelHolder channel;

/**
* Sync (blocking) connect to REDIS. See async option in {@link #asyncConnectToRedis}.
Expand Down Expand Up @@ -48,7 +40,7 @@ public boolean connectToRedis(String host, int port, boolean useSsl, boolean clu
public CompletableFuture<Boolean> asyncConnectToRedis(
String host, int port, boolean useSsl, boolean clusterMode) {
var request = RequestBuilder.createConnectionRequest(host, port, useSsl, clusterMode);
return socketManager
return channel
.connect(request)
.thenApplyAsync(
response ->
Expand All @@ -62,16 +54,13 @@ public void closeConnection() {
}

/** Async (non-blocking) disconnect. See sync option in {@link #closeConnection}. */
// TODO Not implemented yet in rust core lib.
public CompletableFuture<Void> asyncCloseConnection() {
isConnected.setPlain(false);
return CompletableFuture.runAsync(() -> {});
return CompletableFuture.runAsync(channel::close);
}

public Commands getCommands() {
if (!isConnected.get()) {
throw new NotYetConnectedException();
}
return new Commands(socketManager);
/** Check that connection established. This doesn't validate whether it is alive. */
public boolean isConnected() {
return isConnected.get();
}
}
43 changes: 12 additions & 31 deletions java/client/src/main/java/babushka/connection/CallbackManager.java
Original file line number Diff line number Diff line change
@@ -1,81 +1,62 @@
package babushka.connection;

import java.util.Deque;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import response.ResponseOuterClass.Response;

/** Holder for resources owned by {@link SocketManager} and used by {@link ReadHandler}. */
class CallbackManager {
public class CallbackManager {

/** Unique request ID (callback ID). Thread-safe. */
private static final AtomicInteger requestId = new AtomicInteger(0);
private final AtomicInteger requestId = new AtomicInteger(0);

/**
* Storage of Futures to handle responses. Map key is callback id, which starts from 1.<br>
* Each future is a promise for every submitted by user request.
*/
private static final Map<Integer, CompletableFuture<Response>> responses =
new ConcurrentHashMap<>();
private final Map<Integer, CompletableFuture<Response>> responses = new ConcurrentHashMap<>();

/**
* Storage for connection requests similar to {@link #responses}. Unfortunately, connection
* Storage for connection request similar to {@link #responses}. Unfortunately, connection
* requests can't be stored in the same storage, because callback ID = 0 is hardcoded for
* connection requests. Will be removed once <a
* href="https://github.com/aws/babushka/issues/600">issue #600 on GH</a> fixed.
* connection requests.
*/
private static final Deque<CompletableFuture<Response>> connectionRequests =
new ConcurrentLinkedDeque<>();
@Getter private final CompletableFuture<Response> connectionPromise = new CompletableFuture<>();

/**
* Register a new request to be sent. Once response received, the given future completes with it.
*
* @return A pair of unique callback ID which should set into request and a client promise for
* response.
*/
public static Pair<Integer, CompletableFuture<Response>> registerRequest() {
public Pair<Integer, CompletableFuture<Response>> registerRequest() {
int callbackId = requestId.incrementAndGet();
var future = new CompletableFuture<Response>();
responses.put(callbackId, future);
return Pair.of(callbackId, future);
}

/**
* Register a new connection request similar to {@link #registerRequest}.<br>
* No callback ID returned, because connection request/response pair have no such field (subject
* to change). Track <a href="https://github.com/aws/babushka/issues/600">issue #600</a> for more
* details.
*/
public static CompletableFuture<Response> registerConnection() {
var future = new CompletableFuture<Response>();
connectionRequests.add(future);
return future;
}

/**
* Complete the corresponding client promise and free resources.
*
* @param response A response received
*/
public static void completeRequest(Response response) {
public void completeRequest(Response response) {
int callbackId = response.getCallbackIdx();
if (callbackId == 0) {
// can't distinguish connection requests since they have no
// callback ID
// https://github.com/aws/babushka/issues/600
connectionRequests.pop().completeAsync(() -> response);
connectionPromise.completeAsync(() -> response);
} else {
responses.get(callbackId).completeAsync(() -> response);
responses.remove(callbackId);
}
}

public static void clean() {
public void clean() {
// TODO should we reply in uncompleted futures?
connectionRequests.clear();
responses.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

/** Builder for the channel used by {@link SocketManager}. */
@RequiredArgsConstructor
public class ChannelBuilder extends ChannelInitializer<UnixChannel> {

private final CallbackManager callbackManager;

@Override
public void initChannel(@NonNull UnixChannel ch) {
ch.pipeline()
// https://netty.io/4.1/api/io/netty/handler/codec/protobuf/ProtobufEncoder.html
.addLast("protobufDecoder", new ProtobufVarint32FrameDecoder())
.addLast("protobufEncoder", new ProtobufVarint32LengthFieldPrepender())
.addLast(new ReadHandler())
.addLast(new ReadHandler(callbackManager))
.addLast(new WriteHandler());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import response.ResponseOuterClass.Response;

/** Handler for inbound traffic though UDS. Used by Netty. */
@RequiredArgsConstructor
public class ReadHandler extends ChannelInboundHandlerAdapter {

private final CallbackManager callbackManager;

/**
* Handles responses from babushka core:
*
Expand All @@ -25,7 +30,7 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
buf.readBytes(bytes);
// TODO surround parsing with try-catch, set error to future if parsing failed.
var response = Response.parseFrom(bytes);
CallbackManager.completeRequest(response);
callbackManager.completeRequest(response);
buf.release();
}

Expand Down
Loading

0 comments on commit 701da44

Please sign in to comment.