diff --git a/esrally/client/asynchronous.py b/esrally/client/asynchronous.py index 8d82475f4..2d869390d 100644 --- a/esrally/client/asynchronous.py +++ b/esrally/client/asynchronous.py @@ -90,6 +90,17 @@ async def send(self, conn: "Connection") -> "ClientResponse": return self.response +# we use EmptyStreamReader here because it overrides all methods with +# no-op implementations that we need. +class StaticStreamReader(aiohttp.streams.EmptyStreamReader): + def __init__(self, body): + super().__init__() + self.body = body + + async def read(self, n: int = -1) -> bytes: + return self.body.encode("utf-8") + + class StaticResponse(aiohttp.ClientResponse): def __init__( self, @@ -122,12 +133,10 @@ async def start(self, connection: "Connection") -> "ClientResponse": self._protocol = connection.protocol self._connection = connection self._headers = CIMultiDictProxy(CIMultiDict()) + self.content = StaticStreamReader(self.static_body) self.status = 200 return self - async def read(self): - return self.static_body.encode("utf-8") - class ResponseMatcher: def __init__(self, responses): diff --git a/esrally/client/factory.py b/esrally/client/factory.py index 4a4c0179f..3061aedc6 100644 --- a/esrally/client/factory.py +++ b/esrally/client/factory.py @@ -225,7 +225,16 @@ async def on_request_end(session, trace_config_ctx, params): trace_config = aiohttp.TraceConfig() trace_config.on_request_start.append(on_request_start) - trace_config.on_request_end.append(on_request_end) + # It is tempting to register this callback on `TraceConfig.on_request_end()`. However, aiohttp will call + # `TraceConfig.on_request_end()` when the *first* chunk of the response has been received. However, this can + # skew service time significantly if the response is large *and* it is streamed by Elasticsearch + # (see ChunkedToXContent in the Elasticsearch code base). + # + # Therefore, we register for `TraceConfig.on_response_chunk_received()` which is called multiple times. As + # Rally's implementation of the `on_request_end` callback handler updates the timestamp on every call, Rally + # will ultimately record the time when it received the *last* chunk. This is what we want because any code + # that is using the Elasticsearch client library can only act on the response once it is fully received. + trace_config.on_response_chunk_received.append(on_request_end) # ensure that we also stop the timer when a request "ends" with an exception (e.g. a timeout) trace_config.on_request_exception.append(on_request_end)