From 44f325887dc17905038f1b13020f5cc21a3bfa82 Mon Sep 17 00:00:00 2001 From: Jose Carrillo Date: Fri, 16 Aug 2024 12:28:45 -0500 Subject: [PATCH 1/4] Support ThreadPoolExecutor to serve simultaneous non-async requests The bigger change that comes with this is that you should NEVER call self.finish() in your code. You should just set the following to prevent further response modifications: self._finished = True --- tornado/web.py | 91 +++++++++++++++++++++++++++++--------------------- 1 file changed, 53 insertions(+), 38 deletions(-) diff --git a/tornado/web.py b/tornado/web.py index 039396470f..aff182c3b1 100644 --- a/tornado/web.py +++ b/tornado/web.py @@ -70,6 +70,7 @@ async def main(): import hmac import http.cookies from inspect import isclass +from inspect import iscoroutinefunction from io import BytesIO import mimetypes import numbers @@ -96,6 +97,7 @@ async def main(): from tornado.log import access_log, app_log, gen_log from tornado import template from tornado.escape import utf8, _unicode +from tornado.ioloop import IOLoop from tornado.routing import ( AnyMatches, DefaultHostMatches, @@ -216,8 +218,10 @@ def __init__( self.application = application self.request = request self._headers_written = False + # When this flag is True, avoid further request writing, but finish() will be called later self._finished = False - self._auto_finish = True + self._skip_finish_fn = False + self._finish_called = False self._prepared_future = None self.ui = ObjectDict( (n, self._ui_method(m)) for n, m in application.ui_methods.items() @@ -892,7 +896,7 @@ def redirect( assert isinstance(status, int) and 300 <= status <= 399 self.set_status(status) self.set_header("Location", utf8(url)) - self.finish() + self._finished = True def write(self, chunk: Union[str, bytes, dict]) -> None: """Writes the given chunk to the output buffer. 
@@ -999,7 +1003,8 @@ def render(self, template_name: str, **kwargs: Any) -> "Future[None]": if html_bodies: hloc = html.index(b"") html = html[:hloc] + b"".join(html_bodies) + b"\n" + html[hloc:] - return self.finish(html) + self.write(html) + self._finished = True def render_linked_js(self, js_files: Iterable[str]) -> str: """Default method used to render the final js links for the @@ -1201,7 +1206,9 @@ def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[Non Now returns a `.Future` instead of ``None``. """ - if self._finished: + if self._skip_finish_fn: + return + if self._finish_called: raise RuntimeError("finish() called twice") if chunk is not None: @@ -1238,7 +1245,7 @@ def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[Non future = self.flush(include_footers=True) self.request.connection.finish() self._log() - self._finished = True + self._finish_called = True self.on_finish() self._break_cycles() return future @@ -1255,6 +1262,7 @@ def detach(self) -> iostream.IOStream: .. versionadded:: 5.1 """ self._finished = True + self._skip_finish_fn = True # TODO: add detach to HTTPConnection? return self.request.connection.detach() # type: ignore @@ -1276,15 +1284,7 @@ def send_error(self, status_code: int = 500, **kwargs: Any) -> None: """ if self._headers_written: gen_log.error("Cannot send error response after headers written") - if not self._finished: - # If we get an error between writing headers and finishing, - # we are unlikely to be able to finish due to a - # Content-Length mismatch. Try anyway to release the - # socket. 
- try: - self.finish() - except Exception: - gen_log.error("Failed to flush partial response", exc_info=True) + self._finished = True return self.clear() @@ -1298,8 +1298,7 @@ def send_error(self, status_code: int = 500, **kwargs: Any) -> None: self.write_error(status_code, **kwargs) except Exception: app_log.error("Uncaught exception in write_error", exc_info=True) - if not self._finished: - self.finish() + self._finished = True def write_error(self, status_code: int, **kwargs: Any) -> None: """Override to implement custom error pages. @@ -1318,13 +1317,13 @@ def write_error(self, status_code: int, **kwargs: Any) -> None: self.set_header("Content-Type", "text/plain") for line in traceback.format_exception(*kwargs["exc_info"]): self.write(line) - self.finish() else: - self.finish( + self.write( "%(code)d: %(message)s" "%(code)d: %(message)s" % {"code": status_code, "message": self._reason} ) + self._finished = True @property def locale(self) -> tornado.locale.Locale: @@ -1743,12 +1742,8 @@ def val(x: bytes) -> bytes: break return match - async def _execute( - self, transforms: List["OutputTransform"], *args: bytes, **kwargs: bytes - ) -> None: - """Executes this request with the given output transforms.""" - self._transforms = transforms - try: + async def _execute_no_err(self, *args: bytes, **kwargs: bytes): + if True: if self.request.method not in self.SUPPORTED_METHODS: raise HTTPError(405) self.path_args = [self.decode_argument(arg) for arg in args] @@ -1782,27 +1777,46 @@ async def _execute( try: await self.request._body_future except iostream.StreamClosedError: - return + raise FinishExecute() + tornado_workers_executor = self.application.settings.get('tornado_workers_executor') method = getattr(self, self.request.method.lower()) - result = method(*self.path_args, **self.path_kwargs) - if result is not None: - result = await result - if self._auto_finish and not self._finished: - self.finish() + if iscoroutinefunction(method) or getattr(method, 
'__tornado_coroutine__', False): + result = await method(*self.path_args, **self.path_kwargs) + elif tornado_workers_executor: + result = await IOLoop.current().run_in_executor( + tornado_workers_executor, + functools.partial(method, *self.path_args, **self.path_kwargs)) + else: + result = method(*self.path_args, **self.path_kwargs) + + async def _execute( + self, transforms: List["OutputTransform"], *args: bytes, **kwargs: bytes + ) -> None: + """Executes this request with the given output transforms.""" + self._transforms = transforms + try: + await self._execute_no_err(*args, **kwargs) + except FinishExecute: + return + except Finish as e: + self.write(*e.args) + self._finished = True except Exception as e: try: self._handle_request_exception(e) except Exception: app_log.error("Exception in exception handler", exc_info=True) - finally: - # Unset result to avoid circular references - result = None if self._prepared_future is not None and not self._prepared_future.done(): # In case we failed before setting _prepared_future, do it # now (to unblock the HTTP server). Note that this is not # in a finally block to avoid GC issues prior to Python 3.4. self._prepared_future.set_result(None) + finally: + try: + self.finish() + except Exception: + gen_log.error("Failed to flush response", exc_info=True) def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]: """Implement this method to handle streamed request data. @@ -1830,11 +1844,6 @@ def _request_summary(self) -> str: ) def _handle_request_exception(self, e: BaseException) -> None: - if isinstance(e, Finish): - # Not an error; just finish the request without logging. 
- if not self._finished: - self.finish(*e.args) - return try: self.log_exception(*sys.exc_info()) except Exception: @@ -2518,6 +2527,11 @@ class Finish(Exception): pass +class FinishExecute(Exception): + """A convenience exception to just finish _execute() without calling finish()""" + pass + + class MissingArgumentError(HTTPError): """Exception raised by `RequestHandler.get_argument`. @@ -3148,6 +3162,7 @@ def initialize( def prepare(self) -> None: self.fallback(self.request) self._finished = True + self._skip_finish_fn = True self.on_finish() From 8cfd6d39332ce1b7a8286c80b5f98da07c851665 Mon Sep 17 00:00:00 2001 From: Jose Carrillo Date: Sat, 17 Aug 2024 00:30:20 -0500 Subject: [PATCH 2/4] Fix tests * render() now returns None, if it returned a future, we would not be able to put each method code inside a thread_pool_executor --- tornado/test/web_test.py | 4 ++-- tornado/web.py | 17 ++++++++++------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/tornado/test/web_test.py b/tornado/test/web_test.py index fec66f39ac..6d3ba37ab3 100644 --- a/tornado/test/web_test.py +++ b/tornado/test/web_test.py @@ -262,10 +262,10 @@ def test_finish_method_return_future(self): self.assertIsInstance(self.final_return, Future) self.assertTrue(self.final_return.done()) - def test_render_method_return_future(self): + def test_render_method_return_none(self): response = self.fetch(self.get_url("/render")) self.assertEqual(response.code, 200) - self.assertIsInstance(self.final_return, Future) + self.assertTrue(self.final_return is None) class CookieTest(WebTestCase): diff --git a/tornado/web.py b/tornado/web.py index aff182c3b1..42ad340885 100644 --- a/tornado/web.py +++ b/tornado/web.py @@ -1246,6 +1246,7 @@ def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[Non self.request.connection.finish() self._log() self._finish_called = True + self._finished = True self.on_finish() self._break_cycles() return future @@ -1800,7 +1801,8 @@ async def 
_execute( except FinishExecute: return except Finish as e: - self.write(*e.args) + if e.args: + self.write(*e.args) self._finished = True except Exception as e: try: @@ -1813,10 +1815,11 @@ async def _execute( # in a finally block to avoid GC issues prior to Python 3.4. self._prepared_future.set_result(None) finally: - try: - self.finish() - except Exception: - gen_log.error("Failed to flush response", exc_info=True) + if not self._finish_called: + try: + self.finish() + except Exception: + gen_log.error("Failed to flush response", exc_info=True) def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]: """Implement this method to handle streamed request data. @@ -2691,8 +2694,8 @@ def reset(cls) -> None: with cls._lock: cls._static_hashes = {} - def head(self, path: str) -> Awaitable[None]: - return self.get(path, include_body=False) + async def head(self, path: str) -> Awaitable[None]: + return await self.get(path, include_body=False) async def get(self, path: str, include_body: bool = True) -> None: # Set up our path instance variables. 
From 6ecc16927c366c8a9f13e614eda0b991db39bcc8 Mon Sep 17 00:00:00 2001 From: Jose Carrillo Date: Sat, 17 Aug 2024 13:05:05 -0500 Subject: [PATCH 3/4] Allow the usage of self.finish() as usual But it no longer returns a Future, just None, and it does not flush the results yet --- tornado/test/web_test.py | 24 +++++++++++++----------- tornado/web.py | 13 ++++++++----- tornado/websocket.py | 1 + 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/tornado/test/web_test.py b/tornado/test/web_test.py index 6d3ba37ab3..168159d7ea 100644 --- a/tornado/test/web_test.py +++ b/tornado/test/web_test.py @@ -251,16 +251,14 @@ def get(self): def get_app_kwargs(self): return dict(template_path="FinalReturnTest") - def test_finish_method_return_future(self): + def test_finish_method_return_none(self): response = self.fetch(self.get_url("/finish")) self.assertEqual(response.code, 200) - self.assertIsInstance(self.final_return, Future) - self.assertTrue(self.final_return.done()) + self.assertTrue(self.final_return is None) response = self.fetch(self.get_url("/finish"), method="POST", body=b"") self.assertEqual(response.code, 200) - self.assertIsInstance(self.final_return, Future) - self.assertTrue(self.final_return.done()) + self.assertTrue(self.final_return is None) def test_render_method_return_none(self): response = self.fetch(self.get_url("/render")) @@ -2531,8 +2529,11 @@ def get_handlers(self): class TooHigh(RequestHandler): def get(self): self.set_header("Content-Length", "42") + self.finish("ok") + + def _real_finish(self) -> "Future[None]": try: - self.finish("ok") + return super()._real_finish() except Exception as e: test.server_error = e raise @@ -2540,8 +2541,11 @@ def get(self): class TooLow(RequestHandler): def get(self): self.set_header("Content-Length", "2") + self.finish("hello") + + def _real_finish(self) -> "Future[None]": try: - self.finish("hello") + return super()._real_finish() except Exception as e: test.server_error = e raise @@ -2555,8 +2559,7 
@@ def test_content_length_too_high(self): with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"): with ExpectLog( gen_log, - "(Cannot send error response after headers written" - "|Failed to flush partial response)", + "Failed to flush response", ): with self.assertRaises(HTTPClientError): self.fetch("/high", raise_error=True) @@ -2571,8 +2574,7 @@ def test_content_length_too_low(self): with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"): with ExpectLog( gen_log, - "(Cannot send error response after headers written" - "|Failed to flush partial response)", + "Failed to flush response", ): with self.assertRaises(HTTPClientError): self.fetch("/low", raise_error=True) diff --git a/tornado/web.py b/tornado/web.py index 42ad340885..56614d04f0 100644 --- a/tornado/web.py +++ b/tornado/web.py @@ -1191,7 +1191,12 @@ def flush(self, include_footers: bool = False) -> "Future[None]": future.set_result(None) return future - def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[None]": + def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> None: + if chunk is not None: + self.write(chunk) + self._finished = True + + def _real_finish(self) -> "Future[None]": """Finishes this response, ending the HTTP request. Passing a ``chunk`` to ``finish()`` is equivalent to passing that @@ -1211,9 +1216,6 @@ def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[Non if self._finish_called: raise RuntimeError("finish() called twice") - if chunk is not None: - self.write(chunk) - # Automatically support ETags and add the Content-Length header if # we have not flushed any content yet. 
if not self._headers_written: @@ -1817,8 +1819,9 @@ async def _execute( finally: if not self._finish_called: try: - self.finish() + self._real_finish() except Exception: + self.log_exception(*sys.exc_info()) gen_log.error("Failed to flush response", exc_info=True) def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]: diff --git a/tornado/websocket.py b/tornado/websocket.py index 8f0e0aefe8..d25b99b136 100644 --- a/tornado/websocket.py +++ b/tornado/websocket.py @@ -930,6 +930,7 @@ async def _accept_connection(self, handler: WebSocketHandler) -> None: handler.set_header("Connection", "Upgrade") handler.set_header("Sec-WebSocket-Accept", self._challenge_response(handler)) handler.finish() + handler._real_finish() self.stream = handler._detach_stream() From ccdd751d43cbf7c5c4f812e2074b25f25f409e23 Mon Sep 17 00:00:00 2001 From: Jose Carrillo Date: Sat, 17 Aug 2024 14:26:53 -0500 Subject: [PATCH 4/4] Await returned coroutines, but with a warning --- tornado/web.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tornado/web.py b/tornado/web.py index 56614d04f0..a7ec52d88b 100644 --- a/tornado/web.py +++ b/tornado/web.py @@ -60,6 +60,7 @@ async def main(): """ +from asyncio import iscoroutine import base64 import binascii import datetime @@ -1792,6 +1793,12 @@ async def _execute_no_err(self, *args: bytes, **kwargs: bytes): functools.partial(method, *self.path_args, **self.path_kwargs)) else: result = method(*self.path_args, **self.path_kwargs) + if result is not None: + if iscoroutine(result): + app_log.warn(f'{method} returned a coroutine, you should await your own coroutines') + await result + else: + app_log.warn(f'{method} returned {result}, it was ignored') async def _execute( self, transforms: List["OutputTransform"], *args: bytes, **kwargs: bytes