Skip to content

Commit

Permalink
Java: Update benchmarking to use SynchronousQueue (#129) (valkey-io#1092
Browse files Browse the repository at this point in the history
)

Java: Update benchmarking to use CachedThreadPool  (#129)

* Run with CachedThreadPool



* Add cached thread pool with rejection handler



* Remove unused constant



---------

Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto authored Mar 18, 2024
1 parent a85e514 commit 54c498e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
2 changes: 1 addition & 1 deletion benchmarks/install_and_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ do
-lettuce)
runAllBenchmarks=0
runJava=1
chosenClients="lettuce_async"
chosenClients="lettuce"
;;
-jedis)
runAllBenchmarks=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -107,6 +112,23 @@ public static void printResults(
public static void testClientSetGet(
Supplier<Client> clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) {
for (int concurrentNum : config.concurrentTasks) {
// same as Executors.newCachedThreadPool() with a RejectedExecutionHandler for robustness
ExecutorService executor =
new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
(r, poolExecutor) -> {
if (!poolExecutor.isShutdown()) {
try {
poolExecutor.getQueue().put(r);
} catch (InterruptedException e) {
throw new RuntimeException("interrupted");
}
}
});
int iterations =
config.minimal ? 1000 : Math.min(Math.max(100000, concurrentNum * 10000), 10000000);
for (int clientCount : config.clientCount) {
Expand Down Expand Up @@ -143,6 +165,7 @@ public static void testClientSetGet(
clients,
taskNumDebugging,
iterations,
executor,
config.debugLogging));
}
if (config.debugLogging) {
Expand Down Expand Up @@ -201,6 +224,7 @@ public static void testClientSetGet(
printResults(calculatedResults, (after - started) / NANO_TO_SECONDS, iterations);
}
}
executor.shutdownNow();
}

System.out.println();
Expand All @@ -215,6 +239,7 @@ private static CompletableFuture<Map<ChosenAction, ArrayList<Long>>> createTask(
List<Client> clients,
int taskNumDebugging,
int iterations,
Executor executor,
boolean debugLogging) {
return CompletableFuture.supplyAsync(
() -> {
Expand Down Expand Up @@ -243,7 +268,8 @@ private static CompletableFuture<Map<ChosenAction, ArrayList<Long>>> createTask(
taskActionResults.get(result.getLeft()).add(result.getRight());
}
return taskActionResults;
});
},
executor);
}

public static Map<ChosenAction, Operation> getActionMap(int dataSize, boolean async) {
Expand Down

0 comments on commit 54c498e

Please sign in to comment.