Skip to content

Commit

Permalink
Python: command WAIT (valkey-io#1710)
Browse files Browse the repository at this point in the history
* Python: command WAIT

* Python: command WAIT

* changelog

* doc & test changes

* linter

* doc update

* update example

* addressing commends rd2

---------

Co-authored-by: TJ Zhang <[email protected]>
  • Loading branch information
tjzhang-BQ and TJ Zhang authored Jun 30, 2024
1 parent 7ddd28c commit e7cbabb
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
* Python: Added FUNCTION DELETE command ([#1714](https://github.com/aws/glide-for-redis/pull/1714))
* Python: Added SSCAN command ([#1709](https://github.com/aws/glide-for-redis/pull/1709))
* Python: Added LCS command ([#1716](https://github.com/aws/glide-for-redis/pull/1716))
* Python: Added WAIT command ([#1710](https://github.com/aws/glide-for-redis/pull/1710))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
32 changes: 32 additions & 0 deletions python/python/glide/async_commands/cluster_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,3 +742,35 @@ async def random_key(self, route: Optional[Route] = None) -> Optional[str]:
Optional[str],
await self._execute_command(RequestType.RandomKey, [], route),
)

async def wait(
self,
numreplicas: int,
timeout: int,
route: Optional[Route] = None,
) -> int:
"""
Blocks the current client until all the previous write commands are successfully transferred
and acknowledged by at least `numreplicas` of replicas. If `timeout` is
reached, the command returns even if the specified number of replicas were not yet reached.
See https://valkey.io/commands/wait for more details.
Args:
numreplicas (int): The number of replicas to reach.
timeout (int): The timeout value specified in milliseconds. A value of 0 will block indefinitely.
route (Optional[Route]): The command will be routed to all primary nodes, unless `route` is provided,
in which case the client will route the command to the nodes defined by `route`.
Returns:
int: The number of replicas reached by all the writes performed in the context of the current connection.
Examples:
>>> await client.set("key", "value");
>>> await client.wait(1, 1000);
// return 1 when a replica is reached or 0 if 1000ms is reached.
"""
args = [str(numreplicas), str(timeout)]
return cast(
int,
await self._execute_command(RequestType.Wait, args, route),
)
30 changes: 30 additions & 0 deletions python/python/glide/async_commands/standalone_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,3 +665,33 @@ async def random_key(self) -> Optional[str]:
Optional[str],
await self._execute_command(RequestType.RandomKey, []),
)

async def wait(
self,
numreplicas: int,
timeout: int,
) -> int:
"""
Blocks the current client until all the previous write commands are successfully transferred
and acknowledged by at least `numreplicas` of replicas. If `timeout` is
reached, the command returns even if the specified number of replicas were not yet reached.
See https://valkey.io/commands/wait for more details.
Args:
numreplicas (int): The number of replicas to reach.
timeout (int): The timeout value specified in milliseconds. A value of 0 will block indefinitely.
Returns:
int: The number of replicas reached by all the writes performed in the context of the current connection.
Examples:
>>> await client.set("key", "value");
>>> await client.wait(1, 1000);
// return 1 when a replica is reached or 0 if 1000ms is reached.
"""
args = [str(numreplicas), str(timeout)]
return cast(
int,
await self._execute_command(RequestType.Wait, args),
)
22 changes: 22 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -4053,6 +4053,28 @@ def lcs_idx(

return self.append_command(RequestType.LCS, args)

def wait(
self: TTransaction,
numreplicas: int,
timeout: int,
) -> TTransaction:
"""
Returns the number of replicas that acknowledged the write commands sent by the current client
before this command, both in the case where the specified number of replicas are reached, or
when the timeout is reached.
See https://valkey.io/commands/wait for more details.
Args:
numreplicas (int): The number of replicas to reach.
timeout (int): The timeout value specified in milliseconds.
Command Response:
str: The number of replicas reached by all the writes performed in the context of the current connection.
"""
args = [str(numreplicas), str(timeout)]
return self.append_command(RequestType.Wait, args)


class Transaction(BaseTransaction):
"""
Expand Down
21 changes: 21 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7261,6 +7261,27 @@ async def test_copy_database(self, redis_client: GlideClient):
finally:
assert await redis_client.select(0) == OK

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_wait(self, redis_client: TGlideClient):
key = f"{{key}}-1{get_random_string(5)}"
value = get_random_string(5)
value2 = get_random_string(5)

assert await redis_client.set(key, value) == OK
if isinstance(redis_client, GlideClusterClient):
assert await redis_client.wait(1, 1000) >= 1
else:
assert await redis_client.wait(1, 1000) >= 0

# ensure that command doesn't time out even if timeout > request timeout (250ms by default)
assert await redis_client.set(key, value2) == OK
assert await redis_client.wait(100, 500) >= 0

# command should fail on a negative timeout value
with pytest.raises(RequestError):
await redis_client.wait(1, -1)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_lolwut(self, redis_client: TGlideClient):
Expand Down
2 changes: 2 additions & 0 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,8 @@ async def transaction_test(
args.append(b"one")
transaction.srandmember_count(key7, 1)
args.append([b"one"])
transaction.wait(1, 1000)
args.append(0)
transaction.flushall(FlushMode.ASYNC)
args.append(OK)
transaction.flushall()
Expand Down

0 comments on commit e7cbabb

Please sign in to comment.