diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java index e597ae4679..8ab6e5e562 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java @@ -6,6 +6,8 @@ import connection_request.ConnectionRequestOuterClass.ConnectionRetryStrategy; import connection_request.ConnectionRequestOuterClass.ReadFromReplicaStrategy; import connection_request.ConnectionRequestOuterClass.TlsMode; +import io.netty.channel.unix.Socket; + import java.io.IOException; import java.net.StandardProtocolFamily; import java.net.UnixDomainSocketAddress; @@ -182,23 +184,33 @@ private static byte[] readSocketMessage(SocketChannel channel) throws IOExceptio return bytes; } - /* - private static sendMessage() { - Byte[] varint = varintBytes(request.toByteArray().length); + private static void sendMessage(SocketChannel channel, byte[] request) throws IOException { + Byte[] varint = varintBytes(request.length); - System.out.println("Request: \n" + request.toString()); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.clear(); for (Byte b : varint) { buffer.put(b); } - buffer.put(request.toByteArray()); + buffer.put(request); buffer.flip(); while (buffer.hasRemaining()) { channel.write(buffer); } } - */ + + private static void printResponse(SocketChannel channel) throws IOException { + byte[] responseBuffer = readSocketMessage(channel); + + try { + Response response = decodeMessage(responseBuffer); + System.out.println(response); + Object o = RedisClient.valueFromPointer(response.getRespPointer()); + System.out.println(o); + } catch (Exception e) { + e.printStackTrace(); + } + } // main application entrypoint public static void main(String[] args) throws InterruptedException { @@ -217,7 +229,7 @@ public static void main(String[] args) throws InterruptedException { try { SocketChannel channel = SocketChannel.open(StandardProtocolFamily.UNIX); channel.connect(socketAddress); - ConnectionRequest request = + ConnectionRequest connectionRequest = ConnectionRequest.newBuilder() .addAddresses(AddressInfo.newBuilder().setHost("localhost").setPort(6379)) .setTlsMode(TlsMode.NoTls) @@ -237,78 +249,31 @@ public static void main(String[] args) throws InterruptedException { .setDatabaseId(0) .build(); - Byte[] varint = varintBytes(request.toByteArray().length); - - System.out.println("Request: \n" + request.toString()); - ByteBuffer buffer = ByteBuffer.allocate(1024); - buffer.clear(); - for (Byte b : varint) { - buffer.put(b); - } - buffer.put(request.toByteArray()); - buffer.flip(); - while (buffer.hasRemaining()) { - channel.write(buffer); - } - - timeout = 0; - byte[] responseBuffer = null; - while (responseBuffer == null && timeout < maxTimeout) { - timeout++; - responseBuffer = readSocketMessage(channel); - Thread.sleep(250); - } - - try { - Response response = decodeMessage(responseBuffer); - System.out.println(response); - } catch (Exception e) { - e.printStackTrace(); - } + sendMessage(channel, connectionRequest.toByteArray()); + readSocketMessage(channel); RedisRequest pingRequest = - RedisRequest.newBuilder() - .setCallbackIdx(0) - .setSingleCommand( - Command.newBuilder() - .setRequestType(RequestType.Ping) - .setArgsArray(ArgsArray.newBuilder())) - .setRoute(Routes.newBuilder().setSimpleRoutes(SimpleRoutes.AllNodes)) - .build(); + RedisRequest.newBuilder() + .setCallbackIdx(0) + .setSingleCommand( + Command.newBuilder() + .setRequestType(RequestType.Ping) + .setArgsArray(ArgsArray.newBuilder())) + .setRoute(Routes.newBuilder().setSimpleRoutes(SimpleRoutes.AllNodes)) + .build(); - Byte[] varint2 = varintBytes(pingRequest.toByteArray().length); + sendMessage(channel, pingRequest.toByteArray()); + printResponse(channel); - System.out.println("Request: \n" + pingRequest.toString()); - ByteBuffer pingBuffer = ByteBuffer.allocate(1024); - pingBuffer.clear(); - for (Byte b : varint2) { - pingBuffer.put(b); - } - pingBuffer.put(pingRequest.toByteArray()); - pingBuffer.flip(); - while (pingBuffer.hasRemaining()) { - channel.write(pingBuffer); - } - - System.out.println("Before read from socket"); - timeout = 0; - byte[] pingResponseBuffer = null; - while (pingResponseBuffer == null && timeout < maxTimeout) { - timeout++; - pingResponseBuffer = readSocketMessage(channel); - Thread.sleep(250); - } - System.out.println("After read from socket"); - - try { - Response pingResponse = decodeMessage(pingResponseBuffer); - System.out.println(pingResponse); - Object o = RedisClient.valueFromPointer(pingResponse.getRespPointer()); - - System.out.println(o); - } catch (Exception e) { - e.printStackTrace(); - } + RedisRequest getExistingRequest = + RedisRequest.newBuilder() + .setCallbackIdx(0) + .setSingleCommand( + Command.newBuilder() + .setRequestType(RequestType.Ping) + .setArgsArray(ArgsArray.newBuilder())) + .setRoute(Routes.newBuilder().setSimpleRoutes(SimpleRoutes.AllNodes)) + .build(); } catch (IOException e) { e.printStackTrace();