diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java index c071ac01ae..d27acecc5c 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java @@ -1,5 +1,7 @@ package javababushka.benchmarks; +import static javababushka.benchmarks.utils.Benchmarking.testClientSetGet; + import java.io.FileWriter; import java.io.IOException; import java.util.Arrays; @@ -7,6 +9,9 @@ import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; +import javababushka.benchmarks.clients.LettuceClient; +import javababushka.benchmarks.clients.LettuceAsyncClient; +import javababushka.benchmarks.clients.LettuceClient; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -43,12 +48,10 @@ public static void main(String[] args) { System.out.println("Run JEDI pseudo-async client"); break; case LETTUCE: - // run testClientSetGet on LETTUCE sync client - System.out.println("Run LETTUCE sync client"); + testClientSetGet(LettuceClient::new, runConfiguration, false); break; case LETTUCE_ASYNC: - // run testClientSetGet on LETTUCE async client - System.out.println("Run LETTUCE async client"); + testClientSetGet(LettuceAsyncClient::new, runConfiguration, true); break; case BABUSHKA: System.out.println("Babushka not yet configured"); diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java new file mode 100644 index 0000000000..6413829eab --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java @@ -0,0 +1,19 @@ +package javababushka.benchmarks.clients; + +import java.util.concurrent.Future; + +/** + * A Redis client with async capabilities + */ +public interface AsyncClient extends Client { + + long DEFAULT_TIMEOUT = 1000; + + Future asyncSet(String key, String value); + + Future asyncGet(String key); + + T waitForResult(Future future); + + T waitForResult(Future future, long timeout); +} \ No newline at end of file diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java new file mode 100644 index 0000000000..a3376ccbec --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java @@ -0,0 +1,16 @@ +package javababushka.benchmarks.clients; + +import javababushka.benchmarks.utils.ConnectionSettings; + +/** + * A Redis client interface + */ +public interface Client { + void connectToRedis(); + + void connectToRedis(ConnectionSettings connectionSettings); + + default void closeConnection() {} + + String getName(); +} \ No newline at end of file diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/LettuceAsyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/LettuceAsyncClient.java new file mode 100644 index 0000000000..40cfd50543 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/LettuceAsyncClient.java @@ -0,0 +1,73 @@ +package javababushka.benchmarks.clients; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import javababushka.benchmarks.utils.ConnectionSettings; + +/** + * A Lettuce client with async capabilities + * see: https://lettuce.io/ + */ +public class LettuceAsyncClient implements AsyncClient { + + RedisClient client; + RedisAsyncCommands asyncCommands; + StatefulRedisConnection connection; + + @Override + public void connectToRedis() { + connectToRedis(new ConnectionSettings("localhost", 6379, false)); + } + + @Override + public void connectToRedis(ConnectionSettings connectionSettings) { + client = + RedisClient.create( + String.format( + "%s://%s:%d", + connectionSettings.useSsl ? "rediss" : "redis", + connectionSettings.host, + connectionSettings.port)); + connection = client.connect(); + asyncCommands = connection.async(); + } + + @Override + public RedisFuture asyncSet(String key, String value) { + return asyncCommands.set(key, value); + } + + @Override + public RedisFuture asyncGet(String key) { + return asyncCommands.get(key); + } + + @Override + public Object waitForResult(Future future) { + return waitForResult(future, DEFAULT_TIMEOUT); + } + + @Override + public Object waitForResult(Future future, long timeoutMS) { + try { + return future.get(timeoutMS, TimeUnit.MILLISECONDS); + } catch (Exception ignored) { + return null; + } + } + + @Override + public void closeConnection() { + connection.close(); + client.shutdown(); + } + + @Override + public String getName() { + return "Lettuce Async"; + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/LettuceClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/LettuceClient.java new file mode 100644 index 0000000000..0ce80fac0f --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/LettuceClient.java @@ -0,0 +1,56 @@ +package javababushka.benchmarks.clients; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisStringCommands; +import javababushka.benchmarks.utils.ConnectionSettings; + +/** + * A Lettuce client with sync capabilities + * see: https://lettuce.io/ + */ +public class LettuceClient implements SyncClient { + + RedisClient client; + RedisStringCommands syncCommands; + StatefulRedisConnection connection; + + @Override + public void connectToRedis() { + connectToRedis(new ConnectionSettings("localhost", 6379, false)); + } + + @Override + public void connectToRedis(ConnectionSettings connectionSettings) { + client = + RedisClient.create( + String.format( + "%s://%s:%d", + connectionSettings.useSsl ? "rediss" : "redis", + connectionSettings.host, + connectionSettings.port)); + connection = client.connect(); + syncCommands = connection.sync(); + } + + @Override + public void set(String key, String value) { + syncCommands.set(key, value); + } + + @Override + public String get(String key) { + return (String) syncCommands.get(key); + } + + @Override + public void closeConnection() { + connection.close(); + client.shutdown(); + } + + @Override + public String getName() { + return "Lettuce"; + } +} \ No newline at end of file diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java new file mode 100644 index 0000000000..fa9ebd397e --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java @@ -0,0 +1,10 @@ +package javababushka.benchmarks.clients; + +/** + * A Redis client with sync capabilities + */ +public interface SyncClient extends Client { + void set(String key, String value); + + String get(String key); +} \ No newline at end of file diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java new file mode 100644 index 0000000000..b73394a3eb --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java @@ -0,0 +1,260 @@ +package javababushka.benchmarks.utils; + +import java.io.FileWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import javababushka.benchmarks.clients.AsyncClient; +import javababushka.benchmarks.BenchmarkingApp; +import javababushka.benchmarks.clients.Client; +import javababushka.benchmarks.clients.SyncClient; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.tuple.Pair; + +/** + * Class to calculate latency on client-actions + */ +public class Benchmarking { + static final double PROB_GET = 0.8; + static final double PROB_GET_EXISTING_KEY = 0.8; + static final int SIZE_GET_KEYSPACE = 3750000; + static final int SIZE_SET_KEYSPACE = 3000000; + static final int ASYNC_OPERATION_TIMEOUT_SEC = 1; + + private static ChosenAction randomAction() { + if (Math.random() > PROB_GET) { + return ChosenAction.SET; + } + if (Math.random() > PROB_GET_EXISTING_KEY) { + return ChosenAction.GET_NON_EXISTING; + } + return ChosenAction.GET_EXISTING; + } + + public static String generateKeyGet() { + int range = SIZE_GET_KEYSPACE - SIZE_SET_KEYSPACE; + return Math.floor(Math.random() * range + SIZE_SET_KEYSPACE + 1) + ""; + } + + public static String generateKeySet() { + return (Math.floor(Math.random() * SIZE_SET_KEYSPACE) + 1) + ""; + } + + public interface Operation { + void go() throws Exception; + } + + private static Pair getLatency(Map actions) { + var action = randomAction(); + long before = System.nanoTime(); + try { + actions.get(action).go(); + } catch (Exception e) { + // timed out - exception from Future::get + } + long after = System.nanoTime(); + return Pair.of(action, after - before); + } + + // Assumption: latencies is sorted in ascending order + private static Long percentile(ArrayList latencies, int percentile) { + int N = latencies.size(); + double n = (N - 1) * percentile / 100. + 1; + if (n == 1d) return latencies.get(0); + else if (n == N) return latencies.get(N - 1); + int k = (int) n; + double d = n - k; + return Math.round(latencies.get(k - 1) + d * (latencies.get(k) - latencies.get(k - 1))); + } + + private static double stdDeviation(ArrayList latencies, Double avgLatency) { + double stdDeviation = + latencies.stream() + .mapToDouble(Long::doubleValue) + .reduce(0.0, (stdDev, latency) -> stdDev + Math.pow(latency - avgLatency, 2)); + return Math.sqrt(stdDeviation / latencies.size()); + } + + // This has the side-effect of sorting each latencies ArrayList + public static Map calculateResults( + Map> actionLatencies) { + Map results = new HashMap(); + + for (Map.Entry> entry : actionLatencies.entrySet()) { + ChosenAction action = entry.getKey(); + ArrayList latencies = entry.getValue(); + + Double avgLatency = + latencies.stream().collect(Collectors.summingLong(Long::longValue)) + / Double.valueOf(latencies.size()); + + Collections.sort(latencies); + results.put( + action, + new LatencyResults( + avgLatency, + percentile(latencies, 50), + percentile(latencies, 90), + percentile(latencies, 99), + stdDeviation(latencies, avgLatency))); + } + + return results; + } + + public static void printResults( + Map calculatedResults, Optional resultsFile) { + if (resultsFile.isPresent()) { + printResults(calculatedResults, resultsFile.get()); + } else { + printResults(calculatedResults); + } + } + + public static void printResults( + Map resultsMap, FileWriter resultsFile) { + for (Map.Entry entry : resultsMap.entrySet()) { + ChosenAction action = entry.getKey(); + LatencyResults results = entry.getValue(); + + try { + resultsFile.write("Avg. time in ms per " + action + ": " + results.avgLatency / 1000000.0); + resultsFile.write(action + " p50 latency in ms: " + results.p50Latency / 1000000.0); + resultsFile.write(action + " p90 latency in ms: " + results.p90Latency / 1000000.0); + resultsFile.write(action + " p99 latency in ms: " + results.p99Latency / 1000000.0); + resultsFile.write(action + " std dev in ms: " + results.stdDeviation / 1000000.0); + } catch (Exception ignored) { + } + } + } + + public static void printResults(Map resultsMap) { + for (Map.Entry entry : resultsMap.entrySet()) { + ChosenAction action = entry.getKey(); + LatencyResults results = entry.getValue(); + + System.out.println("Avg. time in ms per " + action + ": " + results.avgLatency / 1000000.0); + System.out.println(action + " p50 latency in ms: " + results.p50Latency / 1000000.0); + System.out.println(action + " p90 latency in ms: " + results.p90Latency / 1000000.0); + System.out.println(action + " p99 latency in ms: " + results.p99Latency / 1000000.0); + System.out.println(action + " std dev in ms: " + results.stdDeviation / 1000000.0); + } + } + + public static void testClientSetGet( + Supplier clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) { + for (int concurrentNum : config.concurrentTasks) { + int iterations = Math.min(Math.max(100000, concurrentNum * 10000), 10000000); + for (int clientCount : config.clientCount) { + System.out.printf( + "%n =====> %s <===== %d clients %d concurrent %n%n", + clientCreator.get().getName(), clientCount, concurrentNum); + AtomicInteger iterationCounter = new AtomicInteger(0); + Map> actionResults = + Map.of( + ChosenAction.GET_EXISTING, new ArrayList<>(), + ChosenAction.GET_NON_EXISTING, new ArrayList<>(), + ChosenAction.SET, new ArrayList<>()); + List tasks = new ArrayList<>(); + + // create clients + List clients = new LinkedList<>(); + for (int cc = 0; cc < clientCount; cc++) { + Client newClient = clientCreator.get(); + newClient.connectToRedis(new ConnectionSettings(config.host, config.port, config.tls)); + clients.add(newClient); + } + + for (int taskNum = 0; taskNum < concurrentNum; taskNum++) { + final int taskNumDebugging = taskNum; + tasks.add( + () -> { + int iterationIncrement = iterationCounter.getAndIncrement(); + int clientIndex = iterationIncrement % clients.size(); + + if (config.debugLogging) { + System.out.printf( + "%n concurrent = %d/%d, client# = %d/%d%n", + taskNumDebugging, concurrentNum, clientIndex + 1, clientCount); + } + while (iterationIncrement < iterations) { + if (config.debugLogging) { + System.out.printf( + "> iteration = %d/%d, client# = %d/%d%n", + iterationIncrement + 1, iterations, clientIndex + 1, clientCount); + } + // operate and calculate tik-tok + Pair result = + measurePerformance(clients.get(clientIndex), config.dataSize, async); + actionResults.get(result.getLeft()).add(result.getRight()); + + iterationIncrement = iterationCounter.getAndIncrement(); + } + }); + } + if (config.debugLogging) { + System.out.printf("%s client Benchmarking: %n", clientCreator.get().getName()); + System.out.printf( + "===> concurrentNum = %d, clientNum = %d, tasks = %d%n", + concurrentNum, clientCount, tasks.size()); + } + tasks.stream() + .map(CompletableFuture::runAsync) + .forEach( + f -> { + try { + f.get(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + printResults(calculateResults(actionResults), config.resultsFile); + } + } + + System.out.println(); + } + + public static Pair measurePerformance( + Client client, int dataSize, boolean async) { + + String value = RandomStringUtils.randomAlphanumeric(dataSize); + Map actions = new HashMap<>(); + actions.put( + ChosenAction.GET_EXISTING, + async + ? () -> + ((AsyncClient) client) + .asyncGet(generateKeySet()) + .get(ASYNC_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS) + : () -> ((SyncClient) client).get(generateKeySet())); + actions.put( + ChosenAction.GET_NON_EXISTING, + async + ? () -> + ((AsyncClient) client) + .asyncGet(generateKeyGet()) + .get(ASYNC_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS) + : () -> ((SyncClient) client).get(generateKeyGet())); + actions.put( + ChosenAction.SET, + async + ? () -> + ((AsyncClient) client) + .asyncSet(generateKeySet(), value) + .get(ASYNC_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS) + : () -> ((SyncClient) client).set(generateKeySet(), value)); + + return getLatency(actions); + } +} \ No newline at end of file diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ChosenAction.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ChosenAction.java new file mode 100644 index 0000000000..3cc685139e --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ChosenAction.java @@ -0,0 +1,10 @@ +package javababushka.benchmarks.utils; + +/** + * enum for actions + */ +public enum ChosenAction { + GET_NON_EXISTING, + GET_EXISTING, + SET +} \ No newline at end of file diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java new file mode 100644 index 0000000000..f1aee3d06a --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java @@ -0,0 +1,16 @@ +package javababushka.benchmarks.utils; + +/** + * Redis-client settings + */ +public class ConnectionSettings { + public String host; + public int port; + public boolean useSsl; + + public ConnectionSettings(String host, int port, boolean useSsl) { + this.host = host; + this.port = port; + this.useSsl = useSsl; + } +} \ No newline at end of file diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java new file mode 100644 index 0000000000..054623b8f0 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java @@ -0,0 +1,21 @@ +package javababushka.benchmarks.utils; + +/** + * Raw timing results in nanoseconds + */ +public class LatencyResults { + public final double avgLatency; + public final long p50Latency; + public final long p90Latency; + public final long p99Latency; + public final double stdDeviation; + + public LatencyResults( + double avgLatency, long p50Latency, long p90Latency, long p99Latency, double stdDeviation) { + this.avgLatency = avgLatency; + this.p50Latency = p50Latency; + this.p90Latency = p90Latency; + this.p99Latency = p99Latency; + this.stdDeviation = stdDeviation; + } +} \ No newline at end of file