Skip to content

Commit

Permalink
Iteration 4: benchmark.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Nov 1, 2023
1 parent 0b03dc5 commit a3075d5
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 45 deletions.
2 changes: 0 additions & 2 deletions babushka-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,6 @@ impl Client {
pub async fn new(request: ConnectionRequest) -> Result<Self, ConnectionError> {
const DEFAULT_CLIENT_CREATION_TIMEOUT: Duration = Duration::from_millis(2500);

log_info("Connection configuration", format!("received {} addresses", request.addresses.len()));

log_info(
"Connection configuration",
sanitized_request_string(&request),
Expand Down
8 changes: 0 additions & 8 deletions babushka-core/src/rotating_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,11 @@ impl RotatingBuffer {
if (start_pos + request_len as usize) > buffer_len {
break;
} else {
log_error("parse input", format!("incoming: start {start_pos}, len {request_len}, buffer_len: {buffer_len}"));
let bytes = buffer.slice(start_pos..start_pos + request_len as usize);
log_error("parse input", format!("{:#x?}", bytes.as_ref()));
//let str_bytes = std::str::from_utf8(&bytes[..]);
//log_error("String bytes", str_bytes.unwrap().to_string());
match T::parse_from_tokio_bytes(
&buffer.slice(start_pos..start_pos + request_len as usize),
) {
Ok(request) => {
prev_position += request_len as usize + bytes_read;

results.push(request);
}
Err(err) => {
Expand All @@ -52,8 +46,6 @@ impl RotatingBuffer {
}
}

log_error("parse input", format!("results: {}", results.len()));

if prev_position != buffer.len() {
self.backing_buffer
.extend_from_slice(&buffer[prev_position..]);
Expand Down
12 changes: 0 additions & 12 deletions babushka-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,6 @@ async fn write_to_output(writer: &Rc<Writer>) {
if output.is_empty() {
return;
}

log_warn("write_to_output", format!("output: {} {:?}", output.len(), output));

let mut total_written_bytes = 0;
while total_written_bytes < output.len() {
if let Err(err) = writer.socket.writable().await {
Expand Down Expand Up @@ -168,8 +165,6 @@ async fn write_closing_error(
callback_index: u32,
writer: &Rc<Writer>,
) -> Result<(), io::Error> {
log_warn("write_closing_error", format!("err: {}, callback: {callback_index}", err.err_message));

let err = err.err_message;
log_error("client creation", err.as_str());
let mut response = Response::new();
Expand All @@ -184,9 +179,6 @@ async fn write_result(
callback_index: u32,
writer: &Rc<Writer>,
) -> Result<(), io::Error> {

log_warn("write_result", format!("resp_result: {resp_result:?}, callback: {callback_index}"));

let mut response = Response::new();
response.callback_idx = callback_index;
response.value = match resp_result {
Expand Down Expand Up @@ -249,8 +241,6 @@ async fn write_to_writer(response: Response, writer: &Rc<Writer>) -> Result<(),
let mut vec = writer.accumulated_outputs.take();
let encode_result = response.write_length_delimited_to_vec(&mut vec);

log_warn("write_to_writer", format!("Response: {response:?}"));

// Write the response' length to the buffer
match encode_result {
Ok(_) => {
Expand Down Expand Up @@ -535,8 +525,6 @@ async fn read_values_loop(
return reason;
}
ReceivedValues(received_requests) => {
print!("Received {} requests: {:?}", received_requests.len(), received_requests);
log_error("parse input", format!("Received {} requests: {:?}", received_requests.len(), received_requests));
handle_requests(received_requests, client, &writer).await;
}
}
Expand Down
8 changes: 5 additions & 3 deletions benchmarks/utilities/csv_exporter.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/bin/python3

import csv
import json
import os
Expand All @@ -12,7 +14,7 @@
"is_cluster",
"num_of_tasks",
"data_size",
"clientCount",
"client_count",
"tps",
"get_non_existing_p50_latency",
"get_non_existing_p90_latency",
Expand Down Expand Up @@ -51,5 +53,5 @@
values = [json_object[field] for field in base_fields]
writer.writerow(values)

for json_file_full_path in sys.argv[1:-1]:
os.remove(json_file_full_path)
# for json_file_full_path in sys.argv[1:-1]:
# os.remove(json_file_full_path)
2 changes: 1 addition & 1 deletion java/benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ java {
application {
// Define the main class for the application.
mainClass = 'javababushka.benchmarks.BenchmarkingApp'
mainClass = 'javababushka.benchmarks.clients.babushka.JniNettyClient'
// mainClass = 'javababushka.benchmarks.clients.babushka.JniNettyClient'
applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/debug"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Stream;

import javababushka.benchmarks.clients.babushka.JniNettyClient;
import javababushka.benchmarks.clients.babushka.JniSyncClient;
import javababushka.benchmarks.clients.jedis.JedisClient;
import javababushka.benchmarks.clients.jedis.JedisPseudoAsyncClient;
Expand Down Expand Up @@ -64,6 +66,10 @@ public static void main(String[] args) {
case BABUSHKA_JNI:
testClientSetGet(JniSyncClient::new, runConfiguration, false);
break;
case JNI_NETTY:
testClientSetGet(() -> new JniNettyClient(false), runConfiguration, false);
testClientSetGet(() -> new JniNettyClient(true), runConfiguration, true);
break;
case BABUSHKA_ASYNC:
System.out.println("Babushka async not yet configured");
break;
Expand Down Expand Up @@ -195,6 +201,7 @@ private static int[] parseIntListOption(String line) throws ParseException {
}

public enum ClientName {
JNI_NETTY("JNI netty"),
JEDIS("Jedis"),
JEDIS_ASYNC("Jedis async"),
LETTUCE("Lettuce"),
Expand Down Expand Up @@ -236,13 +243,14 @@ public static class RunConfiguration {

public RunConfiguration() {
configuration = "Release";
resultsFile = Optional.empty();
resultsFile = Optional.of("res_java.json");//Optional.empty();
dataSize = new int[] {100, 4000};
concurrentTasks = new int[] {100, 1000};
clients =
new ClientName[] {
// ClientName.BABUSHKA_ASYNC,
ClientName.JEDIS, ClientName.JEDIS_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC
//ClientName.JEDIS, ClientName.JEDIS_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC
ClientName.JNI_NETTY
};
host = "localhost";
port = 6379;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,19 @@ private static boolean isMacOs() {
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
}

public JniNettyClient(boolean async) {
name += async ? " async" : " sync";
}

public JniNettyClient() {}

private String name = "JNI Netty";

@Override
public String getName() {
return name;
}

@Override
public void connectToRedis() {
connectToRedis(new ConnectionSettings("localhost", 6379, false));
Expand Down Expand Up @@ -150,7 +163,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
buf.readBytes(bytes);
// TODO surround parsing with try-catch
var response = Response.parseFrom(bytes);
System.out.printf("== Received response with callback %d%n", response.getCallbackIdx());
//System.out.printf("== Received response with callback %d%n", response.getCallbackIdx());
responses.get(response.getCallbackIdx()).complete(response);
super.channelRead(ctx, bytes);
}
Expand All @@ -165,26 +178,26 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
.addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
System.out.printf("=== bind %s %s %s %n", ctx, localAddress, promise);
//System.out.printf("=== bind %s %s %s %n", ctx, localAddress, promise);
super.bind(ctx, localAddress, promise);
}

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
System.out.printf("=== connect %s %s %s %s %n", ctx, remoteAddress, localAddress, promise);
//System.out.printf("=== connect %s %s %s %s %n", ctx, remoteAddress, localAddress, promise);
super.connect(ctx, remoteAddress, localAddress, promise);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.printf("=== write %s %s %s %n", ctx, msg, promise);
//System.out.printf("=== write %s %s %s %n", ctx, msg, promise);

super.write(ctx, Unpooled.copiedBuffer((byte[])msg), promise);
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
System.out.printf("=== flush %s %n", ctx);
//System.out.printf("=== flush %s %n", ctx);
super.flush(ctx);
}
});
Expand Down Expand Up @@ -218,11 +231,6 @@ public void closeConnection() {
}
}

@Override
public String getName() {
return "JNI Netty";
}

@Override
public void set(String key, String value) {
waitForResult(asyncSet(key, value));
Expand Down Expand Up @@ -296,6 +304,29 @@ public static void main(String[] args) {
long afterGetE = System.nanoTime();
System.out.printf("++++ get E: %d%n", afterGetE - beforeGetE);

///////

long beforeSetA = System.nanoTime();
for (int i = 0; i < 1000; i++) {
client.asyncSet("name", "value");
}
long afterSetA = System.nanoTime();
System.out.printf("++++ set: %d%n", afterSetA - beforeSetA);

long beforeGetNEA = System.nanoTime();
for (int i = 0; i < 1000; i++) {
client.asyncGet("namevalue");
}
long afterGetNEA = System.nanoTime();
System.out.printf("++++ get NE: %d%n", afterGetNEA - beforeGetNEA);

long beforeGetEA = System.nanoTime();
for (int i = 0; i < 1000; i++) {
client.asyncGet(key);
}
long afterGetEA = System.nanoTime();
System.out.printf("++++ get E: %d%n", afterGetEA - beforeGetEA);

client.closeConnection();
}

Expand Down Expand Up @@ -346,7 +377,7 @@ public Future<Response> asyncConnectToRedis(ConnectionSettings connectionSetting
@Override
public Future<Response> asyncSet(String key, String value) {
int callbackId = getNextCallbackId();
System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId);
//System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId);
RedisRequest request =
RedisRequest.newBuilder()
.setCallbackIdx(callbackId)
Expand All @@ -367,7 +398,7 @@ public Future<Response> asyncSet(String key, String value) {
@Override
public Future<String> asyncGet(String key) {
int callbackId = getNextCallbackId();
System.out.printf("== get(%s), callback %d%n", key, callbackId);
//System.out.printf("== get(%s), callback %d%n", key, callbackId);
RedisRequest request =
RedisRequest.newBuilder()
.setCallbackIdx(callbackId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public static void printResults(
public static void testClientSetGet(
Supplier<Client> clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) {
for (int concurrentNum : config.concurrentTasks) {
int iterations =
int iterations = 1000;
Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX);
for (int clientCount : config.clientCount) {
for (int dataSize : config.dataSize) {
Expand Down Expand Up @@ -249,6 +249,8 @@ public static void testClientSetGet(
});
long after = System.nanoTime();

clients.forEach(Client::closeConnection);

var calculatedResults = calculateResults(actionResults);
if (config.resultsFile.isPresent()) {
JsonWriter.Write(
Expand Down
5 changes: 1 addition & 4 deletions java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use logger_core::Level;
use redis::Value;

fn redis_value_to_java<'local>(mut env: JNIEnv<'local>, val: Value) -> JObject<'local> {
println!("==r value {:?}", val);
match val {
Value::Nil => JObject::null(),
Value::Status(str) => JObject::from(env.new_string(str).unwrap()),
Expand Down Expand Up @@ -44,9 +43,7 @@ pub extern "system" fn Java_javababushka_client_RedisClient_valueFromPointer<'lo
_class: JClass<'local>,
pointer: jlong
) -> JObject<'local> {
println!("==r pointer {:?}", pointer);
let value = unsafe { Box::from_raw(pointer as *mut Value) };
println!("==r value {:?}", value);
redis_value_to_java(env, *value)
}

Expand Down Expand Up @@ -87,7 +84,7 @@ pub extern "system" fn Java_javababushka_client_RedisClient_startSocketListenerE
) -> JObject<'local> {
let (tx, rx) = mpsc::channel::<Result<String, String>>();

logger_core::init(Some(Level::Trace), None);
//logger_core::init(Some(Level::Trace), None);

start_socket_listener(move |socket_path : Result<String, String>| {
// Signals that thread has started
Expand Down

0 comments on commit a3075d5

Please sign in to comment.