Skip to content

Commit

Permalink
Rename2
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Nov 17, 2023
1 parent f0f0e79 commit 06ca6b4
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ public void connectToRedis(ConnectionSettings connectionSettings) {
@Override
public Future<Response> asyncConnectToRedis(ConnectionSettings connectionSettings) {
return testClient.asyncConnectToRedis(
connectionSettings.host, connectionSettings.port, connectionSettings.useSsl, connectionSettings.clusterMode);
connectionSettings.host,
connectionSettings.port,
connectionSettings.useSsl,
connectionSettings.clusterMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
package javababushka.benchmarks.utils;

import static java.util.concurrent.CompletableFuture.runAsync;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javababushka.benchmarks.BenchmarkingApp;
import javababushka.benchmarks.clients.AsyncClient;
import javababushka.benchmarks.clients.Client;
import javababushka.benchmarks.clients.SyncClient;
import org.apache.commons.lang3.tuple.Pair;

import static java.util.concurrent.CompletableFuture.runAsync;

/** Class to calculate latency on client-actions */
public class Benchmarking {
static final double PROB_GET = 0.8;
Expand Down Expand Up @@ -151,91 +149,93 @@ public static void printResults(
}

public static void testClientSetGet(
Supplier<Client> clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) {
Supplier<Client> clientCreator, BenchmarkingApp.RunConfiguration config, boolean async) {
for (int concurrentNum : config.concurrentTasks) {
int iterations =
Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX);
Math.min(Math.max(LATENCY_MIN, concurrentNum * LATENCY_MULTIPLIER), LATENCY_MAX);
for (int clientCount : config.clientCount) {
for (int dataSize : config.dataSize) {
AtomicInteger iterationCounter = new AtomicInteger(0);

Map<ChosenAction, List<Long>> actionResults =
Map.of(
ChosenAction.GET_EXISTING, new ArrayList<>(),
ChosenAction.GET_NON_EXISTING, new ArrayList<>(),
ChosenAction.SET, new ArrayList<>());
Map.of(
ChosenAction.GET_EXISTING, new ArrayList<>(),
ChosenAction.GET_NON_EXISTING, new ArrayList<>(),
ChosenAction.SET, new ArrayList<>());
List<Runnable> tasks = new ArrayList<>();

// create clients
List<Client> clients = new LinkedList<>();
for (int cc = 0; cc < clientCount; cc++) {
Client newClient = clientCreator.get();
newClient.connectToRedis(new ConnectionSettings(config.host, config.port, config.tls, config.clusterModeEnabled));
newClient.connectToRedis(
new ConnectionSettings(
config.host, config.port, config.tls, config.clusterModeEnabled));
clients.add(newClient);
}

String clientName = clients.get(0).getName();

System.out.printf(
"%n =====> %s <===== %d clients %d concurrent %d data %n%n",
clientName, clientCount, concurrentNum, dataSize);
"%n =====> %s <===== %d clients %d concurrent %d data %n%n",
clientName, clientCount, concurrentNum, dataSize);

for (int taskNum = 0; taskNum < concurrentNum; taskNum++) {
final int taskNumDebugging = taskNum;
tasks.add(
() -> {
int iterationIncrement = iterationCounter.getAndIncrement();
int clientIndex = iterationIncrement % clients.size();
() -> {
int iterationIncrement = iterationCounter.getAndIncrement();
int clientIndex = iterationIncrement % clients.size();

if (config.debugLogging) {
System.out.printf(
"%n concurrent = %d/%d, client# = %d/%d%n",
taskNumDebugging, concurrentNum, clientIndex + 1, clientCount);
}
while (iterationIncrement < iterations) {
if (config.debugLogging) {
System.out.printf(
"> task = %d, iteration = %d/%d, client# = %d/%d%n",
taskNumDebugging,
iterationIncrement + 1,
iterations,
clientIndex + 1,
clientCount);
}
var actions = getActionMap(clients.get(clientIndex), dataSize, async);
// operate and calculate tik-tok
Pair<ChosenAction, Long> result = measurePerformance(actions);
if (config.debugLogging) {
System.out.printf(
"> task = %d, iteration = %d/%d, client# = %d/%d - DONE%n",
taskNumDebugging,
iterationIncrement + 1,
iterations,
clientIndex + 1,
clientCount);
}
if (result != null) {
actionResults.get(result.getLeft()).add(result.getRight());
}
iterationIncrement = iterationCounter.getAndIncrement();
}
});
if (config.debugLogging) {
System.out.printf(
"%n concurrent = %d/%d, client# = %d/%d%n",
taskNumDebugging, concurrentNum, clientIndex + 1, clientCount);
}
while (iterationIncrement < iterations) {
if (config.debugLogging) {
System.out.printf(
"> task = %d, iteration = %d/%d, client# = %d/%d%n",
taskNumDebugging,
iterationIncrement + 1,
iterations,
clientIndex + 1,
clientCount);
}
var actions = getActionMap(clients.get(clientIndex), dataSize, async);
// operate and calculate tik-tok
Pair<ChosenAction, Long> result = measurePerformance(actions);
if (config.debugLogging) {
System.out.printf(
"> task = %d, iteration = %d/%d, client# = %d/%d - DONE%n",
taskNumDebugging,
iterationIncrement + 1,
iterations,
clientIndex + 1,
clientCount);
}
if (result != null) {
actionResults.get(result.getLeft()).add(result.getRight());
}
iterationIncrement = iterationCounter.getAndIncrement();
}
});
}
if (config.debugLogging) {
System.out.printf("%s client Benchmarking: %n", clientName);
System.out.printf(
"===> concurrentNum = %d, clientNum = %d, tasks = %d%n",
concurrentNum, clientCount, tasks.size());
"===> concurrentNum = %d, clientNum = %d, tasks = %d%n",
concurrentNum, clientCount, tasks.size());
}
long before = System.nanoTime();
ExecutorService threadPool = Executors.newFixedThreadPool(concurrentNum);

// create threads and add them to the async pool.
// This will start execution of all the concurrent tasks.
List<CompletableFuture> asyncTasks =
tasks.stream()
.map((runnable) -> runAsync(runnable, threadPool))
.collect(Collectors.toList());
tasks.stream()
.map((runnable) -> runAsync(runnable, threadPool))
.collect(Collectors.toList());
// close pool and await for tasks to complete
threadPool.shutdown();
while (!threadPool.isTerminated()) {
Expand All @@ -248,28 +248,28 @@ public static void testClientSetGet(
}
// wait for all futures to complete
asyncTasks.forEach(
future -> {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
});
future -> {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
});
long after = System.nanoTime();

clients.forEach(Client::closeConnection);

var calculatedResults = calculateResults(actionResults);
if (config.resultsFile.isPresent()) {
JsonWriter.Write(
calculatedResults,
config.resultsFile.get(),
config.clusterModeEnabled,
dataSize,
clientName,
clientCount,
concurrentNum,
iterations / ((after - before) / TPS_NORMALIZATION));
calculatedResults,
config.resultsFile.get(),
config.clusterModeEnabled,
dataSize,
clientName,
clientCount,
concurrentNum,
iterations / ((after - before) / TPS_NORMALIZATION));
}
printResults(calculatedResults, (after - before) / TPS_NORMALIZATION, iterations);
try {
Expand Down
3 changes: 2 additions & 1 deletion java/client/src/main/java/javababushka/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ public Future<String> asyncGet(String key) {
.thenApply(
response ->
response.getRespPointer() != 0
? BabushkaCoreNativeDefinitions.valueFromPointer(response.getRespPointer()).toString()
? BabushkaCoreNativeDefinitions.valueFromPointer(response.getRespPointer())
.toString()
: null);
}

Expand Down
4 changes: 2 additions & 2 deletions java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn redis_value_to_java(mut env: JNIEnv, val: Value) -> JObject {
}

#[no_mangle]
pub extern "system" fn Java_javababushka_RustWrapper_valueFromPointer<'local>(
pub extern "system" fn Java_javababushka_BabushkaCoreNativeDefinitions_valueFromPointer<'local>(
mut env: JNIEnv<'local>,
_class: JClass<'local>,
pointer: jlong
Expand All @@ -48,7 +48,7 @@ pub extern "system" fn Java_javababushka_RustWrapper_valueFromPointer<'local>(
}

#[no_mangle]
pub extern "system" fn Java_javababushka_RustWrapper_startSocketListenerExternal<'local>(
pub extern "system" fn Java_javababushka_BabushkaCoreNativeDefinitions_startSocketListenerExternal<'local>(
mut env: JNIEnv<'local>,
_class: JClass<'local>
) -> JObject<'local> {
Expand Down

0 comments on commit 06ca6b4

Please sign in to comment.