Skip to content

Commit

Permalink
Java client: update error handling (valkey-io#865)
Browse files Browse the repository at this point in the history
* Java client: update error handling (#67)

* Add exception handling and tests.
* Add IT.

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored Feb 1, 2024
1 parent 39335cb commit 888109e
Show file tree
Hide file tree
Showing 9 changed files with 553 additions and 182 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.connectors.handlers;

import glide.api.models.exceptions.ClosingException;
import glide.api.models.exceptions.ConnectionException;
import glide.api.models.exceptions.ExecAbortException;
import glide.api.models.exceptions.RequestException;
import glide.api.models.exceptions.TimeoutException;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import response.ResponseOuterClass.RequestError;
import response.ResponseOuterClass.Response;

/** Holder for resources required to dispatch responses and used by {@link ReadHandler}. */
@RequiredArgsConstructor
public class CallbackDispatcher {

/** Unique request ID (callback ID). Thread-safe and overflow-safe. */
private final AtomicInteger nextAvailableRequestId = new AtomicInteger(0);
protected final AtomicInteger nextAvailableRequestId = new AtomicInteger(0);

/**
* Storage of Futures to handle responses. Map key is callback id, which starts from 0. The value
Expand All @@ -24,7 +32,7 @@ public class CallbackDispatcher {
* Negative Java values would be shown as positive on Rust side. There is no data loss, because
* callback ID remains unique.
*/
private final ConcurrentHashMap<Integer, CompletableFuture<Response>> responses =
protected final ConcurrentHashMap<Integer, CompletableFuture<Response>> responses =
new ConcurrentHashMap<>();

/**
Expand All @@ -33,7 +41,7 @@ public class CallbackDispatcher {
*/
// TODO: Optimize to avoid growing up to 2e32 (16 Gb)
// https://github.com/aws/glide-for-redis/issues/704
private final ConcurrentLinkedQueue<Integer> freeRequestIds = new ConcurrentLinkedQueue<>();
protected final ConcurrentLinkedQueue<Integer> freeRequestIds = new ConcurrentLinkedQueue<>();

/**
* Register a new request to be sent. Once response received, the given future completes with it.
Expand All @@ -54,29 +62,72 @@ public Pair<Integer, CompletableFuture<Response>> registerRequest() {
}

public CompletableFuture<Response> registerConnection() {
var res = registerRequest();
return res.getValue();
return registerRequest().getValue();
}

/**
* Complete the corresponding client promise and free resources.
* Complete the corresponding client promise, handle error and free resources.
*
* @param response A response received
*/
public void completeRequest(Response response) {
if (response.hasClosingError()) {
// According to https://github.com/aws/glide-for-redis/issues/851
// a response with a closing error may arrive with any/random callback ID (usually -1)
// CommandManager and ConnectionManager would close the UDS channel on ClosingException
distributeClosingException(response.getClosingError());
return;
}
// Complete and return the response at callbackId
// free up the callback ID in the freeRequestIds list
int callbackId = response.getCallbackIdx();
CompletableFuture<Response> future = responses.remove(callbackId);
freeRequestIds.add(callbackId);
if (future != null) {
freeRequestIds.add(callbackId);
if (response.hasRequestError()) {
RequestError error = response.getRequestError();
String msg = error.getMessage();
switch (error.getType()) {
case Unspecified:
// Unspecified error on Redis service-side
future.completeExceptionally(new RequestException(msg));
case ExecAbort:
// Transactional error on Redis service-side
future.completeExceptionally(new ExecAbortException(msg));
case Timeout:
// Timeout from Glide to Redis service
future.completeExceptionally(new TimeoutException(msg));
case Disconnect:
// Connection problem between Glide and Redis
future.completeExceptionally(new ConnectionException(msg));
default:
// Request or command error from Redis
future.completeExceptionally(new RequestException(msg));
}
}
future.completeAsync(() -> response);
} else {
// TODO: log an error.
// TODO: log an error thru logger.
// probably a response was received after shutdown or `registerRequest` call was missing
System.err.printf(
"Received a response for not registered callback id %d, request error = %s%n",
callbackId, response.getRequestError());
distributeClosingException("Client is in an erroneous state and should close");
}
}

/**
* Distribute {@link ClosingException} to all pending requests. {@link CommandManager} and {@link
* ConnectionManager} should catch it, handle and close the UDS connection.<br>
* Should be used to termination the client/connection only.
*
* @param message Exception message
*/
public void distributeClosingException(String message) {
responses.values().forEach(f -> f.completeExceptionally(new ClosingException(message)));
responses.clear();
}

public void shutdownGracefully() {
responses.values().forEach(future -> future.cancel(false));
responses.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class ChannelHandler {

private static final String THREAD_POOL_NAME = "glide-channel";

private final Channel channel;
private final CallbackDispatcher callbackDispatcher;
protected final Channel channel;
protected final CallbackDispatcher callbackDispatcher;

/** Open a new channel for a new client. */
public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.managers;

import glide.api.models.exceptions.ClosingException;
import glide.api.models.exceptions.ConnectionException;
import glide.api.models.exceptions.ExecAbortException;
import glide.api.models.exceptions.RedisException;
import glide.api.models.exceptions.RequestException;
import glide.api.models.exceptions.TimeoutException;
import lombok.AllArgsConstructor;
import response.ResponseOuterClass.RequestError;
import response.ResponseOuterClass.Response;

/**
Expand All @@ -21,40 +15,15 @@ public class BaseCommandResponseResolver
private RedisExceptionCheckedFunction<Long, Object> respPointerResolver;

/**
* Extracts value from the RESP pointer. <br>
* Throws errors when the response is unsuccessful.
* Extracts value from the RESP pointer.
*
* @return A generic Object with the Response | null if the response is empty
* @return A generic Object with the Response or null if the response is empty
*/
public Object apply(Response response) throws RedisException {
if (response.hasRequestError()) {
RequestError error = response.getRequestError();
String msg = error.getMessage();
switch (error.getType()) {
case Unspecified:
// Unspecified error on Redis service-side
throw new RequestException(msg);
case ExecAbort:
// Transactional error on Redis service-side
throw new ExecAbortException(msg);
case Timeout:
// Timeout from Glide to Redis service
throw new TimeoutException(msg);
case Disconnect:
// Connection problem between Glide and Redis
throw new ConnectionException(msg);
default:
// Request or command error from Redis
throw new RequestException(msg);
}
}
if (response.hasClosingError()) {
// A closing error is thrown when Rust-core is not connected to Redis
// We want to close shop and throw a ClosingException
// TODO: close the channel on a closing error
// channel.close();
throw new ClosingException(response.getClosingError());
}
// Note: errors are already handled before in CallbackDispatcher
assert !response.hasClosingError() : "Unhandled response closing error";
assert !response.hasRequestError() : "Unhandled response request error";

if (response.hasConstantResponse()) {
// Return "OK"
return response.getConstantResponse().toString();
Expand Down
49 changes: 49 additions & 0 deletions java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import glide.api.models.configuration.RequestRoutingConfiguration.SimpleRoute;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotIdRoute;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotKeyRoute;
import glide.api.models.exceptions.ClosingException;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.managers.models.Command;
Expand Down Expand Up @@ -48,9 +49,57 @@ public <T> CompletableFuture<T> submitNewCommand(
command.getArguments(),
Optional.ofNullable(command.getRoute())),
true)
.exceptionally(this::exceptionHandler)
.thenApplyAsync(responseHandler::apply);
}

/**
* Exception handler for future pipeline.
*
* @param e An exception thrown in the pipeline before
* @return Nothing, it rethrows the exception
*/
private Response exceptionHandler(Throwable e) {
if (e instanceof ClosingException) {
channel.close();
}
if (e instanceof RuntimeException) {
// RedisException also goes here
throw (RuntimeException) e;
}
throw new RuntimeException(e);
}

/**
* Build a protobuf command/transaction request object.<br>
* Used by {@link CommandManager}.
*
* @param command - Redis command
* @param args - Redis command arguments as string array
* @return An uncompleted request. CallbackDispatcher is responsible to complete it by adding a
* callback id.
*/
private RedisRequestOuterClass.RedisRequest.Builder prepareRedisRequest(
Command.RequestType command, String[] args) {
RedisRequestOuterClass.Command.ArgsArray.Builder commandArgs =
RedisRequestOuterClass.Command.ArgsArray.newBuilder();
for (var arg : args) {
commandArgs.addArgs(arg);
}

// TODO: set route properly when no RouteOptions given
return RedisRequestOuterClass.RedisRequest.newBuilder()
.setSingleCommand(
RedisRequestOuterClass.Command.newBuilder()
.setRequestType(mapRequestTypes(command))
.setArgsArray(commandArgs.build())
.build())
.setRoute(
RedisRequestOuterClass.Routes.newBuilder()
.setSimpleRoutes(RedisRequestOuterClass.SimpleRoutes.AllNodes)
.build());
}

private RequestType mapRequestTypes(Command.RequestType inType) {
switch (inType) {
case CUSTOM_COMMAND:
Expand Down
42 changes: 27 additions & 15 deletions java/client/src/main/java/glide/managers/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
import glide.api.models.exceptions.ClosingException;
import glide.connectors.handlers.ChannelHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import lombok.RequiredArgsConstructor;
import response.ResponseOuterClass.RequestError;
import response.ResponseOuterClass.Response;

/**
Expand All @@ -39,7 +37,25 @@ public class ConnectionManager {
*/
public CompletableFuture<Void> connectToRedis(BaseClientConfiguration configuration) {
ConnectionRequest request = createConnectionRequest(configuration);
return channel.connect(request).thenApplyAsync(this::checkGlideRsResponse);
return channel
.connect(request)
.exceptionally(this::exceptionHandler)
.thenApplyAsync(this::checkGlideRsResponse);
}

/**
* Exception handler for future pipeline.
*
* @param e An exception thrown in the pipeline before
* @return Nothing, it always rethrows the exception
*/
private Response exceptionHandler(Throwable e) {
channel.close();
if (e instanceof RuntimeException) {
// RedisException also goes here
throw (RuntimeException) e;
}
throw new RuntimeException(e);
}

/**
Expand Down Expand Up @@ -155,15 +171,15 @@ private ConnectionRequestOuterClass.ReadFrom mapReadFromEnum(ReadFrom readFrom)

/** Check a response received from Glide. */
private Void checkGlideRsResponse(Response response) {
// Note: errors are already handled before in CallbackDispatcher, but we double-check
if (response.hasRequestError()) {
RequestError error = response.getRequestError();
throwClosingError("Unexpected request error in response: " + error.getMessage());
throwClosingError(
"Unhandled request error in response: " + response.getRequestError().getMessage());
}
if (response.hasClosingError()) {
// A closing error is thrown when Rust-core is not connected to Redis
// We want to close shop and throw a ClosingException
throwClosingError(response.getClosingError());
throwClosingError("Unhandled closing error in response: " + response.getClosingError());
}

if (response.hasRespPointer()) {
throwClosingError("Unexpected data in response");
}
Expand All @@ -174,12 +190,8 @@ private Void checkGlideRsResponse(Response response) {
return null;
}

private void throwClosingError(String msg) throws ClosingException {
try {
closeConnection().get();
} catch (InterruptedException | ExecutionException exception) {
throw new RuntimeException(exception);
}
private void throwClosingError(String msg) {
closeConnection();
throw new ClosingException(msg);
}

Expand All @@ -189,6 +201,6 @@ private void throwClosingError(String msg) throws ClosingException {
* @return a CompletableFuture to indicate the channel is closed
*/
public Future<Void> closeConnection() {
return channel.close().syncUninterruptibly();
return channel.close();
}
}
Loading

0 comments on commit 888109e

Please sign in to comment.