From c397ce78fa4b49e13c8a6ac8fb7c8ad2bcd8781c Mon Sep 17 00:00:00 2001
From: tjzhang-BQ <111323543+tjzhang-BQ@users.noreply.github.com>
Date: Tue, 20 Aug 2024 10:14:37 -0700
Subject: [PATCH] Node: add command XACK (#2112)

Signed-off-by: TJ Zhang <tj.zhang@improving.com>
Co-authored-by: TJ Zhang <tj.zhang@improving.com>
---
 CHANGELOG.md                |   1 +
 node/src/BaseClient.ts      |  31 ++++++++++
 node/src/Commands.ts        |  11 ++++
 node/src/Transaction.ts     |  17 ++++++
 node/tests/SharedTests.ts   | 113 ++++++++++++++++++++++++++++++++++++
 node/tests/TestUtilities.ts |   2 +
 6 files changed, 175 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index af4e2672e4..0d4d56abe7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -88,6 +88,7 @@
 * Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088))
 * Node: Added GETEX command ([#2107]((https://github.com/valkey-io/valkey-glide/pull/2107))
 * Node: Added ZINTER and ZUNION commands ([#2146](https://github.com/aws/glide-for-redis/pull/2146))
+* Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112))
 
 #### Breaking Changes
 * Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts
index 68156ef365..1c4b24c87e 100644
--- a/node/src/BaseClient.ts
+++ b/node/src/BaseClient.ts
@@ -171,6 +171,7 @@ import {
     createUnlink,
     createWait,
     createWatch,
+    createXAck,
     createXAdd,
     createXAutoClaim,
     createXClaim,
@@ -5183,6 +5184,36 @@ export class BaseClient {
         preferReplica: connection_request.ReadFrom.PreferReplica,
     };
 
+    /**
+     * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
+     * This command should be called on a pending message so that such message does not get processed again.
+     *
+     * @see {@link https://valkey.io/commands/xack/|valkey.io} for details.
+     *
+     * @param key - The key of the stream.
+     * @param group - The consumer group name.
+     * @param ids - An array of entry ids.
+     * @returns The number of messages that were successfully acknowledged.
+     *
+     * @example
+     * ```typescript
+     *  <pre>{@code
+     * const entryId = await client.xadd("mystream", ["myfield", "mydata"]);
+     * // read messages from streamId
+     * const readResult = await client.xreadgroup(["myfield", "mydata"], "mygroup", "my0consumer");
+     * // acknowledge messages on stream
+     * console.log(await client.xack("mystream", "mygroup", [entryId])); // Output: 1L
+     * </pre>
+     * ```
+     */
+    public async xack(
+        key: string,
+        group: string,
+        ids: string[],
+    ): Promise<number> {
+        return this.createWritePromise(createXAck(key, group, ids));
+    }
+
     /** Returns the element at index `index` in the list stored at `key`.
      * The index is zero-based, so 0 means the first element, 1 the second element and so on.
      * Negative indices can be used to designate elements starting at the tail of the list.
diff --git a/node/src/Commands.ts b/node/src/Commands.ts
index a75c3a7f4e..3f127fd1bf 100644
--- a/node/src/Commands.ts
+++ b/node/src/Commands.ts
@@ -3942,3 +3942,14 @@ export function createGetEx(
 
     return createCommand(RequestType.GetEx, args);
 }
+
+/**
+ * @internal
+ */
+export function createXAck(
+    key: string,
+    group: string,
+    ids: string[],
+): command_request.Command {
+    return createCommand(RequestType.XAck, [key, group, ...ids]);
+}
diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts
index 8d5d633fc9..2fd3fa5e02 100644
--- a/node/src/Transaction.ts
+++ b/node/src/Transaction.ts
@@ -207,6 +207,7 @@ import {
     createType,
     createUnlink,
     createWait,
+    createXAck,
     createXAdd,
     createXAutoClaim,
     createXClaim,
@@ -2892,6 +2893,22 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
         );
     }
 
+    /**
+     * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
+     * This command should be called on a pending message so that such message does not get processed again.
+     *
+     * @see {@link https://valkey.io/commands/xack/|valkey.io} for details.
+     *
+     * @param key - The key of the stream.
+     * @param group - The consumer group name.
+     * @param ids - An array of entry ids.
+     *
+     * Command Response - The number of messages that were successfully acknowledged.
+     */
+    public xack(key: string, group: string, ids: string[]): T {
+        return this.addAndReturn(createXAck(key, group, ids));
+    }
+
     /**
      * Renames `key` to `newkey`.
      * If `newkey` already exists it is overwritten.
diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts
index 3d815e3b57..62bf853a71 100644
--- a/node/tests/SharedTests.ts
+++ b/node/tests/SharedTests.ts
@@ -9834,6 +9834,119 @@ export function runBaseTests(config: {
         config.timeout,
     );
 
+    it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
+        `xack test_%p`,
+        async (protocol) => {
+            await runTest(async (client: BaseClient) => {
+                const key = "{testKey}:1-" + uuidv4();
+                const nonExistingKey = "{testKey}:2-" + uuidv4();
+                const string_key = "{testKey}:3-" + uuidv4();
+                const groupName = uuidv4();
+                const consumerName = uuidv4();
+                const stream_id0 = "0";
+                const stream_id1_0 = "1-0";
+                const stream_id1_1 = "1-1";
+                const stream_id1_2 = "1-2";
+
+                // setup: add 2 entries to the stream, create consumer group and read to mark them as pending
+                expect(
+                    await client.xadd(key, [["f0", "v0"]], {
+                        id: stream_id1_0,
+                    }),
+                ).toEqual(stream_id1_0);
+                expect(
+                    await client.xadd(key, [["f1", "v1"]], {
+                        id: stream_id1_1,
+                    }),
+                ).toEqual(stream_id1_1);
+                expect(
+                    await client.xgroupCreate(key, groupName, stream_id0),
+                ).toBe("OK");
+                expect(
+                    await client.xreadgroup(groupName, consumerName, {
+                        [key]: ">",
+                    }),
+                ).toEqual({
+                    [key]: {
+                        [stream_id1_0]: [["f0", "v0"]],
+                        [stream_id1_1]: [["f1", "v1"]],
+                    },
+                });
+
+                // add one more entry
+                expect(
+                    await client.xadd(key, [["f2", "v2"]], {
+                        id: stream_id1_2,
+                    }),
+                ).toEqual(stream_id1_2);
+
+                // acknowledge the first 2 entries
+                expect(
+                    await client.xack(key, groupName, [
+                        stream_id1_0,
+                        stream_id1_1,
+                    ]),
+                ).toBe(2);
+
+                // attempt to acknowledge the first 2 entries again, returns 0 since they were already acknowledged
+                expect(
+                    await client.xack(key, groupName, [
+                        stream_id1_0,
+                        stream_id1_1,
+                    ]),
+                ).toBe(0);
+
+                // read the last unacknowledged entry
+                expect(
+                    await client.xreadgroup(groupName, consumerName, {
+                        [key]: ">",
+                    }),
+                ).toEqual({ [key]: { [stream_id1_2]: [["f2", "v2"]] } });
+
+                // deleting the consumer, returns 1 since the last entry still hasn't been acknowledged
+                expect(
+                    await client.xgroupDelConsumer(
+                        key,
+                        groupName,
+                        consumerName,
+                    ),
+                ).toBe(1);
+
+                // attempt to acknowledge a non-existing key, returns 0
+                expect(
+                    await client.xack(nonExistingKey, groupName, [
+                        stream_id1_0,
+                    ]),
+                ).toBe(0);
+
+                // attempt to acknowledge a non-existing group name, returns 0
+                expect(
+                    await client.xack(key, "nonExistingGroup", [stream_id1_0]),
+                ).toBe(0);
+
+                // attempt to acknowledge a non-existing ID, returns 0
+                expect(await client.xack(key, groupName, ["99-99"])).toBe(0);
+
+                // invalid argument - ID list must not be empty
+                await expect(client.xack(key, groupName, [])).rejects.toThrow(
+                    RequestError,
+                );
+
+                // invalid argument - invalid stream ID format
+                await expect(
+                    client.xack(key, groupName, ["invalid stream ID format"]),
+                ).rejects.toThrow(RequestError);
+
+                // key exists, but is not a stream
+                expect(await client.set(string_key, "xack")).toBe("OK");
+                await expect(
+                    client.xack(string_key, groupName, [stream_id1_0]),
+                ).rejects.toThrow(RequestError);
+            }, protocol);
+        },
+        config.timeout,
+    );
+
     it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
         `lmpop test_%p`,
         async (protocol) => {
diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts
index 4605f89aa9..b6680eb685 100644
--- a/node/tests/TestUtilities.ts
+++ b/node/tests/TestUtilities.ts
@@ -1215,6 +1215,8 @@ export async function transactionTest(
         ]);
     }
 
+    baseTransaction.xack(key9, groupName1, ["0-3"]);
+    responseData.push(["xack(key9, groupName1, ['0-3'])", 0]);
     baseTransaction.xgroupDelConsumer(key9, groupName1, consumer);
     responseData.push(["xgroupDelConsumer(key9, groupName1, consumer)", 1]);
     baseTransaction.xgroupDestroy(key9, groupName1);