diff --git a/babushka-core/src/client/standalone_client.rs b/babushka-core/src/client/standalone_client.rs index 3a21defe5d..051cce4483 100644 --- a/babushka-core/src/client/standalone_client.rs +++ b/babushka-core/src/client/standalone_client.rs @@ -5,7 +5,7 @@ use crate::retry_strategies::RetryStrategy; use futures::{stream, StreamExt}; #[cfg(standalone_heartbeat)] use logger_core::log_debug; -use logger_core::{log_trace, log_warn}; +use logger_core::{log_error, log_trace, log_warn}; use protobuf::EnumOrUnknown; use redis::cluster_routing::is_readonly; use redis::{RedisError, RedisResult, Value}; @@ -186,7 +186,7 @@ impl StandaloneClient { } pub async fn send_packed_command(&mut self, cmd: &redis::Cmd) -> RedisResult { - log_trace("StandaloneClient", "sending command"); + log_error("StandaloneClient", "sending command"); let reconnecting_connection = self.get_connection(cmd); let mut connection = reconnecting_connection.get_connection().await?; let result = connection.send_packed_command(cmd).await; @@ -199,7 +199,10 @@ impl StandaloneClient { reconnecting_connection.reconnect(); Err(err) } - _ => result, + _ => { + log_error("StandaloneClient", "received response"); + result + }, } } diff --git a/babushka-core/src/socket_listener.rs b/babushka-core/src/socket_listener.rs index 23ba58b62a..4d793550df 100644 --- a/babushka-core/src/socket_listener.rs +++ b/babushka-core/src/socket_listener.rs @@ -124,7 +124,7 @@ impl UnixStreamListener { } } -async fn write_to_output(writer: &Rc) { +async fn write_to_output(writer: &Rc, callback_idx: u32) { let Ok(_guard) = writer.lock.try_lock() else { return; }; @@ -135,6 +135,9 @@ async fn write_to_output(writer: &Rc) { return; } let mut total_written_bytes = 0; + if callback_idx > 0 { + log_error("callback id sent ", callback_idx.to_string()); + } while total_written_bytes < output.len() { if let Err(err) = writer.socket.writable().await { let _res = writer.closing_sender.send(err.into()).await; // we ignore the error, because it means that the reader was dropped, which is ok. @@ -155,6 +158,9 @@ async fn write_to_output(writer: &Rc) { } } } + if callback_idx > 0 { + log_error("callback id sentdone", callback_idx.to_string()); + } output.clear(); output = writer.accumulated_outputs.replace(output); } @@ -238,9 +244,6 @@ async fn write_result( } async fn write_to_writer(response: Response, writer: &Rc) -> Result<(), io::Error> { - if response.callback_idx > 0 { - log_error("callback id sent ", response.callback_idx.to_string()); - } let mut vec = writer.accumulated_outputs.take(); let encode_result = response.write_length_delimited_to_vec(&mut vec); @@ -248,7 +251,7 @@ async fn write_to_writer(response: Response, writer: &Rc) -> Result<(), match encode_result { Ok(_) => { writer.accumulated_outputs.set(vec); - write_to_output(writer).await; + write_to_output(writer, response.callback_idx).await; Ok(()) } Err(err) => { diff --git a/java/benchmarks/build.gradle b/java/benchmarks/build.gradle index bc41dc82ce..ef47fa9d02 100644 --- a/java/benchmarks/build.gradle +++ b/java/benchmarks/build.gradle @@ -52,7 +52,7 @@ application { // Define the main class for the application. mainClass = 'javababushka.benchmarks.BenchmarkingApp' mainClass = 'javababushka.benchmarks.clients.babushka.JniNettyClient' - applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/debug" + applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/release" } tasks.withType(Test) { @@ -65,7 +65,7 @@ tasks.withType(Test) { } long unixdump_pid = 0; - +/* task('start_unixdump') { var process = new ProcessBuilder() .command('sudo', 'unixdump', '-b', '-s', "/run/user/1000/babushka-socket-") @@ -85,10 +85,9 @@ task('start_unixdump') { break; } } -*/ unixdump_pid = process.pid() } - +/* task('stop_unixdump') { if (unixdump_pid > 0) { "sudo killall -9 unixdump".execute().waitFor() @@ -102,7 +101,7 @@ task startUnixdump(type: SpawnProcessTask) { command "unixdump -s /run/user/1000/babushka-socket-*" ready 'Listening...' } - +*/ // run.dependsOn startUnixdump task stopUnixdump(type: KillProcessTask) { diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java index 25d480b4fb..341ab1e210 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java @@ -211,7 +211,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception long parseBefore = System.nanoTime(); var response = Response.parseFrom(bytes); PARSING_ON_READ_TIME.addAndGet(System.nanoTime() - parseBefore); - System.out.printf("%s received callback id %d%n", LocalDateTime.now(), response.getCallbackIdx()); + System.out.printf("%s received callback id %d%n", + LocalDateTime.now().minusNanos(System.nanoTime() - readBefore), response.getCallbackIdx()); //System.out.printf("== Received response with callback %d%n", response.getCallbackIdx()); long futureBefore1 = System.nanoTime(); var future = responses.get(response.getCallbackIdx()); @@ -225,7 +226,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception buf.release(); READ_TIME.addAndGet(System.nanoTime() - readBefore); READ_COUNT.incrementAndGet(); - RESPONSE_TIMESTAMPS.add(System.nanoTime()); + RESPONSE_TIMESTAMPS.add(readBefore); } @Override @@ -283,7 +284,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) //buffer.release(); WRITE_TIME.addAndGet(System.nanoTime() - writeBefore); WRITE_COUNT.incrementAndGet(); - REQUEST_TIMESTAMPS.add(System.nanoTime()); + REQUEST_TIMESTAMPS.add(innerWriteBefore); } @Override @@ -402,6 +403,8 @@ public static void main(String[] args) { client.connectToRedis(); var key = String.valueOf(ProcessHandle.current().pid()); + System.out.printf("PID = %s%n%n", key); + Thread.sleep(10000); /* var get_ne = client.get("sdf"); client.set(key, "asfsdf");