Add ability to cancel AsyncIO gRPC stream requests #417

Merged
merged 5 commits into from
Oct 13, 2023
Changes from 3 commits
20 changes: 20 additions & 0 deletions README.md
@@ -550,6 +550,26 @@ sent via this stream.
See more details about these APIs in
[grpc/\_client.py](src/python/library/tritonclient/grpc/_client.py).

For gRPC AsyncIO requests, an AsyncIO task wrapping an `infer()` coroutine can
be safely cancelled.

```python
infer_task = asyncio.create_task(aio_client.infer())
await something_else
Contributor

is this await something_else necessary/useful to the example?

Contributor Author

@kthui kthui Oct 13, 2023

It is not necessary to demonstrate cancellation, so removed for simplicity.

A side note: we assume users know that AsyncIO is single-threaded. Without an await between creating the task and cancelling it, there is no context switch, so the task is cancelled before it is given a chance to run. In real code, such a task should not have been created in the first place.

infer_task.cancel()
```
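
The scheduling point raised in the thread above can be checked with plain `asyncio`. The following is a minimal sketch, not the PR's code: `fake_infer` is a hypothetical stand-in for `aio_client.infer()` that only sleeps, and `cancel_task` is an illustrative helper. It records what the task observes when it is cancelled with and without a yield to the event loop in between.

```python
import asyncio


async def fake_infer(log):
    # Hypothetical stand-in for `aio_client.infer()`; it only sleeps.
    log.append("started")
    try:
        await asyncio.sleep(10)
        return "result"
    except asyncio.CancelledError:
        log.append("cancelled while running")
        raise


async def cancel_task(yield_first):
    log = []
    task = asyncio.create_task(fake_infer(log))
    if yield_first:
        # Give the event loop one turn so the task can start running.
        await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("task cancelled")
    return log


immediate_log = asyncio.run(cancel_task(yield_first=False))
yielded_log = asyncio.run(cancel_task(yield_first=True))
print(immediate_log)
print(yielded_log)
```

In the first case the coroutine body never executes, which matches the point that such a task need not have been created at all.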

For gRPC AsyncIO streaming requests, `cancel()` can be called on the
asynchronous iterator returned by `stream_infer()` API.

```python
responses_iterator = aio_client.stream_infer()
responses_iterator.cancel()
```

See more details about these APIs in
[grpc/\aio/\__init__.py](src/python/library/tritonclient/grpc/aio/__init__.py).
Contributor

this rendered weirdly, might not need backslashes, please double check it

Contributor Author

Good catch! Updated.


See [request_cancellation](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/request_cancellation.md)
in the server user-guide to learn about how this is handled on the
server side.
44 changes: 32 additions & 12 deletions src/python/library/tritonclient/grpc/aio/__init__.py
@@ -624,7 +624,7 @@ async def infer(
         except grpc.RpcError as rpc_error:
             raise_error_grpc(rpc_error)

-    async def stream_infer(
Contributor Author

This used to be an "asynchronous generator function" which the user calls like:

async for response in stream_infer(...):
    ...

Now it is a normal function that returns an "asynchronous iterator", which the user can iterate over in exactly the same way as above.
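
To illustrate the change of shape described in this comment, here is a minimal sketch using only `asyncio`. All names (`stream_as_generator`, `stream_as_iterator`, `_Responses`) are hypothetical and do not come from tritonclient, and the stand-in `cancel()` simply ends the iteration; how a real gRPC call surfaces cancellation to the consumer is not modelled here.

```python
import asyncio


async def stream_as_generator(items):
    # Old shape: an asynchronous generator function.
    for item in items:
        await asyncio.sleep(0)
        yield item


class _Responses:
    # New shape: an asynchronous iterator object with an extra cancel() method.
    def __init__(self, items):
        self._items = iter(items)
        self._cancelled = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        await asyncio.sleep(0)
        if self._cancelled:
            raise StopAsyncIteration
        try:
            return next(self._items)
        except StopIteration:
            raise StopAsyncIteration from None

    def cancel(self):
        # Assumption of this sketch: cancelling just ends the stream.
        self._cancelled = True
        return True


def stream_as_iterator(items):
    # A plain (non-async) function that returns the iterator object.
    return _Responses(items)


async def collect(aiter):
    return [item async for item in aiter]


async def main():
    from_generator = await collect(stream_as_generator([1, 2, 3]))
    from_iterator = await collect(stream_as_iterator([1, 2, 3]))
    it = stream_as_iterator([1, 2, 3])
    first = await it.__anext__()
    it.cancel()
    rest = await collect(it)
    return from_generator, from_iterator, first, rest


gen_items, iter_items, first_item, after_cancel = asyncio.run(main())
generator_has_cancel = hasattr(stream_as_generator([1]), "cancel")
```

Both shapes are consumed by the same `async for` loop, but only the returned object can carry an extra method such as `cancel()`; an asynchronous generator object has no such attribute.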

+    def stream_infer(
         self,
         inputs_iterator,
         stream_timeout=None,
@@ -636,7 +636,7 @@ async def stream_infer(

         Parameters
         ----------
-        inputs_iterator : async_generator
+        inputs_iterator : asynchronous iterator
Contributor Author

This is always an "asynchronous iterator", not an "asynchronous generator function", because inputs_iterator is used directly, without being called as inputs_iterator().
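
The distinction drawn here can be sketched with the standard library alone. In this illustration, `requests` and `consume` are hypothetical names and the dict keys are placeholders rather than the real request parameters: the consumer iterates over its argument directly, so it must receive the object produced by calling the generator function, not the function itself.

```python
import asyncio
import inspect


async def requests():
    # Hypothetical request source; the dict keys are illustrative only.
    for i in range(3):
        yield {"model_name": "demo", "request_id": str(i)}


async def consume(inputs_iterator):
    # Uses the argument directly, with no call, as the comment above describes.
    return [inputs["request_id"] async for inputs in inputs_iterator]


is_function = inspect.isasyncgenfunction(requests)  # the function itself
is_iterator = hasattr(requests(), "__anext__")      # the object it returns
ids = asyncio.run(consume(requests()))

try:
    asyncio.run(consume(requests))  # passing the function, not its result
    passing_function_failed = False
except TypeError:
    passing_function_failed = True
```

Passing the function object fails because `async for` needs something that implements `__aiter__`, which is why the docstring wording matters.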

             Async iterator that yields a dict(s) consists of the input
             parameters to the async_stream_infer function defined in
             tritonclient.grpc.InferenceServerClient.
@@ -653,9 +653,15 @@ async def stream_infer(

         Returns
         -------
-        async_generator
+        asynchronous iterator
             Yield tuple holding (InferResult, InferenceServerException) objects.

+            This object can be used to cancel the inference request like below:
+            ----------
+            it = stream_infer(...)
+            ret = it.cancel()
+            ----------
+
         Raises
         ------
         InferenceServerException
@@ -708,21 +714,35 @@ async def _request_iterator(inputs_iterator):
                     parameters=inputs["parameters"],
                 )

-        try:
-            response_iterator = self._client_stub.ModelStreamInfer(
-                _request_iterator(inputs_iterator),
-                metadata=metadata,
-                timeout=stream_timeout,
-                compression=_grpc_compression_type(compression_algorithm),
-            )
-            async for response in response_iterator:
+        class _ResponseIterator:
+            def __init__(self, grpc_call, verbose):
+                self._grpc_call = grpc_call
+                self._verbose = verbose
+
+            def __aiter__(self):
+                return self
+
+            async def __anext__(self):
+                response = await self._grpc_call.__aiter__().__anext__()
                 if self._verbose:
                     print(response)
                 result = error = None
                 if response.error_message != "":
                     error = InferenceServerException(msg=response.error_message)
                 else:
                     result = InferResult(response.infer_response)
-                yield (result, error)
+                return result, error
+
+            def cancel(self):
+                return self._grpc_call.cancel()
+
+        try:
+            grpc_call = self._client_stub.ModelStreamInfer(
+                _request_iterator(inputs_iterator),
+                metadata=metadata,
+                timeout=stream_timeout,
+                compression=_grpc_compression_type(compression_algorithm),
+            )
+            return _ResponseIterator(grpc_call, self._verbose)
         except grpc.RpcError as rpc_error:
             raise_error_grpc(rpc_error)