Skip to content

Commit

Permalink
Add an asyncio-based load generator (#935)
Browse files Browse the repository at this point in the history
With this commit we add a new experimental subcommand `race-aync` to Rally. It
allows to specify significantly more clients than the current `race` subcommand.
The reason for this is that under the hood, `race-async` uses `asyncio` and runs
all clients in a single event loop. Contrary to that, `race` uses an actor
system under the hood and maps each client to one process.

As the new subcommand is very experimental and not yet meant to be used broadly,
there is no accompanying user documentation in this PR. Instead, we plan to
build on top of this PR and expand the load generator to take advantage of
multiple cores before we consider this usable in production (it will likely keep
its experimental status though).

In this PR we also implement a compatibility layer into the current load
generator so both work internally now with `asyncio`. Consequently, we have
already adapted all Rally tracks with a backwards-compatibility layer (see
elastic/rally-tracks#97 and elastic/rally-eventdata-track#80).

Closes #852
Relates #916
  • Loading branch information
danielmitterdorfer authored Mar 29, 2020
1 parent b33da19 commit 3b5eee2
Show file tree
Hide file tree
Showing 25 changed files with 2,656 additions and 1,099 deletions.
7 changes: 7 additions & 0 deletions create-notice.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,26 @@ function main {
printf "The source code can be obtained at https://github.com/certifi/python-certifi\n" >> "${OUTPUT_FILE}"
add_license "certifi" "https://raw.githubusercontent.com/certifi/python-certifi/master/LICENSE"
add_license "elasticsearch" "https://raw.githubusercontent.com/elastic/elasticsearch-py/master/LICENSE"
add_license "elasticsearch-async" "https://raw.githubusercontent.com/elastic/elasticsearch-py-async/master/LICENSE"
add_license "jinja2" "https://raw.githubusercontent.com/pallets/jinja/master/LICENSE.rst"
add_license "jsonschema" "https://raw.githubusercontent.com/Julian/jsonschema/master/COPYING"
add_license "psutil" "https://raw.githubusercontent.com/giampaolo/psutil/master/LICENSE"
add_license "py-cpuinfo" "https://raw.githubusercontent.com/workhorsy/py-cpuinfo/master/LICENSE"
add_license "tabulate" "https://bitbucket.org/astanin/python-tabulate/raw/03182bf9b8a2becbc54d17aa7e3e7dfed072c5f5/LICENSE"
add_license "thespian" "https://raw.githubusercontent.com/kquick/Thespian/master/LICENSE.txt"
add_license "boto3" "https://raw.githubusercontent.com/boto/boto3/develop/LICENSE"
add_license "yappi" "https://raw.githubusercontent.com/sumerc/yappi/master/LICENSE"
add_license "ijson" "https://raw.githubusercontent.com/ICRAR/ijson/master/LICENSE.txt"

# transitive dependencies
# Jinja2 -> Markupsafe
add_license "Markupsafe" "https://raw.githubusercontent.com/pallets/markupsafe/master/LICENSE.rst"
# elasticsearch -> urllib3
add_license "urllib3" "https://raw.githubusercontent.com/shazow/urllib3/master/LICENSE.txt"
#elasticsearch_async -> aiohttp
add_license "aiohttp" "https://raw.githubusercontent.com/aio-libs/aiohttp/master/LICENSE.txt"
#elasticsearch_async -> async_timeout
add_license "async_timeout" "https://raw.githubusercontent.com/aio-libs/async-timeout/master/LICENSE"
# boto3 -> s3transfer
add_license "s3transfer" "https://raw.githubusercontent.com/boto/s3transfer/develop/LICENSE.txt"
# boto3 -> jmespath
Expand Down
46 changes: 30 additions & 16 deletions docs/adding_tracks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -881,17 +881,15 @@ In ``track.json`` set the ``operation-type`` to "percolate" (you can choose this

Then create a file ``track.py`` next to ``track.json`` and implement the following two functions::

def percolate(es, params):
es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)

async def percolate(es, params):
await es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)

def register(registry):
registry.register_runner("percolate", percolate)

registry.register_runner("percolate", percolate, async_runner=True)

The function ``percolate`` is the actual runner and takes the following parameters:

Expand All @@ -906,11 +904,25 @@ This function can return:

Similar to a parameter source you also need to bind the name of your operation type to the function within ``register``.

To illustrate how to use custom return values, suppose we want to implement a custom runner that calls the `pending tasks API <https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-pending.html>`_ and returns the number of pending tasks as additional meta-data::

async def pending_tasks(es, params):
response = await es.cluster.pending_tasks()
return {
"weight": 1,
"unit": "ops",
"pending-tasks-count": len(response["tasks"])
}

def register(registry):
registry.register_runner("pending-tasks", pending_tasks, async_runner=True)


If you need more control, you can also implement a runner class. The example above, implemented as a class looks as follows::

class PercolateRunner:
def __call__(self, es, params):
es.percolate(
async def __call__(self, es, params):
await es.percolate(
index="queries",
doc_type="content",
body=params["body"]
Expand All @@ -920,10 +932,12 @@ If you need more control, you can also implement a runner class. The example abo
return "percolate"

def register(registry):
registry.register_runner("percolate", PercolateRunner())
registry.register_runner("percolate", PercolateRunner(), async_runner=True)


The actual runner is implemented in the method ``__call__`` and the same return value conventions apply as for functions. For debugging purposes you should also implement ``__repr__`` and provide a human-readable name for your runner. Finally, you need to register your runner in the ``register`` function.

The actual runner is implemented in the method ``__call__`` and the same return value conventions apply as for functions. For debugging purposes you should also implement ``__repr__`` and provide a human-readable name for your runner. Finally, you need to register your runner in the ``register`` function. Runners also support Python's `context manager <https://docs.python.org/3/library/stdtypes.html#typecontextmanager>`_ interface. Rally uses a new context for each request. Implementing the context manager interface can be handy for cleanup of resources after executing an operation. Rally uses it, for example, to clear open scrolls.
Runners also support Python's `asynchronous context manager <https://docs.python.org/3/reference/datamodel.html#async-context-managers>`_ interface. Rally uses a new context for each request. Implementing the asynchronous context manager interface can be handy for cleanup of resources after executing an operation. Rally uses it, for example, to clear open scrolls.

If you have specified multiple Elasticsearch clusters using :ref:`target-hosts <command_line_reference_advanced_topics>` you can make Rally pass a dictionary of client connections instead of one for the ``default`` cluster in the ``es`` parameter.

Expand All @@ -938,14 +952,14 @@ Example (assuming Rally has been invoked specifying ``default`` and ``remote`` i
class CreateIndexInRemoteCluster:
multi_cluster = True

def __call__(self, es, params):
es['remote'].indices.create(index='remote-index')
async def __call__(self, es, params):
await es["remote"].indices.create(index="remote-index")

def __repr__(self, *args, **kwargs):
return "create-index-in-remote-cluster"

def register(registry):
registry.register_runner("create-index-in-remote-cluster", CreateIndexInRemoteCluster())
registry.register_runner("create-index-in-remote-cluster", CreateIndexInRemoteCluster(), async_runner=True)


.. note::
Expand Down
49 changes: 49 additions & 0 deletions docs/migrate.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,55 @@ Minimum Python version is 3.8.0

Rally 1.5.0 requires Python 3.8.0. Check the :ref:`updated installation instructions <install_python>` for more details.

Meta-Data for queries are omitted
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Rally 1.5.0 does not determine query meta-data anymore by default to reduce the risk of client-side bottlenecks. The following meta-data fields are affected:

* ``hits``
* ``hits_relation``
* ``timed_out``
* ``took``

If you still want to retrieve them (risking skewed results due to additional overhead), set the new property ``detailed-results`` to ``true`` for any operation of type ``search``.

Runner API uses asyncio
^^^^^^^^^^^^^^^^^^^^^^^

In order to support more concurrent clients in the future, Rally is moving from a synchronous model to an asynchronous model internally. With Rally 1.5.0 all custom runners need to be implemented using async APIs and a new bool argument ``async_runner=True`` needs to be provided upon registration. Below is an example how to migrate a custom runner function.

A custom runner prior to Rally 1.5.0::

def percolate(es, params):
es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)

def register(registry):
registry.register_runner("percolate", percolate)

With Rally 1.5.0, the implementation changes as follows::

async def percolate(es, params):
await es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)

def register(registry):
registry.register_runner("percolate", percolate, async_runner=True)

Apply to the following changes for each custom runner:

* Prefix the function signature with ``async``.
* Add an ``await`` keyword before each Elasticsearch API call.
* Add ``async_runner=True`` as the last argument to the ``register_runner`` function.

For more details please refer to the updated documentation on :ref:`custom runners <adding_tracks_custom_runners>`.

Migrating to Rally 1.4.1
------------------------

Expand Down
8 changes: 8 additions & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,17 @@ With the operation type ``search`` you can execute `request body searches <http:
2. Rally will not attempt to serialize the parameters and pass them as is. Always use "true" / "false" strings for boolean parameters (see example below).

* ``body`` (mandatory): The query body.
* ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data about queries. As it analyzes the corresponding response in more detail, this might incur additional overhead which can skew measurement results. This flag is ineffective for scroll queries.
* ``pages`` (optional): Number of pages to retrieve. If this parameter is present, a scroll query will be executed. If you want to retrieve all result pages, use the value "all".
* ``results-per-page`` (optional): Number of documents to retrieve per page for scroll queries.

If ``detailed-results`` is set to ``true``, the following meta-data properties will be determined and stored:

* ``hits``
* ``hits_relation``
* ``timed_out``
* ``took``

Example::

{
Expand Down
134 changes: 134 additions & 0 deletions esrally/async_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import asyncio
import ssl
import warnings

import aiohttp
from aiohttp.client_exceptions import ServerFingerprintMismatch
import async_timeout

from elasticsearch.exceptions import ConnectionError, ConnectionTimeout, ImproperlyConfigured, SSLError
from elasticsearch.connection import Connection
from elasticsearch.compat import urlencode
from elasticsearch.connection.http_urllib3 import create_ssl_context


# This is only needed because https://github.com/elastic/elasticsearch-py-async/pull/68 is not merged yet
# In addition we have raised the connection limit in TCPConnector from 100 to 10000.

# We want to keep the diff as small as possible thus suppressing pylint warnings that we would not allow in Rally
# pylint: disable=W0706
class AIOHttpConnection(Connection):
def __init__(self, host='localhost', port=9200, http_auth=None,
use_ssl=False, verify_certs=False, ca_certs=None, client_cert=None,
client_key=None, loop=None, use_dns_cache=True, headers=None,
ssl_context=None, trace_config=None, **kwargs):
super().__init__(host=host, port=port, **kwargs)

self.loop = asyncio.get_event_loop() if loop is None else loop

if http_auth is not None:
if isinstance(http_auth, str):
http_auth = tuple(http_auth.split(':', 1))

if isinstance(http_auth, (tuple, list)):
http_auth = aiohttp.BasicAuth(*http_auth)

headers = headers or {}
headers.setdefault('content-type', 'application/json')

# if providing an SSL context, raise error if any other SSL related flag is used
if ssl_context and (verify_certs or ca_certs):
raise ImproperlyConfigured("When using `ssl_context`, `use_ssl`, `verify_certs`, `ca_certs` are not permitted")

if use_ssl or ssl_context:
cafile = ca_certs
if not cafile and not ssl_context and verify_certs:
# If no ca_certs and no sslcontext passed and asking to verify certs
# raise error
raise ImproperlyConfigured("Root certificates are missing for certificate "
"validation. Either pass them in using the ca_certs parameter or "
"install certifi to use it automatically.")
if verify_certs or ca_certs:
warnings.warn('Use of `verify_certs`, `ca_certs` have been deprecated in favor of using SSLContext`', DeprecationWarning)

if not ssl_context:
# if SSLContext hasn't been passed in, create one.
# need to skip if sslContext isn't avail
try:
ssl_context = create_ssl_context(cafile=cafile)
except AttributeError:
ssl_context = None

if not verify_certs and ssl_context is not None:
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
warnings.warn(
'Connecting to %s using SSL with verify_certs=False is insecure.' % host)
if ssl_context:
verify_certs = True
use_ssl = True

trace_configs = [trace_config] if trace_config else None

self.session = aiohttp.ClientSession(
auth=http_auth,
timeout=self.timeout,
connector=aiohttp.TCPConnector(
loop=self.loop,
verify_ssl=verify_certs,
use_dns_cache=use_dns_cache,
ssl_context=ssl_context,
# this has been changed from the default (100)
limit=100000
),
headers=headers,
trace_configs=trace_configs
)

self.base_url = 'http%s://%s:%d%s' % (
's' if use_ssl else '',
host, port, self.url_prefix
)

@asyncio.coroutine
def close(self):
yield from self.session.close()

@asyncio.coroutine
def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None):
url_path = url
if params:
url_path = '%s?%s' % (url, urlencode(params or {}))
url = self.base_url + url_path

start = self.loop.time()
response = None
try:
with async_timeout.timeout(timeout or self.timeout.total, loop=self.loop):
response = yield from self.session.request(method, url, data=body, headers=headers)
raw_data = yield from response.text()
duration = self.loop.time() - start

except asyncio.CancelledError:
raise

except Exception as e:
self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=e)
if isinstance(e, ServerFingerprintMismatch):
raise SSLError('N/A', str(e), e)
if isinstance(e, asyncio.TimeoutError):
raise ConnectionTimeout('TIMEOUT', str(e), e)
raise ConnectionError('N/A', str(e), e)

finally:
if response is not None:
yield from response.release()

# raise errors based on http status codes, let the client handle those if needed
if not (200 <= response.status < 300) and response.status not in ignore:
self.log_request_fail(method, url, url_path, body, duration, status_code=response.status, response=raw_data)
self._raise_error(response.status, raw_data)

self.log_request_success(method, url, url_path, body, response.status, raw_data, duration)

return response.status, response.headers, raw_data
Loading

0 comments on commit 3b5eee2

Please sign in to comment.