From 9277a2f6cc3a69f23b629e312c055f1aaf02ff8c Mon Sep 17 00:00:00 2001 From: Guian Gumpac Date: Thu, 8 Aug 2024 13:24:37 -0700 Subject: [PATCH] Node: Add `XGROUP CREATECONSUMER` and `XGROUP DELCONSUMER` commands (#2088) * Added XGROUP CREATE and XGROUP DESTROY commands Signed-off-by: Guian Gumpac --------- Signed-off-by: Guian Gumpac --- CHANGELOG.md | 1 + node/src/BaseClient.ts | 54 +++++++++++++++ node/src/Commands.ts | 30 ++++++++ node/src/Transaction.ts | 44 ++++++++++++ node/tests/SharedTests.ts | 133 ++++++++++++++++++++++++++++++------ node/tests/TestUtilities.ts | 18 ++--- 6 files changed, 252 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b7bde5852..fa7f24b6d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,7 @@ * Python: Added PUBSUB * commands ([#2043](https://github.com/valkey-io/valkey-glide/pull/2043)) * Node: Added XGROUP CREATE & XGROUP DESTROY commands ([#2084](https://github.com/valkey-io/valkey-glide/pull/2084)) * Node: Added BZPOPMAX & BZPOPMIN command ([#2077]((https://github.com/valkey-io/valkey-glide/pull/2077)) +* Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088)) #### Breaking Changes * Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 4096424b2e..e93bcd2c2d 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -165,6 +165,8 @@ import { createXGroupDestroy, createXInfoConsumers, createXInfoStream, + createXGroupCreateConsumer, + createXGroupDelConsumer, createXLen, createXRead, createXTrim, @@ -4098,6 +4100,58 @@ export class BaseClient { ); } + /** + * Creates a consumer named `consumerName` in the consumer group `groupName` for the stream stored at `key`. + * + * See https://valkey.io/commands/xgroup-createconsumer for more details. + * + * @param key - The key of the stream. + * @param groupName - The consumer group name. + * @param consumerName - The newly created consumer. + * @returns `true` if the consumer is created. Otherwise, returns `false`. + * + * @example + * ```typescript + * // The consumer "myconsumer" was created in consumer group "mygroup" for the stream "mystream". + * console.log(await client.xgroupCreateConsumer("mystream", "mygroup", "myconsumer")); // Output is true + * ``` + */ + public async xgroupCreateConsumer( + key: string, + groupName: string, + consumerName: string, + ): Promise { + return this.createWritePromise( + createXGroupCreateConsumer(key, groupName, consumerName), + ); + } + + /** + * Deletes a consumer named `consumerName` in the consumer group `groupName` for the stream stored at `key`. + * + * See https://valkey.io/commands/xgroup-delconsumer for more details. + * + * @param key - The key of the stream. + * @param groupName - The consumer group name. + * @param consumerName - The consumer to delete. + * @returns The number of pending messages the `consumer` had before it was deleted. + * + * * @example + * ```typescript + * // Consumer "myconsumer" was deleted, and had 5 pending messages unclaimed. + * console.log(await client.xgroupDelConsumer("mystream", "mygroup", "myconsumer")); // Output is 5 + * ``` + */ + public async xgroupDelConsumer( + key: string, + groupName: string, + consumerName: string, + ): Promise { + return this.createWritePromise( + createXGroupDelConsumer(key, groupName, consumerName), + ); + } + private readonly MAP_READ_FROM_STRATEGY: Record< ReadFrom, connection_request.ReadFrom diff --git a/node/src/Commands.ts b/node/src/Commands.ts index d20e029e67..6f9715ddf4 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -2027,6 +2027,36 @@ export function createXTrim( return createCommand(RequestType.XTrim, args); } +/** + * @internal + */ +export function createXGroupCreateConsumer( + key: string, + groupName: string, + consumerName: string, +): command_request.Command { + return createCommand(RequestType.XGroupCreateConsumer, [ + key, + groupName, + consumerName, + ]); +} + +/** + * @internal + */ +export function createXGroupDelConsumer( + key: string, + groupName: string, + consumerName: string, +): command_request.Command { + return createCommand(RequestType.XGroupDelConsumer, [ + key, + groupName, + consumerName, + ]); +} + /** * @internal */ diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 4d7f8d81cb..937909f15b 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -202,6 +202,8 @@ import { createXTrim, createXGroupCreate, createXGroupDestroy, + createXGroupCreateConsumer, + createXGroupDelConsumer, createZAdd, createZCard, createZCount, @@ -2386,6 +2388,48 @@ export class BaseTransaction> { return this.addAndReturn(createXGroupDestroy(key, groupName)); } + /** + * Creates a consumer named `consumerName` in the consumer group `groupName` for the stream stored at `key`. + * + * See https://valkey.io/commands/xgroup-createconsumer for more details. + * + * @param key - The key of the stream. + * @param groupName - The consumer group name. + * @param consumerName - The newly created consumer. + * + * Command Response - `true` if the consumer is created. Otherwise, returns `false`. + */ + public xgroupCreateConsumer( + key: string, + groupName: string, + consumerName: string, + ): T { + return this.addAndReturn( + createXGroupCreateConsumer(key, groupName, consumerName), + ); + } + + /** + * Deletes a consumer named `consumerName` in the consumer group `groupName` for the stream stored at `key`. + * + * See https://valkey.io/commands/xgroup-delconsumer for more details. + * + * @param key - The key of the stream. + * @param groupName - The consumer group name. + * @param consumerName - The consumer to delete. + * + * Command Response - The number of pending messages the `consumer` had before it was deleted. + */ + public xgroupDelConsumer( + key: string, + groupName: string, + consumerName: string, + ): T { + return this.addAndReturn( + createXGroupDelConsumer(key, groupName, consumerName), + ); + } + /** * Renames `key` to `newkey`. * If `newkey` already exists it is overwritten. diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 69ed2c8ac2..db27f80007 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -4770,16 +4770,8 @@ export function runBaseTests(config: { ), ).toEqual(streamId1_0); - // TODO: uncomment when XGROUP CREATE is implemented - // expect(await client.xgroupCreate(key, groupName, streamId0_0)).toEqual("Ok"); expect( - await client.customCommand([ - "XGROUP", - "CREATE", - key, - groupName, - streamId0_0, - ]), + await client.xgroupCreate(key, groupName, streamId0_0), ).toEqual("OK"); // TODO: uncomment when XREADGROUP is implemented @@ -7373,13 +7365,11 @@ export function runBaseTests(config: { } expect( - await client.customCommand([ - "XGROUP", - "CREATECONSUMER", + await client.xgroupCreateConsumer( key, groupName1, consumer2, - ]), + ), ).toBeTruthy(); expect( await client.customCommand([ @@ -7452,13 +7442,7 @@ export function runBaseTests(config: { }), ).toEqual("OK"); expect( - await client.customCommand([ - "xgroup", - "createconsumer", - key, - group, - "consumer", - ]), + await client.xgroupCreateConsumer(key, group, "consumer"), ).toEqual(true); expect( @@ -7671,6 +7655,115 @@ export function runBaseTests(config: { config.timeout, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `xgroupCreateConsumer and xgroupDelConsumer test_%p`, + async (protocol) => { + await runTest(async (client: BaseClient) => { + const key = uuidv4(); + const nonExistentKey = uuidv4(); + const stringKey = uuidv4(); + const groupName = uuidv4(); + const consumer = uuidv4(); + const streamId0 = "0"; + + // create group and consumer for the group + expect( + await client.xgroupCreate(key, groupName, streamId0, { + mkStream: true, + }), + ).toEqual("OK"); + expect( + await client.xgroupCreateConsumer(key, groupName, consumer), + ).toEqual(true); + + // attempting to create/delete a consumer for a group that does not exist results in a NOGROUP request error + await expect( + client.xgroupCreateConsumer( + key, + "nonExistentGroup", + consumer, + ), + ).rejects.toThrow(RequestError); + await expect( + client.xgroupDelConsumer(key, "nonExistentGroup", consumer), + ).rejects.toThrow(RequestError); + + // attempt to create consumer for group again + expect( + await client.xgroupCreateConsumer(key, groupName, consumer), + ).toEqual(false); + + // attempting to delete a consumer that has not been created yet returns 0 + expect( + await client.xgroupDelConsumer( + key, + groupName, + "nonExistentConsumer", + ), + ).toEqual(0); + + // Add two stream entries + const streamid1: string | null = await client.xadd(key, [ + ["field1", "value1"], + ]); + expect(streamid1).not.toBeNull(); + const streamid2 = await client.xadd(key, [ + ["field2", "value2"], + ]); + expect(streamid2).not.toBeNull(); + + // read the entire stream for the consumer and mark messages as pending + expect( + await client.customCommand([ + "XREADGROUP", + "GROUP", + groupName, + consumer, + "STREAMS", + key, + ">", + ]), + ).toEqual({ + [key]: { + [streamid1 as string]: [["field1", "value1"]], + [streamid2 as string]: [["field2", "value2"]], + }, + }); + + // delete one of the streams + expect( + await client.xgroupDelConsumer(key, groupName, consumer), + ).toEqual(2); + + // attempting to call XGROUP CREATECONSUMER or XGROUP DELCONSUMER with a non-existing key should raise an error + await expect( + client.xgroupCreateConsumer( + nonExistentKey, + groupName, + consumer, + ), + ).rejects.toThrow(RequestError); + await expect( + client.xgroupDelConsumer( + nonExistentKey, + groupName, + consumer, + ), + ).rejects.toThrow(RequestError); + + // key exists, but it is not a stream + expect(await client.set(stringKey, "foo")).toEqual("OK"); + await expect( + client.xgroupCreateConsumer(stringKey, groupName, consumer), + ).rejects.toThrow(RequestError); + await expect( + client.xgroupDelConsumer(stringKey, groupName, consumer), + ).rejects.toThrow(RequestError); + }, protocol); + }, + config.timeout, + ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( `xgroupCreate and xgroupDestroy test_%p`, async (protocol) => { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index 24d4611df2..ef044bd008 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -515,6 +515,7 @@ export async function transactionTest( const value = uuidv4(); const groupName1 = uuidv4(); const groupName2 = uuidv4(); + const consumer = uuidv4(); // array of tuples - first element is test name/description, second - expected return value const responseData: [string, ReturnType][] = []; @@ -959,15 +960,9 @@ export async function transactionTest( // key9 has one entry here: {"0-2":[["field","value2"]]} - baseTransaction.customCommand([ - "xgroup", - "createconsumer", - key9, - groupName1, - "consumer1", - ]); + baseTransaction.xgroupCreateConsumer(key9, groupName1, "consumer1"); responseData.push([ - 'xgroupCreateConsumer(key9, groupName1, "consumer1")', + "xgroupCreateConsumer(key9, groupName1, consumer1)", true, ]); baseTransaction.customCommand([ @@ -1011,6 +1006,13 @@ export async function transactionTest( 'xclaimJustId(key9, groupName1, "consumer1", 0, ["0-2"], { isForce: true, retryCount: 0, idle: 0})', ["0-2"], ]); + baseTransaction.xgroupCreateConsumer(key9, groupName1, consumer); + responseData.push([ + "xgroupCreateConsumer(key9, groupName1, consumer)", + true, + ]); + baseTransaction.xgroupDelConsumer(key9, groupName1, consumer); + responseData.push(["xgroupDelConsumer(key9, groupName1, consumer)", 0]); baseTransaction.xgroupDestroy(key9, groupName1); responseData.push(["xgroupDestroy(key9, groupName1)", true]); baseTransaction.xgroupDestroy(key9, groupName2);