Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add managers layer to client #44

Merged

Conversation

Yury-Fridlyand
Copy link

@Yury-Fridlyand Yury-Fridlyand commented Dec 15, 2023

Part 2 of client implementation. See part 1 in #43/valkey-io#670

The tracking task: https://github.com/orgs/Bit-Quill/projects/4/views/9?pane=issue&itemId=48023248

part3

Proposed usage in Client class:

package babushka.api;

import babushka.connectors.handlers.CallbackDispatcher;
import babushka.connectors.handlers.ChannelHandler;
import babushka.ffi.resolvers.SocketListenerResolver;
import babushka.managers.CommandManager;
import babushka.managers.ConnectionManager;
import java.util.concurrent.atomic.AtomicBoolean;

public class Client {

  public Client() {

    AtomicBoolean connectionStatus = new AtomicBoolean(false);

    CallbackDispatcher callbackDispatcher = new CallbackDispatcher(connectionStatus);
    ChannelHandler channelHandler =
        new ChannelHandler(callbackDispatcher, SocketListenerResolver.getSocket());
    var connectionManager = new ConnectionManager(channelHandler, connectionStatus);
    var commandManager = new CommandManager(channelHandler);
  }
}

And Awaiter:

package babushka.api;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Awaiter {
  private static final long DEFAULT_TIMEOUT_MILLISECONDS = 30000;

  /** Get the future result with default timeout. */
  public static <T> T await(CompletableFuture<T> future) {
    return await(future, DEFAULT_TIMEOUT_MILLISECONDS);
  }

  /** Get the future result with given timeout in ms. */
  public static <T> T await(CompletableFuture<T> future, long timeout) {
    try {
      return future.get(timeout, TimeUnit.MILLISECONDS);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getMessage(), e.getCause());
    } catch (InterruptedException e) {
      // TODO should shutdown the client service
      if (Thread.currentThread().isInterrupted()) {
        // restore interrupt
        Thread.interrupted();
      }
      throw new RuntimeException("The thread was interrupted", e);
    } catch (TimeoutException e) {
      throw new RuntimeException("Request timed out", e);
    } catch (CancellationException e) {
      throw new RuntimeException("Request was cancelled", e);
    }
  }
}

Signed-off-by: Yury-Fridlyand <[email protected]>
@Yury-Fridlyand Yury-Fridlyand force-pushed the java/dev_yuryf_client_part2 branch from 3b5d20b to 1bff530 Compare December 16, 2023 00:17
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>

private final ClientState.ReadOnlyClientState clientState;

/** Unique request ID (callback ID). Thread-safe and overflow-safe. */
private final AtomicInteger requestId = new AtomicInteger(0);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not use CONNECTION_PROMISE_ID in the constructor?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is ok, since registerConnection calls to registerRequest and share the same future array

var future = new CompletableFuture<Response>();
responses.put(callbackId, future);
return Pair.of(callbackId, future);
}

public CompletableFuture<Response> registerConnection() {
return connectionPromise;
var res = registerRequest();
if (res.getKey() != CONNECTION_PROMISE_ID) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, to protect vs re-using a client for another connection

if (callbackId == 0) {
connectionPromise.completeAsync(() -> response);
int callbackId =
clientState.isInitializing() ? response.getCallbackIdx() : CONNECTION_PROMISE_ID;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment to say that connection responses don't return a callbackIdx

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in 647d802.

connectionPromise.completeAsync(() -> response);
int callbackId =
clientState.isInitializing() ? response.getCallbackIdx() : CONNECTION_PROMISE_ID;
var future = responses.get(callbackId);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we avoid using var?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here or everywhere?

responses.get(callbackId).completeAsync(() -> response);
responses.remove(callbackId);
// TODO: log an error.
// probably a response was received after shutdown or `registerRequest` call was missing

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should only really happen after shutdown...? should we check the state and just log that a response was completed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a part of logging task

*
* @param key The key name
*/
public CompletableFuture<String> get(String key) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this - we only need submitNewRequest

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which class should call get/set? Should I do submitNewRequest public then?

* @param key The key name
* @param value The value to set
*/
public CompletableFuture<String> set(String key, String value) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove

} else if (response.hasRespPointer()) {
return RedisValueResolver.valueFromPointer(response.getRespPointer()).toString();
}
return null;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this doesn't happen, we should throw Exception instead

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 647d802

"Unexpected response data: "
+ RedisValueResolver.valueFromPointer(response.getRespPointer()));
}
return false;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this happen?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only if something went completely wrong on rust side. Surprising, protobuf allows to build such response object with no complains.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in 647d802

import redis_request.RedisRequestOuterClass.Routes;
import redis_request.RedisRequestOuterClass.SimpleRoutes;

public class RequestBuilder {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be RedisRequestBuilder

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree, we have 2 types of requests: RedisRequest and ConnectionRequest and this guy builds both.

Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
synchronized (responses) {
if (callbackId == null) {
long size = responses.mappingCount();
callbackId = (int) (size < Integer.MAX_VALUE ? size : -(size - Integer.MAX_VALUE));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just protect if this wraps around and re-creates zero... but this scenario would be rare and seriously bad.

@Yury-Fridlyand Yury-Fridlyand merged commit 68b42a5 into java/integ_yuryf_client_part2 Dec 21, 2023
3 checks passed
@Yury-Fridlyand Yury-Fridlyand deleted the java/dev_yuryf_client_part2 branch December 21, 2023 18:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants