Skip to content

Commit

Permalink
Measure time until last response chunk (#1823)
Browse files Browse the repository at this point in the history
Rally uses timers internally to measure when a request started and when
it has completed. However, it has stopped measuring as soon as the first
byte of the response has been received instead of waiting for the full
response. This can skew service time measurement if the response is
large and Elasticsearch streams it.

With this commit we wait until the last chunk of the response has been
received. This results in a more realistic service time metric in these
cases.

Closes #1822
  • Loading branch information
danielmitterdorfer authored Jan 17, 2024
1 parent 20ab3c6 commit 3833d53
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
15 changes: 12 additions & 3 deletions esrally/client/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,17 @@ async def send(self, conn: "Connection") -> "ClientResponse":
return self.response


# we use EmptyStreamReader here because it overrides all methods with
# no-op implementations that we need.
class StaticStreamReader(aiohttp.streams.EmptyStreamReader):
def __init__(self, body):
super().__init__()
self.body = body

async def read(self, n: int = -1) -> bytes:
return self.body.encode("utf-8")


class StaticResponse(aiohttp.ClientResponse):
def __init__(
self,
Expand Down Expand Up @@ -122,12 +133,10 @@ async def start(self, connection: "Connection") -> "ClientResponse":
self._protocol = connection.protocol
self._connection = connection
self._headers = CIMultiDictProxy(CIMultiDict())
self.content = StaticStreamReader(self.static_body)
self.status = 200
return self

async def read(self):
return self.static_body.encode("utf-8")


class ResponseMatcher:
def __init__(self, responses):
Expand Down
11 changes: 10 additions & 1 deletion esrally/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,16 @@ async def on_request_end(session, trace_config_ctx, params):

trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)
trace_config.on_request_end.append(on_request_end)
# It is tempting to register this callback on `TraceConfig.on_request_end()`. However, aiohttp will call
# `TraceConfig.on_request_end()` when the *first* chunk of the response has been received. However, this can
# skew service time significantly if the response is large *and* it is streamed by Elasticsearch
# (see ChunkedToXContent in the Elasticsearch code base).
#
# Therefore, we register for `TraceConfig.on_response_chunk_received()` which is called multiple times. As
# Rally's implementation of the `on_request_end` callback handler updates the timestamp on every call, Rally
# will ultimately record the time when it received the *last* chunk. This is what we want because any code
# that is using the Elasticsearch client library can only act on the response once it is fully received.
trace_config.on_response_chunk_received.append(on_request_end)
# ensure that we also stop the timer when a request "ends" with an exception (e.g. a timeout)
trace_config.on_request_exception.append(on_request_end)

Expand Down

0 comments on commit 3833d53

Please sign in to comment.