From 5f10964407bda17bd02bfd46c1e53e9167c11048 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 1 Nov 2023 18:47:13 -0700 Subject: [PATCH] Iteration 5: some fixes. Signed-off-by: Yury-Fridlyand --- .../benchmarks/BenchmarkingApp.java | 4 +- .../clients/babushka/JniNettyClient.java | 115 +++++++++++++----- .../benchmarks/utils/Benchmarking.java | 5 +- 3 files changed, 92 insertions(+), 32 deletions(-) diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java index ad5bffe1a2..ff4573693b 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java @@ -245,12 +245,12 @@ public RunConfiguration() { configuration = "Release"; resultsFile = Optional.of("res_java.json");//Optional.empty(); dataSize = new int[] {100, 4000}; - concurrentTasks = new int[] {100, 1000}; + concurrentTasks = new int[] {100}; clients = new ClientName[] { // ClientName.BABUSHKA_ASYNC, //ClientName.JEDIS, ClientName.JEDIS_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC - ClientName.JNI_NETTY + ClientName.JNI_NETTY, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC }; host = "localhost"; port = 6379; diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java index 600d674580..5c9e9023fc 100755 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java @@ -18,12 +18,14 @@ import com.google.protobuf.InvalidProtocolBufferException; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; @@ -31,6 +33,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleUserEventChannelHandler; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.EpollDomainSocketChannel; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.kqueue.KQueue; @@ -69,13 +72,23 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; public class JniNettyClient implements SyncClient, AsyncClient, AutoCloseable { + // https://netty.io/3.6/api/org/jboss/netty/handler/queue/BufferedWriteHandler.html + private final static int AUTO_FLUSH_THRESHOLD = 512;//1024; + private final AtomicInteger nonFlushedCounter = new AtomicInteger(0); + + private final static int AUTO_FLUSH_TIMEOUT_MILLIS = 100; + + private final static int PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS = 1000; + // Futures to handle responses. Index is callback id, starting from 1 (0 index is for connection request always). + // TODO clean up completed futures private final List> responses = Collections.synchronizedList(new ArrayList<>()); - private final static String unixSocket = getSocket(); + private final String unixSocket = getSocket(); // TODO static or move to constructor? private static String getSocket() { @@ -83,13 +96,15 @@ private static String getSocket() { return RedisClient.startSocketListenerExternal(); } catch (Exception | UnsatisfiedLinkError e) { System.err.printf("Failed to get UDS from babushka and dedushka: %s%n%n", e); - return null; + throw new RuntimeException(e); } } private Channel channel = null; private EventLoopGroup group = null; + // We support MacOS and Linux only, because Babushka does not support Windows, because tokio does not support it. + // Probably we should use NIO (NioEventLoopGroup) for Windows. private final static boolean isMacOs = isMacOs(); private static boolean isMacOs() { try { @@ -101,6 +116,7 @@ private static boolean isMacOs() { } static { + // TODO fix: netty still doesn't use slf4j nor log4j InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE); } @@ -128,7 +144,7 @@ public void connectToRedis(ConnectionSettings connectionSettings) { Response connected = null; try { connected = waitForResult(asyncConnectToRedis(connectionSettings)); - System.out.printf("Connection %s%n", connected != null ? connected.getConstantResponse() : null); + //System.out.printf("Connection %s%n", connected != null ? connected.getConstantResponse() : null); } catch (Exception e) { System.err.println("Connection time out"); } @@ -139,9 +155,11 @@ public void connectToRedis(ConnectionSettings connectionSettings) { private void createChannel() { // TODO maybe move to constructor or to static? // ====== - //EventLoopGroup group = new NioEventLoopGroup(); try { channel = new Bootstrap() + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, + new WriteBufferWaterMark(1024 * 1024 * 2 + 10, 1024 * 1024 * 10)) + .option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT) .group(group = isMacOs ? new KQueueEventLoopGroup() : new EpollEventLoopGroup()) .channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class) .handler(new ChannelInitializer() { @@ -191,8 +209,21 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { //System.out.printf("=== write %s %s %s %n", ctx, msg, promise); - - super.write(ctx, Unpooled.copiedBuffer((byte[])msg), promise); + var bytes = (byte[])msg; + + boolean needFlush = false; + synchronized (nonFlushedCounter) { + if (nonFlushedCounter.addAndGet(bytes.length) >= AUTO_FLUSH_THRESHOLD) { + nonFlushedCounter.set(0); + needFlush = true; + } + } + if (needFlush) { + // flush outside the sync block + flush(ctx); + //System.out.println("-- auto flush - buffer"); + } + super.write(ctx, Unpooled.copiedBuffer(bytes), promise); } @Override @@ -224,6 +255,24 @@ protected void eventReceived(ChannelHandlerContext ctx, String evt) throws Excep @Override public void closeConnection() { try { + channel.flush(); + + long waitStarted = System.nanoTime(); + long waitUntil = waitStarted + PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS * 100_000; // in nanos + for (var future : responses) { + if (future.isDone()) { + continue; + } + try { + future.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS); + } catch (InterruptedException | ExecutionException ignored) { + } catch (TimeoutException e) { + future.cancel(true); + // TODO cancel the rest + break; + } + } + // channel.closeFuture().sync(); // } catch (InterruptedException ignored) { } finally { @@ -374,46 +423,54 @@ public Future asyncConnectToRedis(ConnectionSettings connectionSetting return future; } - @Override - public Future asyncSet(String key, String value) { + private CompletableFuture submitNewCommand(RequestType command, List args) { int callbackId = getNextCallbackId(); - //System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId); + //System.out.printf("== %s(%s), callback %d%n", command, String.join(", ", args), callbackId); RedisRequest request = RedisRequest.newBuilder() .setCallbackIdx(callbackId) .setSingleCommand( Command.newBuilder() - .setRequestType(RequestType.SetString) - .setArgsArray(ArgsArray.newBuilder().addArgs(key).addArgs(value).build()) + .setRequestType(command) + .setArgsArray(ArgsArray.newBuilder().addAllArgs(args).build()) .build()) .setRoute( Routes.newBuilder() .setSimpleRoutes(SimpleRoutes.AllNodes) .build()) .build(); - channel.writeAndFlush(request.toByteArray()); - return responses.get(callbackId); + channel.write(request.toByteArray()); + return autoFlushFutureWrapper(responses.get(callbackId)); + } + + private CompletableFuture autoFlushFutureWrapper(Future future) { + return CompletableFuture.supplyAsync(() -> { + try { + return future.get(AUTO_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + //System.out.println("-- auto flush - timeout"); + channel.flush(); + } + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public Future asyncSet(String key, String value) { + //System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId); + return submitNewCommand(RequestType.SetString, List.of(key, value)); } @Override public Future asyncGet(String key) { - int callbackId = getNextCallbackId(); //System.out.printf("== get(%s), callback %d%n", key, callbackId); - RedisRequest request = - RedisRequest.newBuilder() - .setCallbackIdx(callbackId) - .setSingleCommand( - Command.newBuilder() - .setRequestType(RequestType.GetString) - .setArgsArray(ArgsArray.newBuilder().addArgs(key).build()) - .build()) - .setRoute( - Routes.newBuilder() - .setSimpleRoutes(SimpleRoutes.AllNodes) - .build()) - .build(); - channel.writeAndFlush(request.toByteArray()); - return responses.get(callbackId) + return submitNewCommand(RequestType.GetString, List.of(key)) .thenApply(response -> response.hasRespPointer() ? RedisClient.valueFromPointer(response.getRespPointer()).toString() : null); diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java index c385dc333b..95d3a8ffac 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java @@ -148,7 +148,7 @@ public static void printResults( public static void testClientSetGet( Supplier clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) { for (int concurrentNum : config.concurrentTasks) { - int iterations = 1000; + int iterations = 100000; Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX); for (int clientCount : config.clientCount) { for (int dataSize : config.dataSize) { @@ -263,6 +263,9 @@ public static void testClientSetGet( iterations / ((after - before) / TPS_NORMALIZATION)); } printResults(calculatedResults, (after - before) / TPS_NORMALIZATION, iterations); + try { + Thread.sleep(2000); + } catch (InterruptedException ignored) {} } } }