Skip to content

Commit

Permalink
Make benchmarking more asynchronous
Browse files Browse the repository at this point in the history
Signed-off-by: acarbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Nov 9, 2023
1 parent aaf0c30 commit 4ac9b33
Showing 1 changed file with 41 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package javababushka.benchmarks.utils;

import static java.util.concurrent.CompletableFuture.runAsync;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javababushka.benchmarks.AsyncClient;
import javababushka.benchmarks.BenchmarkingApp;
import javababushka.benchmarks.Client;
Expand All @@ -23,8 +28,8 @@ public class Benchmarking {
static final int SIZE_GET_KEYSPACE = 3750000;
static final int SIZE_SET_KEYSPACE = 3000000;
static final int ASYNC_OPERATION_TIMEOUT_SEC = 1;
// measurements are done in nano-seconds, but it should be converted to seconds later
static final double SECONDS_IN_NANO = 1e-9;
// measurements are done in nanoseconds, but it should be converted to seconds later
public static final double SECONDS_IN_NANO = 1e-9;

private static ChosenAction randomAction() {
if (Math.random() > PROB_GET) {
Expand Down Expand Up @@ -108,6 +113,7 @@ public static Map<ChosenAction, LatencyResults> calculateResults(
}

public static void printResults(Map<ChosenAction, LatencyResults> resultsMap) {
int totalHits = 0;
for (Map.Entry<ChosenAction, LatencyResults> entry : resultsMap.entrySet()) {
ChosenAction action = entry.getKey();
LatencyResults results = entry.getValue();
Expand All @@ -118,7 +124,9 @@ public static void printResults(Map<ChosenAction, LatencyResults> resultsMap) {
System.out.println(action + " p99 latency in ms: " + results.p99Latency / 1000000.0);
System.out.println(action + " std dev in ms: " + results.stdDeviation / 1000000.0);
System.out.println(action + " total hits: " + results.totalHits);
totalHits += results.totalHits;
}
System.out.println("Total hits: " + totalHits);
}

public static void testClientSetGet(
Expand Down Expand Up @@ -172,28 +180,46 @@ public static void testClientSetGet(

iterationIncrement = iterationCounter.getAndIncrement();
}
});
}
);
}
if (config.debugLogging) {
System.out.printf("%s client Benchmarking: %n", clientCreator.get().getName());
System.out.printf(
"===> concurrentNum = %d, clientNum = %d, tasks = %d%n",
concurrentNum, clientCount, tasks.size());
}
long started = System.nanoTime();
tasks.stream()
.map(CompletableFuture::runAsync)
.forEach(
f -> {
try {
f.get();
} catch (Exception e) {
e.printStackTrace();
}
});

ExecutorService threadPool = Executors.newFixedThreadPool(concurrentNum);
long started = System.nanoTime();
// create threads and add them to the async pool.
// This will start execution of all the concurrent tasks.
List<CompletableFuture> asyncTasks =
tasks.stream().map((runnable) -> runAsync(runnable, threadPool)).collect(Collectors.toList());
// close pool and await for tasks to complete
threadPool.shutdown();
while (!threadPool.isTerminated()) {
try {
// wait before waiting for threads to complete
Thread.sleep(100);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
// wait for all futures to complete
asyncTasks.forEach(
future -> {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
});
long after = System.nanoTime();
var calculatedResults = calculateResults(actionResults);

if (config.resultsFile.isPresent()) {
double tps = iterationCounter.get() * SECONDS_IN_NANO / (after - started);
JsonWriter.Write(
calculatedResults,
config.resultsFile.get(),
Expand All @@ -202,7 +228,7 @@ public static void testClientSetGet(
clientCreator.get().getName(),
clientCount,
concurrentNum,
iterationCounter.get() * 1e9 / (System.nanoTime() - started));
tps);
}
printResults(calculatedResults);
}
Expand Down

0 comments on commit 4ac9b33

Please sign in to comment.