Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add managers layer to client #44

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions java/client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ tasks.register('cleanProtobuf') {
tasks.register('buildRustRelease', Exec) {
commandLine 'cargo', 'build', '--release'
workingDir project.rootDir
environment 'CARGO_TERM_COLOR', 'always'
}

tasks.register('buildRustReleaseStrip', Exec) {
commandLine 'cargo', 'build', '--release', '--strip'
workingDir project.rootDir
environment 'CARGO_TERM_COLOR', 'always'
}

tasks.register('buildRust', Exec) {
commandLine 'cargo', 'build'
workingDir project.rootDir
environment 'CARGO_TERM_COLOR', 'always'
}

tasks.register('buildWithRust') {
Expand Down
24 changes: 24 additions & 0 deletions java/client/src/main/java/babushka/connectors/ClientState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package babushka.connectors;

public class ClientState {

private ClientState() {}

/**
* A read only client state. It is supposed that main Client class will have instance of the state
* of this type and won't be able to change the state directly.
*/
public static interface ReadOnlyClientState {
/** Check that connection established. This doesn't validate whether it is alive. */
boolean isConnected();

/** Check that connection is not yet established. */
boolean isInitializing();
}

/** A client state which accepts switching to <em>Connected</em> state. */
public static interface OpenableClientState extends ReadOnlyClientState {
/** Report connection status. */
void connect(boolean successful);
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
package babushka.connectors.handlers;

import babushka.connectors.ClientState;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import response.ResponseOuterClass.Response;

/** Holder for resources required to dispatch responses and used by {@link ReadHandler}. */
@RequiredArgsConstructor
public class CallbackDispatcher {
/** Unique request ID (callback ID). Thread-safe. */

/** Reserved callback ID for connection request. */
private final int CONNECTION_PROMISE_ID = 0;

/** Client state reference. */
private final ClientState.ReadOnlyClientState clientState;

/**
* Unique request ID (callback ID). Thread-safe and overflow-safe.<br>
* Note: Protobuf packet contains callback ID as uint32, but it stores data as a bit field.<br>
* Negative java values would be shown as positive on rust side. Meanwhile, no data loss happen,
* because callback ID remains unique.
*/
private final AtomicInteger requestId = new AtomicInteger(0);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not use CONNECTION_PROMISE_ID in the constructor?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is ok, since registerConnection calls to registerRequest and share the same future array


/**
Expand All @@ -18,28 +33,28 @@ public class CallbackDispatcher {
*/
private final Map<Integer, CompletableFuture<Response>> responses = new ConcurrentHashMap<>();

/**
* Storage for connection request similar to {@link #responses}. Unfortunately, connection
* requests can't be stored in the same storage, because callback ID = 0 is hardcoded for
* connection requests.
*/
private final CompletableFuture<Response> connectionPromise = new CompletableFuture<>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why aren't we keeping this separate?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed to combine that...
Need to discuss again the destiny of connectionPromise - I have to restore it if we support re-connection


/**
* Register a new request to be sent. Once response received, the given future completes with it.
*
* @return A pair of unique callback ID which should set into request and a client promise for
* response.
*/
public Pair<Integer, CompletableFuture<Response>> registerRequest() {
int callbackId = requestId.incrementAndGet();
int callbackId = 0;
do {
callbackId = requestId.getAndIncrement();
} while (responses.containsKey(callbackId));
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
var future = new CompletableFuture<Response>();
responses.put(callbackId, future);
return Pair.of(callbackId, future);
}

public CompletableFuture<Response> registerConnection() {
return connectionPromise;
var res = registerRequest();
if (res.getKey() != CONNECTION_PROMISE_ID) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, to protect vs re-using a client for another connection

throw new IllegalStateException();
}
return res.getValue();
}

/**
Expand All @@ -48,17 +63,20 @@ public CompletableFuture<Response> registerConnection() {
* @param response A response received
*/
public void completeRequest(Response response) {
int callbackId = response.getCallbackIdx();
if (callbackId == 0) {
connectionPromise.completeAsync(() -> response);
// A connection response doesn't contain a callback id
int callbackId =
clientState.isInitializing() ? CONNECTION_PROMISE_ID : response.getCallbackIdx();
CompletableFuture<Response> future = responses.get(callbackId);
if (future != null) {
future.completeAsync(() -> response);
} else {
responses.get(callbackId).completeAsync(() -> response);
responses.remove(callbackId);
// TODO: log an error.
// probably a response was received after shutdown or `registerRequest` call was missing

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should only really happen after shutdown...? should we check the state and just log that a response was completed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a part of logging task

}
responses.remove(callbackId);
}

public void shutdownGracefully() {
connectionPromise.cancel(false);
responses.values().forEach(future -> future.cancel(false));
responses.clear();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package babushka.connectors.handlers;

import babushka.connectors.resources.Platform;
import babushka.connectors.resources.ThreadPoolAllocator;
import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.unix.DomainSocketAddress;
import java.util.OptionalInt;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import redis_request.RedisRequestOuterClass.RedisRequest;
import response.ResponseOuterClass.Response;

Expand All @@ -24,7 +24,7 @@ public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath)
channel =
new Bootstrap()
// TODO let user specify the thread pool or pool size as an option
.group(Platform.createNettyThreadPool("babushka-channel", OptionalInt.empty()))
.group(ThreadPoolAllocator.createNettyThreadPool("babushka-channel", Optional.empty()))
.channel(Platform.getClientUdsNettyChannelType())
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
Expand Down Expand Up @@ -52,13 +52,9 @@ public CompletableFuture<Response> connect(ConnectionRequest request) {
return callbackDispatcher.registerConnection();
}

private final AtomicBoolean closed = new AtomicBoolean(false);

/** Closes the UDS connection and frees corresponding resources. */
public void close() {
if (closed.compareAndSet(false, true)) {
channel.close();
callbackDispatcher.shutdownGracefully();
}
channel.close();
callbackDispatcher.shutdownGracefully();
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
package babushka.connectors.resources;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand Down Expand Up @@ -44,12 +36,6 @@ public static class Capabilities {
private static final Capabilities capabilities =
new Capabilities(isKQueueAvailable(), isEPollAvailable(), false, false);

/**
* Thread pools supplied to <em>Netty</em> to perform all async IO.<br>
* Map key is supposed to be pool name + thread count as a string concat product.
*/
private static final Map<String, EventLoopGroup> groups = new ConcurrentHashMap<>();

/** Detect <em>kqueue</em> availability. */
private static boolean isKQueueAvailable() {
try {
Expand All @@ -70,42 +56,6 @@ private static boolean isEPollAvailable() {
}
}

/**
* Allocate Netty thread pool required to manage connection. A thread pool could be shared across
* multiple connections.
*
* @return A new thread pool.
*/
public static EventLoopGroup createNettyThreadPool(String prefix, OptionalInt threadLimit) {
int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors());
if (capabilities.isKQueueAvailable()) {
var name = prefix + "-kqueue-elg";
return getOrCreate(
name + threadCount,
() -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
} else if (capabilities.isEPollAvailable()) {
var name = prefix + "-epoll-elg";
return getOrCreate(
name + threadCount,
() -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
}
// TODO support IO-Uring and NIO

throw new RuntimeException("Current platform supports no known thread pool types");
}

/**
* Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache.
*/
private static EventLoopGroup getOrCreate(String name, Supplier<EventLoopGroup> supplier) {
if (groups.containsKey(name)) {
return groups.get(name);
}
var group = supplier.get();
groups.put(name, group);
return group;
}

/**
* Get a channel class required by Netty to open a client UDS channel.
*
Expand All @@ -120,20 +70,4 @@ public static Class<? extends DomainSocketChannel> getClientUdsNettyChannelType(
}
throw new RuntimeException("Current platform supports no known socket types");
}

/**
* A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br>
* See {@link Runtime#addShutdownHook}.
*/
private static class ShutdownHook implements Runnable {
@Override
public void run() {
groups.values().forEach(EventLoopGroup::shutdownGracefully);
}
}

static {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package babushka.connectors.resources;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** A class responsible to allocating and deallocating shared thread pools. */
public class ThreadPoolAllocator {

/**
* Thread pools supplied to <em>Netty</em> to perform all async IO.<br>
* Map key is supposed to be pool name + thread count as a string concat product.
*/
private static final Map<String, EventLoopGroup> groups = new ConcurrentHashMap<>();

/**
* Allocate (create new or share existing) Netty thread pool required to manage connection. A
* thread pool could be shared across multiple connections.
*
* @return A new thread pool.
*/
public static EventLoopGroup createNettyThreadPool(String prefix, Optional<Integer> threadLimit) {
int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors());
if (Platform.getCapabilities().isKQueueAvailable()) {
String name = prefix + "-kqueue-elg";
return getOrCreate(
name + threadCount,
() -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
} else if (Platform.getCapabilities().isEPollAvailable()) {
String name = prefix + "-epoll-elg";
return getOrCreate(
name + threadCount,
() -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true)));
}
// TODO support IO-Uring and NIO

throw new RuntimeException("Current platform supports no known thread pool types");
}

/**
* Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache.
*/
private static EventLoopGroup getOrCreate(String name, Supplier<EventLoopGroup> supplier) {
if (groups.containsKey(name)) {
return groups.get(name);
}
EventLoopGroup group = supplier.get();
groups.put(name, group);
return group;
}

/**
* A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br>
* See {@link Runtime#addShutdownHook}.
*/
private static class ShutdownHook implements Runnable {
@Override
public void run() {
groups.values().forEach(EventLoopGroup::shutdownGracefully);
}
}

static {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Babushka-shutdown-hook"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package babushka.ffi.resolvers;

import response.ResponseOuterClass.Response;

public class RedisValueResolver {
/**
* Resolve a value received from Redis using given C-style pointer.
*
* @param pointer A memory pointer from {@link Response}
* @return A RESP3 value
*/
public static native Object valueFromPointer(long pointer);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package babushka.ffi.resolvers;

public class BabushkaCoreNativeDefinitions {
public static native String startSocketListenerExternal() throws Exception;
public class SocketListenerResolver {

public static native Object valueFromPointer(long pointer);
/** Make an FFI call to Babushka to open a UDS socket to connect to. */
private static native String startSocketListener() throws Exception;

static {
System.loadLibrary("javababushka");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this statically load the library multiple times? I'm not sure if it makes sense to split these classes up and load the library multiple times...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I load it only once - here.
According to java docs:

If this method is called more than once with the same library name, the second and subsequent calls are ignored.

Expand All @@ -16,7 +16,7 @@ public class BabushkaCoreNativeDefinitions {
*/
public static String getSocket() {
try {
return startSocketListenerExternal();
return startSocketListener();
} catch (Exception | UnsatisfiedLinkError e) {
System.err.printf("Failed to create a UDS connection: %s%n%n", e);
throw new RuntimeException(e);
Expand Down
Loading
Loading