From 6111494eed2f5252003f75391de052af47a8854a Mon Sep 17 00:00:00 2001 From: Aaron <69273634+aaron-congo@users.noreply.github.com> Date: Sun, 30 Jun 2024 17:11:34 -0700 Subject: [PATCH] Python: added XAUTOCLAIM command (#1718) * Python: add XAUTOCLAIM command * minor doc update * Minor doc update * PR suggestions * Update test assertions with string conversions * PR suggestions * Add clarifying comments --- CHANGELOG.md | 1 + glide-core/src/client/value_conversion.rs | 224 ++++++++++++++++ glide-core/src/protobuf/redis_request.proto | 1 + glide-core/src/request_type.rs | 3 + python/python/glide/async_commands/core.py | 138 ++++++++++ .../glide/async_commands/transaction.py | 87 +++++++ python/python/tests/test_async_client.py | 239 ++++++++++++++++++ python/python/tests/test_transaction.py | 22 ++ 8 files changed, 715 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc14238320..a1e677c31f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,6 +67,7 @@ * Python: Added SSCAN command ([#1709](https://github.com/aws/glide-for-redis/pull/1709)) * Python: Added LCS command ([#1716](https://github.com/aws/glide-for-redis/pull/1716)) * Python: Added WAIT command ([#1710](https://github.com/aws/glide-for-redis/pull/1710)) +* Python: Added XAUTOCLAIM command ([#1718](https://github.com/aws/glide-for-redis/pull/1718)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index dff852e839..b2ad3f1560 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -34,6 +34,7 @@ pub(crate) enum ExpectedReturnType<'a> { FunctionStatsReturnType, GeoSearchReturnType, SimpleString, + XAutoClaimReturnType, } pub(crate) fn convert_to_expected_type( @@ -607,6 +608,71 @@ pub(crate) fn convert_to_expected_type( } _ => Err((ErrorKind::TypeError, "Response couldn't be converted").into()), }, + // Used by XAUTOCLAIM. The command returns a list of length 2 if the Redis version is less than 7.0.0 or a list + // of length 3 otherwise. It has the following response format: + /* Redis version < 7.0.0 example: + 1) "0-0" + 2) 1) 1) "1-0" + 2) 1) "field1" + 2) "value1" + 3) "field2" + 4) "value2" + 2) 1) "1-1" + 2) 1) "field3" + 2) "value3" + 3) (nil) // Entry IDs that were in the Pending Entry List but no longer in the stream get a nil value. + 4) (nil) // These nil values will be dropped so that we can return a map value for the second response element. + + Redis version >= 7.0.0 example: + 1) "0-0" + 2) 1) 1) "1-0" + 2) 1) "field1" + 2) "value1" + 3) "field2" + 4) "value2" + 2) 1) "1-1" + 2) 1) "field3" + 2) "value3" + 3) 1) "1-2" // Entry IDs that were in the Pending Entry List but no longer in the stream are listed in the + 2) "1-3" // third response element, which is an array of these IDs. + */ + ExpectedReturnType::XAutoClaimReturnType => match value { + // Response will have 2 elements if Redis version < 7.0.0, and 3 elements otherwise. + Value::Array(mut array) if array.len() == 2 || array.len() == 3 => { + let mut result: Vec = Vec::with_capacity(array.len()); + // The first element is always a stream ID as a string, so the clone is cheap. + result.push(array[0].clone()); + + let mut stale_entry_ids: Option = None; + if array.len() == 3 { + // We use array.remove to avoid having to clone the other element(s). If we removed the second + // element before the third, the third would have to be shifted, so we remove the third element + // first to improve performance. + stale_entry_ids = Some(array.remove(2)); + } + + // Only the element at index 1 needs conversion. + result.push(convert_to_expected_type( + array.remove(1), + Some(ExpectedReturnType::Map { + key_type: &Some(ExpectedReturnType::BulkString), + value_type: &Some(ExpectedReturnType::ArrayOfPairs), + }) + )?); + + if let Some(value) = stale_entry_ids { + result.push(value); + } + + Ok(Value::Array(result)) + }, + _ => Err(( + ErrorKind::TypeError, + "Response couldn't be converted to an XAUTOCLAIM response", + format!("(response was {:?})", get_value_type(&value)), + ) + .into()), + }, } } @@ -762,6 +828,12 @@ fn convert_array_to_map_by_type( convert_to_expected_type(inner_value, value_type)?, )); } + Value::Nil => { + // Ignore nil key values - they will not be placed in the map. This is necessary for commands like + // XAUTOCLAIM, which can contain an array representation of a map with nil keys in place of stream IDs + // that existed in the Pending Entries List but no longer existed in the stream. + continue; + } _ => { let Some(value) = iterator.next() else { return Err(( @@ -853,6 +925,14 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { key_type: &None, value_type: &None, }), + b"XAUTOCLAIM" => { + if cmd.position(b"JUSTID").is_some() { + // Value conversion is not needed if the JUSTID arg was passed. + None + } else { + Some(ExpectedReturnType::XAutoClaimReturnType) + } + } b"XRANGE" | b"XREVRANGE" => Some(ExpectedReturnType::Map { key_type: &Some(ExpectedReturnType::BulkString), value_type: &Some(ExpectedReturnType::ArrayOfPairs), @@ -1236,6 +1316,150 @@ mod tests { )); } + #[test] + fn convert_xautoclaim() { + // Value conversion is not needed if the JUSTID arg was passed. + assert!(expected_type_for_cmd( + redis::cmd("XAUTOCLAIM") + .arg("key") + .arg("group") + .arg("consumer") + .arg("0") + .arg("0-0") + .arg("JUSTID") + ) + .is_none()); + + assert!(matches!( + expected_type_for_cmd( + redis::cmd("XAUTOCLAIM") + .arg("key") + .arg("group") + .arg("consumer") + .arg("0") + .arg("0-0") + ), + Some(ExpectedReturnType::XAutoClaimReturnType) + )); + + let v6_response = Value::Array(vec![ + Value::BulkString("0-0".to_string().into_bytes()), + Value::Array(vec![ + Value::Array(vec![ + Value::BulkString("1-0".to_string().into_bytes()), + Value::Array(vec![ + Value::BulkString("field1".to_string().into_bytes()), + Value::BulkString("value1".to_string().into_bytes()), + Value::BulkString("field2".to_string().into_bytes()), + Value::BulkString("value2".to_string().into_bytes()), + ]), + ]), + Value::Nil, // Entry IDs that were in the Pending Entry List but no longer in the stream get a nil value. + Value::Array(vec![ + Value::BulkString("1-1".to_string().into_bytes()), + Value::Array(vec![ + Value::BulkString("field3".to_string().into_bytes()), + Value::BulkString("value3".to_string().into_bytes()), + ]), + ]), + ]), + ]); + + let expected_v6_response = Value::Array(vec![ + Value::BulkString("0-0".to_string().into_bytes()), + Value::Map(vec![ + ( + Value::BulkString("1-0".to_string().into_bytes()), + Value::Array(vec![ + Value::Array(vec![ + Value::BulkString("field1".to_string().into_bytes()), + Value::BulkString("value1".to_string().into_bytes()), + ]), + Value::Array(vec![ + Value::BulkString("field2".to_string().into_bytes()), + Value::BulkString("value2".to_string().into_bytes()), + ]), + ]), + ), + ( + Value::BulkString("1-1".to_string().into_bytes()), + Value::Array(vec![Value::Array(vec![ + Value::BulkString("field3".to_string().into_bytes()), + Value::BulkString("value3".to_string().into_bytes()), + ])]), + ), + ]), + ]); + + assert_eq!( + convert_to_expected_type( + v6_response.clone(), + Some(ExpectedReturnType::XAutoClaimReturnType) + ) + .unwrap(), + expected_v6_response.clone() + ); + + let v7_response = Value::Array(vec![ + Value::BulkString("0-0".to_string().into_bytes()), + Value::Array(vec![ + Value::Array(vec![ + Value::BulkString("1-0".to_string().into_bytes()), + Value::Array(vec![ + Value::BulkString("field1".to_string().into_bytes()), + Value::BulkString("value1".to_string().into_bytes()), + Value::BulkString("field2".to_string().into_bytes()), + Value::BulkString("value2".to_string().into_bytes()), + ]), + ]), + Value::Array(vec![ + Value::BulkString("1-1".to_string().into_bytes()), + Value::Array(vec![ + Value::BulkString("field3".to_string().into_bytes()), + Value::BulkString("value3".to_string().into_bytes()), + ]), + ]), + ]), + Value::Array(vec![Value::BulkString("1-2".to_string().into_bytes())]), + ]); + + let expected_v7_response = Value::Array(vec![ + Value::BulkString("0-0".to_string().into_bytes()), + Value::Map(vec![ + ( + Value::BulkString("1-0".to_string().into_bytes()), + Value::Array(vec![ + Value::Array(vec![ + Value::BulkString("field1".to_string().into_bytes()), + Value::BulkString("value1".to_string().into_bytes()), + ]), + Value::Array(vec![ + Value::BulkString("field2".to_string().into_bytes()), + Value::BulkString("value2".to_string().into_bytes()), + ]), + ]), + ), + ( + Value::BulkString("1-1".to_string().into_bytes()), + Value::Array(vec![Value::Array(vec![ + Value::BulkString("field3".to_string().into_bytes()), + Value::BulkString("value3".to_string().into_bytes()), + ])]), + ), + ]), + Value::Array(vec![Value::BulkString("1-2".to_string().into_bytes())]), + ]); + + assert_eq!( + convert_to_expected_type( + v7_response.clone(), + Some(ExpectedReturnType::XAutoClaimReturnType) + ) + .unwrap(), + expected_v7_response.clone() + ); + } + #[test] fn test_convert_empty_array_to_map_is_nil() { let mut cmd = redis::cmd("XREAD"); diff --git a/glide-core/src/protobuf/redis_request.proto b/glide-core/src/protobuf/redis_request.proto index dc1f48735e..63199878d9 100644 --- a/glide-core/src/protobuf/redis_request.proto +++ b/glide-core/src/protobuf/redis_request.proto @@ -241,6 +241,7 @@ enum RequestType { SScan = 200; ZScan = 201; HScan = 202; + XAutoClaim = 203; Wait = 208; } diff --git a/glide-core/src/request_type.rs b/glide-core/src/request_type.rs index ef4a7f1f80..81e9332ea1 100644 --- a/glide-core/src/request_type.rs +++ b/glide-core/src/request_type.rs @@ -211,6 +211,7 @@ pub enum RequestType { SScan = 200, ZScan = 201, HScan = 202, + XAutoClaim = 203, Wait = 208, } @@ -426,6 +427,7 @@ impl From<::protobuf::EnumOrUnknown> for RequestType { ProtobufRequestType::SScan => RequestType::SScan, ProtobufRequestType::ZScan => RequestType::ZScan, ProtobufRequestType::HScan => RequestType::HScan, + ProtobufRequestType::XAutoClaim => RequestType::XAutoClaim, ProtobufRequestType::Wait => RequestType::Wait, } } @@ -639,6 +641,7 @@ impl RequestType { RequestType::SScan => Some(cmd("SSCAN")), RequestType::ZScan => Some(cmd("ZSCAN")), RequestType::HScan => Some(cmd("HSCAN")), + RequestType::XAutoClaim => Some(cmd("XAUTOCLAIM")), RequestType::Wait => Some(cmd("WAIT")), } } diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index 168c790117..2f4c20b20a 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -3128,6 +3128,144 @@ async def xpending_range( await self._execute_command(RequestType.XPending, args), ) + async def xautoclaim( + self, + key: str, + group_name: str, + consumer_name: str, + min_idle_time_ms: int, + start: str, + count: Optional[int] = None, + ) -> List[Union[str, Mapping[str, List[List[str]]], List[str]]]: + """ + Transfers ownership of pending stream entries that match the specified criteria. + + See https://valkey.io/commands/xautoclaim for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + consumer_name (str): The consumer name. + min_idle_time_ms (int): Filters the claimed entries to those that have been idle for more than the specified + value. + start (str): Filters the claimed entries to those that have an ID equal or greater than the specified value. + count (Optional[int]): Limits the number of claimed entries to the specified value. + + Returns: + List[Union[str, Mapping[str, List[List[str]]], List[str]]]: A list containing the following elements: + - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is equivalent + to the next ID in the stream after the entries that were scanned, or "0-0" if the entire stream was + scanned. + - A mapping of the claimed entries, with the keys being the claimed entry IDs and the values being a + 2D list of the field-value pairs in the format `[[field1, value1], [field2, value2], ...]`. + - If you are using Redis 7.0.0 or above, the response list will also include a list containing the + message IDs that were in the Pending Entries List but no longer exist in the stream. These IDs are + deleted from the Pending Entries List. + + Examples: + # Redis version < 7.0.0: + >>> await client.xautoclaim("my_stream", "my_group", "my_consumer", 3_600_000, "0-0") + [ + "0-0", + { + "1-1": [ + ["field1", "value1"], + ["field2", "value2"], + ] + } + ] + # Stream entry "1-1" was idle for over an hour and was thus claimed by "my_consumer". The entire stream + # was scanned. + + # Redis version 7.0.0 and above: + >>> await client.xautoclaim("my_stream", "my_group", "my_consumer", 3_600_000, "0-0") + [ + "0-0", + { + "1-1": [ + ["field1", "value1"], + ["field2", "value2"], + ] + }, + ["1-2"] + ] + # Stream entry "1-1" was idle for over an hour and was thus claimed by "my_consumer". The entire stream + # was scanned. Additionally, entry "1-2" was removed from the Pending Entries List because it no longer + # exists in the stream. + + Since: Redis version 6.2.0. + """ + args = [key, group_name, consumer_name, str(min_idle_time_ms), start] + if count is not None: + args.extend(["COUNT", str(count)]) + + return cast( + List[Union[str, Mapping[str, List[List[str]]], List[str]]], + await self._execute_command(RequestType.XAutoClaim, args), + ) + + async def xautoclaim_just_id( + self, + key: str, + group_name: str, + consumer_name: str, + min_idle_time_ms: int, + start: str, + count: Optional[int] = None, + ) -> List[Union[str, List[str]]]: + """ + Transfers ownership of pending stream entries that match the specified criteria. This command uses the JUSTID + argument to further specify that the return value should contain a list of claimed IDs without their + field-value info. + + See https://valkey.io/commands/xautoclaim for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + consumer_name (str): The consumer name. + min_idle_time_ms (int): Filters the claimed entries to those that have been idle for more than the specified + value. + start (str): Filters the claimed entries to those that have an ID equal or greater than the specified value. + count (Optional[int]): Limits the number of claimed entries to the specified value. + + Returns: + List[Union[str, List[str]]]: A list containing the following elements: + - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is equivalent + to the next ID in the stream after the entries that were scanned, or "0-0" if the entire stream was + scanned. + - A list of the IDs for the claimed entries. + - If you are using Redis 7.0.0 or above, the response list will also include a list containing the + message IDs that were in the Pending Entries List but no longer exist in the stream. These IDs are + deleted from the Pending Entries List. + + Examples: + # Redis version < 7.0.0: + >>> await client.xautoclaim_just_id("my_stream", "my_group", "my_consumer", 3_600_000, "0-0") + ["0-0", ["1-1"]] + # Stream entry "1-1" was idle for over an hour and was thus claimed by "my_consumer". The entire stream + # was scanned. + + # Redis version 7.0.0 and above: + >>> await client.xautoclaim_just_id("my_stream", "my_group", "my_consumer", 3_600_000, "0-0") + ["0-0", ["1-1"], ["1-2"]] + # Stream entry "1-1" was idle for over an hour and was thus claimed by "my_consumer". The entire stream + # was scanned. Additionally, entry "1-2" was removed from the Pending Entries List because it no longer + # exists in the stream. + + Since: Redis version 6.2.0. + """ + args = [key, group_name, consumer_name, str(min_idle_time_ms), start] + if count is not None: + args.extend(["COUNT", str(count)]) + + args.append("JUSTID") + + return cast( + List[Union[str, List[str]]], + await self._execute_command(RequestType.XAutoClaim, args), + ) + async def geoadd( self, key: str, diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index 495ae062d7..aee0ff58e4 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -2260,6 +2260,93 @@ def xpending_range( args = _create_xpending_range_args(key, group_name, start, end, count, options) return self.append_command(RequestType.XPending, args) + def xautoclaim( + self: TTransaction, + key: str, + group_name: str, + consumer_name: str, + min_idle_time_ms: int, + start: str, + count: Optional[int] = None, + ) -> TTransaction: + """ + Transfers ownership of pending stream entries that match the specified criteria. + + See https://valkey.io/commands/xautoclaim for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + consumer_name (str): The consumer name. + min_idle_time_ms (int): Filters the claimed entries to those that have been idle for more than the specified + value. + start (str): Filters the claimed entries to those that have an ID equal or greater than the specified value. + count (Optional[int]): Limits the number of claimed entries to the specified value. + + Command response: + List[Union[str, Mapping[str, List[List[str]]], List[str]]]: A list containing the following elements: + - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is equivalent + to the next ID in the stream after the entries that were scanned, or "0-0" if the entire stream was + scanned. + - A mapping of the claimed entries, with the keys being the claimed entry IDs and the values being a + 2D list of the field-value pairs in the format `[[field1, value1], [field2, value2], ...]`. + - If you are using Redis 7.0.0 or above, the response list will also include a list containing the + message IDs that were in the Pending Entries List but no longer exist in the stream. These IDs are + deleted from the Pending Entries List. + + Since: Redis version 6.2.0. + """ + args = [key, group_name, consumer_name, str(min_idle_time_ms), start] + if count is not None: + args.extend(["COUNT", str(count)]) + + return self.append_command(RequestType.XAutoClaim, args) + + def xautoclaim_just_id( + self: TTransaction, + key: str, + group_name: str, + consumer_name: str, + min_idle_time_ms: int, + start: str, + count: Optional[int] = None, + ) -> TTransaction: + """ + Transfers ownership of pending stream entries that match the specified criteria. This command uses the JUSTID + argument to further specify that the return value should contain a list of claimed IDs without their + field-value info. + + See https://valkey.io/commands/xautoclaim for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + consumer_name (str): The consumer name. + min_idle_time_ms (int): Filters the claimed entries to those that have been idle for more than the specified + value. + start (str): Filters the claimed entries to those that have an ID equal or greater than the specified value. + count (Optional[int]): Limits the number of claimed entries to the specified value. + + Command response: + List[Union[str, List[str]]]: A list containing the following elements: + - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is equivalent + to the next ID in the stream after the entries that were scanned, or "0-0" if the entire stream was + scanned. + - A list of the IDs for the claimed entries. + - If you are using Redis 7.0.0 or above, the response list will also include a list containing the + message IDs that were in the Pending Entries List but no longer exist in the stream. These IDs are + deleted from the Pending Entries List. + + Since: Redis version 6.2.0. + """ + args = [key, group_name, consumer_name, str(min_idle_time_ms), start] + if count is not None: + args.extend(["COUNT", str(count)]) + + args.append("JUSTID") + + return self.append_command(RequestType.XAutoClaim, args) + def geoadd( self: TTransaction, key: str, diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index c1f46a71bb..0524ffcd3c 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -6038,6 +6038,245 @@ async def test_xpending_edge_cases_and_failures(self, redis_client: TGlideClient string_key, group_name, MinId(), MaxId(), 10 ) + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_xautoclaim(self, redis_client: TGlideClient, protocol): + min_version = "6.2.0" + if await check_if_server_version_lt(redis_client, min_version): + return pytest.mark.skip(reason=f"Redis version required >= {min_version}") + + if await check_if_server_version_lt(redis_client, "7.0.0"): + version7_or_above = False + else: + version7_or_above = True + + key = get_random_string(10) + group_name = get_random_string(10) + consumer = get_random_string(10) + stream_id0_0 = "0-0" + stream_id1_0 = "1-0" + stream_id1_1 = "1-1" + stream_id1_2 = "1-2" + stream_id1_3 = "1-3" + + # setup: add stream entries, create consumer group, add entries to Pending Entries List for group + assert ( + await redis_client.xadd( + key, [("f1", "v1"), ("f2", "v2")], StreamAddOptions(stream_id1_0) + ) + == stream_id1_0.encode() + ) + assert ( + await redis_client.xadd( + key, [("f1_1", "v1_1")], StreamAddOptions(stream_id1_1) + ) + == stream_id1_1.encode() + ) + assert ( + await redis_client.xadd( + key, [("f1_2", "v1_2")], StreamAddOptions(stream_id1_2) + ) + == stream_id1_2.encode() + ) + assert ( + await redis_client.xadd( + key, [("f1_3", "v1_3")], StreamAddOptions(stream_id1_3) + ) + == stream_id1_3.encode() + ) + assert await redis_client.xgroup_create(key, group_name, stream_id0_0) == OK + assert await redis_client.xreadgroup({key: ">"}, group_name, consumer) == { + key.encode(): { + stream_id1_0.encode(): [[b"f1", b"v1"], [b"f2", b"v2"]], + stream_id1_1.encode(): [[b"f1_1", b"v1_1"]], + stream_id1_2.encode(): [[b"f1_2", b"v1_2"]], + stream_id1_3.encode(): [[b"f1_3", b"v1_3"]], + } + } + + # autoclaim the first entry only + result = await redis_client.xautoclaim( + key, group_name, consumer, 0, stream_id0_0, count=1 + ) + assert result[0] == stream_id1_1.encode() + assert result[1] == {stream_id1_0.encode(): [[b"f1", b"v1"], [b"f2", b"v2"]]} + # if using Redis 7.0.0 or above, responses also include a list of entry IDs that were removed from the Pending + # Entries List because they no longer exist in the stream + if version7_or_above: + assert result[2] == [] + + # delete entry 1-2 + assert await redis_client.xdel(key, [stream_id1_2]) + + # autoclaim the rest of the entries + result = await redis_client.xautoclaim( + key, group_name, consumer, 0, stream_id1_1 + ) + assert ( + result[0] == stream_id0_0.encode() + ) # "0-0" is returned to indicate the entire stream was scanned. + assert result[1] == { + stream_id1_1.encode(): [[b"f1_1", b"v1_1"]], + stream_id1_3.encode(): [[b"f1_3", b"v1_3"]], + } + if version7_or_above: + assert result[2] == [stream_id1_2.encode()] + + # autoclaim with JUSTID: result at index 1 does not contain fields/values of the claimed entries, only IDs + just_id_result = await redis_client.xautoclaim_just_id( + key, group_name, consumer, 0, stream_id0_0 + ) + assert just_id_result[0] == stream_id0_0.encode() + if version7_or_above: + assert just_id_result[1] == [ + stream_id1_0.encode(), + stream_id1_1.encode(), + stream_id1_3.encode(), + ] + assert just_id_result[2] == [] + else: + # in Redis < 7.0.0, specifically for XAUTOCLAIM with JUSTID, entry IDs that were in the Pending Entries List + # but are no longer in the stream still show up in the response + assert just_id_result[1] == [ + stream_id1_0.encode(), + stream_id1_1.encode(), + stream_id1_2.encode(), + stream_id1_3.encode(), + ] + + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_xautoclaim_edge_cases_and_failures( + self, redis_client: TGlideClient, protocol + ): + min_version = "6.2.0" + if await check_if_server_version_lt(redis_client, min_version): + return pytest.mark.skip(reason=f"Redis version required >= {min_version}") + + if await check_if_server_version_lt(redis_client, "7.0.0"): + version7_or_above = False + else: + version7_or_above = True + + key = get_random_string(10) + string_key = get_random_string(10) + non_existing_key = get_random_string(10) + group_name = get_random_string(10) + consumer = get_random_string(10) + stream_id0_0 = "0-0" + stream_id1_0 = "1-0" + + # setup: add entry, create consumer group, add entry to Pending Entries List for group + assert ( + await redis_client.xadd(key, [("f1", "v1")], StreamAddOptions(stream_id1_0)) + == stream_id1_0.encode() + ) + assert await redis_client.xgroup_create(key, group_name, stream_id0_0) == OK + assert await redis_client.xreadgroup({key: ">"}, group_name, consumer) == { + key.encode(): {stream_id1_0.encode(): [[b"f1", b"v1"]]} + } + + # passing a non-existing key is not allowed and will raise an error + with pytest.raises(RequestError): + await redis_client.xautoclaim( + non_existing_key, group_name, consumer, 0, stream_id0_0 + ) + with pytest.raises(RequestError): + await redis_client.xautoclaim_just_id( + non_existing_key, group_name, consumer, 0, stream_id0_0 + ) + + # passing a non-existing group is not allowed and will raise an error + with pytest.raises(RequestError): + await redis_client.xautoclaim( + key, "non_existing_group", consumer, 0, stream_id0_0 + ) + with pytest.raises(RequestError): + await redis_client.xautoclaim_just_id( + key, "non_existing_group", consumer, 0, stream_id0_0 + ) + + # non-existing consumers are created automatically + result = await redis_client.xautoclaim( + key, group_name, "non_existing_consumer", 0, stream_id0_0 + ) + assert result[0] == stream_id0_0.encode() + assert result[1] == {stream_id1_0.encode(): [[b"f1", b"v1"]]} + # if using Redis 7.0.0 or above, responses also include a list of entry IDs that were removed from the Pending + # Entries List because they no longer exist in the stream + if version7_or_above: + assert result[2] == [] + + just_id_result = await redis_client.xautoclaim_just_id( + key, group_name, "non_existing_consumer", 0, stream_id0_0 + ) + assert just_id_result[0] == stream_id0_0.encode() + assert just_id_result[1] == [stream_id1_0.encode()] + if version7_or_above: + assert just_id_result[2] == [] + + # negative min_idle_time_ms values are allowed + result = await redis_client.xautoclaim( + key, group_name, consumer, -1, stream_id0_0 + ) + assert result[0] == stream_id0_0.encode() + assert result[1] == {stream_id1_0.encode(): [[b"f1", b"v1"]]} + if version7_or_above: + assert result[2] == [] + + just_id_result = await redis_client.xautoclaim_just_id( + key, group_name, consumer, -1, stream_id0_0 + ) + assert just_id_result[0] == stream_id0_0.encode() + assert just_id_result[1] == [stream_id1_0.encode()] + if version7_or_above: + assert just_id_result[2] == [] + + with pytest.raises(RequestError): + await redis_client.xautoclaim( + key, group_name, consumer, 0, "invalid_stream_id" + ) + with pytest.raises(RequestError): + await redis_client.xautoclaim_just_id( + key, group_name, consumer, 0, "invalid_stream_id" + ) + + # no stream entries to claim above the given start value + result = await redis_client.xautoclaim(key, group_name, consumer, 0, "99-99") + assert result[0] == stream_id0_0.encode() + assert result[1] == {} + if version7_or_above: + assert result[2] == [] + + just_id_result = await redis_client.xautoclaim_just_id( + key, group_name, consumer, 0, "99-99" + ) + assert just_id_result[0] == stream_id0_0.encode() + assert just_id_result[1] == [] + if version7_or_above: + assert just_id_result[2] == [] + + # invalid arg - count must be positive + with pytest.raises(RequestError): + await redis_client.xautoclaim( + key, group_name, consumer, 0, stream_id0_0, count=0 + ) + with pytest.raises(RequestError): + await redis_client.xautoclaim_just_id( + key, group_name, consumer, 0, stream_id0_0, count=0 + ) + + # key exists, but it is not a stream + assert await redis_client.set(string_key, "foo") == OK + with pytest.raises(RequestError): + await redis_client.xautoclaim( + string_key, group_name, consumer, 0, stream_id0_0 + ) + with pytest.raises(RequestError): + await redis_client.xautoclaim_just_id( + string_key, group_name, consumer, 0, stream_id0_0 + ) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_xgroup_set_id( diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index e685943de0..121ade1a5a 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -546,6 +546,28 @@ async def transaction_test( args.append({key11.encode(): {b"0-2": [[b"foo", b"bar"]]}}) transaction.xpending(key11, group_name1) args.append([1, b"0-2", b"0-2", [[consumer.encode(), b"1"]]]) + + min_version = "6.2.0" + if not await check_if_server_version_lt(redis_client, min_version): + transaction.xautoclaim(key11, group_name1, consumer, 0, "0-0") + transaction.xautoclaim_just_id(key11, group_name1, consumer, 0, "0-0") + # if using Redis 7.0.0 or above, responses also include a list of entry IDs that were removed from the Pending + # Entries List because they no longer exist in the stream + if await check_if_server_version_lt(redis_client, "7.0.0"): + args.append( + [b"0-0", {b"0-2": [[b"foo", b"bar"]]}] + ) # transaction.xautoclaim(key11, group_name1, consumer, 0, "0-0") + args.append( + [b"0-0", [b"0-2"]] + ) # transaction.xautoclaim_just_id(key11, group_name1, consumer, 0, "0-0") + else: + args.append( + [b"0-0", {b"0-2": [[b"foo", b"bar"]]}, []] + ) # transaction.xautoclaim(key11, group_name1, consumer, 0, "0-0") + args.append( + [b"0-0", [b"0-2"], []] + ) # transaction.xautoclaim_just_id(key11, group_name1, consumer, 0, "0-0") + transaction.xack(key11, group_name1, ["0-2"]) args.append(1) transaction.xpending_range(key11, group_name1, MinId(), MaxId(), 1)