From 4cf646294af7095447faf90bdfb7d5d2b340b996 Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Mon, 29 Jul 2024 16:42:43 -0700
Subject: [PATCH 1/6] Move streaming cancellation check into response loop

---
 src/model.py | 36 +++++++++++++++++++++++++++++++++---
 1 file changed, 33 insertions(+), 3 deletions(-)

diff --git a/src/model.py b/src/model.py
index c9517208..35d43116 100644
--- a/src/model.py
+++ b/src/model.py
@@ -287,10 +287,16 @@ def response_loop(self):
             # To signal shutdown a None item will be added to the queue.
             if item is None:
                 break
-            response_sender, response, response_flag = item
+            response_state, response, response_flag = item
             del item
+            response_sender = response_state[0]
             try:
                 response_sender.send(response, response_flag)
+                last_response_ready = response_state[2]
+                # Stop checking for cancellation if the last response is generated.
+                if not last_response_ready:
+                    is_cancelled = response_sender.is_cancelled()
+                    response_state[1] = is_cancelled
             except Exception as e:
                 self.logger.log_error(
                     f"An error occurred while sending a response: {e}"
@@ -298,6 +304,7 @@ def response_loop(self):
             finally:
                 if response_flag == pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL:
                     self.ongoing_request_count -= 1
+                    del response_state
                     del response_sender
                     if self.ongoing_request_count == 0:
                         gc.collect()
@@ -342,6 +349,11 @@ async def generate(self, request):
         Forwards single request to LLM engine and returns responses.
         """
         response_sender = request.get_response_sender()
+        response_state = [
+            response_sender,
+            False,  # is cancelled
+            False,  # last response ready to be sent
+        ]
         self.ongoing_request_count += 1
         decrement_ongoing_request_count = True
         try:
@@ -403,10 +415,26 @@ async def generate(self, request):
             )

             async for output in response_iterator:
-                if response_sender.is_cancelled():
+                is_cancelled = response_state[1]
+                if not stream:
+                    is_cancelled = response_sender.is_cancelled()
+                if is_cancelled:
                     self.logger.log_info("[vllm] Cancelling the request")
                     await self.llm_engine.abort(request_id)
                     self.logger.log_info("[vllm] Successfully cancelled the request")
+                    if stream:
+                        response_state[2] = True  # last response ready to be sent
+                        response = pb_utils.InferenceResponse(
+                            error=pb_utils.TritonError(
+                                message="user cancelled",
+                                code=pb_utils.TritonError.CANCELLED,
+                            )
+                        )
+                        flags = pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
+                        decrement_ongoing_request_count = False
+                        self._response_queue.put_nowait(
+                            (response_state, response, flags)
+                        )
                     break
                 if stream:
                     prev_outputs_lengths = None
@@ -418,9 +446,10 @@ async def generate(self, request):
                     response = self.create_stream_response(output, prev_outputs_lengths)
                     flags = 0
                     if output.finished:
+                        response_state[2] = True  # last response ready to be sent
                         flags = pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
                         decrement_ongoing_request_count = False
-                    self._response_queue.put_nowait((response_sender, response, flags))
+                    self._response_queue.put_nowait((response_state, response, flags))
                 prev_outputs = output

             last_output = output
@@ -447,6 +476,7 @@ async def generate(self, request):
         finally:
             if decrement_ongoing_request_count:
                 self.ongoing_request_count -= 1
+            del response_state
             del response_sender
             if self.ongoing_request_count == 0:
                 gc.collect()
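Patch 1's core idea: response_sender.is_cancelled() is now polled in the response-loop thread, right after each send, instead of once per token inside generate(); the result travels back through the mutable response_state list ([sender, is_cancelled, last_response_ready]) that both sides share. A minimal, self-contained sketch of that handoff follows; the names are illustrative, not Triton's actual APIs.

import queue
import threading

class FakeSender:
    """Stand-in for Triton's response sender (illustrative only)."""

    def send(self, response, flags=0):
        print("sent:", response, "flags:", flags)

    def is_cancelled(self):
        return False

response_queue = queue.Queue()

def response_loop():
    while True:
        item = response_queue.get()
        if item is None:  # shutdown signal, as in the patched model
            break
        response_state, response, flags = item
        response_sender = response_state[0]
        response_sender.send(response, flags)
        # Poll cancellation here, off the generation loop, and record the
        # result where the generator will see it on its next iteration.
        if not response_state[2]:  # last response not generated yet
            response_state[1] = response_sender.is_cancelled()

def generate(sender, outputs):
    # Slots: [sender, is_cancelled, last_response_ready]; PATCH 4 later
    # gives these positions names by switching to a dict.
    response_state = [sender, False, False]
    for out in outputs:
        if response_state[1]:  # cancellation observed by the sender thread
            break
        response_queue.put((response_state, out, 0))
    response_state[2] = True

worker = threading.Thread(target=response_loop)
worker.start()
generate(FakeSender(), ["Hello", " world", "!"])
response_queue.put(None)
worker.join()

The shared list is mutated from both threads without a lock; that works here because each slot has a single writer, and a stale read only delays the abort by one response.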
From efe0419d087fc595b47c5db85f68c3898e0ce203 Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Mon, 29 Jul 2024 18:02:33 -0700
Subject: [PATCH 2/6] Check for cancellation every other response

---
 src/model.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/model.py b/src/model.py
index 35d43116..9f2844e3 100644
--- a/src/model.py
+++ b/src/model.py
@@ -289,12 +289,14 @@ def response_loop(self):
                 break
             response_state, response, response_flag = item
             del item
-            response_sender = response_state[0]
+            response_sender, number_response_sent = response_state[0], response_state[3]
             try:
                 response_sender.send(response, response_flag)
+                response_state[3] = number_response_sent + 1
+                # Check for cancellation only if the last response is not yet generated,
+                # and only on every other response.
                 last_response_ready = response_state[2]
-                # Stop checking for cancellation if the last response is generated.
-                if not last_response_ready:
+                if not last_response_ready and number_response_sent % 2 == 0:
                     is_cancelled = response_sender.is_cancelled()
                     response_state[1] = is_cancelled
             except Exception as e:
@@ -353,6 +355,7 @@ async def generate(self, request):
             response_sender,
             False,  # is cancelled
             False,  # last response ready to be sent
+            0,  # number of responses sent
         ]
         self.ongoing_request_count += 1
         decrement_ongoing_request_count = True

From 1187ce9cd31bf7bde1ddafda170ff0585ef3e900 Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Tue, 30 Jul 2024 12:40:39 -0700
Subject: [PATCH 3/6] Revert "Check for cancellation every other response"

This reverts commit efe0419d087fc595b47c5db85f68c3898e0ce203.
---
 src/model.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/src/model.py b/src/model.py
index 9f2844e3..35d43116 100644
--- a/src/model.py
+++ b/src/model.py
@@ -289,14 +289,12 @@ def response_loop(self):
                 break
             response_state, response, response_flag = item
             del item
-            response_sender, number_response_sent = response_state[0], response_state[3]
+            response_sender = response_state[0]
             try:
                 response_sender.send(response, response_flag)
-                response_state[3] = number_response_sent + 1
-                # Check for cancellation only if the last response is not yet generated,
-                # and only on every other response.
                 last_response_ready = response_state[2]
-                if not last_response_ready and number_response_sent % 2 == 0:
+                # Stop checking for cancellation if the last response is generated.
+                if not last_response_ready:
                     is_cancelled = response_sender.is_cancelled()
                     response_state[1] = is_cancelled
             except Exception as e:
@@ -355,7 +353,6 @@ async def generate(self, request):
             response_sender,
             False,  # is cancelled
             False,  # last response ready to be sent
-            0,  # number of responses sent
         ]
         self.ongoing_request_count += 1
         decrement_ongoing_request_count = True
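Patch 2 throttles that poll to every other response by counting sends in a fourth response_state slot, and patch 3 backs the change out. A standalone sketch of the (reverted) throttling, again with illustrative names rather than Triton APIs:

def maybe_poll_cancellation(response_state, response_sender):
    # response_state slots: [sender, is_cancelled, last_response_ready, sent_count]
    number_response_sent = response_state[3]
    response_state[3] = number_response_sent + 1
    # Poll only on even counts; a cancellation is still observed,
    # at worst one response later.
    if not response_state[2] and number_response_sent % 2 == 0:
        response_state[1] = response_sender.is_cancelled()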
From 29942e2517cae6f7c5f1595493324eafe4c5d36b Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Wed, 31 Jul 2024 16:14:56 -0700
Subject: [PATCH 4/6] Refactor response state object

---
 src/model.py | 24 +++++++++++-------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/src/model.py b/src/model.py
index 35d43116..d22d6c71 100644
--- a/src/model.py
+++ b/src/model.py
@@ -289,14 +289,12 @@ def response_loop(self):
                 break
             response_state, response, response_flag = item
             del item
-            response_sender = response_state[0]
+            response_sender = response_state["response_sender"]
             try:
                 response_sender.send(response, response_flag)
-                last_response_ready = response_state[2]
                 # Stop checking for cancellation if the last response is generated.
-                if not last_response_ready:
-                    is_cancelled = response_sender.is_cancelled()
-                    response_state[1] = is_cancelled
+                if not response_state["last_response_generated"]:
+                    response_state["is_cancelled"] = response_sender.is_cancelled()
             except Exception as e:
                 self.logger.log_error(
                     f"An error occurred while sending a response: {e}"
@@ -349,11 +347,11 @@ async def generate(self, request):
         Forwards single request to LLM engine and returns responses.
         """
         response_sender = request.get_response_sender()
-        response_state = [
-            response_sender,
-            False,  # is cancelled
-            False,  # last response ready to be sent
-        ]
+        response_state = {
+            "response_sender": response_sender,
+            "is_cancelled": False,
+            "last_response_generated": False,  # last response ready but not yet sent
+        }
         self.ongoing_request_count += 1
         decrement_ongoing_request_count = True
         try:
@@ -415,7 +413,7 @@ async def generate(self, request):
             )

             async for output in response_iterator:
-                is_cancelled = response_state[1]
+                is_cancelled = response_state["is_cancelled"]
                 if not stream:
                     is_cancelled = response_sender.is_cancelled()
                 if is_cancelled:
@@ -423,7 +421,7 @@ async def generate(self, request):
                     self.logger.log_info("[vllm] Cancelling the request")
                     await self.llm_engine.abort(request_id)
                     self.logger.log_info("[vllm] Successfully cancelled the request")
                     if stream:
-                        response_state[2] = True  # last response ready to be sent
+                        response_state["last_response_generated"] = True
                         response = pb_utils.InferenceResponse(
                             error=pb_utils.TritonError(
@@ -446,7 +444,7 @@ async def generate(self, request):
                     response = self.create_stream_response(output, prev_outputs_lengths)
                     flags = 0
                     if output.finished:
-                        response_state[2] = True  # last response ready to be sent
+                        response_state["last_response_generated"] = True
                         flags = pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
                         decrement_ongoing_request_count = False
                     self._response_queue.put_nowait((response_state, response, flags))

From 0b996e85c011ba26e928410b2039fe7b6bffdf3a Mon Sep 17 00:00:00 2001
From: kthui <18255193+kthui@users.noreply.github.com>
Date: Wed, 31 Jul 2024 16:54:59 -0700
Subject: [PATCH 5/6] Update cancellation message

---
 src/model.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/model.py b/src/model.py
index d22d6c71..9bcd6895 100644
--- a/src/model.py
+++ b/src/model.py
@@ -424,7 +424,7 @@ async def generate(self, request):
                         response_state["last_response_generated"] = True
                         response = pb_utils.InferenceResponse(
                             error=pb_utils.TritonError(
-                                message="user cancelled",
+                                message="cancelled on the client side",
                                 code=pb_utils.TritonError.CANCELLED,
                             )
                         )

From 6b7e2417a192dc8a372aa6cf792b62e22c4f12e1 Mon Sep 17 00:00:00 2001
From: Jacky <18255193+kthui@users.noreply.github.com>
Date: Tue, 6 Aug 2024 16:26:46 -0700
Subject: [PATCH 6/6] Update cancellation message 2

Co-authored-by: Iman Tabrizian
---
 src/model.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/model.py b/src/model.py
index 9bcd6895..8a8c754c 100644
--- a/src/model.py
+++ b/src/model.py
@@ -424,7 +424,7 @@ async def generate(self, request):
                         response_state["last_response_generated"] = True
                         response = pb_utils.InferenceResponse(
                             error=pb_utils.TritonError(
-                                message="cancelled on the client side",
+                                message="Request was cancelled",
                                 code=pb_utils.TritonError.CANCELLED,
                             )
                         )
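For review convenience, this is how the streaming cancellation path in generate() reads once all six patches are applied; it is reassembled verbatim from the hunks above, not an additional change:

            async for output in response_iterator:
                is_cancelled = response_state["is_cancelled"]
                if not stream:
                    is_cancelled = response_sender.is_cancelled()
                if is_cancelled:
                    self.logger.log_info("[vllm] Cancelling the request")
                    await self.llm_engine.abort(request_id)
                    self.logger.log_info("[vllm] Successfully cancelled the request")
                    if stream:
                        response_state["last_response_generated"] = True
                        response = pb_utils.InferenceResponse(
                            error=pb_utils.TritonError(
                                message="Request was cancelled",
                                code=pb_utils.TritonError.CANCELLED,
                            )
                        )
                        flags = pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL
                        decrement_ongoing_request_count = False
                        self._response_queue.put_nowait((response_state, response, flags))
                    break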