Skip to content

Commit

Permalink
Node: Add command XGROUP SETID (valkey-io#2135)
Browse files Browse the repository at this point in the history
Signed-off-by: TJ Zhang <[email protected]>
Co-authored-by: TJ Zhang <[email protected]>
  • Loading branch information
tjzhang-BQ and TJ Zhang authored Aug 20, 2024
1 parent 681d360 commit d3a9e53
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
* Node: Added GETEX command ([#2107]((https://github.com/valkey-io/valkey-glide/pull/2107))
* Node: Added ZINTER and ZUNION commands ([#2146](https://github.com/aws/glide-for-redis/pull/2146))
* Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112))
* Node: Added XGROUP SETID command ([#2135]((https://github.com/valkey-io/valkey-glide/pull/2135))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
Expand Down
35 changes: 35 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ import {
createZScore,
createZUnion,
createZUnionStore,
createXGroupSetid,
} from "./Commands";
import {
ClosingError,
Expand Down Expand Up @@ -5214,6 +5215,40 @@ export class BaseClient {
return this.createWritePromise(createXAck(key, group, ids));
}

/**
* Sets the last delivered ID for a consumer group.
*
* @see {@link https://valkey.io/commands/xgroup-setid|valkey.io} for more details.
*
* @param key - The key of the stream.
* @param groupName - The consumer group name.
* @param id - The stream entry ID that should be set as the last delivered ID for the consumer
* group.
* @param entriesRead - (Optional) A value representing the number of stream entries already read by the group.
* This option can only be specified if you are using Valkey version 7.0.0 or above.
* @param decoder - (Optional) {@link Decoder} type which defines how to handle the response. If not set, the default decoder from the client config will be used.
* @returns `"OK"`.
*
* * @example
* ```typescript
* console.log(await client.xgroupSetId("mystream", "mygroup", "0", 1L)); // Output is "OK"
* ```
*/
public async xgroupSetId(
key: string,
groupName: string,
id: string,
entriesRead?: number,
decoder?: Decoder,
): Promise<"OK"> {
return this.createWritePromise(
createXGroupSetid(key, groupName, id, entriesRead),
{
decoder: decoder,
},
);
}

/** Returns the element at index `index` in the list stored at `key`.
* The index is zero-based, so 0 means the first element, 1 the second element and so on.
* Negative indices can be used to designate elements starting at the tail of the list.
Expand Down
19 changes: 19 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3953,3 +3953,22 @@ export function createXAck(
): command_request.Command {
return createCommand(RequestType.XAck, [key, group, ...ids]);
}

/**
* @internal
*/
export function createXGroupSetid(
key: string,
groupName: string,
id: string,
entriesRead?: number,
): command_request.Command {
const args = [key, groupName, id];

if (entriesRead !== undefined) {
args.push("ENTRIESREAD");
args.push(entriesRead.toString());
}

return createCommand(RequestType.XGroupSetId, args);
}
25 changes: 25 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ import {
createZScore,
createZUnion,
createZUnionStore,
createXGroupSetid,
} from "./Commands";
import { command_request } from "./ProtobufMessage";

Expand Down Expand Up @@ -2909,6 +2910,30 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXAck(key, group, ids));
}

/**
* Sets the last delivered ID for a consumer group.
*
* @see {@link https://valkey.io/commands/xgroup-setid|valkey.io} for more details.
*
* @param key - The key of the stream.
* @param groupName - The consumer group name.
* @param id - The stream entry ID that should be set as the last delivered ID for the consumer group.
* @param entriesRead - (Optional) A value representing the number of stream entries already read by the group.
* This option can only be specified if you are using Valkey version 7.0.0 or above.
*
* Command Response - `"OK"`.
*/
public xgroupSetId(
key: string,
groupName: string,
id: string,
entriesRead?: number,
): T {
return this.addAndReturn(
createXGroupSetid(key, groupName, id, entriesRead),
);
}

/**
* Renames `key` to `newkey`.
* If `newkey` already exists it is overwritten.
Expand Down
102 changes: 102 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9502,6 +9502,108 @@ export function runBaseTests(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xgroupSetId test %p`,
async (protocol) => {
await runTest(async (client: BaseClient, cluster: RedisCluster) => {
const key = "testKey" + uuidv4();
const nonExistingKey = "group" + uuidv4();
const stringKey = "testKey" + uuidv4();
const groupName = uuidv4();
const consumerName = uuidv4();
const streamid0 = "0";
const streamid1_0 = "1-0";
const streamid1_1 = "1-1";
const streamid1_2 = "1-2";

// Setup: Create stream with 3 entries, create consumer group, read entries to add them to the Pending Entries List
expect(
await client.xadd(key, [["f0", "v0"]], { id: streamid1_0 }),
).toBe(streamid1_0);
expect(
await client.xadd(key, [["f1", "v1"]], { id: streamid1_1 }),
).toBe(streamid1_1);
expect(
await client.xadd(key, [["f2", "v2"]], { id: streamid1_2 }),
).toBe(streamid1_2);

expect(
await client.xgroupCreate(key, groupName, streamid0),
).toBe("OK");

expect(
await client.xreadgroup(groupName, consumerName, {
[key]: ">",
}),
).toEqual({
[key]: {
[streamid1_0]: [["f0", "v0"]],
[streamid1_1]: [["f1", "v1"]],
[streamid1_2]: [["f2", "v2"]],
},
});

// Sanity check: xreadgroup should not return more entries since they're all already in the
// Pending Entries List.
expect(
await client.xreadgroup(groupName, consumerName, {
[key]: ">",
}),
).toBeNull();

// Reset the last delivered ID for the consumer group to "1-1"
if (cluster.checkIfServerVersionLessThan("7.0.0")) {
expect(
await client.xgroupSetId(key, groupName, streamid1_1),
).toBe("OK");
} else {
expect(
await client.xgroupSetId(
key,
groupName,
streamid1_1,
1,
),
).toBe("OK");
}

// xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1
const newResult = await client.xreadgroup(
groupName,
consumerName,
{ [key]: ">" },
);
expect(newResult).toEqual({
[key]: {
[streamid1_2]: [["f2", "v2"]],
},
});

// An error is raised if XGROUP SETID is called with a non-existing key
await expect(
client.xgroupSetId(nonExistingKey, groupName, streamid0),
).rejects.toThrow(RequestError);

// An error is raised if XGROUP SETID is called with a non-existing group
await expect(
client.xgroupSetId(key, "non_existing_group", streamid0),
).rejects.toThrow(RequestError);

// Setting the ID to a non-existing ID is allowed
expect(await client.xgroupSetId(key, groupName, "99-99")).toBe(
"OK",
);

// key exists, but is not a stream
expect(await client.set(stringKey, "xgroup setid")).toBe("OK");
await expect(
client.xgroupSetId(stringKey, groupName, streamid1_0),
).rejects.toThrow(RequestError);
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xpending test_%p`,
async (protocol) => {
Expand Down
2 changes: 2 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,8 @@ export async function transactionTest(

baseTransaction.xack(key9, groupName1, ["0-3"]);
responseData.push(["xack(key9, groupName1, ['0-3'])", 0]);
baseTransaction.xgroupSetId(key9, groupName1, "0-2");
responseData.push(["xgroupSetId(key9, groupName1, '0-2')", "OK"]);
baseTransaction.xgroupDelConsumer(key9, groupName1, consumer);
responseData.push(["xgroupDelConsumer(key9, groupName1, consumer)", 1]);
baseTransaction.xgroupDestroy(key9, groupName1);
Expand Down

0 comments on commit d3a9e53

Please sign in to comment.