From e7cbabb0311b3b614e4a30f01313853a6d30ee06 Mon Sep 17 00:00:00 2001 From: tjzhang-BQ <111323543+tjzhang-BQ@users.noreply.github.com> Date: Sun, 30 Jun 2024 15:47:26 -0700 Subject: [PATCH] Python: command WAIT (#1710) * Python: command WAIT * Python: command WAIT * changelog * doc & test changes * linter * doc update * update example * addressing commends rd2 --------- Co-authored-by: TJ Zhang --- CHANGELOG.md | 1 + .../glide/async_commands/cluster_commands.py | 32 +++++++++++++++++++ .../async_commands/standalone_commands.py | 30 +++++++++++++++++ .../glide/async_commands/transaction.py | 22 +++++++++++++ python/python/tests/test_async_client.py | 21 ++++++++++++ python/python/tests/test_transaction.py | 2 ++ 6 files changed, 108 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 038ae34bf2..dc14238320 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -66,6 +66,7 @@ * Python: Added FUNCTION DELETE command ([#1714](https://github.com/aws/glide-for-redis/pull/1714)) * Python: Added SSCAN command ([#1709](https://github.com/aws/glide-for-redis/pull/1709)) * Python: Added LCS command ([#1716](https://github.com/aws/glide-for-redis/pull/1716)) +* Python: Added WAIT command ([#1710](https://github.com/aws/glide-for-redis/pull/1710)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/python/python/glide/async_commands/cluster_commands.py b/python/python/glide/async_commands/cluster_commands.py index c441272d9f..29bc7ff137 100644 --- a/python/python/glide/async_commands/cluster_commands.py +++ b/python/python/glide/async_commands/cluster_commands.py @@ -742,3 +742,35 @@ async def random_key(self, route: Optional[Route] = None) -> Optional[str]: Optional[str], await self._execute_command(RequestType.RandomKey, [], route), ) + + async def wait( + self, + numreplicas: int, + timeout: int, + route: Optional[Route] = None, + ) -> int: + """ + Blocks the current client until all the previous write commands are successfully transferred + and acknowledged by at least `numreplicas` of replicas. If `timeout` is + reached, the command returns even if the specified number of replicas were not yet reached. + + See https://valkey.io/commands/wait for more details. + + Args: + numreplicas (int): The number of replicas to reach. + timeout (int): The timeout value specified in milliseconds. A value of 0 will block indefinitely. + route (Optional[Route]): The command will be routed to all primary nodes, unless `route` is provided, + in which case the client will route the command to the nodes defined by `route`. + Returns: + int: The number of replicas reached by all the writes performed in the context of the current connection. + + Examples: + >>> await client.set("key", "value"); + >>> await client.wait(1, 1000); + // return 1 when a replica is reached or 0 if 1000ms is reached. + """ + args = [str(numreplicas), str(timeout)] + return cast( + int, + await self._execute_command(RequestType.Wait, args, route), + ) diff --git a/python/python/glide/async_commands/standalone_commands.py b/python/python/glide/async_commands/standalone_commands.py index 7c0cb3224d..859f4ae0fc 100644 --- a/python/python/glide/async_commands/standalone_commands.py +++ b/python/python/glide/async_commands/standalone_commands.py @@ -665,3 +665,33 @@ async def random_key(self) -> Optional[str]: Optional[str], await self._execute_command(RequestType.RandomKey, []), ) + + async def wait( + self, + numreplicas: int, + timeout: int, + ) -> int: + """ + Blocks the current client until all the previous write commands are successfully transferred + and acknowledged by at least `numreplicas` of replicas. If `timeout` is + reached, the command returns even if the specified number of replicas were not yet reached. + + See https://valkey.io/commands/wait for more details. + + Args: + numreplicas (int): The number of replicas to reach. + timeout (int): The timeout value specified in milliseconds. A value of 0 will block indefinitely. + + Returns: + int: The number of replicas reached by all the writes performed in the context of the current connection. + + Examples: + >>> await client.set("key", "value"); + >>> await client.wait(1, 1000); + // return 1 when a replica is reached or 0 if 1000ms is reached. + """ + args = [str(numreplicas), str(timeout)] + return cast( + int, + await self._execute_command(RequestType.Wait, args), + ) diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index fe9bd72178..495ae062d7 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -4053,6 +4053,28 @@ def lcs_idx( return self.append_command(RequestType.LCS, args) + def wait( + self: TTransaction, + numreplicas: int, + timeout: int, + ) -> TTransaction: + """ + Returns the number of replicas that acknowledged the write commands sent by the current client + before this command, both in the case where the specified number of replicas are reached, or + when the timeout is reached. + + See https://valkey.io/commands/wait for more details. + + Args: + numreplicas (int): The number of replicas to reach. + timeout (int): The timeout value specified in milliseconds. + + Command Response: + str: The number of replicas reached by all the writes performed in the context of the current connection. + """ + args = [str(numreplicas), str(timeout)] + return self.append_command(RequestType.Wait, args) + class Transaction(BaseTransaction): """ diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index 8e3d48a3a0..c1f46a71bb 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -7261,6 +7261,27 @@ async def test_copy_database(self, redis_client: GlideClient): finally: assert await redis_client.select(0) == OK + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_wait(self, redis_client: TGlideClient): + key = f"{{key}}-1{get_random_string(5)}" + value = get_random_string(5) + value2 = get_random_string(5) + + assert await redis_client.set(key, value) == OK + if isinstance(redis_client, GlideClusterClient): + assert await redis_client.wait(1, 1000) >= 1 + else: + assert await redis_client.wait(1, 1000) >= 0 + + # ensure that command doesn't time out even if timeout > request timeout (250ms by default) + assert await redis_client.set(key, value2) == OK + assert await redis_client.wait(100, 500) >= 0 + + # command should fail on a negative timeout value + with pytest.raises(RequestError): + await redis_client.wait(1, -1) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_lolwut(self, redis_client: TGlideClient): diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index e104233f6d..e685943de0 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -585,6 +585,8 @@ async def transaction_test( args.append(b"one") transaction.srandmember_count(key7, 1) args.append([b"one"]) + transaction.wait(1, 1000) + args.append(0) transaction.flushall(FlushMode.ASYNC) args.append(OK) transaction.flushall()