diff --git a/docs/command_line_reference.rst b/docs/command_line_reference.rst
index b4595de87..4e96a1abb 100644
--- a/docs/command_line_reference.rst
+++ b/docs/command_line_reference.rst
@@ -765,31 +765,34 @@ Define a JSON file containing a list of objects with the following properties:
 
 * ``path``: A path or path pattern that should be matched. Only leading and trailing wildcards (``*``) are supported. A path containing only a wildcard matches any path.
 * ``body``: The respective response body.
+* ``body-encoding``: Either ``raw`` or ``json``. Use ``json`` by default, and ``raw`` for the operation types ``bulk`` and ``search``.
 
 Here we define the necessary responses for a track that bulk-indexes data::
 
     [
-      {
-        "path": "/_cluster/settings",
-        "body": {
-          "transient": {
-            "action.destructive_requires_name": "true"
-          }
-        }
-      },
       {
         "path": "*/_bulk",
         "body": {
           "errors": false,
           "took": 1
-        }
+        },
+        "body-encoding": "raw"
       },
       {
         "path": "/_cluster/health*",
         "body": {
           "status": "green",
           "relocating_shards": 0
-        }
+        },
+        "body-encoding": "json"
+      },
+      {
+        "path": "/_cluster/settings",
+        "body": {
+          "persistent": {},
+          "transient": {}
+        },
+        "body-encoding": "json"
       },
       {
         "path": "/_all/_stats/_all",
@@ -801,11 +804,13 @@ Here we define the necessary responses for a track that bulk-indexes data::
             }
           }
         }
-      }
+      },
+      "body-encoding": "json"
     },
     {
       "path": "*",
-      "body": {}
+      "body": {},
+      "body-encoding": "json"
     }
   ]
diff --git a/docs/migrate.rst b/docs/migrate.rst
index 5401bfadd..c33322a20 100644
--- a/docs/migrate.rst
+++ b/docs/migrate.rst
@@ -4,10 +4,6 @@ Migration Guide
 Migrating to Rally 2.7.1
 ------------------------
 
-Elasticsearch client logs are now captured by the `elastic_transport <https://github.com/elastic/elastic-transport-python>`_ logger
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Rally migrated to the 8.x version of the ``elasticsearch-py`` library which uses a new logger named ``elastic_transport``. Rally will automatically configure this logger to only emit logs of level ``WARNING`` and above, even if a past Rally version configured logging using the ``~/.rally/logging.json`` file without that logger.
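For reference, the behaviour described in the removed note above amounts to this minimal Python sketch (Rally actually applies it via its ``logging.json`` template, not code like this):

    import logging

    # Cap the elasticsearch-py 8.x transport logger at WARNING and above,
    # mirroring what the removed migration note says Rally configures automatically.
    logging.getLogger("elastic_transport").setLevel(logging.WARNING)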
- Snapshot repository plugins are no longer built from source ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/esrally/client/asynchronous.py b/esrally/client/asynchronous.py index 7ca1c3f9b..e76f83995 100644 --- a/esrally/client/asynchronous.py +++ b/esrally/client/asynchronous.py @@ -18,34 +18,18 @@ import asyncio import json import logging -import warnings -from typing import Any, Iterable, List, Mapping, Optional +from typing import List, Optional import aiohttp +import elasticsearch from aiohttp import BaseConnector, RequestInfo from aiohttp.client_proto import ResponseHandler from aiohttp.helpers import BaseTimerContext -from elastic_transport import ( - AiohttpHttpNode, - ApiResponse, - AsyncTransport, - BinaryApiResponse, - HeadApiResponse, - ListApiResponse, - ObjectApiResponse, - TextApiResponse, -) -from elastic_transport.client_utils import DEFAULT -from elasticsearch import AsyncElasticsearch -from elasticsearch._async.client import IlmClient -from elasticsearch.compat import warn_stacklevel -from elasticsearch.exceptions import HTTP_EXCEPTIONS, ApiError, ElasticsearchWarning from multidict import CIMultiDict, CIMultiDictProxy from yarl import URL -from esrally.client.common import _WARNING_RE, _mimetype_header_to_compat, _quote_query from esrally.client.context import RequestContextHolder -from esrally.utils import io, versions +from esrally.utils import io class StaticTransport: @@ -58,9 +42,6 @@ def is_closing(self): def close(self): self.closed = True - def abort(self): - self.close() - class StaticConnector(BaseConnector): async def _create_connection(self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout") -> ResponseHandler: @@ -125,8 +106,21 @@ async def start(self, connection: "Connection") -> "ClientResponse": self.status = 200 return self - async def read(self): - return self.static_body.encode("utf-8") + async def text(self, encoding=None, errors="strict"): + return self.static_body + + +class RawClientResponse(aiohttp.ClientResponse): + """ + Returns the body as bytes object (instead of a str) to avoid decoding overhead. 
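+    Used as the response class when static responses are disabled; see AIOHttpConnection below.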
+ """ + + async def text(self, encoding=None, errors="strict"): + """Read response payload and decode.""" + if self._body is None: + await self.read() + + return self._body class ResponseMatcher: @@ -145,7 +139,14 @@ def __init__(self, responses): else: matcher = ResponseMatcher.equals(path) - body = json.dumps(response["body"]) + body = response["body"] + body_encoding = response.get("body-encoding", "json") + if body_encoding == "raw": + body = json.dumps(body).encode("utf-8") + elif body_encoding == "json": + body = json.dumps(body) + else: + raise ValueError(f"Unknown body encoding [{body_encoding}] for path [{path}]") self.responses.append((path, matcher, body)) @@ -184,192 +185,94 @@ def response(self, path): return body -class RallyAiohttpHttpNode(AiohttpHttpNode): - def __init__(self, config): - super().__init__(config) - self._loop = None - self.trace_configs = None - self.enable_cleanup_closed = None - self._static_responses = None - self._request_class = aiohttp.ClientRequest - self._response_class = aiohttp.ClientResponse - - @property - def static_responses(self): - return self._static_responses - - @static_responses.setter - def static_responses(self, static_responses): - self._static_responses = static_responses - if self._static_responses: +class AIOHttpConnection(elasticsearch.AIOHttpConnection): + def __init__( + self, + host="localhost", + port=None, + http_auth=None, + use_ssl=False, + ssl_assert_fingerprint=None, + headers=None, + ssl_context=None, + http_compress=None, + cloud_id=None, + api_key=None, + opaque_id=None, + loop=None, + trace_config=None, + **kwargs, + ): + super().__init__( + host=host, + port=port, + http_auth=http_auth, + use_ssl=use_ssl, + ssl_assert_fingerprint=ssl_assert_fingerprint, + # provided to the base class via `maxsize` to keep base class state consistent despite Rally + # calling the attribute differently. 
+ maxsize=kwargs.get("max_connections", 0), + headers=headers, + ssl_context=ssl_context, + http_compress=http_compress, + cloud_id=cloud_id, + api_key=api_key, + opaque_id=opaque_id, + loop=loop, + **kwargs, + ) + + self._trace_configs = [trace_config] if trace_config else None + self._enable_cleanup_closed = kwargs.get("enable_cleanup_closed", True) + + static_responses = kwargs.get("static_responses") + self.use_static_responses = static_responses is not None + + if self.use_static_responses: # read static responses once and reuse them if not StaticRequest.RESPONSES: - with open(io.normalize_path(self._static_responses)) as f: + with open(io.normalize_path(static_responses)) as f: StaticRequest.RESPONSES = ResponseMatcher(json.load(f)) self._request_class = StaticRequest self._response_class = StaticResponse + else: + self._request_class = aiohttp.ClientRequest + self._response_class = RawClientResponse - def _create_aiohttp_session(self): - if self._loop is None: - self._loop = asyncio.get_running_loop() + async def _create_aiohttp_session(self): + if self.loop is None: + self.loop = asyncio.get_running_loop() - if self._static_responses: - connector = StaticConnector(limit_per_host=self._connections_per_node, enable_cleanup_closed=self.enable_cleanup_closed) + if self.use_static_responses: + connector = StaticConnector(limit=self._limit, enable_cleanup_closed=self._enable_cleanup_closed) else: connector = aiohttp.TCPConnector( - limit_per_host=self._connections_per_node, - use_dns_cache=True, - ssl=self._ssl_context, - enable_cleanup_closed=self.enable_cleanup_closed, + limit=self._limit, use_dns_cache=True, ssl=self._ssl_context, enable_cleanup_closed=self._enable_cleanup_closed ) self.session = aiohttp.ClientSession( headers=self.headers, auto_decompress=True, - loop=self._loop, + loop=self.loop, cookie_jar=aiohttp.DummyCookieJar(), request_class=self._request_class, response_class=self._response_class, connector=connector, - trace_configs=self.trace_configs, + trace_configs=self._trace_configs, ) -class RallyAsyncTransport(AsyncTransport): - def __init__(self, *args, **kwargs): - super().__init__(*args, node_class=RallyAiohttpHttpNode, **kwargs) - - -class RallyIlmClient(IlmClient): - async def put_lifecycle(self, *args, **kwargs): - """ - The 'elasticsearch-py' 8.x method signature renames the 'policy' param to 'name', and the previously so-called - 'body' param becomes 'policy' - """ - if args: - kwargs["name"] = args[0] - - if body := kwargs.pop("body", None): - kwargs["policy"] = body.get("policy", {}) - # pylint: disable=missing-kwoa - return await IlmClient.put_lifecycle(self, **kwargs) - - -class RallyAsyncElasticsearch(AsyncElasticsearch, RequestContextHolder): +class VerifiedAsyncTransport(elasticsearch.AsyncTransport): def __init__(self, *args, **kwargs): - distribution_version = kwargs.pop("distribution_version", None) super().__init__(*args, **kwargs) # skip verification at this point; we've already verified this earlier with the synchronous client. # The async client is used in the hot code path and we use customized overrides (such as that we don't # parse response bodies in some cases for performance reasons, e.g. when using the bulk API). 
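        # (Skipping the check here also avoids an extra GET / round-trip per async client instance.)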
self._verified_elasticsearch = True - if distribution_version: - self.distribution_version = versions.Version.from_string(distribution_version) - else: - self.distribution_version = None - - # some ILM method signatures changed in 'elasticsearch-py' 8.x, - # so we override method(s) here to provide BWC for any custom - # runners that aren't using the new kwargs - self.ilm = RallyIlmClient(self) - async def perform_request( - self, - method: str, - path: str, - *, - params: Optional[Mapping[str, Any]] = None, - headers: Optional[Mapping[str, str]] = None, - body: Optional[Any] = None, - ) -> ApiResponse[Any]: - # We need to ensure that we provide content-type and accept headers - if body is not None: - if headers is None: - headers = {"content-type": "application/json", "accept": "application/json"} - else: - if headers.get("content-type") is None: - headers["content-type"] = "application/json" - if headers.get("accept") is None: - headers["accept"] = "application/json" - - if headers: - request_headers = self._headers.copy() - request_headers.update(headers) - else: - request_headers = self._headers - - # Converts all parts of a Accept/Content-Type headers - # from application/X -> application/vnd.elasticsearch+X - # see https://github.com/elastic/elasticsearch/issues/51816 - if self.distribution_version is not None and self.distribution_version >= versions.Version.from_string("8.0.0"): - _mimetype_header_to_compat("Accept", request_headers) - _mimetype_header_to_compat("Content-Type", request_headers) - - if params: - target = f"{path}?{_quote_query(params)}" - else: - target = path - - meta, resp_body = await self.transport.perform_request( - method, - target, - headers=request_headers, - body=body, - request_timeout=self._request_timeout, - max_retries=self._max_retries, - retry_on_status=self._retry_on_status, - retry_on_timeout=self._retry_on_timeout, - client_meta=self._client_meta, - ) - - # HEAD with a 404 is returned as a normal response - # since this is used as an 'exists' functionality. - if not (method == "HEAD" and meta.status == 404) and ( - not 200 <= meta.status < 299 - and (self._ignore_status is DEFAULT or self._ignore_status is None or meta.status not in self._ignore_status) - ): - message = str(resp_body) - - # If the response is an error response try parsing - # the raw Elasticsearch error before raising. 
- if isinstance(resp_body, dict): - try: - error = resp_body.get("error", message) - if isinstance(error, dict) and "type" in error: - error = error["type"] - message = error - except (ValueError, KeyError, TypeError): - pass - - raise HTTP_EXCEPTIONS.get(meta.status, ApiError)(message=message, meta=meta, body=resp_body) - - # 'Warning' headers should be reraised as 'ElasticsearchWarning' - if "warning" in meta.headers: - warning_header = (meta.headers.get("warning") or "").strip() - warning_messages: Iterable[str] = _WARNING_RE.findall(warning_header) or (warning_header,) - stacklevel = warn_stacklevel() - for warning_message in warning_messages: - warnings.warn( - warning_message, - category=ElasticsearchWarning, - stacklevel=stacklevel, - ) - - if method == "HEAD": - response = HeadApiResponse(meta=meta) - elif isinstance(resp_body, dict): - response = ObjectApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] - elif isinstance(resp_body, list): - response = ListApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] - elif isinstance(resp_body, str): - response = TextApiResponse( # type: ignore[assignment] - body=resp_body, - meta=meta, - ) - elif isinstance(resp_body, bytes): - response = BinaryApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] - else: - response = ApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] - return response +class RallyAsyncElasticsearch(elasticsearch.AsyncElasticsearch, RequestContextHolder): + def perform_request(self, *args, **kwargs): + kwargs["url"] = kwargs.pop("path") + return self.transport.perform_request(*args, **kwargs) diff --git a/esrally/client/common.py b/esrally/client/common.py deleted file mode 100644 index 7702fb4de..000000000 --- a/esrally/client/common.py +++ /dev/null @@ -1,58 +0,0 @@ -import re -from datetime import date, datetime -from typing import Any, Mapping - -from elastic_transport.client_utils import percent_encode -from elasticsearch import VERSION - - -def _client_major_version_to_str(version: tuple) -> str: - return str(version[0]) - - -_WARNING_RE = re.compile(r"\"([^\"]*)\"") -_COMPAT_MIMETYPE_TEMPLATE = "application/vnd.elasticsearch+%s; compatible-with=" + _client_major_version_to_str(VERSION) -_COMPAT_MIMETYPE_RE = re.compile(r"application/(json|x-ndjson|vnd\.mapbox-vector-tile)") -_COMPAT_MIMETYPE_SUB = _COMPAT_MIMETYPE_TEMPLATE % (r"\g<1>",) - - -def _mimetype_header_to_compat(header, request_headers): - # Converts all parts of a Accept/Content-Type headers - # from application/X -> application/vnd.elasticsearch+X - mimetype = request_headers.get(header, None) - if mimetype: - request_headers[header] = _COMPAT_MIMETYPE_RE.sub(_COMPAT_MIMETYPE_SUB, mimetype) - - -def _escape(value: Any) -> str: - """ - Escape a single value of a URL string or a query parameter. If it is a list - or tuple, turn it into a comma-separated string first. 
- """ - - # make sequences into comma-separated stings - if isinstance(value, (list, tuple)): - value = ",".join([_escape(item) for item in value]) - - # dates and datetimes into isoformat - elif isinstance(value, (date, datetime)): - value = value.isoformat() - - # make bools into true/false strings - elif isinstance(value, bool): - value = str(value).lower() - - elif isinstance(value, bytes): - return value.decode("utf-8", "surrogatepass") - - if not isinstance(value, str): - return str(value) - return value - - -def _quote(value: Any) -> str: - return percent_encode(_escape(value), ",*") - - -def _quote_query(query: Mapping[str, Any]) -> str: - return "&".join([f"{k}={_quote(v)}" for k, v in query.items()]) diff --git a/esrally/client/factory.py b/esrally/client/factory.py index 201fc0e53..f8a70f644 100644 --- a/esrally/client/factory.py +++ b/esrally/client/factory.py @@ -19,6 +19,7 @@ import time import certifi +import urllib3 from urllib3.connection import is_ipaddress from esrally import doc_link, exceptions @@ -27,22 +28,13 @@ class EsClientFactory: """ - Abstracts how the Elasticsearch client is created and customizes the client for backwards - compatibility guarantees that are broader than the library's defaults. + Abstracts how the Elasticsearch client is created. Intended for testing. """ - def __init__(self, hosts, client_options, distribution_version=None): - def host_string(host): - # protocol can be set at either host or client opts level - protocol = "https" if client_options.get("use_ssl") or host.get("use_ssl") else "http" - return f"{protocol}://{host['host']}:{host['port']}" - - self.hosts = [host_string(h) for h in hosts] + def __init__(self, hosts, client_options): + self.hosts = hosts self.client_options = dict(client_options) self.ssl_context = None - # This attribute is necessary for the backwards-compatibility logic contained in - # RallySyncElasticsearch.perform_request() and RallyAsyncElasticsearch.perform_request(). - self.distribution_version = distribution_version self.logger = logging.getLogger(__name__) masked_client_options = dict(client_options) @@ -58,24 +50,26 @@ def host_string(host): import ssl self.logger.debug("SSL support: on") + self.client_options["scheme"] = "https" self.ssl_context = ssl.create_default_context( ssl.Purpose.SERVER_AUTH, cafile=self.client_options.pop("ca_certs", certifi.where()) ) - # We call get() here instead of pop() in order to pass verify_certs through as a kwarg - # to the elasticsearch.Elasticsearch constructor. Setting the ssl_context's verify_mode to - # ssl.CERT_NONE is insufficient with version 8.0+ of elasticsearch-py. - if not self.client_options.get("verify_certs", True): + if not self.client_options.pop("verify_certs", True): self.logger.debug("SSL certificate verification: off") # order matters to avoid ValueError: check_hostname needs a SSL context with either CERT_OPTIONAL or CERT_REQUIRED self.ssl_context.check_hostname = False self.ssl_context.verify_mode = ssl.CERT_NONE - self.client_options["ssl_show_warn"] = False self.logger.warning( - "User has enabled SSL but disabled certificate verification. This is dangerous but may be ok for a benchmark." + "User has enabled SSL but disabled certificate verification. This is dangerous but may be ok for a " + "benchmark. Disabling urllib warnings now to avoid a logging storm. " + "See https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings for details." ) + # disable: "InsecureRequestWarning: Unverified HTTPS request is being made. 
Adding certificate verification is strongly \ + # advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings" + urllib3.disable_warnings() else: # check_hostname should not be set when host is an IP address self.ssl_context.check_hostname = self._only_hostnames(hosts) @@ -111,9 +105,9 @@ def host_string(host): self.ssl_context.load_cert_chain(certfile=client_cert, keyfile=client_key) else: self.logger.debug("SSL support: off") + self.client_options["scheme"] = "http" if self._is_set(self.client_options, "create_api_key_per_client"): - self.client_options.pop("create_api_key_per_client") basic_auth_user = self.client_options.get("basic_auth_user", False) basic_auth_password = self.client_options.get("basic_auth_password", False) provided_auth = {"basic_auth_user": basic_auth_user, "basic_auth_password": basic_auth_password} @@ -133,8 +127,8 @@ def host_string(host): self.logger.debug("Automatic creation of client API keys: off") if self._is_set(self.client_options, "basic_auth_user") and self._is_set(self.client_options, "basic_auth_password"): - self.client_options["basic_auth"] = (self.client_options.pop("basic_auth_user"), self.client_options.pop("basic_auth_password")) self.logger.debug("HTTP basic authentication: on") + self.client_options["http_auth"] = (self.client_options.pop("basic_auth_user"), self.client_options.pop("basic_auth_password")) else: self.logger.debug("HTTP basic authentication: off") @@ -147,12 +141,8 @@ def host_string(host): else: self.logger.debug("HTTP compression: off") - self.enable_cleanup_closed = convert.to_bool(self.client_options.pop("enable_cleanup_closed", True)) - self.max_connections = max(256, self.client_options.pop("max_connections", 0)) - self.static_responses = self.client_options.pop("static_responses", None) - - if self._is_set(self.client_options, "timeout"): - self.client_options["request_timeout"] = self.client_options.pop("timeout") + if self._is_set(self.client_options, "enable_cleanup_closed"): + self.client_options["enable_cleanup_closed"] = convert.to_bool(self.client_options.pop("enable_cleanup_closed")) @staticmethod def _only_hostnames(hosts): @@ -180,9 +170,7 @@ def create(self): # pylint: disable=import-outside-toplevel from esrally.client.synchronous import RallySyncElasticsearch - return RallySyncElasticsearch( - distribution_version=self.distribution_version, hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options - ) + return RallySyncElasticsearch(hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options) def create_async(self, api_key=None): # pylint: disable=import-outside-toplevel @@ -192,17 +180,18 @@ def create_async(self, api_key=None): from elasticsearch.serializer import JSONSerializer from esrally.client.asynchronous import ( + AIOHttpConnection, RallyAsyncElasticsearch, - RallyAsyncTransport, + VerifiedAsyncTransport, ) class LazyJSONSerializer(JSONSerializer): - def loads(self, data): + def loads(self, s): meta = RallyAsyncElasticsearch.request_context.get() if "raw_response" in meta: - return io.BytesIO(data) + return io.BytesIO(s) else: - return super().loads(data) + return super().loads(s) async def on_request_start(session, trace_config_ctx, params): RallyAsyncElasticsearch.on_request_start() @@ -218,30 +207,20 @@ async def on_request_end(session, trace_config_ctx, params): # override the builtin JSON serializer self.client_options["serializer"] = LazyJSONSerializer() + self.client_options["trace_config"] = trace_config if api_key is not None: - 
self.client_options.pop("http_auth", None) - self.client_options.pop("basic_auth", None) + self.client_options.pop("http_auth") self.client_options["api_key"] = api_key - async_client = RallyAsyncElasticsearch( - distribution_version=self.distribution_version, + return RallyAsyncElasticsearch( hosts=self.hosts, - transport_class=RallyAsyncTransport, + transport_class=VerifiedAsyncTransport, + connection_class=AIOHttpConnection, ssl_context=self.ssl_context, - maxsize=self.max_connections, **self.client_options, ) - # the AsyncElasticsearch constructor automatically creates the corresponding NodeConfig objects, so we set - # their instance attributes after they've been instantiated - for node_connection in async_client.transport.node_pool.all(): - node_connection.trace_configs = [trace_config] - node_connection.enable_cleanup_closed = self.enable_cleanup_closed - node_connection.static_responses = self.static_responses - - return async_client - def wait_for_rest_layer(es, max_attempts=40): """ @@ -254,19 +233,11 @@ def wait_for_rest_layer(es, max_attempts=40): # assume that at least the hosts that we expect to contact should be available. Note that this is not 100% # bullet-proof as a cluster could have e.g. dedicated masters which are not contained in our list of target hosts # but this is still better than just checking for any random node's REST API being reachable. - expected_node_count = len(es.transport.node_pool) + expected_node_count = len(es.transport.hosts) logger = logging.getLogger(__name__) - attempt = 0 - while attempt <= max_attempts: - attempt += 1 + for attempt in range(max_attempts): # pylint: disable=import-outside-toplevel - from elastic_transport import ( - ApiError, - ConnectionError, - SerializationError, - TlsError, - TransportError, - ) + import elasticsearch try: # see also WaitForHttpResource in Elasticsearch tests. Contrary to the ES tests we consider the API also @@ -274,44 +245,19 @@ def wait_for_rest_layer(es, max_attempts=40): es.cluster.health(wait_for_nodes=f">={expected_node_count}") logger.debug("REST API is available for >= [%s] nodes after [%s] attempts.", expected_node_count, attempt) return True - except SerializationError as e: - if "Client sent an HTTP request to an HTTPS server" in str(e): - raise exceptions.SystemSetupError( - "Rally sent an HTTP request to an HTTPS server. Are you sure this is an HTTP endpoint?", e - ) - - if attempt <= max_attempts: - logger.debug("Got serialization error [%s] on attempt [%s]. Sleeping...", e, attempt) - time.sleep(3) - else: - raise - except TlsError as e: - raise exceptions.SystemSetupError("Could not connect to cluster via HTTPS. Are you sure this is an HTTPS endpoint?", e) - except ConnectionError as e: - if "ProtocolError" in str(e): - raise exceptions.SystemSetupError( - "Received a protocol error. Are you sure you're using the correct scheme (HTTP or HTTPS)?", e - ) - - if attempt <= max_attempts: - logger.debug("Got connection error on attempt [%s]. Sleeping...", attempt) - time.sleep(3) - else: - raise - except TransportError as e: - if attempt <= max_attempts: - logger.debug("Got transport error on attempt [%s]. Sleeping...", attempt) - time.sleep(3) - else: - raise - except ApiError as e: + except elasticsearch.ConnectionError as e: + if "SSL: UNKNOWN_PROTOCOL" in str(e): + raise exceptions.SystemSetupError("Could not connect to cluster via https. Is this an https endpoint?", e) + logger.debug("Got connection error on attempt [%s]. 
Sleeping...", attempt) + time.sleep(3) + except elasticsearch.TransportError as e: # cluster block, x-pack not initialized yet, our wait condition is not reached - if e.status_code in (503, 401, 408) and attempt <= max_attempts: - logger.debug("Got status code [%s] on attempt [%s]. Sleeping...", e.message, attempt) + if e.status_code in (503, 401, 408): + logger.debug("Got status code [%s] on attempt [%s]. Sleeping...", e.status_code, attempt) time.sleep(3) else: - logger.warning("Got unexpected status code [%s] on attempt [%s].", e.message, attempt) - raise + logger.warning("Got unexpected status code [%s] on attempt [%s].", e.status_code, attempt) + raise e return False @@ -332,12 +278,9 @@ def create_api_key(es, client_id, max_attempts=5): try: logger.debug("Creating ES API key for client ID [%s]", client_id) - return es.security.create_api_key(name=f"rally-client-{client_id}") + return es.security.create_api_key({"name": f"rally-client-{client_id}"}) except elasticsearch.TransportError as e: - logger.debug("Got transport error [%s] on attempt [%s]. Sleeping...", str(e), attempt) - time.sleep(1) - except elasticsearch.ApiError as e: - if e.meta.status == 405: + if e.status_code == 405: # We don't retry on 405 since it indicates a misconfigured benchmark candidate and isn't recoverable raise exceptions.SystemSetupError( "Got status code 405 when attempting to create API keys. Is Elasticsearch Security enabled?", e @@ -377,7 +320,7 @@ def raise_exception(failed_ids, cause=None): try: if current_version >= minimum_version: - resp = es.security.invalidate_api_key(ids=remaining) + resp = es.security.invalidate_api_key({"ids": remaining}) deleted += resp["invalidated_api_keys"] remaining = [i for i in ids if i not in deleted] # Like bulk indexing requests, we can get an HTTP 200, but the @@ -403,16 +346,16 @@ def raise_exception(failed_ids, cause=None): remaining = [i for i in ids if i not in deleted] if attempt < max_attempts: for i in remaining: - es.security.invalidate_api_key(id=i) + es.security.invalidate_api_key({"id": i}) deleted.append(i) else: if remaining: raise_exception(remaining) return True - except elasticsearch.ApiError as e: + except elasticsearch.TransportError as e: if attempt < max_attempts: - logger.debug("Got status code [%s] on attempt [%s] of [%s]. Sleeping...", e.meta.status, attempt, max_attempts) + logger.debug("Got status code [%s] on attempt [%s] of [%s]. Sleeping...", e.status_code, attempt, max_attempts) time.sleep(1) else: raise_exception(remaining, cause=e) diff --git a/esrally/client/synchronous.py b/esrally/client/synchronous.py index 486ccb7a2..90e3d8a5f 100644 --- a/esrally/client/synchronous.py +++ b/esrally/client/synchronous.py @@ -15,231 +15,10 @@ # specific language governing permissions and limitations # under the License. 
-import re -import warnings -from typing import Any, Iterable, Mapping, Optional +import elasticsearch -from elastic_transport import ( - ApiResponse, - BinaryApiResponse, - HeadApiResponse, - ListApiResponse, - ObjectApiResponse, - TextApiResponse, -) -from elastic_transport.client_utils import DEFAULT -from elasticsearch import Elasticsearch -from elasticsearch.compat import warn_stacklevel -from elasticsearch.exceptions import ( - HTTP_EXCEPTIONS, - ApiError, - ElasticsearchWarning, - UnsupportedProductError, -) -from esrally.client.common import _WARNING_RE, _mimetype_header_to_compat, _quote_query -from esrally.utils import versions - - -# This reproduces the product verification behavior of v7.14.0 of the client: -# https://github.com/elastic/elasticsearch-py/blob/v7.14.0/elasticsearch/transport.py#L606 -# -# As of v8.0.0, the client determines whether the server is Elasticsearch by checking -# whether HTTP responses contain the `X-elastic-product` header. If they do not, it raises -# an `UnsupportedProductError`. This header was only introduced in Elasticsearch 7.14.0, -# however, so the client will consider any version of ES prior to 7.14.0 unsupported due to -# responses not including it. -# -# Because Rally needs to support versions of ES >= 6.8.0, we resurrect the previous -# logic for determining the authenticity of the server, which does not rely exclusively -# on this header. -class _ProductChecker: - """Class which verifies we're connected to a supported product""" - - # States that can be returned from 'check_product' - SUCCESS = True - UNSUPPORTED_PRODUCT = 2 - UNSUPPORTED_DISTRIBUTION = 3 - - @classmethod - def raise_error(cls, state, meta, body): - # These states mean the product_check() didn't fail so do nothing. - if state in (None, True): - return - - if state == cls.UNSUPPORTED_DISTRIBUTION: - message = "The client noticed that the server is not a supported distribution of Elasticsearch" - else: # UNSUPPORTED_PRODUCT - message = "The client noticed that the server is not Elasticsearch and we do not support this unknown product" - raise UnsupportedProductError(message, meta=meta, body=body) - - @classmethod - def check_product(cls, headers, response): - # type: (dict[str, str], dict[str, str]) -> int - """Verifies that the server we're talking to is Elasticsearch. - Does this by checking HTTP headers and the deserialized - response to the 'info' API. Returns one of the states above. - """ - try: - version = response.get("version", {}) - version_number = tuple( - int(x) if x is not None else 999 for x in re.search(r"^([0-9]+)\.([0-9]+)(?:\.([0-9]+))?", version["number"]).groups() - ) - except (KeyError, TypeError, ValueError, AttributeError): - # No valid 'version.number' field, effectively 0.0.0 - version = {} - version_number = (0, 0, 0) - - # Check all of the fields and headers for missing/valid values. 
- try: - bad_tagline = response.get("tagline", None) != "You Know, for Search" - bad_build_flavor = version.get("build_flavor", None) != "default" - bad_product_header = headers.get("x-elastic-product", None) != "Elasticsearch" - except (AttributeError, TypeError): - bad_tagline = True - bad_build_flavor = True - bad_product_header = True - - # 7.0-7.13 and there's a bad 'tagline' or unsupported 'build_flavor' - if (7, 0, 0) <= version_number < (7, 14, 0): - if bad_tagline: - return cls.UNSUPPORTED_PRODUCT - elif bad_build_flavor: - return cls.UNSUPPORTED_DISTRIBUTION - - elif ( - # No version or version less than 6.x - version_number < (6, 0, 0) - # 6.x and there's a bad 'tagline' - or ((6, 0, 0) <= version_number < (7, 0, 0) and bad_tagline) - # 7.14+ and there's a bad 'X-Elastic-Product' HTTP header - or ((7, 14, 0) <= version_number and bad_product_header) - ): - return cls.UNSUPPORTED_PRODUCT - - return True - - -class RallySyncElasticsearch(Elasticsearch): - def __init__(self, *args, **kwargs): - distribution_version = kwargs.pop("distribution_version", None) - super().__init__(*args, **kwargs) - self._verified_elasticsearch = None - - if distribution_version: - self.distribution_version = versions.Version.from_string(distribution_version) - else: - self.distribution_version = None - - def perform_request( - self, - method: str, - path: str, - *, - params: Optional[Mapping[str, Any]] = None, - headers: Optional[Mapping[str, str]] = None, - body: Optional[Any] = None, - ) -> ApiResponse[Any]: - # We need to ensure that we provide content-type and accept headers - if body is not None: - if headers is None: - headers = {"content-type": "application/json", "accept": "application/json"} - else: - if headers.get("content-type") is None: - headers["content-type"] = "application/json" - if headers.get("accept") is None: - headers["accept"] = "application/json" - - if headers: - request_headers = self._headers.copy() - request_headers.update(headers) - else: - request_headers = self._headers - - if self._verified_elasticsearch is None: - info = self.transport.perform_request(method="GET", target="/", headers=request_headers) - info_meta = info.meta - info_body = info.body - - if not 200 <= info_meta.status < 299: - raise HTTP_EXCEPTIONS.get(info_meta.status, ApiError)(message=str(info_body), meta=info_meta, body=info_body) - - self._verified_elasticsearch = _ProductChecker.check_product(info_meta.headers, info_body) - - if self._verified_elasticsearch is not True: - _ProductChecker.raise_error(self._verified_elasticsearch, info_meta, info_body) - - # Converts all parts of a Accept/Content-Type headers - # from application/X -> application/vnd.elasticsearch+X - # see https://github.com/elastic/elasticsearch/issues/51816 - if self.distribution_version is not None and self.distribution_version >= versions.Version.from_string("8.0.0"): - _mimetype_header_to_compat("Accept", request_headers) - _mimetype_header_to_compat("Content-Type", request_headers) - - if params: - target = f"{path}?{_quote_query(params)}" - else: - target = path - - meta, resp_body = self.transport.perform_request( - method, - target, - headers=request_headers, - body=body, - request_timeout=self._request_timeout, - max_retries=self._max_retries, - retry_on_status=self._retry_on_status, - retry_on_timeout=self._retry_on_timeout, - client_meta=self._client_meta, - ) - - # HEAD with a 404 is returned as a normal response - # since this is used as an 'exists' functionality. 
- if not (method == "HEAD" and meta.status == 404) and ( - not 200 <= meta.status < 299 - and (self._ignore_status is DEFAULT or self._ignore_status is None or meta.status not in self._ignore_status) - ): - message = str(resp_body) - - # If the response is an error response try parsing - # the raw Elasticsearch error before raising. - if isinstance(resp_body, dict): - try: - error = resp_body.get("error", message) - if isinstance(error, dict) and "type" in error: - error = error["type"] - message = error - except (ValueError, KeyError, TypeError): - pass - - raise HTTP_EXCEPTIONS.get(meta.status, ApiError)(message=message, meta=meta, body=resp_body) - - # 'Warning' headers should be reraised as 'ElasticsearchWarning' - if "warning" in meta.headers: - warning_header = (meta.headers.get("warning") or "").strip() - warning_messages: Iterable[str] = _WARNING_RE.findall(warning_header) or (warning_header,) - stacklevel = warn_stacklevel() - for warning_message in warning_messages: - warnings.warn( - warning_message, - category=ElasticsearchWarning, - stacklevel=stacklevel, - ) - - if method == "HEAD": - response = HeadApiResponse(meta=meta) - elif isinstance(resp_body, dict): - response = ObjectApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] - elif isinstance(resp_body, list): - response = ListApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] - elif isinstance(resp_body, str): - response = TextApiResponse( # type: ignore[assignment] - body=resp_body, - meta=meta, - ) - elif isinstance(resp_body, bytes): - response = BinaryApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] - else: - response = ApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] - - return response +class RallySyncElasticsearch(elasticsearch.Elasticsearch): + def perform_request(self, *args, **kwargs): + kwargs["url"] = kwargs.pop("path") + return self.transport.perform_request(*args, **kwargs) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index a10d626e5..e7212d118 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -28,7 +28,6 @@ import time from dataclasses import dataclass from enum import Enum -from io import BytesIO from typing import Callable import thespian.actors @@ -598,16 +597,13 @@ def __init__(self, target, config, es_client_factory_class=client.EsClientFactor def create_es_clients(self): all_hosts = self.config.opts("client", "hosts").all_hosts - distribution_version = self.config.opts("mechanic", "distribution.version", mandatory=False) es = {} for cluster_name, cluster_hosts in all_hosts.items(): all_client_options = self.config.opts("client", "options").all_client_options cluster_client_options = dict(all_client_options[cluster_name]) # Use retries to avoid aborts on long living connections for telemetry devices - cluster_client_options["retry_on_timeout"] = True - es[cluster_name] = self.es_client_factory( - cluster_hosts, cluster_client_options, distribution_version=distribution_version - ).create() + cluster_client_options["retry-on-timeout"] = True + es[cluster_name] = self.es_client_factory(cluster_hosts, cluster_client_options).create() return es def prepare_telemetry(self, es, enable, index_names, data_stream_names): @@ -1732,14 +1728,12 @@ def _logging_exception_handler(self, loop, context): self.logger.error("Uncaught exception in event loop: %s", context) async def run(self): - def es_clients(client_id, all_hosts, all_client_options, distribution_version): + def es_clients(client_id, all_hosts, 
all_client_options): es = {} context = self.client_contexts.get(client_id) api_key = context.api_key for cluster_name, cluster_hosts in all_hosts.items(): - es[cluster_name] = client.EsClientFactory( - cluster_hosts, all_client_options[cluster_name], distribution_version=distribution_version - ).create_async(api_key=api_key) + es[cluster_name] = client.EsClientFactory(cluster_hosts, all_client_options[cluster_name]).create_async(api_key=api_key) return es if self.assertions_enabled: @@ -1756,12 +1750,7 @@ def es_clients(client_id, all_hosts, all_client_options, distribution_version): param_source = track.operation_parameters(self.track, task) params_per_task[task] = param_source schedule = schedule_for(task_allocation, params_per_task[task]) - es = es_clients( - client_id, - self.cfg.opts("client", "hosts").all_hosts, - self.cfg.opts("client", "options"), - self.cfg.opts("mechanic", "distribution.version", mandatory=False), - ) + es = es_clients(client_id, self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options")) clients.append(es) async_executor = AsyncExecutor( client_id, task, schedule, es, self.sampler, self.cancel, self.complete, task.error_behavior(self.abort_on_error) @@ -1992,47 +1981,20 @@ async def execute_single(runner, es, params, on_error): total_ops = 0 total_ops_unit = "ops" request_meta_data = {"success": False, "error-type": "transport"} - # For the 'errors' attribute, errors are ordered from - # most recently raised (index=0) to least recently raised (index=N) - # - # If an HTTP status code is available with the error it - # will be stored under 'status'. If HTTP headers are available - # they are stored under 'headers'. - if e.errors: - if hasattr(e.errors[0], "status"): - request_meta_data["http-status"] = e.errors[0].status + # The ES client will sometimes return string like "N/A" or "TIMEOUT" for connection errors. + if isinstance(e.status_code, int): + request_meta_data["http-status"] = e.status_code # connection timeout errors don't provide a helpful description if isinstance(e, elasticsearch.ConnectionTimeout): request_meta_data["error-description"] = "network connection timed out" + elif e.info: + request_meta_data["error-description"] = f"{e.error} ({e.info})" else: - error_description = e.message + if isinstance(e.error, bytes): + error_description = e.error.decode("utf-8") + else: + error_description = str(e.error) request_meta_data["error-description"] = error_description - except elasticsearch.ApiError as e: - total_ops = 0 - total_ops_unit = "ops" - request_meta_data = {"success": False, "error-type": "api"} - - if isinstance(e.error, bytes): - error_message = e.error.decode("utf-8") - elif isinstance(e.error, BytesIO): - error_message = e.error.read().decode("utf-8") - else: - error_message = e.error - - if isinstance(e.info, bytes): - error_body = e.info.decode("utf-8") - elif isinstance(e.info, BytesIO): - error_body = e.info.read().decode("utf-8") - else: - error_body = e.info - - if error_body: - error_message += f" ({error_body})" - error_description = error_message - - request_meta_data["error-description"] = error_description - if e.status_code: - request_meta_data["http-status"] = e.status_code except KeyError as e: logging.getLogger(__name__).exception("Cannot execute runner [%s]; most likely due to missing parameters.", str(runner)) msg = "Cannot execute [%s]. Provided parameters are: %s. Error: [%s]." 
% (str(runner), list(params.keys()), str(e)) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index bd8167453..7c9d3655a 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -730,7 +730,7 @@ class NodeStats(Runner): async def __call__(self, es, params): request_timeout = params.get("request-timeout") - await es.options(request_timeout=request_timeout).nodes.stats(metric="_all") + await es.nodes.stats(metric="_all", request_timeout=request_timeout) def __repr__(self, *args, **kwargs): return "node-stats" @@ -848,9 +848,6 @@ def __init__(self): async def __call__(self, es, params): request_params, headers = self._transport_request_params(params) - request_timeout = request_params.pop("request_timeout", None) - if request_timeout is not None: - es.options(request_timeout=request_timeout) # Mandatory to ensure it is always provided. This is especially important when this runner is used in a # composite context where there is no actual parameter source and the entire request structure must be provided # by the composite's parameter source. @@ -1398,10 +1395,15 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete(index=index_name, params=request_params) ops += 1 - elif only_if_exists and await es.indices.exists(index=index_name): - self.logger.info("Index [%s] already exists. Deleting it.", index_name) - await es.indices.delete(index=index_name, params=request_params) - ops += 1 + elif only_if_exists: + # here we use .get() and check for 404 instead of exists due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + get_response = await es.indices.get(index=index_name, ignore=[404]) + if not get_response.get("status") == 404: + self.logger.info("Index [%s] already exists. Deleting it.", index_name) + await es.indices.delete(index=index_name, params=request_params) + ops += 1 finally: await set_destructive_requires_name(es, prior_destructive_setting) return { @@ -1430,10 +1432,15 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_data_stream(name=data_stream, ignore=[404], params=request_params) ops += 1 - elif only_if_exists and await es.indices.exists(index=data_stream): - self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream) - await es.indices.delete_data_stream(name=data_stream, params=request_params) - ops += 1 + elif only_if_exists: + # here we use .get() and check for 404 instead of exists due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + get_response = await es.indices.get(index=data_stream, ignore=[404]) + if not get_response.get("status") == 404: + self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream) + await es.indices.delete_data_stream(name=data_stream, params=request_params) + ops += 1 return { "weight": ops, @@ -1482,10 +1489,15 @@ async def __call__(self, es, params): if not only_if_exists: await es.cluster.delete_component_template(name=template_name, params=request_params, ignore=[404]) ops_count += 1 - elif only_if_exists and await es.cluster.exists_component_template(name=template_name): - self.logger.info("Component Index template [%s] already exists. 
Deleting it.", template_name) - await es.cluster.delete_component_template(name=template_name, params=request_params) - ops_count += 1 + elif only_if_exists: + # here we use .get() and check for 404 instead of exists_component_template due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + component_template_exists = await es.cluster.get_component_template(name=template_name, ignore=[404]) + if not component_template_exists.get("status") == 404: + self.logger.info("Component Index template [%s] already exists. Deleting it.", template_name) + await es.cluster.delete_component_template(name=template_name, params=request_params) + ops_count += 1 return { "weight": ops_count, "unit": "ops", @@ -1532,10 +1544,15 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_index_template(name=template_name, params=request_params, ignore=[404]) ops_count += 1 - elif only_if_exists and await es.indices.exists_index_template(name=template_name): - self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name) - await es.indices.delete_index_template(name=template_name, params=request_params) - ops_count += 1 + elif only_if_exists: + # here we use .get() and check for 404 instead of exists_index_template due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + index_template_exists = await es.indices.get_index_template(name=template_name, ignore=[404]) + if not index_template_exists.get("status") == 404: + self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name) + await es.indices.delete_index_template(name=template_name, params=request_params) + ops_count += 1 # ensure that we do not provide an empty index pattern by accident if delete_matching_indices and index_pattern: await es.indices.delete(index=index_pattern) @@ -1587,7 +1604,10 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_template(name=template_name, params=request_params) ops_count += 1 - elif only_if_exists and await es.indices.exists_template(name=template_name): + # here we use .get_template() and check for empty instead of exists_template due to a bug in some versions + # of elasticsearch-py/elastic-transport with HEAD calls. + # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 + elif only_if_exists and await es.indices.get_template(name=template_name, ignore=[404]): self.logger.info("Index template [%s] already exists. 
Deleting it.", template_name) await es.indices.delete_template(name=template_name, params=request_params) ops_count += 1 @@ -1701,13 +1721,16 @@ async def __call__(self, es, params): body = mandatory(params, "body", self) try: await es.ml.put_datafeed(datafeed_id=datafeed_id, body=body) - except elasticsearch.BadRequestError: - # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 - await es.perform_request( - method="PUT", - path=f"/_xpack/ml/datafeeds/{datafeed_id}", - body=body, - ) + except elasticsearch.TransportError as e: + # fallback to old path + if e.status_code == 400: + await es.perform_request( + method="PUT", + path=f"/_xpack/ml/datafeeds/{datafeed_id}", + body=body, + ) + else: + raise e def __repr__(self, *args, **kwargs): return "create-ml-datafeed" @@ -1727,13 +1750,16 @@ async def __call__(self, es, params): try: # we don't want to fail if a datafeed does not exist, thus we ignore 404s. await es.ml.delete_datafeed(datafeed_id=datafeed_id, force=force, ignore=[404]) - except elasticsearch.BadRequestError: - # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 - await es.perform_request( - method="DELETE", - path=f"/_xpack/ml/datafeeds/{datafeed_id}", - params={"force": escape(force), "ignore": 404}, - ) + except elasticsearch.TransportError as e: + # fallback to old path (ES < 7) + if e.status_code == 400: + await es.perform_request( + method="DELETE", + path=f"/_xpack/ml/datafeeds/{datafeed_id}", + params={"force": escape(force), "ignore": 404}, + ) + else: + raise e def __repr__(self, *args, **kwargs): return "delete-ml-datafeed" @@ -1755,13 +1781,16 @@ async def __call__(self, es, params): timeout = params.get("timeout") try: await es.ml.start_datafeed(datafeed_id=datafeed_id, body=body, start=start, end=end, timeout=timeout) - except elasticsearch.BadRequestError: - # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 - await es.perform_request( - method="POST", - path=f"/_xpack/ml/datafeeds/{datafeed_id}/_start", - body=body, - ) + except elasticsearch.TransportError as e: + # fallback to old path (ES < 7) + if e.status_code == 400: + await es.perform_request( + method="POST", + path=f"/_xpack/ml/datafeeds/{datafeed_id}/_start", + body=body, + ) + else: + raise e def __repr__(self, *args, **kwargs): return "start-ml-datafeed" @@ -1781,18 +1810,21 @@ async def __call__(self, es, params): timeout = params.get("timeout") try: await es.ml.stop_datafeed(datafeed_id=datafeed_id, force=force, timeout=timeout) - except elasticsearch.BadRequestError: - # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 - request_params = { - "force": escape(force), - } - if timeout: - request_params["timeout"] = escape(timeout) - await es.perform_request( - method="POST", - path=f"/_xpack/ml/datafeeds/{datafeed_id}/_stop", - params=request_params, - ) + except elasticsearch.TransportError as e: + # fallback to old path (ES < 7) + if e.status_code == 400: + request_params = { + "force": escape(force), + } + if timeout: + request_params["timeout"] = escape(timeout) + await es.perform_request( + method="POST", + path=f"/_xpack/ml/datafeeds/{datafeed_id}/_stop", + params=request_params, + ) + else: + raise e def __repr__(self, *args, **kwargs): return "stop-ml-datafeed" @@ -1811,13 +1843,16 @@ async def __call__(self, es, params): body = mandatory(params, "body", self) try: await es.ml.put_job(job_id=job_id, body=body) - except elasticsearch.BadRequestError: 
- # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 - await es.perform_request( - method="PUT", - path=f"/_xpack/ml/anomaly_detectors/{job_id}", - body=body, - ) + except elasticsearch.TransportError as e: + # fallback to old path (ES < 7) + if e.status_code == 400: + await es.perform_request( + method="PUT", + path=f"/_xpack/ml/anomaly_detectors/{job_id}", + body=body, + ) + else: + raise e def __repr__(self, *args, **kwargs): return "create-ml-job" @@ -1837,13 +1872,16 @@ async def __call__(self, es, params): # we don't want to fail if a job does not exist, thus we ignore 404s. try: await es.ml.delete_job(job_id=job_id, force=force, ignore=[404]) - except elasticsearch.BadRequestError: - # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 - await es.perform_request( - method="DELETE", - path=f"/_xpack/ml/anomaly_detectors/{job_id}", - params={"force": escape(force), "ignore": 404}, - ) + except elasticsearch.TransportError as e: + # fallback to old path (ES < 7) + if e.status_code == 400: + await es.perform_request( + method="DELETE", + path=f"/_xpack/ml/anomaly_detectors/{job_id}", + params={"force": escape(force), "ignore": 404}, + ) + else: + raise e def __repr__(self, *args, **kwargs): return "delete-ml-job" @@ -1861,12 +1899,15 @@ async def __call__(self, es, params): job_id = mandatory(params, "job-id", self) try: await es.ml.open_job(job_id=job_id) - except elasticsearch.BadRequestError: - # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 - await es.perform_request( - method="POST", - path=f"/_xpack/ml/anomaly_detectors/{job_id}/_open", - ) + except elasticsearch.TransportError as e: + # fallback to old path (ES < 7) + if e.status_code == 400: + await es.perform_request( + method="POST", + path=f"/_xpack/ml/anomaly_detectors/{job_id}/_open", + ) + else: + raise e def __repr__(self, *args, **kwargs): return "open-ml-job" @@ -1886,19 +1927,22 @@ async def __call__(self, es, params): timeout = params.get("timeout") try: await es.ml.close_job(job_id=job_id, force=force, timeout=timeout) - except elasticsearch.BadRequestError: - # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 - request_params = { - "force": escape(force), - } - if timeout: - request_params["timeout"] = escape(timeout) + except elasticsearch.TransportError as e: + # fallback to old path (ES < 7) + if e.status_code == 400: + request_params = { + "force": escape(force), + } + if timeout: + request_params["timeout"] = escape(timeout) - await es.perform_request( - method="POST", - path=f"/_xpack/ml/anomaly_detectors/{job_id}/_close", - params=request_params, - ) + await es.perform_request( + method="POST", + path=f"/_xpack/ml/anomaly_detectors/{job_id}/_close", + params=request_params, + ) + else: + raise e def __repr__(self, *args, **kwargs): return "close-ml-job" @@ -1963,7 +2007,7 @@ class CreateSnapshotRepository(Runner): async def __call__(self, es, params): request_params = params.get("request-params", {}) await es.snapshot.create_repository( - name=mandatory(params, "repository", repr(self)), body=mandatory(params, "body", repr(self)), params=request_params + repository=mandatory(params, "repository", repr(self)), body=mandatory(params, "body", repr(self)), params=request_params ) def __repr__(self, *args, **kwargs): @@ -2053,15 +2097,32 @@ async def __call__(self, es, params): wait_period = params.get("completion-recheck-wait-period", 1) es_info = await es.info() 
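        # the info response carries the server version, e.g. {"version": {"number": "8.3.0"}};
        # only the major/minor components are compared below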
es_version = Version.from_string(es_info["version"]["number"]) + api = es.snapshot.get request_args = {"repository": repository, "snapshot": "_current", "verbose": False} # significantly reduce response size when lots of snapshots have been taken # only available since ES 8.3.0 (https://github.com/elastic/elasticsearch/pull/86269) if (es_version.major, es_version.minor) >= (8, 3): - request_args["index_names"] = False + request_params, headers = self._transport_request_params(params) + headers["Content-Type"] = "application/json" + + request_params["index_names"] = "false" + request_params["verbose"] = "false" + + request_args = { + "method": "GET", + "path": f"_snapshot/{repository}/_current", + "headers": headers, + "params": request_params, + } + + # TODO: Switch to native es.snapshot.get once `index_names` becomes supported in + # `es.snapshot.get` of the elasticsearch-py client and we've upgraded the client in Rally, see: + # https://elasticsearch-py.readthedocs.io/en/latest/api.html#elasticsearch.client.SnapshotClient.get + api = es.perform_request while True: - response = await es.snapshot.get(**request_args) + response = await api(**request_args) if int(response.get("total")) == 0: break @@ -2608,21 +2669,8 @@ class CreateIlmPolicy(Runner): async def __call__(self, es, params): policy_name = mandatory(params, "policy-name", self) body = mandatory(params, "body", self) - policy = mandatory(body, "policy", self) request_params = params.get("request-params", {}) - error_trace = request_params.get("error_trace", None) - filter_path = request_params.get("filter_path", None) - master_timeout = request_params.get("master_timeout", None) - timeout = request_params.get("timeout", None) - - await es.ilm.put_lifecycle( - name=policy_name, - policy=policy, - error_trace=error_trace, - filter_path=filter_path, - master_timeout=master_timeout, - timeout=timeout, - ) + await es.ilm.put_lifecycle(policy=policy_name, body=body, params=request_params) return { "weight": 1, "unit": "ops", @@ -2642,14 +2690,7 @@ class DeleteIlmPolicy(Runner): async def __call__(self, es, params): policy_name = mandatory(params, "policy-name", self) request_params = params.get("request-params", {}) - error_trace = request_params.get("error_trace", None) - filter_path = request_params.get("filter_path", None) - master_timeout = request_params.get("master_timeout", None) - timeout = request_params.get("timeout", None) - - await es.ilm.delete_lifecycle( - name=policy_name, error_trace=error_trace, filter_path=filter_path, master_timeout=master_timeout, timeout=timeout - ) + await es.ilm.delete_lifecycle(policy=policy_name, params=request_params) return { "weight": 1, "unit": "ops", @@ -2702,6 +2743,7 @@ class Downsample(Runner): """ async def __call__(self, es, params): + request_params, request_headers = self._transport_request_params(params) fixed_interval = mandatory(params, "fixed-interval", self) @@ -2869,7 +2911,7 @@ async def __call__(self, es, params): if last_attempt or not retry_on_timeout: raise await asyncio.sleep(sleep_time) - except elasticsearch.ApiError as e: + except elasticsearch.exceptions.TransportError as e: if last_attempt or not retry_on_timeout: raise e @@ -2879,16 +2921,6 @@ async def __call__(self, es, params): else: raise e - except elasticsearch.exceptions.ConnectionTimeout as e: - if last_attempt or not retry_on_timeout: - raise e - - self.logger.info("[%s] has timed out. 
Retrying in [%.2f] seconds.", repr(self.delegate), sleep_time) - await asyncio.sleep(sleep_time) - except elasticsearch.exceptions.TransportError as e: - if last_attempt or not retry_on_timeout: - raise e - async def __aexit__(self, exc_type, exc_val, exc_tb): return await self.delegate.__aexit__(exc_type, exc_val, exc_tb) diff --git a/esrally/log.py b/esrally/log.py index 189a1b8c5..0bc8409e4 100644 --- a/esrally/log.py +++ b/esrally/log.py @@ -43,40 +43,6 @@ def log_config_path(): return os.path.join(paths.rally_confdir(), "logging.json") -def add_missing_loggers_to_config(): - """ - Ensures that any missing top level loggers in resources/logging.json are - appended to an existing log configuration - """ - - def _missing_loggers(source, target): - """ - Returns any top-level loggers present in 'source', but not in 'target' - :return: A dict of all loggers present in 'source', but not in 'target' - """ - missing_loggers = {} - for logger in source: - if logger in source and logger in target: - continue - else: - missing_loggers[logger] = source[logger] - return missing_loggers - - source_path = io.normalize_path(os.path.join(os.path.dirname(__file__), "resources", "logging.json")) - - with open(log_config_path(), encoding="UTF-8") as target: - with open(source_path, encoding="UTF-8") as src: - template = json.load(src) - existing_logging_config = json.load(target) - if missing_loggers := _missing_loggers(source=template["loggers"], target=existing_logging_config["loggers"]): - existing_logging_config["loggers"].update(missing_loggers) - updated_config = json.dumps(existing_logging_config, indent=2) - - if missing_loggers: - with open(log_config_path(), "w", encoding="UTF-8") as target: - target.write(updated_config) - - def install_default_log_config(): """ Ensures a log configuration file is present on this machine. The default @@ -97,7 +63,6 @@ def install_default_log_config(): log_path = io.escape_path(log_path) contents = src.read().replace("${LOG_PATH}", log_path) target.write(contents) - add_missing_loggers_to_config() io.ensure_dir(paths.logs()) diff --git a/esrally/metrics.py b/esrally/metrics.py index 84dcd4762..b6482de54 100644 --- a/esrally/metrics.py +++ b/esrally/metrics.py @@ -30,6 +30,7 @@ import uuid import zlib from enum import Enum, IntEnum +from http.client import responses import tabulate @@ -58,8 +59,7 @@ def probe_version(self): raise exceptions.RallyError(msg) def put_template(self, name, template): - tmpl = json.loads(template) - return self.guarded(self._client.indices.put_template, name=name, **tmpl) + return self.guarded(self._client.indices.put_template, name=name, body=template) def template_exists(self, name): return self.guarded(self._client.indices.exists_template, name=name) @@ -103,79 +103,70 @@ def search(self, index, body): def guarded(self, target, *args, **kwargs): # pylint: disable=import-outside-toplevel import elasticsearch - from elastic_transport import ApiError, TransportError - max_execution_count = 10 + max_execution_count = 11 execution_count = 0 - while execution_count <= max_execution_count: + while execution_count < max_execution_count: time_to_sleep = 2**execution_count + random.random() execution_count += 1 try: return target(*args, **kwargs) - except elasticsearch.exceptions.ConnectionTimeout as e: - if execution_count <= max_execution_count: - self.logger.debug( - "Connection timeout [%s] in attempt [%d/%d]. 
Sleeping for [%f] seconds.",
-                        e.message,
-                        execution_count,
-                        max_execution_count,
-                        time_to_sleep,
-                    )
-                    time.sleep(time_to_sleep)
-                else:
-                    operation = target.__name__
-                    self.logger.exception("Connection timeout while running [%s] (retried %d times).", operation, max_execution_count)
-                    node = self._client.transport.node_pool.get()
-                    msg = (
-                        "A connection timeout occurred while running the operation [%s] against your Elasticsearch metrics store on "
-                        "host [%s] at port [%s]." % (operation, node.host, node.port)
-                    )
-                    raise exceptions.RallyError(msg)
-            except elasticsearch.exceptions.ConnectionError as e:
-                if execution_count <= max_execution_count:
-                    self.logger.debug(
-                        "Connection error [%s] in attempt [%d/%d]. Sleeping for [%f] seconds.",
-                        e.message,
-                        execution_count,
-                        max_execution_count,
-                        time_to_sleep,
-                    )
-                    time.sleep(time_to_sleep)
-                else:
-                    node = self._client.transport.node_pool.get()
-                    msg = (
-                        "Could not connect to your Elasticsearch metrics store. Please check that it is running on host [%s] at port [%s]"
-                        " or fix the configuration in [%s]." % (node.host, node.port, config.ConfigFile().location)
-                    )
-                    self.logger.exception(msg)
-                    # connection errors don't necessarily mean it's during setup
-                    raise exceptions.RallyError(msg)
             except elasticsearch.exceptions.AuthenticationException:
                 # we know that it is just one host (see EsClientFactory)
-                node = self._client.transport.node_pool.get()
+                node = self._client.transport.hosts[0]
                 msg = (
                     "The configured user could not authenticate against your Elasticsearch metrics store running on host [%s] at "
                     "port [%s] (wrong password?). Please fix the configuration in [%s]."
-                    % (node.host, node.port, config.ConfigFile().location)
+                    % (node["host"], node["port"], config.ConfigFile().location)
                 )
                 self.logger.exception(msg)
                 raise exceptions.SystemSetupError(msg)
             except elasticsearch.exceptions.AuthorizationException:
-                node = self._client.transport.node_pool.get()
+                node = self._client.transport.hosts[0]
                 msg = (
                     "The configured user does not have enough privileges to run the operation [%s] against your Elasticsearch metrics "
                     "store running on host [%s] at port [%s]. Please adjust your x-pack configuration or specify a user with enough "
-                    "privileges in the configuration in [%s]." % (target.__name__, node.host, node.port, config.ConfigFile().location)
+                    "privileges in the configuration in [%s]." % (target.__name__, node["host"], node["port"], config.ConfigFile().location)
+                )
+                self.logger.exception(msg)
+                raise exceptions.SystemSetupError(msg)
+            except elasticsearch.exceptions.ConnectionTimeout:
+                if execution_count < max_execution_count:
+                    self.logger.debug("Connection timeout in attempt [%d/%d].", execution_count, max_execution_count)
+                    time.sleep(time_to_sleep)
+                else:
+                    operation = target.__name__
+                    self.logger.exception("Connection timeout while running [%s] (retried %d times).", operation, max_execution_count)
+                    node = self._client.transport.hosts[0]
+                    msg = (
+                        "A connection timeout occurred while running the operation [%s] against your Elasticsearch metrics store on "
+                        "host [%s] at port [%s]." % (operation, node["host"], node["port"])
+                    )
+                    raise exceptions.RallyError(msg)
+            except elasticsearch.exceptions.ConnectionError:
+                node = self._client.transport.hosts[0]
+                msg = (
+                    "Could not connect to your Elasticsearch metrics store. Please check that it is running on host [%s] at port [%s]"
+                    " or fix the configuration in [%s]." % (node["host"], node["port"], config.ConfigFile().location)
+                )
+                self.logger.exception(msg)
+                raise exceptions.SystemSetupError(msg)
-            except ApiError as e:
-                if e.status_code in (502, 503, 504, 429) and execution_count <= max_execution_count:
+            except elasticsearch.TransportError as e:
+                if e.status_code == 404 and e.error == "index_not_found_exception":
+                    node = self._client.transport.hosts[0]
+                    msg = (
+                        "The operation [%s] against your Elasticsearch metrics store on "
+                        "host [%s] at port [%s] failed because index [%s] does not exist."
+                        % (target.__name__, node["host"], node["port"], kwargs.get("index"))
+                    )
+                    self.logger.exception(msg)
+                    raise exceptions.RallyError(msg)
+                if e.status_code in (502, 503, 504, 429) and execution_count < max_execution_count:
                     self.logger.debug(
                         "%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds.",
-                        e.error,
+                        responses[e.status_code],
                         e.status_code,
                         execution_count,
                         max_execution_count,
@@ -183,24 +174,19 @@ def guarded(self, target, *args, **kwargs):
                     )
                     time.sleep(time_to_sleep)
                 else:
-                    node = self._client.transport.node_pool.get()
+                    node = self._client.transport.hosts[0]
                     msg = (
-                        "An error [%s] occurred while running the operation [%s] against your Elasticsearch metrics store on host [%s] "
-                        "at port [%s]." % (e.error, target.__name__, node.host, node.port)
+                        "A transport error occurred while running the operation [%s] against your Elasticsearch metrics store on "
+                        "host [%s] at port [%s]." % (target.__name__, node["host"], node["port"])
                     )
                     self.logger.exception(msg)
-                    # this does not necessarily mean it's a system setup problem...
                     raise exceptions.RallyError(msg)
-            except TransportError as e:
-                node = self._client.transport.node_pool.get()
-                if e.errors:
-                    err = e.errors
-                else:
-                    err = e
+            except elasticsearch.exceptions.ElasticsearchException:
+                node = self._client.transport.hosts[0]
                 msg = (
-                    "Transport error(s) [%s] occurred while running the operation [%s] against your Elasticsearch metrics store on "
-                    "host [%s] at port [%s]." % (err, target.__name__, node.host, node.port)
+                    "An unknown error occurred while running the operation [%s] against your Elasticsearch metrics store on host [%s] "
+                    "at port [%s]." % (target.__name__, node["host"], node["port"])
                 )
                 self.logger.exception(msg)
                 # this does not necessarily mean it's a system setup problem...
diff --git a/esrally/resources/logging.json b/esrally/resources/logging.json index ba559e235..b58721060 100644 --- a/esrally/resources/logging.json +++ b/esrally/resources/logging.json @@ -47,11 +47,6 @@ "handlers": ["rally_profile_handler"], "level": "INFO", "propagate": false - }, - "elastic_transport": { - "handlers": ["rally_log_handler"], - "level": "WARNING", - "propagate": false } } } diff --git a/esrally/telemetry.py b/esrally/telemetry.py index 5d09bd412..7eecdc626 100644 --- a/esrally/telemetry.py +++ b/esrally/telemetry.py @@ -2284,7 +2284,7 @@ def on_benchmark_stop(self): for index in self.indices.split(","): self.logger.debug("Gathering disk usage for [%s]", index) try: - response = self.client.indices.disk_usage(index=index, run_expensive_tasks=True) + response = self.client.perform_request(method="POST", path=f"/{index}/_disk_usage", params={"run_expensive_tasks": "true"}) except elasticsearch.RequestError: msg = f"A transport error occurred while collecting disk usage for {index}" self.logger.exception(msg) @@ -2307,10 +2307,8 @@ def handle_telemetry_usage(self, response): self.logger.exception(msg) raise exceptions.RallyError(msg) + del response["_shards"] for index, idx_fields in response.items(): - if index == "_shards": - continue - for field, field_info in idx_fields["fields"].items(): meta = {"index": index, "field": field} self.metrics_store.put_value_cluster_level("disk_usage_total", field_info["total_in_bytes"], meta_data=meta, unit="byte") diff --git a/esrally/tracker/tracker.py b/esrally/tracker/tracker.py index cc0a30db5..d8daa06e1 100644 --- a/esrally/tracker/tracker.py +++ b/esrally/tracker/tracker.py @@ -18,7 +18,7 @@ import logging import os -from elastic_transport import ApiError, TransportError +from elasticsearch import ElasticsearchException from jinja2 import Environment, FileSystemLoader from esrally import PROGRAM_NAME @@ -42,7 +42,7 @@ def extract_indices_from_data_streams(client, data_streams_to_extract): for data_stream_name in data_streams_to_extract: try: indices += index.extract_indices_from_data_stream(client, data_stream_name) - except (ApiError, TransportError): + except ElasticsearchException: logging.getLogger(__name__).exception("Failed to extract indices from data stream [%s]", data_stream_name) return indices @@ -56,7 +56,7 @@ def extract_mappings_and_corpora(client, output_path, indices_to_extract): for index_name in indices_to_extract: try: indices += index.extract(client, output_path, index_name) - except (ApiError, TransportError): + except ElasticsearchException: logging.getLogger(__name__).exception("Failed to extract index [%s]", index_name) # That list only contains valid indices (with index patterns already resolved) diff --git a/esrally/utils/opts.py b/esrally/utils/opts.py index a51a0f883..7b8d2b9e1 100644 --- a/esrally/utils/opts.py +++ b/esrally/utils/opts.py @@ -171,51 +171,6 @@ def __init__(self, argvalue): self.parse_options() - @classmethod - def _normalize_hosts(cls, hosts): - # pylint: disable=import-outside-toplevel - from urllib.parse import unquote, urlparse - - string_types = str, bytes - # if hosts are empty, just defer to defaults down the line - if hosts is None: - return [{}] - - # passed in just one string - if isinstance(hosts, string_types): - hosts = [hosts] - - out = [] - # normalize hosts to dicts - for host in hosts: - if isinstance(host, string_types): - if "://" not in host: - host = "//%s" % host - - parsed_url = urlparse(host) - h = {"host": parsed_url.hostname} - - if parsed_url.port: - h["port"] = 
parsed_url.port - - if parsed_url.scheme == "https": - h["port"] = parsed_url.port or 443 - h["use_ssl"] = True - - if parsed_url.username or parsed_url.password: - h["http_auth"] = "%s:%s" % ( - unquote(parsed_url.username), - unquote(parsed_url.password), - ) - - if parsed_url.path and parsed_url.path != "/": - h["url_prefix"] = parsed_url.path - - out.append(h) - else: - out.append(host) - return out - def parse_options(self): def normalize_to_dict(arg): """ @@ -223,14 +178,12 @@ def normalize_to_dict(arg): This is needed to support backwards compatible --target-hosts for single clusters that are not defined as a json string or file. """ - return {TargetHosts.DEFAULT: self._normalize_hosts(arg)} + # pylint: disable=import-outside-toplevel + from elasticsearch.client import _normalize_hosts - parsed_options = to_dict(self.argvalue, default_parser=normalize_to_dict) - p_opts_copy = parsed_options.copy() - for cluster_name, nodes in p_opts_copy.items(): - parsed_options[cluster_name] = self._normalize_hosts(nodes) + return {TargetHosts.DEFAULT: _normalize_hosts(arg)} - self.parsed_options = parsed_options + self.parsed_options = to_dict(self.argvalue, default_parser=normalize_to_dict) @property def all_hosts(self): @@ -265,6 +218,7 @@ def parse_options(self): @staticmethod def normalize_to_dict(arg): + """ When --client-options is a non-json csv string (single cluster mode), return parsed client options as dict with "default" key diff --git a/it/__init__.py b/it/__init__.py index 9e6d5b4c8..f3d77b619 100644 --- a/it/__init__.py +++ b/it/__init__.py @@ -21,10 +21,8 @@ import os import platform import random -import shutil import socket import subprocess -import tempfile import time import pytest @@ -267,27 +265,3 @@ def setup_module(): def teardown_module(): ES_METRICS_STORE.stop() remove_integration_test_config() - - -# ensures that a fresh log file is available -@pytest.fixture(scope="function") -def fresh_log_file(): - cfg = ConfigFile(config_name=None) - log_file = os.path.join(cfg.rally_home, "logs", "rally.log") - - if os.path.exists(log_file): - bak = os.path.join(tempfile.mkdtemp(), "rally.log") - shutil.move(log_file, bak) - yield log_file - # append log lines to the original file and move it back to its original - with open(log_file) as src: - with open(bak, "a") as dst: - dst.write(src.read()) - shutil.move(bak, log_file) - else: - yield log_file - - -def check_log_line_present(log_file, text): - with open(log_file) as f: - return any(text in line for line in f) diff --git a/it/esrallyd_test.py b/it/esrallyd_test.py deleted file mode 100644 index b523fafa1..000000000 --- a/it/esrallyd_test.py +++ /dev/null @@ -1,56 +0,0 @@ -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-import pytest
-
-import it
-
-# pylint: disable=unused-import
-from it import fresh_log_file
-
-
-@pytest.fixture(autouse=True)
-def setup_esrallyd():
-    it.wait_until_port_is_free(1900)
-    it.shell_cmd("esrallyd start --node-ip 127.0.0.1 --coordinator-ip 127.0.0.1")
-    yield
-    it.shell_cmd("esrallyd stop")
-
-
-@it.rally_in_mem
-def test_elastic_transport_module_does_not_log_at_info_level(cfg, fresh_log_file):
-    """
-    The 'elastic_transport' module logs at 'INFO' by default and is _very_ noisy, so we explicitly set the threshold to
-    'WARNING' to avoid perturbing benchmarking results due to the high volume of logging calls by the client itself.
-
-    Unfortunately, due to the underlying double-fork behaviour of the ActorSystem, it's possible for this module's logger
-    threshold to be overridden and reset to the default 'INFO' level via eager top-level imports (i.e. at the top of a module).
-
-    Therefore we try to tightly control the imports of `elastic_transport` and `elasticsearch` throughout the codebase, but
-    it is very easy to reintroduce this 'bug' by simply putting the import statement in the 'wrong' spot, thus this IT
-    attempts to ensure this doesn't happen.
-
-    See https://github.com/elastic/rally/pull/1669#issuecomment-1442783985 for more details.
-    """
-    port = 19200
-    it.wait_until_port_is_free(port_number=port)
-    dist = it.DISTRIBUTIONS[-1]
-    it.race(
-        cfg,
-        f'--distribution-version={dist} --track="geonames" --include-tasks=delete-index '
-        f"--test-mode --car=4gheap,trial-license --target-hosts=127.0.0.1:{port} ",
-    )
-    assert not it.check_log_line_present(fresh_log_file, "elastic_transport.transport INFO")
diff --git a/it/proxy_test.py b/it/proxy_test.py
index e38750221..7c73fb502 100644
--- a/it/proxy_test.py
+++ b/it/proxy_test.py
@@ -17,15 +17,14 @@
 
 import collections
 import os
+import shutil
+import tempfile
 
 import pytest
 
 import it
 from esrally.utils import process
 
-# pylint: disable=unused-import
-from it import fresh_log_file
-
 HttpProxy = collections.namedtuple("HttpProxy", ["authenticated_url", "anonymous_url"])
 
 
@@ -45,10 +44,34 @@ def http_proxy():
     process.run_subprocess(f"docker stop {proxy_container_id}")
 
 
+# ensures that a fresh log file is available
+@pytest.fixture(scope="function")
+def fresh_log_file():
+    cfg = it.ConfigFile(config_name=None)
+    log_file = os.path.join(cfg.rally_home, "logs", "rally.log")
+
+    if os.path.exists(log_file):
+        bak = os.path.join(tempfile.mkdtemp(), "rally.log")
+        shutil.move(log_file, bak)
+        yield log_file
+        # append log lines to the original file and move it back to its original location
+        with open(log_file) as src:
+            with open(bak, "a") as dst:
+                dst.write(src.read())
+        shutil.move(bak, log_file)
+    else:
+        yield log_file
+
+
+def assert_log_line_present(log_file, text):
+    with open(log_file) as f:
+        assert any(text in line for line in f), f"Could not find [{text}] in [{log_file}]."
+ + @it.rally_in_mem def test_run_with_direct_internet_connection(cfg, http_proxy, fresh_log_file): assert it.esrally(cfg, "list tracks") == 0 - assert it.check_log_line_present(fresh_log_file, "Connecting directly to the Internet") + assert_log_line_present(fresh_log_file, "Connecting directly to the Internet") @it.rally_in_mem diff --git a/pyproject.toml b/pyproject.toml index 63fabb849..953dcfe6e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,9 +42,7 @@ dependencies = [ # transitive dependencies: # urllib3: MIT # aiohttp: Apache 2.0 - - "elasticsearch[async]==8.6.1", - "elastic-transport==8.4.0", + "elasticsearch[async]==7.14.0", "urllib3==1.26.9", "docker==6.0.0", # License: BSD diff --git a/tests/client/common_test.py b/tests/client/common_test.py deleted file mode 100644 index dac449c60..000000000 --- a/tests/client/common_test.py +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from esrally.client import common - - -# pylint: disable=protected-access -def test_client_major_version_to_str(): - version = (8, 2, 0) - assert common._client_major_version_to_str(version) == "8" diff --git a/tests/client/factory_test.py b/tests/client/factory_test.py index 5cb9d0f56..42f3a993e 100644 --- a/tests/client/factory_test.py +++ b/tests/client/factory_test.py @@ -29,18 +29,13 @@ import pytest import trustme import urllib3.exceptions -from elastic_transport import ApiResponseMeta from pytest_httpserver import HTTPServer from esrally import client, doc_link, exceptions -from esrally.client.asynchronous import RallyAsyncTransport +from esrally.client.asynchronous import AIOHttpConnection, VerifiedAsyncTransport from esrally.utils import console -def _api_error(status, message): - return elasticsearch.ApiError(message, ApiResponseMeta(status=status, http_version="1.1", headers={}, duration=0.0, node=None), None) - - class TestEsClientFactory: cwd = os.path.dirname(__file__) @@ -52,9 +47,10 @@ def test_create_http_connection(self): f = client.EsClientFactory(hosts, client_options) - assert f.hosts == ["http://localhost:9200"] + assert f.hosts == hosts assert f.ssl_context is None - assert "basic_auth" not in f.client_options + assert f.client_options["scheme"] == "http" + assert "http_auth" not in f.client_options assert client_options == original_client_options @@ -64,7 +60,7 @@ def test_create_https_connection_verify_server(self, mocked_load_cert_chain): client_options = { "use_ssl": True, "verify_certs": True, - "basic_auth": ("user", "password"), + "http_auth": ("user", "password"), } # make a copy so we can verify later that the factory did not modify it original_client_options = deepcopy(client_options) @@ -84,13 +80,14 @@ def test_create_https_connection_verify_server(self, mocked_load_cert_chain): not 
mocked_load_cert_chain.called ), "ssl_context.load_cert_chain should not have been called as we have not supplied client certs" - assert f.hosts == ["https://localhost:9200"] + assert f.hosts == hosts assert f.ssl_context.check_hostname assert f.ssl_context.verify_mode == ssl.CERT_REQUIRED - assert f.client_options["basic_auth"] == ("user", "password") - assert f.client_options["verify_certs"] + assert f.client_options["scheme"] == "https" + assert f.client_options["http_auth"] == ("user", "password") assert "use_ssl" not in f.client_options + assert "verify_certs" not in f.client_options assert "ca_certs" not in f.client_options assert client_options == original_client_options @@ -101,7 +98,7 @@ def test_create_https_connection_verify_self_signed_server_and_client_certificat client_options = { "use_ssl": True, "verify_certs": True, - "basic_auth": ("user", "password"), + "http_auth": ("user", "password"), "ca_certs": os.path.join(self.cwd, "../utils/resources/certs/ca.crt"), "client_cert": os.path.join(self.cwd, "../utils/resources/certs/client.crt"), "client_key": os.path.join(self.cwd, "../utils/resources/certs/client.key"), @@ -125,14 +122,14 @@ def test_create_https_connection_verify_self_signed_server_and_client_certificat keyfile=client_options["client_key"], ) - assert f.hosts == ["https://localhost:9200"] + assert f.hosts == hosts assert f.ssl_context.check_hostname assert f.ssl_context.verify_mode == ssl.CERT_REQUIRED - assert f.client_options["basic_auth"] == ("user", "password") - assert f.client_options["verify_certs"] - + assert f.client_options["scheme"] == "https" + assert f.client_options["http_auth"] == ("user", "password") assert "use_ssl" not in f.client_options + assert "verify_certs" not in f.client_options assert "ca_certs" not in f.client_options assert "client_cert" not in f.client_options assert "client_key" not in f.client_options @@ -145,7 +142,7 @@ def test_create_https_connection_only_verify_self_signed_server_certificate(self client_options = { "use_ssl": True, "verify_certs": True, - "basic_auth": ("user", "password"), + "http_auth": ("user", "password"), "ca_certs": os.path.join(self.cwd, "../utils/resources/certs/ca.crt"), } # make a copy so we can verify later that the factory did not modify it @@ -165,13 +162,14 @@ def test_create_https_connection_only_verify_self_signed_server_certificate(self assert ( not mocked_load_cert_chain.called ), "ssl_context.load_cert_chain should not have been called as we have not supplied client certs" - assert f.hosts == ["https://localhost:9200"] + assert f.hosts == hosts assert f.ssl_context.check_hostname assert f.ssl_context.verify_mode == ssl.CERT_REQUIRED - assert f.client_options["basic_auth"] == ("user", "password") - assert f.client_options["verify_certs"] + assert f.client_options["scheme"] == "https" + assert f.client_options["http_auth"] == ("user", "password") assert "use_ssl" not in f.client_options + assert "verify_certs" not in f.client_options assert "ca_certs" not in f.client_options assert client_options == original_client_options @@ -181,7 +179,7 @@ def test_raises_error_when_only_one_of_client_cert_and_client_key_defined(self): client_options = { "use_ssl": True, "verify_certs": True, - "basic_auth": ("user", "password"), + "http_auth": ("user", "password"), "ca_certs": os.path.join(self.cwd, "../utils/resources/certs/ca.crt"), } @@ -236,14 +234,14 @@ def test_create_https_connection_unverified_certificate(self, mocked_load_cert_c not mocked_load_cert_chain.called ), "ssl_context.load_cert_chain should 
not have been called as we have not supplied client certs" - assert f.hosts == ["https://localhost:9200"] + assert f.hosts == hosts assert not f.ssl_context.check_hostname assert f.ssl_context.verify_mode == ssl.CERT_NONE - assert f.client_options["basic_auth"] == ("user", "password") - assert not f.client_options["verify_certs"] - + assert f.client_options["scheme"] == "https" + assert f.client_options["http_auth"] == ("user", "password") assert "use_ssl" not in f.client_options + assert "verify_certs" not in f.client_options assert "basic_auth_user" not in f.client_options assert "basic_auth_password" not in f.client_options @@ -255,7 +253,7 @@ def test_create_https_connection_unverified_certificate_present_client_certifica client_options = { "use_ssl": True, "verify_certs": False, - "basic_auth": ("user", "password"), + "http_auth": ("user", "password"), "client_cert": os.path.join(self.cwd, "../utils/resources/certs/client.crt"), "client_key": os.path.join(self.cwd, "../utils/resources/certs/client.key"), } @@ -277,13 +275,14 @@ def test_create_https_connection_unverified_certificate_present_client_certifica keyfile=client_options["client_key"], ) - assert f.hosts == ["https://localhost:9200"] + assert f.hosts == hosts assert not f.ssl_context.check_hostname assert f.ssl_context.verify_mode == ssl.CERT_NONE - assert f.client_options["basic_auth"] == ("user", "password") + assert f.client_options["scheme"] == "https" + assert f.client_options["http_auth"] == ("user", "password") assert "use_ssl" not in f.client_options - assert not f.client_options["verify_certs"] + assert "verify_certs" not in f.client_options assert "basic_auth_user" not in f.client_options assert "basic_auth_password" not in f.client_options assert "ca_certs" not in f.client_options @@ -315,7 +314,7 @@ def test_check_hostname_false_when_host_is_ip(self): } f = client.EsClientFactory(hosts, client_options) - assert f.hosts == ["https://127.0.0.1:9200"] + assert f.hosts == hosts assert f.ssl_context.check_hostname is False assert f.ssl_context.verify_mode == ssl.CERT_REQUIRED @@ -326,7 +325,6 @@ def test_create_async_client_with_api_key_auth_override(self, es): "use_ssl": True, "verify_certs": True, "http_auth": ("user", "password"), - "max_connections": 600, } # make a copy so we can verify later that the factory did not modify it original_client_options = deepcopy(client_options) @@ -337,17 +335,16 @@ def test_create_async_client_with_api_key_auth_override(self, es): assert f.create_async(api_key=api_key) assert "http_auth" not in f.client_options assert f.client_options["api_key"] == api_key - assert client_options["max_connections"] == f.max_connections assert client_options == original_client_options es.assert_called_once_with( - distribution_version=None, - hosts=["https://localhost:9200"], - transport_class=RallyAsyncTransport, + hosts=hosts, + transport_class=VerifiedAsyncTransport, + connection_class=AIOHttpConnection, ssl_context=f.ssl_context, - maxsize=f.max_connections, - verify_certs=True, + scheme="https", serializer=f.client_options["serializer"], + trace_config=f.client_options["trace_config"], api_key=api_key, ) @@ -447,7 +444,10 @@ async def test_propagates_nested_context(self): class TestRestLayer: @mock.patch("elasticsearch.Elasticsearch") def test_successfully_waits_for_rest_layer(self, es): - es.transport.node_pool.__len__ = mock.Mock(return_value=2) + es.transport.hosts = [ + {"host": "node-a.example.org", "port": 9200}, + {"host": "node-b.example.org", "port": 9200}, + ] assert 
client.wait_for_rest_layer(es, max_attempts=3) es.cluster.health.assert_has_calls( [ @@ -460,10 +460,10 @@ def test_successfully_waits_for_rest_layer(self, es): @mock.patch("elasticsearch.Elasticsearch") def test_retries_on_transport_errors(self, es, sleep): es.cluster.health.side_effect = [ - _api_error(503, "Service Unavailable"), - _api_error(401, "Unauthorized"), - elasticsearch.TransportError("Connection timed out"), - elasticsearch.TransportError("Connection timed out"), + elasticsearch.TransportError(503, "Service Unavailable"), + elasticsearch.TransportError(401, "Unauthorized"), + elasticsearch.TransportError(408, "Timed Out"), + elasticsearch.TransportError(408, "Timed Out"), {"version": {"number": "5.0.0", "build_hash": "abc123"}}, ] assert client.wait_for_rest_layer(es, max_attempts=5) @@ -471,45 +471,18 @@ def test_retries_on_transport_errors(self, es, sleep): # don't sleep in realtime @mock.patch("time.sleep") @mock.patch("elasticsearch.Elasticsearch") - def test_dont_retry_eternally_on_api_errors(self, es, sleep): - es.cluster.health.side_effect = _api_error(401, "Unauthorized") - es.transport.node_pool = ["node_1"] - with pytest.raises(elasticsearch.ApiError, match=r"Unauthorized"): - client.wait_for_rest_layer(es, max_attempts=3) - es.cluster.health.assert_has_calls( - [mock.call(wait_for_nodes=">=1"), mock.call(wait_for_nodes=">=1"), mock.call(wait_for_nodes=">=1")] - ) - - @mock.patch("elasticsearch.Elasticsearch") - def test_ssl_serialization_error(self, es): - es.cluster.health.side_effect = elasticsearch.SerializationError(message="Client sent an HTTP request to an HTTPS server") - with pytest.raises( - exceptions.SystemSetupError, - match="Rally sent an HTTP request to an HTTPS server. Are you sure this is an HTTP endpoint?", - ): - client.wait_for_rest_layer(es, max_attempts=3) + def test_dont_retry_eternally_on_transport_errors(self, es, sleep): + es.cluster.health.side_effect = elasticsearch.TransportError(401, "Unauthorized") + assert not client.wait_for_rest_layer(es, max_attempts=3) @mock.patch("elasticsearch.Elasticsearch") - def test_connection_ssl_error(self, es): - es.cluster.health.side_effect = elasticsearch.SSLError( - message="SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1131)", - ) - with pytest.raises( - exceptions.SystemSetupError, - match="Could not connect to cluster via HTTPS. Are you sure this is an HTTPS endpoint?", - ): - client.wait_for_rest_layer(es, max_attempts=3) - - @mock.patch("elasticsearch.Elasticsearch") - def test_connection_protocol_error(self, es): + def test_ssl_error(self, es): es.cluster.health.side_effect = elasticsearch.ConnectionError( - message="N/A", - errors=[urllib3.exceptions.ProtocolError("Connection aborted.")], + "N/A", + "[SSL: UNKNOWN_PROTOCOL] unknown protocol (_ssl.c:719)", + urllib3.exceptions.SSLError("[SSL: UNKNOWN_PROTOCOL] unknown protocol (_ssl.c:719)"), ) - with pytest.raises( - exceptions.SystemSetupError, - match="Received a protocol error. Are you sure you're using the correct scheme (HTTP or HTTPS)?", - ): + with pytest.raises(exceptions.SystemSetupError, match="Could not connect to cluster via https. 
Is this an https endpoint?"): client.wait_for_rest_layer(es, max_attempts=3) @@ -519,32 +492,32 @@ def test_successfully_creates_api_keys(self, es): client_id = 0 assert client.create_api_key(es, client_id, max_attempts=3) # even though max_attempts is 3, this should only be called once - es.security.create_api_key.assert_called_once_with(name=f"rally-client-{client_id}") + es.security.create_api_key.assert_called_once_with({"name": f"rally-client-{client_id}"}) @mock.patch("elasticsearch.Elasticsearch") def test_api_key_creation_fails_on_405_and_raises_system_setup_error(self, es): client_id = 0 - es.security.create_api_key.side_effect = _api_error(405, "Incorrect HTTP method") + es.security.create_api_key.side_effect = elasticsearch.TransportError(405, "Incorrect HTTP method") with pytest.raises( exceptions.SystemSetupError, match=re.escape("Got status code 405 when attempting to create API keys. Is Elasticsearch Security enabled?"), ): client.create_api_key(es, client_id, max_attempts=5) - es.security.create_api_key.assert_called_once_with(name=f"rally-client-{client_id}") + es.security.create_api_key.assert_called_once_with({"name": f"rally-client-{client_id}"}) @mock.patch("time.sleep") @mock.patch("elasticsearch.Elasticsearch") def test_retries_api_key_creation_on_transport_errors(self, es, sleep): client_id = 0 es.security.create_api_key.side_effect = [ - _api_error(503, "Service Unavailable"), - _api_error(401, "Unauthorized"), - elasticsearch.TransportError("Connection timed out"), - _api_error(500, "Internal Server Error"), + elasticsearch.TransportError(503, "Service Unavailable"), + elasticsearch.TransportError(401, "Unauthorized"), + elasticsearch.TransportError(408, "Timed Out"), + elasticsearch.TransportError(500, "Internal Server Error"), {"id": "abc", "name": f"rally-client-{client_id}", "api_key": "123"}, ] - calls = [mock.call(name="rally-client-0") for _ in range(5)] + calls = [mock.call({"name": "rally-client-0"}) for _ in range(5)] assert client.create_api_key(es, client_id, max_attempts=5) assert es.security.create_api_key.call_args_list == calls @@ -567,7 +540,7 @@ def test_successfully_deletes_api_keys(self, es, version): ] else: es.security.invalidate_api_key.return_value = {"invalidated_api_keys": ["foo", "bar", "baz"], "error_count": 0} - calls = [mock.call(ids=ids)] + calls = [mock.call({"ids": ids})] assert client.delete_api_keys(es, ids, max_attempts=3) assert es.security.invalidate_api_key.has_calls(calls, any_order=True) @@ -583,28 +556,28 @@ def test_retries_api_keys_deletion_on_transport_errors(self, es, sleep, version) es.security.invalidate_api_key.side_effect = [ {"invalidated_api_keys": ["foo"]}, {"invalidated_api_keys": ["bar"]}, - _api_error(401, "Unauthorized"), - _api_error(503, "Service Unavailable"), + elasticsearch.TransportError(401, "Unauthorized"), + elasticsearch.TransportError(503, "Service Unavailable"), {"invalidated_api_keys": ["baz"]}, ] calls = [ # foo and bar are deleted successfully, leaving only baz - mock.call(id="foo"), - mock.call(id="bar"), + mock.call({"id": "foo"}), + mock.call({"id": "bar"}), # two exceptions are thrown, so it should take 3 attempts to delete baz - mock.call(id="baz"), - mock.call(id="baz"), - mock.call(id="baz"), + mock.call({"id": "baz"}), + mock.call({"id": "baz"}), + mock.call({"id": "baz"}), ] else: es.security.invalidate_api_key.side_effect = [ - _api_error(503, "Service Unavailable"), - _api_error(401, "Unauthorized"), - elasticsearch.TransportError("Connection timed Out"), - _api_error(500, "Internal 
Server Error"), + elasticsearch.TransportError(503, "Service Unavailable"), + elasticsearch.TransportError(401, "Unauthorized"), + elasticsearch.TransportError(408, "Timed Out"), + elasticsearch.TransportError(500, "Internal Server Error"), {"invalidated_api_keys": ["foo", "bar", "baz"], "error_count": 0}, ] - calls = [mock.call(ids=ids) for _ in range(max_attempts)] + calls = [mock.call({"ids": ids}) for _ in range(max_attempts)] assert client.delete_api_keys(es, ids, max_attempts=max_attempts) assert es.security.invalidate_api_key.call_args_list == calls @@ -619,13 +592,13 @@ def test_raises_exception_when_api_key_deletion_fails(self, es, version): es.security.invalidate_api_key.side_effect = [ {"invalidated_api_keys": ["foo"]}, {"invalidated_api_keys": ["bar"]}, - _api_error(500, "Internal Server Error"), + elasticsearch.TransportError(500, "Internal Server Error"), ] calls = [ - mock.call(id="foo"), - mock.call(id="bar"), - mock.call(id="baz"), + mock.call({"id": "foo"}), + mock.call({"id": "bar"}), + mock.call({"id": "baz"}), ] else: # Since there are two ways this version can fail, we interleave them @@ -634,21 +607,33 @@ def test_raises_exception_when_api_key_deletion_fails(self, es, version): "invalidated_api_keys": ["foo"], "error_count": 3, }, - _api_error(500, "Internal Server Error"), + elasticsearch.TransportError(500, "Internal Server Error"), { "invalidated_api_keys": ["bar"], "error_count": 2, }, - _api_error(500, "Internal Server Error"), + elasticsearch.TransportError(500, "Internal Server Error"), ] calls = [ - mock.call(ids=["foo", "bar", "baz", "qux"]), - mock.call(ids=["bar", "baz", "qux"]), - mock.call(ids=["bar", "baz", "qux"]), + mock.call({"ids": ["foo", "bar", "baz", "qux"]}), + mock.call({"ids": ["bar", "baz", "qux"]}), + mock.call({"ids": ["bar", "baz", "qux"]}), ] with pytest.raises(exceptions.RallyError, match=re.escape(f"Could not delete API keys with the following IDs: {failed_to_delete}")): client.delete_api_keys(es, ids, max_attempts=3) es.security.invalidate_api_key.assert_has_calls(calls) + + +class TestAsyncConnection: + @pytest.mark.asyncio + async def test_enable_cleanup_close(self): + connection = AIOHttpConnection() + # pylint: disable=protected-access + assert connection._enable_cleanup_closed is True + + connection = AIOHttpConnection(enable_cleanup_closed=False) + # pylint: disable=protected-access + assert connection._enable_cleanup_closed is False diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index b317cecb6..d40fab67e 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -23,7 +23,6 @@ from datetime import datetime from unittest import mock -import elastic_transport import elasticsearch import pytest @@ -1815,35 +1814,30 @@ async def test_execute_single_dict(self): async def test_execute_single_with_connection_error_always_aborts(self, on_error): es = None params = None - runner = mock.AsyncMock(side_effect=elasticsearch.ConnectionError(message="Connection error")) + # ES client uses pseudo-status "N/A" in this case... + runner = mock.AsyncMock(side_effect=elasticsearch.ConnectionError("N/A", "no route to host", None)) with pytest.raises(exceptions.RallyAssertionError) as exc: await driver.execute_single(self.context_managed(runner), es, params, on_error=on_error) - assert exc.value.args[0] == "Request returned an error. Error type: transport, Description: Connection error" + assert exc.value.args[0] == "Request returned an error. 
Error type: transport, Description: no route to host" @pytest.mark.asyncio async def test_execute_single_with_http_400_aborts_when_specified(self): es = None params = None - error_meta = elastic_transport.ApiResponseMeta(status=404, http_version="1.1", headers={}, duration=0.0, node=None) - runner = mock.AsyncMock( - side_effect=elasticsearch.NotFoundError(message="not found", meta=error_meta, body="the requested document could not be found") - ) + runner = mock.AsyncMock(side_effect=elasticsearch.NotFoundError(404, "not found", "the requested document could not be found")) with pytest.raises(exceptions.RallyAssertionError) as exc: await driver.execute_single(self.context_managed(runner), es, params, on_error="abort") assert exc.value.args[0] == ( - "Request returned an error. Error type: api, Description: not found (the requested document could not be found)" + "Request returned an error. Error type: transport, Description: not found (the requested document could not be found)" ) @pytest.mark.asyncio async def test_execute_single_with_http_400(self): es = None params = None - error_meta = elastic_transport.ApiResponseMeta(status=404, http_version="1.1", headers={}, duration=0.0, node=None) - runner = mock.AsyncMock( - side_effect=elasticsearch.NotFoundError(message="not found", meta=error_meta, body="the requested document could not be found") - ) + runner = mock.AsyncMock(side_effect=elasticsearch.NotFoundError(404, "not found", "the requested document could not be found")) ops, unit, request_meta_data = await driver.execute_single(self.context_managed(runner), es, params, on_error="continue") @@ -1851,7 +1845,7 @@ async def test_execute_single_with_http_400(self): assert unit == "ops" assert request_meta_data == { "http-status": 404, - "error-type": "api", + "error-type": "transport", "error-description": "not found (the requested document could not be found)", "success": False, } @@ -1860,8 +1854,7 @@ async def test_execute_single_with_http_400(self): async def test_execute_single_with_http_413(self): es = None params = None - error_meta = elastic_transport.ApiResponseMeta(status=413, http_version="1.1", headers={}, duration=0.0, node=None) - runner = mock.AsyncMock(side_effect=elasticsearch.NotFoundError(message="", meta=error_meta, body="")) + runner = mock.AsyncMock(side_effect=elasticsearch.NotFoundError(413, b"", b"")) ops, unit, request_meta_data = await driver.execute_single(self.context_managed(runner), es, params, on_error="continue") @@ -1869,7 +1862,7 @@ async def test_execute_single_with_http_413(self): assert unit == "ops" assert request_meta_data == { "http-status": 413, - "error-type": "api", + "error-type": "transport", "error-description": "", "success": False, } diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index c34878764..7bf7589a1 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -24,12 +24,10 @@ import random from unittest import mock -import elastic_transport import elasticsearch import pytest from esrally import client, exceptions -from esrally.client.asynchronous import RallyAsyncElasticsearch from esrally.driver import runner @@ -1297,7 +1295,7 @@ async def test_force_merge_with_polling_no_timeout(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_polling(self, es): - es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) + es.indices.forcemerge = 
mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout()) es.tasks.list = mock.AsyncMock( side_effect=[ { @@ -1344,7 +1342,7 @@ async def test_force_merge_with_polling(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_polling_and_params(self, es): - es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout("connection timeout")) + es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout()) es.tasks.list = mock.AsyncMock( side_effect=[ { @@ -1645,7 +1643,7 @@ async def test_query_with_timeout_and_headers(self, es): es.perform_request.assert_awaited_once_with( method="GET", path="/_all/_search", - params={"request_cache": "true"}, + params={"request_timeout": 3.0, "request_cache": "true"}, body=params["body"], headers={"header1": "value1", "x-opaque-id": "test-id1"}, ) @@ -2206,7 +2204,7 @@ async def test_scroll_query_cannot_clear_scroll(self, es): } es.perform_request = mock.AsyncMock(return_value=io.BytesIO(json.dumps(search_response).encode())) - es.clear_scroll = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) + es.clear_scroll = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout()) query_runner = runner.Query() @@ -2728,7 +2726,7 @@ class TestDeleteIndexRunner: @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_existing_indices(self, es): - es.indices.exists = mock.AsyncMock(side_effect=[False, True]) + es.indices.get = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.indices.delete = mock.AsyncMock() es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}}) es.cluster.put_settings = mock.AsyncMock() @@ -2793,7 +2791,7 @@ class TestDeleteDataStreamRunner: @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_existing_data_streams(self, es): - es.indices.exists = mock.AsyncMock(side_effect=[False, True]) + es.indices.get = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.indices.delete_data_stream = mock.AsyncMock() r = runner.DeleteDataStream() @@ -2923,7 +2921,7 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_only_existing_index_templates(self, es): - es.indices.exists_template = mock.AsyncMock(side_effect=[False, True]) + es.indices.get_template = mock.AsyncMock(side_effect=[False, True]) es.indices.delete_template = mock.AsyncMock() es.indices.delete = mock.AsyncMock() @@ -3053,8 +3051,8 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio - async def test_deletes_only_existing_component_templates(self, es): - es.cluster.exists_component_template = mock.AsyncMock(side_effect=[False, True]) + async def test_deletes_only_existing_index_templates(self, es): + es.cluster.get_component_template = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.cluster.delete_component_template = mock.AsyncMock() r = runner.DeleteComponentTemplate() @@ -3214,7 +3212,7 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_only_existing_index_templates(self, es): - es.indices.exists_index_template = mock.AsyncMock(side_effect=[False, True]) + es.indices.get_index_template = 
mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) es.indices.delete_index_template = mock.AsyncMock() r = runner.DeleteComposableTemplate() @@ -3273,8 +3271,7 @@ async def test_create_ml_datafeed(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_create_ml_datafeed_fallback(self, es): - error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) - es.ml.put_datafeed = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) + es.ml.put_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) es.perform_request = mock.AsyncMock() datafeed_id = "some-data-feed" body = {"job_id": "total-requests", "indices": ["server-metrics"]} @@ -3303,8 +3300,7 @@ async def test_delete_ml_datafeed(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_delete_ml_datafeed_fallback(self, es): - error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) - es.ml.delete_datafeed = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) + es.ml.delete_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) es.perform_request = mock.AsyncMock() datafeed_id = "some-data-feed" @@ -3337,8 +3333,7 @@ async def test_start_ml_datafeed_with_body(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_start_ml_datafeed_with_body_fallback(self, es): - error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) - es.ml.start_datafeed = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) + es.ml.start_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) es.perform_request = mock.AsyncMock() body = {"end": "now"} params = {"datafeed-id": "some-data-feed", "body": body} @@ -3386,8 +3381,7 @@ async def test_stop_ml_datafeed(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_stop_ml_datafeed_fallback(self, es): - error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) - es.ml.stop_datafeed = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) + es.ml.stop_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) es.perform_request = mock.AsyncMock() params = { @@ -3438,8 +3432,7 @@ async def test_create_ml_job(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_create_ml_job_fallback(self, es): - error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) - es.ml.put_job = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) + es.ml.put_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) es.perform_request = mock.AsyncMock() body = { @@ -3481,8 +3474,7 @@ async def test_delete_ml_job(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_delete_ml_job_fallback(self, es): - error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) - 
es.ml.delete_job = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) + es.ml.delete_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) es.perform_request = mock.AsyncMock() job_id = "an-ml-job" @@ -3513,8 +3505,7 @@ async def test_open_ml_job(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_open_ml_job_fallback(self, es): - error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) - es.ml.open_job = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) + es.ml.open_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) es.perform_request = mock.AsyncMock() job_id = "an-ml-job" @@ -3545,8 +3536,7 @@ async def test_close_ml_job(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_close_ml_job_fallback(self, es): - error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) - es.ml.close_job = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) + es.ml.close_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) es.perform_request = mock.AsyncMock() params = { @@ -3766,7 +3756,7 @@ async def test_create_snapshot_repository(self, es): await r(es, params) es.snapshot.create_repository.assert_called_once_with( - name="backups", body={"type": "fs", "settings": {"location": "/var/backups"}}, params={} + repository="backups", body={"type": "fs", "settings": {"location": "/var/backups"}}, params={} ) @@ -4159,7 +4149,7 @@ async def test_wait_for_current_snapshots_create_after_8_3_0(self, es): "completion-recheck-wait-period": 0, } - es.snapshot.get = mock.AsyncMock( + es.perform_request = mock.AsyncMock( side_effect=[ { "snapshots": [ @@ -4202,9 +4192,14 @@ async def test_wait_for_current_snapshots_create_after_8_3_0(self, es): r = runner.WaitForCurrentSnapshotsCreate() result = await r(es, task_params) - es.snapshot.get.assert_awaited_with(repository=repository, snapshot="_current", verbose=False, index_names=False) + es.perform_request.assert_awaited_with( + method="GET", + path=f"_snapshot/{repository}/_current", + headers={"Content-Type": "application/json"}, + params={"index_names": "false", "verbose": "false"}, + ) - assert es.snapshot.get.await_count == 2 + assert es.perform_request.await_count == 2 assert result is None @@ -5097,6 +5092,7 @@ async def test_transform_stats_with_non_existing_path(self, es): class TestCreateIlmPolicyRunner: + params = { "policy-name": "my-ilm-policy", "request-params": {"master_timeout": "30s", "timeout": "30s"}, @@ -5121,12 +5117,7 @@ async def test_create_ilm_policy_with_request_params(self, es): } es.ilm.put_lifecycle.assert_awaited_once_with( - name=self.params["policy-name"], - policy=self.params["body"]["policy"], - master_timeout=self.params["request-params"].get("master_timeout"), - timeout=self.params["request-params"].get("timeout"), - error_trace=None, - filter_path=None, + policy=self.params["policy-name"], body=self.params["body"], params=self.params["request-params"] ) @mock.patch("elasticsearch.Elasticsearch") @@ -5143,33 +5134,11 @@ async def test_create_ilm_policy_without_request_params(self, es): "success": True, } - es.ilm.put_lifecycle.assert_awaited_once_with( - name=params["policy-name"], - 
policy=self.params["body"]["policy"], - master_timeout=None, - timeout=None, - error_trace=None, - filter_path=None, - ) - - @mock.patch("esrally.client.asynchronous.IlmClient") - @pytest.mark.asyncio - async def test_RallyIlmClient_rewrites_kwargs(self, es_ilm): - es = RallyAsyncElasticsearch(hosts=["http://localhost:9200"]) - es_ilm.put_lifecycle = mock.AsyncMock(return_value={}) - - # simulating a custom runner that hasn't been refactored - # to suit the new 'elasticsearch-py' 8.x kwarg only method signature - await es.ilm.put_lifecycle("test-name", body=self.params["body"]) - - es_ilm.put_lifecycle.assert_awaited_once_with( - es.ilm, - policy=self.params["body"]["policy"], - name="test-name", - ) + es.ilm.put_lifecycle.assert_awaited_once_with(policy=params["policy-name"], body=params["body"], params={}) class TestDeleteIlmPolicyRunner: + params = {"policy-name": "my-ilm-policy", "request-params": {"master_timeout": "30s", "timeout": "30s"}} @mock.patch("elasticsearch.Elasticsearch") @@ -5184,13 +5153,7 @@ async def test_delete_ilm_policy_with_request_params(self, es): "success": True, } - es.ilm.delete_lifecycle.assert_awaited_once_with( - name=self.params["policy-name"], - master_timeout=self.params["request-params"].get("master_timeout"), - timeout=self.params["request-params"].get("timeout"), - error_trace=None, - filter_path=None, - ) + es.ilm.delete_lifecycle.assert_awaited_once_with(policy=self.params["policy-name"], params=self.params["request-params"]) @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio @@ -5206,13 +5169,7 @@ async def test_delete_ilm_policy_without_request_params(self, es): "success": True, } - es.ilm.delete_lifecycle.assert_awaited_once_with( - name=self.params["policy-name"], - master_timeout=None, - timeout=None, - error_trace=None, - filter_path=None, - ) + es.ilm.delete_lifecycle.assert_awaited_once_with(policy=params["policy-name"], params={}) class TestSqlRunner: @@ -6792,10 +6749,10 @@ async def test_is_does_not_retry_on_success(self): async def test_retries_on_timeout_if_wanted_and_raises_if_no_recovery(self): delegate = mock.AsyncMock( side_effect=[ - elasticsearch.ConnectionError(message="no route to host"), - elasticsearch.ConnectionError(message="no route to host"), - elasticsearch.ConnectionError(message="no route to host"), - elasticsearch.ConnectionError(message="no route to host"), + elasticsearch.ConnectionError("N/A", "no route to host"), + elasticsearch.ConnectionError("N/A", "no route to host"), + elasticsearch.ConnectionError("N/A", "no route to host"), + elasticsearch.ConnectionError("N/A", "no route to host"), ] ) es = None @@ -6819,7 +6776,7 @@ async def test_retries_on_timeout_if_wanted_and_returns_first_call(self): delegate = mock.AsyncMock( side_effect=[ - elasticsearch.ConnectionError(message="no route to host"), + elasticsearch.ConnectionError("N/A", "no route to host"), failed_return_value, ] ) @@ -6841,7 +6798,7 @@ async def test_retries_on_timeout_if_wanted_and_returns_first_call(self): @pytest.mark.asyncio async def test_retries_mixed_timeout_and_application_errors(self): - connection_error = elasticsearch.ConnectionError(message="no route to host") + connection_error = elasticsearch.ConnectionError("N/A", "no route to host") failed_return_value = {"weight": 1, "unit": "ops", "success": False} success_return_value = {"weight": 1, "unit": "ops", "success": False} diff --git a/tests/log_test.py b/tests/log_test.py deleted file mode 100644 index f78579512..000000000 --- a/tests/log_test.py +++ /dev/null @@ -1,55 +0,0 
@@ -# Licensed to Elasticsearch B.V. under one or more contributor -# license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright -# ownership. Elasticsearch B.V. licenses this file to you under -# the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import copy -import json -import os -from unittest import mock -from unittest.mock import mock_open, patch - -import pytest - -from esrally import log - - -class TestLog: - def _load_logging_template(self, p): - with open(p) as f: - return json.load(f) - - @pytest.fixture(autouse=True) - def set_configuration(self): - p = os.path.join(os.path.join(os.path.dirname(__file__), "..", "esrally", "resources", "logging.json")) - self.configuration = self._load_logging_template(p) - - @mock.patch("json.load") - def test_add_missing_loggers_to_config_missing(self, mock_json_load, caplog): - source_template = copy.deepcopy(self.configuration) - existing_configuration = copy.deepcopy(self.configuration) - # change existing to differ from source template, showing that we don't overwrite any existing loggers config - existing_configuration["loggers"]["rally.profile"]["level"] = "DEBUG" - expected_configuration = json.dumps(copy.deepcopy(existing_configuration), indent=2) - # simulate user missing 'elastic_transport' in logging.json - del existing_configuration["loggers"]["elastic_transport"] - - # first loads template, then existing configuration - mock_json_load.side_effect = [source_template, existing_configuration] - - with patch("builtins.open", mock_open()) as mock_file: - log.add_missing_loggers_to_config() - handle = mock_file() - handle.write.assert_called_once_with(expected_configuration) diff --git a/tests/metrics_test.py b/tests/metrics_test.py index f94ca88cd..b15ccff17 100644 --- a/tests/metrics_test.py +++ b/tests/metrics_test.py @@ -24,7 +24,6 @@ import string import tempfile import uuid -from dataclasses import dataclass from unittest import mock import elasticsearch.exceptions @@ -84,28 +83,37 @@ def total_time(self): return 0 -class TestEsClient: - class NodeMock: - def __init__(self, host, port): - self.host = host - self.port = port +class TransportErrors: + err_return_codes = { + 502: "Bad Gateway", + 503: "Service Unavailable", + 504: "Gateway Timeout", + 429: "Too Many Requests", + } - class NodePoolMock: - def __init__(self, hosts): - self.nodes = [] - for h in hosts: - self.nodes.append(TestEsClient.NodeMock(host=h["host"], port=h["port"])) + def __init__(self, max_err_responses=10): + self.max_err_responses = max_err_responses + # allow duplicates in list of error codes + self.rnd_err_codes = [random.choice(list(TransportErrors.err_return_codes)) for _ in range(self.max_err_responses)] - def get(self): - return self.nodes[0] + @property + def code_list(self): + return self.rnd_err_codes + + @property + def side_effects(self): + side_effect_list = [ + elasticsearch.exceptions.TransportError(rnd_code, TransportErrors.err_return_codes[rnd_code]) for rnd_code in self.rnd_err_codes + ] + 
side_effect_list.append("success") + return side_effect_list + + +class TestEsClient: class TransportMock: def __init__(self, hosts): - self.node_pool = TestEsClient.NodePoolMock(hosts) - - @dataclass - class ApiResponseMeta: - status: int + self.hosts = hosts class ClientMock: def __init__(self, hosts): @@ -160,110 +168,22 @@ def test_config_opts_parsing(self, client_esclientfactory, password_configuratio hosts=[{"host": _datastore_host, "port": _datastore_port}], client_options=expected_client_options ) - @mock.patch("random.random") - @mock.patch("esrally.time.sleep") - def test_retries_on_various_errors(self, mocked_sleep, mocked_random, caplog): - class ConnectionError: - def logging_statements(self, retries): - logging_statements = [] - for i, v in enumerate(range(retries)): - logging_statements.append( - "Connection error [%s] in attempt [%d/%d]. Sleeping for [%f] seconds." - % ( - "unit-test", - i + 1, - max_retry, - sleep_slots[v], - ) - ) - logging_statements.append( - "Could not connect to your Elasticsearch metrics store. Please check that it is running on host [127.0.0.1] at " - f"port [9200] or fix the configuration in [{paths.rally_confdir()}/rally.ini]." - ) - return logging_statements - - def raise_error(self): - raise elasticsearch.exceptions.ConnectionError("unit-test") - - class ConnectionTimeout: - def logging_statements(self, retries): - logging_statements = [] - for i, v in enumerate(range(retries)): - logging_statements.append( - "Connection timeout [%s] in attempt [%d/%d]. Sleeping for [%f] seconds." - % ( - "unit-test", - i + 1, - max_retry, - sleep_slots[v], - ) - ) - logging_statements.append(f"Connection timeout while running [raise_error] (retried {retries} times).") - return logging_statements - - def raise_error(self): - raise elasticsearch.exceptions.ConnectionTimeout("unit-test") - - class ApiError: - def __init__(self, status_code): - self.status_code = status_code - - def logging_statements(self, retries): - logging_statements = [] - for i, v in enumerate(range(retries)): - logging_statements.append( - "%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds." - % ( - "unit-test", - self.status_code, - i + 1, - max_retry, - sleep_slots[v], - ) - ) - logging_statements.append( - "An error [unit-test] occurred while running the operation [raise_error] against your Elasticsearch " - "metrics store on host [127.0.0.1] at port [9200]." 
- ) - return logging_statements - - def raise_error(self): - err = elasticsearch.exceptions.ApiError("unit-test", meta=TestEsClient.ApiResponseMeta(status=self.status_code), body={}) - raise err - - retriable_errors = [ApiError(429), ApiError(502), ApiError(503), ApiError(504), ConnectionError(), ConnectionTimeout()] - - max_retry = 10 - - # The sec to sleep for 10 transport errors is - # [1, 2, 4, 8, 16, 32, 64, 128, 256, 512] ~> 17.05min in total - sleep_slots = [float(2**i) for i in range(0, max_retry)] - - # we want deterministic timings to assess logging statements - mocked_random.return_value = 0 + def test_raises_sytem_setup_error_on_connection_problems(self): + def raise_connection_error(): + raise elasticsearch.exceptions.ConnectionError("unit-test") client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9200"}])) - exepcted_logger_calls = [] - expected_sleep_calls = [] - - for e in retriable_errors: - exepcted_logger_calls += e.logging_statements(max_retry) - expected_sleep_calls += [mock.call(int(sleep_slots[i])) for i in range(0, max_retry)] - - with pytest.raises(exceptions.RallyError): - with caplog.at_level(logging.DEBUG): - client.guarded(e.raise_error) - - actual_logger_calls = [r.message for r in caplog.records] - actual_sleep_calls = mocked_sleep.call_args_list - - assert actual_sleep_calls == expected_sleep_calls - assert actual_logger_calls == exepcted_logger_calls + with pytest.raises(exceptions.SystemSetupError) as ctx: + client.guarded(raise_connection_error) + assert ctx.value.args[0] == ( + "Could not connect to your Elasticsearch metrics store. Please check that it is running on host [127.0.0.1] at " + f"port [9200] or fix the configuration in [{paths.rally_confdir()}/rally.ini]." + ) def test_raises_sytem_setup_error_on_authentication_problems(self): def raise_authentication_error(): - raise elasticsearch.exceptions.AuthenticationException(meta=None, body=None, message="unit-test") + raise elasticsearch.exceptions.AuthenticationException("unit-test") client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}])) @@ -276,7 +196,7 @@ def raise_authentication_error(): def test_raises_sytem_setup_error_on_authorization_problems(self): def raise_authorization_error(): - raise elasticsearch.exceptions.AuthorizationException(meta=None, body=None, message="unit-test") + raise elasticsearch.exceptions.AuthorizationException("unit-test") client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}])) @@ -290,15 +210,77 @@ def raise_authorization_error(): def test_raises_rally_error_on_unknown_problems(self): def raise_unknown_error(): - exc = elasticsearch.exceptions.TransportError(message="unit-test") - raise exc + raise elasticsearch.exceptions.SerializationError("unit-test") client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}])) with pytest.raises(exceptions.RallyError) as ctx: client.guarded(raise_unknown_error) assert ctx.value.args[0] == ( - "Transport error(s) [unit-test] occurred while running the operation [raise_unknown_error] against your Elasticsearch metrics " + "An unknown error occurred while running the operation [raise_unknown_error] against your Elasticsearch metrics " + "store on host [127.0.0.1] at port [9243]." 
+ ) + + def test_retries_on_various_transport_errors(self): + @mock.patch("random.random") + @mock.patch("esrally.time.sleep") + def test_transport_error_retries(side_effect, expected_logging_calls, expected_sleep_calls, mocked_sleep, mocked_random): + # should return on first success + operation = mock.Mock(side_effect=side_effect) + + # Disable additional randomization time in exponential backoff calls + mocked_random.return_value = 0 + + client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}])) + + logger = logging.getLogger("esrally.metrics") + with mock.patch.object(logger, "debug") as mocked_debug_logger: + test_result = client.guarded(operation) + mocked_sleep.assert_has_calls(expected_sleep_calls) + mocked_debug_logger.assert_has_calls(expected_logging_calls, any_order=True) + assert test_result == "success" + + max_retry = 10 + all_err_codes = TransportErrors.err_return_codes + transport_errors = TransportErrors(max_err_responses=max_retry) + rnd_err_codes = transport_errors.code_list + rnd_side_effects = transport_errors.side_effects + rnd_mocked_logger_calls = [] + + # The sec to sleep for 10 transport errors is + # [1, 2, 4, 8, 16, 32, 64, 128, 256, 512] ~> 17.05min in total + sleep_slots = [float(2**i) for i in range(0, max_retry)] + mocked_sleep_calls = [mock.call(sleep_slots[i]) for i in range(0, max_retry)] + + for rnd_err_idx, rnd_err_code in enumerate(rnd_err_codes): + # List of logger.debug calls to expect + rnd_mocked_logger_calls.append( + mock.call( + "%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds.", + all_err_codes[rnd_err_code], + rnd_err_code, + rnd_err_idx + 1, + max_retry + 1, + sleep_slots[rnd_err_idx], + ) + ) + # pylint: disable=no-value-for-parameter + test_transport_error_retries(rnd_side_effects, rnd_mocked_logger_calls, mocked_sleep_calls) + + @mock.patch("esrally.time.sleep") + def test_fails_after_too_many_errors(self, mocked_sleep): + def random_transport_error(rnd_resp_code): + raise elasticsearch.exceptions.TransportError(rnd_resp_code, TransportErrors.err_return_codes[rnd_resp_code]) + + client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}])) + rnd_code = random.choice(list(TransportErrors.err_return_codes)) + + with pytest.raises(exceptions.RallyError) as ctx: + client.guarded(random_transport_error, rnd_code) + + assert ctx.value.args[0] == ( + "A transport error occurred while running the operation " + "[random_transport_error] against your Elasticsearch metrics " "store on host [127.0.0.1] at port [9243]." 
) diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py index 5d2498b4d..c77b2ef65 100644 --- a/tests/telemetry_test.py +++ b/tests/telemetry_test.py @@ -215,7 +215,7 @@ def __call__(self, *args, **kwargs): class TransportClient: - def __init__(self, response=None, force_error=False, error=elasticsearch.TransportError(message="transport error")): + def __init__(self, *, response=None, force_error=False, error=elasticsearch.TransportError): self._response = response self._force_error = force_error self._error = error @@ -4357,152 +4357,171 @@ class TestDiskUsageStats: def test_uses_indices_by_default(self, es): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = {"_shards": {"failed": 0}} + tc = TransportClient(response={"_shards": {"failed": 0}}) + es = Client(transport_client=tc) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() - es.indices.disk_usage.assert_has_calls( - [ - call(index="foo", run_expensive_tasks=True), - call(index="bar", run_expensive_tasks=True), - ] - ) + assert tc.kwargs == [ + {"method": "POST", "path": "/foo/_disk_usage", "params": {"run_expensive_tasks": "true"}}, + {"method": "POST", "path": "/bar/_disk_usage", "params": {"run_expensive_tasks": "true"}}, + ] @mock.patch("elasticsearch.Elasticsearch") def test_uses_data_streams_by_default(self, es): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = {"_shards": {"failed": 0}} + tc = TransportClient(response={"_shards": {"failed": 0}}) + es = Client(transport_client=tc) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=[], data_stream_names=["foo", "bar"]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() - es.indices.disk_usage.assert_has_calls( - [ - call(index="foo", run_expensive_tasks=True), - call(index="bar", run_expensive_tasks=True), - ] - ) + assert tc.kwargs == [ + {"method": "POST", "path": "/foo/_disk_usage", "params": {"run_expensive_tasks": "true"}}, + {"method": "POST", "path": "/bar/_disk_usage", "params": {"run_expensive_tasks": "true"}}, + ] @mock.patch("elasticsearch.Elasticsearch") def test_uses_indices_param_if_specified_instead_of_index_names(self, es): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = {"_shards": {"failed": 0}} + tc = TransportClient(response={"_shards": {"failed": 0}}) + es = Client(transport_client=tc) device = telemetry.DiskUsageStats( {"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, index_names=["baz"], data_stream_names=[] ) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() - es.indices.disk_usage.assert_has_calls( - [ - call(index="foo", run_expensive_tasks=True), - call(index="bar", run_expensive_tasks=True), - ] - ) + assert tc.kwargs == [ + {"method": "POST", "path": "/foo/_disk_usage", "params": {"run_expensive_tasks": "true"}}, + {"method": "POST", "path": "/bar/_disk_usage", "params": {"run_expensive_tasks": "true"}}, + ] @mock.patch("elasticsearch.Elasticsearch") def test_uses_indices_param_if_specified_instead_of_data_stream_names(self, es): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = 
{"_shards": {"failed": 0}} + tc = TransportClient(response={"_shards": {"failed": 0}}) + es = Client(transport_client=tc) device = telemetry.DiskUsageStats( {"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, index_names=[], data_stream_names=["baz"] ) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() - es.indices.disk_usage.assert_has_calls( - [ - call(index="foo", run_expensive_tasks=True), - call(index="bar", run_expensive_tasks=True), - ] - ) + assert tc.kwargs == [ + {"method": "POST", "path": "/foo/_disk_usage", "params": {"run_expensive_tasks": "true"}}, + {"method": "POST", "path": "/bar/_disk_usage", "params": {"run_expensive_tasks": "true"}}, + ] @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_error_on_retrieval_does_not_store_metrics(self, es, metrics_store_cluster_level, caplog): + def test_error_on_retrieval_does_not_store_metrics(self, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.side_effect = elasticsearch.RequestError(message="error", meta=None, body=None) + es = Client( + transport_client=TransportClient( + force_error=True, + error=elasticsearch.RequestError, + ) + ) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() with pytest.raises(exceptions.RallyError): t.on_benchmark_stop() - assert "A transport error occurred while collecting disk usage for foo" in caplog.text assert metrics_store_cluster_level.call_count == 0 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_no_indices_fails(self, es, metrics_store_cluster_level, caplog): + def test_no_indices_fails(self, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = {"_shards": {"failed": 0}} + es = Client( + transport_client=TransportClient( + force_error=True, + error=elasticsearch.RequestError, + ) + ) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=[], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) with pytest.raises(exceptions.RallyError): t.on_benchmark_start() - msg = ( - "No indices defined for disk-usage-stats. Set disk-usage-stats-indices " - "telemetry param or add indices or data streams to the track config." 
- ) - assert msg in caplog.text assert metrics_store_cluster_level.call_count == 0 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_missing_all_fails(self, es, metrics_store_cluster_level, caplog): + def test_missing_all_fails(self, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.side_effect = elasticsearch.NotFoundError(message="error", meta=None, body=None) + es = Client( + transport_client=TransportClient( + force_error=True, + error=elasticsearch.RequestError, + ) + ) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() with pytest.raises(exceptions.RallyError): t.on_benchmark_stop() - - assert "Requested disk usage for missing index foo" in caplog.text - assert "Requested disk usage for missing index bar" in caplog.text - assert "Couldn't find any indices for disk usage foo,bar" in caplog.text assert metrics_store_cluster_level.call_count == 0 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_some_mising_succeeds(self, es, metrics_store_cluster_level, caplog): + def test_some_mising_succeeds(self, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - not_found_response = elasticsearch.NotFoundError(message="error", meta=None, body=None) - successful_response = { - "_shards": {"failed": 0}, - "foo": { - "fields": { - "_id": { - "total_in_bytes": 21079, - "inverted_index": {"total_in_bytes": 17110}, - "stored_fields_in_bytes": 3969, + + class TwoTransportClients: + def __init__(self, first, rest): + self.first = first + self.rest = rest + + def perform_request(self, *args, **kwargs): + if self.first: + first = self.first + self.first = None + return first.perform_request(*args, **kwargs) + else: + return self.rest.perform_request(*args, **kwargs) + + not_found_transport_client = TransportClient( + force_error=True, + error=elasticsearch.NotFoundError, + ) + successful_client = TransportClient( + response={ + "_shards": {"failed": 0}, + "foo": { + "fields": { + "_id": { + "total_in_bytes": 21079, + "inverted_index": {"total_in_bytes": 17110}, + "stored_fields_in_bytes": 3969, + } } - } - }, - } - es.indices.disk_usage.side_effect = [not_found_response, successful_response] + }, + } + ) + + es = Client(transport_client=TwoTransportClients(not_found_transport_client, successful_client)) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() - assert "Requested disk usage for missing index foo" in caplog.text assert metrics_store_cluster_level.call_count == 3 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_successful_shards(self, es, metrics_store_cluster_level): + def test_successful_shards(self, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = { - "_shards": {"total": 1, "successful": 1, "failed": 0}, - } + es = Client( + transport_client=TransportClient( + response={ + "_shards": {"total": 1, "successful": 1, "failed": 0}, + } + ) + ) device =
telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4510,13 +4529,16 @@ def test_successful_shards(self, es, metrics_store_cluster_level): assert metrics_store_cluster_level.call_count == 0 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_unsuccessful_shards(self, es, metrics_store_cluster_level): + def test_unsuccessful_shards(self, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = { - "_shards": {"total": 1, "successful": 0, "failed": 1, "failures": "hello there!"}, - } + es = Client( + transport_client=TransportClient( + response={ + "_shards": {"total": 1, "successful": 0, "failed": 1, "failures": "hello there!"}, + } + ) + ) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4525,21 +4547,24 @@ def test_unsuccessful_shards(self, es, metrics_store_cluster_level): assert metrics_store_cluster_level.call_count == 0 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_source(self, es, metrics_store_cluster_level): + def test_source(self, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = { - "_shards": {"failed": 0}, - "foo": { - "fields": { - "_source": { - "total_in_bytes": 40676, - "stored_fields_in_bytes": 40676, - } + es = Client( + transport_client=TransportClient( + response={ + "_shards": {"failed": 0}, + "foo": { + "fields": { + "_source": { + "total_in_bytes": 40676, + "stored_fields_in_bytes": 40676, + } + } + }, } - }, - } + ) + ) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4550,22 +4575,25 @@ def test_source(self, es, metrics_store_cluster_level): ] @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_id(self, es, metrics_store_cluster_level): + def test_id(self, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = { - "_shards": {"failed": 0}, - "foo": { - "fields": { - "_id": { - "total_in_bytes": 21079, - "inverted_index": {"total_in_bytes": 17110}, - "stored_fields_in_bytes": 3969, - } + es = Client( + transport_client=TransportClient( + response={ + "_shards": {"failed": 0}, + "foo": { + "fields": { + "_id": { + "total_in_bytes": 21079, + "inverted_index": {"total_in_bytes": 17110}, + "stored_fields_in_bytes": 3969, + } + } + }, } - }, - } + ) + ) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4577,8 +4605,7 @@ def test_id(self, es, metrics_store_cluster_level): ] @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_empty_field(self, es, metrics_store_cluster_level): + def test_empty_field(self, metrics_store_cluster_level): """ Fields 
like _primary_term commonly have take 0 bytes at all. But they are declared fields so we return their total size just so no one @@ -4586,20 +4613,24 @@ def test_empty_field(self, es, metrics_store_cluster_level): """ cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = { - "_shards": {"failed": 0}, - "foo": { - "fields": { - "_primary_term": { - "total_in_bytes": 0, - "inverted_index": {"total_in_bytes": 0}, - "stored_fields_in_bytes": 0, - "doc_values_in_bytes": 0, - "points_in_bytes": 0, - } + es = Client( + transport_client=TransportClient( + response={ + "_shards": {"failed": 0}, + "foo": { + "fields": { + "_primary_term": { + "total_in_bytes": 0, + "inverted_index": {"total_in_bytes": 0}, + "stored_fields_in_bytes": 0, + "doc_values_in_bytes": 0, + "points_in_bytes": 0, + } + } + }, } - }, - } + ) + ) device = telemetry.DiskUsageStats({"disk-usage-stats-indices": "foo"}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4609,22 +4640,25 @@ def test_empty_field(self, es, metrics_store_cluster_level): ] @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_number(self, es, metrics_store_cluster_level): + def test_number(self, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = { - "_shards": {"failed": 0}, - "foo": { - "fields": { - "prcp": { - "total_in_bytes": 1498, - "doc_values_in_bytes": 748, - "points_in_bytes": 750, - } + es = Client( + transport_client=TransportClient( + response={ + "_shards": {"failed": 0}, + "foo": { + "fields": { + "prcp": { + "total_in_bytes": 1498, + "doc_values_in_bytes": 748, + "points_in_bytes": 750, + } + } + }, } - }, - } + ) + ) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4636,22 +4670,25 @@ def test_number(self, es, metrics_store_cluster_level): ] @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_keyword(self, es, metrics_store_cluster_level): + def test_keyword(self, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es.indices.disk_usage.return_value = { - "_shards": {"failed": 0}, - "foo": { - "fields": { - "station.country_code": { - "total_in_bytes": 346, - "doc_values_in_bytes": 328, - "points_in_bytes": 18, - } + es = Client( + transport_client=TransportClient( + response={ + "_shards": {"failed": 0}, + "foo": { + "fields": { + "station.country_code": { + "total_in_bytes": 346, + "doc_values_in_bytes": 328, + "points_in_bytes": 18, + } + } + }, } - }, - } + ) + ) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4663,14 +4700,19 @@ def test_keyword(self, es, metrics_store_cluster_level): ] @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - @mock.patch("elasticsearch.Elasticsearch") - def test_indexed_vector(self, es, metrics_store_cluster_level): + def test_indexed_vector(self, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - 
es.indices.disk_usage.return_value = { - "_shards": {"failed": 0}, - "foo": {"fields": {"title_vector": {"total_in_bytes": 64179820, "doc_values_in_bytes": 0, "knn_vectors_in_bytes": 64179820}}}, - } + es = Client( + transport_client=TransportClient( + response={ + "_shards": {"failed": 0}, + "foo": { + "fields": {"title_vector": {"total_in_bytes": 64179820, "doc_values_in_bytes": 0, "knn_vectors_in_bytes": 64179820}} + }, + } + ) + ) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() diff --git a/tests/tracker/corpus_test.py b/tests/tracker/corpus_test.py index 6ff514217..007fd833d 100644 --- a/tests/tracker/corpus_test.py +++ b/tests/tracker/corpus_test.py @@ -31,9 +31,6 @@ def test_extract(client, mo): doc = {"field1": "stuff", "field2": "things"} doc_data = serialize_doc(doc) client.count.return_value = {"count": 1001} - # the scan helper calls client.options(), which returns a new client instance - # we override this behavior here to facilitate mocking - client.options.return_value = client client.search.return_value = { "_scroll_id": "uohialjrknf", "_shards": { diff --git a/tests/utils/opts_test.py b/tests/utils/opts_test.py index e3c6a057c..ed9dfbe4a 100644 --- a/tests/utils/opts_test.py +++ b/tests/utils/opts_test.py @@ -182,6 +182,8 @@ def test_csv_hosts_parses(self): assert opts.TargetHosts(target_hosts).default == [{"host": "127.0.0.1", "port": 9200}, {"host": "10.17.0.5", "port": 19200}] + assert opts.TargetHosts(target_hosts).default == [{"host": "127.0.0.1", "port": 9200}, {"host": "10.17.0.5", "port": 19200}] + def test_jsonstring_parses_as_dict_of_clusters(self): target_hosts = ( '{"default": ["127.0.0.1:9200","10.17.0.5:19200"],' @@ -190,14 +192,14 @@ def test_jsonstring_parses_as_dict_of_clusters(self): ) assert opts.TargetHosts(target_hosts).all_hosts == { - "default": [{"host": "127.0.0.1", "port": 9200}, {"host": "10.17.0.5", "port": 19200}], - "remote_1": [{"host": "88.33.22.15", "port": 19200}], - "remote_2": [{"host": "10.18.0.6", "port": 19200}, {"host": "10.18.0.7", "port": 19201}], + "default": ["127.0.0.1:9200", "10.17.0.5:19200"], + "remote_1": ["88.33.22.15:19200"], + "remote_2": ["10.18.0.6:19200", "10.18.0.7:19201"], } def test_json_file_parameter_parses(self): assert opts.TargetHosts(os.path.join(os.path.dirname(__file__), "resources", "target_hosts_1.json")).all_hosts == { - "default": [{"host": "127.0.0.1", "port": 9200, "use_ssl": True}, {"host": "10.127.0.3", "port": 19200}] + "default": ["127.0.0.1:9200", "10.127.0.3:19200"] } assert opts.TargetHosts(os.path.join(os.path.dirname(__file__), "resources", "target_hosts_2.json")).all_hosts == { @@ -275,7 +277,8 @@ def test_no_client_option_parses_to_default(self): def test_no_client_option_parses_to_default_with_multicluster(self): client_options_string = opts.ClientOptions.DEFAULT_CLIENT_OPTIONS - target_hosts = opts.TargetHosts('{"default": ["127.0.0.1:9200", "10.17.0.5:19200"], "remote": ["88.33.22.15:19200"]}') + target_hosts = opts.TargetHosts('{"default": ["127.0.0.1:9200,10.17.0.5:19200"], "remote": ["88.33.22.15:19200"]}') + assert opts.ClientOptions(client_options_string, target_hosts=target_hosts).default == {"timeout": 60} assert opts.ClientOptions(client_options_string, target_hosts=target_hosts).all_client_options == { diff --git a/tests/utils/resources/target_hosts_1.json b/tests/utils/resources/target_hosts_1.json index 2dfe79a77..c65ea169e 
100644 --- a/tests/utils/resources/target_hosts_1.json +++ b/tests/utils/resources/target_hosts_1.json @@ -1 +1 @@ -{ "default": ["https://127.0.0.1:9200","10.127.0.3:19200"] } +{ "default": ["127.0.0.1:9200","10.127.0.3:19200"] }
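A closing note on the rewritten ILM assertions above: the tests now expect the elasticsearch-py 7.x calling convention, in which the policy name is passed as ``policy`` and any request parameters travel in a single ``params`` dict, rather than the 8.x keyword-only signature. A minimal sketch of the calls the runners are expected to make — the client setup and ``body`` here are hypothetical, not Rally's runner code::

    import elasticsearch

    es = elasticsearch.Elasticsearch(hosts=["http://127.0.0.1:9200"])
    # hypothetical ILM policy body
    body = {"policy": {"phases": {"delete": {"min_age": "30d", "actions": {"delete": {}}}}}}

    # 7.x style: the policy name is the "policy" argument, query parameters go into "params"
    es.ilm.put_lifecycle(policy="my-ilm-policy", body=body, params={})
    es.ilm.delete_lifecycle(policy="my-ilm-policy", params={"master_timeout": "30s", "timeout": "30s"})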
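The restored retry tests in ``tests/metrics_test.py`` encode an exponential backoff contract: retry attempt ``n`` sleeps for ``2**n`` seconds (1, 2, 4, ..., 512 — roughly 17 minutes over ten retries) plus a random component that the tests disable by forcing ``random.random()`` to return 0. A sketch of a loop that satisfies those expectations, assuming a hypothetical ``do_request`` callable; Rally's actual ``EsClient.guarded`` may differ in its details::

    import logging
    import random
    import time

    import elasticsearch.exceptions

    MAX_RETRY = 10  # mirrors max_retry in the tests
    logger = logging.getLogger("esrally.metrics")


    def guarded_sketch(do_request):
        for attempt in range(MAX_RETRY + 1):
            try:
                return do_request()
            except elasticsearch.exceptions.TransportError as e:
                if attempt == MAX_RETRY:
                    # the real implementation translates this into a RallyError
                    raise
                # sleep slot 2**attempt plus jitter; with random.random() == 0 this
                # is exactly the [1, 2, 4, ..., 512] sequence asserted in the tests
                sleep_seconds = float(2**attempt) + random.random()
                logger.debug(
                    "%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds.",
                    e.error,
                    e.status_code,
                    attempt + 1,
                    MAX_RETRY + 1,
                    sleep_seconds,
                )
                time.sleep(sleep_seconds)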
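Likewise, the ``DiskUsageStats`` tests no longer assert on an ``es.indices.disk_usage(...)`` helper but on the raw transport requests recorded by the ``TransportClient`` stub. Under the 7.x client the telemetry device is expected to issue one disk-usage analysis request per index, roughly like this sketch (the error handling of the real device in ``esrally.telemetry`` is more involved)::

    import elasticsearch

    es = elasticsearch.Elasticsearch(hosts=["http://127.0.0.1:9200"])

    for index in ["foo", "bar"]:
        # POST /<index>/_disk_usage?run_expensive_tasks=true, matching the
        # {"method": "POST", "path": "/foo/_disk_usage", ...} kwargs asserted above
        response = es.transport.perform_request(
            "POST",
            f"/{index}/_disk_usage",
            params={"run_expensive_tasks": "true"},
        )
        if response["_shards"]["failed"] > 0:
            continue  # stats are only collected when no shards failed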
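Finally, the ``opts_test`` and ``target_hosts_*.json`` changes reflect that ``TargetHosts.all_hosts`` once again passes per-cluster ``host:port`` strings through to the client untouched, while the plain CSV shorthand is still parsed into host dictionaries. A usage sketch based directly on the assertions above::

    from esrally.utils import opts

    # JSON cluster definitions keep their raw "host:port" strings
    json_hosts = opts.TargetHosts('{"default": ["127.0.0.1:9200","10.17.0.5:19200"], "remote_1": ["88.33.22.15:19200"]}')
    assert json_hosts.all_hosts["remote_1"] == ["88.33.22.15:19200"]

    # the CSV shorthand for the default cluster still parses into dicts
    csv_hosts = opts.TargetHosts("127.0.0.1:9200,10.17.0.5:19200")
    assert csv_hosts.default == [{"host": "127.0.0.1", "port": 9200}, {"host": "10.17.0.5", "port": 19200}]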