Skip to content

Commit

Permalink
Clean up latency and add error checking
Browse files Browse the repository at this point in the history
Signed-off-by: acarbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Oct 20, 2023
1 parent 73c448f commit 79459f5
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
public class JniSyncClient implements SyncClient {

private static int MAX_TIMEOUT = 1000;
private static int TIMEOUT_INTERVAL = 100;

private RedisClient client;

private SocketChannel channel;

private boolean isChannelWriting = false;

@Override
public void connectToRedis() {
connectToRedis(new ConnectionSettings("localhost", 6379, false));
Expand All @@ -40,11 +39,11 @@ public void connectToRedis(ConnectionSettings connectionSettings) {
RedisClient.startSocketListenerExternal(client);

int timeout = 0;
int maxTimeout = 1000;
int maxTimeout = MAX_TIMEOUT;
while (client.socketPath == null && timeout < maxTimeout) {
timeout++;
timeout += TIMEOUT_INTERVAL;
try {
Thread.sleep(250);
Thread.sleep(TIMEOUT_INTERVAL);
} catch (InterruptedException exception) {
// ignored
}
Expand Down Expand Up @@ -98,7 +97,8 @@ public void connectToRedis(ConnectionSettings connectionSettings) {
.setDatabaseId(0)
.build();

makeConnection(request);
makeRedisRequest(request.toByteArray());
receiveRedisResponse();
}

@Override
Expand All @@ -119,8 +119,8 @@ public void set(String key, String value) {
.setSimpleRoutes(RedisRequestOuterClass.SimpleRoutes.AllNodes))
.build();

ResponseOuterClass.Response response = makeRedisRequest(request);
// nothing to do with the response
makeRedisRequest(request.toByteArray());
receiveRedisResponse();
}

@Override
Expand All @@ -139,7 +139,8 @@ public String get(String key) {
.setSimpleRoutes(RedisRequestOuterClass.SimpleRoutes.AllNodes))
.build();

ResponseOuterClass.Response response = makeRedisRequest(getStringRequest);
makeRedisRequest(getStringRequest.toByteArray());
ResponseOuterClass.Response response = receiveRedisResponse();
return response.toString();
}

Expand Down Expand Up @@ -195,58 +196,76 @@ private static Byte[] varintBytes(int value) {
return output.toArray(arr);
}

private static byte[] readSocketMessage(SocketChannel channel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = channel.read(buffer);
if (bytesRead <= 0) {
return null;
}

byte[] bytes = new byte[bytesRead];
buffer.flip();
buffer.get(bytes);
return bytes;
}

private ResponseOuterClass.Response makeConnection(
connection_request.ConnectionRequestOuterClass.ConnectionRequest request) {
Byte[] varint = varintBytes(request.toByteArray().length);
private void makeRedisRequest(
byte[] request) {
Byte[] varint = varintBytes(request.length);

// System.out.println("Request: \n" + request.toString());
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
for (Byte b : varint) {
buffer.put(b);
}
buffer.put(request.toByteArray());
buffer.put(request);
buffer.flip();
try {
synchronized (buffer) {
// TODO: check that this is the most performant mutex solution
synchronized (channel) {
while (buffer.hasRemaining()) {
channel.write(buffer);
}
}
} catch (IOException ioException) {
// ignore...
}
}

private ResponseOuterClass.Response receiveRedisResponse() {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);

ResponseOuterClass.Response response = null;
int timeout = 0;
int bytesRead = 0;
try {
byte[] responseBuffer = readSocketMessage(channel);
while (responseBuffer == null && timeout < MAX_TIMEOUT) {
Thread.sleep(250);
timeout++;
responseBuffer = readSocketMessage(channel);
synchronized (channel) {
bytesRead = channel.read(readBuffer);
while (bytesRead <= 0) {
timeout += TIMEOUT_INTERVAL;
if (timeout > MAX_TIMEOUT) {
throw new RuntimeException("Max timeout reached");
}

bytesRead = channel.read(readBuffer);
Thread.sleep(TIMEOUT_INTERVAL);
}
}

response = decodeMessage(responseBuffer);
} catch (IOException | InterruptedException exception) {
// ignore...
}
byte[] bytes = new byte[bytesRead];
readBuffer.flip();
readBuffer.get(bytes);
ResponseOuterClass.Response response = null;
try {
response = decodeMessage(bytes);
} catch (Exception e) {
e.printStackTrace();
}
return response;
}

private static byte[] readSocketMessage(SocketChannel channel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = channel.read(buffer);
if (bytesRead <= 0) {
return null;
}

byte[] bytes = new byte[bytesRead];
buffer.flip();
buffer.get(bytes);
return bytes;
}

private ResponseOuterClass.Response makeRedisRequest(
RedisRequestOuterClass.RedisRequest request) {
Byte[] varint = varintBytes(request.toByteArray().length);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package javababushka.benchmarks.utils;

import static java.util.concurrent.CompletableFuture.runAsync;

import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -11,6 +13,8 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -101,20 +105,13 @@ public static Map<ChosenAction, LatencyResults> calculateResults(
ArrayList<Long> latencies = entry.getValue();

if (latencies.size() == 0) {
results.put(
action,
new LatencyResults(
0,
0,
0,
0,
0,
0
));
results.put(action, new LatencyResults(0, 0, 0, 0, 0, 0));
} else {
Double avgLatency = latencies.size() <= 0 ? 0 :
latencies.stream().collect(Collectors.summingLong(Long::longValue))
/ Double.valueOf(latencies.size());
Double avgLatency =
latencies.size() <= 0
? 0
: latencies.stream().collect(Collectors.summingLong(Long::longValue))
/ Double.valueOf(latencies.size());

Collections.sort(latencies);
results.put(
Expand All @@ -125,8 +122,7 @@ public static Map<ChosenAction, LatencyResults> calculateResults(
percentile(latencies, 90),
percentile(latencies, 99),
stdDeviation(latencies, avgLatency),
latencies.size()
));
latencies.size()));
}
}

Expand Down Expand Up @@ -179,8 +175,7 @@ public static void printResults(Map<ChosenAction, LatencyResults> resultsMap) {
action + " p99 latency in ms: " + results.p99Latency / LATENCY_NORMALIZATION);
System.out.println(
action + " std dev in ms: " + results.stdDeviation / LATENCY_NORMALIZATION);
System.out.println(
action + " total hits: " + results.totalHits);
System.out.println(action + " total hits: " + results.totalHits);
}
}

Expand Down Expand Up @@ -225,16 +220,28 @@ public static void testClientSetGet(
while (iterationIncrement < iterations) {
if (config.debugLogging) {
System.out.printf(
"> iteration = %d/%d, client# = %d/%d%n",
iterationIncrement + 1, iterations, clientIndex + 1, clientCount);
"> task = %d, iteration = %d/%d, client# = %d/%d%n",
taskNumDebugging,
iterationIncrement + 1,
iterations,
clientIndex + 1,
clientCount);
}
// operate and calculate tik-tok
Pair<ChosenAction, Long> result =
measurePerformance(clients.get(clientIndex), dataSize, async);
if (config.debugLogging) {
System.out.printf(
"> task = %d, iteration = %d/%d, client# = %d/%d - DONE%n",
taskNumDebugging,
iterationIncrement + 1,
iterations,
clientIndex + 1,
clientCount);
}
if (result != null) {
actionResults.get(result.getLeft()).add(result.getRight());
}

iterationIncrement = iterationCounter.getAndIncrement();
}
});
Expand All @@ -246,16 +253,21 @@ public static void testClientSetGet(
concurrentNum, clientCount, tasks.size());
}
long before = System.nanoTime();
ExecutorService threadPool = Executors.newFixedThreadPool(concurrentNum);

// create threads and add them to the asyncpool.
// create threads and add them to the async pool.
// This will start execution of all the concurrent tasks.
List<CompletableFuture> asyncTasks =
tasks.stream().map(CompletableFuture::runAsync).collect(Collectors.toList());
try {
// wait 1 second before waiting for threads to complete
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
tasks.stream().map((runnable) -> runAsync(runnable, threadPool)).collect(Collectors.toList());
// close pool and await for tasks to complete
threadPool.shutdown();
while (!threadPool.isTerminated()) {
try {
// wait 1 second before waiting for threads to complete
Thread.sleep(100);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
// wait for all futures to complete
asyncTasks.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ public class LatencyResults {
public final int totalHits;

public LatencyResults(
double avgLatency, long p50Latency, long p90Latency, long p99Latency, double stdDeviation, int totalHits) {
double avgLatency,
long p50Latency,
long p90Latency,
long p99Latency,
double stdDeviation,
int totalHits) {
this.avgLatency = avgLatency;
this.p50Latency = p50Latency;
this.p90Latency = p90Latency;
Expand Down

0 comments on commit 79459f5

Please sign in to comment.