From 4ac9b33de5d20414e77bc22bbf79cf01aef4ce8f Mon Sep 17 00:00:00 2001 From: acarbonetto Date: Thu, 9 Nov 2023 09:05:48 -0800 Subject: [PATCH] Make benchmarking more asynchronous Signed-off-by: acarbonetto --- .../benchmarks/utils/Benchmarking.java | 56 ++++++++++++++----- 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java index c74b4aebdc..be03f95010 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java @@ -1,5 +1,7 @@ package javababushka.benchmarks.utils; +import static java.util.concurrent.CompletableFuture.runAsync; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -7,9 +9,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import java.util.stream.Collectors; import javababushka.benchmarks.AsyncClient; import javababushka.benchmarks.BenchmarkingApp; import javababushka.benchmarks.Client; @@ -23,8 +28,8 @@ public class Benchmarking { static final int SIZE_GET_KEYSPACE = 3750000; static final int SIZE_SET_KEYSPACE = 3000000; static final int ASYNC_OPERATION_TIMEOUT_SEC = 1; - // measurements are done in nano-seconds, but it should be converted to seconds later - static final double SECONDS_IN_NANO = 1e-9; + // measurements are done in nanoseconds, but it should be converted to seconds later + public static final double SECONDS_IN_NANO = 1e-9; private static ChosenAction randomAction() { if (Math.random() > PROB_GET) { @@ -108,6 +113,7 @@ public static Map calculateResults( } public static void printResults(Map resultsMap) { + int totalHits = 0; for (Map.Entry entry : resultsMap.entrySet()) { ChosenAction action = entry.getKey(); LatencyResults results = entry.getValue(); @@ -118,7 +124,9 @@ public static void printResults(Map resultsMap) { System.out.println(action + " p99 latency in ms: " + results.p99Latency / 1000000.0); System.out.println(action + " std dev in ms: " + results.stdDeviation / 1000000.0); System.out.println(action + " total hits: " + results.totalHits); + totalHits += results.totalHits; } + System.out.println("Total hits: " + totalHits); } public static void testClientSetGet( @@ -172,7 +180,8 @@ public static void testClientSetGet( iterationIncrement = iterationCounter.getAndIncrement(); } - }); + } + ); } if (config.debugLogging) { System.out.printf("%s client Benchmarking: %n", clientCreator.get().getName()); @@ -180,20 +189,37 @@ public static void testClientSetGet( "===> concurrentNum = %d, clientNum = %d, tasks = %d%n", concurrentNum, clientCount, tasks.size()); } - long started = System.nanoTime(); - tasks.stream() - .map(CompletableFuture::runAsync) - .forEach( - f -> { - try { - f.get(); - } catch (Exception e) { - e.printStackTrace(); - } - }); + ExecutorService threadPool = Executors.newFixedThreadPool(concurrentNum); + long started = System.nanoTime(); + // create threads and add them to the async pool. + // This will start execution of all the concurrent tasks. + List asyncTasks = + tasks.stream().map((runnable) -> runAsync(runnable, threadPool)).collect(Collectors.toList()); + // close pool and await for tasks to complete + threadPool.shutdown(); + while (!threadPool.isTerminated()) { + try { + // wait before waiting for threads to complete + Thread.sleep(100); + } catch (InterruptedException interruptedException) { + interruptedException.printStackTrace(); + } + } + // wait for all futures to complete + asyncTasks.forEach( + future -> { + try { + future.get(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + long after = System.nanoTime(); var calculatedResults = calculateResults(actionResults); + if (config.resultsFile.isPresent()) { + double tps = iterationCounter.get() * SECONDS_IN_NANO / (after - started); JsonWriter.Write( calculatedResults, config.resultsFile.get(), @@ -202,7 +228,7 @@ public static void testClientSetGet( clientCreator.get().getName(), clientCount, concurrentNum, - iterationCounter.get() * 1e9 / (System.nanoTime() - started)); + tps); } printResults(calculatedResults); }