Skip to content

Commit

Permalink
More profiling.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Nov 9, 2023
1 parent 6d6d0fc commit 1e2da76
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 16 deletions.
9 changes: 6 additions & 3 deletions babushka-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::retry_strategies::RetryStrategy;
use futures::{stream, StreamExt};
#[cfg(standalone_heartbeat)]
use logger_core::log_debug;
use logger_core::{log_trace, log_warn};
use logger_core::{log_error, log_trace, log_warn};
use protobuf::EnumOrUnknown;
use redis::cluster_routing::is_readonly;
use redis::{RedisError, RedisResult, Value};
Expand Down Expand Up @@ -186,7 +186,7 @@ impl StandaloneClient {
}

pub async fn send_packed_command(&mut self, cmd: &redis::Cmd) -> RedisResult<Value> {
log_trace("StandaloneClient", "sending command");
log_error("StandaloneClient", "sending command");
let reconnecting_connection = self.get_connection(cmd);
let mut connection = reconnecting_connection.get_connection().await?;
let result = connection.send_packed_command(cmd).await;
Expand All @@ -199,7 +199,10 @@ impl StandaloneClient {
reconnecting_connection.reconnect();
Err(err)
}
_ => result,
_ => {
log_error("StandaloneClient", "received response");
result
},
}
}

Expand Down
13 changes: 8 additions & 5 deletions babushka-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl UnixStreamListener {
}
}

async fn write_to_output(writer: &Rc<Writer>) {
async fn write_to_output(writer: &Rc<Writer>, callback_idx: u32) {
let Ok(_guard) = writer.lock.try_lock() else {
return;
};
Expand All @@ -135,6 +135,9 @@ async fn write_to_output(writer: &Rc<Writer>) {
return;
}
let mut total_written_bytes = 0;
if callback_idx > 0 {
log_error("callback id sent ", callback_idx.to_string());
}
while total_written_bytes < output.len() {
if let Err(err) = writer.socket.writable().await {
let _res = writer.closing_sender.send(err.into()).await; // we ignore the error, because it means that the reader was dropped, which is ok.
Expand All @@ -155,6 +158,9 @@ async fn write_to_output(writer: &Rc<Writer>) {
}
}
}
if callback_idx > 0 {
log_error("callback id sentdone", callback_idx.to_string());
}
output.clear();
output = writer.accumulated_outputs.replace(output);
}
Expand Down Expand Up @@ -238,17 +244,14 @@ async fn write_result(
}

async fn write_to_writer(response: Response, writer: &Rc<Writer>) -> Result<(), io::Error> {
if response.callback_idx > 0 {
log_error("callback id sent ", response.callback_idx.to_string());
}
let mut vec = writer.accumulated_outputs.take();
let encode_result = response.write_length_delimited_to_vec(&mut vec);

// Write the response' length to the buffer
match encode_result {
Ok(_) => {
writer.accumulated_outputs.set(vec);
write_to_output(writer).await;
write_to_output(writer, response.callback_idx).await;
Ok(())
}
Err(err) => {
Expand Down
9 changes: 4 additions & 5 deletions java/benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ application {
// Define the main class for the application.
mainClass = 'javababushka.benchmarks.BenchmarkingApp'
mainClass = 'javababushka.benchmarks.clients.babushka.JniNettyClient'
applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/debug"
applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/release"
}

tasks.withType(Test) {
Expand All @@ -65,7 +65,7 @@ tasks.withType(Test) {
}

long unixdump_pid = 0;

/*
task('start_unixdump') {
var process = new ProcessBuilder()
.command('sudo', 'unixdump', '-b', '-s', "/run/user/1000/babushka-socket-")
Expand All @@ -85,10 +85,9 @@ task('start_unixdump') {
break;
}
}
*/
unixdump_pid = process.pid()
}

/*
task('stop_unixdump') {
if (unixdump_pid > 0) {
"sudo killall -9 unixdump".execute().waitFor()
Expand All @@ -102,7 +101,7 @@ task startUnixdump(type: SpawnProcessTask) {
command "unixdump -s /run/user/1000/babushka-socket-*"
ready 'Listening...'
}

*/
// run.dependsOn startUnixdump

task stopUnixdump(type: KillProcessTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
long parseBefore = System.nanoTime();
var response = Response.parseFrom(bytes);
PARSING_ON_READ_TIME.addAndGet(System.nanoTime() - parseBefore);
System.out.printf("%s received callback id %d%n", LocalDateTime.now(), response.getCallbackIdx());
System.out.printf("%s received callback id %d%n",
LocalDateTime.now().minusNanos(System.nanoTime() - readBefore), response.getCallbackIdx());
//System.out.printf("== Received response with callback %d%n", response.getCallbackIdx());
long futureBefore1 = System.nanoTime();
var future = responses.get(response.getCallbackIdx());
Expand All @@ -225,7 +226,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
buf.release();
READ_TIME.addAndGet(System.nanoTime() - readBefore);
READ_COUNT.incrementAndGet();
RESPONSE_TIMESTAMPS.add(System.nanoTime());
RESPONSE_TIMESTAMPS.add(readBefore);
}

@Override
Expand Down Expand Up @@ -283,7 +284,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
//buffer.release();
WRITE_TIME.addAndGet(System.nanoTime() - writeBefore);
WRITE_COUNT.incrementAndGet();
REQUEST_TIMESTAMPS.add(System.nanoTime());
REQUEST_TIMESTAMPS.add(innerWriteBefore);
}

@Override
Expand Down Expand Up @@ -402,6 +403,8 @@ public static void main(String[] args) {
client.connectToRedis();

var key = String.valueOf(ProcessHandle.current().pid());
System.out.printf("PID = %s%n%n", key);
Thread.sleep(10000);
/*
var get_ne = client.get("sdf");
client.set(key, "asfsdf");
Expand Down

0 comments on commit 1e2da76

Please sign in to comment.