From 54c498ed834504e8559ed3bb9e7123721c725944 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Mon, 18 Mar 2024 09:14:57 -0700 Subject: [PATCH] Java: Update benchmarking to use SynchronousQueue (#129) (#1092) Java: Update benchmarking to use CachedThreadPool (#129) * Run with CachedThreadPool * Add cached thread pool with rejection handler * Remove unused constant --------- Signed-off-by: Andrew Carbonetto --- benchmarks/install_and_test.sh | 2 +- .../glide/benchmarks/utils/Benchmarking.java | 28 ++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/benchmarks/install_and_test.sh b/benchmarks/install_and_test.sh index ebd650f39e..69dec5e389 100755 --- a/benchmarks/install_and_test.sh +++ b/benchmarks/install_and_test.sh @@ -196,7 +196,7 @@ do -lettuce) runAllBenchmarks=0 runJava=1 - chosenClients="lettuce_async" + chosenClients="lettuce" ;; -jedis) runAllBenchmarks=0 diff --git a/java/benchmarks/src/main/java/glide/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/glide/benchmarks/utils/Benchmarking.java index 85acf3160d..82bd607a70 100644 --- a/java/benchmarks/src/main/java/glide/benchmarks/utils/Benchmarking.java +++ b/java/benchmarks/src/main/java/glide/benchmarks/utils/Benchmarking.java @@ -12,6 +12,11 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.apache.commons.lang3.tuple.Pair; @@ -107,6 +112,23 @@ public static void printResults( public static void testClientSetGet( Supplier clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) { for (int concurrentNum : config.concurrentTasks) { + // same as Executors.newCachedThreadPool() with a RejectedExecutionHandler for robustness + ExecutorService executor = + new ThreadPoolExecutor( + 0, + Integer.MAX_VALUE, + 60L, + TimeUnit.SECONDS, + new SynchronousQueue(), + (r, poolExecutor) -> { + if (!poolExecutor.isShutdown()) { + try { + poolExecutor.getQueue().put(r); + } catch (InterruptedException e) { + throw new RuntimeException("interrupted"); + } + } + }); int iterations = config.minimal ? 1000 : Math.min(Math.max(100000, concurrentNum * 10000), 10000000); for (int clientCount : config.clientCount) { @@ -143,6 +165,7 @@ public static void testClientSetGet( clients, taskNumDebugging, iterations, + executor, config.debugLogging)); } if (config.debugLogging) { @@ -201,6 +224,7 @@ public static void testClientSetGet( printResults(calculatedResults, (after - started) / NANO_TO_SECONDS, iterations); } } + executor.shutdownNow(); } System.out.println(); @@ -215,6 +239,7 @@ private static CompletableFuture>> createTask( List clients, int taskNumDebugging, int iterations, + Executor executor, boolean debugLogging) { return CompletableFuture.supplyAsync( () -> { @@ -243,7 +268,8 @@ private static CompletableFuture>> createTask( taskActionResults.get(result.getLeft()).add(result.getRight()); } return taskActionResults; - }); + }, + executor); } public static Map getActionMap(int dataSize, boolean async) {