forked from valkey-io/valkey-glide
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
6 changed files
with
19 additions
and
276 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
62 changes: 0 additions & 62 deletions
62
java/client/src/main/java/babushka/managers/ClientState.java
This file was deleted.
Oops, something went wrong.
89 changes: 12 additions & 77 deletions
89
java/client/src/main/java/babushka/managers/CommandManager.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,88 +1,23 @@ | ||
package babushka.managers; | ||
|
||
import babushka.connectors.handlers.ChannelHandler; | ||
import babushka.ffi.resolvers.RedisValueResolver; | ||
import babushka.models.RequestBuilder; | ||
import java.util.List; | ||
import babushka.api.commands.Command; | ||
import java.util.concurrent.CompletableFuture; | ||
import lombok.RequiredArgsConstructor; | ||
import redis_request.RedisRequestOuterClass.RequestType; | ||
import java.util.function.Function; | ||
import response.ResponseOuterClass.Response; | ||
|
||
@RequiredArgsConstructor | ||
public class CommandManager { | ||
|
||
/** UDS connection representation. */ | ||
private final ChannelHandler channel; | ||
|
||
/** | ||
* Client state, which {@link CommandManager} can flick to closed if corresponding error received. | ||
*/ | ||
private final ClientState.ClosableClientState clientState; | ||
|
||
/** | ||
* Async (non-blocking) get.<br> | ||
* See <a href="https://redis.io/commands/get/">REDIS docs for GET</a>. | ||
* | ||
* @param key The key name | ||
*/ | ||
public CompletableFuture<String> get(String key) { | ||
return submitNewCommand(RequestType.GetString, List.of(key)); | ||
} | ||
|
||
/** | ||
* Async (non-blocking) set.<br> | ||
* See <a href="https://redis.io/commands/set/">REDIS docs for SET</a>. | ||
* | ||
* @param key The key name | ||
* @param value The value to set | ||
*/ | ||
public CompletableFuture<String> set(String key, String value) { | ||
return submitNewCommand(RequestType.SetString, List.of(key, value)); | ||
} | ||
|
||
/** | ||
* Build a command and submit it Netty to send. | ||
* | ||
* @param command Command type | ||
* @param args Command arguments | ||
* @return A result promise | ||
*/ | ||
private CompletableFuture<String> submitNewCommand(RequestType command, List<String> args) { | ||
if (!clientState.isConnected()) { | ||
throw new IllegalStateException("Connection is not open"); | ||
} | ||
|
||
// TODO this explicitly uses ForkJoin thread pool. May be we should use another one. | ||
return CompletableFuture.supplyAsync( | ||
() -> channel.write(RequestBuilder.prepareRedisRequest(command, args), true)) | ||
// TODO: is there a better way to execute this? | ||
.thenComposeAsync(f -> f) | ||
.thenApplyAsync(this::extractValueFromResponse); | ||
} | ||
|
||
/** | ||
* Check response and extract data from it. | ||
* | ||
* @param response A response received from Babushka | ||
* @return A String from the Redis RESP2 response, or Ok. Otherwise, returns null | ||
* @param command | ||
* @param responseHandler | ||
* @return | ||
*/ | ||
private String extractValueFromResponse(Response response) { | ||
if (response.hasRequestError()) { | ||
// TODO do we need to support different types of exceptions and distinguish them by type? | ||
throw new RuntimeException( | ||
String.format( | ||
"%s: %s", | ||
response.getRequestError().getType(), response.getRequestError().getMessage())); | ||
} else if (response.hasClosingError()) { | ||
CompletableFuture.runAsync(channel::close); | ||
clientState.disconnect(); | ||
throw new RuntimeException("Connection closed: " + response.getClosingError()); | ||
} else if (response.hasConstantResponse()) { | ||
return response.getConstantResponse().toString(); | ||
} else if (response.hasRespPointer()) { | ||
return RedisValueResolver.valueFromPointer(response.getRespPointer()).toString(); | ||
} | ||
return null; | ||
public <T> CompletableFuture<T> submitNewCommand( | ||
Command command, Function<Response, T> responseHandler) { | ||
// register callback | ||
// create protobuf message from command | ||
// submit async call | ||
// handle return type in the thenApplyAsync | ||
return new CompletableFuture<>(); | ||
} | ||
} |
71 changes: 0 additions & 71 deletions
71
java/client/src/main/java/babushka/managers/ConnectionManager.java
This file was deleted.
Oops, something went wrong.
60 changes: 0 additions & 60 deletions
60
java/client/src/main/java/babushka/models/RequestBuilder.java
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters