Skip to content

Commit

Permalink
Python: added XAUTOCLAIM command (valkey-io#1718)
Browse files Browse the repository at this point in the history
* Python: add XAUTOCLAIM command

* minor doc update

* Minor doc update

* PR suggestions

* Update test assertions with string conversions

* PR suggestions

* Add clarifying comments
  • Loading branch information
aaron-congo authored Jul 1, 2024
1 parent e7cbabb commit 6111494
Show file tree
Hide file tree
Showing 8 changed files with 715 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
* Python: Added SSCAN command ([#1709](https://github.com/aws/glide-for-redis/pull/1709))
* Python: Added LCS command ([#1716](https://github.com/aws/glide-for-redis/pull/1716))
* Python: Added WAIT command ([#1710](https://github.com/aws/glide-for-redis/pull/1710))
* Python: Added XAUTOCLAIM command ([#1718](https://github.com/aws/glide-for-redis/pull/1718))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
224 changes: 224 additions & 0 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub(crate) enum ExpectedReturnType<'a> {
FunctionStatsReturnType,
GeoSearchReturnType,
SimpleString,
XAutoClaimReturnType,
}

pub(crate) fn convert_to_expected_type(
Expand Down Expand Up @@ -607,6 +608,71 @@ pub(crate) fn convert_to_expected_type(
}
_ => Err((ErrorKind::TypeError, "Response couldn't be converted").into()),
},
// Used by XAUTOCLAIM. The command returns a list of length 2 if the Redis version is less than 7.0.0 or a list
// of length 3 otherwise. It has the following response format:
/* Redis version < 7.0.0 example:
1) "0-0"
2) 1) 1) "1-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
2) 1) "1-1"
2) 1) "field3"
2) "value3"
3) (nil) // Entry IDs that were in the Pending Entry List but no longer in the stream get a nil value.
4) (nil) // These nil values will be dropped so that we can return a map value for the second response element.
Redis version >= 7.0.0 example:
1) "0-0"
2) 1) 1) "1-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
2) 1) "1-1"
2) 1) "field3"
2) "value3"
3) 1) "1-2" // Entry IDs that were in the Pending Entry List but no longer in the stream are listed in the
2) "1-3" // third response element, which is an array of these IDs.
*/
ExpectedReturnType::XAutoClaimReturnType => match value {
// Response will have 2 elements if Redis version < 7.0.0, and 3 elements otherwise.
Value::Array(mut array) if array.len() == 2 || array.len() == 3 => {
let mut result: Vec<Value> = Vec::with_capacity(array.len());
// The first element is always a stream ID as a string, so the clone is cheap.
result.push(array[0].clone());

let mut stale_entry_ids: Option<Value> = None;
if array.len() == 3 {
// We use array.remove to avoid having to clone the other element(s). If we removed the second
// element before the third, the third would have to be shifted, so we remove the third element
// first to improve performance.
stale_entry_ids = Some(array.remove(2));
}

// Only the element at index 1 needs conversion.
result.push(convert_to_expected_type(
array.remove(1),
Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::BulkString),
value_type: &Some(ExpectedReturnType::ArrayOfPairs),
})
)?);

if let Some(value) = stale_entry_ids {
result.push(value);
}

Ok(Value::Array(result))
},
_ => Err((
ErrorKind::TypeError,
"Response couldn't be converted to an XAUTOCLAIM response",
format!("(response was {:?})", get_value_type(&value)),
)
.into()),
},
}
}

Expand Down Expand Up @@ -762,6 +828,12 @@ fn convert_array_to_map_by_type(
convert_to_expected_type(inner_value, value_type)?,
));
}
Value::Nil => {
// Ignore nil key values - they will not be placed in the map. This is necessary for commands like
// XAUTOCLAIM, which can contain an array representation of a map with nil keys in place of stream IDs
// that existed in the Pending Entries List but no longer existed in the stream.
continue;
}
_ => {
let Some(value) = iterator.next() else {
return Err((
Expand Down Expand Up @@ -853,6 +925,14 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
key_type: &None,
value_type: &None,
}),
b"XAUTOCLAIM" => {
if cmd.position(b"JUSTID").is_some() {
// Value conversion is not needed if the JUSTID arg was passed.
None
} else {
Some(ExpectedReturnType::XAutoClaimReturnType)
}
}
b"XRANGE" | b"XREVRANGE" => Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::BulkString),
value_type: &Some(ExpectedReturnType::ArrayOfPairs),
Expand Down Expand Up @@ -1236,6 +1316,150 @@ mod tests {
));
}

