From 187dd42396f55989d4a4210dc70eef8ff14ce625 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Mon, 1 Apr 2024 12:44:54 -0700 Subject: [PATCH] Java: Add XADD command (Stream commands) (#155) * Add Stream XADD command to Java Signed-off-by: Andrew Carbonetto --------- Signed-off-by: Andrew Carbonetto --- .../src/main/java/glide/api/BaseClient.java | 20 +- .../api/commands/StreamBaseCommands.java | 51 +++++ .../glide/api/models/BaseTransaction.java | 36 ++++ .../api/models/commands/StreamAddOptions.java | 177 +++++++++++++++++ .../test/java/glide/api/RedisClientTest.java | 184 ++++++++++++++++++ .../glide/api/models/TransactionTests.java | 24 +++ .../test/java/glide/SharedCommandTests.java | 89 +++++++++ 7 files changed, 580 insertions(+), 1 deletion(-) create mode 100644 java/client/src/main/java/glide/api/commands/StreamBaseCommands.java create mode 100644 java/client/src/main/java/glide/api/models/commands/StreamAddOptions.java diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 784f63f474..a99596a4a2 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -47,6 +47,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.TTL; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.ZScore; import static redis_request.RedisRequestOuterClass.RequestType.Zadd; import static redis_request.RedisRequestOuterClass.RequestType.Zcard; @@ -57,11 +58,13 @@ import glide.api.commands.ListBaseCommands; import glide.api.commands.SetBaseCommands; import glide.api.commands.SortedSetBaseCommands; +import glide.api.commands.StreamBaseCommands; import glide.api.commands.StringCommands; import glide.api.models.Script; import glide.api.models.commands.ExpireOptions; import glide.api.models.commands.ScriptOptions; import glide.api.models.commands.SetOptions; +import glide.api.models.commands.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import glide.api.models.configuration.BaseClientConfiguration; import glide.api.models.exceptions.RedisException; @@ -95,7 +98,8 @@ public abstract class BaseClient HashBaseCommands, ListBaseCommands, SetBaseCommands, - SortedSetBaseCommands { + SortedSetBaseCommands, + StreamBaseCommands { /** Redis simple string response with "OK" */ public static final String OK = ConstantResponse.OK.toString(); @@ -619,6 +623,20 @@ public CompletableFuture zscore(@NonNull String key, @NonNull String mem ZScore, new String[] {key, member}, this::handleDoubleOrNullResponse); } + @Override + public CompletableFuture xadd(@NonNull String key, @NonNull Map values) { + return xadd(key, values, StreamAddOptions.builder().build()); + } + + @Override + public CompletableFuture xadd( + @NonNull String key, @NonNull Map values, @NonNull StreamAddOptions options) { + String[] arguments = + ArrayUtils.addAll( + ArrayUtils.addFirst(options.toArgs(), key), convertMapToKeyValueStringArray(values)); + return commandManager.submitNewCommand(XAdd, arguments, this::handleStringOrNullResponse); + } + @Override public CompletableFuture pttl(@NonNull String key) { return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse); diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java new file mode 100644 index 0000000000..5577655ab8 --- /dev/null +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -0,0 +1,51 @@ +/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.commands; + +import glide.api.models.commands.StreamAddOptions; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Supports commands and transactions for the "Stream Commands" group for standalone clients and + * cluster clients. + * + * @see Stream Commands + */ +public interface StreamBaseCommands { + + /** + * Adds an entry to the specified stream. + * + * @see redis.io for details. + * @param key The key of the stream. + * @param values Field-value pairs to be added to the entry. + * @return The id of the added entry. + * @example + *
{@code
+     * String streamId = client.xadd("key", Map.of("name", "Sara", "surname", "OConnor").get();
+     * System.out.println("Stream: " + streamId);
+     * }
+ */ + CompletableFuture xadd(String key, Map values); + + /** + * Adds an entry to the specified stream. + * + * @see redis.io for details. + * @param key The key of the stream. + * @param values Field-value pairs to be added to the entry. + * @param options Stream add options. + * @return The id of the added entry, or null if {@link StreamAddOptions#makeStream} + * is set to false and no stream with the matching key exists. + * @example + *
{@code
+     * // Option to use the existing stream, or return null if the stream doesn't already exist at "key"
+     * StreamAddOptions options = StreamAddOptions.builder().id("sid").makeStream(Boolean.FALSE).build();
+     * String streamId = client.xadd("key", Map.of("name", "Sara", "surname", "OConnor"), options).get();
+     * if (streamId != null) {
+     *     assert streamId.equals("sid");
+     * }
+     * }
+ */ + CompletableFuture xadd(String key, Map values, StreamAddOptions options); +} diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index a0cd91ea89..44b2414f43 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -56,6 +56,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Time; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.ZScore; import static redis_request.RedisRequestOuterClass.RequestType.Zadd; import static redis_request.RedisRequestOuterClass.RequestType.Zcard; @@ -67,6 +68,7 @@ import glide.api.models.commands.SetOptions; import glide.api.models.commands.SetOptions.ConditionalSet; import glide.api.models.commands.SetOptions.SetOptionsBuilder; +import glide.api.models.commands.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import java.util.Map; import lombok.Getter; @@ -1292,6 +1294,40 @@ public T zscore(@NonNull String key, @NonNull String member) { return getThis(); } + /** + * Adds an entry to the specified stream. + * + * @see redis.io for details. + * @param key The key of the stream. + * @param values Field-value pairs to be added to the entry. + * @return Command Response - The id of the added entry. + */ + public T xadd(@NonNull String key, @NonNull Map values) { + this.xadd(key, values, StreamAddOptions.builder().build()); + return getThis(); + } + + /** + * Adds an entry to the specified stream. + * + * @see redis.io for details. + * @param key The key of the stream. + * @param values Field-value pairs to be added to the entry. + * @param options Stream add options. + * @return Command Response - The id of the added entry, or null if {@link + * StreamAddOptions#makeStream} is set to false and no stream with the matching + * key exists. + */ + public T xadd( + @NonNull String key, @NonNull Map values, @NonNull StreamAddOptions options) { + String[] arguments = + ArrayUtils.addAll( + ArrayUtils.addFirst(options.toArgs(), key), convertMapToKeyValueStringArray(values)); + ArgsArray commandArgs = buildArgs(arguments); + protobufTransaction.addCommands(buildCommand(XAdd, commandArgs)); + return getThis(); + } + /** * Returns the remaining time to live of key that has a timeout, in milliseconds. * diff --git a/java/client/src/main/java/glide/api/models/commands/StreamAddOptions.java b/java/client/src/main/java/glide/api/models/commands/StreamAddOptions.java new file mode 100644 index 0000000000..1f98dca863 --- /dev/null +++ b/java/client/src/main/java/glide/api/models/commands/StreamAddOptions.java @@ -0,0 +1,177 @@ +/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ +package glide.api.models.commands; + +import glide.api.commands.StreamBaseCommands; +import java.util.ArrayList; +import java.util.List; +import lombok.Builder; +import lombok.NonNull; + +/** + * Optional arguments to {@link StreamBaseCommands#xadd} + * + * @see redis.io + */ +@Builder +public final class StreamAddOptions { + + public static final String NO_MAKE_STREAM_REDIS_API = "NOMKSTREAM"; + public static final String ID_WILDCARD_REDIS_API = "*"; + public static final String TRIM_MAXLEN_REDIS_API = "MAXLEN"; + public static final String TRIM_MINID_REDIS_API = "MINID"; + public static final String TRIM_EXACT_REDIS_API = "="; + public static final String TRIM_NOT_EXACT_REDIS_API = "~"; + public static final String TRIM_LIMIT_REDIS_API = "LIMIT"; + + /** If set, the new entry will be added with this id. */ + private final String id; + + /** + * If set to false, a new stream won't be created if no stream matches the given key. + *
+ * Equivalent to NOMKSTREAM in the Redis API. + */ + private final Boolean makeStream; + + /** If set, the add operation will also trim the older entries in the stream. */ + private final StreamTrimOptions trim; + + public abstract static class StreamTrimOptions { + /** + * If true, the stream will be trimmed exactly. Equivalent to = in the + * Redis API. Otherwise, the stream will be trimmed in a near-exact manner, which is more + * efficient, equivalent to ~ in the Redis API. + */ + protected boolean exact; + + /** If set, sets the maximal amount of entries that will be deleted. */ + protected Long limit; + + protected abstract String getMethod(); + + protected abstract String getThreshold(); + + protected List getRedisApi() { + List optionArgs = new ArrayList<>(); + + optionArgs.add(this.getMethod()); + optionArgs.add(this.exact ? TRIM_EXACT_REDIS_API : TRIM_NOT_EXACT_REDIS_API); + optionArgs.add(this.getThreshold()); + + if (this.limit != null) { + optionArgs.add(TRIM_LIMIT_REDIS_API); + optionArgs.add(this.limit.toString()); + } + + return optionArgs; + } + } + + /** Option to trim the stream according to minimum ID. */ + public static class MinId extends StreamTrimOptions { + /** Trim the stream according to entry ID. Equivalent to MINID in the Redis API. */ + private final String threshold; + + /** + * Create a trim option to trim stream based on stream ID. + * + * @param exact whether to match exactly on the threshold. + * @param threshold comparison id. + */ + public MinId(boolean exact, @NonNull String threshold) { + this.threshold = threshold; + this.exact = exact; + } + + /** + * Create a trim option to trim stream based on stream ID. + * + * @param exact whether to match exactly on the threshold. + * @param threshold comparison id. + * @param limit max number of stream entries to be trimmed. + */ + public MinId(boolean exact, @NonNull String threshold, long limit) { + this.threshold = threshold; + this.exact = exact; + this.limit = limit; + } + + @Override + protected String getMethod() { + return TRIM_MINID_REDIS_API; + } + + @Override + protected String getThreshold() { + return threshold; + } + } + + /** Option to trim the stream according to maximum stream length. */ + public static class MaxLen extends StreamTrimOptions { + /** + * Trim the stream according to length.
+ * Equivalent to MAXLEN in the Redis API. + */ + private final Long threshold; + + /** + * Create a Max Length trim option to trim stream based on length. + * + * @param exact whether to match exactly on the threshold. + * @param threshold comparison count. + */ + public MaxLen(boolean exact, long threshold) { + this.threshold = threshold; + this.exact = exact; + } + + /** + * Create a Max Length trim option to trim stream entries exceeds the threshold. + * + * @param exact whether to match exactly on the threshold. + * @param threshold comparison count. + * @param limit max number of stream entries to be trimmed. + */ + public MaxLen(boolean exact, long threshold, long limit) { + this.threshold = threshold; + this.exact = exact; + this.limit = limit; + } + + @Override + protected String getMethod() { + return TRIM_MAXLEN_REDIS_API; + } + + @Override + protected String getThreshold() { + return threshold.toString(); + } + } + + /** + * Converts options for Xadd into a String[]. + * + * @return String[] + */ + public String[] toArgs() { + List optionArgs = new ArrayList<>(); + + if (makeStream != null && !makeStream) { + optionArgs.add(NO_MAKE_STREAM_REDIS_API); + } + + if (trim != null) { + optionArgs.addAll(trim.getRedisApi()); + } + + if (id != null) { + optionArgs.add(id); + } else { + optionArgs.add(ID_WILDCARD_REDIS_API); + } + + return optionArgs.toArray(new String[0]); + } +} diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index 8647d6a741..283057a428 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -5,7 +5,14 @@ import static glide.api.models.commands.SetOptions.ConditionalSet.ONLY_IF_DOES_NOT_EXIST; import static glide.api.models.commands.SetOptions.ConditionalSet.ONLY_IF_EXISTS; import static glide.api.models.commands.SetOptions.RETURN_OLD_VALUE; +import static glide.api.models.commands.StreamAddOptions.NO_MAKE_STREAM_REDIS_API; +import static glide.api.models.commands.StreamAddOptions.TRIM_EXACT_REDIS_API; +import static glide.api.models.commands.StreamAddOptions.TRIM_LIMIT_REDIS_API; +import static glide.api.models.commands.StreamAddOptions.TRIM_MAXLEN_REDIS_API; +import static glide.api.models.commands.StreamAddOptions.TRIM_MINID_REDIS_API; +import static glide.api.models.commands.StreamAddOptions.TRIM_NOT_EXACT_REDIS_API; import static glide.utils.ArrayTransformUtils.concatenateArrays; +import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray; import static glide.utils.ArrayTransformUtils.convertMapToValueKeyStringArray; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -68,6 +75,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Time; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.ZScore; import static redis_request.RedisRequestOuterClass.RequestType.Zadd; import static redis_request.RedisRequestOuterClass.RequestType.Zcard; @@ -79,6 +87,7 @@ import glide.api.models.commands.ScriptOptions; import glide.api.models.commands.SetOptions; import glide.api.models.commands.SetOptions.Expiry; +import glide.api.models.commands.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import glide.managers.CommandManager; import glide.managers.ConnectionManager; @@ -90,8 +99,12 @@ import java.util.concurrent.CompletableFuture; import lombok.SneakyThrows; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; public class RedisClientTest { @@ -1814,6 +1827,177 @@ public void zscore_returns_success() { assertEquals(value, payload); } + @SneakyThrows + @Test + public void xadd_returns_success() { + // setup + String key = "testKey"; + Map fieldValues = new LinkedHashMap<>(); + fieldValues.put("testField1", "testValue1"); + fieldValues.put("testField2", "testValue2"); + String[] fieldValuesArgs = convertMapToKeyValueStringArray(fieldValues); + String[] arguments = new String[] {key, "*"}; + arguments = ArrayUtils.addAll(arguments, fieldValuesArgs); + String returnId = "testId"; + + CompletableFuture testResponse = mock(CompletableFuture.class); + when(testResponse.get()).thenReturn(returnId); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XAdd), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xadd(key, fieldValues); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(returnId, payload); + } + + @SneakyThrows + @Test + public void xadd_with_nomakestream_maxlen_options_returns_success() { + // setup + String key = "testKey"; + Map fieldValues = new LinkedHashMap<>(); + fieldValues.put("testField1", "testValue1"); + fieldValues.put("testField2", "testValue2"); + StreamAddOptions options = + StreamAddOptions.builder() + .id("id") + .makeStream(false) + .trim(new StreamAddOptions.MaxLen(true, 5L)) + .build(); + + String[] arguments = + new String[] { + key, + NO_MAKE_STREAM_REDIS_API, + TRIM_MAXLEN_REDIS_API, + TRIM_EXACT_REDIS_API, + Long.toString(5L), + "id" + }; + arguments = ArrayUtils.addAll(arguments, convertMapToKeyValueStringArray(fieldValues)); + + String returnId = "testId"; + + CompletableFuture testResponse = mock(CompletableFuture.class); + when(testResponse.get()).thenReturn(returnId); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XAdd), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xadd(key, fieldValues, options); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(returnId, payload); + } + + private static List getStreamAddOptions() { + return List.of( + Arguments.of( + Pair.of( + // no TRIM option + StreamAddOptions.builder().id("id").makeStream(Boolean.FALSE).build(), + new String[] {"testKey", NO_MAKE_STREAM_REDIS_API, "id"}), + Pair.of( + // MAXLEN with LIMIT + StreamAddOptions.builder() + .id("id") + .makeStream(Boolean.TRUE) + .trim(new StreamAddOptions.MaxLen(Boolean.TRUE, 5L, 10L)) + .build(), + new String[] { + "testKey", + TRIM_MAXLEN_REDIS_API, + TRIM_EXACT_REDIS_API, + Long.toString(5L), + TRIM_LIMIT_REDIS_API, + Long.toString(10L), + "id" + }), + Pair.of( + // MAXLEN with non exact match + StreamAddOptions.builder() + .makeStream(Boolean.FALSE) + .trim(new StreamAddOptions.MaxLen(Boolean.FALSE, 2L)) + .build(), + new String[] { + "testKey", + NO_MAKE_STREAM_REDIS_API, + TRIM_MAXLEN_REDIS_API, + TRIM_NOT_EXACT_REDIS_API, + Long.toString(2L), + "*" + }), + Pair.of( + // MIN ID with LIMIT + StreamAddOptions.builder() + .id("id") + .makeStream(Boolean.TRUE) + .trim(new StreamAddOptions.MinId(Boolean.TRUE, "testKey", 10L)) + .build(), + new String[] { + "testKey", + TRIM_MINID_REDIS_API, + TRIM_EXACT_REDIS_API, + Long.toString(5L), + TRIM_LIMIT_REDIS_API, + Long.toString(10L), + "id" + }), + Pair.of( + // MIN ID with non exact match + StreamAddOptions.builder() + .makeStream(Boolean.FALSE) + .trim(new StreamAddOptions.MinId(Boolean.FALSE, "testKey")) + .build(), + new String[] { + "testKey", + NO_MAKE_STREAM_REDIS_API, + TRIM_MINID_REDIS_API, + TRIM_NOT_EXACT_REDIS_API, + Long.toString(5L), + "*" + }))); + } + + @SneakyThrows + @ParameterizedTest + @MethodSource("getStreamAddOptions") + public void xadd_with_options_returns_success(Pair optionAndArgs) { + // setup + String key = "testKey"; + Map fieldValues = new LinkedHashMap<>(); + fieldValues.put("testField1", "testValue1"); + fieldValues.put("testField2", "testValue2"); + String[] arguments = + ArrayUtils.addAll(optionAndArgs.getRight(), convertMapToKeyValueStringArray(fieldValues)); + String returnId = "testId"; + + CompletableFuture testResponse = mock(CompletableFuture.class); + when(testResponse.get()).thenReturn(returnId); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XAdd), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xadd(key, fieldValues, optionAndArgs.getLeft()); + String payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(returnId, payload); + } + @SneakyThrows @Test public void type_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 1aa39e169f..62091d0953 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -54,6 +54,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Time; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.ZScore; import static redis_request.RedisRequestOuterClass.RequestType.Zadd; import static redis_request.RedisRequestOuterClass.RequestType.Zcard; @@ -62,6 +63,7 @@ import glide.api.models.commands.ExpireOptions; import glide.api.models.commands.InfoOptions; import glide.api.models.commands.SetOptions; +import glide.api.models.commands.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -391,6 +393,28 @@ public void transaction_builds_protobuf_request(BaseTransaction transaction) transaction.zscore("key", "member"); results.add(Pair.of(ZScore, ArgsArray.newBuilder().addArgs("key").addArgs("member").build())); + transaction.xadd("key", Map.of("field1", "foo1")); + results.add( + Pair.of( + XAdd, + ArgsArray.newBuilder() + .addArgs("key") + .addArgs("*") + .addArgs("field1") + .addArgs("foo1") + .build())); + + transaction.xadd("key", Map.of("field1", "foo1"), StreamAddOptions.builder().id("id").build()); + results.add( + Pair.of( + XAdd, + ArgsArray.newBuilder() + .addArgs("key") + .addArgs("id") + .addArgs("field1") + .addArgs("foo1") + .build())); + transaction.time(); results.add(Pair.of(Time, ArgsArray.newBuilder().build())); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index 84d59a0fe5..201ba65819 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -23,6 +23,7 @@ import glide.api.models.commands.ExpireOptions; import glide.api.models.commands.ScriptOptions; import glide.api.models.commands.SetOptions; +import glide.api.models.commands.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import glide.api.models.configuration.NodeAddress; import glide.api.models.configuration.RedisClientConfiguration; @@ -1133,6 +1134,94 @@ public void zscore(BaseClient client) { assertTrue(executionException.getCause() instanceof RequestException); } + @SneakyThrows + @ParameterizedTest + @MethodSource("getClients") + public void xadd(BaseClient client) { + String key = UUID.randomUUID().toString(); + String field1 = UUID.randomUUID().toString(); + String field2 = UUID.randomUUID().toString(); + + assertNull( + client + .xadd( + key, + Map.of(field1, "foo0", field2, "bar0"), + StreamAddOptions.builder().makeStream(Boolean.FALSE).build()) + .get()); + + String timestamp1 = "0-1"; + assertEquals( + timestamp1, + client + .xadd( + key, + Map.of(field1, "foo1", field2, "bar1"), + StreamAddOptions.builder().id(timestamp1).build()) + .get()); + + assertNotNull(client.xadd(key, Map.of(field1, "foo2", field2, "bar2")).get()); + if (client instanceof RedisClient) { + assertEquals(2L, ((RedisClient) client).customCommand(new String[] {"XLEN", key}).get()); + } else if (client instanceof RedisClusterClient) { + assertEquals( + 2L, + ((RedisClusterClient) client) + .customCommand(new String[] {"XLEN", key}) + .get() + .getSingleValue()); + } + + // this will trim the first entry. + String id = + client + .xadd( + key, + Map.of(field1, "foo3", field2, "bar3"), + StreamAddOptions.builder() + .trim(new StreamAddOptions.MaxLen(Boolean.TRUE, 2L)) + .build()) + .get(); + assertNotNull(id); + if (client instanceof RedisClient) { + assertEquals(2L, ((RedisClient) client).customCommand(new String[] {"XLEN", key}).get()); + } else if (client instanceof RedisClusterClient) { + assertEquals( + 2L, + ((RedisClusterClient) client) + .customCommand(new String[] {"XLEN", key}) + .get() + .getSingleValue()); + } + + // this will trim the second entry. + assertNotNull( + client + .xadd( + key, + Map.of(field1, "foo4", field2, "bar4"), + StreamAddOptions.builder() + .trim(new StreamAddOptions.MinId(Boolean.TRUE, id)) + .build()) + .get()); + if (client instanceof RedisClient) { + assertEquals(2L, ((RedisClient) client).customCommand(new String[] {"XLEN", key}).get()); + } else if (client instanceof RedisClusterClient) { + assertEquals( + 2L, + ((RedisClusterClient) client) + .customCommand(new String[] {"XLEN", key}) + .get() + .getSingleValue()); + } + + /** + * TODO add test to XTRIM on maxlen expect( await client.xtrim(key, { method: "maxlen", + * threshold: 1, exact: true, }), ).toEqual(1); expect(await client.customCommand(["XLEN", + * key])).toEqual(1); + */ + } + @SneakyThrows @ParameterizedTest @MethodSource("getClients")