Skip to content

Commit

Permalink
Revert "Add an asyncio-based load generator (#916)"
Browse files Browse the repository at this point in the history
This reverts commit c8f2de7.
  • Loading branch information
danielmitterdorfer committed Mar 9, 2020
1 parent c8f2de7 commit f4efdbe
Show file tree
Hide file tree
Showing 19 changed files with 926 additions and 2,026 deletions.
6 changes: 0 additions & 6 deletions create-notice.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,19 @@ function main {
printf "The source code can be obtained at https://github.com/certifi/python-certifi\n" >> "${OUTPUT_FILE}"
add_license "certifi" "https://raw.githubusercontent.com/certifi/python-certifi/master/LICENSE"
add_license "elasticsearch" "https://raw.githubusercontent.com/elastic/elasticsearch-py/master/LICENSE"
add_license "elasticsearch-async" "https://raw.githubusercontent.com/elastic/elasticsearch-py-async/master/LICENSE"
add_license "jinja2" "https://raw.githubusercontent.com/pallets/jinja/master/LICENSE.rst"
add_license "jsonschema" "https://raw.githubusercontent.com/Julian/jsonschema/master/COPYING"
add_license "psutil" "https://raw.githubusercontent.com/giampaolo/psutil/master/LICENSE"
add_license "py-cpuinfo" "https://raw.githubusercontent.com/workhorsy/py-cpuinfo/master/LICENSE"
add_license "tabulate" "https://bitbucket.org/astanin/python-tabulate/raw/03182bf9b8a2becbc54d17aa7e3e7dfed072c5f5/LICENSE"
add_license "thespian" "https://raw.githubusercontent.com/kquick/Thespian/master/LICENSE.txt"
add_license "boto3" "https://raw.githubusercontent.com/boto/boto3/develop/LICENSE"
add_license "yappi" "https://raw.githubusercontent.com/sumerc/yappi/master/LICENSE"

# transitive dependencies
# Jinja2 -> Markupsafe
add_license "Markupsafe" "https://raw.githubusercontent.com/pallets/markupsafe/master/LICENSE.rst"
# elasticsearch -> urllib3
add_license "urllib3" "https://raw.githubusercontent.com/shazow/urllib3/master/LICENSE.txt"
#elasticsearch_async -> aiohttp
add_license "aiohttp" "https://raw.githubusercontent.com/aio-libs/aiohttp/master/LICENSE.txt"
#elasticsearch_async -> async_timeout
add_license "async_timeout" "https://raw.githubusercontent.com/aio-libs/async-timeout/master/LICENSE"
# boto3 -> s3transfer
add_license "s3transfer" "https://raw.githubusercontent.com/boto/s3transfer/develop/LICENSE.txt"
# boto3 -> jmespath
Expand Down
46 changes: 16 additions & 30 deletions docs/adding_tracks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -881,15 +881,17 @@ In ``track.json`` set the ``operation-type`` to "percolate" (you can choose this

Then create a file ``track.py`` next to ``track.json`` and implement the following two functions::

async def percolate(es, params):
await es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)
def percolate(es, params):
es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)


def register(registry):
registry.register_runner("percolate", percolate, async_runner=True)
registry.register_runner("percolate", percolate)


The function ``percolate`` is the actual runner and takes the following parameters:

Expand All @@ -904,25 +906,11 @@ This function can return:

Similar to a parameter source you also need to bind the name of your operation type to the function within ``register``.

To illustrate how to use custom return values, suppose we want to implement a custom runner that calls the `pending tasks API <https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-pending.html>`_ and returns the number of pending tasks as additional meta-data::

async def pending_tasks(es, params):
response = await es.cluster.pending_tasks()
return {
"weight": 1,
"unit": "ops",
"pending-tasks-count": len(response["tasks"])
}

def register(registry):
registry.register_runner("pending-tasks", pending_tasks, async_runner=True)


If you need more control, you can also implement a runner class. The example above, implemented as a class looks as follows::

