diff --git a/java/jabushka/benchmarks/src/main/java/javabushka/client/BenchmarkingApp.java b/java/jabushka/benchmarks/src/main/java/javabushka/client/BenchmarkingApp.java index c6d08f43db..625807a1c3 100644 --- a/java/jabushka/benchmarks/src/main/java/javabushka/client/BenchmarkingApp.java +++ b/java/jabushka/benchmarks/src/main/java/javabushka/client/BenchmarkingApp.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -31,6 +32,10 @@ /** Benchmarking app for reporting performance of various redis-rs Java-clients */ public class BenchmarkingApp { + private static int ITERATIONS_MULTIPLIER = 100000; + private static int ITERATIONS_MIN = 100000; + private static int ITERATIONS_MAX = 100; // TODO max 1000000 + // main application entrypoint public static void main(String[] args) { @@ -198,6 +203,9 @@ private static void testIterateTasksAndClientSetGet( Supplier clientSupplier, RunConfiguration runConfiguration, boolean async) throws IOException { System.out.printf("%n =====> %s <===== %n%n", clientSupplier.get().getName()); + for (int concurrentTasks : runConfiguration.concurrentTasks) { + + } for (int clientCount : runConfiguration.clientCount) { for (int concurrentTasks : runConfiguration.concurrentTasks) { Client client = clientSupplier.get(); @@ -217,8 +225,9 @@ private static void testConcurrentClientSetGet( int clientCount, boolean async) throws IOException { + // fetch a reasonable number of iterations based on the number of concurrent tasks - int iterations = Math.min(Math.max(100000, concurrentTasks * 10000), 10000000); + int iterations = Math.min(Math.max(ITERATIONS_MIN, concurrentTasks * ITERATIONS_MULTIPLIER), ITERATIONS_MAX); AtomicInteger iterationCounter = new AtomicInteger(0); // create clients @@ -231,40 +240,98 @@ private static void testConcurrentClientSetGet( clients.add(newClient); } - // create runnable tasks + // create runnable tasks list and results map List tasks = new ArrayList<>(); + List>> futures = new ArrayList<>(iterations); + List> intermediateActionResults = new ArrayList<>(iterations); Map> actionResults = new HashMap<>(); actionResults.put(ChosenAction.GET_EXISTING, new ArrayList<>()); actionResults.put(ChosenAction.GET_NON_EXISTING, new ArrayList<>()); actionResults.put(ChosenAction.SET, new ArrayList<>()); +// actionResults.put(ChosenAction.FETCH, new ArrayList<>()); // add one runnable task for each concurrentTask // task will run a random action against a client, uniformly distributed amongst all clients for (int concurrentTaskIndex = 0; concurrentTaskIndex < concurrentTasks; concurrentTaskIndex++) { - System.out.printf("concurrent task: %d/%d%n", concurrentTaskIndex + 1, concurrentTasks); tasks.add( () -> { - int iterationIncrement = iterationCounter.incrementAndGet(); + int iterationIncrement = iterationCounter.get(); while (iterationIncrement < iterations) { int clientIndex = iterationIncrement % clients.size(); - System.out.printf( - "> iteration = %d/%d, client# = %d/%d%n", - iterationIncrement + 1, iterations, clientIndex + 1, clientCount); +// System.out.printf( +// "> iteration = %d/%d, client# = %d/%d%n", +// iterationIncrement + 1, iterations, clientIndex + 1, clientCount); - // operate and calculate tik-tok Pair result = - Benchmarking.measurePerformance( - clients.get(clientIndex), runConfiguration.dataSize, async); - - // save tik-tok to actionResults - actionResults.get(result.getLeft()).add(result.getRight()); + async ? + Benchmarking.measureAsyncPerformance( + clients.get(clientIndex), + runConfiguration.dataSize, + iterationIncrement, + futures) + : + Benchmarking.measureSyncPerformance( + clients.get(clientIndex), + runConfiguration.dataSize); + + // save tik-tok to intermediate actionResults + intermediateActionResults.add(iterationIncrement, result); iterationIncrement = iterationCounter.incrementAndGet(); } }); } + // run all tasks asynchronously + tasks.stream() + .map(CompletableFuture::runAsync) + .forEach( + f -> { + try { + f.get(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + System.out.println("WAIT 10 SECONDS"); + try { + Thread.sleep(1000L); // TODO update to 10 seconds + } catch (InterruptedException interruptedException) { + throw new RuntimeException("INTERRUPTED"); + } + + // now fetch results from futures + AtomicInteger fetchAsyncFuturesCounter = new AtomicInteger(0); + for (int concurrentTaskIndex = 0; + concurrentTaskIndex < concurrentTasks; + concurrentTaskIndex++) { + tasks.add( + () -> { + int iterationIncrement = fetchAsyncFuturesCounter.get(); + while (iterationIncrement < iterations) { + Pair> futurePair = futures.get(iterationIncrement); + int clientIndex = iterationIncrement % clients.size(); +// System.out.printf( +// "> fetch = %d/%d, client# = %d/%d%n", +// iterationIncrement + 1, iterations, clientIndex + 1, clientCount); + + Pair result = + Benchmarking.measureAsyncFetchPerformance( + clients.get(clientIndex), + futurePair.getRight()); + + // save tik-tok to actionResults, make sure to add the intermediate result + Long intermediateResult = intermediateActionResults.get(iterationIncrement).getRight(); + actionResults.get(futurePair.getLeft()).add( + result.getRight() + intermediateResult); + iterationIncrement = fetchAsyncFuturesCounter.incrementAndGet(); + } + } + ); + } + // run all tasks asynchronously tasks.stream() .map(CompletableFuture::runAsync) diff --git a/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/Benchmarking.java b/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/Benchmarking.java index ed816983e8..6d43824ccb 100644 --- a/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/Benchmarking.java +++ b/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/Benchmarking.java @@ -1,12 +1,15 @@ package javabushka.client.utils; +import io.lettuce.core.RedisFuture; import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Future; import java.util.stream.Collectors; import javabushka.client.AsyncClient; import javabushka.client.Client; @@ -30,12 +33,12 @@ private static ChosenAction randomAction() { return ChosenAction.GET_EXISTING; } - public static String generateKeyGet() { + public static String generateKeyNew() { int range = SIZE_GET_KEYSPACE - SIZE_SET_KEYSPACE; return Math.floor(Math.random() * range + SIZE_SET_KEYSPACE + 1) + ""; } - public static String generateKeySet() { + public static String generateKeyExisting() { return (Math.floor(Math.random() * SIZE_SET_KEYSPACE) + 1) + ""; } @@ -60,16 +63,12 @@ public static Map> getLatencies( return latencies; } - public static Pair getLatency(Map actions) { - - ChosenAction action = randomAction(); - Operation op = actions.get(action); - + public static Long getLatency(Operation op) { long before = System.nanoTime(); op.go(); long after = System.nanoTime(); - return Pair.of(action, after - before); + return after - before; } private static void addLatency(Operation op, ArrayList latencies) { @@ -162,40 +161,87 @@ public static void printResults(Map resultsMap) { } } - public static Pair measurePerformance( - Client client, int setDataSize, boolean async) { - Map> results = new HashMap<>(); - - String setValue = RandomStringUtils.randomAlphanumeric(setDataSize); - - // if (config.resultsFile.isPresent()) { - // try { - // config.resultsFile.get().write(client.getName() + " client Benchmarking: "); - // } catch (Exception ignored) { - // } - // } else { - // System.out.printf("%s client Benchmarking: %n", client.getName()); - // } - - Map actions = new HashMap<>(); - actions.put( - ChosenAction.GET_EXISTING, - async - ? () -> ((AsyncClient) client).asyncGet(Benchmarking.generateKeySet()) - : () -> ((SyncClient) client).get(Benchmarking.generateKeySet())); - actions.put( - ChosenAction.GET_NON_EXISTING, - async - ? () -> ((AsyncClient) client).asyncGet(Benchmarking.generateKeyGet()) - : () -> ((SyncClient) client).get(Benchmarking.generateKeyGet())); - actions.put( - ChosenAction.SET, - async - ? () -> ((AsyncClient) client).asyncSet(Benchmarking.generateKeySet(), setValue) - : () -> ((SyncClient) client).set(Benchmarking.generateKeySet(), setValue)); - - return Benchmarking.getLatency(actions); - - // return Benchmarking.calculateResults(results); + public static Pair measureSyncPerformance( + Client client, int setDataSize) { + + ChosenAction action = randomAction(); + switch (action) { + case GET_EXISTING: + return Pair.of(action, + Benchmarking.getLatency( + () -> ((SyncClient) client).get(Benchmarking.generateKeyExisting()) + ) + ); + case GET_NON_EXISTING: + return Pair.of(action, + Benchmarking.getLatency( + () -> ((SyncClient) client).get(Benchmarking.generateKeyNew()) + ) + ); + case SET: + return Pair.of(action, + Benchmarking.getLatency( + () -> ((SyncClient) client).set(Benchmarking.generateKeyExisting(), + RandomStringUtils.randomAlphanumeric(setDataSize)) + ) + ); + default: + throw new RuntimeException("Unexpected operation"); + } + } + + /** + * Setup action/tasks to measure and track performance + * @param client - async client to perform action + * @param setDataSize - request SET actions of data size + * @param futures - record async futures for gathering response + * @return a pair of latency actions + */ + public static Pair measureAsyncPerformance( + Client client, + int setDataSize, + int index, + List>> futures) { + + ChosenAction action = randomAction(); + switch (action) { + case GET_EXISTING: + return Pair.of(action, + Benchmarking.getLatency( + () -> futures.add(index, Pair.of( + ChosenAction.GET_EXISTING, + ((AsyncClient) client).asyncGet(Benchmarking.generateKeyExisting())) + ) + ) + ); + case GET_NON_EXISTING: + return Pair.of(action, + Benchmarking.getLatency( + () -> futures.add(index, Pair.of( + ChosenAction.GET_NON_EXISTING, + ((AsyncClient) client).asyncGet(Benchmarking.generateKeyNew())) + ) + ) + ); + case SET: + return Pair.of(action, + Benchmarking.getLatency( + () -> futures.add(index, Pair.of( + ChosenAction.SET, + ((AsyncClient) client).asyncSet(Benchmarking.generateKeyExisting(), + RandomStringUtils.randomAlphanumeric(setDataSize))) + ) + ) + ); + default: + throw new RuntimeException("Unexpected operation"); + } + } + + public static Pair measureAsyncFetchPerformance( + Client client, Future future) { + + Long latency = Benchmarking.getLatency(() -> ((AsyncClient) client).waitForResult(future)); + return Pair.of(ChosenAction.FETCH, latency); } } diff --git a/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/ChosenAction.java b/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/ChosenAction.java index 0bded99ddc..e71a0eb4dc 100644 --- a/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/ChosenAction.java +++ b/java/jabushka/benchmarks/src/main/java/javabushka/client/utils/ChosenAction.java @@ -3,5 +3,7 @@ public enum ChosenAction { GET_NON_EXISTING, GET_EXISTING, - SET + SET, + + FETCH }