-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add java JNI/Netty client - rust part. #631
Add java JNI/Netty client - rust part. #631
Conversation
Signed-off-by: Yury-Fridlyand <[email protected]>
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE); | ||
} | ||
|
||
private void createChannel() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: upcoming PR will split this file. The channel
would be created once per multiple clients (singleton)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: the channel
would be created per client, but other shared resources (thread pool) would be in a singletone class
Hey @Yury-Fridlyand, the tests are failing - so let me know when it's ready for review |
This PR depends on the previous (#629, #630), but it is ready for review. Client won't change when I resolve merge conflicts. |
java/client/build.gradle
Outdated
|
||
implementation group: 'io.netty', name: 'netty-handler', version: '4.1.100.Final' | ||
// https://github.com/netty/netty/wiki/Native-transports | ||
// Windows is not supported, because babushka does not support windows, because tokio does not support windows, because ... 42 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change to
// At the moment, Windows is not supported
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
java/client/build.gradle
Outdated
} | ||
|
||
tasks.register('protobuf', Exec) { | ||
doFirst { | ||
project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/org/babushka/javababushka/generated').toString()) | ||
project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please refactor javababushka to babushka. It will make the refactor easier when we'll have the new name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Going to change in another PR (benchmarking)
java/client/build.gradle
Outdated
} | ||
commandLine 'protoc', | ||
'-Iprotobuf=babushka-core/src/protobuf/', | ||
'--java_out=java/client/src/main/java/org/babushka/javababushka/generated', | ||
'--java_out=java/client/src/main/java/javababushka/generated', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make it unified with python - lets rename the 'generated' folder to 'protobuf'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
public class Client implements AutoCloseable { | ||
|
||
private static final int REQUEST_TIMEOUT_MILLISECONDS = 250; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the default timeout set in the core. We don't want to expose defaults in the wrapper so we'll be able to maintain it only in a single place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
} catch (InterruptedException | ExecutionException e) { | ||
throw new RuntimeException(e); | ||
} catch (TimeoutException e) { | ||
// System.out.println("-- auto flush - timeout"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
return CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return future.get(AUTO_FLUSH_RESPONSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove timeouts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is java client internal stuff. We have a task to research and design a flushing policy.
This functions requests a flush if request wasn't compileted within the given timeout. The timeout could be configurable and this feature could be switched off completely.
All flushing related code will be reworked as well.
} | ||
|
||
public Future<Response> asyncSet(String key, String value) { | ||
// System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
} | ||
|
||
public Future<String> asyncGet(String key) { | ||
// System.out.printf("== get(%s), callback %d%n", key, callbackId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
java/src/lib.rs
Outdated
Value::Bulk(_bulk) => { | ||
let _ = env.throw("Not implemented"); | ||
JObject::null() | ||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is it commented out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some code from python client left here as an example. Note, this function isn't completed and will be reworked later
Signed-off-by: Yury-Fridlyand <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, re-review @barshaul
java/client/build.gradle
Outdated
} | ||
commandLine 'protoc', | ||
'-Iprotobuf=babushka-core/src/protobuf/', | ||
'--java_out=java/client/src/main/java/org/babushka/javababushka/generated', | ||
'--java_out=java/client/src/main/java/javababushka/generated', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
java/client/build.gradle
Outdated
} | ||
|
||
tasks.register('protobuf', Exec) { | ||
doFirst { | ||
project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/org/babushka/javababushka/generated').toString()) | ||
project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Going to change in another PR (benchmarking)
java/client/build.gradle
Outdated
|
||
implementation group: 'io.netty', name: 'netty-handler', version: '4.1.100.Final' | ||
// https://github.com/netty/netty/wiki/Native-transports | ||
// Windows is not supported, because babushka does not support windows, because tokio does not support windows, because ... 42 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
|
||
public class Client implements AutoCloseable { | ||
|
||
private static final int REQUEST_TIMEOUT_MILLISECONDS = 250; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
private static final int REQUEST_TIMEOUT_MILLISECONDS = 250; | ||
private static final int HIGH_WRITE_WATERMARK = 4096; | ||
private static final int LOW_WRITE_WATERMARK = 1024; | ||
private static final long DEFAULT_TIMEOUT_MILLISECONDS = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is async task timeout. A sync task wraps async one with this timeout. Let keep it here - it will be removed or significantly reworked in the upcoming PR.
} | ||
|
||
public Future<Response> asyncSet(String key, String value) { | ||
// System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
} | ||
|
||
public Future<String> asyncGet(String key) { | ||
// System.out.printf("== get(%s), callback %d%n", key, callbackId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
java/src/lib.rs
Outdated
Value::Bulk(_bulk) => { | ||
let _ = env.throw("Not implemented"); | ||
JObject::null() | ||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some code from python client left here as an example. Note, this function isn't completed and will be reworked later
java/src/lib.rs
Outdated
) -> JObject<'local> { | ||
let (tx, rx) = mpsc::channel::<Result<String, String>>(); | ||
|
||
//logger_core::init(Some(Level::Trace), None); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cleaned
? TlsMode.SecureTls | ||
: TlsMode.NoTls) | ||
.setClusterModeEnabled(clusterMode) | ||
.setRequestTimeout(REQUEST_TIMEOUT_MILLISECONDS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can I remove ReadFrom
too?
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
|
||
public class Client implements AutoCloseable { | ||
|
||
private static final int HIGH_WRITE_WATERMARK = 4096; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you define constants in Java inside functions? if so, it's better if the constant is located near its usage, so it's clearer what it represents.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is significantly refactored in the next PR and this parameter is removed.
Please, ignore it for now.
|
||
// https://netty.io/3.6/api/org/jboss/netty/handler/queue/BufferedWriteHandler.html | ||
// Flush every N bytes if !ALWAYS_FLUSH_ON_WRITE | ||
public static int AUTO_FLUSH_THRESHOLD_BYTES = 512; // 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does the doc (1024) mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did some experiments for finding the best parameter value for flushing. 1024 is actually a spare value for this parameter.
This file is significantly refactored in the next PR and all these parameter is removed.
Please, ignore it for now.
|
||
// If !ALWAYS_FLUSH_ON_WRITE and a command has no response in N millis, flush (probably it wasn't | ||
// send) | ||
public static int AUTO_FLUSH_RESPONSE_TIMEOUT_MILLIS = 100; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is ALWAYS_FLUSH_ON_WRITE
supposed to be configurable? if not, why have all of the logic handling it being in another state? In general, why have it and not just write the code without the argument?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it will be configurable. There will be a separate PR which brings flushing policy and configuration for it.
|
||
// At the moment, Windows is not supported. | ||
// Probably we should use NIO (NioEventLoopGroup) for Windows. | ||
private static final boolean isMacOs = isMacOs(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not use an enum, if you know that more than 2 OS will be supported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will change this in future, once rust core lib will [be about to] support Windows.
private static boolean isMacOs() { | ||
try { | ||
Class.forName("io.netty.channel.kqueue.KQueue"); | ||
return KQueue.isAvailable(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Surely there's a better way in Java to get the OS type. what about the os.name
system property?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are planning to make this part more flexible. kqueue may be unavailable on mac or epoll - on linux. I can't imagine how, but ... why not?
I'll add support for NIO and IO-Uring before release. AFAIK, NIO has purely java implementation without any native dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then the name of the function is incorrect - it should be something like isKQueueAvailable
.
.addLast( | ||
new ChannelInboundHandlerAdapter() { | ||
@Override | ||
public void channelRead(ChannelHandlerContext ctx, Object msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we split this into methods, and have less indented code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Next PR does it. This file is fully changed.
We can drop java files from this PR if you like.
|
||
public String get(String key) { | ||
return waitForResult(asyncGet(key)); | ||
// TODO support non-strings |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why? get
always returns strings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A response from rust core lib may return a pointer which may refer other things too:
https://github.com/redis-rs/redis-rs/blob/168b9c55c2a880d2fb23601e41261c94d308c0d8/redis/src/types.rs#L138-L157
Should we cast everything to a string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you get a response to a get
command which isn't a string or an error, something has went wrong and the client should be closed.
Same for every other command with a known return type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to know, thanks! Removed this TODO
|
||
} catch (Exception e) { | ||
System.err.printf( | ||
"Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: I'm not sure what's the indentation schema you're using. Sometimes the indentation is 2 spaces, sometimes 4, sometimes... something else?
Please align with the indentation scheme of the rest of the project - 4 spaces, no tabs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is GJF - google java format which is not configurable; it is enforced by spotless
. See java/build.gradle
file for reference:
https://github.com/Bit-Quill/babushka/blob/1d9460aa2e8fe810caf1cf0df68a83043d2e03cc/java/build.gradle#L97
Indentation scheme maybe confusing, but java code usually has more indentation than other languages, so most indentation are shortened.
class Example {
// here 2 spaces
public int function() {
// here 2 spaces too
return 42;
}
public void anotherFuncWithLongName(
// 4 spaces here
FirstArgType arg1, ...,
NthArgType argN) {
var result = longChangedFunctionCall(
// 4 spaces
anotherLongFunctionCall(
// 4 more spaces
... ));
}
}
If you dislike GJF and/or spotless or have preferable option - please let us know.
|
||
public class Client implements AutoCloseable { | ||
|
||
private static final int HIGH_WRITE_WATERMARK = 4096; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is significantly refactored in the next PR and this parameter is removed.
Please, ignore it for now.
|
||
// https://netty.io/3.6/api/org/jboss/netty/handler/queue/BufferedWriteHandler.html | ||
// Flush every N bytes if !ALWAYS_FLUSH_ON_WRITE | ||
public static int AUTO_FLUSH_THRESHOLD_BYTES = 512; // 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did some experiments for finding the best parameter value for flushing. 1024 is actually a spare value for this parameter.
This file is significantly refactored in the next PR and all these parameter is removed.
Please, ignore it for now.
// If !ALWAYS_FLUSH_ON_WRITE flush on timer (like a cron) | ||
public static int AUTO_FLUSH_TIMER_MILLIS = 200; | ||
|
||
public static int PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is significantly refactored in the next PR and this parameter is removed.
Please, ignore it for now.
private static boolean isMacOs() { | ||
try { | ||
Class.forName("io.netty.channel.kqueue.KQueue"); | ||
return KQueue.isAvailable(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are planning to make this part more flexible. kqueue may be unavailable on mac or epoll - on linux. I can't imagine how, but ... why not?
I'll add support for NIO and IO-Uring before release. AFAIK, NIO has purely java implementation without any native dependency.
static { | ||
// TODO fix: netty still doesn't use slf4j nor log4j | ||
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This removed in the next PR. Another PR will bring logging feature properly implemented.
.addLast( | ||
new ChannelInboundHandlerAdapter() { | ||
@Override | ||
public void channelRead(ChannelHandlerContext ctx, Object msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Next PR does it. This file is fully changed.
We can drop java files from this PR if you like.
|
||
} catch (Exception e) { | ||
System.err.printf( | ||
"Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is GJF - google java format which is not configurable; it is enforced by spotless
. See java/build.gradle
file for reference:
https://github.com/Bit-Quill/babushka/blob/1d9460aa2e8fe810caf1cf0df68a83043d2e03cc/java/build.gradle#L97
Indentation scheme maybe confusing, but java code usually has more indentation than other languages, so most indentation are shortened.
class Example {
// here 2 spaces
public int function() {
// here 2 spaces too
return 42;
}
public void anotherFuncWithLongName(
// 4 spaces here
FirstArgType arg1, ...,
NthArgType argN) {
var result = longChangedFunctionCall(
// 4 spaces
anotherLongFunctionCall(
// 4 more spaces
... ));
}
}
If you dislike GJF and/or spotless or have preferable option - please let us know.
|
||
public String get(String key) { | ||
return waitForResult(asyncGet(key)); | ||
// TODO support non-strings |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A response from rust core lib may return a pointer which may refer other things too:
https://github.com/redis-rs/redis-rs/blob/168b9c55c2a880d2fb23601e41261c94d308c0d8/redis/src/types.rs#L138-L157
Should we cast everything to a string?
please mark everything which will be resolved in an upcoming PR with a TODO comment explaining what's the required change, so we won't forget to actually fix those. |
@Yury-Fridlyand It seems like the client code is going to have massive refactoring. I think that adding TODOs in this case would be too much. What do you think about removing the code that is going to be heavily refactored from this PR (e.g. the Client.java file), and open a new PR with the new code when it's ready? It doubles the work to review unready code, then to review the new refactored code and to make sure all issues from the first one was addressed |
Signed-off-by: Yury-Fridlyand <[email protected]>
…ilestone-1-client Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I completely removed the client, please, re-review.
We will add client by chunks even after splitting into multiple files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Squash commits (make sure to keep the commit description) & merge
Issue #, if available:
Part 4 of 4. (see #629)
Description of changes:
This PR contains java wrapper implemented with JNI and Netty.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.