From 14ebf287619b3f30c59d9b96093d6ee16b797756 Mon Sep 17 00:00:00 2001 From: Yi-Pin Chen Date: Fri, 2 Aug 2024 14:37:02 -0700 Subject: [PATCH] Node: added WATCH and UNWATCH commands (#2076) * Node: added WATCH and UNWATCH commands Signed-off-by: Yi-Pin Chen --- CHANGELOG.md | 1 + node/src/BaseClient.ts | 37 ++++++++- node/src/Commands.ts | 10 +++ node/src/GlideClient.ts | 23 +++++- node/src/GlideClusterClient.ts | 25 +++++- node/tests/GlideClient.test.ts | 112 +++++++++++++++++++++++++- node/tests/GlideClusterClient.test.ts | 105 +++++++++++++++++++++++- 7 files changed, 306 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41d81b0401..a679780281 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ * Node: Added ZINCRBY command ([#2009](https://github.com/valkey-io/valkey-glide/pull/2009)) * Node: Added BZMPOP command ([#2018](https://github.com/valkey-io/valkey-glide/pull/2018)) * Node: Added PFMERGE command ([#2053](https://github.com/valkey-io/valkey-glide/pull/2053)) +* Node: Added WATCH and UNWATCH commands ([#2076](https://github.com/valkey-io/valkey-glide/pull/2076)) * Node: Added ZLEXCOUNT command ([#2022](https://github.com/valkey-io/valkey-glide/pull/2022)) * Node: Added ZREMRANGEBYLEX command ([#2025]((https://github.com/valkey-io/valkey-glide/pull/2025)) * Node: Added ZSCAN command ([#2061](https://github.com/valkey-io/valkey-glide/pull/2061)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index c04d118091..97cfcce3dc 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -143,6 +143,7 @@ import { createTouch, createType, createUnlink, + createWatch, createXAdd, createXDel, createXLen, @@ -172,8 +173,8 @@ import { createZRemRangeByScore, createZRevRank, createZRevRankWithScore, - createZScore, createZScan, + createZScore, } from "./Commands"; import { ClosingError, @@ -4484,10 +4485,42 @@ export class BaseClient { * console.log(result); // Output: 2 - The last access time of 2 keys has been updated. * ``` */ - public touch(keys: string[]): Promise { + public async touch(keys: string[]): Promise { return this.createWritePromise(createTouch(keys)); } + /** + * Marks the given keys to be watched for conditional execution of a transaction. Transactions + * will only execute commands if the watched keys are not modified before execution of the + * transaction. Executing a transaction will automatically flush all previously watched keys. + * + * See https://valkey.io/commands/watch/ and https://valkey.io/topics/transactions/#cas for more details. + * + * @remarks When in cluster mode, the command may route to multiple nodes when `keys` map to different hash slots. + * @param keys - The keys to watch. + * @returns A simple "OK" response. + * + * @example + * ```typescript + * const response = await client.watch(["sampleKey"]); + * console.log(response); // Output: "OK" + * const transaction = new Transaction().set("SampleKey", "foobar"); + * const result = await client.exec(transaction); + * console.log(result); // Output: "OK" - Executes successfully and keys are unwatched. + * ``` + * ```typescript + * const response = await client.watch(["sampleKey"]); + * console.log(response); // Output: "OK" + * const transaction = new Transaction().set("SampleKey", "foobar"); + * await client.set("sampleKey", "hello world"); + * const result = await client.exec(transaction); + * console.log(result); // Output: null - null is returned when the watched key is modified before transaction execution. + * ``` + */ + public async watch(keys: string[]): Promise<"OK"> { + return this.createWritePromise(createWatch(keys)); + } + /** * Overwrites part of the string stored at `key`, starting at the specified `offset`, * for the entire length of `value`. If the `offset` is larger than the current length of the string at `key`, diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 5416883d9d..8fb9d53b24 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -3013,6 +3013,16 @@ export function createRandomKey(): command_request.Command { return createCommand(RequestType.RandomKey, []); } +/** @internal */ +export function createWatch(keys: string[]): command_request.Command { + return createCommand(RequestType.Watch, keys); +} + +/** @internal */ +export function createUnWatch(): command_request.Command { + return createCommand(RequestType.UnWatch, []); +} + /** * This base class represents the common set of optional arguments for the SCAN family of commands. * Concrete implementations of this class are tied to specific SCAN commands (SCAN, HSCAN, SSCAN, diff --git a/node/src/GlideClient.ts b/node/src/GlideClient.ts index 582b9cd3b5..1c62b34506 100644 --- a/node/src/GlideClient.ts +++ b/node/src/GlideClient.ts @@ -43,6 +43,7 @@ import { createSort, createSortReadOnly, createTime, + createUnWatch, } from "./Commands"; import { connection_request } from "./ProtobufMessage"; import { Transaction } from "./Transaction"; @@ -742,7 +743,27 @@ export class GlideClient extends BaseClient { * console.log(result); // Output: "key12" - "key12" is a random existing key name from the currently selected database. * ``` */ - public randomKey(): Promise { + public async randomKey(): Promise { return this.createWritePromise(createRandomKey()); } + + /** + * Flushes all the previously watched keys for a transaction. Executing a transaction will + * automatically flush all previously watched keys. + * + * See https://valkey.io/commands/unwatch/ and https://valkey.io/topics/transactions/#cas for more details. + * + * @returns A simple "OK" response. + * + * @example + * ```typescript + * let response = await client.watch(["sampleKey"]); + * console.log(response); // Output: "OK" + * response = await client.unwatch(); + * console.log(response); // Output: "OK" + * ``` + */ + public async unwatch(): Promise<"OK"> { + return this.createWritePromise(createUnWatch()); + } } diff --git a/node/src/GlideClusterClient.ts b/node/src/GlideClusterClient.ts index 8f1c9ad99c..0cc34f0b8a 100644 --- a/node/src/GlideClusterClient.ts +++ b/node/src/GlideClusterClient.ts @@ -44,6 +44,7 @@ import { createSort, createSortReadOnly, createTime, + createUnWatch, } from "./Commands"; import { RequestError } from "./Errors"; import { command_request, connection_request } from "./ProtobufMessage"; @@ -1117,10 +1118,32 @@ export class GlideClusterClient extends BaseClient { * console.log(result); // Output: "key12" - "key12" is a random existing key name. * ``` */ - public randomKey(route?: Routes): Promise { + public async randomKey(route?: Routes): Promise { return this.createWritePromise( createRandomKey(), toProtobufRoute(route), ); } + + /** + * Flushes all the previously watched keys for a transaction. Executing a transaction will + * automatically flush all previously watched keys. + * + * See https://valkey.io/commands/unwatch/ and https://valkey.io/topics/transactions/#cas for more details. + * + * @param route - (Optional) The command will be routed to all primary nodes, unless `route` is provided, + * in which case the client will route the command to the nodes defined by `route`. + * @returns A simple "OK" response. + * + * @example + * ```typescript + * let response = await client.watch(["sampleKey"]); + * console.log(response); // Output: "OK" + * response = await client.unwatch(); + * console.log(response); // Output: "OK" + * ``` + */ + public async unwatch(route?: Routes): Promise<"OK"> { + return this.createWritePromise(createUnWatch(), toProtobufRoute(route)); + } } diff --git a/node/tests/GlideClient.test.ts b/node/tests/GlideClient.test.ts index 8893c81cf2..11b75ef82e 100644 --- a/node/tests/GlideClient.test.ts +++ b/node/tests/GlideClient.test.ts @@ -12,7 +12,13 @@ import { } from "@jest/globals"; import { BufferReader, BufferWriter } from "protobufjs"; import { v4 as uuidv4 } from "uuid"; -import { GlideClient, ListDirection, ProtocolVersion, Transaction } from ".."; +import { + GlideClient, + ListDirection, + ProtocolVersion, + RequestError, + Transaction, +} from ".."; import { RedisCluster } from "../../utils/TestUtils.js"; import { FlushMode, SortOrder } from "../build-ts/src/Commands"; import { command_request } from "../src/ProtobufMessage"; @@ -227,7 +233,7 @@ describe("GlideClient", () => { ); const transaction = new Transaction(); transaction.get("key"); - const result1 = await client1.customCommand(["WATCH", "key"]); + const result1 = await client1.watch(["key"]); expect(result1).toEqual("OK"); const result2 = await client2.set("key", "foo"); @@ -939,6 +945,108 @@ describe("GlideClient", () => { TIMEOUT, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "watch test_%p", + async (protocol) => { + const client = await GlideClient.createClient( + getClientConfigurationOption(cluster.getAddresses(), protocol), + ); + + const key1 = "{key}-1" + uuidv4(); + const key2 = "{key}-2" + uuidv4(); + const key3 = "{key}-3" + uuidv4(); + const key4 = "{key}-4" + uuidv4(); + const setFoobarTransaction = new Transaction(); + const setHelloTransaction = new Transaction(); + + // Returns null when a watched key is modified before it is executed in a transaction command. + // Transaction commands are not performed. + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + expect(await client.set(key2, "hello")).toEqual("OK"); + setFoobarTransaction + .set(key1, "foobar") + .set(key2, "foobar") + .set(key3, "foobar"); + let results = await client.exec(setFoobarTransaction); + expect(results).toEqual(null); + // sanity check + expect(await client.get(key1)).toEqual(null); + expect(await client.get(key2)).toEqual("hello"); + expect(await client.get(key3)).toEqual(null); + + // Transaction executes command successfully with a read command on the watch key before + // transaction is executed. + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + expect(await client.get(key2)).toEqual("hello"); + results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + expect(await client.get(key3)).toEqual("foobar"); + + // Transaction executes command successfully with unmodified watched keys + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + expect(await client.get(key3)).toEqual("foobar"); + + // Transaction executes command successfully with a modified watched key but is not in the + // transaction. + expect(await client.watch([key4])).toEqual("OK"); + setHelloTransaction + .set(key1, "hello") + .set(key2, "hello") + .set(key3, "hello"); + results = await client.exec(setHelloTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("hello"); + expect(await client.get(key2)).toEqual("hello"); + expect(await client.get(key3)).toEqual("hello"); + + // WATCH can not have an empty String array parameter + await expect(client.watch([])).rejects.toThrow(RequestError); + + client.close(); + }, + TIMEOUT, + ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "unwatch test_%p", + async (protocol) => { + const client = await GlideClient.createClient( + getClientConfigurationOption(cluster.getAddresses(), protocol), + ); + + const key1 = "{key}-1" + uuidv4(); + const key2 = "{key}-2" + uuidv4(); + + const setFoobarTransaction = new Transaction(); + + // UNWATCH returns OK when there no watched keys + expect(await client.unwatch()).toEqual("OK"); + + // Transaction executes successfully after modifying a watched key then calling UNWATCH + expect(await client.watch([key1, key2])).toEqual("OK"); + expect(await client.set(key2, "hello")).toEqual("OK"); + expect(await client.unwatch()).toEqual("OK"); + setFoobarTransaction.set(key1, "foobar").set(key2, "foobar"); + const results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + + client.close(); + }, + TIMEOUT, + ); + runBaseTests({ init: async (protocol, clientName?) => { const options = getClientConfigurationOption( diff --git a/node/tests/GlideClusterClient.test.ts b/node/tests/GlideClusterClient.test.ts index 19d7b91c7b..7b53e16edd 100644 --- a/node/tests/GlideClusterClient.test.ts +++ b/node/tests/GlideClusterClient.test.ts @@ -274,7 +274,7 @@ describe("GlideClusterClient", () => { ); const transaction = new ClusterTransaction(); transaction.get("key"); - const result1 = await client1.customCommand(["WATCH", "key"]); + const result1 = await client1.watch(["key"]); expect(result1).toEqual("OK"); const result2 = await client2.set("key", "foo"); @@ -385,6 +385,7 @@ describe("GlideClusterClient", () => { await client.mget(["abc", "zxy", "lkn"]); await client.mset({ abc: "1", zxy: "2", lkn: "3" }); await client.touch(["abc", "zxy", "lkn"]); + await client.watch(["ghi", "zxy", "lkn"]); client.close(); }, ); @@ -1110,4 +1111,106 @@ describe("GlideClusterClient", () => { }, TIMEOUT, ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "watch test_%p", + async (protocol) => { + const client = await GlideClusterClient.createClient( + getClientConfigurationOption(cluster.getAddresses(), protocol), + ); + + const key1 = "{key}-1" + uuidv4(); + const key2 = "{key}-2" + uuidv4(); + const key3 = "{key}-3" + uuidv4(); + const key4 = "{key}-4" + uuidv4(); + const setFoobarTransaction = new ClusterTransaction(); + const setHelloTransaction = new ClusterTransaction(); + + // Returns null when a watched key is modified before it is executed in a transaction command. + // Transaction commands are not performed. + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + expect(await client.set(key2, "hello")).toEqual("OK"); + setFoobarTransaction + .set(key1, "foobar") + .set(key2, "foobar") + .set(key3, "foobar"); + let results = await client.exec(setFoobarTransaction); + expect(results).toEqual(null); + // sanity check + expect(await client.get(key1)).toEqual(null); + expect(await client.get(key2)).toEqual("hello"); + expect(await client.get(key3)).toEqual(null); + + // Transaction executes command successfully with a read command on the watch key before + // transaction is executed. + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + expect(await client.get(key2)).toEqual("hello"); + results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + expect(await client.get(key3)).toEqual("foobar"); + + // Transaction executes command successfully with unmodified watched keys + expect(await client.watch([key1, key2, key3])).toEqual("OK"); + results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + expect(await client.get(key3)).toEqual("foobar"); + + // Transaction executes command successfully with a modified watched key but is not in the + // transaction. + expect(await client.watch([key4])).toEqual("OK"); + setHelloTransaction + .set(key1, "hello") + .set(key2, "hello") + .set(key3, "hello"); + results = await client.exec(setHelloTransaction); + expect(results).toEqual(["OK", "OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("hello"); + expect(await client.get(key2)).toEqual("hello"); + expect(await client.get(key3)).toEqual("hello"); + + // WATCH can not have an empty String array parameter + await expect(client.watch([])).rejects.toThrow(RequestError); + + client.close(); + }, + TIMEOUT, + ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + "unwatch test_%p", + async (protocol) => { + const client = await GlideClusterClient.createClient( + getClientConfigurationOption(cluster.getAddresses(), protocol), + ); + + const key1 = "{key}-1" + uuidv4(); + const key2 = "{key}-2" + uuidv4(); + const setFoobarTransaction = new ClusterTransaction(); + + // UNWATCH returns OK when there no watched keys + expect(await client.unwatch()).toEqual("OK"); + + // Transaction executes successfully after modifying a watched key then calling UNWATCH + expect(await client.watch([key1, key2])).toEqual("OK"); + expect(await client.set(key2, "hello")).toEqual("OK"); + expect(await client.unwatch()).toEqual("OK"); + expect(await client.unwatch("allPrimaries")).toEqual("OK"); + setFoobarTransaction.set(key1, "foobar").set(key2, "foobar"); + const results = await client.exec(setFoobarTransaction); + expect(results).toEqual(["OK", "OK"]); + // sanity check + expect(await client.get(key1)).toEqual("foobar"); + expect(await client.get(key2)).toEqual("foobar"); + + client.close(); + }, + TIMEOUT, + ); });