From 5ae81cc086c3c5682283da97a7cdc20caf5d8e6e Mon Sep 17 00:00:00 2001 From: Guian Gumpac Date: Thu, 1 Aug 2024 14:16:22 -0700 Subject: [PATCH 1/4] Node: Added `XDEL` command (#2064) * Added XDEL command Signed-off-by: Guian Gumpac * Fixed flaky test Signed-off-by: Guian Gumpac --------- Signed-off-by: Guian Gumpac --- CHANGELOG.md | 1 + node/src/BaseClient.ts | 21 ++++++++++ node/src/Commands.ts | 13 +++++++ node/src/Transaction.ts | 28 ++++++++++++-- node/tests/SharedTests.ts | 76 ++++++++++++++++++++++++++++++++----- node/tests/TestUtilities.ts | 2 + 6 files changed, 128 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bf594b0e6..7e1cb142d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ * Node: Added ZREMRANGEBYLEX command ([#2025]((https://github.com/valkey-io/valkey-glide/pull/2025)) * Node: Added ZSCAN command ([#2061](https://github.com/valkey-io/valkey-glide/pull/2061)) * Node: Added SETRANGE command ([#2066](https://github.com/valkey-io/valkey-glide/pull/2066)) +* Node: Added XDEL command ([#2064]((https://github.com/valkey-io/valkey-glide/pull/2064)) #### Breaking Changes * Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 2cd5025517..a7f3608ad6 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -142,6 +142,7 @@ import { createType, createUnlink, createXAdd, + createXDel, createXLen, createXRead, createXTrim, @@ -3489,6 +3490,26 @@ export class BaseClient { return this.createWritePromise(createXAdd(key, values, options)); } + /** + * Removes the specified entries by id from a stream, and returns the number of entries deleted. + * + * See https://valkey.io/commands/xdel for more details. + * + * @param key - The key of the stream. + * @param ids - An array of entry ids. + * @returns The number of entries removed from the stream. This number may be less than the number of entries in + * `ids`, if the specified `ids` don't exist in the stream. + * + * @example + * ```typescript + * console.log(await client.xdel("key", ["1538561698944-0", "1538561698944-1"])); + * // Output is 2 since the stream marked 2 entries as deleted. + * ``` + */ + public xdel(key: string, ids: string[]): Promise { + return this.createWritePromise(createXDel(key, ids)); + } + /** * Trims the stream stored at `key` by evicting older entries. * See https://valkey.io/commands/xtrim/ for more details. diff --git a/node/src/Commands.ts b/node/src/Commands.ts index c59e56b806..c7390e85ef 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -1933,6 +1933,9 @@ function addTrimOptions(options: StreamTrimOptions, args: string[]) { } } +/** + * @internal + */ export function createXAdd( key: string, values: [string, string][], @@ -1962,6 +1965,16 @@ export function createXAdd( return createCommand(RequestType.XAdd, args); } +/** + * @internal + */ +export function createXDel( + key: string, + ids: string[], +): command_request.Command { + return createCommand(RequestType.XDel, [key, ...ids]); +} + /** * @internal */ diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 3292f20d73..9f5eab8f0c 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -172,6 +172,7 @@ import { createType, createUnlink, createXAdd, + createXDel, createXLen, createXRead, createXTrim, @@ -1984,7 +1985,9 @@ export class BaseTransaction> { * * @param key - The key of the stream. * @param values - field-value pairs to be added to the entry. - * @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists. + * @param options - (Optional) Stream add options. + * + * Command Response - The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists. */ public xadd( key: string, @@ -1994,13 +1997,29 @@ export class BaseTransaction> { return this.addAndReturn(createXAdd(key, values, options)); } + /** + * Removes the specified entries by id from a stream, and returns the number of entries deleted. + * + * See https://valkey.io/commands/xdel for more details. + * + * @param key - The key of the stream. + * @param ids - An array of entry ids. + * + * Command Response - The number of entries removed from the stream. This number may be less than the number of entries in + * `ids`, if the specified `ids` don't exist in the stream. + */ + public xdel(key: string, ids: string[]): T { + return this.addAndReturn(createXDel(key, ids)); + } + /** * Trims the stream stored at `key` by evicting older entries. * See https://valkey.io/commands/xtrim/ for more details. * * @param key - the key of the stream * @param options - options detailing how to trim the stream. - * @returns The number of entries deleted from the stream. If `key` doesn't exist, 0 is returned. + * + * Command Response - The number of entries deleted from the stream. If `key` doesn't exist, 0 is returned. */ public xtrim(key: string, options: StreamTrimOptions): T { return this.addAndReturn(createXTrim(key, options)); @@ -2009,7 +2028,7 @@ export class BaseTransaction> { /** Returns the server time. * See https://valkey.io/commands/time/ for details. * - * @returns - The current server time as a two items `array`: + * Command Response - The current server time as a two items `array`: * A Unix timestamp and the amount of microseconds already elapsed in the current second. * The returned `array` is in a [Unix timestamp, Microseconds already elapsed] format. */ @@ -2023,7 +2042,8 @@ export class BaseTransaction> { * * @param keys_and_ids - pairs of keys and entry ids to read from. A pair is composed of a stream's key and the id of the entry after which the stream will be read. * @param options - options detailing how to read the stream. - * @returns A map between a stream key, and an array of entries in the matching key. The entries are in an [id, fields[]] format. + * + * Command Response - A map between a stream key, and an array of entries in the matching key. The entries are in an [id, fields[]] format. */ public xread( keys_and_ids: Record, diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 84def5e10e..0f5f9f7c10 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -5941,11 +5941,8 @@ export function runBaseTests(config: { // Setup test data - use a large number of entries to force an iterative cursor. const numberMap: Record = {}; - const expectedNumberMapArray: string[] = []; - for (let i = 0; i < 10000; i++) { - expectedNumberMapArray.push(i.toString()); - expectedNumberMapArray.push(i.toString()); + for (let i = 0; i < 50000; i++) { numberMap[i.toString()] = i; } @@ -6018,15 +6015,18 @@ export function runBaseTests(config: { } // Fetching by cursor is randomized. - const expectedCombinedMapArray = - expectedNumberMapArray.concat(expectedCharMapArray); + const expectedFullMap: Record = { + ...numberMap, + ...charMap, + }; + expect(fullResultMapArray.length).toEqual( - expectedCombinedMapArray.length, + Object.keys(expectedFullMap).length * 2, ); for (let i = 0; i < fullResultMapArray.length; i += 2) { - expect(fullResultMapArray).toContain( - expectedCombinedMapArray[i], + expect(fullResultMapArray[i] in expectedFullMap).toEqual( + true, ); } @@ -6544,6 +6544,64 @@ export function runBaseTests(config: { }, config.timeout, ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `xdel test_%p`, + async (protocol) => { + await runTest(async (client: BaseClient) => { + const key = uuidv4(); + const stringKey = uuidv4(); + const nonExistentKey = uuidv4(); + const streamId1 = "0-1"; + const streamId2 = "0-2"; + const streamId3 = "0-3"; + + expect( + await client.xadd( + key, + [ + ["f1", "foo1"], + ["f2", "foo2"], + ], + { id: streamId1 }, + ), + ).toEqual(streamId1); + + expect( + await client.xadd( + key, + [ + ["f1", "foo1"], + ["f2", "foo2"], + ], + { id: streamId2 }, + ), + ).toEqual(streamId2); + + expect(await client.xlen(key)).toEqual(2); + + // deletes one stream id, and ignores anything invalid + expect(await client.xdel(key, [streamId1, streamId3])).toEqual( + 1, + ); + expect(await client.xdel(nonExistentKey, [streamId3])).toEqual( + 0, + ); + + // invalid argument - id list should not be empty + await expect(client.xdel(key, [])).rejects.toThrow( + RequestError, + ); + + // key exists, but it is not a stream + expect(await client.set(stringKey, "foo")).toEqual("OK"); + await expect( + client.xdel(stringKey, [streamId3]), + ).rejects.toThrow(RequestError); + }, protocol); + }, + config.timeout, + ); } export function runCommonTests(config: { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index 686241f2b5..4aa4cabbf9 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -841,6 +841,8 @@ export async function transactionTest( 'xtrim(key9, { method: "minid", threshold: "0-2", exact: true }', 1, ]); + baseTransaction.xdel(key9, ["0-3", "0-5"]); + responseData.push(["xdel(key9, [['0-3', '0-5']])", 1]); baseTransaction.rename(key9, key10); responseData.push(["rename(key9, key10)", "OK"]); baseTransaction.exists([key10]); From ca0b317f5e0f57200215db3de04462fef29c4c22 Mon Sep 17 00:00:00 2001 From: tjzhang-BQ <111323543+tjzhang-BQ@users.noreply.github.com> Date: Thu, 1 Aug 2024 16:46:47 -0700 Subject: [PATCH 2/4] Node: Add command LMPOP & BLMPOP (#2050) --- CHANGELOG.md | 1 + node/src/BaseClient.ts | 66 +++++++++++++++ node/src/Commands.ts | 42 ++++++++++ node/src/Transaction.ts | 49 ++++++++++- node/tests/GlideClient.test.ts | 23 ++++-- node/tests/GlideClusterClient.test.ts | 2 + node/tests/SharedTests.ts | 112 ++++++++++++++++++++++++++ node/tests/TestUtilities.ts | 19 +++++ 8 files changed, 307 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e1cb142d9..41d81b0401 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ * Node: Added ZSCAN command ([#2061](https://github.com/valkey-io/valkey-glide/pull/2061)) * Node: Added SETRANGE command ([#2066](https://github.com/valkey-io/valkey-glide/pull/2066)) * Node: Added XDEL command ([#2064]((https://github.com/valkey-io/valkey-glide/pull/2064)) +* Node: Added LMPOP & BLMPOP command ([#2050](https://github.com/valkey-io/valkey-glide/pull/2050)) #### Breaking Changes * Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index a7f3608ad6..c04d118091 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -55,6 +55,7 @@ import { createBitField, createBitOp, createBitPos, + createBLMPop, createDecr, createDecrBy, createDel, @@ -91,6 +92,7 @@ import { createLInsert, createLLen, createLMove, + createLMPop, createLPop, createLPos, createLPush, @@ -4514,6 +4516,70 @@ export class BaseClient { return this.createWritePromise(createSetRange(key, offset, value)); } + /** + * Pops one or more elements from the first non-empty list from the provided `keys`. + * + * See https://valkey.io/commands/lmpop/ for more details. + * + * @remarks When in cluster mode, all `key`s must map to the same hash slot. + * @param keys - An array of keys to lists. + * @param direction - The direction based on which elements are popped from - see {@link ListDirection}. + * @param count - (Optional) The maximum number of popped elements. + * @returns A `Record` of key-name mapped array of popped elements. + * + * since Valkey version 7.0.0. + * + * @example + * ```typescript + * await client.lpush("testKey", ["one", "two", "three"]); + * await client.lpush("testKey2", ["five", "six", "seven"]); + * const result = await client.lmpop(["testKey", "testKey2"], ListDirection.LEFT, 1L); + * console.log(result.get("testKey")); // Output: { "testKey": ["three"] } + * ``` + */ + public async lmpop( + keys: string[], + direction: ListDirection, + count?: number, + ): Promise> { + return this.createWritePromise(createLMPop(keys, direction, count)); + } + + /** + * Blocks the connection until it pops one or more elements from the first non-empty list from the + * provided `key`. `BLMPOP` is the blocking variant of {@link lmpop}. + * + * See https://valkey.io/commands/blmpop/ for more details. + * + * @remarks When in cluster mode, all `key`s must map to the same hash slot. + * @param keys - An array of keys to lists. + * @param direction - The direction based on which elements are popped from - see {@link ListDirection}. + * @param timeout - The number of seconds to wait for a blocking operation to complete. A value of `0` will block indefinitely. + * @param count - (Optional) The maximum number of popped elements. + * @returns - A `Record` of `key` name mapped array of popped elements. + * If no member could be popped and the timeout expired, returns `null`. + * + * since Valkey version 7.0.0. + * + * @example + * ```typescript + * await client.lpush("testKey", ["one", "two", "three"]); + * await client.lpush("testKey2", ["five", "six", "seven"]); + * const result = await client.blmpop(["testKey", "testKey2"], ListDirection.LEFT, 0.1, 1L); + * console.log(result.get("testKey")); // Output: { "testKey": ["three"] } + * ``` + */ + public async blmpop( + keys: string[], + direction: ListDirection, + timeout: number, + count?: number, + ): Promise> { + return this.createWritePromise( + createBLMPop(timeout, keys, direction, count), + ); + } + /** * @internal */ diff --git a/node/src/Commands.ts b/node/src/Commands.ts index c7390e85ef..5416883d9d 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -3066,3 +3066,45 @@ export function createSetRange( ): command_request.Command { return createCommand(RequestType.SetRange, [key, offset.toString(), value]); } + +/** + * @internal + */ +export function createLMPop( + keys: string[], + direction: ListDirection, + count?: number, +): command_request.Command { + const args: string[] = [keys.length.toString(), ...keys, direction]; + + if (count !== undefined) { + args.push("COUNT"); + args.push(count.toString()); + } + + return createCommand(RequestType.LMPop, args); +} + +/** + * @internal + */ +export function createBLMPop( + timeout: number, + keys: string[], + direction: ListDirection, + count?: number, +): command_request.Command { + const args: string[] = [ + timeout.toString(), + keys.length.toString(), + ...keys, + direction, + ]; + + if (count !== undefined) { + args.push("COUNT"); + args.push(count.toString()); + } + + return createCommand(RequestType.BLMPop, args); +} diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 9f5eab8f0c..61f03233cf 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -53,6 +53,7 @@ import { ZAddOptions, createBLMove, createBLPop, + createBLMPop, createBRPop, createBZMPop, createBitCount, @@ -112,6 +113,7 @@ import { createLInsert, createLLen, createLMove, + createLMPop, createLPop, createLPos, createLPush, @@ -2524,8 +2526,7 @@ export class BaseTransaction> { * @param keys - The keys of the sorted sets. * @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or * {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly. - * @param timeout - The number of seconds to wait for a blocking operation to complete. - * A value of 0 will block indefinitely. + * @param timeout - The number of seconds to wait for a blocking operation to complete. A value of `0` will block indefinitely. * @param count - (Optional) The number of elements to pop. If not supplied, only one element will be popped. * * Command Response - A two-element `array` containing the key name of the set from which the element @@ -2732,6 +2733,50 @@ export class BaseTransaction> { public setrange(key: string, offset: number, value: string): T { return this.addAndReturn(createSetRange(key, offset, value)); } + + /** + * Pops one or more elements from the first non-empty list from the provided `keys`. + * + * See https://valkey.io/commands/lmpop/ for more details. + * + * @remarks When in cluster mode, `source` and `destination` must map to the same hash slot. + * @param keys - An array of keys to lists. + * @param direction - The direction based on which elements are popped from - see {@link ListDirection}. + * @param count - (Optional) The maximum number of popped elements. + * + * Command Response - A `Record` of `key` name mapped array of popped elements. + * + * since Valkey version 7.0.0. + */ + public lmpop(keys: string[], direction: ListDirection, count?: number): T { + return this.addAndReturn(createLMPop(keys, direction, count)); + } + + /** + * Blocks the connection until it pops one or more elements from the first non-empty list from the + * provided `key`. `BLMPOP` is the blocking variant of {@link lmpop}. + * + * See https://valkey.io/commands/blmpop/ for more details. + * + * @param keys - An array of keys to lists. + * @param direction - The direction based on which elements are popped from - see {@link ListDirection}. + * @param timeout - The number of seconds to wait for a blocking operation to complete. A value of + * `0` will block indefinitely. + * @param count - (Optional) The maximum number of popped elements. + * + * Command Response - A `Record` of `key` name mapped array of popped elements. + * If no member could be popped and the timeout expired, returns `null`. + * + * since Valkey version 7.0.0. + */ + public blmpop( + keys: string[], + direction: ListDirection, + timeout: number, + count?: number, + ): T { + return this.addAndReturn(createBLMPop(timeout, keys, direction, count)); + } } /** diff --git a/node/tests/GlideClient.test.ts b/node/tests/GlideClient.test.ts index c168284057..8893c81cf2 100644 --- a/node/tests/GlideClient.test.ts +++ b/node/tests/GlideClient.test.ts @@ -143,14 +143,27 @@ describe("GlideClient", () => { ListDirection.LEFT, 0.1, ); - const timeoutPromise = new Promise((resolve) => { - setTimeout(resolve, 500); - }); + + const blmpopPromise = client.blmpop( + ["key1", "key2"], + ListDirection.LEFT, + 0.1, + ); + + const promiseList = [blmovePromise, blmpopPromise]; try { - await Promise.race([blmovePromise, timeoutPromise]); + for (const promise of promiseList) { + const timeoutPromise = new Promise((resolve) => { + setTimeout(resolve, 500); + }); + await Promise.race([promise, timeoutPromise]); + } } finally { - Promise.resolve(blmovePromise); + for (const promise of promiseList) { + await Promise.resolve([promise]); + } + client.close(); } }, diff --git a/node/tests/GlideClusterClient.test.ts b/node/tests/GlideClusterClient.test.ts index f5c3b1603b..19d7b91c7b 100644 --- a/node/tests/GlideClusterClient.test.ts +++ b/node/tests/GlideClusterClient.test.ts @@ -332,6 +332,8 @@ describe("GlideClusterClient", () => { client.sdiffstore("abc", ["zxy", "lkn"]), client.sortStore("abc", "zyx"), client.sortStore("abc", "zyx", { isAlpha: true }), + client.lmpop(["abc", "def"], ListDirection.LEFT, 1), + client.blmpop(["abc", "def"], ListDirection.RIGHT, 0.1, 1), ]; if (gte(cluster.getVersion(), "6.2.0")) { diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 0f5f9f7c10..87c96f32d8 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -6602,6 +6602,118 @@ export function runBaseTests(config: { }, config.timeout, ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `lmpop test_%p`, + async (protocol) => { + await runTest(async (client: BaseClient, cluster: RedisCluster) => { + if (cluster.checkIfServerVersionLessThan("7.0.0")) { + return; + } + + const key1 = "{key}" + uuidv4(); + const key2 = "{key}" + uuidv4(); + const nonListKey = uuidv4(); + const singleKeyArray = [key1]; + const multiKeyArray = [key2, key1]; + const count = 1; + const lpushArgs = ["one", "two", "three", "four", "five"]; + const expected = { [key1]: ["five"] }; + const expected2 = { [key2]: ["one", "two"] }; + + // nothing to be popped + expect( + await client.lmpop( + singleKeyArray, + ListDirection.LEFT, + count, + ), + ).toBeNull(); + + // pushing to the arrays to be popped + expect(await client.lpush(key1, lpushArgs)).toEqual(5); + expect(await client.lpush(key2, lpushArgs)).toEqual(5); + + // checking correct result from popping + expect( + await client.lmpop(singleKeyArray, ListDirection.LEFT), + ).toEqual(expected); + + // popping multiple elements from the right + expect( + await client.lmpop(multiKeyArray, ListDirection.RIGHT, 2), + ).toEqual(expected2); + + // Key exists, but is not a set + expect(await client.set(nonListKey, "lmpop")).toBe("OK"); + await expect( + client.lmpop([nonListKey], ListDirection.RIGHT), + ).rejects.toThrow(RequestError); + }, protocol); + }, + config.timeout, + ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `blmpop test_%p`, + async (protocol) => { + await runTest(async (client: BaseClient, cluster: RedisCluster) => { + if (cluster.checkIfServerVersionLessThan("7.0.0")) { + return; + } + + const key1 = "{key}" + uuidv4(); + const key2 = "{key}" + uuidv4(); + const nonListKey = uuidv4(); + const singleKeyArray = [key1]; + const multiKeyArray = [key2, key1]; + const count = 1; + const lpushArgs = ["one", "two", "three", "four", "five"]; + const expected = { [key1]: ["five"] }; + const expected2 = { [key2]: ["one", "two"] }; + + // nothing to be popped + expect( + await client.blmpop( + singleKeyArray, + ListDirection.LEFT, + 0.1, + count, + ), + ).toBeNull(); + + // pushing to the arrays to be popped + expect(await client.lpush(key1, lpushArgs)).toEqual(5); + expect(await client.lpush(key2, lpushArgs)).toEqual(5); + + // checking correct result from popping + expect( + await client.blmpop( + singleKeyArray, + ListDirection.LEFT, + 0.1, + ), + ).toEqual(expected); + + // popping multiple elements from the right + expect( + await client.blmpop( + multiKeyArray, + ListDirection.RIGHT, + 0.1, + 2, + ), + ).toEqual(expected2); + + // Key exists, but is not a set + expect(await client.set(nonListKey, "blmpop")).toBe("OK"); + await expect( + client.blmpop([nonListKey], ListDirection.RIGHT, 0.1, 1), + ).rejects.toThrow(RequestError); + }, protocol); + }, + config.timeout, + ); } export function runCommonTests(config: { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index 4aa4cabbf9..34d79f3f05 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -471,6 +471,7 @@ export async function transactionTest( const key21 = "{key}" + uuidv4(); // list for sort const key22 = "{key}" + uuidv4(); // list for sort const key23 = "{key}" + uuidv4(); // zset random + const key24 = "{key}" + uuidv4(); // list value const field = uuidv4(); const value = uuidv4(); // array of tuples - first element is test name/description, second - expected return value @@ -548,6 +549,24 @@ export async function transactionTest( field + "4", ]); responseData.push(["lpush(key5, [1, 2, 3, 4])", 4]); + + if (gte("7.0.0", version)) { + baseTransaction.lpush(key24, [field + "1", field + "2"]); + responseData.push(["lpush(key22, [1, 2])", 2]); + baseTransaction.lmpop([key24], ListDirection.LEFT); + responseData.push([ + "lmpop([key22], ListDirection.LEFT)", + { [key24]: [field + "2"] }, + ]); + baseTransaction.lpush(key24, [field + "2"]); + responseData.push(["lpush(key22, [2])", 2]); + baseTransaction.blmpop([key24], ListDirection.LEFT, 0.1, 1); + responseData.push([ + "blmpop([key22], ListDirection.LEFT, 0.1, 1)", + { [key24]: [field + "2"] }, + ]); + } + baseTransaction.lpop(key5); responseData.push(["lpop(key5)", field + "4"]); baseTransaction.llen(key5); From e9c27231f5534d98c020f44c5d88d11d1c14ecca Mon Sep 17 00:00:00 2001 From: jonathanl-bq <72158117+jonathanl-bq@users.noreply.github.com> Date: Fri, 2 Aug 2024 13:44:52 -0700 Subject: [PATCH 3/4] Downgrade and fix cargo-deny version (#2081) Signed-off-by: Jonathan Louie --- .github/workflows/lint-rust/action.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/lint-rust/action.yml b/.github/workflows/lint-rust/action.yml index 8a7cdf185f..11ca944f71 100644 --- a/.github/workflows/lint-rust/action.yml +++ b/.github/workflows/lint-rust/action.yml @@ -42,7 +42,7 @@ runs: - run: | cargo update - cargo install cargo-deny + cargo install --locked --version 0.15.1 cargo-deny cargo deny check --config ${GITHUB_WORKSPACE}/deny.toml working-directory: ${{ inputs.cargo-toml-folder }} shell: bash From 14ebf287619b3f30c59d9b96093d6ee16b797756 Mon Sep 17 00:00:00 2001 From: Yi-Pin Chen Date: Fri, 2 Aug 2024 14:37:02 -0700 Subject: [PATCH 4/4] Node: added WATCH and UNWATCH commands (#2076) * Node: added WATCH and UNWATCH commands Signed-off-by: Yi-Pin Chen --- CHANGELOG.md | 1 + node/src/BaseClient.ts | 37 ++++++++- node/src/Commands.ts | 10 +++ node/src/GlideClient.ts | 23 +++++- node/src/GlideClusterClient.ts | 25 +++++- node/tests/GlideClient.test.ts | 112 +++++++++++++++++++++++++- node/tests/GlideClusterClient.test.ts | 105 +++++++++++++++++++++++- 7 files changed, 306 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41d81b0401..a679780281 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ * Node: Added ZINCRBY command ([#2009](https://github.com/valkey-io/valkey-glide/pull/2009)) * Node: Added BZMPOP command ([#2018](https://github.com/valkey-io/valkey-glide/pull/2018)) * Node: Added PFMERGE command ([#2053](https://github.com/valkey-io/valkey-glide/pull/2053)) +* Node: Added WATCH and UNWATCH commands ([#2076](https://github.com/valkey-io/valkey-glide/pull/2076)) * Node: Added ZLEXCOUNT command ([#2022](https://github.com/valkey-io/valkey-glide/pull/2022)) * Node: Added ZREMRANGEBYLEX command ([#2025]((https://github.com/valkey-io/valkey-glide/pull/2025)) * Node: Added ZSCAN command ([#2061](https://github.com/valkey-io/valkey-glide/pull/2061)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index c04d118091..97cfcce3dc 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -143,6 +143,7 @@ import { createTouch, createType, createUnlink, + createWatch, createXAdd, createXDel, createXLen, @@ -172,8 +173,8 @@ import { createZRemRangeByScore, createZRevRank, createZRevRankWithScore, - createZScore, createZScan, + createZScore, } from "./Commands"; import { ClosingError, @@ -4484,10 +4485,42 @@ export class BaseClient { * console.log(result); // Output: 2 - The last access time of 2 keys has been updated. * ``` */ - public touch(keys: string[]): Promise { + public async touch(keys: string[]): Promise { return this.createWritePromise(createTouch(keys)); } + /** + * Marks the given keys to be watched for conditional execution of a transaction. Transactions + * will only execute commands if the watched keys are not modified before execution of the + * transaction. Executing a transaction will automatically flush all previously watched keys. + * + * See https://valkey.io/commands/watch/ and https://valkey.io/topics/transactions/#cas for more details. + * + * @remarks When in cluster mode, the command may route to multiple nodes when `keys` map to different hash slots. + * @param keys - The keys to watch. + * @returns A simple "OK" response. + * + * @example + * ```typescript + * const response = await client.watch(["sampleKey"]); + * console.log(response); // Output: "OK" + * const transaction = new Transaction().set("SampleKey", "foobar"); + * const result = await client.exec(transaction); + * console.log(result); // Output: "OK" - Executes successfully and keys are unwatched. + * ``` + * ```typescript + * const response = await client.watch(["sampleKey"]); + * console.log(response); // Output: "OK" + * const transaction = new Transaction().set("SampleKey", "foobar"); + * await client.set("sampleKey", "hello world"); + * const result = await client.exec(transaction); + * console.log(result); // Output: null - null is returned when the watched key is modified before transaction execution. + * ``` + */ + public async watch(keys: string[]): Promise<"OK"> { + return this.createWritePromise(createWatch(keys)); + } + /** * Overwrites part of the string stored at `key`, starting at the specified `offset`, * for the entire length of `value`. If the `offset` is larger than the current length of the string at `key`, diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 5416883d9d..8fb9d53b24 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -3013,6 +3013,16 @@ export function createRandomKey(): command_request.Command { return createCommand(RequestType.RandomKey, []); } +/** @internal */ +export function createWatch(keys: string[]): command_request.Command { + return createCommand(RequestType.Watch, keys); +} + +/** @internal */ +export function createUnWatch(): command_request.Command { + return createCommand(RequestType.UnWatch, []); +} + /** * This base class represents the common set of optional arguments for the SCAN family of commands. * Concrete implementations of this class are tied to specific SCAN commands (SCAN, HSCAN, SSCAN, diff --git a/node/src/GlideClient.ts b/node/src/GlideClient.ts index 582b9cd3b5..1c62b34506 100644 --- a/node/src/GlideClient.ts +++ b/node/src/GlideClient.ts @@ -43,6 +43,7 @@ import { createSort, createSortReadOnly, createTime, + createUnWatch, } from "./Commands"; import { connection_request } from "./ProtobufMessage"; import { Transaction } from "./Transaction"; @@ -742,7 +743,27 @@ export class GlideClient extends BaseClient { * console.log(result); // Output: "key12" - "key12" is a random existing key name from the currently selected database. * ``` */ - public randomKey(): Promise { + public async randomKey(): Promise { return this.createWritePromise(createRandomKey()); } + + /** + * Flushes all the previously watched keys for a transaction. Executing a transaction will + * automatically flush all previously watched keys. + * + * See https://valkey.io/commands/unwatch/ and https://valkey.io/topics/transactions/#cas for more details. + * + * @returns A simple "OK" response. + * + * @example + * ```typescript + * let response = await client.watch(["sampleKey"]); + * console.log(response); // Output: "OK" + * response = await client.unwatch(); + * console.log(response); // Output: "OK" + * ``` + */ + public async unwatch(): Promise<"OK"> { + return this.createWritePromise(createUnWatch()); + } } diff --git a/node/src/GlideClusterClient.ts b/node/src/GlideClusterClient.ts index 8f1c9ad99c..0cc34f0b8a 100644 --- a/node/src/GlideClusterClient.ts +++ b/node/src/GlideClusterClient.ts @@ -44,6 +44,7 @@ import { createSort, createSortReadOnly, createTime, + createUnWatch, } from "./Commands"; import { RequestError } from "./Errors"; import { command_request, connection_request } from "./ProtobufMessage"; @@ -1117,10 +1118,32 @@ export class GlideClusterClient extends BaseClient { * console.log(result); // Output: "key12" - "key12" is a random existing key name. * ``` */ - public randomKey(route?: Routes): Promise { + public async randomKey(route?: Routes): Promise { return this.createWritePromise( createRandomKey(), toProtobufRoute(route), ); } + + /** + * Flushes all the previously watched keys for a transaction. Executing a transaction will + * automatically flush all previously watched keys. + * + * See https://valkey.io/commands/unwatch/ and https://valkey.io/topics/transactions/#cas for more details. + * + * @param route - (Optional) The command will be routed to all primary nodes, unless `route` is provided, + * in which case the client will route the command to the nodes defined by `route`. + * @returns A simple "OK" response. + * + * @example + * ```typescript + * let response = await client.watch(["sampleKey"]); + * console.log(response); // Output: "OK" + * response = await client.unwatch(); + * console.log(response); // Output: "OK" + * ``` + */ + public async unwatch(route?: Routes): Promise<"OK"> { + return this.createWritePromise(createUnWatch(), toProtobufRoute(route)); + } } diff --git a/node/tests/GlideClient.test.ts b/node/tests/GlideClient.test.ts index 8893c81cf2..11b75ef82e 100644 --- a/node/tests/GlideClient.test.ts +++ b/node/tests/GlideClient.test.ts @@ -12,7 +12,13 @@ import { } from "@jest/globals"; import { BufferReader, BufferWriter } from "protobufjs"; import { v4 as uuidv4 } from "uuid"; -import { GlideClient, ListDirection, ProtocolVersion, Transaction } from ".."; +import { + GlideClient, + ListDirection, + ProtocolVersion, + RequestError, + Transaction, +} from ".."; import { RedisCluster } from "../../utils/TestUtils.js"; import { FlushMode, SortOrder } from "../build-ts/src/Commands"; import { command_request } from "../src/ProtobufMessage"; @@ -227,7 +233,7 @@ describe("GlideClient", () => { ); const transaction = new Transaction(); transaction.get("key"); - const result1 = await client1.customCommand(["WATCH", "key"]); + const result1 = await client1.watch(["key"]); expect(result1).toEqual("OK"); const result2 = await client2.set("key", "foo"); @@ -939,6 +945,108 @@ describe("GlideClient", () => { TIMEOUT, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "watch test_%p", + async (protocol) => { + const client = await GlideClient.createClient( + getClientConfigurationOption(cluster.getAddresses(), protocol), + ); + + const key1 = "{key}-1" + uuidv4(); + const key2 = "{key}-2" + uuidv4(); + const key3 = "{key}-3" + uuidv4(); + const key4 = "{key}-4" + uuidv4(); + const setFoobarTransaction = new Transaction(); + const setHelloTransaction = new Transaction(); + + // Returns null when a watched key is modified before it is executed in a transaction command. + // Transaction commands are not performed. + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + expect(await client.set(key2, "hello")).toEqual("OK"); + setFoobarTransaction + .set(key1, "foobar") + .set(key2, "foobar") + .set(key3, "foobar"); + let results = await client.exec(setFoobarTransaction); + expect(results).toEqual(null); + // sanity check + expect(await client.get(key1)).toEqual(null); + expect(await client.get(key2)).toEqual("hello"); + expect(await client.get(key3)).toEqual(null); + + // Transaction executes command successfully with a read command on the watch key before + // transaction is executed. + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + expect(await client.get(key2)).toEqual("hello"); + results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + expect(await client.get(key3)).toEqual("foobar"); + + // Transaction executes command successfully with unmodified watched keys + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + expect(await client.get(key3)).toEqual("foobar"); + + // Transaction executes command successfully with a modified watched key but is not in the + // transaction. + expect(await client.watch([key4])).toEqual("OK"); + setHelloTransaction + .set(key1, "hello") + .set(key2, "hello") + .set(key3, "hello"); + results = await client.exec(setHelloTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("hello"); + expect(await client.get(key2)).toEqual("hello"); + expect(await client.get(key3)).toEqual("hello"); + + // WATCH can not have an empty String array parameter + await expect(client.watch([])).rejects.toThrow(RequestError); + + client.close(); + }, + TIMEOUT, + ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "unwatch test_%p", + async (protocol) => { + const client = await GlideClient.createClient( + getClientConfigurationOption(cluster.getAddresses(), protocol), + ); + + const key1 = "{key}-1" + uuidv4(); + const key2 = "{key}-2" + uuidv4(); + + const setFoobarTransaction = new Transaction(); + + // UNWATCH returns OK when there no watched keys + expect(await client.unwatch()).toEqual("OK"); + + // Transaction executes successfully after modifying a watched key then calling UNWATCH + expect(await client.watch([key1, key2])).toEqual("OK"); + expect(await client.set(key2, "hello")).toEqual("OK"); + expect(await client.unwatch()).toEqual("OK"); + setFoobarTransaction.set(key1, "foobar").set(key2, "foobar"); + const results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + + client.close(); + }, + TIMEOUT, + ); + runBaseTests({ init: async (protocol, clientName?) => { const options = getClientConfigurationOption( diff --git a/node/tests/GlideClusterClient.test.ts b/node/tests/GlideClusterClient.test.ts index 19d7b91c7b..7b53e16edd 100644 --- a/node/tests/GlideClusterClient.test.ts +++ b/node/tests/GlideClusterClient.test.ts @@ -274,7 +274,7 @@ describe("GlideClusterClient", () => { ); const transaction = new ClusterTransaction(); transaction.get("key"); - const result1 = await client1.customCommand(["WATCH", "key"]); + const result1 = await client1.watch(["key"]); expect(result1).toEqual("OK"); const result2 = await client2.set("key", "foo"); @@ -385,6 +385,7 @@ describe("GlideClusterClient", () => { await client.mget(["abc", "zxy", "lkn"]); await client.mset({ abc: "1", zxy: "2", lkn: "3" }); await client.touch(["abc", "zxy", "lkn"]); + await client.watch(["ghi", "zxy", "lkn"]); client.close(); }, ); @@ -1110,4 +1111,106 @@ describe("GlideClusterClient", () => { }, TIMEOUT, ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "watch test_%p", + async (protocol) => { + const client = await GlideClusterClient.createClient( + getClientConfigurationOption(cluster.getAddresses(), protocol), + ); + + const key1 = "{key}-1" + uuidv4(); + const key2 = "{key}-2" + uuidv4(); + const key3 = "{key}-3" + uuidv4(); + const key4 = "{key}-4" + uuidv4(); + const setFoobarTransaction = new ClusterTransaction(); + const setHelloTransaction = new ClusterTransaction(); + + // Returns null when a watched key is modified before it is executed in a transaction command. + // Transaction commands are not performed. + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + expect(await client.set(key2, "hello")).toEqual("OK"); + setFoobarTransaction + .set(key1, "foobar") + .set(key2, "foobar") + .set(key3, "foobar"); + let results = await client.exec(setFoobarTransaction); + expect(results).toEqual(null); + // sanity check + expect(await client.get(key1)).toEqual(null); + expect(await client.get(key2)).toEqual("hello"); + expect(await client.get(key3)).toEqual(null); + + // Transaction executes command successfully with a read command on the watch key before + // transaction is executed. + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + expect(await client.get(key2)).toEqual("hello"); + results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + expect(await client.get(key3)).toEqual("foobar"); + + // Transaction executes command successfully with unmodified watched keys + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + expect(await client.get(key3)).toEqual("foobar"); + + // Transaction executes command successfully with a modified watched key but is not in the + // transaction. + expect(await client.watch([key4])).toEqual("OK"); + setHelloTransaction + .set(key1, "hello") + .set(key2, "hello") + .set(key3, "hello"); + results = await client.exec(setHelloTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("hello"); + expect(await client.get(key2)).toEqual("hello"); + expect(await client.get(key3)).toEqual("hello"); + + // WATCH can not have an empty String array parameter + await expect(client.watch([])).rejects.toThrow(RequestError); + + client.close(); + }, + TIMEOUT, + ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "unwatch test_%p", + async (protocol) => { + const client = await GlideClusterClient.createClient( + getClientConfigurationOption(cluster.getAddresses(), protocol), + ); + + const key1 = "{key}-1" + uuidv4(); + const key2 = "{key}-2" + uuidv4(); + const setFoobarTransaction = new ClusterTransaction(); + + // UNWATCH returns OK when there no watched keys + expect(await client.unwatch()).toEqual("OK"); + + // Transaction executes successfully after modifying a watched key then calling UNWATCH + expect(await client.watch([key1, key2])).toEqual("OK"); + expect(await client.set(key2, "hello")).toEqual("OK"); + expect(await client.unwatch()).toEqual("OK"); + expect(await client.unwatch("allPrimaries")).toEqual("OK"); + setFoobarTransaction.set(key1, "foobar").set(key2, "foobar"); + const results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + + client.close(); + }, + TIMEOUT, + ); });