#[test]
fn convert_xautoclaim() {
// Value conversion is not needed if the JUSTID arg was passed.
assert!(expected_type_for_cmd(
redis::cmd("XAUTOCLAIM")
.arg("key")
.arg("group")
.arg("consumer")
.arg("0")
.arg("0-0")
.arg("JUSTID")
)
.is_none());

assert!(matches!(
expected_type_for_cmd(
redis::cmd("XAUTOCLAIM")
.arg("key")
.arg("group")
.arg("consumer")
.arg("0")
.arg("0-0")
),
Some(ExpectedReturnType::XAutoClaimReturnType)
));

let v6_response = Value::Array(vec![
Value::BulkString("0-0".to_string().into_bytes()),
Value::Array(vec![
Value::Array(vec![
Value::BulkString("1-0".to_string().into_bytes()),
Value::Array(vec![
Value::BulkString("field1".to_string().into_bytes()),
Value::BulkString("value1".to_string().into_bytes()),
Value::BulkString("field2".to_string().into_bytes()),
Value::BulkString("value2".to_string().into_bytes()),
]),
]),
Value::Nil, // Entry IDs that were in the Pending Entry List but no longer in the stream get a nil value.
Value::Array(vec![
Value::BulkString("1-1".to_string().into_bytes()),
Value::Array(vec![
Value::BulkString("field3".to_string().into_bytes()),
Value::BulkString("value3".to_string().into_bytes()),
]),
]),
]),
]);

let expected_v6_response = Value::Array(vec![
Value::BulkString("0-0".to_string().into_bytes()),
Value::Map(vec![
(
Value::BulkString("1-0".to_string().into_bytes()),
Value::Array(vec![
Value::Array(vec![
Value::BulkString("field1".to_string().into_bytes()),
Value::BulkString("value1".to_string().into_bytes()),
]),
Value::Array(vec![
Value::BulkString("field2".to_string().into_bytes()),
Value::BulkString("value2".to_string().into_bytes()),
]),
]),
),
(
Value::BulkString("1-1".to_string().into_bytes()),
Value::Array(vec![Value::Array(vec![
Value::BulkString("field3".to_string().into_bytes()),
Value::BulkString("value3".to_string().into_bytes()),
])]),
),
]),
]);

assert_eq!(
convert_to_expected_type(
v6_response.clone(),
Some(ExpectedReturnType::XAutoClaimReturnType)
)
.unwrap(),
expected_v6_response.clone()
);

let v7_response = Value::Array(vec![
Value::BulkString("0-0".to_string().into_bytes()),
Value::Array(vec![
Value::Array(vec![
Value::BulkString("1-0".to_string().into_bytes()),
Value::Array(vec![
Value::BulkString("field1".to_string().into_bytes()),
Value::BulkString("value1".to_string().into_bytes()),
Value::BulkString("field2".to_string().into_bytes()),
Value::BulkString("value2".to_string().into_bytes()),
]),
]),
Value::Array(vec![
Value::BulkString("1-1".to_string().into_bytes()),
Value::Array(vec![
Value::BulkString("field3".to_string().into_bytes()),
Value::BulkString("value3".to_string().into_bytes()),
]),
]),
]),
Value::Array(vec![Value::BulkString("1-2".to_string().into_bytes())]),
]);

let expected_v7_response = Value::Array(vec![
Value::BulkString("0-0".to_string().into_bytes()),
Value::Map(vec![
(
Value::BulkString("1-0".to_string().into_bytes()),
Value::Array(vec![
Value::Array(vec![
Value::BulkString("field1".to_string().into_bytes()),
Value::BulkString("value1".to_string().into_bytes()),
]),
Value::Array(vec![
Value::BulkString("field2".to_string().into_bytes()),
Value::BulkString("value2".to_string().into_bytes()),
]),
]),
),
(
Value::BulkString("1-1".to_string().into_bytes()),
Value::Array(vec![Value::Array(vec![
Value::BulkString("field3".to_string().into_bytes()),
Value::BulkString("value3".to_string().into_bytes()),
])]),
),
]),
Value::Array(vec![Value::BulkString("1-2".to_string().into_bytes())]),
]);

assert_eq!(
convert_to_expected_type(
v7_response.clone(),
Some(ExpectedReturnType::XAutoClaimReturnType)
)
.unwrap(),
expected_v7_response.clone()
);
}

#[test]
fn test_convert_empty_array_to_map_is_nil() {
let mut cmd = redis::cmd("XREAD");
Expand Down
1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ enum RequestType {
SScan = 200;
ZScan = 201;
HScan = 202;
XAutoClaim = 203;
Wait = 208;
}

Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ pub enum RequestType {
SScan = 200,
ZScan = 201,
HScan = 202,
XAutoClaim = 203,
Wait = 208,
}

Expand Down Expand Up @@ -426,6 +427,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::SScan => RequestType::SScan,
ProtobufRequestType::ZScan => RequestType::ZScan,
ProtobufRequestType::HScan => RequestType::HScan,
ProtobufRequestType::XAutoClaim => RequestType::XAutoClaim,
ProtobufRequestType::Wait => RequestType::Wait,
}
}
Expand Down Expand Up @@ -639,6 +641,7 @@ impl RequestType {
RequestType::SScan => Some(cmd("SSCAN")),
RequestType::ZScan => Some(cmd("ZSCAN")),
RequestType::HScan => Some(cmd("HSCAN")),
RequestType::XAutoClaim => Some(cmd("XAUTOCLAIM")),
RequestType::Wait => Some(cmd("WAIT")),
}
}
Expand Down
Loading

0 comments on commit 6111494

Please sign in to comment.