Skip to content

Commit

Permalink
Measure async fetch performance with get/set
Browse files Browse the repository at this point in the history
Signed-off-by: acarbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Oct 3, 2023
1 parent b3fc45d commit b6d9430
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -31,6 +32,10 @@
/** Benchmarking app for reporting performance of various redis-rs Java-clients */
public class BenchmarkingApp {

private static int ITERATIONS_MULTIPLIER = 100000;
private static int ITERATIONS_MIN = 100000;
private static int ITERATIONS_MAX = 100; // TODO max 1000000

// main application entrypoint
public static void main(String[] args) {

Expand Down Expand Up @@ -198,6 +203,9 @@ private static void testIterateTasksAndClientSetGet(
Supplier<Client> clientSupplier, RunConfiguration runConfiguration, boolean async)
throws IOException {
System.out.printf("%n =====> %s <===== %n%n", clientSupplier.get().getName());
for (int concurrentTasks : runConfiguration.concurrentTasks) {

}
for (int clientCount : runConfiguration.clientCount) {
for (int concurrentTasks : runConfiguration.concurrentTasks) {
Client client = clientSupplier.get();
Expand All @@ -217,8 +225,9 @@ private static void testConcurrentClientSetGet(
int clientCount,
boolean async)
throws IOException {

// fetch a reasonable number of iterations based on the number of concurrent tasks
int iterations = Math.min(Math.max(100000, concurrentTasks * 10000), 10000000);
int iterations = Math.min(Math.max(ITERATIONS_MIN, concurrentTasks * ITERATIONS_MULTIPLIER), ITERATIONS_MAX);
AtomicInteger iterationCounter = new AtomicInteger(0);

// create clients
Expand All @@ -231,40 +240,98 @@ private static void testConcurrentClientSetGet(
clients.add(newClient);
}

// create runnable tasks
// create runnable tasks list and results map
List<Runnable> tasks = new ArrayList<>();
List<Pair<ChosenAction, Future<?>>> futures = new ArrayList<>(iterations);
List<Pair<ChosenAction, Long>> intermediateActionResults = new ArrayList<>(iterations);
Map<ChosenAction, ArrayList<Long>> actionResults = new HashMap<>();
actionResults.put(ChosenAction.GET_EXISTING, new ArrayList<>());
actionResults.put(ChosenAction.GET_NON_EXISTING, new ArrayList<>());
actionResults.put(ChosenAction.SET, new ArrayList<>());
// actionResults.put(ChosenAction.FETCH, new ArrayList<>());

// add one runnable task for each concurrentTask
// task will run a random action against a client, uniformly distributed amongst all clients
for (int concurrentTaskIndex = 0;
concurrentTaskIndex < concurrentTasks;
concurrentTaskIndex++) {
System.out.printf("concurrent task: %d/%d%n", concurrentTaskIndex + 1, concurrentTasks);
tasks.add(
() -> {
int iterationIncrement = iterationCounter.incrementAndGet();
int iterationIncrement = iterationCounter.get();
while (iterationIncrement < iterations) {
int clientIndex = iterationIncrement % clients.size();
System.out.printf(
"> iteration = %d/%d, client# = %d/%d%n",
iterationIncrement + 1, iterations, clientIndex + 1, clientCount);
// System.out.printf(
// "> iteration = %d/%d, client# = %d/%d%n",
// iterationIncrement + 1, iterations, clientIndex + 1, clientCount);

// operate and calculate tik-tok
Pair<ChosenAction, Long> result =
Benchmarking.measurePerformance(
clients.get(clientIndex), runConfiguration.dataSize, async);

// save tik-tok to actionResults
actionResults.get(result.getLeft()).add(result.getRight());
async ?
Benchmarking.measureAsyncPerformance(
clients.get(clientIndex),
runConfiguration.dataSize,
iterationIncrement,
futures)
:
Benchmarking.measureSyncPerformance(
clients.get(clientIndex),
runConfiguration.dataSize);

// save tik-tok to intermediate actionResults
intermediateActionResults.add(iterationIncrement, result);
iterationIncrement = iterationCounter.incrementAndGet();
}
});
}

// run all tasks asynchronously
tasks.stream()
.map(CompletableFuture::runAsync)
.forEach(
f -> {
try {
f.get();
} catch (Exception e) {
e.printStackTrace();
}
});

System.out.println("WAIT 10 SECONDS");
try {
Thread.sleep(1000L); // TODO update to 10 seconds
} catch (InterruptedException interruptedException) {
throw new RuntimeException("INTERRUPTED");
}

// now fetch results from futures
AtomicInteger fetchAsyncFuturesCounter = new AtomicInteger(0);
for (int concurrentTaskIndex = 0;
concurrentTaskIndex < concurrentTasks;
concurrentTaskIndex++) {
tasks.add(
() -> {
int iterationIncrement = fetchAsyncFuturesCounter.get();
while (iterationIncrement < iterations) {
Pair<ChosenAction, Future<?>> futurePair = futures.get(iterationIncrement);
int clientIndex = iterationIncrement % clients.size();
// System.out.printf(
// "> fetch = %d/%d, client# = %d/%d%n",
// iterationIncrement + 1, iterations, clientIndex + 1, clientCount);

Pair<ChosenAction, Long> result =
Benchmarking.measureAsyncFetchPerformance(
clients.get(clientIndex),
futurePair.getRight());

// save tik-tok to actionResults, make sure to add the intermediate result
Long intermediateResult = intermediateActionResults.get(iterationIncrement).getRight();
actionResults.get(futurePair.getLeft()).add(
result.getRight() + intermediateResult);
iterationIncrement = fetchAsyncFuturesCounter.incrementAndGet();
}
}
);
}

// run all tasks asynchronously
tasks.stream()
.map(CompletableFuture::runAsync)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package javabushka.client.utils;

import io.lettuce.core.RedisFuture;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javabushka.client.AsyncClient;
import javabushka.client.Client;
Expand All @@ -30,12 +33,12 @@ private static ChosenAction randomAction() {
return ChosenAction.GET_EXISTING;
}

public static String generateKeyGet() {
public static String generateKeyNew() {
int range = SIZE_GET_KEYSPACE - SIZE_SET_KEYSPACE;
return Math.floor(Math.random() * range + SIZE_SET_KEYSPACE + 1) + "";
}

public static String generateKeySet() {
public static String generateKeyExisting() {
return (Math.floor(Math.random() * SIZE_SET_KEYSPACE) + 1) + "";
}

Expand All @@ -60,16 +63,12 @@ public static Map<ChosenAction, ArrayList<Long>> getLatencies(
return latencies;
}

public static Pair<ChosenAction, Long> getLatency(Map<ChosenAction, Operation> actions) {

ChosenAction action = randomAction();
Operation op = actions.get(action);

public static Long getLatency(Operation op) {
long before = System.nanoTime();
op.go();
long after = System.nanoTime();

return Pair.of(action, after - before);
return after - before;
}

private static void addLatency(Operation op, ArrayList<Long> latencies) {
Expand Down Expand Up @@ -162,40 +161,87 @@ public static void printResults(Map<ChosenAction, LatencyResults> resultsMap) {
}
}

public static Pair<ChosenAction, Long> measurePerformance(
Client client, int setDataSize, boolean async) {
Map<ChosenAction, ArrayList<Long>> results = new HashMap<>();

String setValue = RandomStringUtils.randomAlphanumeric(setDataSize);

// if (config.resultsFile.isPresent()) {
// try {
// config.resultsFile.get().write(client.getName() + " client Benchmarking: ");
// } catch (Exception ignored) {
// }
// } else {
// System.out.printf("%s client Benchmarking: %n", client.getName());
// }

Map<ChosenAction, Benchmarking.Operation> actions = new HashMap<>();
actions.put(
ChosenAction.GET_EXISTING,
async
? () -> ((AsyncClient) client).asyncGet(Benchmarking.generateKeySet())
: () -> ((SyncClient) client).get(Benchmarking.generateKeySet()));
actions.put(
ChosenAction.GET_NON_EXISTING,
async
? () -> ((AsyncClient) client).asyncGet(Benchmarking.generateKeyGet())
: () -> ((SyncClient) client).get(Benchmarking.generateKeyGet()));
actions.put(
ChosenAction.SET,
async
? () -> ((AsyncClient) client).asyncSet(Benchmarking.generateKeySet(), setValue)
: () -> ((SyncClient) client).set(Benchmarking.generateKeySet(), setValue));

return Benchmarking.getLatency(actions);

// return Benchmarking.calculateResults(results);
public static Pair<ChosenAction, Long> measureSyncPerformance(
Client client, int setDataSize) {

ChosenAction action = randomAction();
switch (action) {
case GET_EXISTING:
return Pair.of(action,
Benchmarking.getLatency(
() -> ((SyncClient) client).get(Benchmarking.generateKeyExisting())
)
);
case GET_NON_EXISTING:
return Pair.of(action,
Benchmarking.getLatency(
() -> ((SyncClient) client).get(Benchmarking.generateKeyNew())
)
);
case SET:
return Pair.of(action,
Benchmarking.getLatency(
() -> ((SyncClient) client).set(Benchmarking.generateKeyExisting(),
RandomStringUtils.randomAlphanumeric(setDataSize))
)
);
default:
throw new RuntimeException("Unexpected operation");
}
}

/**
* Setup action/tasks to measure and track performance
* @param client - async client to perform action
* @param setDataSize - request SET actions of data size
* @param futures - record async futures for gathering response
* @return a pair of latency actions
*/
public static Pair<ChosenAction, Long> measureAsyncPerformance(
Client client,
int setDataSize,
int index,
List<Pair<ChosenAction, Future<?>>> futures) {

ChosenAction action = randomAction();
switch (action) {
case GET_EXISTING:
return Pair.of(action,
Benchmarking.getLatency(
() -> futures.add(index, Pair.of(
ChosenAction.GET_EXISTING,
((AsyncClient) client).asyncGet(Benchmarking.generateKeyExisting()))
)
)
);
case GET_NON_EXISTING:
return Pair.of(action,
Benchmarking.getLatency(
() -> futures.add(index, Pair.of(
ChosenAction.GET_NON_EXISTING,
((AsyncClient) client).asyncGet(Benchmarking.generateKeyNew()))
)
)
);
case SET:
return Pair.of(action,
Benchmarking.getLatency(
() -> futures.add(index, Pair.of(
ChosenAction.SET,
((AsyncClient) client).asyncSet(Benchmarking.generateKeyExisting(),
RandomStringUtils.randomAlphanumeric(setDataSize)))
)
)
);
default:
throw new RuntimeException("Unexpected operation");
}
}

public static Pair<ChosenAction, Long> measureAsyncFetchPerformance(
Client client, Future<?> future) {

Long latency = Benchmarking.getLatency(() -> ((AsyncClient) client).waitForResult(future));
return Pair.of(ChosenAction.FETCH, latency);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@
public enum ChosenAction {
GET_NON_EXISTING,
GET_EXISTING,
SET
SET,

FETCH
}

0 comments on commit b6d9430

Please sign in to comment.