From 01dfaa6bf866e4f95c05b391280d2f1dd6609135 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Mon, 1 Apr 2024 12:44:54 -0700 Subject: [PATCH] Java: Add XADD command (Stream commands) (#155) * Add Stream XADD command to Java Signed-off-by: Andrew Carbonetto --------- Signed-off-by: Andrew Carbonetto --- .../src/main/java/glide/api/BaseClient.java | 18 ++ .../api/commands/StreamBaseCommands.java | 51 +++++ .../glide/api/models/BaseTransaction.java | 36 ++++ .../api/models/commands/StreamAddOptions.java | 177 +++++++++++++++++ .../test/java/glide/api/RedisClientTest.java | 184 ++++++++++++++++++ .../glide/api/models/TransactionTests.java | 24 +++ .../test/java/glide/SharedCommandTests.java | 89 +++++++++ 7 files changed, 579 insertions(+) create mode 100644 java/client/src/main/java/glide/api/commands/StreamBaseCommands.java create mode 100644 java/client/src/main/java/glide/api/models/commands/StreamAddOptions.java diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 1b577e0440..3ef5b7d5ff 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -51,6 +51,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.TTL; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.ZPopMax; import static redis_request.RedisRequestOuterClass.RequestType.ZPopMin; import static redis_request.RedisRequestOuterClass.RequestType.ZScore; @@ -67,6 +68,7 @@ import glide.api.commands.SetBaseCommands; import glide.api.commands.SortedSetBaseCommands; import glide.api.commands.StringBaseCommands; +import glide.api.commands.StreamBaseCommands; import glide.api.models.Script; import glide.api.models.commands.ExpireOptions; import glide.api.models.commands.RangeOptions; @@ -74,6 +76,7 @@ import glide.api.models.commands.RangeOptions.ScoredRangeQuery; import glide.api.models.commands.ScriptOptions; import glide.api.models.commands.SetOptions; +import glide.api.models.commands.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import glide.api.models.configuration.BaseClientConfiguration; import glide.api.models.exceptions.RedisException; @@ -108,6 +111,7 @@ public abstract class BaseClient ListBaseCommands, SetBaseCommands, SortedSetBaseCommands, + StreamBaseCommands, HyperLogLogBaseCommands { /** Redis simple string response with "OK" */ @@ -690,6 +694,20 @@ public CompletableFuture zrankWithScore(@NonNull String key, @NonNull Zrank, new String[] {key, member, WITH_SCORE_REDIS_API}, this::handleArrayOrNullResponse); } + @Override + public CompletableFuture xadd(@NonNull String key, @NonNull Map values) { + return xadd(key, values, StreamAddOptions.builder().build()); + } + + @Override + public CompletableFuture xadd( + @NonNull String key, @NonNull Map values, @NonNull StreamAddOptions options) { + String[] arguments = + ArrayUtils.addAll( + ArrayUtils.addFirst(options.toArgs(), key), convertMapToKeyValueStringArray(values)); + return commandManager.submitNewCommand(XAdd, arguments, this::handleStringOrNullResponse); + } + @Override public CompletableFuture pttl(@NonNull String key) { return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse); diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java new file mode 100644 index 0000000000..5577655ab8 --- /dev/null +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -0,0 +1,51 @@ +/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.commands; + +import glide.api.models.commands.StreamAddOptions; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Supports commands and transactions for the "Stream Commands" group for standalone clients and + * cluster clients. + * + * @see Stream Commands + */ +public interface StreamBaseCommands { + + /** + * Adds an entry to the specified stream. + * + * @see redis.io for details. + * @param key The key of the stream. + * @param values Field-value pairs to be added to the entry. + * @return The id of the added entry. + * @example + *
{@code
+     * String streamId = client.xadd("key", Map.of("name", "Sara", "surname", "OConnor").get();
+     * System.out.println("Stream: " + streamId);
+     * }
+ */ + CompletableFuture xadd(String key, Map values); + + /** + * Adds an entry to the specified stream. + * + * @see redis.io for details. + * @param key The key of the stream. + * @param values Field-value pairs to be added to the entry. + * @param options Stream add options. + * @return The id of the added entry, or null if {@link StreamAddOptions#makeStream} + * is set to false and no stream with the matching key exists. + * @example + *
{@code
+     * // Option to use the existing stream, or return null if the stream doesn't already exist at "key"
+     * StreamAddOptions options = StreamAddOptions.builder().id("sid").makeStream(Boolean.FALSE).build();
+     * String streamId = client.xadd("key", Map.of("name", "Sara", "surname", "OConnor"), options).get();
+     * if (streamId != null) {
+     *     assert streamId.equals("sid");
+     * }
+     * }
+ */ + CompletableFuture xadd(String key, Map values, StreamAddOptions options); +} diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index 6f6c2b0dd5..746ca75b0a 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -62,6 +62,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Time; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.ZPopMax; import static redis_request.RedisRequestOuterClass.RequestType.ZPopMin; import static redis_request.RedisRequestOuterClass.RequestType.ZScore; @@ -82,6 +83,7 @@ import glide.api.models.commands.SetOptions; import glide.api.models.commands.SetOptions.ConditionalSet; import glide.api.models.commands.SetOptions.SetOptionsBuilder; +import glide.api.models.commands.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import java.util.Map; import lombok.Getter; @@ -1465,6 +1467,40 @@ public T zrankWithScore(@NonNull String key, @NonNull String member) { return getThis(); } + /** + * Adds an entry to the specified stream. + * + * @see redis.io for details. + * @param key The key of the stream. + * @param values Field-value pairs to be added to the entry. + * @return Command Response - The id of the added entry. + */ + public T xadd(@NonNull String key, @NonNull Map values) { + this.xadd(key, values, StreamAddOptions.builder().build()); + return getThis(); + } + + /** + * Adds an entry to the specified stream. + * + * @see redis.io for details. + * @param key The key of the stream. + * @param values Field-value pairs to be added to the entry. + * @param options Stream add options. + * @return Command Response - The id of the added entry, or null if {@link + * StreamAddOptions#makeStream} is set to false and no stream with the matching + * key exists. + */ + public T xadd( + @NonNull String key, @NonNull Map values, @NonNull StreamAddOptions options) { + String[] arguments = + ArrayUtils.addAll( + ArrayUtils.addFirst(options.toArgs(), key), convertMapToKeyValueStringArray(values)); + ArgsArray commandArgs = buildArgs(arguments); + protobufTransaction.addCommands(buildCommand(XAdd, commandArgs)); + return getThis(); + } + /** * Returns the remaining time to live of key that has a timeout, in milliseconds. * diff --git a/java/client/src/main/java/glide/api/models/commands/StreamAddOptions.java b/java/client/src/main/java/glide/api/models/commands/StreamAddOptions.java new file mode 100644 index 0000000000..1f98dca863 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/commands/StreamAddOptions.java @@ -0,0 +1,177 @@ +/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.commands; + +import glide.api.commands.StreamBaseCommands; +import java.util.ArrayList; +import java.util.List; +import lombok.Builder; +import lombok.NonNull; + +/** + * Optional arguments to {@link StreamBaseCommands#xadd} + * + * @see redis.io + */ +@Builder +public final class StreamAddOptions { + + public static final String NO_MAKE_STREAM_REDIS_API = "NOMKSTREAM"; + public static final String ID_WILDCARD_REDIS_API = "*"; + public static final String TRIM_MAXLEN_REDIS_API = "MAXLEN"; + public static final String TRIM_MINID_REDIS_API = "MINID"; + public static final String TRIM_EXACT_REDIS_API = "="; + public static final String TRIM_NOT_EXACT_REDIS_API = "~"; + public static final String TRIM_LIMIT_REDIS_API = "LIMIT"; + + /** If set, the new entry will be added with this id. */ + private final String id; + + /** + * If set to false, a new stream won't be created if no stream matches the given key. + *
+ * Equivalent to NOMKSTREAM in the Redis API. + */ + private final Boolean makeStream; + + /** If set, the add operation will also trim the older entries in the stream. */ + private final StreamTrimOptions trim; + + public abstract static class StreamTrimOptions { + /** + * If true, the stream will be trimmed exactly. Equivalent to = in the + * Redis API. Otherwise, the stream will be trimmed in a near-exact manner, which is more + * efficient, equivalent to ~ in the Redis API. + */ + protected boolean exact; + + /** If set, sets the maximal amount of entries that will be deleted. */ + protected Long limit; + + protected abstract String getMethod(); + + protected abstract String getThreshold(); + + protected List getRedisApi() { + List optionArgs = new ArrayList<>(); + + optionArgs.add(this.getMethod()); + optionArgs.add(this.exact ? TRIM_EXACT_REDIS_API : TRIM_NOT_EXACT_REDIS_API); + optionArgs.add(this.getThreshold()); + + if (this.limit != null) { + optionArgs.add(TRIM_LIMIT_REDIS_API); + optionArgs.add(this.limit.toString()); + } + + return optionArgs; + } + } + + /** Option to trim the stream according to minimum ID. */ + public static class MinId extends StreamTrimOptions { + /** Trim the stream according to entry ID. Equivalent to MINID in the Redis API. */ + private final String threshold; + + /** + * Create a trim option to trim stream based on stream ID. + * + * @param exact whether to match exactly on the threshold. + * @param threshold comparison id. + */ + public MinId(boolean exact, @NonNull String threshold) { + this.threshold = threshold; + this.exact = exact; + } + + /** + * Create a trim option to trim stream based on stream ID. + * + * @param exact whether to match exactly on the threshold. + * @param threshold comparison id. + * @param limit max number of stream entries to be trimmed. + */ + public MinId(boolean exact, @NonNull String threshold, long limit) { + this.threshold = threshold; + this.exact = exact; + this.limit = limit; + } + + @Override + protected String getMethod() { + return TRIM_MINID_REDIS_API; + } + + @Override + protected String getThreshold() { + return threshold; + } + } + + /** Option to trim the stream according to maximum stream length. */ + public static class MaxLen extends StreamTrimOptions { + /** + * Trim the stream according to length.
+ * Equivalent to MAXLEN in the Redis API. + */ + private final Long threshold; + + /** + * Create a Max Length trim option to trim stream based on length. + * + * @param exact whether to match exactly on the threshold. + * @param threshold comparison count. + */ + public MaxLen(boolean exact, long threshold) { + this.threshold = threshold; + this.exact = exact; + } + + /** + * Create a Max Length trim option to trim stream entries exceeds the threshold. + * + * @param exact whether to match exactly on the threshold. + * @param threshold comparison count. + * @param limit max number of stream entries to be trimmed. + */ + public MaxLen(boolean exact, long threshold, long limit) { + this.threshold = threshold; + this.exact = exact; + this.limit = limit; + } + + @Override + protected String getMethod() { + return TRIM_MAXLEN_REDIS_API; + } + + @Override + protected String getThreshold() { + return threshold.toString(); + } + } + + /** + * Converts options for Xadd into a String[]. + * + * @return String[] + */ + public String[] toArgs() { + List optionArgs = new ArrayList<>(); + + if (makeStream != null && !makeStream) { + optionArgs.add(NO_MAKE_STREAM_REDIS_API); + } + + if (trim != null) { + optionArgs.addAll(trim.getRedisApi()); + } + + if (id != null) { + optionArgs.add(id); + } else { + optionArgs.add(ID_WILDCARD_REDIS_API); + } + + return optionArgs.toArray(new String[0]); + } +} diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index eca029a1d5..1e669e302d 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -7,7 +7,14 @@ import static glide.api.models.commands.SetOptions.ConditionalSet.ONLY_IF_DOES_NOT_EXIST; import static glide.api.models.commands.SetOptions.ConditionalSet.ONLY_IF_EXISTS; import static glide.api.models.commands.SetOptions.RETURN_OLD_VALUE; +import static glide.api.models.commands.StreamAddOptions.NO_MAKE_STREAM_REDIS_API; +import static glide.api.models.commands.StreamAddOptions.TRIM_EXACT_REDIS_API; +import static glide.api.models.commands.StreamAddOptions.TRIM_LIMIT_REDIS_API; +import static glide.api.models.commands.StreamAddOptions.TRIM_MAXLEN_REDIS_API; +import static glide.api.models.commands.StreamAddOptions.TRIM_MINID_REDIS_API; +import static glide.api.models.commands.StreamAddOptions.TRIM_NOT_EXACT_REDIS_API; import static glide.utils.ArrayTransformUtils.concatenateArrays; +import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray; import static glide.utils.ArrayTransformUtils.convertMapToValueKeyStringArray; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -75,6 +82,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Time; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.ZPopMax; import static redis_request.RedisRequestOuterClass.RequestType.ZPopMin; import static redis_request.RedisRequestOuterClass.RequestType.ZScore; @@ -98,6 +106,7 @@ import glide.api.models.commands.ScriptOptions; import glide.api.models.commands.SetOptions; import glide.api.models.commands.SetOptions.Expiry; +import glide.api.models.commands.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import glide.managers.CommandManager; import glide.managers.ConnectionManager; @@ -109,8 +118,12 @@ import java.util.concurrent.CompletableFuture; import lombok.SneakyThrows; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; public class RedisClientTest { @@ -2200,6 +2213,177 @@ public void zrankWithScore_returns_success() { assertEquals(value, payload); } + @SneakyThrows + @Test + public void xadd_returns_success() { + // setup + String key = "testKey"; + Map fieldValues = new LinkedHashMap<>(); + fieldValues.put("testField1", "testValue1"); + fieldValues.put("testField2", "testValue2"); + String[] fieldValuesArgs = convertMapToKeyValueStringArray(fieldValues); + String[] arguments = new String[] {key, "*"}; + arguments = ArrayUtils.addAll(arguments, fieldValuesArgs); + String returnId = "testId"; + + CompletableFuture testResponse = mock(CompletableFuture.class); + when(testResponse.get()).thenReturn(returnId); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XAdd), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xadd(key, fieldValues); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(returnId, payload); + } + + @SneakyThrows + @Test + public void xadd_with_nomakestream_maxlen_options_returns_success() { + // setup + String key = "testKey"; + Map fieldValues = new LinkedHashMap<>(); + fieldValues.put("testField1", "testValue1"); + fieldValues.put("testField2", "testValue2"); + StreamAddOptions options = + StreamAddOptions.builder() + .id("id") + .makeStream(false) + .trim(new StreamAddOptions.MaxLen(true, 5L)) + .build(); + + String[] arguments = + new String[] { + key, + NO_MAKE_STREAM_REDIS_API, + TRIM_MAXLEN_REDIS_API, + TRIM_EXACT_REDIS_API, + Long.toString(5L), + "id" + }; + arguments = ArrayUtils.addAll(arguments, convertMapToKeyValueStringArray(fieldValues)); + + String returnId = "testId"; + + CompletableFuture testResponse = mock(CompletableFuture.class); + when(testResponse.get()).thenReturn(returnId); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XAdd), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xadd(key, fieldValues, options); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(returnId, payload); + } + + private static List getStreamAddOptions() { + return List.of( + Arguments.of( + Pair.of( + // no TRIM option + StreamAddOptions.builder().id("id").makeStream(Boolean.FALSE).build(), + new String[] {"testKey", NO_MAKE_STREAM_REDIS_API, "id"}), + Pair.of( + // MAXLEN with LIMIT + StreamAddOptions.builder() + .id("id") + .makeStream(Boolean.TRUE) + .trim(new StreamAddOptions.MaxLen(Boolean.TRUE, 5L, 10L)) + .build(), + new String[] { + "testKey", + TRIM_MAXLEN_REDIS_API, + TRIM_EXACT_REDIS_API, + Long.toString(5L), + TRIM_LIMIT_REDIS_API, + Long.toString(10L), + "id" + }), + Pair.of( + // MAXLEN with non exact match + StreamAddOptions.builder() + .makeStream(Boolean.FALSE) + .trim(new StreamAddOptions.MaxLen(Boolean.FALSE, 2L)) + .build(), + new String[] { + "testKey", + NO_MAKE_STREAM_REDIS_API, + TRIM_MAXLEN_REDIS_API, + TRIM_NOT_EXACT_REDIS_API, + Long.toString(2L), + "*" + }), + Pair.of( + // MIN ID with LIMIT + StreamAddOptions.builder() + .id("id") + .makeStream(Boolean.TRUE) + .trim(new StreamAddOptions.MinId(Boolean.TRUE, "testKey", 10L)) + .build(), + new String[] { + "testKey", + TRIM_MINID_REDIS_API, + TRIM_EXACT_REDIS_API, + Long.toString(5L), + TRIM_LIMIT_REDIS_API, + Long.toString(10L), + "id" + }), + Pair.of( + // MIN ID with non exact match + StreamAddOptions.builder() + .makeStream(Boolean.FALSE) + .trim(new StreamAddOptions.MinId(Boolean.FALSE, "testKey")) + .build(), + new String[] { + "testKey", + NO_MAKE_STREAM_REDIS_API, + TRIM_MINID_REDIS_API, + TRIM_NOT_EXACT_REDIS_API, + Long.toString(5L), + "*" + }))); + } + + @SneakyThrows + @ParameterizedTest + @MethodSource("getStreamAddOptions") + public void xadd_with_options_returns_success(Pair optionAndArgs) { + // setup + String key = "testKey"; + Map fieldValues = new LinkedHashMap<>(); + fieldValues.put("testField1", "testValue1"); + fieldValues.put("testField2", "testValue2"); + String[] arguments = + ArrayUtils.addAll(optionAndArgs.getRight(), convertMapToKeyValueStringArray(fieldValues)); + String returnId = "testId"; + + CompletableFuture testResponse = mock(CompletableFuture.class); + when(testResponse.get()).thenReturn(returnId); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XAdd), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xadd(key, fieldValues, optionAndArgs.getLeft()); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(returnId, payload); + } + @SneakyThrows @Test public void type_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 3a2a996192..23f9680aed 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -60,6 +60,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Time; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.ZPopMax; import static redis_request.RedisRequestOuterClass.RequestType.ZPopMin; import static redis_request.RedisRequestOuterClass.RequestType.ZScore; @@ -76,6 +77,7 @@ import glide.api.models.commands.RangeOptions.RangeByScore; import glide.api.models.commands.RangeOptions.ScoreBoundary; import glide.api.models.commands.SetOptions; +import glide.api.models.commands.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -442,6 +444,28 @@ public void transaction_builds_protobuf_request(BaseTransaction transaction) .addArgs(WITH_SCORE_REDIS_API) .build())); + transaction.xadd("key", Map.of("field1", "foo1")); + results.add( + Pair.of( + XAdd, + ArgsArray.newBuilder() + .addArgs("key") + .addArgs("*") + .addArgs("field1") + .addArgs("foo1") + .build())); + + transaction.xadd("key", Map.of("field1", "foo1"), StreamAddOptions.builder().id("id").build()); + results.add( + Pair.of( + XAdd, + ArgsArray.newBuilder() + .addArgs("key") + .addArgs("id") + .addArgs("field1") + .addArgs("foo1") + .build())); + transaction.time(); results.add(Pair.of(Time, ArgsArray.newBuilder().build())); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index 5a2c984ea5..824228bc5b 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -31,6 +31,7 @@ import glide.api.models.commands.RangeOptions.ScoreBoundary; import glide.api.models.commands.ScriptOptions; import glide.api.models.commands.SetOptions; +import glide.api.models.commands.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import glide.api.models.configuration.NodeAddress; import glide.api.models.configuration.RedisClientConfiguration; @@ -1273,6 +1274,94 @@ public void zrank(BaseClient client) { assertTrue(executionException.getCause() instanceof RequestException); } + @SneakyThrows + @ParameterizedTest + @MethodSource("getClients") + public void xadd(BaseClient client) { + String key = UUID.randomUUID().toString(); + String field1 = UUID.randomUUID().toString(); + String field2 = UUID.randomUUID().toString(); + + assertNull( + client + .xadd( + key, + Map.of(field1, "foo0", field2, "bar0"), + StreamAddOptions.builder().makeStream(Boolean.FALSE).build()) + .get()); + + String timestamp1 = "0-1"; + assertEquals( + timestamp1, + client + .xadd( + key, + Map.of(field1, "foo1", field2, "bar1"), + StreamAddOptions.builder().id(timestamp1).build()) + .get()); + + assertNotNull(client.xadd(key, Map.of(field1, "foo2", field2, "bar2")).get()); + if (client instanceof RedisClient) { + assertEquals(2L, ((RedisClient) client).customCommand(new String[] {"XLEN", key}).get()); + } else if (client instanceof RedisClusterClient) { + assertEquals( + 2L, + ((RedisClusterClient) client) + .customCommand(new String[] {"XLEN", key}) + .get() + .getSingleValue()); + } + + // this will trim the first entry. + String id = + client + .xadd( + key, + Map.of(field1, "foo3", field2, "bar3"), + StreamAddOptions.builder() + .trim(new StreamAddOptions.MaxLen(Boolean.TRUE, 2L)) + .build()) + .get(); + assertNotNull(id); + if (client instanceof RedisClient) { + assertEquals(2L, ((RedisClient) client).customCommand(new String[] {"XLEN", key}).get()); + } else if (client instanceof RedisClusterClient) { + assertEquals( + 2L, + ((RedisClusterClient) client) + .customCommand(new String[] {"XLEN", key}) + .get() + .getSingleValue()); + } + + // this will trim the second entry. + assertNotNull( + client + .xadd( + key, + Map.of(field1, "foo4", field2, "bar4"), + StreamAddOptions.builder() + .trim(new StreamAddOptions.MinId(Boolean.TRUE, id)) + .build()) + .get()); + if (client instanceof RedisClient) { + assertEquals(2L, ((RedisClient) client).customCommand(new String[] {"XLEN", key}).get()); + } else if (client instanceof RedisClusterClient) { + assertEquals( + 2L, + ((RedisClusterClient) client) + .customCommand(new String[] {"XLEN", key}) + .get() + .getSingleValue()); + } + + /** + * TODO add test to XTRIM on maxlen expect( await client.xtrim(key, { method: "maxlen", + * threshold: 1, exact: true, }), ).toEqual(1); expect(await client.customCommand(["XLEN", + * key])).toEqual(1); + */ + } + @SneakyThrows @ParameterizedTest @MethodSource("getClients")