From adab340647fc6b1ebfea5580d60ad27c3c65b27b Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 15 Jan 2020 18:07:27 -0800 Subject: [PATCH 1/6] Simplify call_test using mixin class --- .../grpcio_tests/tests_aio/unit/call_test.py | 508 ++++++++---------- 1 file changed, 215 insertions(+), 293 deletions(-) diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index 05bd3e56897ea..7a71ddcd317be 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -14,19 +14,17 @@ """Tests behavior of the grpc.aio.UnaryUnaryCall class.""" import asyncio +import datetime import logging import unittest -import datetime import grpc - from grpc.experimental import aio -from src.proto.grpc.testing import messages_pb2 -from src.proto.grpc.testing import test_pb2_grpc + +from src.proto.grpc.testing import messages_pb2, test_pb2_grpc from tests.unit.framework.common import test_constants -from tests_aio.unit._test_server import start_test_server from tests_aio.unit._test_base import AioTestBase -from src.proto.grpc.testing import messages_pb2 +from tests_aio.unit._test_server import start_test_server _NUM_STREAM_RESPONSES = 5 _RESPONSE_PAYLOAD_SIZE = 42 @@ -37,44 +35,41 @@ _INFINITE_INTERVAL_US = 2**31 - 1 -class TestUnaryUnaryCall(AioTestBase): +class _MulticallableTestMixin(): async def setUp(self): - self._server_target, self._server = await start_test_server() + address, self._server = await start_test_server() + self._channel = aio.insecure_channel(address) + self._stub = test_pb2_grpc.TestServiceStub(self._channel) async def tearDown(self): + await self._channel.close() await self._server.stop(None) + +class TestUnaryUnaryCall(_MulticallableTestMixin, AioTestBase): + async def test_call_ok(self): - async with aio.insecure_channel(self._server_target) as channel: - hi = channel.unary_unary( - '/grpc.testing.TestService/UnaryCall', - request_serializer=messages_pb2.SimpleRequest.SerializeToString, - response_deserializer=messages_pb2.SimpleResponse.FromString) - call = hi(messages_pb2.SimpleRequest()) + call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - self.assertFalse(call.done()) + self.assertFalse(call.done()) - response = await call + response = await call - self.assertTrue(call.done()) - self.assertIsInstance(response, messages_pb2.SimpleResponse) - self.assertEqual(await call.code(), grpc.StatusCode.OK) + self.assertTrue(call.done()) + self.assertIsInstance(response, messages_pb2.SimpleResponse) + self.assertEqual(await call.code(), grpc.StatusCode.OK) - # Response is cached at call object level, reentrance - # returns again the same response - response_retry = await call - self.assertIs(response, response_retry) + # Response is cached at call object level, reentrance + # returns again the same response + response_retry = await call + self.assertIs(response, response_retry) async def test_call_rpc_error(self): async with aio.insecure_channel(_UNREACHABLE_TARGET) as channel: - hi = channel.unary_unary( - '/grpc.testing.TestService/UnaryCall', - request_serializer=messages_pb2.SimpleRequest.SerializeToString, - response_deserializer=messages_pb2.SimpleResponse.FromString, - ) + stub = test_pb2_grpc.TestServiceStub(channel) - call = hi(messages_pb2.SimpleRequest(), timeout=0.1) + call = stub.UnaryCall(messages_pb2.SimpleRequest(), timeout=0.1) with self.assertRaises(grpc.RpcError) as exception_context: await call @@ -95,327 +90,272 @@ async def test_call_rpc_error(self): exception_context_retry.exception) async def test_call_code_awaitable(self): - async with aio.insecure_channel(self._server_target) as channel: - hi = channel.unary_unary( - '/grpc.testing.TestService/UnaryCall', - request_serializer=messages_pb2.SimpleRequest.SerializeToString, - response_deserializer=messages_pb2.SimpleResponse.FromString) - call = hi(messages_pb2.SimpleRequest()) - self.assertEqual(await call.code(), grpc.StatusCode.OK) + call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) + self.assertEqual(await call.code(), grpc.StatusCode.OK) async def test_call_details_awaitable(self): - async with aio.insecure_channel(self._server_target) as channel: - hi = channel.unary_unary( - '/grpc.testing.TestService/UnaryCall', - request_serializer=messages_pb2.SimpleRequest.SerializeToString, - response_deserializer=messages_pb2.SimpleResponse.FromString) - call = hi(messages_pb2.SimpleRequest()) - self.assertEqual('', await call.details()) + call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) + self.assertEqual('', await call.details()) async def test_call_initial_metadata_awaitable(self): - async with aio.insecure_channel(self._server_target) as channel: - hi = channel.unary_unary( - '/grpc.testing.TestService/UnaryCall', - request_serializer=messages_pb2.SimpleRequest.SerializeToString, - response_deserializer=messages_pb2.SimpleResponse.FromString) - call = hi(messages_pb2.SimpleRequest()) - self.assertEqual((), await call.initial_metadata()) + call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) + self.assertEqual((), await call.initial_metadata()) async def test_call_trailing_metadata_awaitable(self): - async with aio.insecure_channel(self._server_target) as channel: - hi = channel.unary_unary( - '/grpc.testing.TestService/UnaryCall', - request_serializer=messages_pb2.SimpleRequest.SerializeToString, - response_deserializer=messages_pb2.SimpleResponse.FromString) - call = hi(messages_pb2.SimpleRequest()) - self.assertEqual((), await call.trailing_metadata()) + call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) + self.assertEqual((), await call.trailing_metadata()) async def test_cancel_unary_unary(self): - async with aio.insecure_channel(self._server_target) as channel: - hi = channel.unary_unary( - '/grpc.testing.TestService/UnaryCall', - request_serializer=messages_pb2.SimpleRequest.SerializeToString, - response_deserializer=messages_pb2.SimpleResponse.FromString) - call = hi(messages_pb2.SimpleRequest()) + call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) - self.assertFalse(call.cancelled()) + self.assertFalse(call.cancelled()) - self.assertTrue(call.cancel()) - self.assertFalse(call.cancel()) + self.assertTrue(call.cancel()) + self.assertFalse(call.cancel()) - with self.assertRaises(asyncio.CancelledError): - await call + with self.assertRaises(asyncio.CancelledError): + await call - # The info in the RpcError should match the info in Call object. - self.assertTrue(call.cancelled()) - self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED) - self.assertEqual(await call.details(), - 'Locally cancelled by application!') + # The info in the RpcError should match the info in Call object. + self.assertTrue(call.cancelled()) + self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED) + self.assertEqual(await call.details(), + 'Locally cancelled by application!') async def test_cancel_unary_unary_in_task(self): - async with aio.insecure_channel(self._server_target) as channel: - stub = test_pb2_grpc.TestServiceStub(channel) - coro_started = asyncio.Event() - call = stub.EmptyCall(messages_pb2.SimpleRequest()) - - async def another_coro(): - coro_started.set() - await call + coro_started = asyncio.Event() + call = self._stub.EmptyCall(messages_pb2.SimpleRequest()) - task = self.loop.create_task(another_coro()) - await coro_started.wait() + async def another_coro(): + coro_started.set() + await call - self.assertFalse(task.done()) - task.cancel() + task = self.loop.create_task(another_coro()) + await coro_started.wait() - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + self.assertFalse(task.done()) + task.cancel() - with self.assertRaises(asyncio.CancelledError): - await task + self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + with self.assertRaises(asyncio.CancelledError): + await task -class TestUnaryStreamCall(AioTestBase): - async def setUp(self): - self._server_target, self._server = await start_test_server() - - async def tearDown(self): - await self._server.stop(None) +class TestUnaryStreamCall(_MulticallableTestMixin, AioTestBase): async def test_cancel_unary_stream(self): - async with aio.insecure_channel(self._server_target) as channel: - stub = test_pb2_grpc.TestServiceStub(channel) - - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_RESPONSE_INTERVAL_US, - )) + # Prepares the request + request = messages_pb2.StreamingOutputCallRequest() + for _ in range(_NUM_STREAM_RESPONSES): + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE, + interval_us=_RESPONSE_INTERVAL_US, + )) - # Invokes the actual RPC - call = stub.StreamingOutputCall(request) - self.assertFalse(call.cancelled()) + # Invokes the actual RPC + call = self._stub.StreamingOutputCall(request) + self.assertFalse(call.cancelled()) - response = await call.read() - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + response = await call.read() + self.assertIs(type(response), + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - self.assertTrue(call.cancel()) - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await - call.details()) - self.assertFalse(call.cancel()) + self.assertTrue(call.cancel()) + self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await + call.details()) + self.assertFalse(call.cancel()) - with self.assertRaises(asyncio.CancelledError): - await call.read() - self.assertTrue(call.cancelled()) + with self.assertRaises(asyncio.CancelledError): + await call.read() + self.assertTrue(call.cancelled()) async def test_multiple_cancel_unary_stream(self): - async with aio.insecure_channel(self._server_target) as channel: - stub = test_pb2_grpc.TestServiceStub(channel) - - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_RESPONSE_INTERVAL_US, - )) + # Prepares the request + request = messages_pb2.StreamingOutputCallRequest() + for _ in range(_NUM_STREAM_RESPONSES): + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE, + interval_us=_RESPONSE_INTERVAL_US, + )) - # Invokes the actual RPC - call = stub.StreamingOutputCall(request) - self.assertFalse(call.cancelled()) + # Invokes the actual RPC + call = self._stub.StreamingOutputCall(request) + self.assertFalse(call.cancelled()) - response = await call.read() - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + response = await call.read() + self.assertIs(type(response), + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) - self.assertTrue(call.cancel()) - self.assertFalse(call.cancel()) - self.assertFalse(call.cancel()) - self.assertFalse(call.cancel()) + self.assertTrue(call.cancel()) + self.assertFalse(call.cancel()) + self.assertFalse(call.cancel()) + self.assertFalse(call.cancel()) - with self.assertRaises(asyncio.CancelledError): - await call.read() + with self.assertRaises(asyncio.CancelledError): + await call.read() async def test_early_cancel_unary_stream(self): """Test cancellation before receiving messages.""" - async with aio.insecure_channel(self._server_target) as channel: - stub = test_pb2_grpc.TestServiceStub(channel) - - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_RESPONSE_INTERVAL_US, - )) + # Prepares the request + request = messages_pb2.StreamingOutputCallRequest() + for _ in range(_NUM_STREAM_RESPONSES): + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE, + interval_us=_RESPONSE_INTERVAL_US, + )) - # Invokes the actual RPC - call = stub.StreamingOutputCall(request) + # Invokes the actual RPC + call = self._stub.StreamingOutputCall(request) - self.assertFalse(call.cancelled()) - self.assertTrue(call.cancel()) - self.assertFalse(call.cancel()) + self.assertFalse(call.cancelled()) + self.assertTrue(call.cancel()) + self.assertFalse(call.cancel()) - with self.assertRaises(asyncio.CancelledError): - await call.read() + with self.assertRaises(asyncio.CancelledError): + await call.read() - self.assertTrue(call.cancelled()) + self.assertTrue(call.cancelled()) - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await - call.details()) + self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await + call.details()) async def test_late_cancel_unary_stream(self): """Test cancellation after received all messages.""" - async with aio.insecure_channel(self._server_target) as channel: - stub = test_pb2_grpc.TestServiceStub(channel) + # Prepares the request + request = messages_pb2.StreamingOutputCallRequest() + for _ in range(_NUM_STREAM_RESPONSES): + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE,)) - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE,)) + # Invokes the actual RPC + call = self._stub.StreamingOutputCall(request) - # Invokes the actual RPC - call = stub.StreamingOutputCall(request) + for _ in range(_NUM_STREAM_RESPONSES): + response = await call.read() + self.assertIs(type(response), + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, + len(response.payload.body)) - for _ in range(_NUM_STREAM_RESPONSES): - response = await call.read() - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, - len(response.payload.body)) - - # After all messages received, it is possible that the final state - # is received or on its way. It's basically a data race, so our - # expectation here is do not crash :) - call.cancel() - self.assertIn(await call.code(), - [grpc.StatusCode.OK, grpc.StatusCode.CANCELLED]) + # After all messages received, it is possible that the final state + # is received or on its way. It's basically a data race, so our + # expectation here is do not crash :) + call.cancel() + self.assertIn(await call.code(), + [grpc.StatusCode.OK, grpc.StatusCode.CANCELLED]) async def test_too_many_reads_unary_stream(self): """Test calling read after received all messages fails.""" - async with aio.insecure_channel(self._server_target) as channel: - stub = test_pb2_grpc.TestServiceStub(channel) - - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE,)) + # Prepares the request + request = messages_pb2.StreamingOutputCallRequest() + for _ in range(_NUM_STREAM_RESPONSES): + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE,)) - # Invokes the actual RPC - call = stub.StreamingOutputCall(request) + # Invokes the actual RPC + call = self._stub.StreamingOutputCall(request) - for _ in range(_NUM_STREAM_RESPONSES): - response = await call.read() - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, - len(response.payload.body)) - self.assertIs(await call.read(), aio.EOF) + for _ in range(_NUM_STREAM_RESPONSES): + response = await call.read() + self.assertIs(type(response), + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, + len(response.payload.body)) + self.assertIs(await call.read(), aio.EOF) - # After the RPC is finished, further reads will lead to exception. - self.assertEqual(await call.code(), grpc.StatusCode.OK) - self.assertIs(await call.read(), aio.EOF) + # After the RPC is finished, further reads will lead to exception. + self.assertEqual(await call.code(), grpc.StatusCode.OK) + self.assertIs(await call.read(), aio.EOF) async def test_unary_stream_async_generator(self): """Sunny day test case for unary_stream.""" - async with aio.insecure_channel(self._server_target) as channel: - stub = test_pb2_grpc.TestServiceStub(channel) - - # Prepares the request - request = messages_pb2.StreamingOutputCallRequest() - for _ in range(_NUM_STREAM_RESPONSES): - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE,)) + # Prepares the request + request = messages_pb2.StreamingOutputCallRequest() + for _ in range(_NUM_STREAM_RESPONSES): + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE,)) - # Invokes the actual RPC - call = stub.StreamingOutputCall(request) - self.assertFalse(call.cancelled()) + # Invokes the actual RPC + call = self._stub.StreamingOutputCall(request) + self.assertFalse(call.cancelled()) - async for response in call: - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, - len(response.payload.body)) + async for response in call: + self.assertIs(type(response), + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, + len(response.payload.body)) - self.assertEqual(await call.code(), grpc.StatusCode.OK) + self.assertEqual(await call.code(), grpc.StatusCode.OK) async def test_cancel_unary_stream_in_task_using_read(self): - async with aio.insecure_channel(self._server_target) as channel: - stub = test_pb2_grpc.TestServiceStub(channel) - coro_started = asyncio.Event() + coro_started = asyncio.Event() - # Configs the server method to block forever - request = messages_pb2.StreamingOutputCallRequest() - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_INFINITE_INTERVAL_US, - )) + # Configs the server method to block forever + request = messages_pb2.StreamingOutputCallRequest() + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE, + interval_us=_INFINITE_INTERVAL_US, + )) - # Invokes the actual RPC - call = stub.StreamingOutputCall(request) + # Invokes the actual RPC + call = self._stub.StreamingOutputCall(request) - async def another_coro(): - coro_started.set() - await call.read() + async def another_coro(): + coro_started.set() + await call.read() - task = self.loop.create_task(another_coro()) - await coro_started.wait() + task = self.loop.create_task(another_coro()) + await coro_started.wait() - self.assertFalse(task.done()) - task.cancel() + self.assertFalse(task.done()) + task.cancel() - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - with self.assertRaises(asyncio.CancelledError): - await task + with self.assertRaises(asyncio.CancelledError): + await task async def test_cancel_unary_stream_in_task_using_async_for(self): - async with aio.insecure_channel(self._server_target) as channel: - stub = test_pb2_grpc.TestServiceStub(channel) - coro_started = asyncio.Event() + coro_started = asyncio.Event() - # Configs the server method to block forever - request = messages_pb2.StreamingOutputCallRequest() - request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_INFINITE_INTERVAL_US, - )) + # Configs the server method to block forever + request = messages_pb2.StreamingOutputCallRequest() + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE, + interval_us=_INFINITE_INTERVAL_US, + )) - # Invokes the actual RPC - call = stub.StreamingOutputCall(request) + # Invokes the actual RPC + call = self._stub.StreamingOutputCall(request) - async def another_coro(): - coro_started.set() - async for _ in call: - pass + async def another_coro(): + coro_started.set() + async for _ in call: + pass - task = self.loop.create_task(another_coro()) - await coro_started.wait() + task = self.loop.create_task(another_coro()) + await coro_started.wait() - self.assertFalse(task.done()) - task.cancel() + self.assertFalse(task.done()) + task.cancel() - self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) + self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) - with self.assertRaises(asyncio.CancelledError): - await task + with self.assertRaises(asyncio.CancelledError): + await task def test_call_credentials(self): @@ -445,16 +385,7 @@ async def coro(): self.loop.run_until_complete(coro()) -class TestStreamUnaryCall(AioTestBase): - - async def setUp(self): - self._server_target, self._server = await start_test_server() - self._channel = aio.insecure_channel(self._server_target) - self._stub = test_pb2_grpc.TestServiceStub(self._channel) - - async def tearDown(self): - await self._channel.close() - await self._server.stop(None) +class TestStreamUnaryCall(_MulticallableTestMixin, AioTestBase): async def test_cancel_stream_unary(self): call = self._stub.StreamingInputCall() @@ -564,16 +495,7 @@ async def cancel_later(): messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) -class TestStreamStreamCall(AioTestBase): - - async def setUp(self): - self._server_target, self._server = await start_test_server() - self._channel = aio.insecure_channel(self._server_target) - self._stub = test_pb2_grpc.TestServiceStub(self._channel) - - async def tearDown(self): - await self._channel.close() - await self._server.stop(None) +class TestStreamStreamCall(_MulticallableTestMixin, AioTestBase): async def test_cancel(self): # Invokes the actual RPC From 1c78ccd44eda86f48ec96d2c7735d298873a56d0 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Wed, 15 Jan 2020 18:07:45 -0800 Subject: [PATCH 2/6] Implement add_done_callbacks and time_remaining --- .../grpc/_cython/_cygrpc/aio/call.pxd.pxi | 2 + .../grpc/_cython/_cygrpc/aio/call.pyx.pxi | 7 + .../grpc/experimental/aio/_base_call.py | 6 +- .../grpcio/grpc/experimental/aio/_call.py | 22 ++- .../grpcio/grpc/experimental/aio/_typing.py | 1 + .../tests_aio/unit/done_callback_test.py | 160 ++++++++++++++++++ 6 files changed, 187 insertions(+), 11 deletions(-) create mode 100644 src/python/grpcio_tests/tests_aio/unit/done_callback_test.py diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi index d95cf5c52c93b..fffa61075ab17 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi @@ -28,4 +28,6 @@ cdef class _AioCall(GrpcCallWrapper): # because Core is holding a pointer for the callback handler. bint _is_locally_cancelled + object _deadline + cdef void _create_grpc_call(self, object timeout, bytes method, CallCredentials credentials) except * diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi index 279c416a923f8..6f7346dd63096 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi @@ -36,6 +36,7 @@ cdef class _AioCall(GrpcCallWrapper): self._loop = asyncio.get_event_loop() self._create_grpc_call(deadline, method, call_credentials) self._is_locally_cancelled = False + self._deadline = deadline def __dealloc__(self): if self.call: @@ -84,6 +85,12 @@ cdef class _AioCall(GrpcCallWrapper): grpc_slice_unref(method_slice) + def time_remaining(self): + if self._deadline is None: + return None + else: + return max(0, self._deadline - time.time()) + def cancel(self, AioRpcStatus status): """Cancels the RPC in Core with given RPC status. diff --git a/src/python/grpcio/grpc/experimental/aio/_base_call.py b/src/python/grpcio/grpc/experimental/aio/_base_call.py index bdd6902d89371..fb0108de2e12b 100644 --- a/src/python/grpcio/grpc/experimental/aio/_base_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_base_call.py @@ -24,7 +24,7 @@ import grpc -from ._typing import EOFType, MetadataType, RequestType, ResponseType +from ._typing import EOFType, MetadataType, RequestType, ResponseType, DoneCallbackType __all__ = 'RpcContext', 'Call', 'UnaryUnaryCall', 'UnaryStreamCall' @@ -73,11 +73,11 @@ def cancel(self) -> bool: """ @abstractmethod - def add_done_callback(self, callback: Callable[[Any], None]) -> None: + def add_done_callback(self, callback: DoneCallbackType) -> None: """Registers a callback to be called on RPC termination. Args: - callback: A callable object will be called with the context object as + callback: A callable object will be called with the call object as its only argument. """ diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index c8bd10244b4d4..d443834accc27 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -14,7 +14,7 @@ """Invocation-side implementation of gRPC Asyncio Python.""" import asyncio -from typing import AsyncIterable, Awaitable, Dict, Optional +from typing import AsyncIterable, Awaitable, List, Dict, Optional import grpc from grpc import _common @@ -22,7 +22,7 @@ from . import _base_call from ._typing import (DeserializingFunction, MetadataType, RequestType, - ResponseType, SerializingFunction) + ResponseType, SerializingFunction, DoneCallbackType) __all__ = 'AioRpcError', 'Call', 'UnaryUnaryCall', 'UnaryStreamCall' @@ -157,6 +157,7 @@ class Call(_base_call.Call): _initial_metadata: Awaitable[MetadataType] _locally_cancelled: bool _cython_call: cygrpc._AioCall + _done_callbacks: List[DoneCallbackType] def __init__(self, cython_call: cygrpc._AioCall) -> None: self._loop = asyncio.get_event_loop() @@ -165,6 +166,7 @@ def __init__(self, cython_call: cygrpc._AioCall) -> None: self._initial_metadata = self._loop.create_future() self._locally_cancelled = False self._cython_call = cython_call + self._done_callbacks = [] def __del__(self) -> None: if not self._status.done(): @@ -192,11 +194,14 @@ def cancel(self) -> bool: def done(self) -> bool: return self._status.done() - def add_done_callback(self, unused_callback) -> None: - raise NotImplementedError() + def add_done_callback(self, callback: DoneCallbackType) -> None: + if self.done(): + callback(self) + else: + self._done_callbacks.append(callback) def time_remaining(self) -> Optional[float]: - raise NotImplementedError() + return self._cython_call.time_remaining() async def initial_metadata(self) -> MetadataType: return await self._initial_metadata @@ -220,9 +225,7 @@ def _set_initial_metadata(self, metadata: MetadataType) -> None: def _set_status(self, status: cygrpc.AioRpcStatus) -> None: """Private method to set final status of the RPC. - This method may be called multiple time due to data race between local - cancellation (by application) and Core receiving status from peer. We - make no promise here which one will win. + This method should only be invoked once. """ # In case of local cancellation, flip the flag. if status.details() is _LOCAL_CANCELLATION_DETAILS: @@ -236,6 +239,9 @@ def _set_status(self, status: cygrpc.AioRpcStatus) -> None: self._status.set_result(status) self._code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[status.code()] + for callback in self._done_callbacks: + callback(self) + async def _raise_for_status(self) -> None: if self._locally_cancelled: raise asyncio.CancelledError() diff --git a/src/python/grpcio/grpc/experimental/aio/_typing.py b/src/python/grpcio/grpc/experimental/aio/_typing.py index c60eab854499f..15583754a630a 100644 --- a/src/python/grpcio/grpc/experimental/aio/_typing.py +++ b/src/python/grpcio/grpc/experimental/aio/_typing.py @@ -24,3 +24,4 @@ MetadataType = Sequence[MetadatumType] ChannelArgumentType = Sequence[Tuple[Text, Any]] EOFType = type(EOF) +DoneCallbackType = Callable[[Any], None] diff --git a/src/python/grpcio_tests/tests_aio/unit/done_callback_test.py b/src/python/grpcio_tests/tests_aio/unit/done_callback_test.py new file mode 100644 index 0000000000000..f5f6a75974a58 --- /dev/null +++ b/src/python/grpcio_tests/tests_aio/unit/done_callback_test.py @@ -0,0 +1,160 @@ +# Copyright 2020 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Testing the done callbacks mechanism.""" + + +# Copyright 2019 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import logging +import unittest +import time +import gc + +import grpc +from grpc.experimental import aio +from tests_aio.unit._test_base import AioTestBase +from tests.unit.framework.common import test_constants +from src.proto.grpc.testing import messages_pb2, test_pb2_grpc +from tests_aio.unit._test_server import start_test_server + + +_NUM_STREAM_RESPONSES = 5 +_REQUEST_PAYLOAD_SIZE = 7 +_RESPONSE_PAYLOAD_SIZE = 42 + + +def _inject_callbacks(call): + first_callback_ran = asyncio.Event() + def first_callback(unused_call): + first_callback_ran.set() + + second_callback_ran = asyncio.Event() + def second_callback(unused_call): + second_callback_ran.set() + + call.add_done_callback(first_callback) + call.add_done_callback(second_callback) + + async def validation(): + await asyncio.wait_for( + asyncio.gather(first_callback_ran.wait(), second_callback_ran.wait()), + test_constants.SHORT_TIMEOUT + ) + + return validation() + + +class TestDoneCallback(AioTestBase): + + async def setUp(self): + address, self._server = await start_test_server() + self._channel = aio.insecure_channel(address) + self._stub = test_pb2_grpc.TestServiceStub(self._channel) + + async def tearDown(self): + await self._channel.close() + await self._server.stop(None) + + async def test_add_after_done(self): + call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) + self.assertEqual(grpc.StatusCode.OK, await call.code()) + + validation = _inject_callbacks(call) + await validation + + async def test_unary_unary(self): + call = self._stub.UnaryCall(messages_pb2.SimpleRequest()) + validation = _inject_callbacks(call) + + self.assertEqual(grpc.StatusCode.OK, await call.code()) + + await validation + + async def test_unary_stream(self): + request = messages_pb2.StreamingOutputCallRequest() + for _ in range(_NUM_STREAM_RESPONSES): + request.response_parameters.append( + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) + + call = self._stub.StreamingOutputCall(request) + validation = _inject_callbacks(call) + + response_cnt = 0 + async for response in call: + response_cnt += 1 + self.assertIsInstance(response, + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + + self.assertEqual(_NUM_STREAM_RESPONSES, response_cnt) + self.assertEqual(grpc.StatusCode.OK, await call.code()) + + await validation + + async def test_stream_unary(self): + payload = messages_pb2.Payload(body=b'\0' * _REQUEST_PAYLOAD_SIZE) + request = messages_pb2.StreamingInputCallRequest(payload=payload) + + async def gen(): + for _ in range(_NUM_STREAM_RESPONSES): + yield request + + call = self._stub.StreamingInputCall(gen()) + validation = _inject_callbacks(call) + + response = await call + self.assertIsInstance(response, messages_pb2.StreamingInputCallResponse) + self.assertEqual(_NUM_STREAM_RESPONSES * _REQUEST_PAYLOAD_SIZE, + response.aggregated_payload_size) + self.assertEqual(grpc.StatusCode.OK, await call.code()) + + await validation + + async def test_stream_stream(self): + call = self._stub.FullDuplexCall() + validation = _inject_callbacks(call) + + request = messages_pb2.StreamingOutputCallRequest() + request.response_parameters.append( + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE)) + + for _ in range(_NUM_STREAM_RESPONSES): + await call.write(request) + response = await call.read() + self.assertIsInstance(response, + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + + await call.done_writing() + + self.assertEqual(grpc.StatusCode.OK, await call.code()) + await validation + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + unittest.main(verbosity=2) From 21670ed1a8c79726e7073e9622d262eb6c4eb07c Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 16 Jan 2020 11:59:36 -0800 Subject: [PATCH 3/6] Add time remaining test case --- .../grpcio_tests/tests_aio/unit/call_test.py | 67 ++++++++++++------- .../tests_aio/unit/done_callback_test.py | 10 +-- 2 files changed, 49 insertions(+), 28 deletions(-) diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index 7a71ddcd317be..6fc5b43dec1df 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -120,7 +120,7 @@ async def test_cancel_unary_unary(self): self.assertTrue(call.cancelled()) self.assertEqual(await call.code(), grpc.StatusCode.CANCELLED) self.assertEqual(await call.details(), - 'Locally cancelled by application!') + 'Locally cancelled by application!') async def test_cancel_unary_unary_in_task(self): coro_started = asyncio.Event() @@ -159,14 +159,13 @@ async def test_cancel_unary_stream(self): self.assertFalse(call.cancelled()) response = await call.read() - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) + self.assertIs(type(response), messages_pb2.StreamingOutputCallResponse) self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) self.assertTrue(call.cancel()) self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await - call.details()) + call.details()) self.assertFalse(call.cancel()) with self.assertRaises(asyncio.CancelledError): @@ -188,8 +187,7 @@ async def test_multiple_cancel_unary_stream(self): self.assertFalse(call.cancelled()) response = await call.read() - self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) + self.assertIs(type(response), messages_pb2.StreamingOutputCallResponse) self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) self.assertTrue(call.cancel()) @@ -225,7 +223,7 @@ async def test_early_cancel_unary_stream(self): self.assertEqual(grpc.StatusCode.CANCELLED, await call.code()) self.assertEqual(_LOCAL_CANCEL_DETAILS_EXPECTATION, await - call.details()) + call.details()) async def test_late_cancel_unary_stream(self): """Test cancellation after received all messages.""" @@ -233,8 +231,7 @@ async def test_late_cancel_unary_stream(self): request = messages_pb2.StreamingOutputCallRequest() for _ in range(_NUM_STREAM_RESPONSES): request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE,)) + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) # Invokes the actual RPC call = self._stub.StreamingOutputCall(request) @@ -242,16 +239,15 @@ async def test_late_cancel_unary_stream(self): for _ in range(_NUM_STREAM_RESPONSES): response = await call.read() self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, - len(response.payload.body)) + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) # After all messages received, it is possible that the final state # is received or on its way. It's basically a data race, so our # expectation here is do not crash :) call.cancel() self.assertIn(await call.code(), - [grpc.StatusCode.OK, grpc.StatusCode.CANCELLED]) + [grpc.StatusCode.OK, grpc.StatusCode.CANCELLED]) async def test_too_many_reads_unary_stream(self): """Test calling read after received all messages fails.""" @@ -259,8 +255,7 @@ async def test_too_many_reads_unary_stream(self): request = messages_pb2.StreamingOutputCallRequest() for _ in range(_NUM_STREAM_RESPONSES): request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE,)) + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) # Invokes the actual RPC call = self._stub.StreamingOutputCall(request) @@ -268,9 +263,8 @@ async def test_too_many_reads_unary_stream(self): for _ in range(_NUM_STREAM_RESPONSES): response = await call.read() self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, - len(response.payload.body)) + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) self.assertIs(await call.read(), aio.EOF) # After the RPC is finished, further reads will lead to exception. @@ -283,8 +277,7 @@ async def test_unary_stream_async_generator(self): request = messages_pb2.StreamingOutputCallRequest() for _ in range(_NUM_STREAM_RESPONSES): request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE,)) + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) # Invokes the actual RPC call = self._stub.StreamingOutputCall(request) @@ -292,9 +285,8 @@ async def test_unary_stream_async_generator(self): async for response in call: self.assertIs(type(response), - messages_pb2.StreamingOutputCallResponse) - self.assertEqual(_RESPONSE_PAYLOAD_SIZE, - len(response.payload.body)) + messages_pb2.StreamingOutputCallResponse) + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) self.assertEqual(await call.code(), grpc.StatusCode.OK) @@ -384,6 +376,35 @@ async def coro(): self.loop.run_until_complete(coro()) + async def test_time_remaining(self): + request = messages_pb2.StreamingOutputCallRequest() + # First message comes back immediately + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE, + interval_us=_INFINITE_INTERVAL_US, + )) + # Second message comes back after a unit of wait time + request.response_parameters.append( + messages_pb2.ResponseParameters( + size=_RESPONSE_PAYLOAD_SIZE, + interval_us=_RESPONSE_INTERVAL_US, + )) + + call = self._stub.StreamingOutputCall( + request, timeout=test_constants.SHORT_TIMEOUT*2) + + response = await call.read() + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + + remained_time = call.time_remaining() + self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT//2) + self.assertLess(remained_time, test_constants.SHORT_TIMEOUT*3//2) + + response = await call.read() + self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + + self.assertEqual(grpc.StatusCode.OK, await call.code()) class TestStreamUnaryCall(_MulticallableTestMixin, AioTestBase): diff --git a/src/python/grpcio_tests/tests_aio/unit/done_callback_test.py b/src/python/grpcio_tests/tests_aio/unit/done_callback_test.py index f5f6a75974a58..93eddcf091739 100644 --- a/src/python/grpcio_tests/tests_aio/unit/done_callback_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/done_callback_test.py @@ -13,7 +13,6 @@ # limitations under the License. """Testing the done callbacks mechanism.""" - # Copyright 2019 The gRPC Authors # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -41,7 +40,6 @@ from src.proto.grpc.testing import messages_pb2, test_pb2_grpc from tests_aio.unit._test_server import start_test_server - _NUM_STREAM_RESPONSES = 5 _REQUEST_PAYLOAD_SIZE = 7 _RESPONSE_PAYLOAD_SIZE = 42 @@ -49,10 +47,12 @@ def _inject_callbacks(call): first_callback_ran = asyncio.Event() + def first_callback(unused_call): first_callback_ran.set() second_callback_ran = asyncio.Event() + def second_callback(unused_call): second_callback_ran.set() @@ -61,9 +61,9 @@ def second_callback(unused_call): async def validation(): await asyncio.wait_for( - asyncio.gather(first_callback_ran.wait(), second_callback_ran.wait()), - test_constants.SHORT_TIMEOUT - ) + asyncio.gather(first_callback_ran.wait(), + second_callback_ran.wait()), + test_constants.SHORT_TIMEOUT) return validation() From 780bff32eba754d774bf21eb2f01744370ba472d Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 16 Jan 2020 12:06:08 -0800 Subject: [PATCH 4/6] Fix the test case --- src/python/grpcio_tests/tests_aio/unit/call_test.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index 6fc5b43dec1df..7901df750dc03 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -382,7 +382,6 @@ async def test_time_remaining(self): request.response_parameters.append( messages_pb2.ResponseParameters( size=_RESPONSE_PAYLOAD_SIZE, - interval_us=_INFINITE_INTERVAL_US, )) # Second message comes back after a unit of wait time request.response_parameters.append( @@ -397,13 +396,19 @@ async def test_time_remaining(self): response = await call.read() self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + # Should be around the same as the timeout remained_time = call.time_remaining() - self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT//2) - self.assertLess(remained_time, test_constants.SHORT_TIMEOUT*3//2) + self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT*3//2) + self.assertLess(remained_time, test_constants.SHORT_TIMEOUT*2) response = await call.read() self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) + # Should be around the timeout minus a unit of wait time + remained_time = call.time_remaining() + self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT//2) + self.assertLess(remained_time, test_constants.SHORT_TIMEOUT*3//2) + self.assertEqual(grpc.StatusCode.OK, await call.code()) class TestStreamUnaryCall(_MulticallableTestMixin, AioTestBase): From 0fc1d212aa44ba608e119f30c8c59bfcf3369312 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 16 Jan 2020 12:13:36 -0800 Subject: [PATCH 5/6] Make sanity tests happy --- .../grpcio/grpc/experimental/aio/_base_call.py | 6 +++--- src/python/grpcio_tests/tests_aio/tests.json | 1 + .../grpcio_tests/tests_aio/unit/call_test.py | 15 +++++++-------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_base_call.py b/src/python/grpcio/grpc/experimental/aio/_base_call.py index fb0108de2e12b..318a1edfcc1df 100644 --- a/src/python/grpcio/grpc/experimental/aio/_base_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_base_call.py @@ -19,12 +19,12 @@ """ from abc import ABCMeta, abstractmethod -from typing import (Any, AsyncIterable, Awaitable, Callable, Generic, Optional, - Text, Union) +from typing import AsyncIterable, Awaitable, Generic, Optional, Text, Union import grpc -from ._typing import EOFType, MetadataType, RequestType, ResponseType, DoneCallbackType +from ._typing import (DoneCallbackType, EOFType, MetadataType, RequestType, + ResponseType) __all__ = 'RpcContext', 'Call', 'UnaryUnaryCall', 'UnaryStreamCall' diff --git a/src/python/grpcio_tests/tests_aio/tests.json b/src/python/grpcio_tests/tests_aio/tests.json index e3afb39a50237..884d7c98f1c8f 100644 --- a/src/python/grpcio_tests/tests_aio/tests.json +++ b/src/python/grpcio_tests/tests_aio/tests.json @@ -9,6 +9,7 @@ "unit.channel_argument_test.TestChannelArgument", "unit.channel_test.TestChannel", "unit.connectivity_test.TestConnectivityState", + "unit.done_callback_test.TestDoneCallback", "unit.init_test.TestInsecureChannel", "unit.init_test.TestSecureChannel", "unit.interceptor_test.TestInterceptedUnaryUnaryCall", diff --git a/src/python/grpcio_tests/tests_aio/unit/call_test.py b/src/python/grpcio_tests/tests_aio/unit/call_test.py index 7901df750dc03..9482a0e19574e 100644 --- a/src/python/grpcio_tests/tests_aio/unit/call_test.py +++ b/src/python/grpcio_tests/tests_aio/unit/call_test.py @@ -380,9 +380,7 @@ async def test_time_remaining(self): request = messages_pb2.StreamingOutputCallRequest() # First message comes back immediately request.response_parameters.append( - messages_pb2.ResponseParameters( - size=_RESPONSE_PAYLOAD_SIZE, - )) + messages_pb2.ResponseParameters(size=_RESPONSE_PAYLOAD_SIZE,)) # Second message comes back after a unit of wait time request.response_parameters.append( messages_pb2.ResponseParameters( @@ -391,26 +389,27 @@ async def test_time_remaining(self): )) call = self._stub.StreamingOutputCall( - request, timeout=test_constants.SHORT_TIMEOUT*2) + request, timeout=test_constants.SHORT_TIMEOUT * 2) response = await call.read() self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) # Should be around the same as the timeout remained_time = call.time_remaining() - self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT*3//2) - self.assertLess(remained_time, test_constants.SHORT_TIMEOUT*2) + self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT * 3 // 2) + self.assertLess(remained_time, test_constants.SHORT_TIMEOUT * 2) response = await call.read() self.assertEqual(_RESPONSE_PAYLOAD_SIZE, len(response.payload.body)) # Should be around the timeout minus a unit of wait time remained_time = call.time_remaining() - self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT//2) - self.assertLess(remained_time, test_constants.SHORT_TIMEOUT*3//2) + self.assertGreater(remained_time, test_constants.SHORT_TIMEOUT // 2) + self.assertLess(remained_time, test_constants.SHORT_TIMEOUT * 3 // 2) self.assertEqual(grpc.StatusCode.OK, await call.code()) + class TestStreamUnaryCall(_MulticallableTestMixin, AioTestBase): async def test_cancel_stream_unary(self): From c1eab2e95f3f3d2ea3d1fd43a21f06fe5c67ac3f Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Thu, 16 Jan 2020 12:15:12 -0800 Subject: [PATCH 6/6] Remove the suppression --- src/python/grpcio/grpc/experimental/aio/_call.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/python/grpcio/grpc/experimental/aio/_call.py b/src/python/grpcio/grpc/experimental/aio/_call.py index d443834accc27..827d027ba043d 100644 --- a/src/python/grpcio/grpc/experimental/aio/_call.py +++ b/src/python/grpcio/grpc/experimental/aio/_call.py @@ -271,8 +271,6 @@ def __str__(self) -> str: return self._repr() -# TODO(https://github.com/grpc/grpc/issues/21623) remove this suppression -# pylint: disable=abstract-method class UnaryUnaryCall(Call, _base_call.UnaryUnaryCall): """Object for managing unary-unary RPC calls. @@ -344,8 +342,6 @@ def __await__(self) -> ResponseType: return response -# TODO(https://github.com/grpc/grpc/issues/21623) remove this suppression -# pylint: disable=abstract-method class UnaryStreamCall(Call, _base_call.UnaryStreamCall): """Object for managing unary-stream RPC calls. @@ -435,8 +431,6 @@ async def read(self) -> ResponseType: return response_message -# TODO(https://github.com/grpc/grpc/issues/21623) remove this suppression -# pylint: disable=abstract-method class StreamUnaryCall(Call, _base_call.StreamUnaryCall): """Object for managing stream-unary RPC calls. @@ -556,8 +550,6 @@ async def done_writing(self) -> None: await self._raise_for_status() -# TODO(https://github.com/grpc/grpc/issues/21623) remove this suppression -# pylint: disable=abstract-method class StreamStreamCall(Call, _base_call.StreamStreamCall): """Object for managing stream-stream RPC calls.