Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add XADD command (Stream commands) #155

Merged
merged 7 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.TTL;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZScore;
import static redis_request.RedisRequestOuterClass.RequestType.Zadd;
import static redis_request.RedisRequestOuterClass.RequestType.Zcard;
Expand All @@ -57,11 +58,13 @@
import glide.api.commands.ListBaseCommands;
import glide.api.commands.SetBaseCommands;
import glide.api.commands.SortedSetBaseCommands;
import glide.api.commands.StreamBaseCommands;
import glide.api.commands.StringCommands;
import glide.api.models.Script;
import glide.api.models.commands.ExpireOptions;
import glide.api.models.commands.ScriptOptions;
import glide.api.models.commands.SetOptions;
import glide.api.models.commands.StreamAddOptions;
import glide.api.models.commands.ZaddOptions;
import glide.api.models.configuration.BaseClientConfiguration;
import glide.api.models.exceptions.RedisException;
Expand Down Expand Up @@ -95,7 +98,8 @@ public abstract class BaseClient
HashBaseCommands,
ListBaseCommands,
SetBaseCommands,
SortedSetBaseCommands {
SortedSetBaseCommands,
StreamBaseCommands {

/** Redis simple string response with "OK" */
public static final String OK = ConstantResponse.OK.toString();
Expand Down Expand Up @@ -619,6 +623,20 @@ public CompletableFuture<Double> zscore(@NonNull String key, @NonNull String mem
ZScore, new String[] {key, member}, this::handleDoubleOrNullResponse);
}

@Override
public CompletableFuture<String> xadd(@NonNull String key, @NonNull Map<String, String> values) {
return xadd(key, values, StreamAddOptions.builder().build());
}

@Override
public CompletableFuture<String> xadd(
@NonNull String key, @NonNull Map<String, String> values, @NonNull StreamAddOptions options) {
String[] arguments =
ArrayUtils.addAll(
ArrayUtils.addFirst(options.toArgs(), key), convertMapToKeyValueStringArray(values));
return commandManager.submitNewCommand(XAdd, arguments, this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import glide.api.models.commands.StreamAddOptions;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Supports commands and transactions for the "Stream Commands" group for standalone clients and
* cluster clients.
*
* @see <a href="https://redis.io/commands/?group=stream">Stream Commands</a>
*/
public interface StreamBaseCommands {

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return The id of the added entry.
* @example
* <pre>{@code
* String streamId = client.xadd("key", Map.of("name", "Sara", "surname", "OConnor").get();
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* System.out.println("Stream: " + streamId);
* }</pre>
*/
CompletableFuture<String> xadd(String key, Map<String, String> values);

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options.
* @return The id of the added entry, or <code>null</code> if {@link StreamAddOptions#makeStream}
* is set to <code>false</code> and no stream with the matching <code>key</code> exists.
* @example
* <pre>{@code
* // Option to use the existing stream, or return null if the stream doesn't already exist at "key"
* StreamAddOptions options = StreamAddOptions.builder().id("sid").makeStream(Boolean.FALSE).build();
* String streamId = client.xadd("key", Map.of("name", "Sara", "surname", "OConnor"), options).get();
* if (streamId != null) {
* assert streamId.equals("sid");
* }
* }</pre>
*/
CompletableFuture<String> xadd(String key, Map<String, String> values, StreamAddOptions options);
}
36 changes: 36 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Time;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZScore;
import static redis_request.RedisRequestOuterClass.RequestType.Zadd;
import static redis_request.RedisRequestOuterClass.RequestType.Zcard;
Expand All @@ -67,6 +68,7 @@
import glide.api.models.commands.SetOptions;
import glide.api.models.commands.SetOptions.ConditionalSet;
import glide.api.models.commands.SetOptions.SetOptionsBuilder;
import glide.api.models.commands.StreamAddOptions;
import glide.api.models.commands.ZaddOptions;
import java.util.Map;
import lombok.Getter;
Expand Down Expand Up @@ -1292,6 +1294,40 @@ public T zscore(@NonNull String key, @NonNull String member) {
return getThis();
}

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @return Command Response - The id of the added entry.
*/
public T xadd(@NonNull String key, @NonNull Map<String, String> values) {
this.xadd(key, values, StreamAddOptions.builder().build());
return getThis();
}
SanHalacogluImproving marked this conversation as resolved.
Show resolved Hide resolved

/**
* Adds an entry to the specified stream.
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a> for details.
* @param key The key of the stream.
* @param values Field-value pairs to be added to the entry.
* @param options Stream add options.
* @return Command Response - The id of the added entry, or <code>null</code> if {@link
* StreamAddOptions#makeStream} is set to <code>false</code> and no stream with the matching
* <code>key</code> exists.
*/
public T xadd(
@NonNull String key, @NonNull Map<String, String> values, @NonNull StreamAddOptions options) {
String[] arguments =
ArrayUtils.addAll(
ArrayUtils.addFirst(options.toArgs(), key), convertMapToKeyValueStringArray(values));
ArgsArray commandArgs = buildArgs(arguments);
protobufTransaction.addCommands(buildCommand(XAdd, commandArgs));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.commands;

import glide.api.commands.StreamBaseCommands;
import java.util.ArrayList;
import java.util.List;
import lombok.Builder;
import lombok.NonNull;

/**
* Optional arguments to {@link StreamBaseCommands#xadd}
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a>
*/
@Builder
public final class StreamAddOptions {

public static final String NO_MAKE_STREAM_REDIS_API = "NOMKSTREAM";
SanHalacogluImproving marked this conversation as resolved.
Show resolved Hide resolved
public static final String ID_WILDCARD_REDIS_API = "*";
public static final String TRIM_MAXLEN_REDIS_API = "MAXLEN";
public static final String TRIM_MINID_REDIS_API = "MINID";
public static final String TRIM_EXACT_REDIS_API = "=";
public static final String TRIM_NOT_EXACT_REDIS_API = "~";
public static final String TRIM_LIMIT_REDIS_API = "LIMIT";

/** If set, the new entry will be added with this <code>id</code>. */
private final String id;

/**
* If set to <code>false</code>, a new stream won't be created if no stream matches the given key.
* <br>
* Equivalent to <code>NOMKSTREAM</code> in the Redis API.
*/
private final Boolean makeStream;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved

/** If set, the add operation will also trim the older entries in the stream. */
private final StreamTrimOptions trim;

public abstract static class StreamTrimOptions {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
/**
* If <code>true</code>, the stream will be trimmed exactly. Equivalent to <code>=</code> in the
* Redis API. Otherwise, the stream will be trimmed in a near-exact manner, which is more
* efficient, equivalent to <code>~</code> in the Redis API.
*/
protected boolean exact;

/** If set, sets the maximal amount of entries that will be deleted. */
protected Long limit;

protected abstract String getMethod();

protected abstract String getThreshold();

protected List<String> getRedisApi() {
List<String> optionArgs = new ArrayList<>();

optionArgs.add(this.getMethod());
optionArgs.add(this.exact ? TRIM_EXACT_REDIS_API : TRIM_NOT_EXACT_REDIS_API);
optionArgs.add(this.getThreshold());

if (this.limit != null) {
optionArgs.add(TRIM_LIMIT_REDIS_API);
optionArgs.add(this.limit.toString());
}

return optionArgs;
}
}

/** Option to trim the stream according to minimum ID. */
public static class MinId extends StreamTrimOptions {
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
/** Trim the stream according to entry ID. Equivalent to <code>MINID</code> in the Redis API. */
private final String threshold;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create a trim option to trim stream based on stream ID.
*
* @param exact whether to match exactly on the threshold.
* @param threshold comparison id.
*/
public MinId(boolean exact, @NonNull String threshold) {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
this.threshold = threshold;
this.exact = exact;
}

/**
* Create a trim option to trim stream based on stream ID.
*
* @param exact whether to match exactly on the threshold.
* @param threshold comparison id.
* @param limit max number of stream entries to be trimmed.
*/
public MinId(boolean exact, @NonNull String threshold, long limit) {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
this.threshold = threshold;
this.exact = exact;
this.limit = limit;
}

@Override
protected String getMethod() {
return TRIM_MINID_REDIS_API;
}

@Override
protected String getThreshold() {
return threshold;
}
}

/** Option to trim the stream according to maximum stream length. */
public static class MaxLen extends StreamTrimOptions {
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
/**
* Trim the stream according to length.<br>
* Equivalent to <code>MAXLEN</code> in the Redis API.
*/
private final Long threshold;

/**
* Create a Max Length trim option to trim stream based on length.
*
* @param exact whether to match exactly on the threshold.
* @param threshold comparison count.
*/
public MaxLen(boolean exact, long threshold) {
this.threshold = threshold;
this.exact = exact;
}

/**
* Create a Max Length trim option to trim stream entries exceeds the threshold.
*
* @param exact whether to match exactly on the threshold.
* @param threshold comparison count.
* @param limit max number of stream entries to be trimmed.
*/
public MaxLen(boolean exact, long threshold, long limit) {
this.threshold = threshold;
this.exact = exact;
this.limit = limit;
}

@Override
protected String getMethod() {
return TRIM_MAXLEN_REDIS_API;
}

@Override
protected String getThreshold() {
return threshold.toString();
}
}

/**
* Converts options for Xadd into a String[].
*
* @return String[]
*/
public String[] toArgs() {
List<String> optionArgs = new ArrayList<>();

if (makeStream != null && !makeStream) {
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
optionArgs.add(NO_MAKE_STREAM_REDIS_API);
}

if (trim != null) {
optionArgs.addAll(trim.getRedisApi());
}

if (id != null) {
optionArgs.add(id);
} else {
optionArgs.add(ID_WILDCARD_REDIS_API);
}

return optionArgs.toArray(new String[0]);
}
}
Loading
Loading