Skip to content

Commit

Permalink
Add concurrent tasks and clients loop against iteration max tasks
Browse files Browse the repository at this point in the history
Signed-off-by: acarbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Sep 27, 2023
1 parent ef41428 commit 0b7df6f
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -15,11 +19,15 @@
import javabushka.client.lettuce.LettuceAsyncClient;
import javabushka.client.lettuce.LettuceClient;
import javabushka.client.utils.Benchmarking;
import javabushka.client.utils.ChosenAction;
import javabushka.client.utils.ConnectionSettings;
import javabushka.client.utils.LatencyResults;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.tuple.Pair;

/** Benchmarking app for reporting performance of various redis-rs Java-clients */
public class BenchmarkingApp {
Expand All @@ -40,24 +48,28 @@ public static void main(String[] args) {
System.err.println("Parsing failed. Reason: " + exp.getMessage());
}

for (ClientName client : runConfiguration.clients) {
switch (client) {
case JEDIS:
testClientSetGet(JedisClient::new, runConfiguration, false);
break;
case JEDIS_ASYNC:
testClientSetGet(JedisPseudoAsyncClient::new, runConfiguration, true);
break;
case LETTUCE:
testClientSetGet(LettuceClient::new, runConfiguration, false);
break;
case LETTUCE_ASYNC:
testClientSetGet(LettuceAsyncClient::new, runConfiguration, true);
break;
case BABUSHKA:
System.out.println("Babushka not yet configured");
break;
try {
for (ClientName client : runConfiguration.clients) {
switch (client) {
case JEDIS:
testIterateTasksAndClientSetGet(JedisClient::new, runConfiguration, false);
break;
case JEDIS_ASYNC:
testIterateTasksAndClientSetGet(JedisPseudoAsyncClient::new, runConfiguration, true);
break;
case LETTUCE:
testIterateTasksAndClientSetGet(LettuceClient::new, runConfiguration, false);
break;
case LETTUCE_ASYNC:
testIterateTasksAndClientSetGet(LettuceAsyncClient::new, runConfiguration, true);
break;
case BABUSHKA:
System.out.println("Babushka not yet configured");
break;
}
}
} catch (IOException ioException) {
System.out.println("Error writing results to file");
}

if (runConfiguration.resultsFile.isPresent()) {
Expand Down Expand Up @@ -169,7 +181,7 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce

// check if it's the correct format
if (!clientCount.matches("\\d+(\\s+\\d+)?")) {
throw new ParseException("Invalid concurrentTasks");
throw new ParseException("Invalid clientCount");
}
// split the string into a list of integers
runConfiguration.clientCount =
Expand All @@ -183,49 +195,103 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce
return runConfiguration;
}

private static void testClientSetGet(
Supplier<Client> clientCreator, RunConfiguration runConfiguration, boolean async) {
System.out.printf("%n =====> %s <===== %n%n", clientCreator.get().getName());
for (int concurrentNum : runConfiguration.concurrentTasks) {
for (int clientNum : runConfiguration.clientCount) {

List<Runnable> tasks = new ArrayList<>();

for (int i = 0; i < concurrentNum; ) {
for (int j = 0; j < clientNum; j++) {
i++;
int finalI = i;
int finalJ = j;
tasks.add(
() -> {
System.out.printf(
"%n concurrent = %d/%d, client# = %d/%d%n",
finalI, concurrentNum, finalJ + 1, clientNum);
Benchmarking.printResults(
Benchmarking.measurePerformance(
clientCreator.get(), runConfiguration, async));
});
}
}
System.out.printf(
"===> concurrentNum = %d, clientNum = %d, tasks = %d%n",
concurrentNum, clientNum, tasks.size());
tasks.stream()
.map(CompletableFuture::runAsync)
.forEach(
f -> {
try {
f.get();
} catch (Exception e) {
e.printStackTrace();
}
});
// call testConcurrentClientSetGet for each concurrentTask/clientCount pairing
private static void testIterateTasksAndClientSetGet(Supplier<Client> clientSupplier,
RunConfiguration runConfiguration,
boolean async) throws IOException {
System.out.printf("%n =====> %s <===== %n%n", clientSupplier.get().getName());
for (int clientCount : runConfiguration.clientCount) {
for (int concurrentTasks : runConfiguration.concurrentTasks) {
Client client = clientSupplier.get();
testConcurrentClientSetGet(
clientSupplier,
runConfiguration,
concurrentTasks,
clientCount,
async);
}
}

System.out.println();
}

// call one test scenario: with a number of concurrent threads against clientCount number
// of clients
private static void testConcurrentClientSetGet(Supplier<Client> clientSupplier,
RunConfiguration runConfiguration,
int concurrentTasks,
int clientCount,
boolean async) throws IOException {
// fetch a reasonable number of iterations based on the number of concurrent tasks
int iterations = Math.min(Math.max(100000, concurrentTasks * 10000), 10000000);
AtomicInteger iterationCounter = new AtomicInteger(0);

// create clients
List<Client> clients = new LinkedList<>();
for(int i = 0; i < clientCount; i++) {
Client newClient = clientSupplier.get();
newClient.connectToRedis(new ConnectionSettings(
runConfiguration.host,
runConfiguration.port,
runConfiguration.tls));
clients.add(newClient);
}

// create runnable tasks
List<Runnable> tasks = new ArrayList<>();
Map<ChosenAction, ArrayList<Long>> actionResults = new HashMap<>();
actionResults.put(ChosenAction.GET_EXISTING, new ArrayList<>());
actionResults.put(ChosenAction.GET_NON_EXISTING, new ArrayList<>());
actionResults.put(ChosenAction.SET, new ArrayList<>());

// add one runnable task for each concurrentTask
// task will run a random action against a client, uniformly distributed amongst all clients
for(int concurrentTaskIndex = 0; concurrentTaskIndex < concurrentTasks; concurrentTaskIndex++) {
System.out.printf("concurrent task: %d/%d%n", concurrentTaskIndex+1, concurrentTasks);
tasks.add(
() -> {
int iterationIncrement = iterationCounter.incrementAndGet();
while (iterationIncrement < iterations) {
int clientIndex = iterationIncrement % clients.size();
System.out.printf(
"> iteration = %d/%d, client# = %d/%d%n",
iterationIncrement+1,
iterations,
clientIndex+1,
clientCount);

// operate and calculate tik-tok
Pair<ChosenAction, Long> result = Benchmarking.measurePerformance(
clients.get(clientIndex),
runConfiguration.dataSize,
async
);

// save tik-tok to actionResults
actionResults.get(result.getLeft()).add(result.getRight());
iterationIncrement = iterationCounter.incrementAndGet();
}
});
}

// run all tasks asynchronously
tasks.stream()
.map(CompletableFuture::runAsync)
.forEach(
f -> {
try {
f.get();
} catch (Exception e) {
e.printStackTrace();
}
});

// use results file to stdout/print
Benchmarking.printResults(Benchmarking.calculateResults(actionResults), runConfiguration.resultsFile);

// close connections
clients.forEach(c -> c.closeConnection());
}

public enum ClientName {
JEDIS("Jedis"),
JEDIS_ASYNC("Jedis async"),
Expand Down Expand Up @@ -277,7 +343,7 @@ public RunConfiguration() {
};
host = "localhost";
port = 6379;
clientCount = new int[] {1};
clientCount = new int[] {1, 2};
tls = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import javabushka.client.Client;
import javabushka.client.SyncClient;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;

public class Benchmarking {
static final double PROB_GET = 0.8;
Expand Down Expand Up @@ -60,6 +61,20 @@ public static Map<ChosenAction, ArrayList<Long>> getLatencies(
return latencies;
}

public static Pair<ChosenAction, Long> getLatency(
Map<ChosenAction, Operation> actions) {

ChosenAction action = randomAction();
Operation op = actions.get(action);

long before = System.nanoTime();
op.go();
long after = System.nanoTime();

return Pair.of(action, after - before);

}

private static void addLatency(Operation op, ArrayList<Long> latencies) {
long before = System.nanoTime();
op.go();
Expand Down Expand Up @@ -150,21 +165,21 @@ public static void printResults(Map<ChosenAction, LatencyResults> resultsMap) {
}
}

public static Map<ChosenAction, LatencyResults> measurePerformance(
Client client, BenchmarkingApp.RunConfiguration config, boolean async) {
client.connectToRedis(new ConnectionSettings(config.host, config.port, config.tls));
public static Pair<ChosenAction, Long> measurePerformance(
Client client, int setDataSize, boolean async) {
Map<ChosenAction, ArrayList<Long>> results = new HashMap<>();

int iterations = 100;
String value = RandomStringUtils.randomAlphanumeric(config.dataSize);

if (config.resultsFile.isPresent()) {
try {
config.resultsFile.get().write(client.getName() + " client Benchmarking: ");
} catch (Exception ignored) {
}
} else {
System.out.printf("%s client Benchmarking: %n", client.getName());
}
String setValue = RandomStringUtils.randomAlphanumeric(setDataSize);

// if (config.resultsFile.isPresent()) {
// try {
// config.resultsFile.get().write(client.getName() + " client Benchmarking: ");
// } catch (Exception ignored) {
// }
// } else {
// System.out.printf("%s client Benchmarking: %n", client.getName());
// }

Map<ChosenAction, Benchmarking.Operation> actions = new HashMap<>();
actions.put(
Expand All @@ -180,11 +195,11 @@ public static Map<ChosenAction, LatencyResults> measurePerformance(
actions.put(
ChosenAction.SET,
async
? () -> ((AsyncClient) client).asyncSet(Benchmarking.generateKeySet(), value)
: () -> ((SyncClient) client).set(Benchmarking.generateKeySet(), value));
? () -> ((AsyncClient) client).asyncSet(Benchmarking.generateKeySet(), setValue)
: () -> ((SyncClient) client).set(Benchmarking.generateKeySet(), setValue));

var results = Benchmarking.calculateResults(Benchmarking.getLatencies(iterations, actions));
client.closeConnection();
return results;
return Benchmarking.getLatency(actions);

// return Benchmarking.calculateResults(results);
}
}

0 comments on commit 0b7df6f

Please sign in to comment.