From 18eefe44682f200305c9702c1778bbd25f4bb829 Mon Sep 17 00:00:00 2001 From: Jacky <18255193+kthui@users.noreply.github.com> Date: Fri, 13 Oct 2023 13:56:30 -0700 Subject: [PATCH] Add gRPC AsyncIO request cancellation tests (#6408) * Fix gRPC test failure and refactor * Add gRPC AsyncIO cancellation tests * Better check if a request is cancelled * Use f-string --- .../client_cancellation_test.py | 249 ------------------ .../grpc_cancellation_test.py | 141 ++++++++++ qa/L0_request_cancellation/test.sh | 47 ++-- 3 files changed, 162 insertions(+), 275 deletions(-) delete mode 100755 qa/L0_request_cancellation/client_cancellation_test.py create mode 100755 qa/L0_request_cancellation/grpc_cancellation_test.py diff --git a/qa/L0_request_cancellation/client_cancellation_test.py b/qa/L0_request_cancellation/client_cancellation_test.py deleted file mode 100755 index c2bc0a0bf9..0000000000 --- a/qa/L0_request_cancellation/client_cancellation_test.py +++ /dev/null @@ -1,249 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# * Neither the name of NVIDIA CORPORATION nor the names of its -# contributors may be used to endorse or promote products derived -# from this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY -# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY -# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import sys - -sys.path.append("../common") - -import asyncio -import queue -import time -import unittest -from functools import partial - -import numpy as np -import test_util as tu -import tritonclient.grpc as grpcclient -import tritonclient.grpc.aio as aiogrpcclient -from tritonclient.utils import InferenceServerException - - -class UserData: - def __init__(self): - self._completed_requests = queue.Queue() - - -def callback(user_data, result, error): - if error: - user_data._completed_requests.put(error) - else: - user_data._completed_requests.put(result) - - -class ClientCancellationTest(tu.TestResultCollector): - def setUp(self): - self.model_name_ = "custom_identity_int32" - self.input0_data_ = np.array([[10]], dtype=np.int32) - self._start_time_ms = 0 - self._end_time_ms = 0 - - def _record_start_time_ms(self): - self._start_time_ms = int(round(time.time() * 1000)) - - def _record_end_time_ms(self): - self._end_time_ms = int(round(time.time() * 1000)) - - def _test_runtime_duration(self, upper_limit): - self.assertTrue( - (self._end_time_ms - self._start_time_ms) < upper_limit, - "test runtime expected less than " - + str(upper_limit) - + "ms response time, got " - + str(self._end_time_ms - self._start_time_ms) - + " ms", - ) - - def _prepare_request(self): - self.inputs_ = [] - self.inputs_.append(grpcclient.InferInput("INPUT0", [1, 1], "INT32")) - self.outputs_ = [] - self.outputs_.append(grpcclient.InferRequestedOutput("OUTPUT0")) - - self.inputs_[0].set_data_from_numpy(self.input0_data_) - - def test_grpc_async_infer(self): - # Sends a request using async_infer to a - # model that takes 10s to execute. Issues - # a cancellation request after 2s. The client - # should return with appropriate exception within - # 5s. - triton_client = grpcclient.InferenceServerClient( - url="localhost:8001", verbose=True - ) - self._prepare_request() - - user_data = UserData() - - self._record_start_time_ms() - - with self.assertRaises(InferenceServerException) as cm: - future = triton_client.async_infer( - model_name=self.model_name_, - inputs=self.inputs_, - callback=partial(callback, user_data), - outputs=self.outputs_, - ) - time.sleep(2) - future.cancel() - - data_item = user_data._completed_requests.get() - if type(data_item) == InferenceServerException: - raise data_item - self.assertIn("Locally cancelled by application!", str(cm.exception)) - - self._record_end_time_ms() - self._test_runtime_duration(5000) - - def test_grpc_stream_infer(self): - # Sends a request using async_stream_infer to a - # model that takes 10s to execute. Issues stream - # closure with cancel_requests=True. The client - # should return with appropriate exception within - # 5s. - triton_client = grpcclient.InferenceServerClient( - url="localhost:8001", verbose=True - ) - - self._prepare_request() - user_data = UserData() - - triton_client.start_stream(callback=partial(callback, user_data)) - self._record_start_time_ms() - - with self.assertRaises(InferenceServerException) as cm: - for i in range(1): - triton_client.async_stream_infer( - model_name=self.model_name_, - inputs=self.inputs_, - outputs=self.outputs_, - ) - time.sleep(2) - triton_client.stop_stream(cancel_requests=True) - data_item = user_data._completed_requests.get() - if type(data_item) == InferenceServerException: - raise data_item - self.assertIn("Locally cancelled by application!", str(cm.exception)) - - self._record_end_time_ms() - self._test_runtime_duration(5000) - - -# Disabling AsyncIO cancellation testing. Enable once -# DLIS-5476 is implemented. -# def test_aio_grpc_async_infer(self): -# # Sends a request using infer of grpc.aio to a -# # model that takes 10s to execute. Issues -# # a cancellation request after 2s. The client -# # should return with appropriate exception within -# # 5s. -# async def cancel_request(call): -# await asyncio.sleep(2) -# self.assertTrue(call.cancel()) -# -# async def handle_response(generator): -# with self.assertRaises(asyncio.exceptions.CancelledError) as cm: -# _ = await anext(generator) -# -# async def test_aio_infer(self): -# triton_client = aiogrpcclient.InferenceServerClient( -# url="localhost:8001", verbose=True -# ) -# self._prepare_request() -# self._record_start_time_ms() -# -# generator = triton_client.infer( -# model_name=self.model_name_, -# inputs=self.inputs_, -# outputs=self.outputs_, -# get_call_obj=True, -# ) -# grpc_call = await anext(generator) -# -# tasks = [] -# tasks.append(asyncio.create_task(handle_response(generator))) -# tasks.append(asyncio.create_task(cancel_request(grpc_call))) -# -# for task in tasks: -# await task -# -# self._record_end_time_ms() -# self._test_runtime_duration(5000) -# -# asyncio.run(test_aio_infer(self)) -# -# def test_aio_grpc_stream_infer(self): -# # Sends a request using stream_infer of grpc.aio -# # library model that takes 10s to execute. Issues -# # stream closure with cancel_requests=True. The client -# # should return with appropriate exception within -# # 5s. -# async def test_aio_streaming_infer(self): -# async with aiogrpcclient.InferenceServerClient( -# url="localhost:8001", verbose=True -# ) as triton_client: -# -# async def async_request_iterator(): -# for i in range(1): -# await asyncio.sleep(1) -# yield { -# "model_name": self.model_name_, -# "inputs": self.inputs_, -# "outputs": self.outputs_, -# } -# -# self._prepare_request() -# self._record_start_time_ms() -# response_iterator = triton_client.stream_infer( -# inputs_iterator=async_request_iterator(), get_call_obj=True -# ) -# streaming_call = await anext(response_iterator) -# -# async def cancel_streaming(streaming_call): -# await asyncio.sleep(2) -# streaming_call.cancel() -# -# async def handle_response(response_iterator): -# with self.assertRaises(asyncio.exceptions.CancelledError) as cm: -# async for response in response_iterator: -# self.assertTrue(False, "Received an unexpected response!") -# -# tasks = [] -# tasks.append(asyncio.create_task(handle_response(response_iterator))) -# tasks.append(asyncio.create_task(cancel_streaming(streaming_call))) -# -# for task in tasks: -# await task -# -# self._record_end_time_ms() -# self._test_runtime_duration(5000) -# -# asyncio.run(test_aio_streaming_infer(self)) - - -if __name__ == "__main__": - unittest.main() diff --git a/qa/L0_request_cancellation/grpc_cancellation_test.py b/qa/L0_request_cancellation/grpc_cancellation_test.py new file mode 100755 index 0000000000..fadaa291e8 --- /dev/null +++ b/qa/L0_request_cancellation/grpc_cancellation_test.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 + +# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import asyncio +import queue +import time +import unittest +from functools import partial + +import numpy as np +import tritonclient.grpc as grpcclient +import tritonclient.grpc.aio as grpcclientaio +from tritonclient.utils import InferenceServerException + + +class UserData: + def __init__(self): + self._completed_requests = queue.Queue() + + +def callback(user_data, result, error): + if error: + user_data._completed_requests.put(error) + else: + user_data._completed_requests.put(result) + + +class GrpcCancellationTest(unittest.IsolatedAsyncioTestCase): + _model_name = "custom_identity_int32" + _model_delay = 10.0 # seconds + _grpc_params = {"url": "localhost:8001", "verbose": True} + + def setUp(self): + self._client = grpcclient.InferenceServerClient(**self._grpc_params) + self._client_aio = grpcclientaio.InferenceServerClient(**self._grpc_params) + self._user_data = UserData() + self._callback = partial(callback, self._user_data) + self._prepare_request() + self._start_time = time.time() # seconds + + def tearDown(self): + self._end_time = time.time() # seconds + self._assert_max_duration() + + def _prepare_request(self): + self._inputs = [] + self._inputs.append(grpcclient.InferInput("INPUT0", [1, 1], "INT32")) + self._outputs = [] + self._outputs.append(grpcclient.InferRequestedOutput("OUTPUT0")) + self._inputs[0].set_data_from_numpy(np.array([[10]], dtype=np.int32)) + + def _assert_max_duration(self): + max_duration = self._model_delay * 0.5 # seconds + duration = self._end_time - self._start_time # seconds + self.assertLess( + duration, + max_duration, + f"test runtime expected less than {max_duration}s response time, got {duration}s", + ) + + def _assert_callback_cancelled(self): + self.assertFalse(self._user_data._completed_requests.empty()) + data_item = self._user_data._completed_requests.get() + self.assertIsInstance(data_item, InferenceServerException) + self.assertIn("Locally cancelled by application!", str(data_item)) + + def test_grpc_async_infer(self): + future = self._client.async_infer( + model_name=self._model_name, + inputs=self._inputs, + callback=self._callback, + outputs=self._outputs, + ) + time.sleep(2) # ensure the inference has started + future.cancel() + time.sleep(0.1) # context switch + self._assert_callback_cancelled() + + def test_grpc_stream_infer(self): + self._client.start_stream(callback=self._callback) + self._client.async_stream_infer( + model_name=self._model_name, inputs=self._inputs, outputs=self._outputs + ) + time.sleep(2) # ensure the inference has started + self._client.stop_stream(cancel_requests=True) + self._assert_callback_cancelled() + + async def test_aio_grpc_async_infer(self): + infer_task = asyncio.create_task( + self._client_aio.infer( + model_name=self._model_name, inputs=self._inputs, outputs=self._outputs + ) + ) + await asyncio.sleep(2) # ensure the inference has started + infer_task.cancel() + with self.assertRaises(asyncio.CancelledError): + await infer_task + + async def test_aio_grpc_stream_infer(self): + async def requests_generator(): + yield { + "model_name": self._model_name, + "inputs": self._inputs, + "outputs": self._outputs, + } + + responses_iterator = self._client_aio.stream_infer(requests_generator()) + await asyncio.sleep(2) # ensure the inference has started + self.assertTrue(responses_iterator.cancel()) + with self.assertRaises(asyncio.CancelledError): + async for result, error in responses_iterator: + self._callback(result, error) + + +if __name__ == "__main__": + unittest.main() diff --git a/qa/L0_request_cancellation/test.sh b/qa/L0_request_cancellation/test.sh index 989d7ddd6d..4929be3a5f 100755 --- a/qa/L0_request_cancellation/test.sh +++ b/qa/L0_request_cancellation/test.sh @@ -60,13 +60,13 @@ mkdir -p models/model/1 && (cd models/model && \ SERVER_LOG=server.log LD_LIBRARY_PATH=/opt/tritonserver/lib:$LD_LIBRARY_PATH ./request_cancellation_test > $SERVER_LOG if [ $? -ne 0 ]; then + echo -e "\n***\n*** Unit Tests Failed\n***" cat $SERVER_LOG - RET=1 + RET=1 fi - # -# gRPC client tests +# gRPC cancellation tests # rm -rf models && mkdir models mkdir -p models/custom_identity_int32/1 && (cd models/custom_identity_int32 && \ @@ -78,16 +78,12 @@ mkdir -p models/custom_identity_int32/1 && (cd models/custom_identity_int32 && \ echo 'instance_group [{ kind: KIND_CPU }]' >> config.pbtxt && \ echo -e 'parameters [{ key: "execute_delay_ms" \n value: { string_value: "10000" } }]' >> config.pbtxt) -CLIENT_CANCELLATION_TEST=grpc_client_test.client_cancellation_test.py -TEST_RESULT_FILE='grpc_client_test.test_results.txt' -CLIENT_LOG=`pwd`/grpc_client_test.client.log -SERVER_ARGS="--model-repository=`pwd`/models --log-verbose=1" -# gRPC client-side cancellation tests... -for i in test_grpc_async_infer \ - test_grpc_stream_infer \ - ; do +for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc_async_infer" "test_aio_grpc_stream_infer"; do + + TEST_LOG="./grpc_cancellation_test.$TEST_CASE.log" + SERVER_LOG="grpc_cancellation_test.$TEST_CASE.server.log" - SERVER_LOG=grpc_client_test.${i}.server.log + SERVER_ARGS="--model-repository=`pwd`/models --log-verbose=1" run_server if [ "$SERVER_PID" == "0" ]; then echo -e "\n***\n*** Failed to start $SERVER\n***" @@ -96,27 +92,26 @@ for i in test_grpc_async_infer \ fi set +e - python $CLIENT_CANCELLATION_TEST ClientCancellationTest.$i >>$CLIENT_LOG 2>&1 + python grpc_cancellation_test.py GrpcCancellationTest.$TEST_CASE > $TEST_LOG 2>&1 if [ $? -ne 0 ]; then - echo -e "\n***\n*** Test $i Failed\n***" >>$CLIENT_LOG - echo -e "\n***\n*** Test $i Failed\n***" - RET=1 - else - check_test_results $TEST_RESULT_FILE 1 - if [ $? -ne 0 ]; then - cat $CLIENT_LOG - echo -e "\n***\n*** Test Result Verification Failed\n***" - RET=1 - fi + echo -e "\n***\n*** gRPC Cancellation Tests Failed on $TEST_CASE\n***" + cat $TEST_LOG + RET=1 + fi + grep "Cancellation notification received for" $SERVER_LOG + if [ $? -ne 0 ]; then + echo -e "\n***\n*** Cancellation not received by server on $TEST_CASE\n***" + cat $SERVER_LOG + RET=1 fi - set -e + kill $SERVER_PID wait $SERVER_PID done # -# End-to-end tests +# End-to-end scheduler tests # rm -rf models && mkdir models mkdir -p models/dynamic_batch/1 && (cd models/dynamic_batch && \ @@ -171,8 +166,8 @@ fi set +e python scheduler_test.py > $TEST_LOG 2>&1 if [ $? -ne 0 ]; then + echo -e "\n***\n*** Scheduler Tests Failed\n***" cat $TEST_LOG - echo -e "\n***\n*** Test Failed\n***" RET=1 fi set -e