Skip to content

Commit

Permalink
Update for review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Dec 4, 2023
1 parent 2f81317 commit e828f0c
Show file tree
Hide file tree
Showing 11 changed files with 25 additions and 12,946 deletions.
2 changes: 1 addition & 1 deletion java/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
build

# Ignore protobuf generated files
models/protobuf
protobuf
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,7 @@ public void close() {
channel.close();
group.shutdownGracefully();
INSTANCE = null;
// TODO should we reply in uncompleted futures?
CallbackManager.shutdownGracefully();
CallbackManager.connectionRequests.clear();
CallbackManager.responses.clear();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public void initChannel(@NonNull UnixChannel ch) {
// https://netty.io/4.1/api/io/netty/handler/codec/protobuf/ProtobufEncoder.html
.addLast("protobufDecoder", new ProtobufVarint32FrameDecoder())
.addLast("protobufEncoder", new ProtobufVarint32LengthFieldPrepender())
.addLast(new ChannelInboundHandlerAdapter())
.addLast(new ChannelOutboundHandlerAdapter());
.addLast(new ReadHandler())
.addLast(new WriteHandler());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,7 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
buf.readBytes(bytes);
// TODO surround parsing with try-catch, set error to future if parsing failed.
var response = ResponseOuterClass.Response.parseFrom(bytes);
int callbackId = response.getCallbackIdx();
if (callbackId == 0) {
// can't distinguish connection requests since they have no
// callback ID
// https://github.com/aws/babushka/issues/600
CallbackManager.connectionRequests.pop().complete(response);
} else {
CallbackManager.responses.get(callbackId).complete(response);
CallbackManager.responses.remove(callbackId);
}
CallbackManager.completeAsync(response);
buf.release();
}

Expand Down
15 changes: 15 additions & 0 deletions java/client/src/main/java/babushka/managers/CallbackManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,30 @@ public void registerConnection(CompletableFuture<Response> future) {
CallbackManager.connectionRequests.add(future);
}

public static void completeAsync(Response response) {
int callbackId = response.getCallbackIdx();
if (callbackId == 0) {
// can't distinguish connection requests since they have no
// callback ID
// https://github.com/aws/babushka/issues/600
connectionRequests.pop().completeAsync(() -> response);
} else {
responses.get(callbackId).completeAsync(() -> response);
responses.remove(callbackId);
}
}

public static void shutdownGracefully() {
connectionRequests.forEach(
future -> {
future.completeExceptionally(new InterruptedException());
});
connectionRequests.clear();
responses.forEach(
(callbackId, future) -> {
future.completeExceptionally(new InterruptedException());
});
responses.clear();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ public CompletableFuture<String> set(String key, String value) {
public CompletableFuture<String> submitNewCommand(RequestType command, List<String> args) {
// TODO this explicitly uses ForkJoin thread pool. May be we should use another one.
CompletableFuture<Response> future = new CompletableFuture<>();
int callbackId = callbackManager.registerRequest(future);

return CompletableFuture.supplyAsync(
() -> {
socketConnection.writeAndFlush(redisSingleCommand(command, args));
return future;
})
.thenCompose(f -> f)
.thenApply(RequestBuilder::resolveRedisResponseToString);
// TODO: is there a better way to execute this?
.thenComposeAsync(f -> f)
.thenApplyAsync(RequestBuilder::resolveRedisResponseToString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public CompletableFuture<String> connectToRedis(
var future = new CompletableFuture<Response>();
callbackManager.registerConnection(future);
socketConnection.writeAndFlush(request);
return future.thenApply(f -> f.getConstantResponse().toString());
return future.thenApplyAsync(f -> f.getConstantResponse().toString());
}

/**
Expand Down
Loading

0 comments on commit e828f0c

Please sign in to comment.