Skip to content

Commit

Permalink
Iteration 1.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Oct 31, 2023
1 parent d6b72ac commit 58f7b58
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 3 deletions.
2 changes: 2 additions & 0 deletions babushka-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,8 @@ async fn read_values_loop(
return reason;
}
ReceivedValues(received_requests) => {
print!("Received {} requests: {:?}", received_requests.len(), received_requests);
log_error("parse input", format!("Received {} requests: {:?}", received_requests.len(), received_requests));
handle_requests(received_requests, client, &writer).await;
}
}
Expand Down
1 change: 1 addition & 0 deletions java/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tokio = { version = "^1", features = ["rt", "macros", "rt-multi-thread", "time"]
logger_core = {path = "../logger_core"}
tracing-subscriber = "0.3.16"
jni = "0.21.1"
log = "0.4.20"

[profile.release]
lto = true
Expand Down
4 changes: 4 additions & 0 deletions java/benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ dependencies {
// https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java
implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.24.3'
implementation group: 'com.google.code.gson', name: 'gson', version: '2.10.1'
implementation group: 'io.netty', name: 'netty-handler', version: '4.1.100.Final'
// https://github.com/netty/netty/wiki/Native-transports
implementation group: 'io.netty', name: 'netty-transport-native-epoll', version: '4.1.100.Final', classifier: 'linux-x86_64'

compileOnly 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'
Expand All @@ -37,6 +40,7 @@ java {
application {
// Define the main class for the application.
mainClass = 'javababushka.benchmarks.BenchmarkingApp'
mainClass = 'javababushka.benchmarks.clients.babushka.JniNettyClient'
applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/debug"
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package javababushka.benchmarks.clients.babushka;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CombinedChannelDuplexHandler;

public class ChannelHandler extends CombinedChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package javababushka.benchmarks.clients.babushka;

import static connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import static connection_request.ConnectionRequestOuterClass.AddressInfo;
import static connection_request.ConnectionRequestOuterClass.ReadFromReplicaStrategy;
import static connection_request.ConnectionRequestOuterClass.ConnectionRetryStrategy;
import static connection_request.ConnectionRequestOuterClass.AuthenticationInfo;
import static connection_request.ConnectionRequestOuterClass.TlsMode;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.SimpleUserEventChannelHandler;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.unix.UnixChannel;
import javababushka.benchmarks.clients.SyncClient;
import javababushka.benchmarks.utils.ConnectionSettings;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import javababushka.client.RedisClient;

import java.nio.charset.StandardCharsets;

public class JniNettyClient implements SyncClient {

private final static String unixSocket = getSocket();

private Channel channel = null;

// TODO static or move to constructor?
private static String getSocket() {
try {
return RedisClient.startSocketListenerExternal();
} catch (Exception | UnsatisfiedLinkError e) {
System.err.printf("Failed to get UDS from babushka and dedushka: %s%n%n", e);
return null;
}
}

@Override
public void connectToRedis() {
connectToRedis(new ConnectionSettings("localhost", 6379, false));
}

@Override
public void connectToRedis(ConnectionSettings connectionSettings) {

// TODO maybe move to constructor or to static?
// ======
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new EpollEventLoopGroup();
//EventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap
.group(group)
.channel(EpollDomainSocketChannel.class)
.handler(new ChannelInitializer<UnixChannel>() {
@Override
public void initChannel(UnixChannel ch) throws Exception {
ch
.pipeline()
// TODO encoder/decoder
.addLast(new ChannelInboundHandlerAdapter())
.addLast(new ChannelOutboundHandlerAdapter());
/*
.addLast(new SimpleUserEventChannelHandler<String>() {
@Override
protected void eventReceived(ChannelHandlerContext ctx, String evt) throws Exception {
}
});
*/
//.addLast(new CombinedChannelDuplexHandler(new ChannelInboundHandler(), new ChannelOutboundHandler()));
}
});
channel = bootstrap.connect(new DomainSocketAddress(unixSocket)).sync().channel();


//channel.writeAndFlush(request);

//channel.closeFuture().sync();
}
catch (Exception e) {
int a = 5;
} finally {
//epollEventLoopGroup.shutdownGracefully();
}
// ======

var request = ConnectionRequest.newBuilder()
.addAddresses(
AddressInfo.newBuilder()
.setHost(connectionSettings.host)
.setPort(connectionSettings.port))
.setTlsMode(connectionSettings.useSsl // TODO: secure or insecure TLS?
? TlsMode.SecureTls
: TlsMode.NoTls)
.setClusterModeEnabled(false)
// In millis
.setResponseTimeout(250)
// In millis
.setClientCreationTimeout(2500)
.setReadFromReplicaStrategy(ReadFromReplicaStrategy.AlwaysFromPrimary)
.setConnectionRetryStrategy(
ConnectionRetryStrategy.newBuilder()
.setNumberOfRetries(1)
.setFactor(1)
.setExponentBase(1))
.setAuthenticationInfo(
AuthenticationInfo.newBuilder()
.setPassword("")
.setUsername("default"))
.setDatabaseId(0)
.build();

channel.writeAndFlush(request.toByteArray());
channel.read();
}

@Override
public String getName() {
return "JNI Netty";
}

@Override
public void set(String key, String value) {

}

@Override
public String get(String key) {
return null;
}

public static void main(String[] args) {
var client = new JniNettyClient();
client.connectToRedis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public static void testClientSetGet(
"%n =====> %s <===== %d clients %d concurrent %d data %n%n",
clientCreator.get().getName(), clientCount, concurrentNum, dataSize);
AtomicInteger iterationCounter = new AtomicInteger(0);
// Collections.synchronizedList
Map<ChosenAction, List<Long>> actionResults =
Map.of(
ChosenAction.GET_EXISTING, new ArrayList<>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
public class RedisClient {
public static native void startSocketListenerExternal(RedisClient callback);

public static native String startSocketListenerExternal() throws Exception;

public static native Object valueFromPointer(long pointer);

static {
Expand Down
37 changes: 37 additions & 0 deletions java/javababushka_client_RedisClient.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 54 additions & 3 deletions java/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use babushka::start_socket_listener;

use jni::objects::{JClass, JObject};
use jni::objects::{JClass, JObject, JThrowable};
use jni::JNIEnv;
use jni::sys::jlong;
use std::sync::mpsc;

use log::error;
use logger_core::Level;
use redis::Value;

fn redis_value_to_java<'local>(mut env: JNIEnv<'local>, val: Value) -> JObject<'local> {
Expand Down Expand Up @@ -46,7 +47,7 @@ pub extern "system" fn Java_javababushka_client_RedisClient_valueFromPointer<'lo
}

#[no_mangle]
pub extern "system" fn Java_javababushka_client_RedisClient_startSocketListenerExternal<'local>(
pub extern "system" fn Java_javababushka_client_RedisClient_startSocketListenerExternal__Ljavababushka_client_RedisClient_2<'local>(
mut env: JNIEnv<'local>,
_class: JClass<'local>,
callback: JObject<'local>
Expand Down Expand Up @@ -74,3 +75,53 @@ pub extern "system" fn Java_javababushka_client_RedisClient_startSocketListenerE
// Wait until the thread has started
rx.recv().unwrap();
}

#[no_mangle]
pub extern "system" fn Java_javababushka_client_RedisClient_startSocketListenerExternal__<'local>(
mut env: JNIEnv<'local>,
_class: JClass<'local>
) -> JObject<'local> {
let (tx, rx) = mpsc::channel::<Result<String, String>>();

logger_core::init(Some(Level::Trace), None);

start_socket_listener(move |socket_path : Result<String, String>| {
// Signals that thread has started
let _ = tx.send(socket_path);
});

// Wait until the thread has started
let socket_path = rx.recv();

match socket_path {
Ok(Ok(path)) => {
env.new_string(path).unwrap().into()
},
Ok(Err(error_message)) => {
throw_java_exception(env, error_message);
JObject::null()
},
Err(error) => {
throw_java_exception(env, error.to_string());
JObject::null()
}
}
}

fn throw_java_exception(mut env: JNIEnv, message: String) {
let res = env.new_object(
"java/lang/Exception",
"(Ljava/lang/String;)V",
&[
(&env.new_string(message.clone()).unwrap()).into(),
]);

match res {
Ok(res) => {
env.throw(JThrowable::from(res));
},
Err(err) => {
error!("Failed to create exception with string {}: {}", message, err.to_string());
}
};
}

0 comments on commit 58f7b58

Please sign in to comment.