From d3a9e539210e9db842b169dc4b944ea63fa442ac Mon Sep 17 00:00:00 2001 From: tjzhang-BQ <111323543+tjzhang-BQ@users.noreply.github.com> Date: Tue, 20 Aug 2024 11:36:48 -0700 Subject: [PATCH] Node: Add command XGROUP SETID (#2135) Signed-off-by: TJ Zhang Co-authored-by: TJ Zhang --- CHANGELOG.md | 1 + node/src/BaseClient.ts | 35 +++++++++++++ node/src/Commands.ts | 19 +++++++ node/src/Transaction.ts | 25 +++++++++ node/tests/SharedTests.ts | 102 ++++++++++++++++++++++++++++++++++++ node/tests/TestUtilities.ts | 2 + 6 files changed, 184 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d4d56abe7..a401ce8748 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,6 +89,7 @@ * Node: Added GETEX command ([#2107]((https://github.com/valkey-io/valkey-glide/pull/2107)) * Node: Added ZINTER and ZUNION commands ([#2146](https://github.com/aws/glide-for-redis/pull/2146)) * Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112)) +* Node: Added XGROUP SETID command ([#2135]((https://github.com/valkey-io/valkey-glide/pull/2135)) #### Breaking Changes * Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 1c4b24c87e..563db515ca 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -220,6 +220,7 @@ import { createZScore, createZUnion, createZUnionStore, + createXGroupSetid, } from "./Commands"; import { ClosingError, @@ -5214,6 +5215,40 @@ export class BaseClient { return this.createWritePromise(createXAck(key, group, ids)); } + /** + * Sets the last delivered ID for a consumer group. + * + * @see {@link https://valkey.io/commands/xgroup-setid|valkey.io} for more details. + * + * @param key - The key of the stream. + * @param groupName - The consumer group name. + * @param id - The stream entry ID that should be set as the last delivered ID for the consumer + * group. + * @param entriesRead - (Optional) A value representing the number of stream entries already read by the group. + * This option can only be specified if you are using Valkey version 7.0.0 or above. + * @param decoder - (Optional) {@link Decoder} type which defines how to handle the response. If not set, the default decoder from the client config will be used. + * @returns `"OK"`. + * + * * @example + * ```typescript + * console.log(await client.xgroupSetId("mystream", "mygroup", "0", 1L)); // Output is "OK" + * ``` + */ + public async xgroupSetId( + key: string, + groupName: string, + id: string, + entriesRead?: number, + decoder?: Decoder, + ): Promise<"OK"> { + return this.createWritePromise( + createXGroupSetid(key, groupName, id, entriesRead), + { + decoder: decoder, + }, + ); + } + /** Returns the element at index `index` in the list stored at `key`. * The index is zero-based, so 0 means the first element, 1 the second element and so on. * Negative indices can be used to designate elements starting at the tail of the list. diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 3f127fd1bf..09bb359ecf 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -3953,3 +3953,22 @@ export function createXAck( ): command_request.Command { return createCommand(RequestType.XAck, [key, group, ...ids]); } + +/** + * @internal + */ +export function createXGroupSetid( + key: string, + groupName: string, + id: string, + entriesRead?: number, +): command_request.Command { + const args = [key, groupName, id]; + + if (entriesRead !== undefined) { + args.push("ENTRIESREAD"); + args.push(entriesRead.toString()); + } + + return createCommand(RequestType.XGroupSetId, args); +} diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 2fd3fa5e02..993cdf703b 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -256,6 +256,7 @@ import { createZScore, createZUnion, createZUnionStore, + createXGroupSetid, } from "./Commands"; import { command_request } from "./ProtobufMessage"; @@ -2909,6 +2910,30 @@ export class BaseTransaction> { return this.addAndReturn(createXAck(key, group, ids)); } + /** + * Sets the last delivered ID for a consumer group. + * + * @see {@link https://valkey.io/commands/xgroup-setid|valkey.io} for more details. + * + * @param key - The key of the stream. + * @param groupName - The consumer group name. + * @param id - The stream entry ID that should be set as the last delivered ID for the consumer group. + * @param entriesRead - (Optional) A value representing the number of stream entries already read by the group. + * This option can only be specified if you are using Valkey version 7.0.0 or above. + * + * Command Response - `"OK"`. + */ + public xgroupSetId( + key: string, + groupName: string, + id: string, + entriesRead?: number, + ): T { + return this.addAndReturn( + createXGroupSetid(key, groupName, id, entriesRead), + ); + } + /** * Renames `key` to `newkey`. * If `newkey` already exists it is overwritten. diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 62bf853a71..19a8f09493 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -9502,6 +9502,108 @@ export function runBaseTests(config: { config.timeout, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `xgroupSetId test %p`, + async (protocol) => { + await runTest(async (client: BaseClient, cluster: RedisCluster) => { + const key = "testKey" + uuidv4(); + const nonExistingKey = "group" + uuidv4(); + const stringKey = "testKey" + uuidv4(); + const groupName = uuidv4(); + const consumerName = uuidv4(); + const streamid0 = "0"; + const streamid1_0 = "1-0"; + const streamid1_1 = "1-1"; + const streamid1_2 = "1-2"; + + // Setup: Create stream with 3 entries, create consumer group, read entries to add them to the Pending Entries List + expect( + await client.xadd(key, [["f0", "v0"]], { id: streamid1_0 }), + ).toBe(streamid1_0); + expect( + await client.xadd(key, [["f1", "v1"]], { id: streamid1_1 }), + ).toBe(streamid1_1); + expect( + await client.xadd(key, [["f2", "v2"]], { id: streamid1_2 }), + ).toBe(streamid1_2); + + expect( + await client.xgroupCreate(key, groupName, streamid0), + ).toBe("OK"); + + expect( + await client.xreadgroup(groupName, consumerName, { + [key]: ">", + }), + ).toEqual({ + [key]: { + [streamid1_0]: [["f0", "v0"]], + [streamid1_1]: [["f1", "v1"]], + [streamid1_2]: [["f2", "v2"]], + }, + }); + + // Sanity check: xreadgroup should not return more entries since they're all already in the + // Pending Entries List. + expect( + await client.xreadgroup(groupName, consumerName, { + [key]: ">", + }), + ).toBeNull(); + + // Reset the last delivered ID for the consumer group to "1-1" + if (cluster.checkIfServerVersionLessThan("7.0.0")) { + expect( + await client.xgroupSetId(key, groupName, streamid1_1), + ).toBe("OK"); + } else { + expect( + await client.xgroupSetId( + key, + groupName, + streamid1_1, + 1, + ), + ).toBe("OK"); + } + + // xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1 + const newResult = await client.xreadgroup( + groupName, + consumerName, + { [key]: ">" }, + ); + expect(newResult).toEqual({ + [key]: { + [streamid1_2]: [["f2", "v2"]], + }, + }); + + // An error is raised if XGROUP SETID is called with a non-existing key + await expect( + client.xgroupSetId(nonExistingKey, groupName, streamid0), + ).rejects.toThrow(RequestError); + + // An error is raised if XGROUP SETID is called with a non-existing group + await expect( + client.xgroupSetId(key, "non_existing_group", streamid0), + ).rejects.toThrow(RequestError); + + // Setting the ID to a non-existing ID is allowed + expect(await client.xgroupSetId(key, groupName, "99-99")).toBe( + "OK", + ); + + // key exists, but is not a stream + expect(await client.set(stringKey, "xgroup setid")).toBe("OK"); + await expect( + client.xgroupSetId(stringKey, groupName, streamid1_0), + ).rejects.toThrow(RequestError); + }, protocol); + }, + config.timeout, + ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( `xpending test_%p`, async (protocol) => { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index b6680eb685..5dbb9544d1 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -1217,6 +1217,8 @@ export async function transactionTest( baseTransaction.xack(key9, groupName1, ["0-3"]); responseData.push(["xack(key9, groupName1, ['0-3'])", 0]); + baseTransaction.xgroupSetId(key9, groupName1, "0-2"); + responseData.push(["xgroupSetId(key9, groupName1, '0-2')", "OK"]); baseTransaction.xgroupDelConsumer(key9, groupName1, consumer); responseData.push(["xgroupDelConsumer(key9, groupName1, consumer)", 1]); baseTransaction.xgroupDestroy(key9, groupName1);