Skip to content

Commit

Permalink
Java: Add XADD command (Stream commands) (valkey-io#1209)
Browse files Browse the repository at this point in the history
* Java: Add XADD command (Stream commands) (#155)

Signed-off-by: Andrew Carbonetto <[email protected]>
Co-authored-by: SanHalacogluImproving <[email protected]>
  • Loading branch information
2 people authored and cyip10 committed Jun 24, 2024
1 parent c33e0e3 commit 49bb9f0
Show file tree
Hide file tree
Showing 8 changed files with 598 additions and 10 deletions.
18 changes: 18 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.TTL;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZPopMax;
import static redis_request.RedisRequestOuterClass.RequestType.ZPopMin;
import static redis_request.RedisRequestOuterClass.RequestType.ZScore;
Expand All @@ -66,6 +67,7 @@
import glide.api.commands.ListBaseCommands;
import glide.api.commands.SetBaseCommands;
import glide.api.commands.SortedSetBaseCommands;
import glide.api.commands.StreamBaseCommands;
import glide.api.commands.StringBaseCommands;
import glide.api.models.Script;
import glide.api.models.commands.ExpireOptions;
Expand All @@ -74,6 +76,7 @@
import glide.api.models.commands.RangeOptions.ScoredRangeQuery;
import glide.api.models.commands.ScriptOptions;
import glide.api.models.commands.SetOptions;
import glide.api.models.commands.StreamAddOptions;
import glide.api.models.commands.ZaddOptions;
import glide.api.models.configuration.BaseClientConfiguration;
import glide.api.models.exceptions.RedisException;
Expand Down Expand Up @@ -108,6 +111,7 @@ public abstract class BaseClient
ListBaseCommands,
SetBaseCommands,
SortedSetBaseCommands,
StreamBaseCommands,
HyperLogLogBaseCommands {

/** Redis simple string response with "OK" */
Expand Down Expand Up @@ -690,6 +694,20 @@ public CompletableFuture<Object[]> zrankWithScore(@NonNull String key, @NonNull
Zrank, new String[] {key, member, WITH_SCORE_REDIS_API}, this::handleArrayOrNullResponse);
}

@Override
public CompletableFuture<String> xadd(@NonNull String key, @NonNull Map<String, String> values) {
return xadd(key, values, StreamAddOptions.builder().build());
}

@Override
public CompletableFuture<String> xadd(
@NonNull String key, @NonNull Map<String, String> values, @NonNull StreamAddOptions options) {
String[] arguments =
ArrayUtils.addAll(
ArrayUtils.addFirst(options.toArgs(), key), convertMapToKeyValueStringArray(values));
return commandManager.submitNewCommand(XAdd, arguments, this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import glide.api.models.commands.StreamAddOptions;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Supports commands and transactions for the "Stream Commands" group for standalone and cluster
* clients.
*
* @see <a href="https://redis.io/commands/?group=stream">Stream Commands</a>
*/
public interface StreamBaseCommands {

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return The id of the added entry.
* @example
* <pre>{@code
* String streamId = client.xadd("key", Map.of("name", "Sara", "surname", "OConnor").get();
* System.out.println("Stream: " + streamId);
* }</pre>
*/
CompletableFuture<String> xadd(String key, Map<String, String> values);

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options.
* @return The id of the added entry, or <code>null</code> if {@link StreamAddOptions#makeStream}
* is set to <code>false</code> and no stream with the matching <code>key</code> exists.
* @example
* <pre>{@code
* // Option to use the existing stream, or return null if the stream doesn't already exist at "key"
* StreamAddOptions options = StreamAddOptions.builder().id("sid").makeStream(Boolean.FALSE).build();
* String streamId = client.xadd("key", Map.of("name", "Sara", "surname", "OConnor"), options).get();
* if (streamId != null) {
* assert streamId.equals("sid");
* }
* }</pre>
*/
CompletableFuture<String> xadd(String key, Map<String, String> values, StreamAddOptions options);
}
36 changes: 36 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZPopMax;
import static redis_request.RedisRequestOuterClass.RequestType.ZPopMin;
import static redis_request.RedisRequestOuterClass.RequestType.ZScore;
Expand All @@ -82,6 +83,7 @@
import glide.api.models.commands.SetOptions;
import glide.api.models.commands.SetOptions.ConditionalSet;
import glide.api.models.commands.SetOptions.SetOptionsBuilder;
import glide.api.models.commands.StreamAddOptions;
import glide.api.models.commands.ZaddOptions;
import java.util.Map;
import lombok.Getter;
Expand Down Expand Up @@ -1465,6 +1467,40 @@ public T zrankWithScore(@NonNull String key, @NonNull String member) {
return getThis();
}

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return Command Response - The id of the added entry.
*/
public T xadd(@NonNull String key, @NonNull Map<String, String> values) {
this.xadd(key, values, StreamAddOptions.builder().build());
return getThis();
}

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options.
* @return Command Response - The id of the added entry, or <code>null</code> if {@link
* StreamAddOptions#makeStream} is set to <code>false</code> and no stream with the matching
* <code>key</code> exists.
*/
public T xadd(
@NonNull String key, @NonNull Map<String, String> values, @NonNull StreamAddOptions options) {
String[] arguments =
ArrayUtils.addAll(
ArrayUtils.addFirst(options.toArgs(), key), convertMapToKeyValueStringArray(values));
ArgsArray commandArgs = buildArgs(arguments);
protobufTransaction.addCommands(buildCommand(XAdd, commandArgs));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.commands;

import glide.api.commands.StreamBaseCommands;
import java.util.ArrayList;
import java.util.List;
import lombok.Builder;
import lombok.NonNull;

/**
* Optional arguments to {@link StreamBaseCommands#xadd}
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a>
*/
@Builder
public final class StreamAddOptions {

public static final String NO_MAKE_STREAM_REDIS_API = "NOMKSTREAM";
public static final String ID_WILDCARD_REDIS_API = "*";
public static final String TRIM_MAXLEN_REDIS_API = "MAXLEN";
public static final String TRIM_MINID_REDIS_API = "MINID";
public static final String TRIM_EXACT_REDIS_API = "=";
public static final String TRIM_NOT_EXACT_REDIS_API = "~";
public static final String TRIM_LIMIT_REDIS_API = "LIMIT";

/** If set, the new entry will be added with this <code>id</code>. */
private final String id;

/**
* If set to <code>false</code>, a new stream won't be created if no stream matches the given key.
* <br>
* Equivalent to <code>NOMKSTREAM</code> in the Redis API.
*/
private final Boolean makeStream;

/** If set, the add operation will also trim the older entries in the stream. */
private final StreamTrimOptions trim;

public abstract static class StreamTrimOptions {
/**
* If <code>true</code>, the stream will be trimmed exactly. Equivalent to <code>=</code> in the
* Redis API. Otherwise, the stream will be trimmed in a near-exact manner, which is more
* efficient, equivalent to <code>~</code> in the Redis API.
*/
protected boolean exact;

/** If set, sets the maximal amount of entries that will be deleted. */
protected Long limit;

protected abstract String getMethod();

protected abstract String getThreshold();

protected List<String> getRedisApi() {
List<String> optionArgs = new ArrayList<>();

optionArgs.add(this.getMethod());
optionArgs.add(this.exact ? TRIM_EXACT_REDIS_API : TRIM_NOT_EXACT_REDIS_API);
optionArgs.add(this.getThreshold());

if (this.limit != null) {
optionArgs.add(TRIM_LIMIT_REDIS_API);
optionArgs.add(this.limit.toString());
}

return optionArgs;
}
}

/** Option to trim the stream according to minimum ID. */
public static class MinId extends StreamTrimOptions {
/** Trim the stream according to entry ID. Equivalent to <code>MINID</code> in the Redis API. */
private final String threshold;

/**
* Create a trim option to trim stream based on stream ID.
*
* @param exact Whether to match exactly on the threshold.
* @param threshold Comparison id.
*/
public MinId(boolean exact, @NonNull String threshold) {
this.threshold = threshold;
this.exact = exact;
}

/**
* Create a trim option to trim stream based on stream ID.
*
* @param exact Whether to match exactly on the threshold.
* @param threshold Comparison id.
* @param limit Max number of stream entries to be trimmed.
*/
public MinId(boolean exact, @NonNull String threshold, long limit) {
this.threshold = threshold;
this.exact = exact;
this.limit = limit;
}

@Override
protected String getMethod() {
return TRIM_MINID_REDIS_API;
}

@Override
protected String getThreshold() {
return threshold;
}
}

/** Option to trim the stream according to maximum stream length. */
public static class MaxLen extends StreamTrimOptions {
/**
* Trim the stream according to length.<br>
* Equivalent to <code>MAXLEN</code> in the Redis API.
*/
private final Long threshold;

/**
* Create a Max Length trim option to trim stream based on length.
*
* @param exact Whether to match exactly on the threshold.
* @param threshold Comparison count.
*/
public MaxLen(boolean exact, long threshold) {
this.threshold = threshold;
this.exact = exact;
}

/**
* Create a Max Length trim option to trim stream entries exceeds the threshold.
*
* @param exact Whether to match exactly on the threshold.
* @param threshold Comparison count.
* @param limit Max number of stream entries to be trimmed.
*/
public MaxLen(boolean exact, long threshold, long limit) {
this.threshold = threshold;
this.exact = exact;
this.limit = limit;
}

@Override
protected String getMethod() {
return TRIM_MAXLEN_REDIS_API;
}

@Override
protected String getThreshold() {
return threshold.toString();
}
}

/**
* Converts options for Xadd into a String[].
*
* @return String[]
*/
public String[] toArgs() {
List<String> optionArgs = new ArrayList<>();

if (makeStream != null && !makeStream) {
optionArgs.add(NO_MAKE_STREAM_REDIS_API);
}

if (trim != null) {
optionArgs.addAll(trim.getRedisApi());
}

if (id != null) {
optionArgs.add(id);
} else {
optionArgs.add(ID_WILDCARD_REDIS_API);
}

return optionArgs.toArray(new String[0]);
}
}
Loading

0 comments on commit 49bb9f0

Please sign in to comment.