Skip to content

Commit

Permalink
Clean up sync calls; add close connection
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Dec 2, 2023
1 parent 537d538 commit 2f81317
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package babushka.benchmarks.clients.babushka;

import babushka.api.Awaiter;
import babushka.api.Client;
import babushka.api.Commands;
import babushka.api.Connection;
Expand Down Expand Up @@ -38,7 +39,7 @@ public void connectToRedis(ConnectionSettings connectionSettings) {

@Override
public Future<String> asyncConnectToRedis(ConnectionSettings connectionSettings) {
return connection.asyncConnectToRedis(
return connection.connectToRedis(
connectionSettings.host,
connectionSettings.port,
connectionSettings.useSsl,
Expand All @@ -47,21 +48,21 @@ public Future<String> asyncConnectToRedis(ConnectionSettings connectionSettings)

@Override
public Future<String> asyncSet(String key, String value) {
return asyncCommands.asyncSet(key, value);
return asyncCommands.set(key, value);
}

@Override
public Future<String> asyncGet(String key) {
return asyncCommands.asyncGet(key);
return asyncCommands.get(key);
}

@Override
public void set(String key, String value) {
asyncCommands.set(key, value);
Awaiter.await(asyncCommands.set(key, value));
}

@Override
public String get(String key) {
return asyncCommands.get(key);
return Awaiter.await(asyncCommands.get(key));
}
}
16 changes: 11 additions & 5 deletions java/client/src/main/java/babushka/api/Awaiter.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
package babushka.api;

import java.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Awaiter {
private static final long DEFAULT_TIMEOUT_MILLISECONDS = 1000;

/** Get the future result with default timeout. */
public static <T> T await(Future<T> future) {
public static <T> T await(CompletableFuture<T> future) {
return await(future, DEFAULT_TIMEOUT_MILLISECONDS);
}

/** Get the future result with given timeout in ms. */
public static <T> T await(Future<T> future, long timeout) {
public static <T> T await(CompletableFuture<T> future, long timeout) {
try {
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (Exception ignored) {
return null;
} catch (ExecutionException | InterruptedException | TimeoutException e) {
// TODO: handle exceptions:
// InterruptedException: should shutdown the client service
// TimeoutException: should be propagated with an error message thrown
// ExecutionException: throw runtime exception
throw new RuntimeException(e);
}
}
}
29 changes: 3 additions & 26 deletions java/client/src/main/java/babushka/api/Commands.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package babushka.api;

import babushka.managers.CommandManager;
import java.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;

public class Commands {

Expand All @@ -11,37 +11,14 @@ public Commands(CommandManager commandManager) {
this.commandManager = commandManager;
}

/**
* Sync (blocking) set. See async option in {@link #asyncSet}.<br>
* See <a href="https://redis.io/commands/set/">REDIS docs for SET</a>.
*
* @param key The key name
* @param value The value to set
*/
public void set(String key, String value) {
Awaiter.await(asyncSet(key, value));
// TODO parse response and rethrow an exception if there is an error
}

/**
* Sync (blocking) get. See async option in {@link #asyncGet}.<br>
* See <a href="https://redis.io/commands/get/">REDIS docs for GET</a>.
*
* @param key The key name
*/
public String get(String key) {
return Awaiter.await(asyncGet(key));
// TODO support non-strings
}

/**
* Async (non-blocking) set. See sync option in {@link #set}.<br>
* See <a href="https://redis.io/commands/set/">REDIS docs for SET</a>.
*
* @param key The key name
* @param value The value to set
*/
public Future<String> asyncSet(String key, String value) {
public CompletableFuture<String> set(String key, String value) {
return commandManager.set(key, value);
}

Expand All @@ -51,7 +28,7 @@ public Future<String> asyncSet(String key, String value) {
*
* @param key The key name
*/
public Future<String> asyncGet(String key) {
public CompletableFuture<String> get(String key) {
return commandManager.get(key);
}
}
23 changes: 1 addition & 22 deletions java/client/src/main/java/babushka/api/Connection.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package babushka.api;

import static babushka.api.Awaiter.await;

import babushka.managers.ConnectionManager;
import java.util.concurrent.CompletableFuture;

Expand All @@ -17,20 +15,6 @@ public ConnectionManager getConnectionManager() {
return connectionManager;
}

/**
* Sync (blocking) connect to REDIS. See async option in {@link #asyncConnectToRedis}.
*
* @param host Server address
* @param port Server port
* @param useSsl true if communication with the server or cluster should use Transport Level
* Security
* @param clusterMode true if REDIS instance runs in the cluster mode
*/
// TODO support configuration object which holds more parameters (e.g. multiple addresses, etc)
public void connectToRedis(String host, int port, boolean useSsl, boolean clusterMode) {
await(asyncConnectToRedis(host, port, useSsl, clusterMode));
}

/**
* Async (non-blocking) connect to REDIS. See sync option in {@link #connectToRedis}.
*
Expand All @@ -41,13 +25,8 @@ public void connectToRedis(String host, int port, boolean useSsl, boolean cluste
* @param clusterMode true if REDIS instance runs in the cluster mode
*/
// TODO support configuration object which holds more parameters (e.g. multiple addresses, etc)
public CompletableFuture<String> asyncConnectToRedis(
public CompletableFuture<String> connectToRedis(
String host, int port, boolean useSsl, boolean clusterMode) {
return connectionManager.connectToRedis(host, port, useSsl, clusterMode);
}

/** Close all connections and release resources */
public void closeConnection() {
connectionManager.closeConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public void close() {
group.shutdownGracefully();
INSTANCE = null;
// TODO should we reply in uncompleted futures?
CallbackManager.shutdownGracefully();
CallbackManager.connectionRequests.clear();
CallbackManager.responses.clear();
}
Expand Down
13 changes: 13 additions & 0 deletions java/client/src/main/java/babushka/managers/CallbackManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
/** Holder for resources owned by {@link CommandManager} and used by {@link ReadHandler}. */
public class CallbackManager {

// TODO: let's make these non-static class variables

/**
* Storage of Futures to handle responses. Map key is callback id, which starts from 1.<br>
* Each future is a promise for every submitted by user request.
Expand Down Expand Up @@ -56,6 +58,17 @@ public void registerConnection(CompletableFuture<Response> future) {
CallbackManager.connectionRequests.add(future);
}

public static void shutdownGracefully() {
connectionRequests.forEach(
future -> {
future.completeExceptionally(new InterruptedException());
});
responses.forEach(
(callbackId, future) -> {
future.completeExceptionally(new InterruptedException());
});
}

/**
* Create a unique callback ID (request ID) and a corresponding registered future for the
* response.<br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public CompletableFuture<String> connectToRedis(
* Close socket connection and drop all channels TODO: provide feedback that the connection was
* properly closed
*/
public void closeConnection() {
public CompletableFuture<String> closeConnection() {
socketConnection.close();
return new CompletableFuture<String>();
}
}
51 changes: 5 additions & 46 deletions java/client/src/test/java/babushka/api/CommandsTest.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package babushka.api;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -11,7 +9,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -30,7 +27,7 @@ public void setUp() {
}

@Test
public void test_asyncGet_success() throws ExecutionException, InterruptedException {
public void get_success() throws ExecutionException, InterruptedException {
// setup
// TODO: randomize keys
String key = "testKey";
Expand All @@ -40,7 +37,7 @@ public void test_asyncGet_success() throws ExecutionException, InterruptedExcept
when(commandsManager.get(eq(key))).thenReturn(testResponse);

// exercise
Future<String> response = service.asyncGet(key);
Future<String> response = service.get(key);
String payload = response.get();

// verify
Expand All @@ -50,29 +47,10 @@ public void test_asyncGet_success() throws ExecutionException, InterruptedExcept
// teardown
}

// TODO: test_asyncGet_InterruptedException and ExecutionException
// TODO: test_get_InterruptedException and ExecutionException

@Test
public void test_get_success() throws ExecutionException, InterruptedException, TimeoutException {
// setup
// TODO: randomize keys
String key = "testKey";
String value = "testValue";
CompletableFuture<String> testResponse = mock(CompletableFuture.class);
when(testResponse.get(anyLong(), any())).thenReturn(value);
when(commandsManager.get(eq(key))).thenReturn(testResponse);

// exercise
String payload = service.get(key);

// verify
assertEquals(value, payload);

// teardown
}

@Test
public void test_asyncSet_success() throws ExecutionException, InterruptedException {
public void set_success() throws ExecutionException, InterruptedException {
// setup
// TODO: randomize key and value
String key = "testKey";
Expand All @@ -82,7 +60,7 @@ public void test_asyncSet_success() throws ExecutionException, InterruptedExcept
when(commandsManager.set(eq(key), eq(value))).thenReturn(testResponse);

// exercise
Future<String> response = service.asyncSet(key, value);
Future<String> response = service.set(key, value);
String payload = response.get();

// verify
Expand All @@ -92,25 +70,6 @@ public void test_asyncSet_success() throws ExecutionException, InterruptedExcept
// teardown
}

@Test
public void test_set_success() throws ExecutionException, InterruptedException, TimeoutException {
// setup
// TODO: randomize key/value
String key = "testKey";
String value = "testValue";
CompletableFuture<String> testResponse = mock(CompletableFuture.class);
when(testResponse.get(anyLong(), any())).thenReturn(value);
when(commandsManager.set(eq(key), eq(value))).thenReturn(testResponse);

// exercise
service.set(key, value);

// verify
// nothing to do

// teardown
}

@Test
public void test_ping_success() {
// setup
Expand Down
4 changes: 2 additions & 2 deletions java/client/src/test/java/babushka/api/ConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void setUp() {
}

@Test
public void test_asyncConnectToRedis_success() {
public void asyncConnectToRedis_success() {
// setup
boolean useSsl = false;
boolean clusterMode = false;
Expand All @@ -41,7 +41,7 @@ public void test_asyncConnectToRedis_success() {

// exercise
CompletableFuture<String> connectionResponse =
service.asyncConnectToRedis(HOST, PORT, useSsl, clusterMode);
service.connectToRedis(HOST, PORT, useSsl, clusterMode);

// verify
Mockito.verify(connectionManager, times(1))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package babushka.connectors;

public class SocketConnectionTest {}

0 comments on commit 2f81317

Please sign in to comment.