-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor to Client-Manager-Connection layers #42
Refactor to Client-Manager-Connection layers #42
Conversation
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
This reverts commit d9b26a6.
Signed-off-by: acarbonetto <[email protected]>
* Add a java app to run benchmarks --------- Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
* Merge Pull Request #5 - Add java pipeline. Also changed: * Merged two projects. * Updated CI. * Fixed tests and updated `junit` version. * Spotless. * Add new gradle tasks. Signed-off-by: Yury-Fridlyand <[email protected]>
* Add sync and async clients both to tests. Signed-off-by: Yury-Fridlyand <[email protected]> * Minor fixes. Signed-off-by: Yury-Fridlyand <[email protected]> --------- Signed-off-by: Yury-Fridlyand <[email protected]>
…onvert to TypeScript. (valkey-io#456) removed duplicated logic and refactored to typescript Signed-off-by: acarbonetto <[email protected]>
* Add Jedis and Lettuce benchmarks * Start ignoring .gradle files * Update gitignore and remove generated files from git Signed-off-by: acarbonetto <[email protected]> * Update gitignore and remove generated files from git Signed-off-by: acarbonetto <[email protected]> * Update gitignore and remove generated files from git Signed-off-by: acarbonetto <[email protected]> * Add benchmarks for GET non-existing * Revert "Update gitignore and remove generated files from git" This reverts commit d9b26a6. * fix redis-rs submodules Signed-off-by: acarbonetto <[email protected]> * Randomize commands in Java benchmarks * rename chooseAction to randomAction * Add a Java benchmarking app (#7) * Add a java app to run benchmarks --------- Signed-off-by: acarbonetto <[email protected]> * Add Readme and update install_and_test script to runJava Signed-off-by: acarbonetto <[email protected]> * Add Readme and update install_and_test script to runJava Signed-off-by: acarbonetto <[email protected]> * Combine java pipeline and java benchmarks (#8) * Merge Pull Request #5 - Add java pipeline. Also changed: * Merged two projects. * Updated CI. * Fixed tests and updated `junit` version. * Spotless. * Add new gradle tasks. Signed-off-by: Yury-Fridlyand <[email protected]> * Add sync and async clients both to tests. (#12) * Add sync and async clients both to tests. Signed-off-by: Yury-Fridlyand <[email protected]> * Minor fixes. Signed-off-by: Yury-Fridlyand <[email protected]> --------- Signed-off-by: Yury-Fridlyand <[email protected]> * Add dataSize option to java benchmark. Signed-off-by: Yury-Fridlyand <[email protected]> --------- Signed-off-by: acarbonetto <[email protected]> Signed-off-by: Yury-Fridlyand <[email protected]> Co-authored-by: Jonathan Louie <[email protected]> Co-authored-by: acarbonetto <[email protected]> Co-authored-by: jonathanl-bq <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
* Add option to run tests on multiple clients in concurrency * Common pool of iterations. * Awaiting result from async methods. Signed-off-by: Yury-Fridlyand <[email protected]> * minor fix Signed-off-by: Yury-Fridlyand <[email protected]> * Change while-loop; Spotless Apply Signed-off-by: acarbonetto <[email protected]> --------- Signed-off-by: Yury-Fridlyand <[email protected]> Signed-off-by: acarbonetto <[email protected]> Co-authored-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: acarbonetto <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Andrew Carbonetto <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Andrew Carbonetto <[email protected]>
|
||
private final ConnectionManager connectionManager; | ||
private final CommandManager commandManager; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private final Object configuration;
which contains:
- flushing policy
- logging config
- profiling config
- multithreading config
- etc
- kitten pictures
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
definitely.
Most definitely with 🐱 pictures
} | ||
|
||
private static RedisRequest redisSingleCommand(RequestType command, List<String> args) { | ||
var commandArgs = RedisRequestOuterClass.Command.ArgsArray.newBuilder(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimize imports to shorten the identifiers (and below too)
public static String resolveRedisResponseToString(Response response) { | ||
if (response.hasConstantResponse()) { | ||
return BabushkaCoreNativeDefinitions.valueFromPointer(response.getRespPointer()).toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static String resolveRedisResponseToString(Response response) { | |
if (response.hasConstantResponse()) { | |
return BabushkaCoreNativeDefinitions.valueFromPointer(response.getRespPointer()).toString(); | |
public String resolveRedisResponseToString(Response response) { | |
if (response.hasConstantResponse()) { | |
return nativeDefinitions.valueFromPointer(response.getRespPointer()).toString(); |
return CompletableFuture.supplyAsync( | ||
() -> { | ||
socketConnection.writeAndFlush(request); | ||
return future; | ||
}) | ||
.thenCompose(f -> f) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
return CompletableFuture.supplyAsync( | |
() -> { | |
socketConnection.writeAndFlush(request); | |
return future; | |
}) | |
.thenCompose(f -> f) | |
return CompletableFuture.runAsync(() -> socketConnection.writeAndFlush(request)) | |
.thenApply(v -> future) |
* Close socket connection and drop all channels | ||
* TODO: provide feedback that the connection was properly closed | ||
*/ | ||
public void closeConnection() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return string or bool?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure...
.addAddresses( | ||
ConnectionRequestOuterClass.NodeAddress.newBuilder().setHost(host).setPort(port).build()) | ||
.setTlsMode(useSsl ? ConnectionRequestOuterClass.TlsMode.SecureTls : ConnectionRequestOuterClass.TlsMode.NoTls) | ||
.setClusterModeEnabled(clusterMode) | ||
.setReadFrom(ConnectionRequestOuterClass.ReadFrom.Primary) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optimize imports
Signed-off-by: Andrew Carbonetto <[email protected]>
@@ -210,7 +210,7 @@ async fn write_result( | |||
error_message.into(), | |||
)) | |||
} else { | |||
log_warn("received error", error_message.as_str()); | |||
log_warn("received error", format!("{} for callback {}", error_message.as_str(), callback_index)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should revert this
|
||
1. A Java client (lib folder): wrapper to rust-client. | ||
2. An examples script: to sanity test javababushka and similar java-clients against a redis host. | ||
3. A benchmark app: to performance benchmark test javababushka and similar java-clients against a redis host. | ||
2. An examples script: to sanity test babushka and similar java-clients against a redis host. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Babushka"... but honestly, we have to change this anyways.
|
||
private final ConnectionManager connectionManager; | ||
private final CommandManager commandManager; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
definitely.
Most definitely with 🐱 pictures
} | ||
|
||
tasks.register('protobuf', Exec) { | ||
doFirst { | ||
project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString()) | ||
project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/babushka/models/protobuf').toString()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: place hard wired strings in a config file or declare it as a constant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Part 1 of my review
try { | ||
return future.get(timeout, TimeUnit.MILLISECONDS); | ||
} catch (Exception ignored) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to ignore the exception here? Why? Also, should we be returning null instead of Optional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a TODO to deal with this properly.
* @param key The key name | ||
* @param value The value to set | ||
*/ | ||
public Future<String> asyncSet(String key, String value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably don't want to return Future
over CompletableFuture
. Otherwise, you can't compose dependent async computations on the client side in a non-blocking manner, which you would be able to do with thenCompose
in CompletableFuture.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good points.
We need to update this in our API, but good to be consistent.
* | ||
* @param key The key name | ||
*/ | ||
public Future<String> asyncGet(String key) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. We probably should not return a regular Future
.
*/ | ||
public String get(String key) { | ||
return Awaiter.await(asyncGet(key)); | ||
// TODO support non-strings |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://redis.io/commands/get/ says it only supports Strings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This had to do with the response.
We want to support response objects, not just strings (in case something went wrong)
Signed-off-by: Andrew Carbonetto <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Part 2 of my review
SocketConnection.socketPath = socketPath; | ||
return; | ||
} | ||
throw new RuntimeException("socket path can only be declared once"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it common practice to throw a generic RuntimeException
like this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't happen... so we throw Runtime for unexpected activity
*/ | ||
@Override | ||
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) | ||
throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to be throwing generic Exception
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ChannelInboundHandlerAdapter throws a general Exception
package babushka.ffi.resolvers; | ||
|
||
public class BabushkaCoreNativeDefinitions { | ||
public static native String startSocketListenerExternal() throws Exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, should we be throwing generic Exception
here?
*/ | ||
public int registerRequest(CompletableFuture<Response> future) { | ||
int callbackId = requestId.incrementAndGet(); | ||
CallbackManager.responses.put(callbackId, future); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the responses map just pile up Future
s? I'm worried these may just pile up until we OOM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that could happen if we aren't getting any responses from Babushka and keep on sending requests.
@@ -0,0 +1,6868 @@ | |||
// Generated by the protocol buffer compiler. DO NOT EDIT! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, generated file probably should be removed.
@@ -0,0 +1,2162 @@ | |||
// Generated by the protocol buffer compiler. DO NOT EDIT! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, generated file probably should be removed.
* @param response Redis Response | ||
* @return String or null | ||
*/ | ||
public static String resolveRedisResponseToString(ResponseOuterClass.Response response) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't want to return an Optional<String>
? Also, should the input be non-nullable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't want to return a String.
We want to support our own Response objects with support for success responses and errors.
.build(); | ||
} | ||
|
||
public static RedisRequestOuterClass.RedisRequest redisSingleCommand( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we make sure the inputs are non-nullable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Garbage in. Garbage out?
|
||
/** Build a protobuf connection request object */ | ||
// TODO support more parameters and/or configuration object | ||
public static ConnectionRequestOuterClass.ConnectionRequest getConnectionRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be allowing nullable host strings?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe...? why would a user do that?
Signed-off-by: Andrew Carbonetto <[email protected]>
CallbackManager.connectionRequests.pop().complete(response); | ||
} else { | ||
CallbackManager.responses.get(callbackId).complete(response); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CallbackManager.connectionRequests.pop().complete(response); | |
} else { | |
CallbackManager.responses.get(callbackId).complete(response); | |
CallbackManager.connectionRequests.pop().completeAsync(() -> response); | |
} else { | |
CallbackManager.responses.get(callbackId).completeAsync(() -> response); |
socketConnection.writeAndFlush(redisSingleCommand(command, args)); | ||
return future; | ||
}) | ||
.thenCompose(f -> f) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.thenCompose(f -> f) | |
.thenComposeAsync(f -> f) |
return future; | ||
}) | ||
.thenCompose(f -> f) | ||
.thenApply(RequestBuilder::resolveRedisResponseToString); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.thenApply(RequestBuilder::resolveRedisResponseToString); | |
.thenApplyAsync(RequestBuilder::resolveRedisResponseToString); |
java/client/src/main/java/babushka/managers/ConnectionManager.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Andrew Carbonetto <[email protected]>
1. Create a new channel per client. `Channel` is wrapped by `ChannedHandler` class. 2. Create `CallbackManager` per client. Signed-off-by: Yury-Fridlyand <[email protected]>
|
||
private static String socketPath; | ||
|
||
public static void setSocketPath(String socketPath) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A security issue - a user may call this function before we do. Options there:
- don't export this function (make private or make class private)
- make
socketPath
be set statically automatically
Function should be syncronized though.
* @param response Redis Response | ||
* @return String or null | ||
*/ | ||
public static String resolveRedisResponseToString(Response response) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this func to another file
Refactor of main Java client into layers of responsibility: