From b9c2a332029da12c2ae373614418c79604b05a8f Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Tue, 26 Sep 2023 15:48:51 -0700
Subject: [PATCH 1/7] Add cancelled response status test

---
 qa/L0_backend_python/lifecycle/lifecycle_test.py | 1 +
 qa/python_models/error_code/model.py             | 1 +
 2 files changed, 2 insertions(+)

diff --git a/qa/L0_backend_python/lifecycle/lifecycle_test.py b/qa/L0_backend_python/lifecycle/lifecycle_test.py
index 9c3bf7efa9..9d6a2f5c4d 100755
--- a/qa/L0_backend_python/lifecycle/lifecycle_test.py
+++ b/qa/L0_backend_python/lifecycle/lifecycle_test.py
@@ -70,6 +70,7 @@ def test_error_code(self):
             ("UNAVAILABLE", "[StatusCode.UNAVAILABLE]"),
             ("UNSUPPORTED", "[StatusCode.UNIMPLEMENTED]"),
             ("ALREADY_EXISTS", "[StatusCode.ALREADY_EXISTS]"),
+            ("CANCELLED", "[StatusCode.CANCELLED]"),
             ("(default)", "[StatusCode.INTERNAL] unrecognized"),
         ]
         with self._shm_leak_detector.Probe() as shm_probe:
diff --git a/qa/python_models/error_code/model.py b/qa/python_models/error_code/model.py
index 350457ca79..078a4afb73 100644
--- a/qa/python_models/error_code/model.py
+++ b/qa/python_models/error_code/model.py
@@ -37,6 +37,7 @@ def execute(self, requests):
             "UNAVAILABLE": pb_utils.TritonError.UNAVAILABLE,
             "UNSUPPORTED": pb_utils.TritonError.UNSUPPORTED,
             "ALREADY_EXISTS": pb_utils.TritonError.ALREADY_EXISTS,
+            "CANCELLED": pb_utils.TritonError.CANCELLED,
         }
 
         responses = []
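Note: this first patch extends the error_code test model's name-to-code table
with CANCELLED and checks that the gRPC client surfaces it as
StatusCode.CANCELLED. For context, the error_code model turns the requested
code name into an error response along these lines (a simplified sketch of the
pattern, not the verbatim model file; the helper name is illustrative):

    import triton_python_backend_utils as pb_utils

    def make_error_response(code_name, error_code_map):
        # Known names map to a coded TritonError; anything else falls back to
        # an uncoded error, which the test expects as StatusCode.INTERNAL.
        if code_name in error_code_map:
            error = pb_utils.TritonError(
                message="error code: " + code_name, code=error_code_map[code_name]
            )
        else:
            error = pb_utils.TritonError("unrecognized error code: " + code_name)
        return pb_utils.InferenceResponse(error=error)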
From 6935249f474c7d88c1e85626f491a4797ca9b342 Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Thu, 28 Sep 2023 15:32:27 -0700
Subject: [PATCH 2/7] Add Python backend request cancellation test

---
 .../lifecycle/lifecycle_test.py              | 37 +++++++++++
 qa/L0_backend_python/lifecycle/test.sh       |  6 +-
 qa/python_models/execute_cancel/config.pbtxt | 47 ++++++++++++++
 qa/python_models/execute_cancel/model.py     | 64 +++++++++++++++++++
 4 files changed, 153 insertions(+), 1 deletion(-)
 create mode 100644 qa/python_models/execute_cancel/config.pbtxt
 create mode 100644 qa/python_models/execute_cancel/model.py

diff --git a/qa/L0_backend_python/lifecycle/lifecycle_test.py b/qa/L0_backend_python/lifecycle/lifecycle_test.py
index 9d6a2f5c4d..82856bbd32 100755
--- a/qa/L0_backend_python/lifecycle/lifecycle_test.py
+++ b/qa/L0_backend_python/lifecycle/lifecycle_test.py
@@ -31,6 +31,7 @@
 sys.path.append("../../common")
 
 import queue
+import time
 import unittest
 from functools import partial
 
@@ -92,6 +93,42 @@ def test_error_code(self):
                 expected_grpc_error_start + " error code: " + error,
             )
 
+    def test_execute_cancel(self):
+        model_name = "execute_cancel"
+        log_path = "lifecycle_server.log"
+        execute_delay = 4.0  # seconds
+        shape = [1, 1]
+        response = {"responded": False, "result": None, "error": None}
+
+        def callback(result, error):
+            response["responded"] = True
+            response["result"] = result
+            response["error"] = error
+
+        with self._shm_leak_detector.Probe() as shm_probe:
+            with grpcclient.InferenceServerClient("localhost:8001") as client:
+                input_data = np.array([[execute_delay]], dtype=np.float32)
+                inputs = [
+                    grpcclient.InferInput(
+                        "EXECUTE_DELAY", shape, np_to_triton_dtype(input_data.dtype)
+                    )
+                ]
+                inputs[0].set_data_from_numpy(input_data)
+                exec_future = client.async_infer(model_name, inputs, callback)
+                time.sleep(2)  # ensure the request is executing
+                self.assertFalse(response["responded"])
+                exec_future.cancel()
+                time.sleep(2)  # ensure the cancellation is delivered
+                self.assertTrue(response["responded"])
+
+        self.assertEqual(response["result"], None)
+        self.assertIsInstance(response["error"], InferenceServerException)
+        self.assertEqual(response["error"].status(), "StatusCode.CANCELLED")
+
+        with open(log_path, mode="r", encoding="utf-8", errors="strict") as f:
+            log_text = f.read()
+            self.assertIn("[execute_cancel] Request not cancelled at 1.0 s", log_text)
+            self.assertIn("[execute_cancel] Request cancelled at ", log_text)
+
     def test_batch_error(self):
         # The execute_error model returns an error for the first and third
         # request and successfully processes the second request. This is making
diff --git a/qa/L0_backend_python/lifecycle/test.sh b/qa/L0_backend_python/lifecycle/test.sh
index 2abf107813..eb7f868940 100755
--- a/qa/L0_backend_python/lifecycle/test.sh
+++ b/qa/L0_backend_python/lifecycle/test.sh
@@ -26,7 +26,7 @@
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 CLIENT_LOG="./lifecycle_client.log"
-EXPECTED_NUM_TESTS="4"
+EXPECTED_NUM_TESTS="5"
 TEST_RESULT_FILE='test_results.txt'
 source ../common.sh
 source ../../common/util.sh
@@ -44,6 +44,10 @@ mkdir -p models/error_code/1/
 cp ../../python_models/error_code/model.py ./models/error_code/1/
 cp ../../python_models/error_code/config.pbtxt ./models/error_code/
 
+mkdir -p models/execute_cancel/1/
+cp ../../python_models/execute_cancel/model.py ./models/execute_cancel/1/
+cp ../../python_models/execute_cancel/config.pbtxt ./models/execute_cancel/
+
 mkdir -p models/execute_error/1/
 cp ../../python_models/execute_error/model.py ./models/execute_error/1/
 cp ../../python_models/execute_error/config.pbtxt ./models/execute_error/
diff --git a/qa/python_models/execute_cancel/config.pbtxt b/qa/python_models/execute_cancel/config.pbtxt
new file mode 100644
index 0000000000..df509863ad
--- /dev/null
+++ b/qa/python_models/execute_cancel/config.pbtxt
@@ -0,0 +1,47 @@
+# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+name: "execute_cancel"
+backend: "python"
+max_batch_size: 1
+
+input [
+  {
+    name: "EXECUTE_DELAY"
+    data_type: TYPE_FP32
+    dims: [ 1 ]
+  }
+]
+
+output [
+  {
+    name: "DUMMY_OUT"
+    data_type: TYPE_FP32
+    dims: [ 1 ]
+  }
+]
+
+instance_group [{ kind: KIND_CPU }]
diff --git a/qa/python_models/execute_cancel/model.py b/qa/python_models/execute_cancel/model.py
new file mode 100644
index 0000000000..519cd7d8bb
--- /dev/null
+++ b/qa/python_models/execute_cancel/model.py
@@ -0,0 +1,64 @@
+# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+#  * Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+#  * Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#  * Neither the name of NVIDIA CORPORATION nor the names of its
+#    contributors may be used to endorse or promote products derived
+#    from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
+# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import time
+
+import triton_python_backend_utils as pb_utils
+
+
+class TritonPythonModel:
+    def initialize(self, args):
+        self._logger = pb_utils.Logger
+
+    def execute(self, requests):
+        responses = []
+        for request in requests:
+            error = pb_utils.TritonError(message="not cancelled")
+            delay_tensor = pb_utils.get_input_tensor_by_name(
+                request, "EXECUTE_DELAY"
+            ).as_numpy()
+            delay = delay_tensor[0][0]  # seconds
+            time_elapsed = 0.0  # seconds
+            while time_elapsed < delay:
+                time.sleep(1)
+                time_elapsed += 1.0
+                if request.is_cancelled():
+                    self._logger.log_info(
+                        "[execute_cancel] Request cancelled at "
+                        + str(time_elapsed)
+                        + " s"
+                    )
+                    error = pb_utils.TritonError(
+                        message="cancelled", code=pb_utils.TritonError.CANCELLED
+                    )
+                    break
+                self._logger.log_info(
+                    "[execute_cancel] Request not cancelled at "
+                    + str(time_elapsed)
+                    + " s"
+                )
+            responses.append(pb_utils.InferenceResponse(error=error))
+        return responses
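Note: the flow above is driven entirely from the client side: async_infer()
returns a future, and future.cancel() asks Triton to cancel the in-flight
request while the model is still polling is_cancelled(). Because the model
polls once per second, cancellation is observed on the next whole-second tick,
which is what the log assertions ("not cancelled at 1.0 s", then "cancelled at
") rely on. A standalone client exercising the same path could look like this
sketch (server address and model name as in the test; the function name is
illustrative):

    import time

    import numpy as np
    import tritonclient.grpc as grpcclient
    from tritonclient.utils import np_to_triton_dtype

    def infer_then_cancel(delay_s=4.0):
        with grpcclient.InferenceServerClient("localhost:8001") as client:
            input_data = np.array([[delay_s]], dtype=np.float32)
            inputs = [
                grpcclient.InferInput(
                    "EXECUTE_DELAY", [1, 1], np_to_triton_dtype(input_data.dtype)
                )
            ]
            inputs[0].set_data_from_numpy(input_data)
            # The callback receives (result, error); on cancellation, error is
            # an InferenceServerException with status StatusCode.CANCELLED.
            future = client.async_infer(
                "execute_cancel", inputs, lambda result, error: print(result, error)
            )
            time.sleep(2)  # let the model start polling before cancelling
            future.cancel()
            time.sleep(2)  # give the callback time to fire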
From 24fadc6ed17fe6cf3837f16be9750ce31aefb429 Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Mon, 2 Oct 2023 15:04:14 -0700
Subject: [PATCH 3/7] Add Python backend decoupled request cancellation test

---
 .../decoupled/decoupled_test.py          | 33 ++++++++++++
 qa/L0_backend_python/decoupled/test.sh   |  7 ++-
 qa/python_models/execute_cancel/model.py | 51 +++++++++++++++++--
 3 files changed, 87 insertions(+), 4 deletions(-)

diff --git a/qa/L0_backend_python/decoupled/decoupled_test.py b/qa/L0_backend_python/decoupled/decoupled_test.py
index 21fa8b757e..6760fb691a 100755
--- a/qa/L0_backend_python/decoupled/decoupled_test.py
+++ b/qa/L0_backend_python/decoupled/decoupled_test.py
@@ -256,6 +256,39 @@ def test_decoupled_send_after_close_error(self):
             "The completed request size must be zero.",
         )
 
+    def test_decoupled_execute_cancel(self):
+        model_name = "execute_cancel"
+        log_path = "decoupled_server.log"
+        execute_delay = 4.0  # seconds
+        shape = [1, 1]
+
+        user_data = UserData()
+        with grpcclient.InferenceServerClient("localhost:8001") as client:
+            client.start_stream(callback=partial(callback, user_data))
+            input_data = np.array([[execute_delay]], dtype=np.float32)
+            inputs = [
+                grpcclient.InferInput(
+                    "EXECUTE_DELAY", shape, np_to_triton_dtype(input_data.dtype)
+                )
+            ]
+            inputs[0].set_data_from_numpy(input_data)
+            client.async_stream_infer(model_name, inputs)
+            time.sleep(2)  # model delay for decoupling request and response sender
+            time.sleep(2)  # ensure the request is executing
+            client.stop_stream(cancel_requests=True)
+            time.sleep(2)  # ensure the cancellation is delivered
+
+        self.assertFalse(user_data._completed_requests.empty())
+        while not user_data._completed_requests.empty():
+            data_item = user_data._completed_requests.get()
+            self.assertIsInstance(data_item, InferenceServerException)
+            self.assertEqual(data_item.status(), "StatusCode.CANCELLED")
+
+        with open(log_path, mode="r", encoding="utf-8", errors="strict") as f:
+            log_text = f.read()
+            self.assertIn("[execute_cancel] Request not cancelled at 1.0 s", log_text)
+            self.assertIn("[execute_cancel] Request cancelled at ", log_text)
+
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/qa/L0_backend_python/decoupled/test.sh b/qa/L0_backend_python/decoupled/test.sh
index b4fa4ffe75..07c8f5b4ee 100755
--- a/qa/L0_backend_python/decoupled/test.sh
+++ b/qa/L0_backend_python/decoupled/test.sh
@@ -27,7 +27,7 @@
 
 CLIENT_PY=./decoupled_test.py
 CLIENT_LOG="./decoupled_client.log"
-EXPECTED_NUM_TESTS="5"
+EXPECTED_NUM_TESTS="6"
 TEST_RESULT_FILE='test_results.txt'
 TRITON_DIR=${TRITON_DIR:="/opt/tritonserver"}
 SERVER=${TRITON_DIR}/bin/tritonserver
@@ -50,6 +50,11 @@ mkdir -p models/dlpack_add_sub/1/
 cp ../../python_models/dlpack_add_sub/model.py models/dlpack_add_sub/1/
 cp ../../python_models/dlpack_add_sub/config.pbtxt models/dlpack_add_sub/
 
+mkdir -p models/execute_cancel/1/
+cp ../../python_models/execute_cancel/model.py ./models/execute_cancel/1/
+cp ../../python_models/execute_cancel/config.pbtxt ./models/execute_cancel/
+echo "model_transaction_policy { decoupled: True }" >> ./models/execute_cancel/config.pbtxt
+
 git clone https://github.com/triton-inference-server/python_backend -b $PYTHON_BACKEND_REPO_TAG
 mkdir -p models/square_int32/1/
 cp python_backend/examples/decoupled/square_model.py models/square_int32/1/model.py
diff --git a/qa/python_models/execute_cancel/model.py b/qa/python_models/execute_cancel/model.py
index 519cd7d8bb..be2f695b7b 100644
--- a/qa/python_models/execute_cancel/model.py
+++ b/qa/python_models/execute_cancel/model.py
@@ -24,6 +24,8 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
+import json
+import threading
 import time
 
 import triton_python_backend_utils as pb_utils
@@ -32,20 +34,43 @@
 class TritonPythonModel:
     def initialize(self, args):
         self._logger = pb_utils.Logger
+        self._model_config = json.loads(args["model_config"])
+        self._using_decoupled = pb_utils.using_decoupled_model_transaction_policy(
+            self._model_config
+        )
 
     def execute(self, requests):
-        responses = []
+        processed_requests = []
         for request in requests:
-            error = pb_utils.TritonError(message="not cancelled")
             delay_tensor = pb_utils.get_input_tensor_by_name(
                 request, "EXECUTE_DELAY"
             ).as_numpy()
             delay = delay_tensor[0][0]  # seconds
+            if self._using_decoupled:
+                processed_requests.append(
+                    {"response_sender": request.get_response_sender(), "delay": delay}
+                )
+            else:
+                processed_requests.append({"request": request, "delay": delay})
+        if self._using_decoupled:
+            return self._execute_decoupled(processed_requests)
+        return self._execute_processed_requests(processed_requests)
+
+    def _execute_processed_requests(self, processed_requests):
+        responses = []
+        for processed_request in processed_requests:
+            error = pb_utils.TritonError(message="not cancelled")
+            object_to_check_cancelled = None
+            if "response_sender" in processed_request:
+                object_to_check_cancelled = processed_request["response_sender"]
+            elif "request" in processed_request:
+                object_to_check_cancelled = processed_request["request"]
+            delay = processed_request["delay"]  # seconds
             time_elapsed = 0.0  # seconds
             while time_elapsed < delay:
                 time.sleep(1)
                 time_elapsed += 1.0
-                if request.is_cancelled():
+                if object_to_check_cancelled.is_cancelled():
                     self._logger.log_info(
                         "[execute_cancel] Request cancelled at "
                         + str(time_elapsed)
@@ -62,3 +87,23 @@ def execute(self, requests):
                 )
             responses.append(pb_utils.InferenceResponse(error=error))
         return responses
+
+    def _execute_decoupled(self, processed_requests):
+        def response_thread(execute_processed_requests, processed_requests):
+            time.sleep(2)  # execute after requests are released
+            responses = execute_processed_requests(processed_requests)
+            for i in range(len(responses)):  # len(responses) == len(processed_requests)
+                response_sender = processed_requests[i]["response_sender"]
+                response = responses[i]
+                response_sender.send(response)
+                response_sender.send(
+                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
+                )
+
+        thread = threading.Thread(
+            target=response_thread,
+            args=(self._execute_processed_requests, processed_requests),
+        )
+        thread.daemon = True
+        thread.start()
+        return None
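Note: in decoupled mode the model checks cancellation on the response sender
rather than on the request, and the decoupled test leans on the UserData and
callback helpers that already exist near the top of decoupled_test.py; they are
not part of this diff. In the Triton QA suites that pattern is typically a thin
queue wrapper, roughly like this sketch (the usual shape, not the verbatim
helpers):

    import queue

    class UserData:
        def __init__(self):
            # Collects every result or error the stream callback delivers.
            self._completed_requests = queue.Queue()

    def callback(user_data, result, error):
        # Invoked once per stream response; errors and results share the queue.
        if error:
            user_data._completed_requests.put(error)
        else:
            user_data._completed_requests.put(result)

This is why the test can drain user_data._completed_requests after
stop_stream(cancel_requests=True) and expect every item to be an
InferenceServerException with status StatusCode.CANCELLED.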
From eb7f913cb2b35431c96fa109826e0e288671cc8c Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Tue, 3 Oct 2023 17:28:23 -0700
Subject: [PATCH 4/7] Simplified response if cancelled

---
 qa/python_models/execute_cancel/model.py | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/qa/python_models/execute_cancel/model.py b/qa/python_models/execute_cancel/model.py
index be2f695b7b..9e7c9aca33 100644
--- a/qa/python_models/execute_cancel/model.py
+++ b/qa/python_models/execute_cancel/model.py
@@ -59,7 +59,9 @@ def execute(self, requests):
     def _execute_processed_requests(self, processed_requests):
         responses = []
         for processed_request in processed_requests:
-            error = pb_utils.TritonError(message="not cancelled")
+            response = pb_utils.InferenceResponse(
+                error=pb_utils.TritonError(message="not cancelled")
+            )
             object_to_check_cancelled = None
             if "response_sender" in processed_request:
                 object_to_check_cancelled = processed_request["response_sender"]
@@ -76,16 +78,14 @@ def _execute_processed_requests(self, processed_requests):
                         + str(time_elapsed)
                         + " s"
                     )
-                    error = pb_utils.TritonError(
-                        message="cancelled", code=pb_utils.TritonError.CANCELLED
-                    )
+                    response = None
                     break
                 self._logger.log_info(
                     "[execute_cancel] Request not cancelled at "
                     + str(time_elapsed)
                     + " s"
                 )
-            responses.append(pb_utils.InferenceResponse(error=error))
+            responses.append(response)
         return responses
 
     def _execute_decoupled(self, processed_requests):
@@ -93,12 +93,13 @@ def response_thread(execute_processed_requests, processed_requests):
             time.sleep(2)  # execute after requests are released
             responses = execute_processed_requests(processed_requests)
             for i in range(len(responses)):  # len(responses) == len(processed_requests)
-                response_sender = processed_requests[i]["response_sender"]
                 response = responses[i]
-                response_sender.send(response)
-                response_sender.send(
-                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
-                )
+                if response != None:
+                    response_sender = processed_requests[i]["response_sender"]
+                    response_sender.send(response)
+                    response_sender.send(
+                        flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
+                    )
 
         thread = threading.Thread(
             target=response_thread,
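Note: after this simplification a confirmed-cancelled request yields no
InferenceResponse at all: _execute_processed_requests() records None for it,
and the decoupled response thread skips both send() calls. Distilled, the
per-request send step now behaves like this sketch (execute_one is an
illustrative helper standing in for the per-request half of
_execute_processed_requests):

    response = execute_one(processed_request)  # None means confirmed cancelled
    if response is not None:
        response_sender.send(response)
        response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
    # else: nothing is sent; the next patch probes whether the sender is in
    # fact already closed at this point.

(The committed code spells the guard as `if response != None:`; `is not None`
is the more idiomatic form.)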
From 35728b4e4865786533cb8db27f9097f241681b0a Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Tue, 3 Oct 2023 19:20:53 -0700
Subject: [PATCH 5/7] Test response_sender.send() after closed

---
 .../decoupled/decoupled_test.py          |  4 ++--
 qa/python_models/execute_cancel/model.py | 24 +++++++++++++++++++++-
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/qa/L0_backend_python/decoupled/decoupled_test.py b/qa/L0_backend_python/decoupled/decoupled_test.py
index 6760fb691a..f0ca870664 100755
--- a/qa/L0_backend_python/decoupled/decoupled_test.py
+++ b/qa/L0_backend_python/decoupled/decoupled_test.py
@@ -286,8 +286,8 @@ def test_decoupled_execute_cancel(self):
 
         with open(log_path, mode="r", encoding="utf-8", errors="strict") as f:
             log_text = f.read()
-            self.assertIn("[execute_cancel] Request not cancelled at 1.0 s", log_text)
-            self.assertIn("[execute_cancel] Request cancelled at ", log_text)
+        self.assertIn("[execute_cancel] Request not cancelled at 1.0 s", log_text)
+        self.assertIn("[execute_cancel] Request cancelled at ", log_text)
 
 
 if __name__ == "__main__":
diff --git a/qa/python_models/execute_cancel/model.py b/qa/python_models/execute_cancel/model.py
index 9e7c9aca33..f54b27c585 100644
--- a/qa/python_models/execute_cancel/model.py
+++ b/qa/python_models/execute_cancel/model.py
@@ -28,6 +28,7 @@
 import threading
 import time
 
+import numpy as np
 import triton_python_backend_utils as pb_utils
 
 
@@ -93,13 +94,34 @@ def response_thread(execute_processed_requests, processed_requests):
             time.sleep(2)  # execute after requests are released
             responses = execute_processed_requests(processed_requests)
             for i in range(len(responses)):  # len(responses) == len(processed_requests)
+                response_sender = processed_requests[i]["response_sender"]
                 response = responses[i]
                 if response != None:
-                    response_sender = processed_requests[i]["response_sender"]
                     response_sender.send(response)
                     response_sender.send(
                         flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
                     )
+                else:
+                    # Test that response_sender.send() raises an error after the
+                    # sender is closed, i.e. once the cancellation is confirmed.
+                    dummy_tensor = pb_utils.Tensor(
+                        "DUMMY_OUT", np.array([[1]], np.single)
+                    )
+                    dummy_response = pb_utils.InferenceResponse([dummy_tensor])
+                    raised_exception = False
+                    try:
+                        response_sender.send(dummy_response)
+                    except pb_utils.TritonModelException as e:
+                        if (
+                            "Unable to send response. Response sender has been closed."
+                            not in str(e)
+                        ):
+                            raise e
+                        raised_exception = True
+                    if not raised_exception:
+                        raise pb_utils.TritonModelException(
+                            "response_sender did not raise an exception on send after closed"
+                        )
 
         thread = threading.Thread(
             target=response_thread,
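Note: unittest assertions are not available inside the model (it runs in the
backend stub process), so the expected exception is asserted by hand with a
raised_exception flag. The same check can be written without the flag; a
compact equivalent would be this sketch:

    try:
        response_sender.send(dummy_response)
        # Reaching this line means send() did not raise, which is the failure.
        raise pb_utils.TritonModelException(
            "response_sender did not raise an exception on send after closed"
        )
    except pb_utils.TritonModelException as e:
        # Swallow only the expected closed-sender error; re-raise anything
        # else, including the failure raised above.
        if "Response sender has been closed" not in str(e):
            raise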
From 0b62641606512af3e6d2d4cc2f671c7ef1dc6271 Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Thu, 5 Oct 2023 00:16:20 -0700
Subject: [PATCH 6/7] Rollback test response_sender.send() after closed

---
 qa/python_models/execute_cancel/model.py | 28 +++---------------------
 1 file changed, 3 insertions(+), 25 deletions(-)

diff --git a/qa/python_models/execute_cancel/model.py b/qa/python_models/execute_cancel/model.py
index f54b27c585..da601e8e3d 100644
--- a/qa/python_models/execute_cancel/model.py
+++ b/qa/python_models/execute_cancel/model.py
@@ -28,7 +28,6 @@
 import threading
 import time
 
-import numpy as np
 import triton_python_backend_utils as pb_utils
 
 
@@ -98,30 +97,9 @@ def response_thread(execute_processed_requests, processed_requests):
                 response_sender = processed_requests[i]["response_sender"]
                 response = responses[i]
                 if response != None:
                     response_sender.send(response)
-                    response_sender.send(
-                        flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
-                    )
-                else:
-                    # Test that response_sender.send() raises an error after the
-                    # sender is closed, i.e. once the cancellation is confirmed.
-                    dummy_tensor = pb_utils.Tensor(
-                        "DUMMY_OUT", np.array([[1]], np.single)
-                    )
-                    dummy_response = pb_utils.InferenceResponse([dummy_tensor])
-                    raised_exception = False
-                    try:
-                        response_sender.send(dummy_response)
-                    except pb_utils.TritonModelException as e:
-                        if (
-                            "Unable to send response. Response sender has been closed."
-                            not in str(e)
-                        ):
-                            raise e
-                        raised_exception = True
-                    if not raised_exception:
-                        raise pb_utils.TritonModelException(
-                            "response_sender did not raise an exception on send after closed"
-                        )
+                response_sender.send(
+                    flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
+                )
 
         thread = threading.Thread(

From 2dc44a845587b24a59d5a00cb1336cc28e70fac6 Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Thu, 5 Oct 2023 12:54:00 -0700
Subject: [PATCH 7/7] Rollback non-decoupled any response on cancel

---
 qa/python_models/execute_cancel/model.py | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/qa/python_models/execute_cancel/model.py b/qa/python_models/execute_cancel/model.py
index da601e8e3d..ec7b96ec1a 100644
--- a/qa/python_models/execute_cancel/model.py
+++ b/qa/python_models/execute_cancel/model.py
@@ -59,9 +59,7 @@ def execute(self, requests):
     def _execute_processed_requests(self, processed_requests):
         responses = []
         for processed_request in processed_requests:
-            response = pb_utils.InferenceResponse(
-                error=pb_utils.TritonError(message="not cancelled")
-            )
+            error = pb_utils.TritonError(message="not cancelled")
             object_to_check_cancelled = None
             if "response_sender" in processed_request:
                 object_to_check_cancelled = processed_request["response_sender"]
@@ -78,14 +76,16 @@ def _execute_processed_requests(self, processed_requests):
                         + str(time_elapsed)
                         + " s"
                     )
-                    response = None
+                    error = pb_utils.TritonError(
+                        message="cancelled", code=pb_utils.TritonError.CANCELLED
+                    )
                     break
                 self._logger.log_info(
                     "[execute_cancel] Request not cancelled at "
                     + str(time_elapsed)
                     + " s"
                 )
-            responses.append(response)
+            responses.append(pb_utils.InferenceResponse(error=error))
         return responses
 
     def _execute_decoupled(self, processed_requests):
@@ -94,9 +94,7 @@ def response_thread(execute_processed_requests, processed_requests):
             responses = execute_processed_requests(processed_requests)
             for i in range(len(responses)):  # len(responses) == len(processed_requests)
                 response_sender = processed_requests[i]["response_sender"]
-                response = responses[i]
-                if response != None:
-                    response_sender.send(response)
+                response_sender.send(responses[i])
                 response_sender.send(
                     flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
                 )
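Note: net effect of the series: in both transaction modes the model answers a
confirmed cancellation with an InferenceResponse carrying a CANCELLED
TritonError, and the decoupled path pushes the same responses through the
response sender followed by the FINAL flag. Both suites assert that the client
observes StatusCode.CANCELLED, via the non-streaming callback in
lifecycle_test.py and via the stream queue in decoupled_test.py. A client
callback can branch on that status the same way the tests do (sketch; the
function name is illustrative):

    from tritonclient.utils import InferenceServerException

    def on_complete(result, error):
        # error is None on success, an InferenceServerException otherwise.
        if error is not None and error.status() == "StatusCode.CANCELLED":
            print("request was cancelled")
        elif error is not None:
            raise error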