diff --git a/java/jabushka/benchmarks/src/main/java/javabushka/client/BenchmarkingApp.java b/java/jabushka/benchmarks/src/main/java/javabushka/client/BenchmarkingApp.java index 6c2a797be6..21328f3391 100644 --- a/java/jabushka/benchmarks/src/main/java/javabushka/client/BenchmarkingApp.java +++ b/java/jabushka/benchmarks/src/main/java/javabushka/client/BenchmarkingApp.java @@ -4,9 +4,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -15,11 +19,15 @@ import javabushka.client.lettuce.LettuceAsyncClient; import javabushka.client.lettuce.LettuceClient; import javabushka.client.utils.Benchmarking; +import javabushka.client.utils.ChosenAction; +import javabushka.client.utils.ConnectionSettings; +import javabushka.client.utils.LatencyResults; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.tuple.Pair; /** Benchmarking app for reporting performance of various redis-rs Java-clients */ public class BenchmarkingApp { @@ -40,24 +48,28 @@ public static void main(String[] args) { System.err.println("Parsing failed. Reason: " + exp.getMessage()); } - for (ClientName client : runConfiguration.clients) { - switch (client) { - case JEDIS: - testClientSetGet(JedisClient::new, runConfiguration, false); - break; - case JEDIS_ASYNC: - testClientSetGet(JedisPseudoAsyncClient::new, runConfiguration, true); - break; - case LETTUCE: - testClientSetGet(LettuceClient::new, runConfiguration, false); - break; - case LETTUCE_ASYNC: - testClientSetGet(LettuceAsyncClient::new, runConfiguration, true); - break; - case BABUSHKA: - System.out.println("Babushka not yet configured"); - break; + try { + for (ClientName client : runConfiguration.clients) { + switch (client) { + case JEDIS: + testIterateTasksAndClientSetGet(JedisClient::new, runConfiguration, false); + break; + case JEDIS_ASYNC: + testIterateTasksAndClientSetGet(JedisPseudoAsyncClient::new, runConfiguration, true); + break; + case LETTUCE: + testIterateTasksAndClientSetGet(LettuceClient::new, runConfiguration, false); + break; + case LETTUCE_ASYNC: + testIterateTasksAndClientSetGet(LettuceAsyncClient::new, runConfiguration, true); + break; + case BABUSHKA: + System.out.println("Babushka not yet configured"); + break; + } } + } catch (IOException ioException) { + System.out.println("Error writing results to file"); } if (runConfiguration.resultsFile.isPresent()) { @@ -169,7 +181,7 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce // check if it's the correct format if (!clientCount.matches("\\d+(\\s+\\d+)?")) { - throw new ParseException("Invalid concurrentTasks"); + throw new ParseException("Invalid clientCount"); } // split the string into a list of integers runConfiguration.clientCount = @@ -183,49 +195,103 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce return runConfiguration; } - private static void testClientSetGet( - Supplier clientCreator, RunConfiguration runConfiguration, boolean async) { - System.out.printf("%n =====> %s <===== %n%n", clientCreator.get().getName()); - for (int concurrentNum : runConfiguration.concurrentTasks) { - for (int clientNum : runConfiguration.clientCount) { - - List tasks = new ArrayList<>(); - - for (int i = 0; i < concurrentNum; ) { - for (int j = 0; j < clientNum; j++) { - i++; - int finalI = i; - int finalJ = j; - tasks.add( - () -> { - System.out.printf( - "%n concurrent = %d/%d, client# = %d/%d%n", - finalI, concurrentNum, finalJ + 1, clientNum); - Benchmarking.printResults( - Benchmarking.measurePerformance( - clientCreator.get(), runConfiguration, async)); - }); - } - } - System.out.printf( - "===> concurrentNum = %d, clientNum = %d, tasks = %d%n", - concurrentNum, clientNum, tasks.size()); - tasks.stream() - .map(CompletableFuture::runAsync) - .forEach( - f -> { - try { - f.get(); - } catch (Exception e) { - e.printStackTrace(); - } - }); + // call testConcurrentClientSetGet for each concurrentTask/clientCount pairing + private static void testIterateTasksAndClientSetGet(Supplier clientSupplier, + RunConfiguration runConfiguration, + boolean async) throws IOException { + System.out.printf("%n =====> %s <===== %n%n", clientSupplier.get().getName()); + for (int clientCount : runConfiguration.clientCount) { + for (int concurrentTasks : runConfiguration.concurrentTasks) { + Client client = clientSupplier.get(); + testConcurrentClientSetGet( + clientSupplier, + runConfiguration, + concurrentTasks, + clientCount, + async); } } - System.out.println(); } + // call one test scenario: with a number of concurrent threads against clientCount number + // of clients + private static void testConcurrentClientSetGet(Supplier clientSupplier, + RunConfiguration runConfiguration, + int concurrentTasks, + int clientCount, + boolean async) throws IOException { + // fetch a reasonable number of iterations based on the number of concurrent tasks + int iterations = Math.min(Math.max(100000, concurrentTasks * 10000), 10000000); + AtomicInteger iterationCounter = new AtomicInteger(0); + + // create clients + List clients = new LinkedList<>(); + for(int i = 0; i < clientCount; i++) { + Client newClient = clientSupplier.get(); + newClient.connectToRedis(new ConnectionSettings( + runConfiguration.host, + runConfiguration.port, + runConfiguration.tls)); + clients.add(newClient); + } + + // create runnable tasks + List tasks = new ArrayList<>(); + Map> actionResults = new HashMap<>(); + actionResults.put(ChosenAction.GET_EXISTING, new ArrayList<>()); + actionResults.put(ChosenAction.GET_NON_EXISTING, new ArrayList<>()); + actionResults.put(ChosenAction.SET, new ArrayList<>()); + + // add one runnable task for each concurrentTask + // task will run a random action against a client, uniformly distributed amongst all clients + for(int concurrentTaskIndex = 0; concurrentTaskIndex < concurrentTasks; concurrentTaskIndex++) { + System.out.printf("concurrent task: %d/%d%n", concurrentTaskIndex+1, concurrentTasks); + tasks.add( + () -> { + int iterationIncrement = iterationCounter.incrementAndGet(); + while (iterationIncrement < iterations) { + int clientIndex = iterationIncrement % clients.size(); + System.out.printf( + "> iteration = %d/%d, client# = %d/%d%n", + iterationIncrement+1, + iterations, + clientIndex+1, + clientCount); + + // operate and calculate tik-tok + Pair result = Benchmarking.measurePerformance( + clients.get(clientIndex), + runConfiguration.dataSize, + async + ); + + // save tik-tok to actionResults + actionResults.get(result.getLeft()).add(result.getRight()); + iterationIncrement = iterationCounter.incrementAndGet(); + } + }); + } + + // run all tasks asynchronously + tasks.stream() + .map(CompletableFuture::runAsync) + .forEach( + f -> { + try { + f.get(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + // use results file to stdout/print + Benchmarking.printResults(Benchmarking.calculateResults(actionResults), runConfiguration.resultsFile); + + // close connections + clients.forEach(c -> c.closeConnection()); + } + public enum ClientName { JEDIS("Jedis"), JEDIS_ASYNC("Jedis async"), @@ -277,7 +343,7 @@ public RunConfiguration() { }; host = "localhost"; port = 6379; - clientCount = new int[] {1}; + clientCount = new int[] {1, 2}; tls = false; } } diff --git a/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/Benchmarking.java b/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/Benchmarking.java index c3acbbf468..eccfa0d355 100644 --- a/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/Benchmarking.java +++ b/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/Benchmarking.java @@ -13,6 +13,7 @@ import javabushka.client.Client; import javabushka.client.SyncClient; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.tuple.Pair; public class Benchmarking { static final double PROB_GET = 0.8; @@ -60,6 +61,20 @@ public static Map> getLatencies( return latencies; } + public static Pair getLatency( + Map actions) { + + ChosenAction action = randomAction(); + Operation op = actions.get(action); + + long before = System.nanoTime(); + op.go(); + long after = System.nanoTime(); + + return Pair.of(action, after - before); + + } + private static void addLatency(Operation op, ArrayList latencies) { long before = System.nanoTime(); op.go(); @@ -150,21 +165,21 @@ public static void printResults(Map resultsMap) { } } - public static Map measurePerformance( - Client client, BenchmarkingApp.RunConfiguration config, boolean async) { - client.connectToRedis(new ConnectionSettings(config.host, config.port, config.tls)); + public static Pair measurePerformance( + Client client, int setDataSize, boolean async) { + Map> results = new HashMap<>(); - int iterations = 100; - String value = RandomStringUtils.randomAlphanumeric(config.dataSize); - if (config.resultsFile.isPresent()) { - try { - config.resultsFile.get().write(client.getName() + " client Benchmarking: "); - } catch (Exception ignored) { - } - } else { - System.out.printf("%s client Benchmarking: %n", client.getName()); - } + String setValue = RandomStringUtils.randomAlphanumeric(setDataSize); + +// if (config.resultsFile.isPresent()) { +// try { +// config.resultsFile.get().write(client.getName() + " client Benchmarking: "); +// } catch (Exception ignored) { +// } +// } else { +// System.out.printf("%s client Benchmarking: %n", client.getName()); +// } Map actions = new HashMap<>(); actions.put( @@ -180,11 +195,11 @@ public static Map measurePerformance( actions.put( ChosenAction.SET, async - ? () -> ((AsyncClient) client).asyncSet(Benchmarking.generateKeySet(), value) - : () -> ((SyncClient) client).set(Benchmarking.generateKeySet(), value)); + ? () -> ((AsyncClient) client).asyncSet(Benchmarking.generateKeySet(), setValue) + : () -> ((SyncClient) client).set(Benchmarking.generateKeySet(), setValue)); - var results = Benchmarking.calculateResults(Benchmarking.getLatencies(iterations, actions)); - client.closeConnection(); - return results; + return Benchmarking.getLatency(actions); + +// return Benchmarking.calculateResults(results); } }