Skip to content

Commit

Permalink
Python: add XCLAIM command (valkey-io#1772)
Browse files Browse the repository at this point in the history
* Python: add XCLAIM command

* add CHANGELOG

* address comments
  • Loading branch information
jamesx-improving authored Jul 3, 2024
1 parent 9fab94d commit 81ef46f
Show file tree
Hide file tree
Showing 7 changed files with 442 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
* Python: Add ZSCAN and HSCAN commands ([#1732](https://github.com/aws/glide-for-redis/pull/1732))
* Python: Added FCALL_RO command ([#1721](https://github.com/aws/glide-for-redis/pull/1721))
* Python: Added WATCH and UNWATCH command ([#1736](https://github.com/aws/glide-for-redis/pull/1736))
* Python: Added XCLAIM command ([#1772](https://github.com/aws/glide-for-redis/pull/1772))
* Python: Added XINFO GROUPS and XINFO CONSUMERS commands ([#1753](https://github.com/aws/glide-for-redis/pull/1753))
* Python: Added LPOS command ([#1740](https://github.com/aws/glide-for-redis/pull/1740))
* Python: Added SCAN command ([#1623](https://github.com/aws/glide-for-redis/pull/1623))
Expand Down
2 changes: 2 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
MaxId,
MinId,
StreamAddOptions,
StreamClaimOptions,
StreamGroupOptions,
StreamPendingOptions,
StreamRangeBound,
Expand Down Expand Up @@ -166,6 +167,7 @@
"MaxId",
"MinId",
"StreamAddOptions",
"StreamClaimOptions",
"StreamGroupOptions",
"StreamPendingOptions",
"StreamReadGroupOptions",
Expand Down
106 changes: 106 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)
from glide.async_commands.stream import (
StreamAddOptions,
StreamClaimOptions,
StreamGroupOptions,
StreamPendingOptions,
StreamRangeBound,
Expand Down Expand Up @@ -3188,6 +3189,111 @@ async def xpending_range(
await self._execute_command(RequestType.XPending, args),
)

async def xclaim(
self,
key: TEncodable,
group: TEncodable,
consumer: TEncodable,
min_idle_time_ms: int,
ids: List[TEncodable],
options: Optional[StreamClaimOptions] = None,
) -> Mapping[bytes, List[List[bytes]]]:
"""
Changes the ownership of a pending message.
See https://valkey.io/commands/xclaim for more details.
Args:
key (TEncodable): The key of the stream.
group (TEncodable): The consumer group name.
consumer (TEncodable): The group consumer.
min_idle_time_ms (int): The minimum idle time for the message to be claimed.
ids (List[TEncodable]): A array of entry ids.
options (Optional[StreamClaimOptions]): Stream claim options.
Returns:
A Mapping of message entries with the format
{"entryId": [["entry", "data"], ...], ...} that are claimed by the consumer.
Examples:
# read messages from streamId for consumer1
>>> await client.xreadgroup({"mystream": ">"}, "mygroup", "consumer1")
{
b"mystream": {
b"1-0": [[b"field1", b"value1"]],
}
}
# "1-0" is now read, and we can assign the pending messages to consumer2
>>> await client.xclaim("mystream", "mygroup", "consumer2", 0, ["1-0"])
{b"1-0": [[b"field1", b"value1"]]}
"""

args = [key, group, consumer, str(min_idle_time_ms), *ids]

if options:
args.extend(options.to_args())

return cast(
Mapping[bytes, List[List[bytes]]],
await self._execute_command(RequestType.XClaim, args),
)

async def xclaim_just_id(
self,
key: TEncodable,
group: TEncodable,
consumer: TEncodable,
min_idle_time_ms: int,
ids: List[TEncodable],
options: Optional[StreamClaimOptions] = None,
) -> List[TEncodable]:
"""
Changes the ownership of a pending message. This function returns a List with
only the message/entry IDs, and is equivalent to using JUSTID in the Redis API.
See https://valkey.io/commands/xclaim for more details.
Args:
key (TEncodable): The key of the stream.
group (TEncodable): The consumer group name.
consumer (TEncodable): The group consumer.
min_idle_time_ms (int): The minimum idle time for the message to be claimed.
ids (List[TEncodable]): A array of entry ids.
options (Optional[StreamClaimOptions]): Stream claim options.
Returns:
A List of message ids claimed by the consumer.
Examples:
# read messages from streamId for consumer1
>>> await client.xreadgroup({"mystream": ">"}, "mygroup", "consumer1")
{
b"mystream": {
b"1-0": [[b"field1", b"value1"]],
}
}
# "1-0" is now read, and we can assign the pending messages to consumer2
>>> await client.xclaim_just_id("mystream", "mygroup", "consumer2", 0, ["1-0"])
["1-0"]
"""

args = [
key,
group,
consumer,
str(min_idle_time_ms),
*ids,
StreamClaimOptions.JUST_ID_REDIS_API,
]

if options:
args.extend(options.to_args())

return cast(
List[TEncodable],
await self._execute_command(RequestType.XClaim, args),
)

async def xautoclaim(
self,
key: TEncodable,
Expand Down
63 changes: 63 additions & 0 deletions python/python/glide/async_commands/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,69 @@ def __init__(
self.consumer_name = consumer_name


class StreamClaimOptions:
IDLE_REDIS_API = "IDLE"
TIME_REDIS_API = "TIME"
RETRY_COUNT_REDIS_API = "RETRYCOUNT"
FORCE_REDIS_API = "FORCE"
JUST_ID_REDIS_API = "JUSTID"

def __init__(
self,
idle: Optional[int] = None,
idle_unix_time: Optional[int] = None,
retry_count: Optional[int] = None,
is_force: Optional[bool] = False,
):
"""
Options for `XCLAIM`.
Args:
idle (Optional[int]): Set the idle time (last time it was delivered) of the message in milliseconds. If idle
is not specified, an idle of `0` is assumed, that is, the time count is reset because the message now has a
new owner trying to process it.
idle_unix_time (Optional[int]): This is the same as idle but instead of a relative amount of milliseconds,
it sets the idle time to a specific Unix time (in milliseconds). This is useful in order to rewrite the AOF
file generating `XCLAIM` commands.
retry_count (Optional[int]): Set the retry counter to the specified value. This counter is incremented every
time a message is delivered again. Normally `XCLAIM` does not alter this counter, which is just served to
clients when the `XPENDING` command is called: this way clients can detect anomalies, like messages that
are never processed for some reason after a big number of delivery attempts.
is_force (Optional[bool]): Creates the pending message entry in the PEL even if certain specified IDs are not
already in the PEL assigned to a different client. However, the message must exist in the stream, otherwise
the IDs of non-existing messages are ignored.
"""
self.idle = idle
self.idle_unix_time = idle_unix_time
self.retry_count = retry_count
self.is_force = is_force

def to_args(self) -> List[TEncodable]:
"""
Converts options for `XCLAIM` into a List.
Returns:
List[str]: The options as a list of arguments for the `XCLAIM` command.
"""
args: List[TEncodable] = []
if self.idle:
args.append(self.IDLE_REDIS_API)
args.append(str(self.idle))

if self.idle_unix_time:
args.append(self.TIME_REDIS_API)
args.append(str(self.idle_unix_time))

if self.retry_count:
args.append(self.RETRY_COUNT_REDIS_API)
args.append(str(self.retry_count))

if self.is_force:
args.append(self.FORCE_REDIS_API)

return args


def _create_xpending_range_args(
key: TEncodable,
group_name: TEncodable,
Expand Down
76 changes: 76 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
)
from glide.async_commands.stream import (
StreamAddOptions,
StreamClaimOptions,
StreamGroupOptions,
StreamPendingOptions,
StreamRangeBound,
Expand Down Expand Up @@ -4550,6 +4551,81 @@ def lpos(

return self.append_command(RequestType.LPos, args)

def xclaim(
self: TTransaction,
key: TEncodable,
group: TEncodable,
consumer: TEncodable,
min_idle_time_ms: int,
ids: List[TEncodable],
options: Optional[StreamClaimOptions] = None,
) -> TTransaction:
"""
Changes the ownership of a pending message.
See https://valkey.io/commands/xclaim for more details.
Args:
key (TEncodable): The key of the stream.
group (TEncodable): The consumer group name.
consumer (TEncodable): The group consumer.
min_idle_time_ms (int): The minimum idle time for the message to be claimed.
ids (List[TEncodable]): A array of entry ids.
options (Optional[StreamClaimOptions]): Stream claim options.
Returns:
A Mapping of message entries with the format
{"entryId": [["entry", "data"], ...], ...} that are claimed by the consumer.
"""

args = [key, group, consumer, str(min_idle_time_ms), *ids]

if options:
args.extend(options.to_args())

return self.append_command(RequestType.XClaim, args)

def xclaim_just_id(
self: TTransaction,
key: TEncodable,
group: TEncodable,
consumer: TEncodable,
min_idle_time_ms: int,
ids: List[TEncodable],
options: Optional[StreamClaimOptions] = None,
) -> TTransaction:
"""
Changes the ownership of a pending message. This function returns a List with
only the message/entry IDs, and is equivalent to using JUSTID in the Redis API.
See https://valkey.io/commands/xclaim for more details.
Args:
key (TEncodable): The key of the stream.
group (TEncodable): The consumer group name.
consumer (TEncodable): The group consumer.
min_idle_time_ms (int): The minimum idle time for the message to be claimed.
ids (List[TEncodable]): A array of entry ids.
options (Optional[StreamClaimOptions]): Stream claim options.
Returns:
A List of message ids claimed by the consumer.
"""

args = [
key,
group,
consumer,
str(min_idle_time_ms),
*ids,
StreamClaimOptions.JUST_ID_REDIS_API,
]

if options:
args.extend(options.to_args())

return self.append_command(RequestType.XClaim, args)


class Transaction(BaseTransaction):
"""
Expand Down
Loading

0 comments on commit 81ef46f

Please sign in to comment.