Skip to content

Commit

Permalink
Node: added WAIT command (valkey-io#2113)
Browse files Browse the repository at this point in the history
* Node: added WAIT command

Signed-off-by: Yi-Pin Chen <[email protected]>
  • Loading branch information
yipin-chen authored Aug 14, 2024
1 parent a05332e commit eb92d07
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* Node: Added BZMPOP command ([#2018](https://github.com/valkey-io/valkey-glide/pull/2018))
* Node: Added PFMERGE command ([#2053](https://github.com/valkey-io/valkey-glide/pull/2053))
* Node: Added WATCH and UNWATCH commands ([#2076](https://github.com/valkey-io/valkey-glide/pull/2076))
* Node: Added WAIT command ([#2113](https://github.com/valkey-io/valkey-glide/pull/2113))
* Node: Added ZLEXCOUNT command ([#2022](https://github.com/valkey-io/valkey-glide/pull/2022))
* Node: Added ZREMRANGEBYLEX command ([#2025](https://github.com/valkey-io/valkey-glide/pull/2025))
* Node: Added ZRANGESTORE command ([#2068](https://github.com/valkey-io/valkey-glide/pull/2068))
Expand Down
23 changes: 23 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ import {
createTouch,
createType,
createUnlink,
createWait,
createWatch,
createXAdd,
createXAutoClaim,
Expand Down Expand Up @@ -5546,6 +5547,28 @@ export class BaseClient {
return this.createWritePromise(createWatch(keys));
}

/**
* Blocks the current client until all the previous write commands are successfully transferred and
* acknowledged by at least `numreplicas` of replicas. If `timeout` is reached, the command returns
* the number of replicas that were not yet reached.
*
* See https://valkey.io/commands/wait/ for more details.
*
* @param numreplicas - The number of replicas to reach.
* @param timeout - The timeout value specified in milliseconds. A value of 0 will block indefinitely.
* @returns The number of replicas reached by all the writes performed in the context of the current connection.
*
* @example
* ```typescript
* await client.set(key, value);
* let response = await client.wait(1, 1000);
* console.log(response); // Output: return 1 when a replica is reached or 0 if 1000ms is reached.
* ```
*/
public async wait(numreplicas: number, timeout: number): Promise<number> {
return this.createWritePromise(createWait(numreplicas, timeout));
}

/**
* Overwrites part of the string stored at `key`, starting at the specified `offset`,
* for the entire length of `value`. If the `offset` is larger than the current length of the string at `key`,
Expand Down
11 changes: 11 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3452,6 +3452,17 @@ export function createUnWatch(): command_request.Command {
return createCommand(RequestType.UnWatch, []);
}

/** @internal */
export function createWait(
numreplicas: number,
timeout: number,
): command_request.Command {
return createCommand(RequestType.Wait, [
numreplicas.toString(),
timeout.toString(),
]);
}

/**
* This base class represents the common set of optional arguments for the SCAN family of commands.
* Concrete implementations of this class are tied to specific SCAN commands (SCAN, HSCAN, SSCAN,
Expand Down
18 changes: 18 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ import {
createTouch,
createType,
createUnlink,
createWait,
createXAdd,
createXAutoClaim,
createXClaim,
Expand Down Expand Up @@ -2774,6 +2775,23 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createLolwut(options));
}

/**
* Blocks the current client until all the previous write commands are successfully transferred and
* acknowledged by at least `numreplicas` of replicas. If `timeout` is reached, the command returns
* the number of replicas that were not yet reached.
*
* See https://valkey.io/commands/wait/ for more details.
*
* @param numreplicas - The number of replicas to reach.
* @param timeout - The timeout value specified in milliseconds. A value of 0 will block indefinitely.
*
* Command Response - The number of replicas reached by all the writes performed in the context of the
* current connection.
*/
public wait(numreplicas: number, timeout: number): T {
return this.addAndReturn(createWait(numreplicas, timeout));
}

/**
* Invokes a previously loaded function.
*
Expand Down
32 changes: 32 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5633,6 +5633,38 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"wait test_%p",
async (protocol) => {
await runTest(async (client: BaseClient) => {
const key = uuidv4();
const value1 = uuidv4();
const value2 = uuidv4();

// assert that wait returns 0 under standalone and 1 under cluster mode.
expect(await client.set(key, value1)).toEqual("OK");

if (client instanceof GlideClusterClient) {
expect(await client.wait(1, 1000)).toBeGreaterThanOrEqual(
1,
);
} else {
expect(await client.wait(1, 1000)).toBeGreaterThanOrEqual(
0,
);
}

// command should fail on a negative timeout value
await expect(client.wait(1, -1)).rejects.toThrow(RequestError);

// ensure that command doesn't time out even if timeout > request timeout (250ms by default)
expect(await client.set(key, value2)).toEqual("OK");
expect(await client.wait(100, 500)).toBeGreaterThanOrEqual(0);
}, protocol);
},
config.timeout,
);

// Set command tests

async function setWithExpiryOptions(client: BaseClient) {
Expand Down
2 changes: 2 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1518,5 +1518,7 @@ export async function transactionTest(
responseData.push(["sortReadOnly(key21)", ["1", "2", "3"]]);
}

baseTransaction.wait(1, 200);
responseData.push(["wait(1, 200)", 1]);
return responseData;
}

0 comments on commit eb92d07

Please sign in to comment.