Skip to content

Commit

Permalink
Node: added WATCH and UNWATCH commands (valkey-io#2076)
Browse files Browse the repository at this point in the history
* Node: added WATCH and UNWATCH commands

Signed-off-by: Yi-Pin Chen <[email protected]>
  • Loading branch information
yipin-chen authored Aug 2, 2024
1 parent e9c2723 commit 14ebf28
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* Node: Added ZINCRBY command ([#2009](https://github.com/valkey-io/valkey-glide/pull/2009))
* Node: Added BZMPOP command ([#2018](https://github.com/valkey-io/valkey-glide/pull/2018))
* Node: Added PFMERGE command ([#2053](https://github.com/valkey-io/valkey-glide/pull/2053))
* Node: Added WATCH and UNWATCH commands ([#2076](https://github.com/valkey-io/valkey-glide/pull/2076))
* Node: Added ZLEXCOUNT command ([#2022](https://github.com/valkey-io/valkey-glide/pull/2022))
* Node: Added ZREMRANGEBYLEX command ([#2025]((https://github.com/valkey-io/valkey-glide/pull/2025))
* Node: Added ZSCAN command ([#2061](https://github.com/valkey-io/valkey-glide/pull/2061))
Expand Down
37 changes: 35 additions & 2 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ import {
createTouch,
createType,
createUnlink,
createWatch,
createXAdd,
createXDel,
createXLen,
Expand Down Expand Up @@ -172,8 +173,8 @@ import {
createZRemRangeByScore,
createZRevRank,
createZRevRankWithScore,
createZScore,
createZScan,
createZScore,
} from "./Commands";
import {
ClosingError,
Expand Down Expand Up @@ -4484,10 +4485,42 @@ export class BaseClient {
* console.log(result); // Output: 2 - The last access time of 2 keys has been updated.
* ```
*/
public touch(keys: string[]): Promise<number> {
public async touch(keys: string[]): Promise<number> {
return this.createWritePromise(createTouch(keys));
}

/**
* Marks the given keys to be watched for conditional execution of a transaction. Transactions
* will only execute commands if the watched keys are not modified before execution of the
* transaction. Executing a transaction will automatically flush all previously watched keys.
*
* See https://valkey.io/commands/watch/ and https://valkey.io/topics/transactions/#cas for more details.
*
* @remarks When in cluster mode, the command may route to multiple nodes when `keys` map to different hash slots.
* @param keys - The keys to watch.
* @returns A simple "OK" response.
*
* @example
* ```typescript
* const response = await client.watch(["sampleKey"]);
* console.log(response); // Output: "OK"
* const transaction = new Transaction().set("SampleKey", "foobar");
* const result = await client.exec(transaction);
* console.log(result); // Output: "OK" - Executes successfully and keys are unwatched.
* ```
* ```typescript
* const response = await client.watch(["sampleKey"]);
* console.log(response); // Output: "OK"
* const transaction = new Transaction().set("SampleKey", "foobar");
* await client.set("sampleKey", "hello world");
* const result = await client.exec(transaction);
* console.log(result); // Output: null - null is returned when the watched key is modified before transaction execution.
* ```
*/
public async watch(keys: string[]): Promise<"OK"> {
return this.createWritePromise(createWatch(keys));
}

/**
* Overwrites part of the string stored at `key`, starting at the specified `offset`,
* for the entire length of `value`. If the `offset` is larger than the current length of the string at `key`,
Expand Down
10 changes: 10 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3013,6 +3013,16 @@ export function createRandomKey(): command_request.Command {
return createCommand(RequestType.RandomKey, []);
}

/** @internal */
export function createWatch(keys: string[]): command_request.Command {
return createCommand(RequestType.Watch, keys);
}

/** @internal */
export function createUnWatch(): command_request.Command {
return createCommand(RequestType.UnWatch, []);
}

/**
* This base class represents the common set of optional arguments for the SCAN family of commands.
* Concrete implementations of this class are tied to specific SCAN commands (SCAN, HSCAN, SSCAN,
Expand Down
23 changes: 22 additions & 1 deletion node/src/GlideClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
createSort,
createSortReadOnly,
createTime,
createUnWatch,
} from "./Commands";
import { connection_request } from "./ProtobufMessage";
import { Transaction } from "./Transaction";
Expand Down Expand Up @@ -742,7 +743,27 @@ export class GlideClient extends BaseClient {
* console.log(result); // Output: "key12" - "key12" is a random existing key name from the currently selected database.
* ```
*/
public randomKey(): Promise<string | null> {
public async randomKey(): Promise<string | null> {
return this.createWritePromise(createRandomKey());
}

/**
* Flushes all the previously watched keys for a transaction. Executing a transaction will
* automatically flush all previously watched keys.
*
* See https://valkey.io/commands/unwatch/ and https://valkey.io/topics/transactions/#cas for more details.
*
* @returns A simple "OK" response.
*
* @example
* ```typescript
* let response = await client.watch(["sampleKey"]);
* console.log(response); // Output: "OK"
* response = await client.unwatch();
* console.log(response); // Output: "OK"
* ```
*/
public async unwatch(): Promise<"OK"> {
return this.createWritePromise(createUnWatch());
}
}
25 changes: 24 additions & 1 deletion node/src/GlideClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
createSort,
createSortReadOnly,
createTime,
createUnWatch,
} from "./Commands";
import { RequestError } from "./Errors";
import { command_request, connection_request } from "./ProtobufMessage";
Expand Down Expand Up @@ -1117,10 +1118,32 @@ export class GlideClusterClient extends BaseClient {
* console.log(result); // Output: "key12" - "key12" is a random existing key name.
* ```
*/
public randomKey(route?: Routes): Promise<string | null> {
public async randomKey(route?: Routes): Promise<string | null> {
return this.createWritePromise(
createRandomKey(),
toProtobufRoute(route),
);
}

/**
* Flushes all the previously watched keys for a transaction. Executing a transaction will
* automatically flush all previously watched keys.
*
* See https://valkey.io/commands/unwatch/ and https://valkey.io/topics/transactions/#cas for more details.
*
* @param route - (Optional) The command will be routed to all primary nodes, unless `route` is provided,
* in which case the client will route the command to the nodes defined by `route`.
* @returns A simple "OK" response.
*
* @example
* ```typescript
* let response = await client.watch(["sampleKey"]);
* console.log(response); // Output: "OK"
* response = await client.unwatch();
* console.log(response); // Output: "OK"
* ```
*/
public async unwatch(route?: Routes): Promise<"OK"> {
return this.createWritePromise(createUnWatch(), toProtobufRoute(route));
}
}
112 changes: 110 additions & 2 deletions node/tests/GlideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ import {
} from "@jest/globals";
import { BufferReader, BufferWriter } from "protobufjs";
import { v4 as uuidv4 } from "uuid";
import { GlideClient, ListDirection, ProtocolVersion, Transaction } from "..";
import {
GlideClient,
ListDirection,
ProtocolVersion,
RequestError,
Transaction,
} from "..";
import { RedisCluster } from "../../utils/TestUtils.js";
import { FlushMode, SortOrder } from "../build-ts/src/Commands";
import { command_request } from "../src/ProtobufMessage";
Expand Down Expand Up @@ -227,7 +233,7 @@ describe("GlideClient", () => {
);
const transaction = new Transaction();
transaction.get("key");
const result1 = await client1.customCommand(["WATCH", "key"]);
const result1 = await client1.watch(["key"]);
expect(result1).toEqual("OK");

const result2 = await client2.set("key", "foo");
Expand Down Expand Up @@ -939,6 +945,108 @@ describe("GlideClient", () => {
TIMEOUT,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"watch test_%p",
async (protocol) => {
const client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);

const key1 = "{key}-1" + uuidv4();
const key2 = "{key}-2" + uuidv4();
const key3 = "{key}-3" + uuidv4();
const key4 = "{key}-4" + uuidv4();
const setFoobarTransaction = new Transaction();
const setHelloTransaction = new Transaction();

// Returns null when a watched key is modified before it is executed in a transaction command.
// Transaction commands are not performed.
expect(await client.watch([key1, key2, key3])).toEqual("OK");
expect(await client.set(key2, "hello")).toEqual("OK");
setFoobarTransaction
.set(key1, "foobar")
.set(key2, "foobar")
.set(key3, "foobar");
let results = await client.exec(setFoobarTransaction);
expect(results).toEqual(null);
// sanity check
expect(await client.get(key1)).toEqual(null);
expect(await client.get(key2)).toEqual("hello");
expect(await client.get(key3)).toEqual(null);

// Transaction executes command successfully with a read command on the watch key before
// transaction is executed.
expect(await client.watch([key1, key2, key3])).toEqual("OK");
expect(await client.get(key2)).toEqual("hello");
results = await client.exec(setFoobarTransaction);
expect(results).toEqual(["OK", "OK", "OK"]);
// sanity check
expect(await client.get(key1)).toEqual("foobar");
expect(await client.get(key2)).toEqual("foobar");
expect(await client.get(key3)).toEqual("foobar");

// Transaction executes command successfully with unmodified watched keys
expect(await client.watch([key1, key2, key3])).toEqual("OK");
results = await client.exec(setFoobarTransaction);
expect(results).toEqual(["OK", "OK", "OK"]);
// sanity check
expect(await client.get(key1)).toEqual("foobar");
expect(await client.get(key2)).toEqual("foobar");
expect(await client.get(key3)).toEqual("foobar");

// Transaction executes command successfully with a modified watched key but is not in the
// transaction.
expect(await client.watch([key4])).toEqual("OK");
setHelloTransaction
.set(key1, "hello")
.set(key2, "hello")
.set(key3, "hello");
results = await client.exec(setHelloTransaction);
expect(results).toEqual(["OK", "OK", "OK"]);
// sanity check
expect(await client.get(key1)).toEqual("hello");
expect(await client.get(key2)).toEqual("hello");
expect(await client.get(key3)).toEqual("hello");

// WATCH can not have an empty String array parameter
await expect(client.watch([])).rejects.toThrow(RequestError);

client.close();
},
TIMEOUT,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"unwatch test_%p",
async (protocol) => {
const client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);

const key1 = "{key}-1" + uuidv4();
const key2 = "{key}-2" + uuidv4();

const setFoobarTransaction = new Transaction();

// UNWATCH returns OK when there no watched keys
expect(await client.unwatch()).toEqual("OK");

// Transaction executes successfully after modifying a watched key then calling UNWATCH
expect(await client.watch([key1, key2])).toEqual("OK");
expect(await client.set(key2, "hello")).toEqual("OK");
expect(await client.unwatch()).toEqual("OK");
setFoobarTransaction.set(key1, "foobar").set(key2, "foobar");
const results = await client.exec(setFoobarTransaction);
expect(results).toEqual(["OK", "OK"]);
// sanity check
expect(await client.get(key1)).toEqual("foobar");
expect(await client.get(key2)).toEqual("foobar");

client.close();
},
TIMEOUT,
);

runBaseTests<Context>({
init: async (protocol, clientName?) => {
const options = getClientConfigurationOption(
Expand Down
Loading

0 comments on commit 14ebf28

Please sign in to comment.