Skip to content

Commit

Permalink
Java: Add XADD command (Stream commands) (#155)
Browse files Browse the repository at this point in the history
* Add Stream XADD command to Java

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Apr 1, 2024
1 parent a38b45f commit 46cc4df
Show file tree
Hide file tree
Showing 7 changed files with 580 additions and 1 deletion.
20 changes: 19 additions & 1 deletion java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.TTL;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZPopMax;
import static redis_request.RedisRequestOuterClass.RequestType.ZScore;
import static redis_request.RedisRequestOuterClass.RequestType.Zadd;
Expand All @@ -58,11 +59,13 @@
import glide.api.commands.ListBaseCommands;
import glide.api.commands.SetBaseCommands;
import glide.api.commands.SortedSetBaseCommands;
import glide.api.commands.StreamBaseCommands;
import glide.api.commands.StringCommands;
import glide.api.models.Script;
import glide.api.models.commands.ExpireOptions;
import glide.api.models.commands.ScriptOptions;
import glide.api.models.commands.SetOptions;
import glide.api.models.commands.StreamAddOptions;
import glide.api.models.commands.ZaddOptions;
import glide.api.models.configuration.BaseClientConfiguration;
import glide.api.models.exceptions.RedisException;
Expand Down Expand Up @@ -96,7 +99,8 @@ public abstract class BaseClient
HashBaseCommands,
ListBaseCommands,
SetBaseCommands,
SortedSetBaseCommands {
SortedSetBaseCommands,
StreamBaseCommands {

/** Redis simple string response with "OK" */
public static final String OK = ConstantResponse.OK.toString();
Expand Down Expand Up @@ -631,6 +635,20 @@ public CompletableFuture<Double> zscore(@NonNull String key, @NonNull String mem
ZScore, new String[] {key, member}, this::handleDoubleOrNullResponse);
}

@Override
public CompletableFuture<String> xadd(@NonNull String key, @NonNull Map<String, String> values) {
return xadd(key, values, StreamAddOptions.builder().build());
}

@Override
public CompletableFuture<String> xadd(
@NonNull String key, @NonNull Map<String, String> values, @NonNull StreamAddOptions options) {
String[] arguments =
ArrayUtils.addAll(
ArrayUtils.addFirst(options.toArgs(), key), convertMapToKeyValueStringArray(values));
return commandManager.submitNewCommand(XAdd, arguments, this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import glide.api.models.commands.StreamAddOptions;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Supports commands and transactions for the "Stream Commands" group for standalone clients and
* cluster clients.
*
* @see <a href="https://redis.io/commands/?group=stream">Stream Commands</a>
*/
public interface StreamBaseCommands {

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return The id of the added entry.
* @example
* <pre>{@code
* String streamId = client.xadd("key", Map.of("name", "Sara", "surname", "OConnor").get();
* System.out.println("Stream: " + streamId);
* }</pre>
*/
CompletableFuture<String> xadd(String key, Map<String, String> values);

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options.
* @return The id of the added entry, or <code>null</code> if {@link StreamAddOptions#makeStream}
* is set to <code>false</code> and no stream with the matching <code>key</code> exists.
* @example
* <pre>{@code
* // Option to use the existing stream, or return null if the stream doesn't already exist at "key"
* StreamAddOptions options = StreamAddOptions.builder().id("sid").makeStream(Boolean.FALSE).build();
* String streamId = client.xadd("key", Map.of("name", "Sara", "surname", "OConnor"), options).get();
* if (streamId != null) {
* assert streamId.equals("sid");
* }
* }</pre>
*/
CompletableFuture<String> xadd(String key, Map<String, String> values, StreamAddOptions options);
}
36 changes: 36 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZPopMax;
import static redis_request.RedisRequestOuterClass.RequestType.ZScore;
import static redis_request.RedisRequestOuterClass.RequestType.Zadd;
Expand All @@ -68,6 +69,7 @@
import glide.api.models.commands.SetOptions;
import glide.api.models.commands.SetOptions.ConditionalSet;
import glide.api.models.commands.SetOptions.SetOptionsBuilder;
import glide.api.models.commands.StreamAddOptions;
import glide.api.models.commands.ZaddOptions;
import java.util.Map;
import lombok.Getter;
Expand Down Expand Up @@ -1329,6 +1331,40 @@ public T zscore(@NonNull String key, @NonNull String member) {
return getThis();
}

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return Command Response - The id of the added entry.
*/
public T xadd(@NonNull String key, @NonNull Map<String, String> values) {
this.xadd(key, values, StreamAddOptions.builder().build());
return getThis();
}

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options.
* @return Command Response - The id of the added entry, or <code>null</code> if {@link
* StreamAddOptions#makeStream} is set to <code>false</code> and no stream with the matching
* <code>key</code> exists.
*/
public T xadd(
@NonNull String key, @NonNull Map<String, String> values, @NonNull StreamAddOptions options) {
String[] arguments =
ArrayUtils.addAll(
ArrayUtils.addFirst(options.toArgs(), key), convertMapToKeyValueStringArray(values));
ArgsArray commandArgs = buildArgs(arguments);
protobufTransaction.addCommands(buildCommand(XAdd, commandArgs));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.commands;

import glide.api.commands.StreamBaseCommands;
import java.util.ArrayList;
import java.util.List;
import lombok.Builder;
import lombok.NonNull;

/**
* Optional arguments to {@link StreamBaseCommands#xadd}
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a>
*/
@Builder
public final class StreamAddOptions {

public static final String NO_MAKE_STREAM_REDIS_API = "NOMKSTREAM";
public static final String ID_WILDCARD_REDIS_API = "*";
public static final String TRIM_MAXLEN_REDIS_API = "MAXLEN";
public static final String TRIM_MINID_REDIS_API = "MINID";
public static final String TRIM_EXACT_REDIS_API = "=";
public static final String TRIM_NOT_EXACT_REDIS_API = "~";
public static final String TRIM_LIMIT_REDIS_API = "LIMIT";

/** If set, the new entry will be added with this <code>id</code>. */
private final String id;

/**
* If set to <code>false</code>, a new stream won't be created if no stream matches the given key.
* <br>
* Equivalent to <code>NOMKSTREAM</code> in the Redis API.
*/
private final Boolean makeStream;

/** If set, the add operation will also trim the older entries in the stream. */
private final StreamTrimOptions trim;

public abstract static class StreamTrimOptions {
/**
* If <code>true</code>, the stream will be trimmed exactly. Equivalent to <code>=</code> in the
* Redis API. Otherwise, the stream will be trimmed in a near-exact manner, which is more
* efficient, equivalent to <code>~</code> in the Redis API.
*/
protected boolean exact;

/** If set, sets the maximal amount of entries that will be deleted. */
protected Long limit;

protected abstract String getMethod();

protected abstract String getThreshold();

protected List<String> getRedisApi() {
List<String> optionArgs = new ArrayList<>();

optionArgs.add(this.getMethod());
optionArgs.add(this.exact ? TRIM_EXACT_REDIS_API : TRIM_NOT_EXACT_REDIS_API);
optionArgs.add(this.getThreshold());

if (this.limit != null) {
optionArgs.add(TRIM_LIMIT_REDIS_API);
optionArgs.add(this.limit.toString());
}

return optionArgs;
}
}

/** Option to trim the stream according to minimum ID. */
public static class MinId extends StreamTrimOptions {
/** Trim the stream according to entry ID. Equivalent to <code>MINID</code> in the Redis API. */
private final String threshold;

/**
* Create a trim option to trim stream based on stream ID.
*
* @param exact whether to match exactly on the threshold.
* @param threshold comparison id.
*/
public MinId(boolean exact, @NonNull String threshold) {
this.threshold = threshold;
this.exact = exact;
}

/**
* Create a trim option to trim stream based on stream ID.
*
* @param exact whether to match exactly on the threshold.
* @param threshold comparison id.
* @param limit max number of stream entries to be trimmed.
*/
public MinId(boolean exact, @NonNull String threshold, long limit) {
this.threshold = threshold;
this.exact = exact;
this.limit = limit;
}

@Override
protected String getMethod() {
return TRIM_MINID_REDIS_API;
}

@Override
protected String getThreshold() {
return threshold;
}
}

/** Option to trim the stream according to maximum stream length. */
public static class MaxLen extends StreamTrimOptions {
/**
* Trim the stream according to length.<br>
* Equivalent to <code>MAXLEN</code> in the Redis API.
*/
private final Long threshold;

/**
* Create a Max Length trim option to trim stream based on length.
*
* @param exact whether to match exactly on the threshold.
* @param threshold comparison count.
*/
public MaxLen(boolean exact, long threshold) {
this.threshold = threshold;
this.exact = exact;
}

/**
* Create a Max Length trim option to trim stream entries exceeds the threshold.
*
* @param exact whether to match exactly on the threshold.
* @param threshold comparison count.
* @param limit max number of stream entries to be trimmed.
*/
public MaxLen(boolean exact, long threshold, long limit) {
this.threshold = threshold;
this.exact = exact;
this.limit = limit;
}

@Override
protected String getMethod() {
return TRIM_MAXLEN_REDIS_API;
}

@Override
protected String getThreshold() {
return threshold.toString();
}
}

/**
* Converts options for Xadd into a String[].
*
* @return String[]
*/
public String[] toArgs() {
List<String> optionArgs = new ArrayList<>();

if (makeStream != null && !makeStream) {
optionArgs.add(NO_MAKE_STREAM_REDIS_API);
}

if (trim != null) {
optionArgs.addAll(trim.getRedisApi());
}

if (id != null) {
optionArgs.add(id);
} else {
optionArgs.add(ID_WILDCARD_REDIS_API);
}

return optionArgs.toArray(new String[0]);
}
}
Loading

0 comments on commit 46cc4df

Please sign in to comment.