Skip to content

Commit

Permalink
Node: Add XGROUP CREATECONSUMER and XGROUP DELCONSUMER commands (v…
Browse files Browse the repository at this point in the history
…alkey-io#2088)

* Added XGROUP CREATE and XGROUP DESTROY commands

Signed-off-by: Guian Gumpac <[email protected]>

---------

Signed-off-by: Guian Gumpac <[email protected]>
  • Loading branch information
GumpacG authored Aug 8, 2024
1 parent 60f71af commit 9277a2f
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
* Python: Added PUBSUB * commands ([#2043](https://github.com/valkey-io/valkey-glide/pull/2043))
* Node: Added XGROUP CREATE & XGROUP DESTROY commands ([#2084](https://github.com/valkey-io/valkey-glide/pull/2084))
* Node: Added BZPOPMAX & BZPOPMIN command ([#2077]((https://github.com/valkey-io/valkey-glide/pull/2077))
* Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
Expand Down
54 changes: 54 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ import {
createXGroupDestroy,
createXInfoConsumers,
createXInfoStream,
createXGroupCreateConsumer,
createXGroupDelConsumer,
createXLen,
createXRead,
createXTrim,
Expand Down Expand Up @@ -4098,6 +4100,58 @@ export class BaseClient {
);
}

/**
* Creates a consumer named `consumerName` in the consumer group `groupName` for the stream stored at `key`.
*
* See https://valkey.io/commands/xgroup-createconsumer for more details.
*
* @param key - The key of the stream.
* @param groupName - The consumer group name.
* @param consumerName - The newly created consumer.
* @returns `true` if the consumer is created. Otherwise, returns `false`.
*
* @example
* ```typescript
* // The consumer "myconsumer" was created in consumer group "mygroup" for the stream "mystream".
* console.log(await client.xgroupCreateConsumer("mystream", "mygroup", "myconsumer")); // Output is true
* ```
*/
public async xgroupCreateConsumer(
key: string,
groupName: string,
consumerName: string,
): Promise<boolean> {
return this.createWritePromise(
createXGroupCreateConsumer(key, groupName, consumerName),
);
}

/**
* Deletes a consumer named `consumerName` in the consumer group `groupName` for the stream stored at `key`.
*
* See https://valkey.io/commands/xgroup-delconsumer for more details.
*
* @param key - The key of the stream.
* @param groupName - The consumer group name.
* @param consumerName - The consumer to delete.
* @returns The number of pending messages the `consumer` had before it was deleted.
*
* * @example
* ```typescript
* // Consumer "myconsumer" was deleted, and had 5 pending messages unclaimed.
* console.log(await client.xgroupDelConsumer("mystream", "mygroup", "myconsumer")); // Output is 5
* ```
*/
public async xgroupDelConsumer(
key: string,
groupName: string,
consumerName: string,
): Promise<number> {
return this.createWritePromise(
createXGroupDelConsumer(key, groupName, consumerName),
);
}

private readonly MAP_READ_FROM_STRATEGY: Record<
ReadFrom,
connection_request.ReadFrom
Expand Down
30 changes: 30 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,36 @@ export function createXTrim(
return createCommand(RequestType.XTrim, args);
}

/**
* @internal
*/
export function createXGroupCreateConsumer(
key: string,
groupName: string,
consumerName: string,
): command_request.Command {
return createCommand(RequestType.XGroupCreateConsumer, [
key,
groupName,
consumerName,
]);
}

/**
* @internal
*/
export function createXGroupDelConsumer(
key: string,
groupName: string,
consumerName: string,
): command_request.Command {
return createCommand(RequestType.XGroupDelConsumer, [
key,
groupName,
consumerName,
]);
}

/**
* @internal
*/
Expand Down
44 changes: 44 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ import {
createXTrim,
createXGroupCreate,
createXGroupDestroy,
createXGroupCreateConsumer,
createXGroupDelConsumer,
createZAdd,
createZCard,
createZCount,
Expand Down Expand Up @@ -2386,6 +2388,48 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXGroupDestroy(key, groupName));
}

/**
* Creates a consumer named `consumerName` in the consumer group `groupName` for the stream stored at `key`.
*
* See https://valkey.io/commands/xgroup-createconsumer for more details.
*
* @param key - The key of the stream.
* @param groupName - The consumer group name.
* @param consumerName - The newly created consumer.
*
* Command Response - `true` if the consumer is created. Otherwise, returns `false`.
*/
public xgroupCreateConsumer(
key: string,
groupName: string,
consumerName: string,
): T {
return this.addAndReturn(
createXGroupCreateConsumer(key, groupName, consumerName),
);
}

/**
* Deletes a consumer named `consumerName` in the consumer group `groupName` for the stream stored at `key`.
*
* See https://valkey.io/commands/xgroup-delconsumer for more details.
*
* @param key - The key of the stream.
* @param groupName - The consumer group name.
* @param consumerName - The consumer to delete.
*
* Command Response - The number of pending messages the `consumer` had before it was deleted.
*/
public xgroupDelConsumer(
key: string,
groupName: string,
consumerName: string,
): T {
return this.addAndReturn(
createXGroupDelConsumer(key, groupName, consumerName),
);
}

/**
* Renames `key` to `newkey`.
* If `newkey` already exists it is overwritten.
Expand Down
133 changes: 113 additions & 20 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4770,16 +4770,8 @@ export function runBaseTests<Context>(config: {
),
).toEqual(streamId1_0);

// TODO: uncomment when XGROUP CREATE is implemented
// expect(await client.xgroupCreate(key, groupName, streamId0_0)).toEqual("Ok");
expect(
await client.customCommand([
"XGROUP",
"CREATE",
key,
groupName,
streamId0_0,
]),
await client.xgroupCreate(key, groupName, streamId0_0),
).toEqual("OK");

// TODO: uncomment when XREADGROUP is implemented
Expand Down Expand Up @@ -7373,13 +7365,11 @@ export function runBaseTests<Context>(config: {
}

expect(
await client.customCommand([
"XGROUP",
"CREATECONSUMER",
await client.xgroupCreateConsumer(
key,
groupName1,
consumer2,
]),
),
).toBeTruthy();
expect(
await client.customCommand([
Expand Down Expand Up @@ -7452,13 +7442,7 @@ export function runBaseTests<Context>(config: {
}),
).toEqual("OK");
expect(
await client.customCommand([
"xgroup",
"createconsumer",
key,
group,
"consumer",
]),
await client.xgroupCreateConsumer(key, group, "consumer"),
).toEqual(true);

expect(
Expand Down Expand Up @@ -7671,6 +7655,115 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xgroupCreateConsumer and xgroupDelConsumer test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient) => {
const key = uuidv4();
const nonExistentKey = uuidv4();
const stringKey = uuidv4();
const groupName = uuidv4();
const consumer = uuidv4();
const streamId0 = "0";

// create group and consumer for the group
expect(
await client.xgroupCreate(key, groupName, streamId0, {
mkStream: true,
}),
).toEqual("OK");
expect(
await client.xgroupCreateConsumer(key, groupName, consumer),
).toEqual(true);

// attempting to create/delete a consumer for a group that does not exist results in a NOGROUP request error
await expect(
client.xgroupCreateConsumer(
key,
"nonExistentGroup",
consumer,
),
).rejects.toThrow(RequestError);
await expect(
client.xgroupDelConsumer(key, "nonExistentGroup", consumer),
).rejects.toThrow(RequestError);

// attempt to create consumer for group again
expect(
await client.xgroupCreateConsumer(key, groupName, consumer),
).toEqual(false);

// attempting to delete a consumer that has not been created yet returns 0
expect(
await client.xgroupDelConsumer(
key,
groupName,
"nonExistentConsumer",
),
).toEqual(0);

// Add two stream entries
const streamid1: string | null = await client.xadd(key, [
["field1", "value1"],
]);
expect(streamid1).not.toBeNull();
const streamid2 = await client.xadd(key, [
["field2", "value2"],
]);
expect(streamid2).not.toBeNull();

// read the entire stream for the consumer and mark messages as pending
expect(
await client.customCommand([
"XREADGROUP",
"GROUP",
groupName,
consumer,
"STREAMS",
key,
">",
]),
).toEqual({
[key]: {
[streamid1 as string]: [["field1", "value1"]],
[streamid2 as string]: [["field2", "value2"]],
},
});

// delete one of the streams
expect(
await client.xgroupDelConsumer(key, groupName, consumer),
).toEqual(2);

// attempting to call XGROUP CREATECONSUMER or XGROUP DELCONSUMER with a non-existing key should raise an error
await expect(
client.xgroupCreateConsumer(
nonExistentKey,
groupName,
consumer,
),
).rejects.toThrow(RequestError);
await expect(
client.xgroupDelConsumer(
nonExistentKey,
groupName,
consumer,
),
).rejects.toThrow(RequestError);

// key exists, but it is not a stream
expect(await client.set(stringKey, "foo")).toEqual("OK");
await expect(
client.xgroupCreateConsumer(stringKey, groupName, consumer),
).rejects.toThrow(RequestError);
await expect(
client.xgroupDelConsumer(stringKey, groupName, consumer),
).rejects.toThrow(RequestError);
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xgroupCreate and xgroupDestroy test_%p`,
async (protocol) => {
Expand Down
18 changes: 10 additions & 8 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ export async function transactionTest(
const value = uuidv4();
const groupName1 = uuidv4();
const groupName2 = uuidv4();
const consumer = uuidv4();
// array of tuples - first element is test name/description, second - expected return value
const responseData: [string, ReturnType][] = [];

Expand Down Expand Up @@ -959,15 +960,9 @@ export async function transactionTest(

// key9 has one entry here: {"0-2":[["field","value2"]]}

baseTransaction.customCommand([
"xgroup",
"createconsumer",
key9,
groupName1,
"consumer1",
]);
baseTransaction.xgroupCreateConsumer(key9, groupName1, "consumer1");
responseData.push([
'xgroupCreateConsumer(key9, groupName1, "consumer1")',
"xgroupCreateConsumer(key9, groupName1, consumer1)",
true,
]);
baseTransaction.customCommand([
Expand Down Expand Up @@ -1011,6 +1006,13 @@ export async function transactionTest(
'xclaimJustId(key9, groupName1, "consumer1", 0, ["0-2"], { isForce: true, retryCount: 0, idle: 0})',
["0-2"],
]);
baseTransaction.xgroupCreateConsumer(key9, groupName1, consumer);
responseData.push([
"xgroupCreateConsumer(key9, groupName1, consumer)",
true,
]);
baseTransaction.xgroupDelConsumer(key9, groupName1, consumer);
responseData.push(["xgroupDelConsumer(key9, groupName1, consumer)", 0]);
baseTransaction.xgroupDestroy(key9, groupName1);
responseData.push(["xgroupDestroy(key9, groupName1)", true]);
baseTransaction.xgroupDestroy(key9, groupName2);
Expand Down

0 comments on commit 9277a2f

Please sign in to comment.