Skip to content

Commit

Permalink
Merge pull request grpc#21681 from lidizheng/aio-callbacks
Browse files Browse the repository at this point in the history
[Aio] Implement add_done_callback and time_remaining
  • Loading branch information
lidizheng authored Jan 17, 2020
2 parents e6601c8 + c1eab2e commit b9083a9
Show file tree
Hide file tree
Showing 8 changed files with 428 additions and 312 deletions.
2 changes: 2 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ cdef class _AioCall(GrpcCallWrapper):
# because Core is holding a pointer for the callback handler.
bint _is_locally_cancelled

object _deadline

cdef void _create_grpc_call(self, object timeout, bytes method, CallCredentials credentials) except *
7 changes: 7 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ cdef class _AioCall(GrpcCallWrapper):
self._loop = asyncio.get_event_loop()
self._create_grpc_call(deadline, method, call_credentials)
self._is_locally_cancelled = False
self._deadline = deadline

def __dealloc__(self):
if self.call:
Expand Down Expand Up @@ -84,6 +85,12 @@ cdef class _AioCall(GrpcCallWrapper):

grpc_slice_unref(method_slice)

def time_remaining(self):
if self._deadline is None:
return None
else:
return max(0, self._deadline - time.time())

def cancel(self, AioRpcStatus status):
"""Cancels the RPC in Core with given RPC status.
Expand Down
10 changes: 5 additions & 5 deletions src/python/grpcio/grpc/experimental/aio/_base_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
"""

from abc import ABCMeta, abstractmethod
from typing import (Any, AsyncIterable, Awaitable, Callable, Generic, Optional,
Text, Union)
from typing import AsyncIterable, Awaitable, Generic, Optional, Text, Union

import grpc

from ._typing import EOFType, MetadataType, RequestType, ResponseType
from ._typing import (DoneCallbackType, EOFType, MetadataType, RequestType,
ResponseType)

__all__ = 'RpcContext', 'Call', 'UnaryUnaryCall', 'UnaryStreamCall'

Expand Down Expand Up @@ -73,11 +73,11 @@ def cancel(self) -> bool:
"""

@abstractmethod
def add_done_callback(self, callback: Callable[[Any], None]) -> None:
def add_done_callback(self, callback: DoneCallbackType) -> None:
"""Registers a callback to be called on RPC termination.
Args:
callback: A callable object will be called with the context object as
callback: A callable object will be called with the call object as
its only argument.
"""

Expand Down
30 changes: 14 additions & 16 deletions src/python/grpcio/grpc/experimental/aio/_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
"""Invocation-side implementation of gRPC Asyncio Python."""

import asyncio
from typing import AsyncIterable, Awaitable, Dict, Optional
from typing import AsyncIterable, Awaitable, List, Dict, Optional

import grpc
from grpc import _common
from grpc._cython import cygrpc

from . import _base_call
from ._typing import (DeserializingFunction, MetadataType, RequestType,
ResponseType, SerializingFunction)
ResponseType, SerializingFunction, DoneCallbackType)

__all__ = 'AioRpcError', 'Call', 'UnaryUnaryCall', 'UnaryStreamCall'

Expand Down Expand Up @@ -157,6 +157,7 @@ class Call(_base_call.Call):
_initial_metadata: Awaitable[MetadataType]
_locally_cancelled: bool
_cython_call: cygrpc._AioCall
_done_callbacks: List[DoneCallbackType]

def __init__(self, cython_call: cygrpc._AioCall) -> None:
self._loop = asyncio.get_event_loop()
Expand All @@ -165,6 +166,7 @@ def __init__(self, cython_call: cygrpc._AioCall) -> None:
self._initial_metadata = self._loop.create_future()
self._locally_cancelled = False
self._cython_call = cython_call
self._done_callbacks = []

def __del__(self) -> None:
if not self._status.done():
Expand Down Expand Up @@ -192,11 +194,14 @@ def cancel(self) -> bool:
def done(self) -> bool:
return self._status.done()

def add_done_callback(self, unused_callback) -> None:
raise NotImplementedError()
def add_done_callback(self, callback: DoneCallbackType) -> None:
if self.done():
callback(self)
else:
self._done_callbacks.append(callback)

def time_remaining(self) -> Optional[float]:
raise NotImplementedError()
return self._cython_call.time_remaining()

async def initial_metadata(self) -> MetadataType:
return await self._initial_metadata
Expand All @@ -220,9 +225,7 @@ def _set_initial_metadata(self, metadata: MetadataType) -> None:
def _set_status(self, status: cygrpc.AioRpcStatus) -> None:
"""Private method to set final status of the RPC.
This method may be called multiple time due to data race between local
cancellation (by application) and Core receiving status from peer. We
make no promise here which one will win.
This method should only be invoked once.
"""
# In case of local cancellation, flip the flag.
if status.details() is _LOCAL_CANCELLATION_DETAILS:
Expand All @@ -236,6 +239,9 @@ def _set_status(self, status: cygrpc.AioRpcStatus) -> None:
self._status.set_result(status)
self._code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()]

for callback in self._done_callbacks:
callback(self)

async def _raise_for_status(self) -> None:
if self._locally_cancelled:
raise asyncio.CancelledError()
Expand Down Expand Up @@ -265,8 +271,6 @@ def __str__(self) -> str:
return self._repr()


# TODO(https://github.com/grpc/grpc/issues/21623) remove this suppression
# pylint: disable=abstract-method
class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall):
"""Object for managing unary-unary RPC calls.
Expand Down Expand Up @@ -338,8 +342,6 @@ def __await__(self) -> ResponseType:
return response


# TODO(https://github.com/grpc/grpc/issues/21623) remove this suppression
# pylint: disable=abstract-method
class UnaryStreamCall(Call, _base_call.UnaryStreamCall):
"""Object for managing unary-stream RPC calls.
Expand Down Expand Up @@ -429,8 +431,6 @@ async def read(self) -> ResponseType:
return response_message


# TODO(https://github.com/grpc/grpc/issues/21623) remove this suppression
# pylint: disable=abstract-method
class StreamUnaryCall(Call, _base_call.StreamUnaryCall):
"""Object for managing stream-unary RPC calls.
Expand Down Expand Up @@ -550,8 +550,6 @@ async def done_writing(self) -> None:
await self._raise_for_status()


# TODO(https://github.com/grpc/grpc/issues/21623) remove this suppression
# pylint: disable=abstract-method
class StreamStreamCall(Call, _base_call.StreamStreamCall):
"""Object for managing stream-stream RPC calls.
Expand Down
1 change: 1 addition & 0 deletions src/python/grpcio/grpc/experimental/aio/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@
MetadataType = Sequence[MetadatumType]
ChannelArgumentType = Sequence[Tuple[Text, Any]]
EOFType = type(EOF)
DoneCallbackType = Callable[[Any], None]
1 change: 1 addition & 0 deletions src/python/grpcio_tests/tests_aio/tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"unit.channel_argument_test.TestChannelArgument",
"unit.channel_test.TestChannel",
"unit.connectivity_test.TestConnectivityState",
"unit.done_callback_test.TestDoneCallback",
"unit.init_test.TestInsecureChannel",
"unit.init_test.TestSecureChannel",
"unit.interceptor_test.TestInterceptedUnaryUnaryCall",
Expand Down
Loading

0 comments on commit b9083a9

Please sign in to comment.