Skip to content

Commit

Permalink
Node: Add command LMPOP & BLMPOP (valkey-io#2050)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjzhang-BQ authored Aug 1, 2024
1 parent 5ae81cc commit ca0b317
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* Node: Added ZSCAN command ([#2061](https://github.com/valkey-io/valkey-glide/pull/2061))
* Node: Added SETRANGE command ([#2066](https://github.com/valkey-io/valkey-glide/pull/2066))
* Node: Added XDEL command ([#2064]((https://github.com/valkey-io/valkey-glide/pull/2064))
* Node: Added LMPOP & BLMPOP command ([#2050](https://github.com/valkey-io/valkey-glide/pull/2050))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
Expand Down
66 changes: 66 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import {
createBitField,
createBitOp,
createBitPos,
createBLMPop,
createDecr,
createDecrBy,
createDel,
Expand Down Expand Up @@ -91,6 +92,7 @@ import {
createLInsert,
createLLen,
createLMove,
createLMPop,
createLPop,
createLPos,
createLPush,
Expand Down Expand Up @@ -4514,6 +4516,70 @@ export class BaseClient {
return this.createWritePromise(createSetRange(key, offset, value));
}

/**
* Pops one or more elements from the first non-empty list from the provided `keys`.
*
* See https://valkey.io/commands/lmpop/ for more details.
*
* @remarks When in cluster mode, all `key`s must map to the same hash slot.
* @param keys - An array of keys to lists.
* @param direction - The direction based on which elements are popped from - see {@link ListDirection}.
* @param count - (Optional) The maximum number of popped elements.
* @returns A `Record` of key-name mapped array of popped elements.
*
* since Valkey version 7.0.0.
*
* @example
* ```typescript
* await client.lpush("testKey", ["one", "two", "three"]);
* await client.lpush("testKey2", ["five", "six", "seven"]);
* const result = await client.lmpop(["testKey", "testKey2"], ListDirection.LEFT, 1L);
* console.log(result.get("testKey")); // Output: { "testKey": ["three"] }
* ```
*/
public async lmpop(
keys: string[],
direction: ListDirection,
count?: number,
): Promise<Record<string, string[]>> {
return this.createWritePromise(createLMPop(keys, direction, count));
}

/**
* Blocks the connection until it pops one or more elements from the first non-empty list from the
* provided `key`. `BLMPOP` is the blocking variant of {@link lmpop}.
*
* See https://valkey.io/commands/blmpop/ for more details.
*
* @remarks When in cluster mode, all `key`s must map to the same hash slot.
* @param keys - An array of keys to lists.
* @param direction - The direction based on which elements are popped from - see {@link ListDirection}.
* @param timeout - The number of seconds to wait for a blocking operation to complete. A value of `0` will block indefinitely.
* @param count - (Optional) The maximum number of popped elements.
* @returns - A `Record` of `key` name mapped array of popped elements.
* If no member could be popped and the timeout expired, returns `null`.
*
* since Valkey version 7.0.0.
*
* @example
* ```typescript
* await client.lpush("testKey", ["one", "two", "three"]);
* await client.lpush("testKey2", ["five", "six", "seven"]);
* const result = await client.blmpop(["testKey", "testKey2"], ListDirection.LEFT, 0.1, 1L);
* console.log(result.get("testKey")); // Output: { "testKey": ["three"] }
* ```
*/
public async blmpop(
keys: string[],
direction: ListDirection,
timeout: number,
count?: number,
): Promise<Record<string, string[]>> {
return this.createWritePromise(
createBLMPop(timeout, keys, direction, count),
);
}

/**
* @internal
*/
Expand Down
42 changes: 42 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3066,3 +3066,45 @@ export function createSetRange(
): command_request.Command {
return createCommand(RequestType.SetRange, [key, offset.toString(), value]);
}

/**
* @internal
*/
export function createLMPop(
keys: string[],
direction: ListDirection,
count?: number,
): command_request.Command {
const args: string[] = [keys.length.toString(), ...keys, direction];

if (count !== undefined) {
args.push("COUNT");
args.push(count.toString());
}

return createCommand(RequestType.LMPop, args);
}

/**
* @internal
*/
export function createBLMPop(
timeout: number,
keys: string[],
direction: ListDirection,
count?: number,
): command_request.Command {
const args: string[] = [
timeout.toString(),
keys.length.toString(),
...keys,
direction,
];

if (count !== undefined) {
args.push("COUNT");
args.push(count.toString());
}

return createCommand(RequestType.BLMPop, args);
}
49 changes: 47 additions & 2 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import {
ZAddOptions,
createBLMove,
createBLPop,
createBLMPop,
createBRPop,
createBZMPop,
createBitCount,
Expand Down Expand Up @@ -112,6 +113,7 @@ import {
createLInsert,
createLLen,
createLMove,
createLMPop,
createLPop,
createLPos,
createLPush,
Expand Down Expand Up @@ -2524,8 +2526,7 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
* @param keys - The keys of the sorted sets.
* @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or
* {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly.
* @param timeout - The number of seconds to wait for a blocking operation to complete.
* A value of 0 will block indefinitely.
* @param timeout - The number of seconds to wait for a blocking operation to complete. A value of `0` will block indefinitely.
* @param count - (Optional) The number of elements to pop. If not supplied, only one element will be popped.
*
* Command Response - A two-element `array` containing the key name of the set from which the element
Expand Down Expand Up @@ -2732,6 +2733,50 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
public setrange(key: string, offset: number, value: string): T {
return this.addAndReturn(createSetRange(key, offset, value));
}

/**
* Pops one or more elements from the first non-empty list from the provided `keys`.
*
* See https://valkey.io/commands/lmpop/ for more details.
*
* @remarks When in cluster mode, `source` and `destination` must map to the same hash slot.
* @param keys - An array of keys to lists.
* @param direction - The direction based on which elements are popped from - see {@link ListDirection}.
* @param count - (Optional) The maximum number of popped elements.
*
* Command Response - A `Record` of `key` name mapped array of popped elements.
*
* since Valkey version 7.0.0.
*/
public lmpop(keys: string[], direction: ListDirection, count?: number): T {
return this.addAndReturn(createLMPop(keys, direction, count));
}

/**
* Blocks the connection until it pops one or more elements from the first non-empty list from the
* provided `key`. `BLMPOP` is the blocking variant of {@link lmpop}.
*
* See https://valkey.io/commands/blmpop/ for more details.
*
* @param keys - An array of keys to lists.
* @param direction - The direction based on which elements are popped from - see {@link ListDirection}.
* @param timeout - The number of seconds to wait for a blocking operation to complete. A value of
* `0` will block indefinitely.
* @param count - (Optional) The maximum number of popped elements.
*
* Command Response - A `Record` of `key` name mapped array of popped elements.
* If no member could be popped and the timeout expired, returns `null`.
*
* since Valkey version 7.0.0.
*/
public blmpop(
keys: string[],
direction: ListDirection,
timeout: number,
count?: number,
): T {
return this.addAndReturn(createBLMPop(timeout, keys, direction, count));
}
}

/**
Expand Down
23 changes: 18 additions & 5 deletions node/tests/GlideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,27 @@ describe("GlideClient", () => {
ListDirection.LEFT,
0.1,
);
const timeoutPromise = new Promise((resolve) => {
setTimeout(resolve, 500);
});

const blmpopPromise = client.blmpop(
["key1", "key2"],
ListDirection.LEFT,
0.1,
);

const promiseList = [blmovePromise, blmpopPromise];

try {
await Promise.race([blmovePromise, timeoutPromise]);
for (const promise of promiseList) {
const timeoutPromise = new Promise((resolve) => {
setTimeout(resolve, 500);
});
await Promise.race([promise, timeoutPromise]);
}
} finally {
Promise.resolve(blmovePromise);
for (const promise of promiseList) {
await Promise.resolve([promise]);
}

client.close();
}
},
Expand Down
2 changes: 2 additions & 0 deletions node/tests/GlideClusterClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ describe("GlideClusterClient", () => {
client.sdiffstore("abc", ["zxy", "lkn"]),
client.sortStore("abc", "zyx"),
client.sortStore("abc", "zyx", { isAlpha: true }),
client.lmpop(["abc", "def"], ListDirection.LEFT, 1),
client.blmpop(["abc", "def"], ListDirection.RIGHT, 0.1, 1),
];

if (gte(cluster.getVersion(), "6.2.0")) {
Expand Down
112 changes: 112 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6602,6 +6602,118 @@ export function runBaseTests<Context>(config: {
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`lmpop test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient, cluster: RedisCluster) => {
if (cluster.checkIfServerVersionLessThan("7.0.0")) {
return;
}

const key1 = "{key}" + uuidv4();
const key2 = "{key}" + uuidv4();
const nonListKey = uuidv4();
const singleKeyArray = [key1];
const multiKeyArray = [key2, key1];
const count = 1;
const lpushArgs = ["one", "two", "three", "four", "five"];
const expected = { [key1]: ["five"] };
const expected2 = { [key2]: ["one", "two"] };

// nothing to be popped
expect(
await client.lmpop(
singleKeyArray,
ListDirection.LEFT,
count,
),
).toBeNull();

// pushing to the arrays to be popped
expect(await client.lpush(key1, lpushArgs)).toEqual(5);
expect(await client.lpush(key2, lpushArgs)).toEqual(5);

// checking correct result from popping
expect(
await client.lmpop(singleKeyArray, ListDirection.LEFT),
).toEqual(expected);

// popping multiple elements from the right
expect(
await client.lmpop(multiKeyArray, ListDirection.RIGHT, 2),
).toEqual(expected2);

// Key exists, but is not a set
expect(await client.set(nonListKey, "lmpop")).toBe("OK");
await expect(
client.lmpop([nonListKey], ListDirection.RIGHT),
).rejects.toThrow(RequestError);
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`blmpop test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient, cluster: RedisCluster) => {
if (cluster.checkIfServerVersionLessThan("7.0.0")) {
return;
}

const key1 = "{key}" + uuidv4();
const key2 = "{key}" + uuidv4();
const nonListKey = uuidv4();
const singleKeyArray = [key1];
const multiKeyArray = [key2, key1];
const count = 1;
const lpushArgs = ["one", "two", "three", "four", "five"];
const expected = { [key1]: ["five"] };
const expected2 = { [key2]: ["one", "two"] };

// nothing to be popped
expect(
await client.blmpop(
singleKeyArray,
ListDirection.LEFT,
0.1,
count,
),
).toBeNull();

// pushing to the arrays to be popped
expect(await client.lpush(key1, lpushArgs)).toEqual(5);
expect(await client.lpush(key2, lpushArgs)).toEqual(5);

// checking correct result from popping
expect(
await client.blmpop(
singleKeyArray,
ListDirection.LEFT,
0.1,
),
).toEqual(expected);

// popping multiple elements from the right
expect(
await client.blmpop(
multiKeyArray,
ListDirection.RIGHT,
0.1,
2,
),
).toEqual(expected2);

// Key exists, but is not a set
expect(await client.set(nonListKey, "blmpop")).toBe("OK");
await expect(
client.blmpop([nonListKey], ListDirection.RIGHT, 0.1, 1),
).rejects.toThrow(RequestError);
}, protocol);
},
config.timeout,
);
}

export function runCommonTests<Context>(config: {
Expand Down
Loading

0 comments on commit ca0b317

Please sign in to comment.