Skip to content

Commit

Permalink
Java: Pub & Sub = <3 (valkey-io#1662)
Browse files Browse the repository at this point in the history
* Java: Add client configuration for subscribing to channels. (#381)

* Add client configuartion for subscribing to channels.

Signed-off-by: Yury-Fridlyand <[email protected]>

* CLIPPY I HATE YOU

Signed-off-by: Yury-Fridlyand <[email protected]>

* Get and store callback.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Rework configuration and add docs.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Config rework.

Signed-off-by: Yury-Fridlyand <[email protected]>

* docs

Signed-off-by: Yury-Fridlyand <[email protected]>

* More TODOs for the god of TODOs.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add `PUBLISH` and `SPUBLISH` commands. (#391)

* Add `PUBLISH` and `SPUBLISH` commands.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix the test.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>

* Java client: receive pubsub messages (#385)

* Add client configuartion for subscribing to channels.

Signed-off-by: Yury-Fridlyand <[email protected]>

* CLIPPY I HATE YOU

Signed-off-by: Yury-Fridlyand <[email protected]>

* Get and store callback.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Rework configuration and add docs.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Config rework.

Signed-off-by: Yury-Fridlyand <[email protected]>

* docs

Signed-off-by: Yury-Fridlyand <[email protected]>

* Receive pushes (subscibed messages).

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* I HATE YOU SPOTLESS

Signed-off-by: Yury-Fridlyand <[email protected]>

* Rename a class.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Java: add IT for pubsub (#400)

* Add some tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Test fixes.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add more tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Experiment

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add more tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* I HATE YOU SPOTLESS

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Uncomment test timeout.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Typo fix.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>

* Update function signature.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored Jul 1, 2024
1 parent d5bafac commit eb2201c
Show file tree
Hide file tree
Showing 36 changed files with 1,990 additions and 100 deletions.
147 changes: 126 additions & 21 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.PfAdd;
import static redis_request.RedisRequestOuterClass.RequestType.PfCount;
import static redis_request.RedisRequestOuterClass.RequestType.PfMerge;
import static redis_request.RedisRequestOuterClass.RequestType.Publish;
import static redis_request.RedisRequestOuterClass.RequestType.RPop;
import static redis_request.RedisRequestOuterClass.RequestType.RPush;
import static redis_request.RedisRequestOuterClass.RequestType.RPushX;
Expand Down Expand Up @@ -183,13 +184,15 @@
import glide.api.commands.HashBaseCommands;
import glide.api.commands.HyperLogLogBaseCommands;
import glide.api.commands.ListBaseCommands;
import glide.api.commands.PubSubBaseCommands;
import glide.api.commands.ScriptingAndFunctionsBaseCommands;
import glide.api.commands.SetBaseCommands;
import glide.api.commands.SortedSetBaseCommands;
import glide.api.commands.StreamBaseCommands;
import glide.api.commands.StringBaseCommands;
import glide.api.commands.TransactionsBaseCommands;
import glide.api.models.GlideString;
import glide.api.models.PubSubMessage;
import glide.api.models.Script;
import glide.api.models.commands.ExpireOptions;
import glide.api.models.commands.GetExOptions;
Expand Down Expand Up @@ -230,33 +233,38 @@
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import glide.api.models.configuration.BaseClientConfiguration;
import glide.api.models.configuration.BaseSubscriptionConfiguration;
import glide.api.models.exceptions.ConfigurationError;
import glide.api.models.exceptions.RedisException;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.connectors.handlers.MessageHandler;
import glide.connectors.resources.Platform;
import glide.connectors.resources.ThreadPoolResource;
import glide.connectors.resources.ThreadPoolResourceAllocator;
import glide.ffi.resolvers.RedisValueResolver;
import glide.managers.BaseCommandResponseResolver;
import glide.managers.BaseResponseResolver;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.NotImplementedException;
import response.ResponseOuterClass.ConstantResponse;
import response.ResponseOuterClass.Response;

/** Base Client class for Redis */
@AllArgsConstructor
public abstract class BaseClient
implements AutoCloseable,
BitmapBaseCommands,
Expand All @@ -270,13 +278,41 @@ public abstract class BaseClient
HyperLogLogBaseCommands,
GeospatialIndicesBaseCommands,
ScriptingAndFunctionsBaseCommands,
TransactionsBaseCommands {
TransactionsBaseCommands,
PubSubBaseCommands {

/** Redis simple string response with "OK" */
public static final String OK = ConstantResponse.OK.toString();

protected final ConnectionManager connectionManager;
protected final CommandManager commandManager;
protected final ConnectionManager connectionManager;
protected final ConcurrentLinkedDeque<PubSubMessage> messageQueue;
protected final Optional<BaseSubscriptionConfiguration> subscriptionConfiguration;

/** Helper which extracts data from received {@link Response}s from GLIDE. */
private static final BaseResponseResolver responseResolver =
new BaseResponseResolver(RedisValueResolver::valueFromPointer);

/** Helper which extracts data with binary strings from received {@link Response}s from GLIDE. */
private static final BaseResponseResolver binaryResponseResolver =
new BaseResponseResolver(RedisValueResolver::valueFromPointerBinary);

/** A constructor. */
protected BaseClient(ClientBuilder builder) {
this.connectionManager = builder.connectionManager;
this.commandManager = builder.commandManager;
this.messageQueue = builder.messageQueue;
this.subscriptionConfiguration = builder.subscriptionConfiguration;
}

/** Auxiliary builder which wraps all fields to be initialized in the constructor. */
@RequiredArgsConstructor
protected static class ClientBuilder {
private final ConnectionManager connectionManager;
private final CommandManager commandManager;
private final ConcurrentLinkedDeque<PubSubMessage> messageQueue;
private final Optional<BaseSubscriptionConfiguration> subscriptionConfiguration;
}

/**
* Async request for an async (non-blocking) Redis client.
Expand All @@ -286,31 +322,81 @@ public abstract class BaseClient
* @param <T> Client type.
* @return a Future to connect and return a RedisClient.
*/
protected static <T> CompletableFuture<T> CreateClient(
BaseClientConfiguration config,
BiFunction<ConnectionManager, CommandManager, T> constructor) {
protected static <T extends BaseClient> CompletableFuture<T> CreateClient(
@NonNull BaseClientConfiguration config, Function<ClientBuilder, T> constructor) {
try {
ThreadPoolResource threadPoolResource = config.getThreadPoolResource();
if (threadPoolResource == null) {
threadPoolResource =
ThreadPoolResourceAllocator.getOrCreate(Platform.getThreadPoolResourceSupplier());
}
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource);
MessageHandler messageHandler = buildMessageHandler(config);
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource, messageHandler);
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> constructor.apply(connectionManager, commandManager));
.thenApply(
ignored ->
constructor.apply(
new ClientBuilder(
connectionManager,
commandManager,
messageHandler.getQueue(),
Optional.ofNullable(config.getSubscriptionConfiguration()))));
} catch (InterruptedException e) {
// Something bad happened while we were establishing netty connection to
// UDS
// Something bad happened while we were establishing netty connection to UDS
var future = new CompletableFuture<T>();
future.completeExceptionally(e);
return future;
}
}

/**
* Tries to return a next pubsub message.
*
* @throws ConfigurationError If client is not subscribed to any channel or if client configured
* with a callback.
* @return A message if any or <code>null</code> if there are no unread messages.
*/
public PubSubMessage tryGetPubSubMessage() {
if (subscriptionConfiguration.isEmpty()) {
throw new ConfigurationError(
"The operation will never complete since there was no pubsub subscriptions applied to the"
+ " client.");
}
if (subscriptionConfiguration.get().getCallback().isPresent()) {
throw new ConfigurationError(
"The operation will never complete since messages will be passed to the configured"
+ " callback.");
}
return messageQueue.poll();
}

/**
* Returns a promise for a next pubsub message.
*
* @apiNote <b>Not implemented!</b>
* @throws ConfigurationError If client is not subscribed to any channel or if client configured
* with a callback.
* @return A <code>Future</code> which resolved with the next incoming message.
*/
public CompletableFuture<PubSubMessage> getPubSubMessage() {
if (subscriptionConfiguration.isEmpty()) {
throw new ConfigurationError(
"The operation will never complete since there was no pubsub subscriptions applied to the"
+ " client.");
}
if (subscriptionConfiguration.get().getCallback().isPresent()) {
throw new ConfigurationError(
"The operation will never complete since messages will be passed to the configured"
+ " callback.");
}
throw new NotImplementedException(
"This feature will be supported in a future release of the GLIDE java client");
}

/**
* Closes this resource, relinquishing any underlying resources. This method is invoked
* automatically on objects managed by the try-with-resources statement.
Expand All @@ -323,15 +409,25 @@ public void close() throws ExecutionException {
try {
connectionManager.closeConnection().get();
} catch (InterruptedException e) {
// suppressing the interrupted exception - it is already suppressed in the
// future
// suppressing the interrupted exception - it is already suppressed in the future
throw new RuntimeException(e);
}
}

protected static ChannelHandler buildChannelHandler(ThreadPoolResource threadPoolResource)
protected static MessageHandler buildMessageHandler(BaseClientConfiguration config) {
if (config.getSubscriptionConfiguration() == null) {
return new MessageHandler(Optional.empty(), Optional.empty(), responseResolver);
}
return new MessageHandler(
config.getSubscriptionConfiguration().getCallback(),
config.getSubscriptionConfiguration().getContext(),
responseResolver);
}

protected static ChannelHandler buildChannelHandler(
ThreadPoolResource threadPoolResource, MessageHandler messageHandler)
throws InterruptedException {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
CallbackDispatcher callbackDispatcher = new CallbackDispatcher(messageHandler);
return new ChannelHandler(callbackDispatcher, getSocket(), threadPoolResource);
}

Expand Down Expand Up @@ -367,10 +463,7 @@ protected <T> T handleRedisResponse(
boolean encodingUtf8 = flags.contains(ResponseFlags.ENCODING_UTF8);
boolean isNullable = flags.contains(ResponseFlags.IS_NULLABLE);
Object value =
encodingUtf8
? new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer).apply(response)
: new BaseCommandResponseResolver(RedisValueResolver::valueFromPointerBinary)
.apply(response);
encodingUtf8 ? responseResolver.apply(response) : binaryResponseResolver.apply(response);
if (isNullable && (value == null)) {
return null;
}
Expand Down Expand Up @@ -2821,6 +2914,18 @@ public CompletableFuture<Map<String, Object>> lcsIdxWithMatchLen(
return commandManager.submitNewCommand(LCS, arguments, this::handleMapResponse);
}

@Override
public CompletableFuture<String> publish(@NonNull String channel, @NonNull String message) {
return commandManager.submitNewCommand(
Publish,
new String[] {channel, message},
response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String> watch(@NonNull String[] keys) {
return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse);
Expand Down
13 changes: 7 additions & 6 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
import glide.api.models.commands.SortOptions;
import glide.api.models.commands.function.FunctionRestorePolicy;
import glide.api.models.configuration.RedisClientConfiguration;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -73,15 +71,18 @@ public class RedisClient extends BaseClient
ScriptingAndFunctionsCommands,
TransactionsCommands {

protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
/**
* A constructor. Use {@link #CreateClient} to get a client. Made protected to simplify testing.
*/
protected RedisClient(ClientBuilder builder) {
super(builder);
}

/**
* Async request for an async (non-blocking) Redis client in Standalone mode.
*
* @param config Redis client Configuration
* @return A Future to connect and return a RedisClient
* @param config Redis client Configuration.
* @return A Future to connect and return a RedisClient.
*/
public static CompletableFuture<RedisClient> CreateClient(
@NonNull RedisClientConfiguration config) {
Expand Down
28 changes: 21 additions & 7 deletions java/client/src/main/java/glide/api/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
import static redis_request.RedisRequestOuterClass.RequestType.Lolwut;
import static redis_request.RedisRequestOuterClass.RequestType.Ping;
import static redis_request.RedisRequestOuterClass.RequestType.RandomKey;
import static redis_request.RedisRequestOuterClass.RequestType.SPublish;
import static redis_request.RedisRequestOuterClass.RequestType.Sort;
import static redis_request.RedisRequestOuterClass.RequestType.SortReadOnly;
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;

import glide.api.commands.ConnectionManagementClusterCommands;
import glide.api.commands.GenericClusterCommands;
import glide.api.commands.PubSubClusterCommands;
import glide.api.commands.ScriptingAndFunctionsClusterCommands;
import glide.api.commands.ServerManagementClusterCommands;
import glide.api.commands.TransactionsClusterCommands;
Expand All @@ -57,8 +59,6 @@
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.api.models.configuration.RequestRoutingConfiguration.SingleNodeRoute;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -77,17 +77,19 @@ public class RedisClusterClient extends BaseClient
GenericClusterCommands,
ServerManagementClusterCommands,
ScriptingAndFunctionsClusterCommands,
TransactionsClusterCommands {
TransactionsClusterCommands,
PubSubClusterCommands {

protected RedisClusterClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
/** A private constructor. Use {@link #CreateClient} to get a client. */
RedisClusterClient(ClientBuilder builder) {
super(builder);
}

/**
* Async request for an async (non-blocking) Redis client in Cluster mode.
*
* @param config Redis cluster client Configuration
* @return A Future to connect and return a RedisClusterClient
* @param config Redis cluster client Configuration.
* @return A Future to connect and return a RedisClusterClient.
*/
public static CompletableFuture<RedisClusterClient> CreateClient(
@NonNull RedisClusterClientConfiguration config) {
Expand Down Expand Up @@ -780,6 +782,18 @@ public CompletableFuture<String> randomKey() {
RandomKey, new String[0], this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<String> spublish(@NonNull String channel, @NonNull String message) {
return commandManager.submitNewCommand(
SPublish,
new String[] {channel, message},
response -> {
// Check, but ignore the number - it is never valid. A GLIDE bug/limitation TODO
handleLongResponse(response);
return OK;
});
}

@Override
public CompletableFuture<String[]> sort(
@NonNull String key, @NonNull SortClusterOptions sortClusterOptions) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import java.util.concurrent.CompletableFuture;

/**
* Supports commands for the "Pub/Sub" group for standalone and cluster clients.
*
* @see <a href="https://redis.io/docs/latest/commands/?group=pubsub">Pub/Sub Commands</a>
*/
public interface PubSubBaseCommands {

/**
* Publishes message on pubsub channel.
*
* @see <a href="https://valkey.io/commands/publish/">redis.io</a> for details.
* @param channel The channel to publish the message on.
* @param message The message to publish.
* @return <code>OK</code>.
* @example
* <pre>{@code
* String response = client.publish("announcements", "The cat said 'meow'!").get();
* assert response.equals("OK");
* }</pre>
*/
CompletableFuture<String> publish(String channel, String message);
}
Loading

0 comments on commit eb2201c

Please sign in to comment.