From a3075d5ae3eaff3796cd0049431a2aef299fdd4b Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 1 Nov 2023 12:22:30 -0700 Subject: [PATCH] Iteration 4: benchmark. Signed-off-by: Yury-Fridlyand --- babushka-core/src/client/mod.rs | 2 - babushka-core/src/rotating_buffer.rs | 8 --- babushka-core/src/socket_listener.rs | 12 ---- benchmarks/utilities/csv_exporter.py | 8 ++- java/benchmarks/build.gradle | 2 +- .../benchmarks/BenchmarkingApp.java | 12 +++- .../clients/babushka/JniNettyClient.java | 55 +++++++++++++++---- .../benchmarks/utils/Benchmarking.java | 4 +- java/src/lib.rs | 5 +- 9 files changed, 63 insertions(+), 45 deletions(-) mode change 100644 => 100755 benchmarks/utilities/csv_exporter.py diff --git a/babushka-core/src/client/mod.rs b/babushka-core/src/client/mod.rs index 3aa16c4edd..f9927b6c56 100644 --- a/babushka-core/src/client/mod.rs +++ b/babushka-core/src/client/mod.rs @@ -264,8 +264,6 @@ impl Client { pub async fn new(request: ConnectionRequest) -> Result { const DEFAULT_CLIENT_CREATION_TIMEOUT: Duration = Duration::from_millis(2500); - log_info("Connection configuration", format!("received {} addresses", request.addresses.len())); - log_info( "Connection configuration", sanitized_request_string(&request), diff --git a/babushka-core/src/rotating_buffer.rs b/babushka-core/src/rotating_buffer.rs index 6b31b21f54..2a00642cec 100644 --- a/babushka-core/src/rotating_buffer.rs +++ b/babushka-core/src/rotating_buffer.rs @@ -28,17 +28,11 @@ impl RotatingBuffer { if (start_pos + request_len as usize) > buffer_len { break; } else { - log_error("parse input", format!("incoming: start {start_pos}, len {request_len}, buffer_len: {buffer_len}")); - let bytes = buffer.slice(start_pos..start_pos + request_len as usize); - log_error("parse input", format!("{:#x?}", bytes.as_ref())); - //let str_bytes = std::str::from_utf8(&bytes[..]); - //log_error("String bytes", str_bytes.unwrap().to_string()); match T::parse_from_tokio_bytes( &buffer.slice(start_pos..start_pos + request_len as usize), ) { Ok(request) => { prev_position += request_len as usize + bytes_read; - results.push(request); } Err(err) => { @@ -52,8 +46,6 @@ impl RotatingBuffer { } } - log_error("parse input", format!("results: {}", results.len())); - if prev_position != buffer.len() { self.backing_buffer .extend_from_slice(&buffer[prev_position..]); diff --git a/babushka-core/src/socket_listener.rs b/babushka-core/src/socket_listener.rs index c861c30eb0..d2f319fce4 100644 --- a/babushka-core/src/socket_listener.rs +++ b/babushka-core/src/socket_listener.rs @@ -134,9 +134,6 @@ async fn write_to_output(writer: &Rc) { if output.is_empty() { return; } - - log_warn("write_to_output", format!("output: {} {:?}", output.len(), output)); - let mut total_written_bytes = 0; while total_written_bytes < output.len() { if let Err(err) = writer.socket.writable().await { @@ -168,8 +165,6 @@ async fn write_closing_error( callback_index: u32, writer: &Rc, ) -> Result<(), io::Error> { - log_warn("write_closing_error", format!("err: {}, callback: {callback_index}", err.err_message)); - let err = err.err_message; log_error("client creation", err.as_str()); let mut response = Response::new(); @@ -184,9 +179,6 @@ async fn write_result( callback_index: u32, writer: &Rc, ) -> Result<(), io::Error> { - - log_warn("write_result", format!("resp_result: {resp_result:?}, callback: {callback_index}")); - let mut response = Response::new(); response.callback_idx = callback_index; response.value = match resp_result { @@ -249,8 +241,6 @@ async fn write_to_writer(response: Response, writer: &Rc) -> Result<(), let mut vec = writer.accumulated_outputs.take(); let encode_result = response.write_length_delimited_to_vec(&mut vec); - log_warn("write_to_writer", format!("Response: {response:?}")); - // Write the response' length to the buffer match encode_result { Ok(_) => { @@ -535,8 +525,6 @@ async fn read_values_loop( return reason; } ReceivedValues(received_requests) => { - print!("Received {} requests: {:?}", received_requests.len(), received_requests); - log_error("parse input", format!("Received {} requests: {:?}", received_requests.len(), received_requests)); handle_requests(received_requests, client, &writer).await; } } diff --git a/benchmarks/utilities/csv_exporter.py b/benchmarks/utilities/csv_exporter.py old mode 100644 new mode 100755 index 3d48adfe17..af8d1b9259 --- a/benchmarks/utilities/csv_exporter.py +++ b/benchmarks/utilities/csv_exporter.py @@ -1,3 +1,5 @@ +#!/bin/python3 + import csv import json import os @@ -12,7 +14,7 @@ "is_cluster", "num_of_tasks", "data_size", - "clientCount", + "client_count", "tps", "get_non_existing_p50_latency", "get_non_existing_p90_latency", @@ -51,5 +53,5 @@ values = [json_object[field] for field in base_fields] writer.writerow(values) -for json_file_full_path in sys.argv[1:-1]: - os.remove(json_file_full_path) +# for json_file_full_path in sys.argv[1:-1]: +# os.remove(json_file_full_path) diff --git a/java/benchmarks/build.gradle b/java/benchmarks/build.gradle index 0ae52dad19..e719004762 100644 --- a/java/benchmarks/build.gradle +++ b/java/benchmarks/build.gradle @@ -45,7 +45,7 @@ java { application { // Define the main class for the application. mainClass = 'javababushka.benchmarks.BenchmarkingApp' - mainClass = 'javababushka.benchmarks.clients.babushka.JniNettyClient' + // mainClass = 'javababushka.benchmarks.clients.babushka.JniNettyClient' applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/debug" } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java index 9e19abd4e3..ad5bffe1a2 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java @@ -5,6 +5,8 @@ import java.util.Arrays; import java.util.Optional; import java.util.stream.Stream; + +import javababushka.benchmarks.clients.babushka.JniNettyClient; import javababushka.benchmarks.clients.babushka.JniSyncClient; import javababushka.benchmarks.clients.jedis.JedisClient; import javababushka.benchmarks.clients.jedis.JedisPseudoAsyncClient; @@ -64,6 +66,10 @@ public static void main(String[] args) { case BABUSHKA_JNI: testClientSetGet(JniSyncClient::new, runConfiguration, false); break; + case JNI_NETTY: + testClientSetGet(() -> new JniNettyClient(false), runConfiguration, false); + testClientSetGet(() -> new JniNettyClient(true), runConfiguration, true); + break; case BABUSHKA_ASYNC: System.out.println("Babushka async not yet configured"); break; @@ -195,6 +201,7 @@ private static int[] parseIntListOption(String line) throws ParseException { } public enum ClientName { + JNI_NETTY("JNI netty"), JEDIS("Jedis"), JEDIS_ASYNC("Jedis async"), LETTUCE("Lettuce"), @@ -236,13 +243,14 @@ public static class RunConfiguration { public RunConfiguration() { configuration = "Release"; - resultsFile = Optional.empty(); + resultsFile = Optional.of("res_java.json");//Optional.empty(); dataSize = new int[] {100, 4000}; concurrentTasks = new int[] {100, 1000}; clients = new ClientName[] { // ClientName.BABUSHKA_ASYNC, - ClientName.JEDIS, ClientName.JEDIS_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC + //ClientName.JEDIS, ClientName.JEDIS_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC + ClientName.JNI_NETTY }; host = "localhost"; port = 6379; diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java index 8e1d6eadf0..600d674580 100755 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java @@ -104,6 +104,19 @@ private static boolean isMacOs() { InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE); } + public JniNettyClient(boolean async) { + name += async ? " async" : " sync"; + } + + public JniNettyClient() {} + + private String name = "JNI Netty"; + + @Override + public String getName() { + return name; + } + @Override public void connectToRedis() { connectToRedis(new ConnectionSettings("localhost", 6379, false)); @@ -150,7 +163,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception buf.readBytes(bytes); // TODO surround parsing with try-catch var response = Response.parseFrom(bytes); - System.out.printf("== Received response with callback %d%n", response.getCallbackIdx()); + //System.out.printf("== Received response with callback %d%n", response.getCallbackIdx()); responses.get(response.getCallbackIdx()).complete(response); super.channelRead(ctx, bytes); } @@ -165,26 +178,26 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E .addLast(new ChannelOutboundHandlerAdapter() { @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { - System.out.printf("=== bind %s %s %s %n", ctx, localAddress, promise); + //System.out.printf("=== bind %s %s %s %n", ctx, localAddress, promise); super.bind(ctx, localAddress, promise); } @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { - System.out.printf("=== connect %s %s %s %s %n", ctx, remoteAddress, localAddress, promise); + //System.out.printf("=== connect %s %s %s %s %n", ctx, remoteAddress, localAddress, promise); super.connect(ctx, remoteAddress, localAddress, promise); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - System.out.printf("=== write %s %s %s %n", ctx, msg, promise); + //System.out.printf("=== write %s %s %s %n", ctx, msg, promise); super.write(ctx, Unpooled.copiedBuffer((byte[])msg), promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { - System.out.printf("=== flush %s %n", ctx); + //System.out.printf("=== flush %s %n", ctx); super.flush(ctx); } }); @@ -218,11 +231,6 @@ public void closeConnection() { } } - @Override - public String getName() { - return "JNI Netty"; - } - @Override public void set(String key, String value) { waitForResult(asyncSet(key, value)); @@ -296,6 +304,29 @@ public static void main(String[] args) { long afterGetE = System.nanoTime(); System.out.printf("++++ get E: %d%n", afterGetE - beforeGetE); + /////// + + long beforeSetA = System.nanoTime(); + for (int i = 0; i < 1000; i++) { + client.asyncSet("name", "value"); + } + long afterSetA = System.nanoTime(); + System.out.printf("++++ set: %d%n", afterSetA - beforeSetA); + + long beforeGetNEA = System.nanoTime(); + for (int i = 0; i < 1000; i++) { + client.asyncGet("namevalue"); + } + long afterGetNEA = System.nanoTime(); + System.out.printf("++++ get NE: %d%n", afterGetNEA - beforeGetNEA); + + long beforeGetEA = System.nanoTime(); + for (int i = 0; i < 1000; i++) { + client.asyncGet(key); + } + long afterGetEA = System.nanoTime(); + System.out.printf("++++ get E: %d%n", afterGetEA - beforeGetEA); + client.closeConnection(); } @@ -346,7 +377,7 @@ public Future asyncConnectToRedis(ConnectionSettings connectionSetting @Override public Future asyncSet(String key, String value) { int callbackId = getNextCallbackId(); - System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId); + //System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId); RedisRequest request = RedisRequest.newBuilder() .setCallbackIdx(callbackId) @@ -367,7 +398,7 @@ public Future asyncSet(String key, String value) { @Override public Future asyncGet(String key) { int callbackId = getNextCallbackId(); - System.out.printf("== get(%s), callback %d%n", key, callbackId); + //System.out.printf("== get(%s), callback %d%n", key, callbackId); RedisRequest request = RedisRequest.newBuilder() .setCallbackIdx(callbackId) diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java index 1ab295c258..c385dc333b 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java @@ -148,7 +148,7 @@ public static void printResults( public static void testClientSetGet( Supplier clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) { for (int concurrentNum : config.concurrentTasks) { - int iterations = + int iterations = 1000; Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX); for (int clientCount : config.clientCount) { for (int dataSize : config.dataSize) { @@ -249,6 +249,8 @@ public static void testClientSetGet( }); long after = System.nanoTime(); + clients.forEach(Client::closeConnection); + var calculatedResults = calculateResults(actionResults); if (config.resultsFile.isPresent()) { JsonWriter.Write( diff --git a/java/src/lib.rs b/java/src/lib.rs index cf0d728508..020ace59eb 100644 --- a/java/src/lib.rs +++ b/java/src/lib.rs @@ -9,7 +9,6 @@ use logger_core::Level; use redis::Value; fn redis_value_to_java<'local>(mut env: JNIEnv<'local>, val: Value) -> JObject<'local> { - println!("==r value {:?}", val); match val { Value::Nil => JObject::null(), Value::Status(str) => JObject::from(env.new_string(str).unwrap()), @@ -44,9 +43,7 @@ pub extern "system" fn Java_javababushka_client_RedisClient_valueFromPointer<'lo _class: JClass<'local>, pointer: jlong ) -> JObject<'local> { - println!("==r pointer {:?}", pointer); let value = unsafe { Box::from_raw(pointer as *mut Value) }; - println!("==r value {:?}", value); redis_value_to_java(env, *value) } @@ -87,7 +84,7 @@ pub extern "system" fn Java_javababushka_client_RedisClient_startSocketListenerE ) -> JObject<'local> { let (tx, rx) = mpsc::channel::>(); - logger_core::init(Some(Level::Trace), None); + //logger_core::init(Some(Level::Trace), None); start_socket_listener(move |socket_path : Result| { // Signals that thread has started