class PercolateRunner:
async def __call__(self, es, params):
await es.percolate(
def __call__(self, es, params):
es.percolate(
index="queries",
doc_type="content",
body=params["body"]
Expand All @@ -932,12 +920,10 @@ If you need more control, you can also implement a runner class. The example abo
return "percolate"

def register(registry):
registry.register_runner("percolate", PercolateRunner(), async_runner=True)

registry.register_runner("percolate", PercolateRunner())

The actual runner is implemented in the method ``__call__`` and the same return value conventions apply as for functions. For debugging purposes you should also implement ``__repr__`` and provide a human-readable name for your runner. Finally, you need to register your runner in the ``register`` function.

Runners also support Python's `asynchronous context manager <https://docs.python.org/3/reference/datamodel.html#async-context-managers>`_ interface. Rally uses a new context for each request. Implementing the asynchronous context manager interface can be handy for cleanup of resources after executing an operation. Rally uses it, for example, to clear open scrolls.
The actual runner is implemented in the method ``__call__`` and the same return value conventions apply as for functions. For debugging purposes you should also implement ``__repr__`` and provide a human-readable name for your runner. Finally, you need to register your runner in the ``register`` function. Runners also support Python's `context manager <https://docs.python.org/3/library/stdtypes.html#typecontextmanager>`_ interface. Rally uses a new context for each request. Implementing the context manager interface can be handy for cleanup of resources after executing an operation. Rally uses it, for example, to clear open scrolls.

If you have specified multiple Elasticsearch clusters using :ref:`target-hosts <command_line_reference_advanced_topics>` you can make Rally pass a dictionary of client connections instead of one for the ``default`` cluster in the ``es`` parameter.

Expand All @@ -952,14 +938,14 @@ Example (assuming Rally has been invoked specifying ``default`` and ``remote`` i
class CreateIndexInRemoteCluster:
multi_cluster = True

async def __call__(self, es, params):
await es["remote"].indices.create(index="remote-index")
def __call__(self, es, params):
es['remote'].indices.create(index='remote-index')

def __repr__(self, *args, **kwargs):
return "create-index-in-remote-cluster"

def register(registry):
registry.register_runner("create-index-in-remote-cluster", CreateIndexInRemoteCluster(), async_runner=True)
registry.register_runner("create-index-in-remote-cluster", CreateIndexInRemoteCluster())


.. note::
Expand Down
41 changes: 0 additions & 41 deletions docs/migrate.rst
Original file line number Diff line number Diff line change
@@ -1,47 +1,6 @@
Migration Guide
===============

Migrating to Rally 1.5.0
------------------------

Runner API uses asyncio
^^^^^^^^^^^^^^^^^^^^^^^

In order to support more concurrent clients in the future, Rally is moving from a synchronous model to an asynchronous model internally. With Rally 1.5.0 all custom runners need to be implemented using async APIs and a new bool argument ``async_runner=True`` needs to be provided upon registration. Below is an example how to migrate a custom runner function.

A custom runner prior to Rally 1.5.0::

def percolate(es, params):
es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)

def register(registry):
registry.register_runner("percolate", percolate)

With Rally 1.5.0, the implementation changes as follows::

async def percolate(es, params):
await es.percolate(
index="queries",
doc_type="content",
body=params["body"]
)

def register(registry):
registry.register_runner("percolate", percolate, async_runner=True)

Apply to the following changes for each custom runner:

* Prefix the function signature with ``async``.
* Add an ``await`` keyword before each Elasticsearch API call.
* Add ``async_runner=True`` as the last argument to the ``register_runner`` function.

For more details please refer to the updated documentation on :ref:`custom runners <adding_tracks_custom_runners>`.


Migrating to Rally 1.4.1
------------------------

Expand Down
131 changes: 0 additions & 131 deletions esrally/async_connection.py

This file was deleted.

37 changes: 0 additions & 37 deletions esrally/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,43 +127,6 @@ def create(self):
import elasticsearch
return elasticsearch.Elasticsearch(hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options)

def create_async(self):
# keep imports confined as we do some temporary patching to work around unsolved issues in the async ES connector
import elasticsearch
import elasticsearch_async
from aiohttp.client import ClientTimeout
import esrally.async_connection

# needs patching as https://github.com/elastic/elasticsearch-py-async/pull/68 is not merged yet
class RallyAsyncTransport(elasticsearch_async.transport.AsyncTransport):
def __init__(self, hosts, connection_class=esrally.async_connection.AIOHttpConnection, loop=None,
connection_pool_class=elasticsearch_async.connection_pool.AsyncConnectionPool,
sniff_on_start=False, raise_on_sniff_error=True, **kwargs):
super().__init__(hosts, connection_class, loop, connection_pool_class, sniff_on_start, raise_on_sniff_error, **kwargs)

if "timeout" in self.client_options and not isinstance(self.client_options["timeout"], ClientTimeout):
self.client_options["timeout"] = ClientTimeout(total=self.client_options["timeout"])
else:
# 10 seconds is the Elasticsearch default, ensure we always set a ClientTimeout object here
self.client_options["timeout"] = ClientTimeout(total=10)

# copy of AsyncElasticsearch as https://github.com/elastic/elasticsearch-py-async/pull/49 is not yet released.
# That PR (also) fixes the behavior reported in https://github.com/elastic/elasticsearch-py-async/issues/43.
class RallyAsyncElasticsearch(elasticsearch.Elasticsearch):
def __init__(self, hosts=None, transport_class=RallyAsyncTransport, **kwargs):
super().__init__(hosts, transport_class=transport_class, **kwargs)

async def __aenter__(self):
return self

async def __aexit__(self, _exc_type, _exc_val, _exc_tb):
yield self.transport.close()

return RallyAsyncElasticsearch(hosts=self.hosts,
transport_class=RallyAsyncTransport,
ssl_context=self.ssl_context,
**self.client_options)


def wait_for_rest_layer(es, max_attempts=40):
"""
Expand Down
3 changes: 0 additions & 3 deletions esrally/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,3 @@

# expose only the minimum API
from .driver import DriverActor, PrepareBenchmark, PreparationComplete, StartBenchmark, BenchmarkComplete, TaskFinished

# async API
from .async_driver import AsyncDriver, Timer
Loading

0 comments on commit f4efdbe

Please sign in to comment.