From 85be200d5e044a99fe3594083ff4146ddbeca523 Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Mon, 9 Oct 2023 19:10:47 -0700 Subject: [PATCH 1/4] Fix gRPC test failure and refactor --- ...tion_test.py => grpc_cancellation_test.py} | 157 +++++++----------- qa/L0_request_cancellation/test.sh | 41 ++--- 2 files changed, 77 insertions(+), 121 deletions(-) rename qa/L0_request_cancellation/{client_cancellation_test.py => grpc_cancellation_test.py} (60%) diff --git a/qa/L0_request_cancellation/client_cancellation_test.py b/qa/L0_request_cancellation/grpc_cancellation_test.py similarity index 60% rename from qa/L0_request_cancellation/client_cancellation_test.py rename to qa/L0_request_cancellation/grpc_cancellation_test.py index c2bc0a0bf9..55027d5b08 100755 --- a/qa/L0_request_cancellation/client_cancellation_test.py +++ b/qa/L0_request_cancellation/grpc_cancellation_test.py @@ -26,10 +26,6 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import sys - -sys.path.append("../common") - import asyncio import queue import time @@ -37,9 +33,8 @@ from functools import partial import numpy as np -import test_util as tu import tritonclient.grpc as grpcclient -import tritonclient.grpc.aio as aiogrpcclient +import tritonclient.grpc.aio as grpcclientaio from tritonclient.utils import InferenceServerException @@ -55,102 +50,74 @@ def callback(user_data, result, error): user_data._completed_requests.put(result) -class ClientCancellationTest(tu.TestResultCollector): +class GrpcCancellationTest(unittest.TestCase): + _model_name = "custom_identity_int32" + _model_delay = 10.0 # seconds + _grpc_params = {"url": "localhost:8001", "verbose": True} + def setUp(self): - self.model_name_ = "custom_identity_int32" - self.input0_data_ = np.array([[10]], dtype=np.int32) - self._start_time_ms = 0 - self._end_time_ms = 0 + self._client = grpcclient.InferenceServerClient(**self._grpc_params) + self._client_aio = grpcclientaio.InferenceServerClient(**self._grpc_params) + self._user_data = UserData() + self._callback = partial(callback, self._user_data) + self._prepare_request() + self._record_start_time() - def _record_start_time_ms(self): - self._start_time_ms = int(round(time.time() * 1000)) + def tearDown(self): + self._record_end_time() + self._assert_max_duration() + self._assert_cancelled_by_client() - def _record_end_time_ms(self): - self._end_time_ms = int(round(time.time() * 1000)) + def _record_start_time(self): + self._start_time = time.time() # seconds - def _test_runtime_duration(self, upper_limit): - self.assertTrue( - (self._end_time_ms - self._start_time_ms) < upper_limit, - "test runtime expected less than " - + str(upper_limit) - + "ms response time, got " - + str(self._end_time_ms - self._start_time_ms) - + " ms", - ) + def _record_end_time(self): + self._end_time = time.time() # seconds def _prepare_request(self): - self.inputs_ = [] - self.inputs_.append(grpcclient.InferInput("INPUT0", [1, 1], "INT32")) - self.outputs_ = [] - self.outputs_.append(grpcclient.InferRequestedOutput("OUTPUT0")) + self._inputs = [] + self._inputs.append(grpcclient.InferInput("INPUT0", [1, 1], "INT32")) + self._outputs = [] + self._outputs.append(grpcclient.InferRequestedOutput("OUTPUT0")) + self._inputs[0].set_data_from_numpy(np.array([[10]], dtype=np.int32)) + + def _assert_max_duration(self): + max_duration = self._model_delay * 0.5 # seconds + duration = self._end_time - self._start_time # seconds + self.assertLess( + duration, + max_duration, + "test runtime expected less than " + + str(max_duration) + + "s response time, got " + + str(duration) + + "s", + ) - self.inputs_[0].set_data_from_numpy(self.input0_data_) + def _assert_cancelled_by_client(self): + self.assertFalse(self._user_data._completed_requests.empty()) + data_item = self._user_data._completed_requests.get() + self.assertIsInstance(data_item, InferenceServerException) + self.assertIn("Locally cancelled by application!", str(data_item)) def test_grpc_async_infer(self): - # Sends a request using async_infer to a - # model that takes 10s to execute. Issues - # a cancellation request after 2s. The client - # should return with appropriate exception within - # 5s. - triton_client = grpcclient.InferenceServerClient( - url="localhost:8001", verbose=True + future = self._client.async_infer( + model_name=self._model_name, + inputs=self._inputs, + callback=self._callback, + outputs=self._outputs, ) - self._prepare_request() - - user_data = UserData() - - self._record_start_time_ms() - - with self.assertRaises(InferenceServerException) as cm: - future = triton_client.async_infer( - model_name=self.model_name_, - inputs=self.inputs_, - callback=partial(callback, user_data), - outputs=self.outputs_, - ) - time.sleep(2) - future.cancel() - - data_item = user_data._completed_requests.get() - if type(data_item) == InferenceServerException: - raise data_item - self.assertIn("Locally cancelled by application!", str(cm.exception)) - - self._record_end_time_ms() - self._test_runtime_duration(5000) + time.sleep(2) # ensure the inference has started + future.cancel() + time.sleep(0.1) # context switch def test_grpc_stream_infer(self): - # Sends a request using async_stream_infer to a - # model that takes 10s to execute. Issues stream - # closure with cancel_requests=True. The client - # should return with appropriate exception within - # 5s. - triton_client = grpcclient.InferenceServerClient( - url="localhost:8001", verbose=True + self._client.start_stream(callback=self._callback) + self._client.async_stream_infer( + model_name=self._model_name, inputs=self._inputs, outputs=self._outputs ) - - self._prepare_request() - user_data = UserData() - - triton_client.start_stream(callback=partial(callback, user_data)) - self._record_start_time_ms() - - with self.assertRaises(InferenceServerException) as cm: - for i in range(1): - triton_client.async_stream_infer( - model_name=self.model_name_, - inputs=self.inputs_, - outputs=self.outputs_, - ) - time.sleep(2) - triton_client.stop_stream(cancel_requests=True) - data_item = user_data._completed_requests.get() - if type(data_item) == InferenceServerException: - raise data_item - self.assertIn("Locally cancelled by application!", str(cm.exception)) - - self._record_end_time_ms() - self._test_runtime_duration(5000) + time.sleep(2) # ensure the inference has started + self._client.stop_stream(cancel_requests=True) # Disabling AsyncIO cancellation testing. Enable once @@ -170,8 +137,8 @@ def test_grpc_stream_infer(self): # _ = await anext(generator) # # async def test_aio_infer(self): -# triton_client = aiogrpcclient.InferenceServerClient( -# url="localhost:8001", verbose=True +# triton_client = grpcclientaio.InferenceServerClient( +# url=self._triton_grpc_url, verbose=True # ) # self._prepare_request() # self._record_start_time_ms() @@ -192,7 +159,7 @@ def test_grpc_stream_infer(self): # await task # # self._record_end_time_ms() -# self._test_runtime_duration(5000) +# self._assert_runtime_duration(5000) # # asyncio.run(test_aio_infer(self)) # @@ -203,8 +170,8 @@ def test_grpc_stream_infer(self): # # should return with appropriate exception within # # 5s. # async def test_aio_streaming_infer(self): -# async with aiogrpcclient.InferenceServerClient( -# url="localhost:8001", verbose=True +# async with grpcclientaio.InferenceServerClient( +# url=self._triton_grpc_url, verbose=True # ) as triton_client: # # async def async_request_iterator(): @@ -240,7 +207,7 @@ def test_grpc_stream_infer(self): # await task # # self._record_end_time_ms() -# self._test_runtime_duration(5000) +# self._assert_runtime_duration(5000) # # asyncio.run(test_aio_streaming_infer(self)) diff --git a/qa/L0_request_cancellation/test.sh b/qa/L0_request_cancellation/test.sh index 989d7ddd6d..bc36625bce 100755 --- a/qa/L0_request_cancellation/test.sh +++ b/qa/L0_request_cancellation/test.sh @@ -60,13 +60,13 @@ mkdir -p models/model/1 && (cd models/model && \ SERVER_LOG=server.log LD_LIBRARY_PATH=/opt/tritonserver/lib:$LD_LIBRARY_PATH ./request_cancellation_test > $SERVER_LOG if [ $? -ne 0 ]; then + echo -e "\n***\n*** Unit Tests Failed\n***" cat $SERVER_LOG - RET=1 + RET=1 fi - # -# gRPC client tests +# gRPC cancellation tests # rm -rf models && mkdir models mkdir -p models/custom_identity_int32/1 && (cd models/custom_identity_int32 && \ @@ -78,16 +78,12 @@ mkdir -p models/custom_identity_int32/1 && (cd models/custom_identity_int32 && \ echo 'instance_group [{ kind: KIND_CPU }]' >> config.pbtxt && \ echo -e 'parameters [{ key: "execute_delay_ms" \n value: { string_value: "10000" } }]' >> config.pbtxt) -CLIENT_CANCELLATION_TEST=grpc_client_test.client_cancellation_test.py -TEST_RESULT_FILE='grpc_client_test.test_results.txt' -CLIENT_LOG=`pwd`/grpc_client_test.client.log -SERVER_ARGS="--model-repository=`pwd`/models --log-verbose=1" -# gRPC client-side cancellation tests... -for i in test_grpc_async_infer \ - test_grpc_stream_infer \ - ; do +for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer"; do + + TEST_LOG="./grpc_cancellation_test.$TEST_CASE.log" + SERVER_LOG="grpc_cancellation_test.$TEST_CASE.server.log" - SERVER_LOG=grpc_client_test.${i}.server.log + SERVER_ARGS="--model-repository=`pwd`/models --log-verbose=1" run_server if [ "$SERVER_PID" == "0" ]; then echo -e "\n***\n*** Failed to start $SERVER\n***" @@ -96,27 +92,20 @@ for i in test_grpc_async_infer \ fi set +e - python $CLIENT_CANCELLATION_TEST ClientCancellationTest.$i >>$CLIENT_LOG 2>&1 + python grpc_cancellation_test.py GrpcCancellationTest.$TEST_CASE > $TEST_LOG 2>&1 if [ $? -ne 0 ]; then - echo -e "\n***\n*** Test $i Failed\n***" >>$CLIENT_LOG - echo -e "\n***\n*** Test $i Failed\n***" - RET=1 - else - check_test_results $TEST_RESULT_FILE 1 - if [ $? -ne 0 ]; then - cat $CLIENT_LOG - echo -e "\n***\n*** Test Result Verification Failed\n***" - RET=1 - fi + echo -e "\n***\n*** gRPC Cancellation Tests Failed on $TEST_CASE\n***" + cat $TEST_LOG + RET=1 fi - set -e + kill $SERVER_PID wait $SERVER_PID done # -# End-to-end tests +# End-to-end scheduler tests # rm -rf models && mkdir models mkdir -p models/dynamic_batch/1 && (cd models/dynamic_batch && \ @@ -171,8 +160,8 @@ fi set +e python scheduler_test.py > $TEST_LOG 2>&1 if [ $? -ne 0 ]; then + echo -e "\n***\n*** Scheduler Tests Failed\n***" cat $TEST_LOG - echo -e "\n***\n*** Test Failed\n***" RET=1 fi set -e From c98e5ed34d415fdbdf7e657d75ebf612979950e5 Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Tue, 10 Oct 2023 18:27:33 -0700 Subject: [PATCH 2/4] Add gRPC AsyncIO cancellation tests --- .../grpc_cancellation_test.py | 133 ++++-------------- qa/L0_request_cancellation/test.sh | 2 +- 2 files changed, 32 insertions(+), 103 deletions(-) diff --git a/qa/L0_request_cancellation/grpc_cancellation_test.py b/qa/L0_request_cancellation/grpc_cancellation_test.py index 55027d5b08..5be82141a0 100755 --- a/qa/L0_request_cancellation/grpc_cancellation_test.py +++ b/qa/L0_request_cancellation/grpc_cancellation_test.py @@ -50,7 +50,7 @@ def callback(user_data, result, error): user_data._completed_requests.put(result) -class GrpcCancellationTest(unittest.TestCase): +class GrpcCancellationTest(unittest.IsolatedAsyncioTestCase): _model_name = "custom_identity_int32" _model_delay = 10.0 # seconds _grpc_params = {"url": "localhost:8001", "verbose": True} @@ -61,18 +61,11 @@ def setUp(self): self._user_data = UserData() self._callback = partial(callback, self._user_data) self._prepare_request() - self._record_start_time() - - def tearDown(self): - self._record_end_time() - self._assert_max_duration() - self._assert_cancelled_by_client() - - def _record_start_time(self): self._start_time = time.time() # seconds - def _record_end_time(self): + def tearDown(self): self._end_time = time.time() # seconds + self._assert_max_duration() def _prepare_request(self): self._inputs = [] @@ -94,7 +87,7 @@ def _assert_max_duration(self): + "s", ) - def _assert_cancelled_by_client(self): + def _assert_callback_cancelled(self): self.assertFalse(self._user_data._completed_requests.empty()) data_item = self._user_data._completed_requests.get() self.assertIsInstance(data_item, InferenceServerException) @@ -110,6 +103,7 @@ def test_grpc_async_infer(self): time.sleep(2) # ensure the inference has started future.cancel() time.sleep(0.1) # context switch + self._assert_callback_cancelled() def test_grpc_stream_infer(self): self._client.start_stream(callback=self._callback) @@ -118,98 +112,33 @@ def test_grpc_stream_infer(self): ) time.sleep(2) # ensure the inference has started self._client.stop_stream(cancel_requests=True) + self._assert_callback_cancelled() - -# Disabling AsyncIO cancellation testing. Enable once -# DLIS-5476 is implemented. -# def test_aio_grpc_async_infer(self): -# # Sends a request using infer of grpc.aio to a -# # model that takes 10s to execute. Issues -# # a cancellation request after 2s. The client -# # should return with appropriate exception within -# # 5s. -# async def cancel_request(call): -# await asyncio.sleep(2) -# self.assertTrue(call.cancel()) -# -# async def handle_response(generator): -# with self.assertRaises(asyncio.exceptions.CancelledError) as cm: -# _ = await anext(generator) -# -# async def test_aio_infer(self): -# triton_client = grpcclientaio.InferenceServerClient( -# url=self._triton_grpc_url, verbose=True -# ) -# self._prepare_request() -# self._record_start_time_ms() -# -# generator = triton_client.infer( -# model_name=self.model_name_, -# inputs=self.inputs_, -# outputs=self.outputs_, -# get_call_obj=True, -# ) -# grpc_call = await anext(generator) -# -# tasks = [] -# tasks.append(asyncio.create_task(handle_response(generator))) -# tasks.append(asyncio.create_task(cancel_request(grpc_call))) -# -# for task in tasks: -# await task -# -# self._record_end_time_ms() -# self._assert_runtime_duration(5000) -# -# asyncio.run(test_aio_infer(self)) -# -# def test_aio_grpc_stream_infer(self): -# # Sends a request using stream_infer of grpc.aio -# # library model that takes 10s to execute. Issues -# # stream closure with cancel_requests=True. The client -# # should return with appropriate exception within -# # 5s. -# async def test_aio_streaming_infer(self): -# async with grpcclientaio.InferenceServerClient( -# url=self._triton_grpc_url, verbose=True -# ) as triton_client: -# -# async def async_request_iterator(): -# for i in range(1): -# await asyncio.sleep(1) -# yield { -# "model_name": self.model_name_, -# "inputs": self.inputs_, -# "outputs": self.outputs_, -# } -# -# self._prepare_request() -# self._record_start_time_ms() -# response_iterator = triton_client.stream_infer( -# inputs_iterator=async_request_iterator(), get_call_obj=True -# ) -# streaming_call = await anext(response_iterator) -# -# async def cancel_streaming(streaming_call): -# await asyncio.sleep(2) -# streaming_call.cancel() -# -# async def handle_response(response_iterator): -# with self.assertRaises(asyncio.exceptions.CancelledError) as cm: -# async for response in response_iterator: -# self.assertTrue(False, "Received an unexpected response!") -# -# tasks = [] -# tasks.append(asyncio.create_task(handle_response(response_iterator))) -# tasks.append(asyncio.create_task(cancel_streaming(streaming_call))) -# -# for task in tasks: -# await task -# -# self._record_end_time_ms() -# self._assert_runtime_duration(5000) -# -# asyncio.run(test_aio_streaming_infer(self)) + async def test_aio_grpc_async_infer(self): + infer_task = asyncio.create_task( + self._client_aio.infer( + model_name=self._model_name, inputs=self._inputs, outputs=self._outputs + ) + ) + await asyncio.sleep(2) # ensure the inference has started + self.assertTrue(infer_task.cancel()) + with self.assertRaises(asyncio.CancelledError): + await infer_task + + async def test_aio_grpc_stream_infer(self): + async def requests_generator(): + yield { + "model_name": self._model_name, + "inputs": self._inputs, + "outputs": self._outputs, + } + + responses_iterator = self._client_aio.stream_infer(requests_generator()) + await asyncio.sleep(2) # ensure the inference has started + self.assertTrue(responses_iterator.cancel()) + with self.assertRaises(asyncio.CancelledError): + async for result, error in responses_iterator: + self._callback(result, error) if __name__ == "__main__": diff --git a/qa/L0_request_cancellation/test.sh b/qa/L0_request_cancellation/test.sh index bc36625bce..23917ec16f 100755 --- a/qa/L0_request_cancellation/test.sh +++ b/qa/L0_request_cancellation/test.sh @@ -78,7 +78,7 @@ mkdir -p models/custom_identity_int32/1 && (cd models/custom_identity_int32 && \ echo 'instance_group [{ kind: KIND_CPU }]' >> config.pbtxt && \ echo -e 'parameters [{ key: "execute_delay_ms" \n value: { string_value: "10000" } }]' >> config.pbtxt) -for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer"; do +for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc_async_infer" "test_aio_grpc_stream_infer"; do TEST_LOG="./grpc_cancellation_test.$TEST_CASE.log" SERVER_LOG="grpc_cancellation_test.$TEST_CASE.server.log" From d17eb17a90c000c76f957e7cc63cc330a04d5413 Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Thu, 12 Oct 2023 16:50:18 -0700 Subject: [PATCH 3/4] Better check if a request is cancelled --- qa/L0_request_cancellation/grpc_cancellation_test.py | 2 +- qa/L0_request_cancellation/test.sh | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/qa/L0_request_cancellation/grpc_cancellation_test.py b/qa/L0_request_cancellation/grpc_cancellation_test.py index 5be82141a0..4a104dc389 100755 --- a/qa/L0_request_cancellation/grpc_cancellation_test.py +++ b/qa/L0_request_cancellation/grpc_cancellation_test.py @@ -121,7 +121,7 @@ async def test_aio_grpc_async_infer(self): ) ) await asyncio.sleep(2) # ensure the inference has started - self.assertTrue(infer_task.cancel()) + infer_task.cancel() with self.assertRaises(asyncio.CancelledError): await infer_task diff --git a/qa/L0_request_cancellation/test.sh b/qa/L0_request_cancellation/test.sh index 23917ec16f..4929be3a5f 100755 --- a/qa/L0_request_cancellation/test.sh +++ b/qa/L0_request_cancellation/test.sh @@ -98,6 +98,12 @@ for TEST_CASE in "test_grpc_async_infer" "test_grpc_stream_infer" "test_aio_grpc cat $TEST_LOG RET=1 fi + grep "Cancellation notification received for" $SERVER_LOG + if [ $? -ne 0 ]; then + echo -e "\n***\n*** Cancellation not received by server on $TEST_CASE\n***" + cat $SERVER_LOG + RET=1 + fi set -e kill $SERVER_PID From df3e06d9f742868a3bd8d952bcee58f85729d349 Mon Sep 17 00:00:00 2001 From: kthui <18255193+kthui@users.noreply.github.com> Date: Thu, 12 Oct 2023 17:21:31 -0700 Subject: [PATCH 4/4] Use f-string --- qa/L0_request_cancellation/grpc_cancellation_test.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/qa/L0_request_cancellation/grpc_cancellation_test.py b/qa/L0_request_cancellation/grpc_cancellation_test.py index 4a104dc389..fadaa291e8 100755 --- a/qa/L0_request_cancellation/grpc_cancellation_test.py +++ b/qa/L0_request_cancellation/grpc_cancellation_test.py @@ -80,11 +80,7 @@ def _assert_max_duration(self): self.assertLess( duration, max_duration, - "test runtime expected less than " - + str(max_duration) - + "s response time, got " - + str(duration) - + "s", + f"test runtime expected less than {max_duration}s response time, got {duration}s", ) def _assert_callback_cancelled(self):