diff --git a/babushka-core/src/socket_listener.rs b/babushka-core/src/socket_listener.rs index d12f557296..80430f3180 100644 --- a/babushka-core/src/socket_listener.rs +++ b/babushka-core/src/socket_listener.rs @@ -210,7 +210,7 @@ async fn write_result( error_message.into(), )) } else { - log_warn("received error", error_message.as_str()); + log_warn("received error", format!("{} for callback {}", error_message.as_str(), callback_index)); let mut request_error = response::RequestError::default(); if err.is_connection_dropped() { request_error.type_ = response::RequestErrorType::Disconnect.into(); @@ -476,7 +476,7 @@ fn handle_request(request: RedisRequest, client: Client, writer: Rc) { } }, None => Err(ClienUsageError::InternalError( - "Received empty request".to_string(), + format!("Received empty request for callback {}", request.callback_idx), )), }; diff --git a/java/.gitignore b/java/.gitignore index 8f60484e5a..a7b8d6a177 100644 --- a/java/.gitignore +++ b/java/.gitignore @@ -4,5 +4,5 @@ # Ignore Gradle build output directory build -# Ignore generated files (e.g. protobuf) -generated +# Ignore protobuf generated files +protobuf diff --git a/java/README.md b/java/README.md index 6fb456a9b4..415814634f 100644 --- a/java/README.md +++ b/java/README.md @@ -6,11 +6,11 @@ language api to enhance performance and limit cpu cycles at scale. ## Organization -The Java client (javababushka) contains the following parts: +The Java client contains the following parts: 1. A Java client (lib folder): wrapper to rust-client. -2. An examples script: to sanity test javababushka and similar java-clients against a redis host. -3. A benchmark app: to performance benchmark test javababushka and similar java-clients against a redis host. +2. An examples script: to sanity test babushka and similar java-clients against a redis host. +3. A benchmark app: to performance benchmark test babushka and similar java-clients against a redis host. ## Building diff --git a/java/benchmarks/build.gradle b/java/benchmarks/build.gradle index 6ade7532b0..317bb9fcde 100644 --- a/java/benchmarks/build.gradle +++ b/java/benchmarks/build.gradle @@ -15,8 +15,6 @@ dependencies { // Use JUnit test framework. testImplementation 'org.junit.jupiter:junit-jupiter:5.9.2' - // This dependency is used internally, and not exposed to consumers on their own compile classpath. - implementation 'com.google.guava:guava:32.1.1-jre' implementation 'redis.clients:jedis:4.4.3' implementation 'io.lettuce:lettuce-core:6.2.6.RELEASE' implementation 'commons-cli:commons-cli:1.5.0' @@ -36,7 +34,7 @@ java { application { // Define the main class for the application. - mainClass = 'javababushka.benchmarks.BenchmarkingApp' + mainClass = 'babushka.benchmarks.BenchmarkingApp' applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug" } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java b/java/benchmarks/src/main/java/babushka/benchmarks/BenchmarkingApp.java similarity index 88% rename from java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java rename to java/benchmarks/src/main/java/babushka/benchmarks/BenchmarkingApp.java index b401488584..4bd11b3597 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/BenchmarkingApp.java @@ -1,16 +1,15 @@ -package javababushka.benchmarks; - -import static javababushka.benchmarks.utils.Benchmarking.testClientSetGet; - +package babushka.benchmarks; + +import babushka.benchmarks.clients.babushka.JniNettyClient; +import babushka.benchmarks.clients.jedis.JedisClient; +import babushka.benchmarks.clients.jedis.JedisPseudoAsyncClient; +import babushka.benchmarks.clients.lettuce.LettuceAsyncClient; +import babushka.benchmarks.clients.lettuce.LettuceAsyncClusterClient; +import babushka.benchmarks.clients.lettuce.LettuceClient; +import babushka.benchmarks.utils.Benchmarking; import java.util.Arrays; import java.util.Optional; import java.util.stream.Stream; -import javababushka.benchmarks.clients.babushka.JniNettyClient; -import javababushka.benchmarks.clients.jedis.JedisClient; -import javababushka.benchmarks.clients.jedis.JedisPseudoAsyncClient; -import javababushka.benchmarks.clients.lettuce.LettuceAsyncClient; -import javababushka.benchmarks.clients.lettuce.LettuceAsyncClusterClient; -import javababushka.benchmarks.clients.lettuce.LettuceClient; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -36,7 +35,7 @@ public static void main(String[] args) { // generate the help statement if (line.hasOption("help")) { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("javababushka", options); + formatter.printHelp("babushka", options); return; } @@ -50,27 +49,27 @@ public static void main(String[] args) { switch (client) { case JEDIS: // run testClientSetGet on JEDIS sync client - testClientSetGet(JedisClient::new, runConfiguration, false); + Benchmarking.testClientSetGet(JedisClient::new, runConfiguration, false); break; case JEDIS_ASYNC: // run testClientSetGet on JEDIS pseudo-async client - testClientSetGet(JedisPseudoAsyncClient::new, runConfiguration, true); + Benchmarking.testClientSetGet(JedisPseudoAsyncClient::new, runConfiguration, true); break; case LETTUCE: - testClientSetGet(LettuceClient::new, runConfiguration, false); + Benchmarking.testClientSetGet(LettuceClient::new, runConfiguration, false); break; case LETTUCE_ASYNC: if (runConfiguration.clusterModeEnabled) { - testClientSetGet(LettuceAsyncClusterClient::new, runConfiguration, true); + Benchmarking.testClientSetGet(LettuceAsyncClusterClient::new, runConfiguration, true); } else { - testClientSetGet(LettuceAsyncClient::new, runConfiguration, true); + Benchmarking.testClientSetGet(LettuceAsyncClient::new, runConfiguration, true); } break; case BABUSHKA: - testClientSetGet(() -> new JniNettyClient(false), runConfiguration, false); + Benchmarking.testClientSetGet(() -> new JniNettyClient(false), runConfiguration, false); break; case BABUSHKA_ASYNC: - testClientSetGet(() -> new JniNettyClient(true), runConfiguration, true); + Benchmarking.testClientSetGet(() -> new JniNettyClient(true), runConfiguration, true); break; } } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java b/java/benchmarks/src/main/java/babushka/benchmarks/clients/AsyncClient.java similarity index 82% rename from java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java rename to java/benchmarks/src/main/java/babushka/benchmarks/clients/AsyncClient.java index deae2ceee5..bfab12a5d1 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/clients/AsyncClient.java @@ -1,8 +1,8 @@ -package javababushka.benchmarks.clients; +package babushka.benchmarks.clients; +import babushka.benchmarks.utils.ConnectionSettings; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import javababushka.benchmarks.utils.ConnectionSettings; /** A Redis client with async capabilities */ public interface AsyncClient extends Client { @@ -13,7 +13,7 @@ public interface AsyncClient extends Client { Future asyncSet(String key, String value); - Future asyncGet(String key); + Future asyncGet(String key); default T waitForResult(Future future) { return waitForResult(future, DEFAULT_TIMEOUT_MILLISECOND); diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java b/java/benchmarks/src/main/java/babushka/benchmarks/clients/Client.java similarity index 67% rename from java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java rename to java/benchmarks/src/main/java/babushka/benchmarks/clients/Client.java index d95f31f25d..d89e7a64b3 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/clients/Client.java @@ -1,6 +1,6 @@ -package javababushka.benchmarks.clients; +package babushka.benchmarks.clients; -import javababushka.benchmarks.utils.ConnectionSettings; +import babushka.benchmarks.utils.ConnectionSettings; /** A Redis client interface */ public interface Client { diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java b/java/benchmarks/src/main/java/babushka/benchmarks/clients/SyncClient.java similarity index 79% rename from java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java rename to java/benchmarks/src/main/java/babushka/benchmarks/clients/SyncClient.java index 603f91e936..c2e0563f76 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/clients/SyncClient.java @@ -1,4 +1,4 @@ -package javababushka.benchmarks.clients; +package babushka.benchmarks.clients; /** A Redis client with sync capabilities */ public interface SyncClient extends Client { diff --git a/java/benchmarks/src/main/java/babushka/benchmarks/clients/babushka/JniNettyClient.java b/java/benchmarks/src/main/java/babushka/benchmarks/clients/babushka/JniNettyClient.java new file mode 100644 index 0000000000..9b8ca19bf3 --- /dev/null +++ b/java/benchmarks/src/main/java/babushka/benchmarks/clients/babushka/JniNettyClient.java @@ -0,0 +1,68 @@ +package babushka.benchmarks.clients.babushka; + +import babushka.api.Awaiter; +import babushka.api.Client; +import babushka.api.Commands; +import babushka.api.Connection; +import babushka.benchmarks.clients.AsyncClient; +import babushka.benchmarks.clients.SyncClient; +import babushka.benchmarks.utils.ConnectionSettings; +import java.util.concurrent.Future; + +public class JniNettyClient implements SyncClient, AsyncClient { + + private final Connection connection; + private final Commands asyncCommands; + + private String name = "JNI Netty"; + + public JniNettyClient(boolean async) { + name += async ? " async" : " sync"; + connection = Client.CreateConnection(); + asyncCommands = Client.GetAsyncCommands(connection); + } + + @Override + public String getName() { + return name; + } + + @Override + public void connectToRedis() { + connectToRedis(new ConnectionSettings("localhost", 6379, false, false)); + } + + @Override + public void connectToRedis(ConnectionSettings connectionSettings) { + waitForResult(asyncConnectToRedis(connectionSettings)); + } + + @Override + public Future asyncConnectToRedis(ConnectionSettings connectionSettings) { + return connection.connectToRedis( + connectionSettings.host, + connectionSettings.port, + connectionSettings.useSsl, + connectionSettings.clusterMode); + } + + @Override + public Future asyncSet(String key, String value) { + return asyncCommands.set(key, value); + } + + @Override + public Future asyncGet(String key) { + return asyncCommands.get(key); + } + + @Override + public void set(String key, String value) { + Awaiter.await(asyncCommands.set(key, value)); + } + + @Override + public String get(String key) { + return Awaiter.await(asyncCommands.get(key)); + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisClient.java b/java/benchmarks/src/main/java/babushka/benchmarks/clients/jedis/JedisClient.java similarity index 89% rename from java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisClient.java rename to java/benchmarks/src/main/java/babushka/benchmarks/clients/jedis/JedisClient.java index f088d5ac07..8ac66cea73 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisClient.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/clients/jedis/JedisClient.java @@ -1,7 +1,7 @@ -package javababushka.benchmarks.clients.jedis; +package babushka.benchmarks.clients.jedis; -import javababushka.benchmarks.clients.SyncClient; -import javababushka.benchmarks.utils.ConnectionSettings; +import babushka.benchmarks.clients.SyncClient; +import babushka.benchmarks.utils.ConnectionSettings; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java b/java/benchmarks/src/main/java/babushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java similarity index 84% rename from java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java rename to java/benchmarks/src/main/java/babushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java index 3970826a33..8770d1f181 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java @@ -1,9 +1,9 @@ -package javababushka.benchmarks.clients.jedis; +package babushka.benchmarks.clients.jedis; +import babushka.benchmarks.clients.AsyncClient; +import babushka.benchmarks.utils.ConnectionSettings; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; -import javababushka.benchmarks.clients.AsyncClient; -import javababushka.benchmarks.utils.ConnectionSettings; /** * A Jedis client with pseudo-async capabilities. Jedis doesn't provide async API diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClient.java b/java/benchmarks/src/main/java/babushka/benchmarks/clients/lettuce/LettuceAsyncClient.java similarity index 93% rename from java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClient.java rename to java/benchmarks/src/main/java/babushka/benchmarks/clients/lettuce/LettuceAsyncClient.java index ded154b3c5..6322a648ce 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClient.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/clients/lettuce/LettuceAsyncClient.java @@ -1,5 +1,7 @@ -package javababushka.benchmarks.clients.lettuce; +package babushka.benchmarks.clients.lettuce; +import babushka.benchmarks.clients.AsyncClient; +import babushka.benchmarks.utils.ConnectionSettings; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisFuture; import io.lettuce.core.RedisURI; @@ -7,8 +9,6 @@ import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.codec.StringCodec; import java.util.concurrent.Future; -import javababushka.benchmarks.clients.AsyncClient; -import javababushka.benchmarks.utils.ConnectionSettings; /** A Lettuce client with async capabilities see: https://lettuce.io/ */ public class LettuceAsyncClient implements AsyncClient { diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java b/java/benchmarks/src/main/java/babushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java similarity index 93% rename from java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java rename to java/benchmarks/src/main/java/babushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java index 6e5ed09ce3..5e385d0d89 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java @@ -1,14 +1,14 @@ /* * This Java source file was generated by the Gradle 'init' task. */ -package javababushka.benchmarks.clients.lettuce; +package babushka.benchmarks.clients.lettuce; +import babushka.benchmarks.utils.ConnectionSettings; import io.lettuce.core.RedisFuture; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; -import javababushka.benchmarks.utils.ConnectionSettings; public class LettuceAsyncClusterClient extends LettuceAsyncClient { diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceClient.java b/java/benchmarks/src/main/java/babushka/benchmarks/clients/lettuce/LettuceClient.java similarity index 88% rename from java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceClient.java rename to java/benchmarks/src/main/java/babushka/benchmarks/clients/lettuce/LettuceClient.java index 87d7bc9d2e..b0440ec8df 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceClient.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/clients/lettuce/LettuceClient.java @@ -1,10 +1,10 @@ -package javababushka.benchmarks.clients.lettuce; +package babushka.benchmarks.clients.lettuce; +import babushka.benchmarks.clients.SyncClient; +import babushka.benchmarks.utils.ConnectionSettings; import io.lettuce.core.RedisClient; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisStringCommands; -import javababushka.benchmarks.clients.SyncClient; -import javababushka.benchmarks.utils.ConnectionSettings; /** A Lettuce client with sync capabilities see: https://lettuce.io/ */ public class LettuceClient implements SyncClient { diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/babushka/benchmarks/utils/Benchmarking.java similarity index 97% rename from java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java rename to java/benchmarks/src/main/java/babushka/benchmarks/utils/Benchmarking.java index 4d826c427b..14f3ddb7c9 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/utils/Benchmarking.java @@ -1,5 +1,9 @@ -package javababushka.benchmarks.utils; +package babushka.benchmarks.utils; +import babushka.benchmarks.BenchmarkingApp; +import babushka.benchmarks.clients.AsyncClient; +import babushka.benchmarks.clients.Client; +import babushka.benchmarks.clients.SyncClient; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -11,10 +15,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; -import javababushka.benchmarks.BenchmarkingApp; -import javababushka.benchmarks.clients.AsyncClient; -import javababushka.benchmarks.clients.Client; -import javababushka.benchmarks.clients.SyncClient; import org.apache.commons.lang3.tuple.Pair; /** Class to calculate latency on client-actions */ @@ -127,6 +127,7 @@ public static void printResults( System.out.printf("Iterations: %d%n", iterations); System.out.printf("TPS: %f%n", iterations / duration); int totalHits = 0; + for (Map.Entry entry : resultsMap.entrySet()) { ChosenAction action = entry.getKey(); LatencyResults results = entry.getValue(); @@ -140,7 +141,7 @@ public static void printResults( System.out.printf("Total hits: %d%n", results.totalHits); totalHits += results.totalHits; } - System.out.println("Total hits: " + totalHits); + System.out.printf("%nTotal hits: %d%n", totalHits); } public static void testClientSetGet( diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ChosenAction.java b/java/benchmarks/src/main/java/babushka/benchmarks/utils/ChosenAction.java similarity index 64% rename from java/benchmarks/src/main/java/javababushka/benchmarks/utils/ChosenAction.java rename to java/benchmarks/src/main/java/babushka/benchmarks/utils/ChosenAction.java index bd9f01fd90..42eceabd8d 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ChosenAction.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/utils/ChosenAction.java @@ -1,4 +1,4 @@ -package javababushka.benchmarks.utils; +package babushka.benchmarks.utils; public enum ChosenAction { GET_NON_EXISTING, diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java b/java/benchmarks/src/main/java/babushka/benchmarks/utils/ConnectionSettings.java similarity index 86% rename from java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java rename to java/benchmarks/src/main/java/babushka/benchmarks/utils/ConnectionSettings.java index 91c11c76a8..8645d36450 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/utils/ConnectionSettings.java @@ -1,4 +1,4 @@ -package javababushka.benchmarks.utils; +package babushka.benchmarks.utils; import lombok.AllArgsConstructor; diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/JsonWriter.java b/java/benchmarks/src/main/java/babushka/benchmarks/utils/JsonWriter.java similarity index 99% rename from java/benchmarks/src/main/java/javababushka/benchmarks/utils/JsonWriter.java rename to java/benchmarks/src/main/java/babushka/benchmarks/utils/JsonWriter.java index 24e658f56b..f809ff03b5 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/JsonWriter.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/utils/JsonWriter.java @@ -1,4 +1,4 @@ -package javababushka.benchmarks.utils; +package babushka.benchmarks.utils; import com.google.gson.Gson; import com.google.gson.GsonBuilder; diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java b/java/benchmarks/src/main/java/babushka/benchmarks/utils/LatencyResults.java similarity index 89% rename from java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java rename to java/benchmarks/src/main/java/babushka/benchmarks/utils/LatencyResults.java index 5d25d72a67..a0a3d0ddc9 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java +++ b/java/benchmarks/src/main/java/babushka/benchmarks/utils/LatencyResults.java @@ -1,4 +1,4 @@ -package javababushka.benchmarks.utils; +package babushka.benchmarks.utils; import lombok.AllArgsConstructor; diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java deleted file mode 100644 index a4fc37f4d9..0000000000 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java +++ /dev/null @@ -1,69 +0,0 @@ -package javababushka.benchmarks.clients.babushka; - -import static response.ResponseOuterClass.Response; - -import java.util.concurrent.Future; -import javababushka.Client; -import javababushka.benchmarks.clients.AsyncClient; -import javababushka.benchmarks.clients.SyncClient; -import javababushka.benchmarks.utils.ConnectionSettings; - -public class JniNettyClient implements SyncClient, AsyncClient { - - private final Client testClient; - private String name = "JNI Netty"; - - public JniNettyClient(boolean async) { - name += async ? " async" : " sync"; - testClient = new Client(); - } - - @Override - public String getName() { - return name; - } - - @Override - public void closeConnection() { - testClient.closeConnection(); - } - - @Override - public void connectToRedis() { - connectToRedis(new ConnectionSettings("localhost", 6379, false, false)); - } - - @Override - public void connectToRedis(ConnectionSettings connectionSettings) { - waitForResult(asyncConnectToRedis(connectionSettings)); - } - - @Override - public Future asyncConnectToRedis(ConnectionSettings connectionSettings) { - return testClient.asyncConnectToRedis( - connectionSettings.host, - connectionSettings.port, - connectionSettings.useSsl, - connectionSettings.clusterMode); - } - - @Override - public Future asyncSet(String key, String value) { - return testClient.asyncSet(key, value); - } - - @Override - public Future asyncGet(String key) { - return testClient.asyncGet(key); - } - - @Override - public void set(String key, String value) { - testClient.set(key, value); - } - - @Override - public String get(String key) { - return testClient.get(key); - } -} diff --git a/java/benchmarks/src/test/java/javababushka/benchmarks/jedis/JedisClientIT.java b/java/benchmarks/src/test/java/babushka/benchmarks/jedis/JedisClientIT.java similarity index 86% rename from java/benchmarks/src/test/java/javababushka/benchmarks/jedis/JedisClientIT.java rename to java/benchmarks/src/test/java/babushka/benchmarks/jedis/JedisClientIT.java index 9a6dbbe93e..5890b8b0b3 100644 --- a/java/benchmarks/src/test/java/javababushka/benchmarks/jedis/JedisClientIT.java +++ b/java/benchmarks/src/test/java/babushka/benchmarks/jedis/JedisClientIT.java @@ -1,17 +1,14 @@ -/* - * This Java source file was generated by the Gradle 'init' task. - */ -package javababushka.benchmarks.jedis; +package babushka.benchmarks.jedis; import static org.junit.jupiter.api.Assertions.assertTrue; +import babushka.benchmarks.clients.jedis.JedisClient; +import babushka.benchmarks.utils.Benchmarking; +import babushka.benchmarks.utils.ChosenAction; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import javababushka.benchmarks.clients.jedis.JedisClient; -import javababushka.benchmarks.utils.Benchmarking; -import javababushka.benchmarks.utils.ChosenAction; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; diff --git a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java b/java/benchmarks/src/test/java/babushka/benchmarks/lettuce/LettuceAsyncClientIT.java similarity index 91% rename from java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java rename to java/benchmarks/src/test/java/babushka/benchmarks/lettuce/LettuceAsyncClientIT.java index 92f81df2da..a07365482d 100644 --- a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java +++ b/java/benchmarks/src/test/java/babushka/benchmarks/lettuce/LettuceAsyncClientIT.java @@ -1,13 +1,10 @@ -/* - * This Java source file was generated by the Gradle 'init' task. - */ -package javababushka.benchmarks.lettuce; +package babushka.benchmarks.lettuce; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; +import babushka.benchmarks.clients.lettuce.LettuceAsyncClient; import io.lettuce.core.RedisFuture; -import javababushka.benchmarks.clients.lettuce.LettuceAsyncClient; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; diff --git a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java b/java/benchmarks/src/test/java/babushka/benchmarks/lettuce/LettuceClientIT.java similarity index 77% rename from java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java rename to java/benchmarks/src/test/java/babushka/benchmarks/lettuce/LettuceClientIT.java index e9670e8dbb..f5073a4d96 100644 --- a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java +++ b/java/benchmarks/src/test/java/babushka/benchmarks/lettuce/LettuceClientIT.java @@ -1,12 +1,9 @@ -/* - * This Java source file was generated by the Gradle 'init' task. - */ -package javababushka.benchmarks.lettuce; +package babushka.benchmarks.lettuce; +import babushka.benchmarks.clients.lettuce.LettuceClient; +import babushka.benchmarks.utils.Benchmarking; +import babushka.benchmarks.utils.ChosenAction; import java.util.HashMap; -import javababushka.benchmarks.clients.lettuce.LettuceClient; -import javababushka.benchmarks.utils.Benchmarking; -import javababushka.benchmarks.utils.ChosenAction; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; diff --git a/java/build.gradle b/java/build.gradle index 340f2c98cb..5ecae27fae 100644 --- a/java/build.gradle +++ b/java/build.gradle @@ -18,17 +18,6 @@ subprojects { plugins.withId('java') { sourceCompatibility = targetCompatibility = "11" } - tasks.withType(Test) { - useJUnitPlatform() - - testLogging { - exceptionFormat "full" - events "started", "skipped", "passed", "failed" - showStandardStreams true - } - // TODO: add jacoco with code coverage - // finalizedBy jacocoTestReport, jacocoTestCoverageVerification - } } dependencies { diff --git a/java/client/build.gradle b/java/client/build.gradle index 8dd4e5b7c5..c05b7f5620 100644 --- a/java/client/build.gradle +++ b/java/client/build.gradle @@ -9,24 +9,36 @@ repositories { } dependencies { + // This dependency is used internally, and not exposed to consumers on their own compile classpath. + implementation 'com.google.guava:guava:32.1.1-jre' implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.24.3' implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.13.0' implementation group: 'io.netty', name: 'netty-handler', version: '4.1.100.Final' // https://github.com/netty/netty/wiki/Native-transports - // Windows is not supported, because babushka does not support windows, because tokio does not support windows, because ... 42 + // At the moment, Windows is not supported implementation group: 'io.netty', name: 'netty-transport-native-epoll', version: '4.1.100.Final', classifier: 'linux-x86_64' implementation group: 'io.netty', name: 'netty-transport-native-kqueue', version: '4.1.100.Final', classifier: 'osx-x86_64' implementation group: 'io.netty', name: 'netty-transport-native-kqueue', version: '4.1.100.Final', classifier: 'osx-aarch_64' + + //lombok + compileOnly 'org.projectlombok:lombok:1.18.30' + annotationProcessor 'org.projectlombok:lombok:1.18.30' + testCompileOnly 'org.projectlombok:lombok:1.18.30' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.30' + + // junit + testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') + testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4' } tasks.register('protobuf', Exec) { doFirst { - project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString()) + project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/babushka/models/protobuf').toString()) } commandLine 'protoc', '-Iprotobuf=babushka-core/src/protobuf/', - '--java_out=java/client/src/main/java/javababushka/generated', + '--java_out=java/client/src/main/java/babushka/models/protobuf', 'babushka-core/src/protobuf/connection_request.proto', 'babushka-core/src/protobuf/redis_request.proto', 'babushka-core/src/protobuf/response.proto' @@ -35,7 +47,7 @@ tasks.register('protobuf', Exec) { tasks.register('cleanProtobuf') { doFirst { - project.delete(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString()) + project.delete(Paths.get(project.projectDir.path, 'src/main/java/babushka/models/protobuf').toString()) } } @@ -60,9 +72,11 @@ tasks.register('buildAll') { } compileJava.dependsOn('protobuf') + clean.dependsOn('cleanProtobuf') tasks.withType(Test) { + useJUnitPlatform() testLogging { exceptionFormat "full" events "started", "skipped", "passed", "failed" diff --git a/java/client/src/main/java/babushka/api/Awaiter.java b/java/client/src/main/java/babushka/api/Awaiter.java new file mode 100644 index 0000000000..a2375d3f9a --- /dev/null +++ b/java/client/src/main/java/babushka/api/Awaiter.java @@ -0,0 +1,28 @@ +package babushka.api; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class Awaiter { + private static final long DEFAULT_TIMEOUT_MILLISECONDS = 1000; + + /** Get the future result with default timeout. */ + public static T await(CompletableFuture future) { + return await(future, DEFAULT_TIMEOUT_MILLISECONDS); + } + + /** Get the future result with given timeout in ms. */ + public static T await(CompletableFuture future, long timeout) { + try { + return future.get(timeout, TimeUnit.MILLISECONDS); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + // TODO: handle exceptions: + // InterruptedException: should shutdown the client service + // TimeoutException: should be propagated with an error message thrown + // ExecutionException: throw runtime exception + throw new RuntimeException(e); + } + } +} diff --git a/java/client/src/main/java/babushka/api/Client.java b/java/client/src/main/java/babushka/api/Client.java new file mode 100644 index 0000000000..1f724f8227 --- /dev/null +++ b/java/client/src/main/java/babushka/api/Client.java @@ -0,0 +1,45 @@ +package babushka.api; + +import babushka.connectors.SocketConnection; +import babushka.ffi.resolvers.BabushkaCoreNativeDefinitions; +import babushka.managers.CallbackManager; +import babushka.managers.CommandManager; +import babushka.managers.ConnectionManager; + +/** Factory class for creating Babushka-Redis client connections */ +public class Client { + + static { + SocketConnection.setSocketPath(BabushkaCoreNativeDefinitions.getSocket()); + } + + public static Connection CreateConnection() { + CallbackManager callbackManager = new CallbackManager(); + + SocketConnection socketConnection = SocketConnection.getInstance(); + var channelHandler = socketConnection.openNewChannel(callbackManager); + + CommandManager commandManager = new CommandManager(channelHandler); + ConnectionManager connectionManager = new ConnectionManager(channelHandler, commandManager); + + return new Connection(connectionManager); + } + + public static Connection ConnectToRedis(String host, int port) { + CallbackManager callbackManager = new CallbackManager(); + + SocketConnection socketConnection = SocketConnection.getInstance(); + var channelHandler = socketConnection.openNewChannel(callbackManager); + + CommandManager commandManager = new CommandManager(channelHandler); + ConnectionManager connectionManager = new ConnectionManager(channelHandler, commandManager); + + Awaiter.await(connectionManager.connectToRedis(host, port, false, false)); + + return new Connection(connectionManager); + } + + public static Commands GetAsyncCommands(Connection connection) { + return new Commands(connection.getConnectionManager().getCommandManager()); + } +} diff --git a/java/client/src/main/java/babushka/api/Commands.java b/java/client/src/main/java/babushka/api/Commands.java new file mode 100644 index 0000000000..5fafed0523 --- /dev/null +++ b/java/client/src/main/java/babushka/api/Commands.java @@ -0,0 +1,34 @@ +package babushka.api; + +import babushka.managers.CommandManager; +import java.util.concurrent.CompletableFuture; + +public class Commands { + + private final CommandManager commandManager; + + public Commands(CommandManager commandManager) { + this.commandManager = commandManager; + } + + /** + * Async (non-blocking) set. See sync option in {@link #set}.
+ * See REDIS docs for SET. + * + * @param key The key name + * @param value The value to set + */ + public CompletableFuture set(String key, String value) { + return commandManager.set(key, value); + } + + /** + * Async (non-blocking) get. See sync option in {@link #get}.
+ * See REDIS docs for GET. + * + * @param key The key name + */ + public CompletableFuture get(String key) { + return commandManager.get(key); + } +} diff --git a/java/client/src/main/java/babushka/api/Connection.java b/java/client/src/main/java/babushka/api/Connection.java new file mode 100644 index 0000000000..5c4e44bf5f --- /dev/null +++ b/java/client/src/main/java/babushka/api/Connection.java @@ -0,0 +1,27 @@ +package babushka.api; + +import babushka.managers.ConnectionManager; +import java.util.concurrent.CompletableFuture; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class Connection { + + @Getter private final ConnectionManager connectionManager; + + /** + * Async (non-blocking) connect to REDIS. See sync option in {@link #connectToRedis}. + * + * @param host Server address + * @param port Server port + * @param useSsl true if communication with the server or cluster should use Transport Level + * Security + * @param clusterMode true if REDIS instance runs in the cluster mode + */ + // TODO support configuration object which holds more parameters (e.g. multiple addresses, etc) + public CompletableFuture connectToRedis( + String host, int port, boolean useSsl, boolean clusterMode) { + return connectionManager.connectToRedis(host, port, useSsl, clusterMode); + } +} diff --git a/java/client/src/main/java/babushka/connectors/SocketConnection.java b/java/client/src/main/java/babushka/connectors/SocketConnection.java new file mode 100644 index 0000000000..04878fa5ea --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/SocketConnection.java @@ -0,0 +1,135 @@ +package babushka.connectors; + +import babushka.connectors.handlers.ChannelBuilder; +import babushka.connectors.handlers.ChannelHandler; +import babushka.managers.CallbackManager; +import babushka.managers.ConnectionManager; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueDomainSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.unix.DomainSocketAddress; +import io.netty.util.concurrent.DefaultThreadFactory; + +public class SocketConnection { + + /** Thread pool supplied to Netty to perform all async IO. */ + private EventLoopGroup group; + + /** The singleton instance. */ + private static SocketConnection INSTANCE = null; + + private static String socketPath; + + public static void setSocketPath(String socketPath) { + if (SocketConnection.socketPath == null) { + SocketConnection.socketPath = socketPath; + return; + } + throw new RuntimeException("socket path can only be declared once"); + } + + /** + * Creates (if not yet created) and returns the singleton instance of the {@link + * ConnectionManager}. + * + * @return a {@link ConnectionManager} instance. + */ + public static synchronized SocketConnection getInstance() { + if (INSTANCE == null) { + assert socketPath != null : "socket path must be defined"; + INSTANCE = new SocketConnection(); + } + return INSTANCE; + } + + // At the moment, Windows is not supported + // Probably we should use NIO (NioEventLoopGroup) for Windows. + private static final boolean isMacOs = isKQueueAvailable(); + + // TODO support IO-Uring and NIO + /** + * Detect platform to identify which native implementation to use for UDS interaction. Currently + * supported platforms are: Linux and macOS.
+ * Subject to change in future to support more platforms and implementations. + */ + private static boolean isKQueueAvailable() { + try { + Class.forName("io.netty.channel.kqueue.KQueue"); + return KQueue.isAvailable(); + } catch (ClassNotFoundException e) { + return false; + } + } + + /** Constructor for the single instance. */ + private SocketConnection() { + try { + int cpuCount = Runtime.getRuntime().availableProcessors(); + group = + isMacOs + ? new KQueueEventLoopGroup( + cpuCount, new DefaultThreadFactory("SocketConnection-kqueue-elg", true)) + : new EpollEventLoopGroup( + cpuCount, new DefaultThreadFactory("SocketConnection-epoll-elg", true)); + } catch (Exception e) { + System.err.printf( + "Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); + e.printStackTrace(System.err); + } + } + + /** Open a new channel for a new client. */ + public ChannelHandler openNewChannel(CallbackManager callbackManager) { + try { + Channel channel = + new Bootstrap() + .group(group) + .channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class) + .handler(new ChannelBuilder(callbackManager)) + .connect(new DomainSocketAddress(socketPath)) + .sync() + .channel(); + return new ChannelHandler(channel, callbackManager); + } catch (InterruptedException e) { + System.err.printf( + "Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); + e.printStackTrace(System.err); + throw new RuntimeException(e); + } + } + + /** + * Closes the UDS connection and frees corresponding resources. A consecutive call to {@link + * #getInstance()} will create a new connection with new resource pool. + */ + public void close() { + group.shutdownGracefully(); + INSTANCE = null; + } + + /** + * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing + * resources by calling {@link #close()}. It is recommended to use a class instead of lambda to + * ensure that it is called.
+ * See {@link Runtime#addShutdownHook}. + */ + private static class ShutdownHook implements Runnable { + @Override + public void run() { + if (INSTANCE != null) { + INSTANCE.close(); + INSTANCE = null; + } + } + } + + static { + Runtime.getRuntime() + .addShutdownHook(new Thread(new ShutdownHook(), "SocketConnection-shutdown-hook")); + } +} diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java new file mode 100644 index 0000000000..4819254499 --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelBuilder.java @@ -0,0 +1,26 @@ +package babushka.connectors.handlers; + +import babushka.managers.CallbackManager; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.unix.UnixChannel; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +/** Builder for the channel used by {@link babushka.connectors.SocketConnection}. */ +@RequiredArgsConstructor +public class ChannelBuilder extends ChannelInitializer { + + private final CallbackManager callbackManager; + + @Override + public void initChannel(@NonNull UnixChannel ch) { + ch.pipeline() + // https://netty.io/4.1/api/io/netty/handler/codec/protobuf/ProtobufEncoder.html + .addLast("protobufDecoder", new ProtobufVarint32FrameDecoder()) + .addLast("protobufEncoder", new ProtobufVarint32LengthFieldPrepender()) + .addLast(new ReadHandler(callbackManager)) + .addLast(new WriteHandler()); + } +} diff --git a/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java new file mode 100644 index 0000000000..06a322baad --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/handlers/ChannelHandler.java @@ -0,0 +1,44 @@ +package babushka.connectors.handlers; + +import babushka.managers.CallbackManager; +import connection_request.ConnectionRequestOuterClass.ConnectionRequest; +import io.netty.channel.Channel; +import java.util.concurrent.CompletableFuture; +import lombok.RequiredArgsConstructor; +import redis_request.RedisRequestOuterClass.RedisRequest; +import response.ResponseOuterClass.Response; + +/** + * Class responsible for manipulations with Netty's {@link Channel}.
+ * Uses a {@link CallbackManager} to record callbacks of every request sent. + */ +@RequiredArgsConstructor +public class ChannelHandler { + private final Channel channel; + private final CallbackManager callbackManager; + + /** Write a protobuf message to the socket. */ + public CompletableFuture write(RedisRequest.Builder request, boolean flush) { + var commandId = callbackManager.registerRequest(); + request.setCallbackIdx(commandId.getKey()); + + if (flush) { + channel.writeAndFlush(request.build().toByteArray()); + } else { + channel.write(request.build().toByteArray()); + } + return commandId.getValue(); + } + + /** Write a protobuf message to the socket. */ + public CompletableFuture connect(ConnectionRequest request) { + channel.writeAndFlush(request.toByteArray()); + return callbackManager.getConnectionPromise(); + } + + /** Closes the UDS connection and frees corresponding resources. */ + public void close() { + channel.close(); + callbackManager.shutdownGracefully(); + } +} diff --git a/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java b/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java new file mode 100644 index 0000000000..05cbbc8cfe --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/handlers/ReadHandler.java @@ -0,0 +1,45 @@ +package babushka.connectors.handlers; + +import babushka.managers.CallbackManager; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import response.ResponseOuterClass; + +/** Handler for inbound traffic though UDS. Used by Netty. */ +@RequiredArgsConstructor +public class ReadHandler extends ChannelInboundHandlerAdapter { + + private final CallbackManager callbackManager; + + /** + * Handles responses from babushka core: + * + *
    + *
  1. Copy to a buffer; + *
  2. Parse protobuf packet; + *
  3. Find and resolve a corresponding future; + *
+ */ + @Override + public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) + throws Exception { + var buf = (ByteBuf) msg; + var bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + // TODO surround parsing with try-catch, set error to future if parsing failed. + var response = ResponseOuterClass.Response.parseFrom(bytes); + callbackManager.completeRequest(response); + buf.release(); + } + + /** Handles uncaught exceptions from {@link #channelRead(ChannelHandlerContext, Object)}. */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + System.out.printf("=== exceptionCaught %s %s %n", ctx, cause); + cause.printStackTrace(System.err); + super.exceptionCaught(ctx, cause); + } +} diff --git a/java/client/src/main/java/babushka/connectors/handlers/WriteHandler.java b/java/client/src/main/java/babushka/connectors/handlers/WriteHandler.java new file mode 100644 index 0000000000..7e36f49d9b --- /dev/null +++ b/java/client/src/main/java/babushka/connectors/handlers/WriteHandler.java @@ -0,0 +1,23 @@ +package babushka.connectors.handlers; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + +/** Handler for outbound traffic though UDS. Used by Netty. */ +public class WriteHandler extends ChannelOutboundHandlerAdapter { + /** + * Converts objects submitted to {@link Channel#write(Object)} and {@link + * Channel#writeAndFlush(Object)} to a {@link ByteBuf}. + */ + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + var bytes = (byte[]) msg; + + super.write(ctx, Unpooled.copiedBuffer(bytes), promise); + } +} diff --git a/java/client/src/main/java/babushka/ffi/resolvers/BabushkaCoreNativeDefinitions.java b/java/client/src/main/java/babushka/ffi/resolvers/BabushkaCoreNativeDefinitions.java new file mode 100644 index 0000000000..6d4ec45121 --- /dev/null +++ b/java/client/src/main/java/babushka/ffi/resolvers/BabushkaCoreNativeDefinitions.java @@ -0,0 +1,25 @@ +package babushka.ffi.resolvers; + +public class BabushkaCoreNativeDefinitions { + public static native String startSocketListenerExternal() throws Exception; + + public static native Object valueFromPointer(long pointer); + + static { + System.loadLibrary("javababushka"); + } + + /** + * Make an FFI call to obtain the socket path. + * + * @return A UDS path. + */ + public static String getSocket() { + try { + return startSocketListenerExternal(); + } catch (Exception | UnsatisfiedLinkError e) { + System.err.printf("Failed to create a UDS connection: %s%n%n", e); + throw new RuntimeException(e); + } + } +} diff --git a/java/client/src/main/java/babushka/managers/CallbackManager.java b/java/client/src/main/java/babushka/managers/CallbackManager.java new file mode 100644 index 0000000000..099db73c86 --- /dev/null +++ b/java/client/src/main/java/babushka/managers/CallbackManager.java @@ -0,0 +1,64 @@ +package babushka.managers; + +import babushka.connectors.handlers.ReadHandler; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Getter; +import org.apache.commons.lang3.tuple.Pair; +import response.ResponseOuterClass.Response; + +/** Holder for resources owned by {@link CommandManager} and used by {@link ReadHandler}. */ +public class CallbackManager { + /** Unique request ID (callback ID). Thread-safe. */ + private final AtomicInteger requestId = new AtomicInteger(0); + + /** + * Storage of Futures to handle responses. Map key is callback id, which starts from 1.
+ * Each future is a promise for every submitted by user request. + */ + private final Map> responses = new ConcurrentHashMap<>(); + + /** + * Storage for connection request similar to {@link #responses}. Unfortunately, connection + * requests can't be stored in the same storage, because callback ID = 0 is hardcoded for + * connection requests. + */ + @Getter private final CompletableFuture connectionPromise = new CompletableFuture<>(); + + /** + * Register a new request to be sent. Once response received, the given future completes with it. + * + * @return A pair of unique callback ID which should set into request and a client promise for + * response. + */ + public Pair> registerRequest() { + int callbackId = requestId.incrementAndGet(); + var future = new CompletableFuture(); + responses.put(callbackId, future); + return Pair.of(callbackId, future); + } + + /** + * Complete the corresponding client promise and free resources. + * + * @param response A response received + */ + public void completeRequest(Response response) { + int callbackId = response.getCallbackIdx(); + if (callbackId == 0) { + connectionPromise.completeAsync(() -> response); + } else { + responses.get(callbackId).completeAsync(() -> response); + responses.remove(callbackId); + } + } + + public void shutdownGracefully() { + connectionPromise.completeExceptionally(new InterruptedException()); + responses.forEach( + (callbackId, future) -> future.completeExceptionally(new InterruptedException())); + responses.clear(); + } +} diff --git a/java/client/src/main/java/babushka/managers/CommandManager.java b/java/client/src/main/java/babushka/managers/CommandManager.java new file mode 100644 index 0000000000..69a0713851 --- /dev/null +++ b/java/client/src/main/java/babushka/managers/CommandManager.java @@ -0,0 +1,38 @@ +package babushka.managers; + +import babushka.connectors.handlers.ChannelHandler; +import babushka.models.RequestBuilder; +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import lombok.RequiredArgsConstructor; +import redis_request.RedisRequestOuterClass.RequestType; + +@RequiredArgsConstructor +public class CommandManager { + + private final ChannelHandler channel; + + public CompletableFuture get(String key) { + return submitNewCommand(RequestType.GetString, List.of(key)); + } + + public CompletableFuture set(String key, String value) { + return submitNewCommand(RequestType.SetString, List.of(key, value)); + } + + /** + * @param command + * @param args + * @return + */ + @VisibleForTesting + private CompletableFuture submitNewCommand(RequestType command, List args) { + // TODO this explicitly uses ForkJoin thread pool. May be we should use another one. + return CompletableFuture.supplyAsync( + () -> channel.write(RequestBuilder.prepareRequest(command, args), true)) + // TODO: is there a better way to execute this? + .thenComposeAsync(f -> f) + .thenApplyAsync(RequestBuilder::resolveRedisResponseToString); + } +} diff --git a/java/client/src/main/java/babushka/managers/ConnectionManager.java b/java/client/src/main/java/babushka/managers/ConnectionManager.java new file mode 100644 index 0000000000..e0cf09ee45 --- /dev/null +++ b/java/client/src/main/java/babushka/managers/ConnectionManager.java @@ -0,0 +1,68 @@ +package babushka.managers; + +import babushka.connectors.SocketConnection; +import babushka.connectors.handlers.ChannelHandler; +import babushka.models.RequestBuilder; +import connection_request.ConnectionRequestOuterClass.ConnectionRequest; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import response.ResponseOuterClass.ConstantResponse; + +/** + * A UDS connection manager. This class is responsible for: + * + *
    + *
  • opening a connection (channel) though the UDS; + *
  • allocating the corresponding resources, e.g. thread pools (see also {@link + * SocketConnection}); + *
  • handling connection requests; + *
  • handling REDIS requests; + *
  • closing connection; + *
+ */ +@RequiredArgsConstructor +public class ConnectionManager { + + private final ChannelHandler channel; + @Getter private final CommandManager commandManager; + + private final AtomicBoolean isConnected = new AtomicBoolean(false); + + /** + * Connect to Redis using a ProtoBuf connection request + * + * @param host Server address + * @param port Server port + * @param useSsl true if communication with the server or cluster should use Transport Level + * Security + * @param clusterMode true if REDIS instance runs in the cluster mode + */ + public CompletableFuture connectToRedis( + String host, int port, boolean useSsl, boolean clusterMode) { + ConnectionRequest request = + RequestBuilder.createConnectionRequest(host, port, useSsl, clusterMode); + return channel + .connect(request) + .thenApplyAsync( + response -> + isConnected.compareAndSet( + false, response.getConstantResponse() == ConstantResponse.OK)); + } + + /** + * Close socket connection and drop all channels.
+ * TODO: provide feedback that the connection was properly closed + */ + public CompletableFuture closeConnection() { + isConnected.setPlain(false); + channel.close(); + return new CompletableFuture(); + } + + /** Check that connection established. This doesn't validate whether it is alive. */ + public boolean isConnected() { + return isConnected.get(); + } +} diff --git a/java/client/src/main/java/babushka/models/RedisResponse.java b/java/client/src/main/java/babushka/models/RedisResponse.java new file mode 100644 index 0000000000..def93ec94c --- /dev/null +++ b/java/client/src/main/java/babushka/models/RedisResponse.java @@ -0,0 +1,11 @@ +package babushka.models; + +/** + * TODO: Placeholder class to collect Future & Responses for the client users. TODO: Do we need a + * separate RedisClusterResponse? TODO: Create a RedisPipelineFuture for pipeline requests TODO: + * Create a RedisTransactionFuture for transaction requests + * + *

Redis client wrapper-class to handle Redis client responses. Handles async Future + * objects, or sync Response objects. + */ +public class RedisResponse {} diff --git a/java/client/src/main/java/babushka/models/RequestBuilder.java b/java/client/src/main/java/babushka/models/RequestBuilder.java new file mode 100644 index 0000000000..7f77c4198a --- /dev/null +++ b/java/client/src/main/java/babushka/models/RequestBuilder.java @@ -0,0 +1,74 @@ +package babushka.models; + +import babushka.ffi.resolvers.BabushkaCoreNativeDefinitions; +import babushka.managers.CallbackManager; +import babushka.managers.ConnectionManager; +import connection_request.ConnectionRequestOuterClass.ConnectionRequest; +import connection_request.ConnectionRequestOuterClass.NodeAddress; +import connection_request.ConnectionRequestOuterClass.ReadFrom; +import connection_request.ConnectionRequestOuterClass.TlsMode; +import java.util.List; +import redis_request.RedisRequestOuterClass.Command; +import redis_request.RedisRequestOuterClass.Command.ArgsArray; +import redis_request.RedisRequestOuterClass.RedisRequest; +import redis_request.RedisRequestOuterClass.RequestType; +import redis_request.RedisRequestOuterClass.Routes; +import redis_request.RedisRequestOuterClass.SimpleRoutes; +import response.ResponseOuterClass.Response; + +public class RequestBuilder { + + /** + * Build a protobuf connection request.
+ * Used by {@link ConnectionManager#connectToRedis}. + */ + // TODO support more parameters and/or configuration object + public static ConnectionRequest createConnectionRequest( + String host, int port, boolean useSsl, boolean clusterMode) { + return ConnectionRequest.newBuilder() + .addAddresses(NodeAddress.newBuilder().setHost(host).setPort(port).build()) + .setTlsMode(useSsl ? TlsMode.SecureTls : TlsMode.NoTls) + .setClusterModeEnabled(clusterMode) + .setReadFrom(ReadFrom.Primary) + .setDatabaseId(0) + .build(); + } + + /** + * Build a protobuf command/transaction request draft. + * + * @return An uncompleted request. {@link CallbackManager} is responsible to complete it by adding + * a callback id. + */ + public static RedisRequest.Builder prepareRequest(RequestType command, List args) { + var commandArgs = ArgsArray.newBuilder(); + for (var arg : args) { + commandArgs.addArgs(arg); + } + + return RedisRequest.newBuilder() + .setSingleCommand( // set command + Command.newBuilder() + .setRequestType(command) // set command name + .setArgsArray(commandArgs.build()) // set arguments + .build()) + .setRoute( // set route + Routes.newBuilder() + .setSimpleRoutes(SimpleRoutes.AllNodes) // set route type + .build()); + } + + /** + * Returns a String from the redis response if a resp2 response exists, or Ok. Otherwise, returns + * null + * + * @param response Redis Response + * @return String or null + */ + public static String resolveRedisResponseToString(Response response) { + if (response.getRespPointer() != 0) { + return BabushkaCoreNativeDefinitions.valueFromPointer(response.getRespPointer()).toString(); + } + return null; + } +} diff --git a/java/client/src/main/java/javababushka/BabushkaCoreNativeDefinitions.java b/java/client/src/main/java/javababushka/BabushkaCoreNativeDefinitions.java deleted file mode 100644 index 3f26ebef91..0000000000 --- a/java/client/src/main/java/javababushka/BabushkaCoreNativeDefinitions.java +++ /dev/null @@ -1,11 +0,0 @@ -package javababushka; - -public class BabushkaCoreNativeDefinitions { - public static native String startSocketListenerExternal() throws Exception; - - public static native Object valueFromPointer(long pointer); - - static { - System.loadLibrary("javababushka"); - } -} diff --git a/java/client/src/main/java/javababushka/Client.java b/java/client/src/main/java/javababushka/Client.java deleted file mode 100644 index cf14739ed9..0000000000 --- a/java/client/src/main/java/javababushka/Client.java +++ /dev/null @@ -1,404 +0,0 @@ -package javababushka; - -import static connection_request.ConnectionRequestOuterClass.AuthenticationInfo; -import static connection_request.ConnectionRequestOuterClass.ConnectionRequest; -import static connection_request.ConnectionRequestOuterClass.ConnectionRetryStrategy; -import static connection_request.ConnectionRequestOuterClass.NodeAddress; -import static connection_request.ConnectionRequestOuterClass.ReadFrom; -import static connection_request.ConnectionRequestOuterClass.TlsMode; -import static redis_request.RedisRequestOuterClass.Command; -import static redis_request.RedisRequestOuterClass.Command.ArgsArray; -import static redis_request.RedisRequestOuterClass.RedisRequest; -import static redis_request.RedisRequestOuterClass.RequestType; -import static redis_request.RedisRequestOuterClass.Routes; -import static redis_request.RedisRequestOuterClass.SimpleRoutes; -import static response.ResponseOuterClass.Response; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.WriteBufferWaterMark; -import io.netty.channel.epoll.EpollDomainSocketChannel; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.kqueue.KQueue; -import io.netty.channel.kqueue.KQueueDomainSocketChannel; -import io.netty.channel.kqueue.KQueueEventLoopGroup; -import io.netty.channel.unix.DomainSocketAddress; -import io.netty.channel.unix.UnixChannel; -import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.netty.util.internal.logging.InternalLoggerFactory; -import io.netty.util.internal.logging.Slf4JLoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.tuple.Pair; - -public class Client implements AutoCloseable { - - private static final int REQUEST_TIMEOUT_MILLISECONDS = 250; - private static final int CLIENT_CREATION_TIMEOUT_MILLISECONDS = 250; - private static final int HIGH_WRITE_WATERMARK = 4096; - private static final int LOW_WRITE_WATERMARK = 1024; - private static final long DEFAULT_TIMEOUT_MILLISECONDS = 1000; - public static boolean ALWAYS_FLUSH_ON_WRITE = true; - - // https://netty.io/3.6/api/org/jboss/netty/handler/queue/BufferedWriteHandler.html - // Flush every N bytes if !ALWAYS_FLUSH_ON_WRITE - public static int AUTO_FLUSH_THRESHOLD_BYTES = 512; // 1024; - private final AtomicInteger nonFlushedBytesCounter = new AtomicInteger(0); - - // Flush every N writes if !ALWAYS_FLUSH_ON_WRITE - public static int AUTO_FLUSH_THRESHOLD_WRITES = 10; - private final AtomicInteger nonFlushedWritesCounter = new AtomicInteger(0); - - // If !ALWAYS_FLUSH_ON_WRITE and a command has no response in N millis, flush (probably it wasn't - // send) - public static int AUTO_FLUSH_RESPONSE_TIMEOUT_MILLIS = 100; - // If !ALWAYS_FLUSH_ON_WRITE flush on timer (like a cron) - public static int AUTO_FLUSH_TIMER_MILLIS = 200; - - public static int PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS = 1000; - - // Futures to handle responses. Index is callback id, starting from 1 (0 index is for connection - // request always). - // Is it not a concurrent nor sync collection, but it is synced on adding. No removes. - private final List> responses = new ArrayList<>(); - // Unique offset for every client to avoid having multiple commands with the same id at a time. - // For debugging replace with: new Random().nextInt(1000) * 1000 - private final int callbackOffset = new Random().nextInt(); - - // TODO move to a [static] constructor. - private final String unixSocket = getSocket(); - - private static String getSocket() { - try { - return BabushkaCoreNativeDefinitions.startSocketListenerExternal(); - } catch (Exception | UnsatisfiedLinkError e) { - System.err.printf("Failed to get UDS from babushka and dedushka: %s%n%n", e); - throw new RuntimeException(e); - } - } - - private Channel channel = null; - private EventLoopGroup group = null; - - // We support MacOS and Linux only, because Babushka does not support Windows, because tokio does - // not support it. - // Probably we should use NIO (NioEventLoopGroup) for Windows. - private static final boolean isMacOs = isMacOs(); - - private static boolean isMacOs() { - try { - Class.forName("io.netty.channel.kqueue.KQueue"); - return KQueue.isAvailable(); - } catch (ClassNotFoundException e) { - return false; - } - } - - static { - // TODO fix: netty still doesn't use slf4j nor log4j - InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE); - } - - private void createChannel() { - // TODO maybe move to constructor or to static? - try { - channel = - new Bootstrap() - .option( - ChannelOption.WRITE_BUFFER_WATER_MARK, - new WriteBufferWaterMark(LOW_WRITE_WATERMARK, HIGH_WRITE_WATERMARK)) - .option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT) - .group(group = isMacOs ? new KQueueEventLoopGroup() : new EpollEventLoopGroup()) - .channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class) - .handler( - new ChannelInitializer() { - @Override - public void initChannel(UnixChannel ch) throws Exception { - ch.pipeline() - .addLast("logger", new LoggingHandler(LogLevel.DEBUG)) - // https://netty.io/4.1/api/io/netty/handler/codec/protobuf/ProtobufEncoder.html - .addLast("protobufDecoder", new ProtobufVarint32FrameDecoder()) - .addLast("protobufEncoder", new ProtobufVarint32LengthFieldPrepender()) - .addLast( - new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - // System.out.printf("=== channelRead %s %s %n", ctx, msg); - var buf = (ByteBuf) msg; - var bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - // TODO surround parsing with try-catch, set error to future if - // parsing failed. - var response = Response.parseFrom(bytes); - int callbackId = response.getCallbackIdx(); - if (callbackId != 0) { - // connection request has hardcoded callback id = 0 - // https://github.com/aws/babushka/issues/600 - callbackId -= callbackOffset; - } - // System.out.printf("== Received response with callback %d%n", - // response.getCallbackIdx()); - responses.get(callbackId).complete(response); - responses.set(callbackId, null); - super.channelRead(ctx, bytes); - } - - @Override - public void exceptionCaught( - ChannelHandlerContext ctx, Throwable cause) throws Exception { - System.out.printf("=== exceptionCaught %s %s %n", ctx, cause); - cause.printStackTrace(); - super.exceptionCaught(ctx, cause); - } - }) - .addLast( - new ChannelOutboundHandlerAdapter() { - @Override - public void write( - ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - // System.out.printf("=== write %s %s %s %n", ctx, msg, promise); - var bytes = (byte[]) msg; - - boolean needFlush = false; - if (!ALWAYS_FLUSH_ON_WRITE) { - synchronized (nonFlushedBytesCounter) { - if (nonFlushedBytesCounter.addAndGet(bytes.length) - >= AUTO_FLUSH_THRESHOLD_BYTES - || nonFlushedWritesCounter.incrementAndGet() - >= AUTO_FLUSH_THRESHOLD_WRITES) { - nonFlushedBytesCounter.set(0); - nonFlushedWritesCounter.set(0); - needFlush = true; - } - } - } - super.write(ctx, Unpooled.copiedBuffer(bytes), promise); - if (needFlush) { - // flush outside the sync block - flush(ctx); - // System.out.println("-- auto flush - buffer"); - } - } - }); - } - }) - .connect(new DomainSocketAddress(unixSocket)) - .sync() - .channel(); - - } catch (Exception e) { - System.err.printf( - "Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); - e.printStackTrace(System.err); - } - - if (!ALWAYS_FLUSH_ON_WRITE) { - new Timer(true) - .scheduleAtFixedRate( - new TimerTask() { - @Override - public void run() { - channel.flush(); - nonFlushedBytesCounter.set(0); - nonFlushedWritesCounter.set(0); - } - }, - 0, - AUTO_FLUSH_TIMER_MILLIS); - } - } - - public void closeConnection() { - - // flush and close the channel - channel.flush(); - channel.close(); - // TODO: check that the channel is closed - - // shutdown the event loop group gracefully by waiting for the remaining response - // and then shutting down the connection - try { - long waitStarted = System.nanoTime(); - long waitUntil = - waitStarted + PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS * 100_000; // in nanos - for (var responseFuture : responses) { - if (responseFuture == null || responseFuture.isDone()) { - continue; - } - try { - responseFuture.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS); - } catch (InterruptedException | ExecutionException ignored) { - // TODO: print warning - } catch (TimeoutException e) { - responseFuture.cancel(true); - // TODO: cancel the rest - break; - } - } - } finally { - var shuttingDown = group.shutdownGracefully(); - try { - shuttingDown.get(); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); - } - assert group.isShutdown() : "Redis connection did not shutdown gracefully"; - } - } - - public void set(String key, String value) { - waitForResult(asyncSet(key, value)); - // TODO parse response and rethrow an exception if there is an error - } - - public String get(String key) { - return waitForResult(asyncGet(key)); - // TODO support non-strings - } - - private synchronized Pair> getNextCallback() { - var future = new CompletableFuture(); - responses.add(future); - return Pair.of(responses.size() - 1, future); - } - - @Override - public void close() throws Exception { - closeConnection(); - } - - public Future asyncConnectToRedis( - String host, int port, boolean useSsl, boolean clusterMode) { - createChannel(); - - var request = - ConnectionRequest.newBuilder() - .addAddresses(NodeAddress.newBuilder().setHost(host).setPort(port).build()) - .setTlsMode( - useSsl // TODO: secure or insecure TLS? - ? TlsMode.SecureTls - : TlsMode.NoTls) - .setClusterModeEnabled(clusterMode) - .setRequestTimeout(REQUEST_TIMEOUT_MILLISECONDS) - .setReadFrom(ReadFrom.Primary) - .setConnectionRetryStrategy( - ConnectionRetryStrategy.newBuilder() - .setNumberOfRetries(1) - .setFactor(1) - .setExponentBase(1) - .build()) - .setAuthenticationInfo( - AuthenticationInfo.newBuilder().setPassword("").setUsername("default").build()) - .setDatabaseId(0) - .build(); - - var future = new CompletableFuture(); - responses.add(future); - channel.writeAndFlush(request.toByteArray()); - return future; - } - - private CompletableFuture submitNewCommand(RequestType command, List args) { - var commandId = getNextCallback(); - // System.out.printf("== %s(%s), callback %d%n", command, String.join(", ", args), commandId); - - return CompletableFuture.supplyAsync( - () -> { - var commandArgs = ArgsArray.newBuilder(); - for (var arg : args) { - commandArgs.addArgs(arg); - } - - RedisRequest request = - RedisRequest.newBuilder() - .setCallbackIdx(commandId.getKey() + callbackOffset) - .setSingleCommand( - Command.newBuilder() - .setRequestType(command) - .setArgsArray(commandArgs.build()) - .build()) - .setRoute(Routes.newBuilder().setSimpleRoutes(SimpleRoutes.AllNodes).build()) - .build(); - if (ALWAYS_FLUSH_ON_WRITE) { - channel.writeAndFlush(request.toByteArray()); - return commandId.getRight(); - } - channel.write(request.toByteArray()); - return autoFlushFutureWrapper(commandId.getRight()); - }) - .thenCompose(f -> f); - } - - private CompletableFuture autoFlushFutureWrapper(Future future) { - return CompletableFuture.supplyAsync( - () -> { - try { - return future.get(AUTO_FLUSH_RESPONSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } catch (TimeoutException e) { - // System.out.println("-- auto flush - timeout"); - channel.flush(); - nonFlushedBytesCounter.set(0); - nonFlushedWritesCounter.set(0); - } - try { - return future.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }); - } - - public Future asyncSet(String key, String value) { - // System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId); - return submitNewCommand(RequestType.SetString, List.of(key, value)); - } - - public Future asyncGet(String key) { - // System.out.printf("== get(%s), callback %d%n", key, callbackId); - return submitNewCommand(RequestType.GetString, List.of(key)) - .thenApply( - response -> - response.getRespPointer() != 0 - ? BabushkaCoreNativeDefinitions.valueFromPointer(response.getRespPointer()) - .toString() - : null); - } - - public T waitForResult(Future future) { - return waitForResult(future, DEFAULT_TIMEOUT_MILLISECONDS); - } - - public T waitForResult(Future future, long timeout) { - try { - return future.get(timeout, TimeUnit.MILLISECONDS); - } catch (Exception ignored) { - return null; - } - } -} diff --git a/java/client/src/test/java/babushka/api/CommandsTest.java b/java/client/src/test/java/babushka/api/CommandsTest.java new file mode 100644 index 0000000000..24f93defc8 --- /dev/null +++ b/java/client/src/test/java/babushka/api/CommandsTest.java @@ -0,0 +1,94 @@ +package babushka.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import babushka.managers.CommandManager; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class CommandsTest { + + Commands service; + + CommandManager commandsManager; + + private static String OK_RESPONSE = response.ResponseOuterClass.ConstantResponse.OK.toString(); + + @BeforeEach + public void setUp() { + commandsManager = mock(CommandManager.class); + service = new Commands(commandsManager); + } + + @Test + public void get_success() throws ExecutionException, InterruptedException { + // setup + // TODO: randomize keys + String key = "testKey"; + String value = "testValue"; + CompletableFuture testResponse = mock(CompletableFuture.class); + when(testResponse.get()).thenReturn(value); + when(commandsManager.get(eq(key))).thenReturn(testResponse); + + // exercise + Future response = service.get(key); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + + // teardown + } + + // TODO: test_get_InterruptedException and ExecutionException + + @Test + public void set_success() throws ExecutionException, InterruptedException { + // setup + // TODO: randomize key and value + String key = "testKey"; + String value = "testValue"; + CompletableFuture testResponse = mock(CompletableFuture.class); + when(testResponse.get()).thenReturn(OK_RESPONSE); + when(commandsManager.set(eq(key), eq(value))).thenReturn(testResponse); + + // exercise + Future response = service.set(key, value); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(OK_RESPONSE, payload); + + // teardown + } + + @Test + public void test_ping_success() { + // setup + + // exercise + + // verify + + // teardown + } + + @Test + public void test_info_success() { + // setup + + // exercise + + // verify + + // teardown + } +} diff --git a/java/client/src/test/java/babushka/api/ConnectionTest.java b/java/client/src/test/java/babushka/api/ConnectionTest.java new file mode 100644 index 0000000000..e4f0bb5052 --- /dev/null +++ b/java/client/src/test/java/babushka/api/ConnectionTest.java @@ -0,0 +1,53 @@ +package babushka.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +import babushka.managers.ConnectionManager; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class ConnectionTest { + + Connection service; + + ConnectionManager connectionManager; + + private static String HOST = "host"; + private static int PORT = 9999; + + @BeforeEach + public void setUp() { + connectionManager = mock(ConnectionManager.class); + service = new Connection(connectionManager); + } + + @Test + public void asyncConnectToRedis_success() { + // setup + boolean useSsl = false; + boolean clusterMode = false; + CompletableFuture testResponse = mock(CompletableFuture.class); + when(connectionManager.connectToRedis(anyString(), anyInt(), anyBoolean(), anyBoolean())) + .thenReturn(testResponse); + + // exercise + CompletableFuture connectionResponse = + service.connectToRedis(HOST, PORT, useSsl, clusterMode); + + // verify + Mockito.verify(connectionManager, times(1)) + .connectToRedis(eq(HOST), eq(PORT), eq(useSsl), eq(clusterMode)); + assertEquals(testResponse, connectionResponse); + + // teardown + } +} diff --git a/java/client/src/test/java/babushka/connectors/SocketConnectionTest.java b/java/client/src/test/java/babushka/connectors/SocketConnectionTest.java new file mode 100644 index 0000000000..43a6d34593 --- /dev/null +++ b/java/client/src/test/java/babushka/connectors/SocketConnectionTest.java @@ -0,0 +1,3 @@ +package babushka.connectors; + +public class SocketConnectionTest {} diff --git a/java/settings.gradle b/java/settings.gradle index c392c65247..e96b2925b4 100644 --- a/java/settings.gradle +++ b/java/settings.gradle @@ -1,4 +1,4 @@ -rootProject.name = 'javababushka' +rootProject.name = 'babushka' include 'client' include 'integTest' diff --git a/java/src/lib.rs b/java/src/lib.rs index 468d8797e7..8d11cf90e6 100644 --- a/java/src/lib.rs +++ b/java/src/lib.rs @@ -1,14 +1,13 @@ use babushka::start_socket_listener; -use jni::objects::{JClass, JObject, JThrowable}; +use jni::objects::{JClass, JObject, JThrowable, JObjectArray}; use jni::JNIEnv; use jni::sys::jlong; use std::sync::mpsc; use log::error; -use logger_core::Level; use redis::Value; -fn redis_value_to_java(mut env: JNIEnv, val: Value) -> JObject { +fn redis_value_to_java<'local>(env: &mut JNIEnv<'local>, val: Value) -> JObject<'local> { match val { Value::Nil => JObject::null(), Value::Status(str) => JObject::from(env.new_string(str).unwrap()), @@ -22,40 +21,40 @@ fn redis_value_to_java(mut env: JNIEnv, val: Value) -> JObject { JObject::null() }, }, - Value::Bulk(_bulk) => { - let _ = env.throw("Not implemented"); - JObject::null() - /* - let elements: &PyList = PyList::new( - py, - bulk.into_iter() - .map(|item| redis_value_to_py(py, item).unwrap()), - ); - Ok(elements.into_py(py)) - */ + Value::Bulk(bulk) => { + // TODO: Consider caching the method ID here in a static variable (might need RwLock to mutate) + let items: JObjectArray = env + .new_object_array(bulk.len() as i32, "java/lang/Object", JObject::null()) + .unwrap(); + + for (i, item) in bulk.into_iter().enumerate() { + let java_value = redis_value_to_java(env, item); + env.set_object_array_element(&items, i as i32, java_value) + .unwrap(); + } + + items.into() } } } #[no_mangle] -pub extern "system" fn Java_javababushka_BabushkaCoreNativeDefinitions_valueFromPointer<'local>( +pub extern "system" fn Java_babushka_ffi_resolvers_BabushkaCoreNativeDefinitions_valueFromPointer<'local>( mut env: JNIEnv<'local>, _class: JClass<'local>, pointer: jlong ) -> JObject<'local> { let value = unsafe { Box::from_raw(pointer as *mut Value) }; - redis_value_to_java(env, *value) + redis_value_to_java(&mut env, *value) } #[no_mangle] -pub extern "system" fn Java_javababushka_BabushkaCoreNativeDefinitions_startSocketListenerExternal<'local>( - mut env: JNIEnv<'local>, +pub extern "system" fn Java_babushka_ffi_resolvers_BabushkaCoreNativeDefinitions_startSocketListenerExternal<'local>( + env: JNIEnv<'local>, _class: JClass<'local> ) -> JObject<'local> { let (tx, rx) = mpsc::channel::>(); - //logger_core::init(Some(Level::Trace), None); - start_socket_listener(move |socket_path : Result| { // Signals that thread has started let _ = tx.send(socket_path);