Skip to content

Commit

Permalink
Iteration 5: some fixes.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Nov 2, 2023
1 parent a3075d5 commit 5f10964
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,12 @@ public RunConfiguration() {
configuration = "Release";
resultsFile = Optional.of("res_java.json");//Optional.empty();
dataSize = new int[] {100, 4000};
concurrentTasks = new int[] {100, 1000};
concurrentTasks = new int[] {100};
clients =
new ClientName[] {
// ClientName.BABUSHKA_ASYNC,
//ClientName.JEDIS, ClientName.JEDIS_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC
ClientName.JNI_NETTY
ClientName.JNI_NETTY, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC
};
host = "localhost";
port = 6379;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.SimpleUserEventChannelHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
Expand Down Expand Up @@ -69,27 +72,39 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class JniNettyClient implements SyncClient, AsyncClient<Response>, AutoCloseable {

// https://netty.io/3.6/api/org/jboss/netty/handler/queue/BufferedWriteHandler.html
private final static int AUTO_FLUSH_THRESHOLD = 512;//1024;
private final AtomicInteger nonFlushedCounter = new AtomicInteger(0);

private final static int AUTO_FLUSH_TIMEOUT_MILLIS = 100;

private final static int PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS = 1000;

// Futures to handle responses. Index is callback id, starting from 1 (0 index is for connection request always).
// TODO clean up completed futures
private final List<CompletableFuture<Response>> responses = Collections.synchronizedList(new ArrayList<>());

private final static String unixSocket = getSocket();
private final String unixSocket = getSocket();

// TODO static or move to constructor?
private static String getSocket() {
try {
return RedisClient.startSocketListenerExternal();
} catch (Exception | UnsatisfiedLinkError e) {
System.err.printf("Failed to get UDS from babushka and dedushka: %s%n%n", e);
return null;
throw new RuntimeException(e);
}
}

private Channel channel = null;
private EventLoopGroup group = null;

// We support MacOS and Linux only, because Babushka does not support Windows, because tokio does not support it.
// Probably we should use NIO (NioEventLoopGroup) for Windows.
private final static boolean isMacOs = isMacOs();
private static boolean isMacOs() {
try {
Expand All @@ -101,6 +116,7 @@ private static boolean isMacOs() {
}

static {
// TODO fix: netty still doesn't use slf4j nor log4j
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
}

Expand Down Expand Up @@ -128,7 +144,7 @@ public void connectToRedis(ConnectionSettings connectionSettings) {
Response connected = null;
try {
connected = waitForResult(asyncConnectToRedis(connectionSettings));
System.out.printf("Connection %s%n", connected != null ? connected.getConstantResponse() : null);
//System.out.printf("Connection %s%n", connected != null ? connected.getConstantResponse() : null);
} catch (Exception e) {
System.err.println("Connection time out");
}
Expand All @@ -139,9 +155,11 @@ public void connectToRedis(ConnectionSettings connectionSettings) {
private void createChannel() {
// TODO maybe move to constructor or to static?
// ======
//EventLoopGroup group = new NioEventLoopGroup();
try {
channel = new Bootstrap()
.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(1024 * 1024 * 2 + 10, 1024 * 1024 * 10))
.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
.group(group = isMacOs ? new KQueueEventLoopGroup() : new EpollEventLoopGroup())
.channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class)
.handler(new ChannelInitializer<UnixChannel>() {
Expand Down Expand Up @@ -191,8 +209,21 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
//System.out.printf("=== write %s %s %s %n", ctx, msg, promise);

super.write(ctx, Unpooled.copiedBuffer((byte[])msg), promise);
var bytes = (byte[])msg;

boolean needFlush = false;
synchronized (nonFlushedCounter) {
if (nonFlushedCounter.addAndGet(bytes.length) >= AUTO_FLUSH_THRESHOLD) {
nonFlushedCounter.set(0);
needFlush = true;
}
}
if (needFlush) {
// flush outside the sync block
flush(ctx);
//System.out.println("-- auto flush - buffer");
}
super.write(ctx, Unpooled.copiedBuffer(bytes), promise);
}

@Override
Expand Down Expand Up @@ -224,6 +255,24 @@ protected void eventReceived(ChannelHandlerContext ctx, String evt) throws Excep
@Override
public void closeConnection() {
try {
channel.flush();

long waitStarted = System.nanoTime();
long waitUntil = waitStarted + PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS * 100_000; // in nanos
for (var future : responses) {
if (future.isDone()) {
continue;
}
try {
future.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS);
} catch (InterruptedException | ExecutionException ignored) {
} catch (TimeoutException e) {
future.cancel(true);
// TODO cancel the rest
break;
}
}

// channel.closeFuture().sync();
// } catch (InterruptedException ignored) {
} finally {
Expand Down Expand Up @@ -374,46 +423,54 @@ public Future<Response> asyncConnectToRedis(ConnectionSettings connectionSetting
return future;
}

@Override
public Future<Response> asyncSet(String key, String value) {
private CompletableFuture<Response> submitNewCommand(RequestType command, List<String> args) {
int callbackId = getNextCallbackId();
//System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId);
//System.out.printf("== %s(%s), callback %d%n", command, String.join(", ", args), callbackId);
RedisRequest request =
RedisRequest.newBuilder()
.setCallbackIdx(callbackId)
.setSingleCommand(
Command.newBuilder()
.setRequestType(RequestType.SetString)
.setArgsArray(ArgsArray.newBuilder().addArgs(key).addArgs(value).build())
.setRequestType(command)
.setArgsArray(ArgsArray.newBuilder().addAllArgs(args).build())
.build())
.setRoute(
Routes.newBuilder()
.setSimpleRoutes(SimpleRoutes.AllNodes)
.build())
.build();
channel.writeAndFlush(request.toByteArray());
return responses.get(callbackId);
channel.write(request.toByteArray());
return autoFlushFutureWrapper(responses.get(callbackId));
}

private <T> CompletableFuture<T> autoFlushFutureWrapper(Future<T> future) {
return CompletableFuture.supplyAsync(() -> {
try {
return future.get(AUTO_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
//System.out.println("-- auto flush - timeout");
channel.flush();
}
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}

@Override
public Future<Response> asyncSet(String key, String value) {
//System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId);
return submitNewCommand(RequestType.SetString, List.of(key, value));
}

@Override
public Future<String> asyncGet(String key) {
int callbackId = getNextCallbackId();
//System.out.printf("== get(%s), callback %d%n", key, callbackId);
RedisRequest request =
RedisRequest.newBuilder()
.setCallbackIdx(callbackId)
.setSingleCommand(
Command.newBuilder()
.setRequestType(RequestType.GetString)
.setArgsArray(ArgsArray.newBuilder().addArgs(key).build())
.build())
.setRoute(
Routes.newBuilder()
.setSimpleRoutes(SimpleRoutes.AllNodes)
.build())
.build();
channel.writeAndFlush(request.toByteArray());
return responses.get(callbackId)
return submitNewCommand(RequestType.GetString, List.of(key))
.thenApply(response -> response.hasRespPointer()
? RedisClient.valueFromPointer(response.getRespPointer()).toString()
: null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public static void printResults(
public static void testClientSetGet(
Supplier<Client> clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) {
for (int concurrentNum : config.concurrentTasks) {
int iterations = 1000;
int iterations = 100000;
Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX);
for (int clientCount : config.clientCount) {
for (int dataSize : config.dataSize) {
Expand Down Expand Up @@ -263,6 +263,9 @@ public static void testClientSetGet(
iterations / ((after - before) / TPS_NORMALIZATION));
}
printResults(calculatedResults, (after - before) / TPS_NORMALIZATION, iterations);
try {
Thread.sleep(2000);
} catch (InterruptedException ignored) {}
}
}
}
Expand Down

0 comments on commit 5f10964

Please sign in to comment.