diff --git a/docs/command_line_reference.rst b/docs/command_line_reference.rst
index 4e96a1abb..b4595de87 100644
--- a/docs/command_line_reference.rst
+++ b/docs/command_line_reference.rst
@@ -765,34 +765,31 @@ Define a JSON file containing a list of objects with the following properties:
 
 * ``path``: A path or path pattern that should be matched. Only leading and trailing wildcards (``*``) are supported. A path containing only a wildcard matches any path.
 * ``body``: The respective response body.
-* ``body-encoding``: Either ``raw`` or ``json``. Use ``json`` by default and ``raw`` for the operation-type ``bulk`` and ``search``.
 
 Here we define the necessary responses for a track that bulk-indexes data::
 
     [
+      {
+        "path": "/_cluster/settings",
+        "body": {
+          "transient": {
+            "action.destructive_requires_name": "true"
+          }
+        }
+      },
       {
         "path": "*/_bulk",
         "body": {
           "errors": false,
           "took": 1
-        },
-        "body-encoding": "raw"
+        }
       },
       {
         "path": "/_cluster/health*",
         "body": {
           "status": "green",
           "relocating_shards": 0
-        },
-        "body-encoding": "json"
-      },
-      {
-        "path": "/_cluster/settings",
-        "body": {
-          "persistent": {},
-          "transient": {}
-        },
-        "body-encoding": "json"
+        }
       },
       {
         "path": "/_all/_stats/_all",
         "body": {
@@ -804,13 +801,11 @@ Here we define the necessary responses for a track that bulk-indexes data::
             }
           }
         }
-        },
-        "body-encoding": "json"
+        }
       },
       {
         "path": "*",
-        "body": {},
-        "body-encoding": "json"
+        "body": {}
       }
     ]
diff --git a/docs/migrate.rst b/docs/migrate.rst
index c33322a20..5401bfadd 100644
--- a/docs/migrate.rst
+++ b/docs/migrate.rst
@@ -4,6 +4,10 @@ Migration Guide
 Migrating to Rally 2.7.1
 ------------------------
 
+Elasticsearch client logs are now captured by the `elastic_transport <https://github.com/elastic/elastic-transport-python>`_ logger
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Rally has migrated to the 8.x version of the ``elasticsearch-py`` library, which emits its transport-level logs through a new logger named ``elastic_transport``. Rally automatically configures this logger to emit only logs of level ``WARNING`` and above, even if an earlier Rally version created the ``~/.rally/logging.json`` file before that logger existed.
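For users who maintain a customized ``~/.rally/logging.json``, a minimal sketch of the ``loggers`` entry that this migration appends is shown below. The handler name follows Rally's stock ``resources/logging.json``; treat it as an assumption if your file deviates from the default::

    "elastic_transport": {
      "handlers": ["rally_log_handler"],
      "level": "WARNING",
      "propagate": false
    }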
+ Snapshot repository plugins are no longer built from source ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/esrally/client/asynchronous.py b/esrally/client/asynchronous.py index e76f83995..7ca1c3f9b 100644 --- a/esrally/client/asynchronous.py +++ b/esrally/client/asynchronous.py @@ -18,18 +18,34 @@ import asyncio import json import logging -from typing import List, Optional +import warnings +from typing import Any, Iterable, List, Mapping, Optional import aiohttp -import elasticsearch from aiohttp import BaseConnector, RequestInfo from aiohttp.client_proto import ResponseHandler from aiohttp.helpers import BaseTimerContext +from elastic_transport import ( + AiohttpHttpNode, + ApiResponse, + AsyncTransport, + BinaryApiResponse, + HeadApiResponse, + ListApiResponse, + ObjectApiResponse, + TextApiResponse, +) +from elastic_transport.client_utils import DEFAULT +from elasticsearch import AsyncElasticsearch +from elasticsearch._async.client import IlmClient +from elasticsearch.compat import warn_stacklevel +from elasticsearch.exceptions import HTTP_EXCEPTIONS, ApiError, ElasticsearchWarning from multidict import CIMultiDict, CIMultiDictProxy from yarl import URL +from esrally.client.common import _WARNING_RE, _mimetype_header_to_compat, _quote_query from esrally.client.context import RequestContextHolder -from esrally.utils import io +from esrally.utils import io, versions class StaticTransport: @@ -42,6 +58,9 @@ def is_closing(self): def close(self): self.closed = True + def abort(self): + self.close() + class StaticConnector(BaseConnector): async def _create_connection(self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout") -> ResponseHandler: @@ -106,21 +125,8 @@ async def start(self, connection: "Connection") -> "ClientResponse": self.status = 200 return self - async def text(self, encoding=None, errors="strict"): - return self.static_body - - -class RawClientResponse(aiohttp.ClientResponse): - """ - Returns the body as bytes object (instead of a str) to avoid decoding overhead. - """ - - async def text(self, encoding=None, errors="strict"): - """Read response payload and decode.""" - if self._body is None: - await self.read() - - return self._body + async def read(self): + return self.static_body.encode("utf-8") class ResponseMatcher: @@ -139,14 +145,7 @@ def __init__(self, responses): else: matcher = ResponseMatcher.equals(path) - body = response["body"] - body_encoding = response.get("body-encoding", "json") - if body_encoding == "raw": - body = json.dumps(body).encode("utf-8") - elif body_encoding == "json": - body = json.dumps(body) - else: - raise ValueError(f"Unknown body encoding [{body_encoding}] for path [{path}]") + body = json.dumps(response["body"]) self.responses.append((path, matcher, body)) @@ -185,94 +184,192 @@ def response(self, path): return body -class AIOHttpConnection(elasticsearch.AIOHttpConnection): - def __init__( - self, - host="localhost", - port=None, - http_auth=None, - use_ssl=False, - ssl_assert_fingerprint=None, - headers=None, - ssl_context=None, - http_compress=None, - cloud_id=None, - api_key=None, - opaque_id=None, - loop=None, - trace_config=None, - **kwargs, - ): - super().__init__( - host=host, - port=port, - http_auth=http_auth, - use_ssl=use_ssl, - ssl_assert_fingerprint=ssl_assert_fingerprint, - # provided to the base class via `maxsize` to keep base class state consistent despite Rally - # calling the attribute differently. 
- maxsize=kwargs.get("max_connections", 0), - headers=headers, - ssl_context=ssl_context, - http_compress=http_compress, - cloud_id=cloud_id, - api_key=api_key, - opaque_id=opaque_id, - loop=loop, - **kwargs, - ) - - self._trace_configs = [trace_config] if trace_config else None - self._enable_cleanup_closed = kwargs.get("enable_cleanup_closed", True) - - static_responses = kwargs.get("static_responses") - self.use_static_responses = static_responses is not None - - if self.use_static_responses: +class RallyAiohttpHttpNode(AiohttpHttpNode): + def __init__(self, config): + super().__init__(config) + self._loop = None + self.trace_configs = None + self.enable_cleanup_closed = None + self._static_responses = None + self._request_class = aiohttp.ClientRequest + self._response_class = aiohttp.ClientResponse + + @property + def static_responses(self): + return self._static_responses + + @static_responses.setter + def static_responses(self, static_responses): + self._static_responses = static_responses + if self._static_responses: # read static responses once and reuse them if not StaticRequest.RESPONSES: - with open(io.normalize_path(static_responses)) as f: + with open(io.normalize_path(self._static_responses)) as f: StaticRequest.RESPONSES = ResponseMatcher(json.load(f)) self._request_class = StaticRequest self._response_class = StaticResponse - else: - self._request_class = aiohttp.ClientRequest - self._response_class = RawClientResponse - async def _create_aiohttp_session(self): - if self.loop is None: - self.loop = asyncio.get_running_loop() + def _create_aiohttp_session(self): + if self._loop is None: + self._loop = asyncio.get_running_loop() - if self.use_static_responses: - connector = StaticConnector(limit=self._limit, enable_cleanup_closed=self._enable_cleanup_closed) + if self._static_responses: + connector = StaticConnector(limit_per_host=self._connections_per_node, enable_cleanup_closed=self.enable_cleanup_closed) else: connector = aiohttp.TCPConnector( - limit=self._limit, use_dns_cache=True, ssl=self._ssl_context, enable_cleanup_closed=self._enable_cleanup_closed + limit_per_host=self._connections_per_node, + use_dns_cache=True, + ssl=self._ssl_context, + enable_cleanup_closed=self.enable_cleanup_closed, ) self.session = aiohttp.ClientSession( headers=self.headers, auto_decompress=True, - loop=self.loop, + loop=self._loop, cookie_jar=aiohttp.DummyCookieJar(), request_class=self._request_class, response_class=self._response_class, connector=connector, - trace_configs=self._trace_configs, + trace_configs=self.trace_configs, ) -class VerifiedAsyncTransport(elasticsearch.AsyncTransport): +class RallyAsyncTransport(AsyncTransport): + def __init__(self, *args, **kwargs): + super().__init__(*args, node_class=RallyAiohttpHttpNode, **kwargs) + + +class RallyIlmClient(IlmClient): + async def put_lifecycle(self, *args, **kwargs): + """ + The 'elasticsearch-py' 8.x method signature renames the 'policy' param to 'name', and the previously so-called + 'body' param becomes 'policy' + """ + if args: + kwargs["name"] = args[0] + + if body := kwargs.pop("body", None): + kwargs["policy"] = body.get("policy", {}) + # pylint: disable=missing-kwoa + return await IlmClient.put_lifecycle(self, **kwargs) + + +class RallyAsyncElasticsearch(AsyncElasticsearch, RequestContextHolder): def __init__(self, *args, **kwargs): + distribution_version = kwargs.pop("distribution_version", None) super().__init__(*args, **kwargs) # skip verification at this point; we've already verified this earlier with the synchronous 
client. # The async client is used in the hot code path and we use customized overrides (such as that we don't # parse response bodies in some cases for performance reasons, e.g. when using the bulk API). self._verified_elasticsearch = True + if distribution_version: + self.distribution_version = versions.Version.from_string(distribution_version) + else: + self.distribution_version = None + + # some ILM method signatures changed in 'elasticsearch-py' 8.x, + # so we override method(s) here to provide BWC for any custom + # runners that aren't using the new kwargs + self.ilm = RallyIlmClient(self) + async def perform_request( + self, + method: str, + path: str, + *, + params: Optional[Mapping[str, Any]] = None, + headers: Optional[Mapping[str, str]] = None, + body: Optional[Any] = None, + ) -> ApiResponse[Any]: + # We need to ensure that we provide content-type and accept headers + if body is not None: + if headers is None: + headers = {"content-type": "application/json", "accept": "application/json"} + else: + if headers.get("content-type") is None: + headers["content-type"] = "application/json" + if headers.get("accept") is None: + headers["accept"] = "application/json" + + if headers: + request_headers = self._headers.copy() + request_headers.update(headers) + else: + request_headers = self._headers + + # Converts all parts of a Accept/Content-Type headers + # from application/X -> application/vnd.elasticsearch+X + # see https://github.com/elastic/elasticsearch/issues/51816 + if self.distribution_version is not None and self.distribution_version >= versions.Version.from_string("8.0.0"): + _mimetype_header_to_compat("Accept", request_headers) + _mimetype_header_to_compat("Content-Type", request_headers) + + if params: + target = f"{path}?{_quote_query(params)}" + else: + target = path + + meta, resp_body = await self.transport.perform_request( + method, + target, + headers=request_headers, + body=body, + request_timeout=self._request_timeout, + max_retries=self._max_retries, + retry_on_status=self._retry_on_status, + retry_on_timeout=self._retry_on_timeout, + client_meta=self._client_meta, + ) + + # HEAD with a 404 is returned as a normal response + # since this is used as an 'exists' functionality. + if not (method == "HEAD" and meta.status == 404) and ( + not 200 <= meta.status < 299 + and (self._ignore_status is DEFAULT or self._ignore_status is None or meta.status not in self._ignore_status) + ): + message = str(resp_body) + + # If the response is an error response try parsing + # the raw Elasticsearch error before raising. 
+ if isinstance(resp_body, dict): + try: + error = resp_body.get("error", message) + if isinstance(error, dict) and "type" in error: + error = error["type"] + message = error + except (ValueError, KeyError, TypeError): + pass + + raise HTTP_EXCEPTIONS.get(meta.status, ApiError)(message=message, meta=meta, body=resp_body) + + # 'Warning' headers should be reraised as 'ElasticsearchWarning' + if "warning" in meta.headers: + warning_header = (meta.headers.get("warning") or "").strip() + warning_messages: Iterable[str] = _WARNING_RE.findall(warning_header) or (warning_header,) + stacklevel = warn_stacklevel() + for warning_message in warning_messages: + warnings.warn( + warning_message, + category=ElasticsearchWarning, + stacklevel=stacklevel, + ) + + if method == "HEAD": + response = HeadApiResponse(meta=meta) + elif isinstance(resp_body, dict): + response = ObjectApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] + elif isinstance(resp_body, list): + response = ListApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] + elif isinstance(resp_body, str): + response = TextApiResponse( # type: ignore[assignment] + body=resp_body, + meta=meta, + ) + elif isinstance(resp_body, bytes): + response = BinaryApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] + else: + response = ApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] -class RallyAsyncElasticsearch(elasticsearch.AsyncElasticsearch, RequestContextHolder): - def perform_request(self, *args, **kwargs): - kwargs["url"] = kwargs.pop("path") - return self.transport.perform_request(*args, **kwargs) + return response diff --git a/esrally/client/common.py b/esrally/client/common.py new file mode 100644 index 000000000..7702fb4de --- /dev/null +++ b/esrally/client/common.py @@ -0,0 +1,58 @@ +import re +from datetime import date, datetime +from typing import Any, Mapping + +from elastic_transport.client_utils import percent_encode +from elasticsearch import VERSION + + +def _client_major_version_to_str(version: tuple) -> str: + return str(version[0]) + + +_WARNING_RE = re.compile(r"\"([^\"]*)\"") +_COMPAT_MIMETYPE_TEMPLATE = "application/vnd.elasticsearch+%s; compatible-with=" + _client_major_version_to_str(VERSION) +_COMPAT_MIMETYPE_RE = re.compile(r"application/(json|x-ndjson|vnd\.mapbox-vector-tile)") +_COMPAT_MIMETYPE_SUB = _COMPAT_MIMETYPE_TEMPLATE % (r"\g<1>",) + + +def _mimetype_header_to_compat(header, request_headers): + # Converts all parts of a Accept/Content-Type headers + # from application/X -> application/vnd.elasticsearch+X + mimetype = request_headers.get(header, None) + if mimetype: + request_headers[header] = _COMPAT_MIMETYPE_RE.sub(_COMPAT_MIMETYPE_SUB, mimetype) + + +def _escape(value: Any) -> str: + """ + Escape a single value of a URL string or a query parameter. If it is a list + or tuple, turn it into a comma-separated string first. 
+ """ + + # make sequences into comma-separated stings + if isinstance(value, (list, tuple)): + value = ",".join([_escape(item) for item in value]) + + # dates and datetimes into isoformat + elif isinstance(value, (date, datetime)): + value = value.isoformat() + + # make bools into true/false strings + elif isinstance(value, bool): + value = str(value).lower() + + elif isinstance(value, bytes): + return value.decode("utf-8", "surrogatepass") + + if not isinstance(value, str): + return str(value) + return value + + +def _quote(value: Any) -> str: + return percent_encode(_escape(value), ",*") + + +def _quote_query(query: Mapping[str, Any]) -> str: + return "&".join([f"{k}={_quote(v)}" for k, v in query.items()]) diff --git a/esrally/client/factory.py b/esrally/client/factory.py index f8a70f644..201fc0e53 100644 --- a/esrally/client/factory.py +++ b/esrally/client/factory.py @@ -19,7 +19,6 @@ import time import certifi -import urllib3 from urllib3.connection import is_ipaddress from esrally import doc_link, exceptions @@ -28,13 +27,22 @@ class EsClientFactory: """ - Abstracts how the Elasticsearch client is created. Intended for testing. + Abstracts how the Elasticsearch client is created and customizes the client for backwards + compatibility guarantees that are broader than the library's defaults. """ - def __init__(self, hosts, client_options): - self.hosts = hosts + def __init__(self, hosts, client_options, distribution_version=None): + def host_string(host): + # protocol can be set at either host or client opts level + protocol = "https" if client_options.get("use_ssl") or host.get("use_ssl") else "http" + return f"{protocol}://{host['host']}:{host['port']}" + + self.hosts = [host_string(h) for h in hosts] self.client_options = dict(client_options) self.ssl_context = None + # This attribute is necessary for the backwards-compatibility logic contained in + # RallySyncElasticsearch.perform_request() and RallyAsyncElasticsearch.perform_request(). + self.distribution_version = distribution_version self.logger = logging.getLogger(__name__) masked_client_options = dict(client_options) @@ -50,26 +58,24 @@ def __init__(self, hosts, client_options): import ssl self.logger.debug("SSL support: on") - self.client_options["scheme"] = "https" self.ssl_context = ssl.create_default_context( ssl.Purpose.SERVER_AUTH, cafile=self.client_options.pop("ca_certs", certifi.where()) ) - if not self.client_options.pop("verify_certs", True): + # We call get() here instead of pop() in order to pass verify_certs through as a kwarg + # to the elasticsearch.Elasticsearch constructor. Setting the ssl_context's verify_mode to + # ssl.CERT_NONE is insufficient with version 8.0+ of elasticsearch-py. + if not self.client_options.get("verify_certs", True): self.logger.debug("SSL certificate verification: off") # order matters to avoid ValueError: check_hostname needs a SSL context with either CERT_OPTIONAL or CERT_REQUIRED self.ssl_context.check_hostname = False self.ssl_context.verify_mode = ssl.CERT_NONE + self.client_options["ssl_show_warn"] = False self.logger.warning( - "User has enabled SSL but disabled certificate verification. This is dangerous but may be ok for a " - "benchmark. Disabling urllib warnings now to avoid a logging storm. " - "See https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings for details." + "User has enabled SSL but disabled certificate verification. This is dangerous but may be ok for a benchmark." 
) - # disable: "InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly \ - # advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings" - urllib3.disable_warnings() else: # check_hostname should not be set when host is an IP address self.ssl_context.check_hostname = self._only_hostnames(hosts) @@ -105,9 +111,9 @@ def __init__(self, hosts, client_options): self.ssl_context.load_cert_chain(certfile=client_cert, keyfile=client_key) else: self.logger.debug("SSL support: off") - self.client_options["scheme"] = "http" if self._is_set(self.client_options, "create_api_key_per_client"): + self.client_options.pop("create_api_key_per_client") basic_auth_user = self.client_options.get("basic_auth_user", False) basic_auth_password = self.client_options.get("basic_auth_password", False) provided_auth = {"basic_auth_user": basic_auth_user, "basic_auth_password": basic_auth_password} @@ -127,8 +133,8 @@ def __init__(self, hosts, client_options): self.logger.debug("Automatic creation of client API keys: off") if self._is_set(self.client_options, "basic_auth_user") and self._is_set(self.client_options, "basic_auth_password"): + self.client_options["basic_auth"] = (self.client_options.pop("basic_auth_user"), self.client_options.pop("basic_auth_password")) self.logger.debug("HTTP basic authentication: on") - self.client_options["http_auth"] = (self.client_options.pop("basic_auth_user"), self.client_options.pop("basic_auth_password")) else: self.logger.debug("HTTP basic authentication: off") @@ -141,8 +147,12 @@ def __init__(self, hosts, client_options): else: self.logger.debug("HTTP compression: off") - if self._is_set(self.client_options, "enable_cleanup_closed"): - self.client_options["enable_cleanup_closed"] = convert.to_bool(self.client_options.pop("enable_cleanup_closed")) + self.enable_cleanup_closed = convert.to_bool(self.client_options.pop("enable_cleanup_closed", True)) + self.max_connections = max(256, self.client_options.pop("max_connections", 0)) + self.static_responses = self.client_options.pop("static_responses", None) + + if self._is_set(self.client_options, "timeout"): + self.client_options["request_timeout"] = self.client_options.pop("timeout") @staticmethod def _only_hostnames(hosts): @@ -170,7 +180,9 @@ def create(self): # pylint: disable=import-outside-toplevel from esrally.client.synchronous import RallySyncElasticsearch - return RallySyncElasticsearch(hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options) + return RallySyncElasticsearch( + distribution_version=self.distribution_version, hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options + ) def create_async(self, api_key=None): # pylint: disable=import-outside-toplevel @@ -180,18 +192,17 @@ def create_async(self, api_key=None): from elasticsearch.serializer import JSONSerializer from esrally.client.asynchronous import ( - AIOHttpConnection, RallyAsyncElasticsearch, - VerifiedAsyncTransport, + RallyAsyncTransport, ) class LazyJSONSerializer(JSONSerializer): - def loads(self, s): + def loads(self, data): meta = RallyAsyncElasticsearch.request_context.get() if "raw_response" in meta: - return io.BytesIO(s) + return io.BytesIO(data) else: - return super().loads(s) + return super().loads(data) async def on_request_start(session, trace_config_ctx, params): RallyAsyncElasticsearch.on_request_start() @@ -207,20 +218,30 @@ async def on_request_end(session, trace_config_ctx, params): # override the builtin JSON serializer 
self.client_options["serializer"] = LazyJSONSerializer() - self.client_options["trace_config"] = trace_config if api_key is not None: - self.client_options.pop("http_auth") + self.client_options.pop("http_auth", None) + self.client_options.pop("basic_auth", None) self.client_options["api_key"] = api_key - return RallyAsyncElasticsearch( + async_client = RallyAsyncElasticsearch( + distribution_version=self.distribution_version, hosts=self.hosts, - transport_class=VerifiedAsyncTransport, - connection_class=AIOHttpConnection, + transport_class=RallyAsyncTransport, ssl_context=self.ssl_context, + maxsize=self.max_connections, **self.client_options, ) + # the AsyncElasticsearch constructor automatically creates the corresponding NodeConfig objects, so we set + # their instance attributes after they've been instantiated + for node_connection in async_client.transport.node_pool.all(): + node_connection.trace_configs = [trace_config] + node_connection.enable_cleanup_closed = self.enable_cleanup_closed + node_connection.static_responses = self.static_responses + + return async_client + def wait_for_rest_layer(es, max_attempts=40): """ @@ -233,11 +254,19 @@ def wait_for_rest_layer(es, max_attempts=40): # assume that at least the hosts that we expect to contact should be available. Note that this is not 100% # bullet-proof as a cluster could have e.g. dedicated masters which are not contained in our list of target hosts # but this is still better than just checking for any random node's REST API being reachable. - expected_node_count = len(es.transport.hosts) + expected_node_count = len(es.transport.node_pool) logger = logging.getLogger(__name__) - for attempt in range(max_attempts): + attempt = 0 + while attempt <= max_attempts: + attempt += 1 # pylint: disable=import-outside-toplevel - import elasticsearch + from elastic_transport import ( + ApiError, + ConnectionError, + SerializationError, + TlsError, + TransportError, + ) try: # see also WaitForHttpResource in Elasticsearch tests. Contrary to the ES tests we consider the API also @@ -245,19 +274,44 @@ def wait_for_rest_layer(es, max_attempts=40): es.cluster.health(wait_for_nodes=f">={expected_node_count}") logger.debug("REST API is available for >= [%s] nodes after [%s] attempts.", expected_node_count, attempt) return True - except elasticsearch.ConnectionError as e: - if "SSL: UNKNOWN_PROTOCOL" in str(e): - raise exceptions.SystemSetupError("Could not connect to cluster via https. Is this an https endpoint?", e) - logger.debug("Got connection error on attempt [%s]. Sleeping...", attempt) - time.sleep(3) - except elasticsearch.TransportError as e: + except SerializationError as e: + if "Client sent an HTTP request to an HTTPS server" in str(e): + raise exceptions.SystemSetupError( + "Rally sent an HTTP request to an HTTPS server. Are you sure this is an HTTP endpoint?", e + ) + + if attempt <= max_attempts: + logger.debug("Got serialization error [%s] on attempt [%s]. Sleeping...", e, attempt) + time.sleep(3) + else: + raise + except TlsError as e: + raise exceptions.SystemSetupError("Could not connect to cluster via HTTPS. Are you sure this is an HTTPS endpoint?", e) + except ConnectionError as e: + if "ProtocolError" in str(e): + raise exceptions.SystemSetupError( + "Received a protocol error. Are you sure you're using the correct scheme (HTTP or HTTPS)?", e + ) + + if attempt <= max_attempts: + logger.debug("Got connection error on attempt [%s]. 
Sleeping...", attempt) + time.sleep(3) + else: + raise + except TransportError as e: + if attempt <= max_attempts: + logger.debug("Got transport error on attempt [%s]. Sleeping...", attempt) + time.sleep(3) + else: + raise + except ApiError as e: # cluster block, x-pack not initialized yet, our wait condition is not reached - if e.status_code in (503, 401, 408): - logger.debug("Got status code [%s] on attempt [%s]. Sleeping...", e.status_code, attempt) + if e.status_code in (503, 401, 408) and attempt <= max_attempts: + logger.debug("Got status code [%s] on attempt [%s]. Sleeping...", e.message, attempt) time.sleep(3) else: - logger.warning("Got unexpected status code [%s] on attempt [%s].", e.status_code, attempt) - raise e + logger.warning("Got unexpected status code [%s] on attempt [%s].", e.message, attempt) + raise return False @@ -278,9 +332,12 @@ def create_api_key(es, client_id, max_attempts=5): try: logger.debug("Creating ES API key for client ID [%s]", client_id) - return es.security.create_api_key({"name": f"rally-client-{client_id}"}) + return es.security.create_api_key(name=f"rally-client-{client_id}") except elasticsearch.TransportError as e: - if e.status_code == 405: + logger.debug("Got transport error [%s] on attempt [%s]. Sleeping...", str(e), attempt) + time.sleep(1) + except elasticsearch.ApiError as e: + if e.meta.status == 405: # We don't retry on 405 since it indicates a misconfigured benchmark candidate and isn't recoverable raise exceptions.SystemSetupError( "Got status code 405 when attempting to create API keys. Is Elasticsearch Security enabled?", e @@ -320,7 +377,7 @@ def raise_exception(failed_ids, cause=None): try: if current_version >= minimum_version: - resp = es.security.invalidate_api_key({"ids": remaining}) + resp = es.security.invalidate_api_key(ids=remaining) deleted += resp["invalidated_api_keys"] remaining = [i for i in ids if i not in deleted] # Like bulk indexing requests, we can get an HTTP 200, but the @@ -346,16 +403,16 @@ def raise_exception(failed_ids, cause=None): remaining = [i for i in ids if i not in deleted] if attempt < max_attempts: for i in remaining: - es.security.invalidate_api_key({"id": i}) + es.security.invalidate_api_key(id=i) deleted.append(i) else: if remaining: raise_exception(remaining) return True - except elasticsearch.TransportError as e: + except elasticsearch.ApiError as e: if attempt < max_attempts: - logger.debug("Got status code [%s] on attempt [%s] of [%s]. Sleeping...", e.status_code, attempt, max_attempts) + logger.debug("Got status code [%s] on attempt [%s] of [%s]. Sleeping...", e.meta.status, attempt, max_attempts) time.sleep(1) else: raise_exception(remaining, cause=e) diff --git a/esrally/client/synchronous.py b/esrally/client/synchronous.py index 90e3d8a5f..486ccb7a2 100644 --- a/esrally/client/synchronous.py +++ b/esrally/client/synchronous.py @@ -15,10 +15,231 @@ # specific language governing permissions and limitations # under the License. 
-import elasticsearch +import re +import warnings +from typing import Any, Iterable, Mapping, Optional +from elastic_transport import ( + ApiResponse, + BinaryApiResponse, + HeadApiResponse, + ListApiResponse, + ObjectApiResponse, + TextApiResponse, +) +from elastic_transport.client_utils import DEFAULT +from elasticsearch import Elasticsearch +from elasticsearch.compat import warn_stacklevel +from elasticsearch.exceptions import ( + HTTP_EXCEPTIONS, + ApiError, + ElasticsearchWarning, + UnsupportedProductError, +) -class RallySyncElasticsearch(elasticsearch.Elasticsearch): - def perform_request(self, *args, **kwargs): - kwargs["url"] = kwargs.pop("path") - return self.transport.perform_request(*args, **kwargs) +from esrally.client.common import _WARNING_RE, _mimetype_header_to_compat, _quote_query +from esrally.utils import versions + + +# This reproduces the product verification behavior of v7.14.0 of the client: +# https://github.com/elastic/elasticsearch-py/blob/v7.14.0/elasticsearch/transport.py#L606 +# +# As of v8.0.0, the client determines whether the server is Elasticsearch by checking +# whether HTTP responses contain the `X-elastic-product` header. If they do not, it raises +# an `UnsupportedProductError`. This header was only introduced in Elasticsearch 7.14.0, +# however, so the client will consider any version of ES prior to 7.14.0 unsupported due to +# responses not including it. +# +# Because Rally needs to support versions of ES >= 6.8.0, we resurrect the previous +# logic for determining the authenticity of the server, which does not rely exclusively +# on this header. +class _ProductChecker: + """Class which verifies we're connected to a supported product""" + + # States that can be returned from 'check_product' + SUCCESS = True + UNSUPPORTED_PRODUCT = 2 + UNSUPPORTED_DISTRIBUTION = 3 + + @classmethod + def raise_error(cls, state, meta, body): + # These states mean the product_check() didn't fail so do nothing. + if state in (None, True): + return + + if state == cls.UNSUPPORTED_DISTRIBUTION: + message = "The client noticed that the server is not a supported distribution of Elasticsearch" + else: # UNSUPPORTED_PRODUCT + message = "The client noticed that the server is not Elasticsearch and we do not support this unknown product" + raise UnsupportedProductError(message, meta=meta, body=body) + + @classmethod + def check_product(cls, headers, response): + # type: (dict[str, str], dict[str, str]) -> int + """Verifies that the server we're talking to is Elasticsearch. + Does this by checking HTTP headers and the deserialized + response to the 'info' API. Returns one of the states above. + """ + try: + version = response.get("version", {}) + version_number = tuple( + int(x) if x is not None else 999 for x in re.search(r"^([0-9]+)\.([0-9]+)(?:\.([0-9]+))?", version["number"]).groups() + ) + except (KeyError, TypeError, ValueError, AttributeError): + # No valid 'version.number' field, effectively 0.0.0 + version = {} + version_number = (0, 0, 0) + + # Check all of the fields and headers for missing/valid values. 
+ try: + bad_tagline = response.get("tagline", None) != "You Know, for Search" + bad_build_flavor = version.get("build_flavor", None) != "default" + bad_product_header = headers.get("x-elastic-product", None) != "Elasticsearch" + except (AttributeError, TypeError): + bad_tagline = True + bad_build_flavor = True + bad_product_header = True + + # 7.0-7.13 and there's a bad 'tagline' or unsupported 'build_flavor' + if (7, 0, 0) <= version_number < (7, 14, 0): + if bad_tagline: + return cls.UNSUPPORTED_PRODUCT + elif bad_build_flavor: + return cls.UNSUPPORTED_DISTRIBUTION + + elif ( + # No version or version less than 6.x + version_number < (6, 0, 0) + # 6.x and there's a bad 'tagline' + or ((6, 0, 0) <= version_number < (7, 0, 0) and bad_tagline) + # 7.14+ and there's a bad 'X-Elastic-Product' HTTP header + or ((7, 14, 0) <= version_number and bad_product_header) + ): + return cls.UNSUPPORTED_PRODUCT + + return True + + +class RallySyncElasticsearch(Elasticsearch): + def __init__(self, *args, **kwargs): + distribution_version = kwargs.pop("distribution_version", None) + super().__init__(*args, **kwargs) + self._verified_elasticsearch = None + + if distribution_version: + self.distribution_version = versions.Version.from_string(distribution_version) + else: + self.distribution_version = None + + def perform_request( + self, + method: str, + path: str, + *, + params: Optional[Mapping[str, Any]] = None, + headers: Optional[Mapping[str, str]] = None, + body: Optional[Any] = None, + ) -> ApiResponse[Any]: + # We need to ensure that we provide content-type and accept headers + if body is not None: + if headers is None: + headers = {"content-type": "application/json", "accept": "application/json"} + else: + if headers.get("content-type") is None: + headers["content-type"] = "application/json" + if headers.get("accept") is None: + headers["accept"] = "application/json" + + if headers: + request_headers = self._headers.copy() + request_headers.update(headers) + else: + request_headers = self._headers + + if self._verified_elasticsearch is None: + info = self.transport.perform_request(method="GET", target="/", headers=request_headers) + info_meta = info.meta + info_body = info.body + + if not 200 <= info_meta.status < 299: + raise HTTP_EXCEPTIONS.get(info_meta.status, ApiError)(message=str(info_body), meta=info_meta, body=info_body) + + self._verified_elasticsearch = _ProductChecker.check_product(info_meta.headers, info_body) + + if self._verified_elasticsearch is not True: + _ProductChecker.raise_error(self._verified_elasticsearch, info_meta, info_body) + + # Converts all parts of a Accept/Content-Type headers + # from application/X -> application/vnd.elasticsearch+X + # see https://github.com/elastic/elasticsearch/issues/51816 + if self.distribution_version is not None and self.distribution_version >= versions.Version.from_string("8.0.0"): + _mimetype_header_to_compat("Accept", request_headers) + _mimetype_header_to_compat("Content-Type", request_headers) + + if params: + target = f"{path}?{_quote_query(params)}" + else: + target = path + + meta, resp_body = self.transport.perform_request( + method, + target, + headers=request_headers, + body=body, + request_timeout=self._request_timeout, + max_retries=self._max_retries, + retry_on_status=self._retry_on_status, + retry_on_timeout=self._retry_on_timeout, + client_meta=self._client_meta, + ) + + # HEAD with a 404 is returned as a normal response + # since this is used as an 'exists' functionality. 
+ if not (method == "HEAD" and meta.status == 404) and ( + not 200 <= meta.status < 299 + and (self._ignore_status is DEFAULT or self._ignore_status is None or meta.status not in self._ignore_status) + ): + message = str(resp_body) + + # If the response is an error response try parsing + # the raw Elasticsearch error before raising. + if isinstance(resp_body, dict): + try: + error = resp_body.get("error", message) + if isinstance(error, dict) and "type" in error: + error = error["type"] + message = error + except (ValueError, KeyError, TypeError): + pass + + raise HTTP_EXCEPTIONS.get(meta.status, ApiError)(message=message, meta=meta, body=resp_body) + + # 'Warning' headers should be reraised as 'ElasticsearchWarning' + if "warning" in meta.headers: + warning_header = (meta.headers.get("warning") or "").strip() + warning_messages: Iterable[str] = _WARNING_RE.findall(warning_header) or (warning_header,) + stacklevel = warn_stacklevel() + for warning_message in warning_messages: + warnings.warn( + warning_message, + category=ElasticsearchWarning, + stacklevel=stacklevel, + ) + + if method == "HEAD": + response = HeadApiResponse(meta=meta) + elif isinstance(resp_body, dict): + response = ObjectApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] + elif isinstance(resp_body, list): + response = ListApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] + elif isinstance(resp_body, str): + response = TextApiResponse( # type: ignore[assignment] + body=resp_body, + meta=meta, + ) + elif isinstance(resp_body, bytes): + response = BinaryApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] + else: + response = ApiResponse(body=resp_body, meta=meta) # type: ignore[assignment] + + return response diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index e7212d118..a10d626e5 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -28,6 +28,7 @@ import time from dataclasses import dataclass from enum import Enum +from io import BytesIO from typing import Callable import thespian.actors @@ -597,13 +598,16 @@ def __init__(self, target, config, es_client_factory_class=client.EsClientFactor def create_es_clients(self): all_hosts = self.config.opts("client", "hosts").all_hosts + distribution_version = self.config.opts("mechanic", "distribution.version", mandatory=False) es = {} for cluster_name, cluster_hosts in all_hosts.items(): all_client_options = self.config.opts("client", "options").all_client_options cluster_client_options = dict(all_client_options[cluster_name]) # Use retries to avoid aborts on long living connections for telemetry devices - cluster_client_options["retry-on-timeout"] = True - es[cluster_name] = self.es_client_factory(cluster_hosts, cluster_client_options).create() + cluster_client_options["retry_on_timeout"] = True + es[cluster_name] = self.es_client_factory( + cluster_hosts, cluster_client_options, distribution_version=distribution_version + ).create() return es def prepare_telemetry(self, es, enable, index_names, data_stream_names): @@ -1728,12 +1732,14 @@ def _logging_exception_handler(self, loop, context): self.logger.error("Uncaught exception in event loop: %s", context) async def run(self): - def es_clients(client_id, all_hosts, all_client_options): + def es_clients(client_id, all_hosts, all_client_options, distribution_version): es = {} context = self.client_contexts.get(client_id) api_key = context.api_key for cluster_name, cluster_hosts in all_hosts.items(): - es[cluster_name] = 
client.EsClientFactory(cluster_hosts, all_client_options[cluster_name]).create_async(api_key=api_key) + es[cluster_name] = client.EsClientFactory( + cluster_hosts, all_client_options[cluster_name], distribution_version=distribution_version + ).create_async(api_key=api_key) return es if self.assertions_enabled: @@ -1750,7 +1756,12 @@ def es_clients(client_id, all_hosts, all_client_options): param_source = track.operation_parameters(self.track, task) params_per_task[task] = param_source schedule = schedule_for(task_allocation, params_per_task[task]) - es = es_clients(client_id, self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options")) + es = es_clients( + client_id, + self.cfg.opts("client", "hosts").all_hosts, + self.cfg.opts("client", "options"), + self.cfg.opts("mechanic", "distribution.version", mandatory=False), + ) clients.append(es) async_executor = AsyncExecutor( client_id, task, schedule, es, self.sampler, self.cancel, self.complete, task.error_behavior(self.abort_on_error) @@ -1981,20 +1992,47 @@ async def execute_single(runner, es, params, on_error): total_ops = 0 total_ops_unit = "ops" request_meta_data = {"success": False, "error-type": "transport"} - # The ES client will sometimes return string like "N/A" or "TIMEOUT" for connection errors. - if isinstance(e.status_code, int): - request_meta_data["http-status"] = e.status_code + # For the 'errors' attribute, errors are ordered from + # most recently raised (index=0) to least recently raised (index=N) + # + # If an HTTP status code is available with the error it + # will be stored under 'status'. If HTTP headers are available + # they are stored under 'headers'. + if e.errors: + if hasattr(e.errors[0], "status"): + request_meta_data["http-status"] = e.errors[0].status # connection timeout errors don't provide a helpful description if isinstance(e, elasticsearch.ConnectionTimeout): request_meta_data["error-description"] = "network connection timed out" - elif e.info: - request_meta_data["error-description"] = f"{e.error} ({e.info})" else: - if isinstance(e.error, bytes): - error_description = e.error.decode("utf-8") - else: - error_description = str(e.error) + error_description = e.message request_meta_data["error-description"] = error_description + except elasticsearch.ApiError as e: + total_ops = 0 + total_ops_unit = "ops" + request_meta_data = {"success": False, "error-type": "api"} + + if isinstance(e.error, bytes): + error_message = e.error.decode("utf-8") + elif isinstance(e.error, BytesIO): + error_message = e.error.read().decode("utf-8") + else: + error_message = e.error + + if isinstance(e.info, bytes): + error_body = e.info.decode("utf-8") + elif isinstance(e.info, BytesIO): + error_body = e.info.read().decode("utf-8") + else: + error_body = e.info + + if error_body: + error_message += f" ({error_body})" + error_description = error_message + + request_meta_data["error-description"] = error_description + if e.status_code: + request_meta_data["http-status"] = e.status_code except KeyError as e: logging.getLogger(__name__).exception("Cannot execute runner [%s]; most likely due to missing parameters.", str(runner)) msg = "Cannot execute [%s]. Provided parameters are: %s. Error: [%s]." 
% (str(runner), list(params.keys()), str(e)) diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 7c9d3655a..bd8167453 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -730,7 +730,7 @@ class NodeStats(Runner): async def __call__(self, es, params): request_timeout = params.get("request-timeout") - await es.nodes.stats(metric="_all", request_timeout=request_timeout) + await es.options(request_timeout=request_timeout).nodes.stats(metric="_all") def __repr__(self, *args, **kwargs): return "node-stats" @@ -848,6 +848,9 @@ def __init__(self): async def __call__(self, es, params): request_params, headers = self._transport_request_params(params) + request_timeout = request_params.pop("request_timeout", None) + if request_timeout is not None: + es.options(request_timeout=request_timeout) # Mandatory to ensure it is always provided. This is especially important when this runner is used in a # composite context where there is no actual parameter source and the entire request structure must be provided # by the composite's parameter source. @@ -1395,15 +1398,10 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete(index=index_name, params=request_params) ops += 1 - elif only_if_exists: - # here we use .get() and check for 404 instead of exists due to a bug in some versions - # of elasticsearch-py/elastic-transport with HEAD calls. - # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 - get_response = await es.indices.get(index=index_name, ignore=[404]) - if not get_response.get("status") == 404: - self.logger.info("Index [%s] already exists. Deleting it.", index_name) - await es.indices.delete(index=index_name, params=request_params) - ops += 1 + elif only_if_exists and await es.indices.exists(index=index_name): + self.logger.info("Index [%s] already exists. Deleting it.", index_name) + await es.indices.delete(index=index_name, params=request_params) + ops += 1 finally: await set_destructive_requires_name(es, prior_destructive_setting) return { @@ -1432,15 +1430,10 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_data_stream(name=data_stream, ignore=[404], params=request_params) ops += 1 - elif only_if_exists: - # here we use .get() and check for 404 instead of exists due to a bug in some versions - # of elasticsearch-py/elastic-transport with HEAD calls. - # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 - get_response = await es.indices.get(index=data_stream, ignore=[404]) - if not get_response.get("status") == 404: - self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream) - await es.indices.delete_data_stream(name=data_stream, params=request_params) - ops += 1 + elif only_if_exists and await es.indices.exists(index=data_stream): + self.logger.info("Data stream [%s] already exists. Deleting it.", data_stream) + await es.indices.delete_data_stream(name=data_stream, params=request_params) + ops += 1 return { "weight": ops, @@ -1489,15 +1482,10 @@ async def __call__(self, es, params): if not only_if_exists: await es.cluster.delete_component_template(name=template_name, params=request_params, ignore=[404]) ops_count += 1 - elif only_if_exists: - # here we use .get() and check for 404 instead of exists_component_template due to a bug in some versions - # of elasticsearch-py/elastic-transport with HEAD calls. 
- # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 - component_template_exists = await es.cluster.get_component_template(name=template_name, ignore=[404]) - if not component_template_exists.get("status") == 404: - self.logger.info("Component Index template [%s] already exists. Deleting it.", template_name) - await es.cluster.delete_component_template(name=template_name, params=request_params) - ops_count += 1 + elif only_if_exists and await es.cluster.exists_component_template(name=template_name): + self.logger.info("Component Index template [%s] already exists. Deleting it.", template_name) + await es.cluster.delete_component_template(name=template_name, params=request_params) + ops_count += 1 return { "weight": ops_count, "unit": "ops", @@ -1544,15 +1532,10 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_index_template(name=template_name, params=request_params, ignore=[404]) ops_count += 1 - elif only_if_exists: - # here we use .get() and check for 404 instead of exists_index_template due to a bug in some versions - # of elasticsearch-py/elastic-transport with HEAD calls. - # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 - index_template_exists = await es.indices.get_index_template(name=template_name, ignore=[404]) - if not index_template_exists.get("status") == 404: - self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name) - await es.indices.delete_index_template(name=template_name, params=request_params) - ops_count += 1 + elif only_if_exists and await es.indices.exists_index_template(name=template_name): + self.logger.info("Composable Index template [%s] already exists. Deleting it.", template_name) + await es.indices.delete_index_template(name=template_name, params=request_params) + ops_count += 1 # ensure that we do not provide an empty index pattern by accident if delete_matching_indices and index_pattern: await es.indices.delete(index=index_pattern) @@ -1604,10 +1587,7 @@ async def __call__(self, es, params): if not only_if_exists: await es.indices.delete_template(name=template_name, params=request_params) ops_count += 1 - # here we use .get_template() and check for empty instead of exists_template due to a bug in some versions - # of elasticsearch-py/elastic-transport with HEAD calls. - # can change back once using elasticsearch-py >= 8.0.0 and elastic-transport >= 8.1.0 - elif only_if_exists and await es.indices.get_template(name=template_name, ignore=[404]): + elif only_if_exists and await es.indices.exists_template(name=template_name): self.logger.info("Index template [%s] already exists. 
Deleting it.", template_name) await es.indices.delete_template(name=template_name, params=request_params) ops_count += 1 @@ -1721,16 +1701,13 @@ async def __call__(self, es, params): body = mandatory(params, "body", self) try: await es.ml.put_datafeed(datafeed_id=datafeed_id, body=body) - except elasticsearch.TransportError as e: - # fallback to old path - if e.status_code == 400: - await es.perform_request( - method="PUT", - path=f"/_xpack/ml/datafeeds/{datafeed_id}", - body=body, - ) - else: - raise e + except elasticsearch.BadRequestError: + # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 + await es.perform_request( + method="PUT", + path=f"/_xpack/ml/datafeeds/{datafeed_id}", + body=body, + ) def __repr__(self, *args, **kwargs): return "create-ml-datafeed" @@ -1750,16 +1727,13 @@ async def __call__(self, es, params): try: # we don't want to fail if a datafeed does not exist, thus we ignore 404s. await es.ml.delete_datafeed(datafeed_id=datafeed_id, force=force, ignore=[404]) - except elasticsearch.TransportError as e: - # fallback to old path (ES < 7) - if e.status_code == 400: - await es.perform_request( - method="DELETE", - path=f"/_xpack/ml/datafeeds/{datafeed_id}", - params={"force": escape(force), "ignore": 404}, - ) - else: - raise e + except elasticsearch.BadRequestError: + # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 + await es.perform_request( + method="DELETE", + path=f"/_xpack/ml/datafeeds/{datafeed_id}", + params={"force": escape(force), "ignore": 404}, + ) def __repr__(self, *args, **kwargs): return "delete-ml-datafeed" @@ -1781,16 +1755,13 @@ async def __call__(self, es, params): timeout = params.get("timeout") try: await es.ml.start_datafeed(datafeed_id=datafeed_id, body=body, start=start, end=end, timeout=timeout) - except elasticsearch.TransportError as e: - # fallback to old path (ES < 7) - if e.status_code == 400: - await es.perform_request( - method="POST", - path=f"/_xpack/ml/datafeeds/{datafeed_id}/_start", - body=body, - ) - else: - raise e + except elasticsearch.BadRequestError: + # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 + await es.perform_request( + method="POST", + path=f"/_xpack/ml/datafeeds/{datafeed_id}/_start", + body=body, + ) def __repr__(self, *args, **kwargs): return "start-ml-datafeed" @@ -1810,21 +1781,18 @@ async def __call__(self, es, params): timeout = params.get("timeout") try: await es.ml.stop_datafeed(datafeed_id=datafeed_id, force=force, timeout=timeout) - except elasticsearch.TransportError as e: - # fallback to old path (ES < 7) - if e.status_code == 400: - request_params = { - "force": escape(force), - } - if timeout: - request_params["timeout"] = escape(timeout) - await es.perform_request( - method="POST", - path=f"/_xpack/ml/datafeeds/{datafeed_id}/_stop", - params=request_params, - ) - else: - raise e + except elasticsearch.BadRequestError: + # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 + request_params = { + "force": escape(force), + } + if timeout: + request_params["timeout"] = escape(timeout) + await es.perform_request( + method="POST", + path=f"/_xpack/ml/datafeeds/{datafeed_id}/_stop", + params=request_params, + ) def __repr__(self, *args, **kwargs): return "stop-ml-datafeed" @@ -1843,16 +1811,13 @@ async def __call__(self, es, params): body = mandatory(params, "body", self) try: await es.ml.put_job(job_id=job_id, body=body) - except elasticsearch.TransportError 
as e: - # fallback to old path (ES < 7) - if e.status_code == 400: - await es.perform_request( - method="PUT", - path=f"/_xpack/ml/anomaly_detectors/{job_id}", - body=body, - ) - else: - raise e + except elasticsearch.BadRequestError: + # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 + await es.perform_request( + method="PUT", + path=f"/_xpack/ml/anomaly_detectors/{job_id}", + body=body, + ) def __repr__(self, *args, **kwargs): return "create-ml-job" @@ -1872,16 +1837,13 @@ async def __call__(self, es, params): # we don't want to fail if a job does not exist, thus we ignore 404s. try: await es.ml.delete_job(job_id=job_id, force=force, ignore=[404]) - except elasticsearch.TransportError as e: - # fallback to old path (ES < 7) - if e.status_code == 400: - await es.perform_request( - method="DELETE", - path=f"/_xpack/ml/anomaly_detectors/{job_id}", - params={"force": escape(force), "ignore": 404}, - ) - else: - raise e + except elasticsearch.BadRequestError: + # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 + await es.perform_request( + method="DELETE", + path=f"/_xpack/ml/anomaly_detectors/{job_id}", + params={"force": escape(force), "ignore": 404}, + ) def __repr__(self, *args, **kwargs): return "delete-ml-job" @@ -1899,15 +1861,12 @@ async def __call__(self, es, params): job_id = mandatory(params, "job-id", self) try: await es.ml.open_job(job_id=job_id) - except elasticsearch.TransportError as e: - # fallback to old path (ES < 7) - if e.status_code == 400: - await es.perform_request( - method="POST", - path=f"/_xpack/ml/anomaly_detectors/{job_id}/_open", - ) - else: - raise e + except elasticsearch.BadRequestError: + # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 + await es.perform_request( + method="POST", + path=f"/_xpack/ml/anomaly_detectors/{job_id}/_open", + ) def __repr__(self, *args, **kwargs): return "open-ml-job" @@ -1927,22 +1886,19 @@ async def __call__(self, es, params): timeout = params.get("timeout") try: await es.ml.close_job(job_id=job_id, force=force, timeout=timeout) - except elasticsearch.TransportError as e: - # fallback to old path (ES < 7) - if e.status_code == 400: - request_params = { - "force": escape(force), - } - if timeout: - request_params["timeout"] = escape(timeout) + except elasticsearch.BadRequestError: + # TODO: remove the fallback to '_xpack' path when we drop support for Elasticsearch 6.8 + request_params = { + "force": escape(force), + } + if timeout: + request_params["timeout"] = escape(timeout) - await es.perform_request( - method="POST", - path=f"/_xpack/ml/anomaly_detectors/{job_id}/_close", - params=request_params, - ) - else: - raise e + await es.perform_request( + method="POST", + path=f"/_xpack/ml/anomaly_detectors/{job_id}/_close", + params=request_params, + ) def __repr__(self, *args, **kwargs): return "close-ml-job" @@ -2007,7 +1963,7 @@ class CreateSnapshotRepository(Runner): async def __call__(self, es, params): request_params = params.get("request-params", {}) await es.snapshot.create_repository( - repository=mandatory(params, "repository", repr(self)), body=mandatory(params, "body", repr(self)), params=request_params + name=mandatory(params, "repository", repr(self)), body=mandatory(params, "body", repr(self)), params=request_params ) def __repr__(self, *args, **kwargs): @@ -2097,32 +2053,15 @@ async def __call__(self, es, params): wait_period = params.get("completion-recheck-wait-period", 1) es_info = await es.info() 
es_version = Version.from_string(es_info["version"]["number"]) - api = es.snapshot.get request_args = {"repository": repository, "snapshot": "_current", "verbose": False} # significantly reduce response size when lots of snapshots have been taken # only available since ES 8.3.0 (https://github.com/elastic/elasticsearch/pull/86269) if (es_version.major, es_version.minor) >= (8, 3): - request_params, headers = self._transport_request_params(params) - headers["Content-Type"] = "application/json" - - request_params["index_names"] = "false" - request_params["verbose"] = "false" - - request_args = { - "method": "GET", - "path": f"_snapshot/{repository}/_current", - "headers": headers, - "params": request_params, - } - - # TODO: Switch to native es.snapshot.get once `index_names` becomes supported in - # `es.snapshot.get` of the elasticsearch-py client and we've upgraded the client in Rally, see: - # https://elasticsearch-py.readthedocs.io/en/latest/api.html#elasticsearch.client.SnapshotClient.get - api = es.perform_request + request_args["index_names"] = False while True: - response = await api(**request_args) + response = await es.snapshot.get(**request_args) if int(response.get("total")) == 0: break @@ -2669,8 +2608,21 @@ class CreateIlmPolicy(Runner): async def __call__(self, es, params): policy_name = mandatory(params, "policy-name", self) body = mandatory(params, "body", self) + policy = mandatory(body, "policy", self) request_params = params.get("request-params", {}) - await es.ilm.put_lifecycle(policy=policy_name, body=body, params=request_params) + error_trace = request_params.get("error_trace", None) + filter_path = request_params.get("filter_path", None) + master_timeout = request_params.get("master_timeout", None) + timeout = request_params.get("timeout", None) + + await es.ilm.put_lifecycle( + name=policy_name, + policy=policy, + error_trace=error_trace, + filter_path=filter_path, + master_timeout=master_timeout, + timeout=timeout, + ) return { "weight": 1, "unit": "ops", @@ -2690,7 +2642,14 @@ class DeleteIlmPolicy(Runner): async def __call__(self, es, params): policy_name = mandatory(params, "policy-name", self) request_params = params.get("request-params", {}) - await es.ilm.delete_lifecycle(policy=policy_name, params=request_params) + error_trace = request_params.get("error_trace", None) + filter_path = request_params.get("filter_path", None) + master_timeout = request_params.get("master_timeout", None) + timeout = request_params.get("timeout", None) + + await es.ilm.delete_lifecycle( + name=policy_name, error_trace=error_trace, filter_path=filter_path, master_timeout=master_timeout, timeout=timeout + ) return { "weight": 1, "unit": "ops", @@ -2743,7 +2702,6 @@ class Downsample(Runner): """ async def __call__(self, es, params): - request_params, request_headers = self._transport_request_params(params) fixed_interval = mandatory(params, "fixed-interval", self) @@ -2911,7 +2869,7 @@ async def __call__(self, es, params): if last_attempt or not retry_on_timeout: raise await asyncio.sleep(sleep_time) - except elasticsearch.exceptions.TransportError as e: + except elasticsearch.ApiError as e: if last_attempt or not retry_on_timeout: raise e @@ -2921,6 +2879,16 @@ async def __call__(self, es, params): else: raise e + except elasticsearch.exceptions.ConnectionTimeout as e: + if last_attempt or not retry_on_timeout: + raise e + + self.logger.info("[%s] has timed out. 
Retrying in [%.2f] seconds.", repr(self.delegate), sleep_time) + await asyncio.sleep(sleep_time) + except elasticsearch.exceptions.TransportError as e: + if last_attempt or not retry_on_timeout: + raise e + async def __aexit__(self, exc_type, exc_val, exc_tb): return await self.delegate.__aexit__(exc_type, exc_val, exc_tb) diff --git a/esrally/log.py b/esrally/log.py index 0bc8409e4..189a1b8c5 100644 --- a/esrally/log.py +++ b/esrally/log.py @@ -43,6 +43,40 @@ def log_config_path(): return os.path.join(paths.rally_confdir(), "logging.json") +def add_missing_loggers_to_config(): + """ + Ensures that any missing top level loggers in resources/logging.json are + appended to an existing log configuration + """ + + def _missing_loggers(source, target): + """ + Returns any top-level loggers present in 'source', but not in 'target' + :return: A dict of all loggers present in 'source', but not in 'target' + """ + missing_loggers = {} + for logger in source: + if logger in source and logger in target: + continue + else: + missing_loggers[logger] = source[logger] + return missing_loggers + + source_path = io.normalize_path(os.path.join(os.path.dirname(__file__), "resources", "logging.json")) + + with open(log_config_path(), encoding="UTF-8") as target: + with open(source_path, encoding="UTF-8") as src: + template = json.load(src) + existing_logging_config = json.load(target) + if missing_loggers := _missing_loggers(source=template["loggers"], target=existing_logging_config["loggers"]): + existing_logging_config["loggers"].update(missing_loggers) + updated_config = json.dumps(existing_logging_config, indent=2) + + if missing_loggers: + with open(log_config_path(), "w", encoding="UTF-8") as target: + target.write(updated_config) + + def install_default_log_config(): """ Ensures a log configuration file is present on this machine. The default @@ -63,6 +97,7 @@ def install_default_log_config(): log_path = io.escape_path(log_path) contents = src.read().replace("${LOG_PATH}", log_path) target.write(contents) + add_missing_loggers_to_config() io.ensure_dir(paths.logs()) diff --git a/esrally/metrics.py b/esrally/metrics.py index b6482de54..84dcd4762 100644 --- a/esrally/metrics.py +++ b/esrally/metrics.py @@ -30,7 +30,6 @@ import uuid import zlib from enum import Enum, IntEnum -from http.client import responses import tabulate @@ -59,7 +58,8 @@ def probe_version(self): raise exceptions.RallyError(msg) def put_template(self, name, template): - return self.guarded(self._client.indices.put_template, name=name, body=template) + tmpl = json.loads(template) + return self.guarded(self._client.indices.put_template, name=name, **tmpl) def template_exists(self, name): return self.guarded(self._client.indices.exists_template, name=name) @@ -103,70 +103,79 @@ def search(self, index, body): def guarded(self, target, *args, **kwargs): # pylint: disable=import-outside-toplevel import elasticsearch + from elastic_transport import ApiError, TransportError - max_execution_count = 11 + max_execution_count = 10 execution_count = 0 - while execution_count < max_execution_count: + while execution_count <= max_execution_count: time_to_sleep = 2**execution_count + random.random() execution_count += 1 try: return target(*args, **kwargs) + except elasticsearch.exceptions.ConnectionTimeout as e: + if execution_count <= max_execution_count: + self.logger.debug( + "Connection timeout [%s] in attempt [%d/%d]. 
Sleeping for [%f] seconds.", + e.message, + execution_count, + max_execution_count, + time_to_sleep, + ) + time.sleep(time_to_sleep) + else: + operation = target.__name__ + self.logger.exception("Connection timeout while running [%s] (retried %d times).", operation, max_execution_count) + node = self._client.transport.node_pool.get() + msg = ( + "A connection timeout occurred while running the operation [%s] against your Elasticsearch metrics store on " + "host [%s] at port [%s]." % (operation, node.host, node.port) + ) + raise exceptions.RallyError(msg) + except elasticsearch.exceptions.ConnectionError as e: + if execution_count <= max_execution_count: + self.logger.debug( + "Connection error [%s] in attempt [%d/%d]. Sleeping for [%f] seconds.", + e.message, + execution_count, + max_execution_count, + time_to_sleep, + ) + time.sleep(time_to_sleep) + else: + node = self._client.transport.node_pool.get() + msg = ( + "Could not connect to your Elasticsearch metrics store. Please check that it is running on host [%s] at port [%s]" + " or fix the configuration in [%s]." % (node.host, node.port, config.ConfigFile().location) + ) + self.logger.exception(msg) + # connection errors don't necessarily mean it's during setup + raise exceptions.RallyError(msg) except elasticsearch.exceptions.AuthenticationException: # we know that it is just one host (see EsClientFactory) - node = self._client.transport.hosts[0] + node = self._client.transport.node_pool.get() msg = ( "The configured user could not authenticate against your Elasticsearch metrics store running on host [%s] at " "port [%s] (wrong password?). Please fix the configuration in [%s]." - % (node["host"], node["port"], config.ConfigFile().location) + % (node.host, node.port, config.ConfigFile().location) ) self.logger.exception(msg) raise exceptions.SystemSetupError(msg) except elasticsearch.exceptions.AuthorizationException: - node = self._client.transport.hosts[0] + node = self._client.transport.node_pool.get() msg = ( "The configured user does not have enough privileges to run the operation [%s] against your Elasticsearch metrics " "store running on host [%s] at port [%s]. Please adjust your x-pack configuration or specify a user with enough " - "privileges in the configuration in [%s]." % (target.__name__, node["host"], node["port"], config.ConfigFile().location) - ) - self.logger.exception(msg) - raise exceptions.SystemSetupError(msg) - except elasticsearch.exceptions.ConnectionTimeout: - if execution_count < max_execution_count: - self.logger.debug("Connection timeout in attempt [%d/%d].", execution_count, max_execution_count) - time.sleep(time_to_sleep) - else: - operation = target.__name__ - self.logger.exception("Connection timeout while running [%s] (retried %d times).", operation, max_execution_count) - node = self._client.transport.hosts[0] - msg = ( - "A connection timeout occurred while running the operation [%s] against your Elasticsearch metrics store on " - "host [%s] at port [%s]." % (operation, node["host"], node["port"]) - ) - raise exceptions.RallyError(msg) - except elasticsearch.exceptions.ConnectionError: - node = self._client.transport.hosts[0] - msg = ( - "Could not connect to your Elasticsearch metrics store. Please check that it is running on host [%s] at port [%s]" - " or fix the configuration in [%s]." % (node["host"], node["port"], config.ConfigFile().location)
% (target.__name__, node.host, node.port, config.ConfigFile().location) ) self.logger.exception(msg) raise exceptions.SystemSetupError(msg) - except elasticsearch.TransportError as e: - if e.status_code == 404 and e.error == "index_not_found_exception": - node = self._client.transport.hosts[0] - msg = ( - "The operation [%s] against your Elasticsearch metrics store on " - "host [%s] at port [%s] failed because index [%s] does not exist." - % (target.__name__, node["host"], node["port"], kwargs.get("index")) - ) - self.logger.exception(msg) - raise exceptions.RallyError(msg) - if e.status_code in (502, 503, 504, 429) and execution_count < max_execution_count: + except ApiError as e: + if e.status_code in (502, 503, 504, 429) and execution_count <= max_execution_count: self.logger.debug( "%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds.", - responses[e.status_code], + e.error, e.status_code, execution_count, max_execution_count, @@ -174,19 +183,24 @@ def guarded(self, target, *args, **kwargs): ) time.sleep(time_to_sleep) else: - node = self._client.transport.hosts[0] + node = self._client.transport.node_pool.get() msg = ( - "A transport error occurred while running the operation [%s] against your Elasticsearch metrics store on " - "host [%s] at port [%s]." % (target.__name__, node["host"], node["port"]) + "An error [%s] occurred while running the operation [%s] against your Elasticsearch metrics store on host [%s] " + "at port [%s]." % (e.error, target.__name__, node.host, node.port) ) self.logger.exception(msg) + # this does not necessarily mean it's a system setup problem... raise exceptions.RallyError(msg) + except TransportError as e: + node = self._client.transport.node_pool.get() - except elasticsearch.exceptions.ElasticsearchException: - node = self._client.transport.hosts[0] + if e.errors: + err = e.errors + else: + err = e msg = ( - "An unknown error occurred while running the operation [%s] against your Elasticsearch metrics store on host [%s] " - "at port [%s]." % (target.__name__, node["host"], node["port"]) + "Transport error(s) [%s] occurred while running the operation [%s] against your Elasticsearch metrics store on " + "host [%s] at port [%s]." % (err, target.__name__, node.host, node.port) ) self.logger.exception(msg) # this does not necessarily mean it's a system setup problem... 
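For reference, the retry scheme that the reworked ``guarded()`` applies to retriable failures (connection errors, connection timeouts, and HTTP 429/502/503/504 responses) can be condensed to the sketch below. This is an illustrative standalone distillation, not part of the patch: ``guarded_call`` is a hypothetical helper name, and the built-in ``ConnectionError`` stands in for the ``elasticsearch`` exception classes handled in the real code::

    import random
    import time


    def guarded_call(target, max_execution_count=10, retriable=(ConnectionError,)):
        # Exponential backoff with up to 1s of random jitter: sleeps of roughly
        # 1, 2, 4, ..., 512 seconds across the 10 retries (~17 minutes in total).
        execution_count = 0
        while execution_count <= max_execution_count:
            time_to_sleep = 2**execution_count + random.random()
            execution_count += 1
            try:
                return target()
            except retriable:
                if execution_count <= max_execution_count:
                    time.sleep(time_to_sleep)  # back off before the next attempt
                else:
                    raise  # retries exhausted; surface the error to the caller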
diff --git a/esrally/resources/logging.json b/esrally/resources/logging.json index b58721060..ba559e235 100644 --- a/esrally/resources/logging.json +++ b/esrally/resources/logging.json @@ -47,6 +47,11 @@ "handlers": ["rally_profile_handler"], "level": "INFO", "propagate": false + }, + "elastic_transport": { + "handlers": ["rally_log_handler"], + "level": "WARNING", + "propagate": false } } } diff --git a/esrally/telemetry.py b/esrally/telemetry.py index 7eecdc626..5d09bd412 100644 --- a/esrally/telemetry.py +++ b/esrally/telemetry.py @@ -2284,7 +2284,7 @@ def on_benchmark_stop(self): for index in self.indices.split(","): self.logger.debug("Gathering disk usage for [%s]", index) try: - response = self.client.perform_request(method="POST", path=f"/{index}/_disk_usage", params={"run_expensive_tasks": "true"}) + response = self.client.indices.disk_usage(index=index, run_expensive_tasks=True) except elasticsearch.RequestError: msg = f"A transport error occurred while collecting disk usage for {index}" self.logger.exception(msg) @@ -2307,8 +2307,10 @@ def handle_telemetry_usage(self, response): self.logger.exception(msg) raise exceptions.RallyError(msg) - del response["_shards"] for index, idx_fields in response.items(): + if index == "_shards": + continue + for field, field_info in idx_fields["fields"].items(): meta = {"index": index, "field": field} self.metrics_store.put_value_cluster_level("disk_usage_total", field_info["total_in_bytes"], meta_data=meta, unit="byte") diff --git a/esrally/tracker/tracker.py b/esrally/tracker/tracker.py index d8daa06e1..cc0a30db5 100644 --- a/esrally/tracker/tracker.py +++ b/esrally/tracker/tracker.py @@ -18,7 +18,7 @@ import logging import os -from elasticsearch import ElasticsearchException +from elastic_transport import ApiError, TransportError from jinja2 import Environment, FileSystemLoader from esrally import PROGRAM_NAME @@ -42,7 +42,7 @@ def extract_indices_from_data_streams(client, data_streams_to_extract): for data_stream_name in data_streams_to_extract: try: indices += index.extract_indices_from_data_stream(client, data_stream_name) - except ElasticsearchException: + except (ApiError, TransportError): logging.getLogger(__name__).exception("Failed to extract indices from data stream [%s]", data_stream_name) return indices @@ -56,7 +56,7 @@ def extract_mappings_and_corpora(client, output_path, indices_to_extract): for index_name in indices_to_extract: try: indices += index.extract(client, output_path, index_name) - except ElasticsearchException: + except (ApiError, TransportError): logging.getLogger(__name__).exception("Failed to extract index [%s]", index_name) # That list only contains valid indices (with index patterns already resolved) diff --git a/esrally/utils/opts.py b/esrally/utils/opts.py index 7b8d2b9e1..a51a0f883 100644 --- a/esrally/utils/opts.py +++ b/esrally/utils/opts.py @@ -171,6 +171,51 @@ def __init__(self, argvalue): self.parse_options() + @classmethod + def _normalize_hosts(cls, hosts): + # pylint: disable=import-outside-toplevel + from urllib.parse import unquote, urlparse + + string_types = str, bytes + # if hosts are empty, just defer to defaults down the line + if hosts is None: + return [{}] + + # passed in just one string + if isinstance(hosts, string_types): + hosts = [hosts] + + out = [] + # normalize hosts to dicts + for host in hosts: + if isinstance(host, string_types): + if "://" not in host: + host = "//%s" % host + + parsed_url = urlparse(host) + h = {"host": parsed_url.hostname} + + if parsed_url.port: + h["port"] = 
parsed_url.port + + if parsed_url.scheme == "https": + h["port"] = parsed_url.port or 443 + h["use_ssl"] = True + + if parsed_url.username or parsed_url.password: + h["http_auth"] = "%s:%s" % ( + unquote(parsed_url.username), + unquote(parsed_url.password), + ) + + if parsed_url.path and parsed_url.path != "/": + h["url_prefix"] = parsed_url.path + + out.append(h) + else: + out.append(host) + return out + def parse_options(self): def normalize_to_dict(arg): """ @@ -178,12 +223,14 @@ def normalize_to_dict(arg): This is needed to support backwards compatible --target-hosts for single clusters that are not defined as a json string or file. """ - # pylint: disable=import-outside-toplevel - from elasticsearch.client import _normalize_hosts + return {TargetHosts.DEFAULT: self._normalize_hosts(arg)} - return {TargetHosts.DEFAULT: _normalize_hosts(arg)} + parsed_options = to_dict(self.argvalue, default_parser=normalize_to_dict) + p_opts_copy = parsed_options.copy() + for cluster_name, nodes in p_opts_copy.items(): + parsed_options[cluster_name] = self._normalize_hosts(nodes) - self.parsed_options = to_dict(self.argvalue, default_parser=normalize_to_dict) + self.parsed_options = parsed_options @property def all_hosts(self): @@ -218,7 +265,6 @@ def parse_options(self): @staticmethod def normalize_to_dict(arg): - """ When --client-options is a non-json csv string (single cluster mode), return parsed client options as dict with "default" key diff --git a/it/__init__.py b/it/__init__.py index f3d77b619..9e6d5b4c8 100644 --- a/it/__init__.py +++ b/it/__init__.py @@ -21,8 +21,10 @@ import os import platform import random +import shutil import socket import subprocess +import tempfile import time import pytest @@ -265,3 +267,27 @@ def setup_module(): def teardown_module(): ES_METRICS_STORE.stop() remove_integration_test_config() + + +# ensures that a fresh log file is available +@pytest.fixture(scope="function") +def fresh_log_file(): + cfg = ConfigFile(config_name=None) + log_file = os.path.join(cfg.rally_home, "logs", "rally.log") + + if os.path.exists(log_file): + bak = os.path.join(tempfile.mkdtemp(), "rally.log") + shutil.move(log_file, bak) + yield log_file + # append log lines to the original file and move it back to its original + with open(log_file) as src: + with open(bak, "a") as dst: + dst.write(src.read()) + shutil.move(bak, log_file) + else: + yield log_file + + +def check_log_line_present(log_file, text): + with open(log_file) as f: + return any(text in line for line in f) diff --git a/it/esrallyd_test.py b/it/esrallyd_test.py new file mode 100644 index 000000000..b523fafa1 --- /dev/null +++ b/it/esrallyd_test.py @@ -0,0 +1,56 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import pytest + +import it + +# pylint: disable=unused-import +from it import fresh_log_file + + +@pytest.fixture(autouse=True) +def setup_esrallyd(): + it.wait_until_port_is_free(1900) + it.shell_cmd("esrallyd start --node-ip 127.0.0.1 --coordinator-ip 127.0.0.1") + yield + it.shell_cmd("esrallyd stop") + + +@it.rally_in_mem +def test_elastic_transport_module_does_not_log_at_info_level(cfg, fresh_log_file): + """ + The 'elastic_transport' module logs at 'INFO' by default and is _very_ noisy, so we explicitly set the threshold to + 'WARNING' to avoid perturbing benchmarking results due to the high volume of logging calls by the client itself. + + Unfortunately, due to the underlying double-fork behaviour of the ActorSystem, it's possible for this module's logger + threshold to be overridden and reset to the default 'INFO' level via eager top level imports (i.e. at the top of a module). + + Therefore we try to tightly control the imports of `elastic_transport` and `elasticsearch` throughout the codebase, but + it is very easy to reintroduce this 'bug' by simply putting the import statement in the 'wrong' spot, thus this IT + attempts to ensure this doesn't happen. + + See https://github.com/elastic/rally/pull/1669#issuecomment-1442783985 for more details. + """ + port = 19200 + it.wait_until_port_is_free(port_number=port) + dist = it.DISTRIBUTIONS[-1] + it.race( + cfg, + f'--distribution-version={dist} --track="geonames" --include-tasks=delete-index ' + f"--test-mode --car=4gheap,trial-license --target-hosts=127.0.0.1:{port} ", + ) + assert not it.check_log_line_present(fresh_log_file, "elastic_transport.transport INFO") diff --git a/it/proxy_test.py b/it/proxy_test.py index 7c73fb502..e38750221 100644 --- a/it/proxy_test.py +++ b/it/proxy_test.py @@ -17,14 +17,15 @@ import collections import os -import shutil -import tempfile import pytest import it from esrally.utils import process +# pylint: disable=unused-import +from it import fresh_log_file + HttpProxy = collections.namedtuple("HttpProxy", ["authenticated_url", "anonymous_url"]) @@ -44,34 +45,10 @@ def http_proxy(): process.run_subprocess(f"docker stop {proxy_container_id}") -# ensures that a fresh log file is available -@pytest.fixture(scope="function") -def fresh_log_file(): - cfg = it.ConfigFile(config_name=None) - log_file = os.path.join(cfg.rally_home, "logs", "rally.log") - - if os.path.exists(log_file): - bak = os.path.join(tempfile.mkdtemp(), "rally.log") - shutil.move(log_file, bak) - yield log_file - # append log lines to the original file and move it back to its original - with open(log_file) as src: - with open(bak, "a") as dst: - dst.write(src.read()) - shutil.move(bak, log_file) - else: - yield log_file - - -def assert_log_line_present(log_file, text): - with open(log_file) as f: - assert any(text in line for line in f), f"Could not find [{text}] in [{log_file}]."
- - @it.rally_in_mem def test_run_with_direct_internet_connection(cfg, http_proxy, fresh_log_file): assert it.esrally(cfg, "list tracks") == 0 - assert_log_line_present(fresh_log_file, "Connecting directly to the Internet") + assert it.check_log_line_present(fresh_log_file, "Connecting directly to the Internet") @it.rally_in_mem diff --git a/pyproject.toml b/pyproject.toml index 953dcfe6e..63fabb849 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,9 @@ dependencies = [ # transitive dependencies: # urllib3: MIT # aiohttp: Apache 2.0 - "elasticsearch[async]==7.14.0", + + "elasticsearch[async]==8.6.1", + "elastic-transport==8.4.0", "urllib3==1.26.9", "docker==6.0.0", # License: BSD diff --git a/tests/client/common_test.py b/tests/client/common_test.py new file mode 100644 index 000000000..dac449c60 --- /dev/null +++ b/tests/client/common_test.py @@ -0,0 +1,24 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from esrally.client import common + + +# pylint: disable=protected-access +def test_client_major_version_to_str(): + version = (8, 2, 0) + assert common._client_major_version_to_str(version) == "8" diff --git a/tests/client/factory_test.py b/tests/client/factory_test.py index 42f3a993e..5cb9d0f56 100644 --- a/tests/client/factory_test.py +++ b/tests/client/factory_test.py @@ -29,13 +29,18 @@ import pytest import trustme import urllib3.exceptions +from elastic_transport import ApiResponseMeta from pytest_httpserver import HTTPServer from esrally import client, doc_link, exceptions -from esrally.client.asynchronous import AIOHttpConnection, VerifiedAsyncTransport +from esrally.client.asynchronous import RallyAsyncTransport from esrally.utils import console +def _api_error(status, message): + return elasticsearch.ApiError(message, ApiResponseMeta(status=status, http_version="1.1", headers={}, duration=0.0, node=None), None) + + class TestEsClientFactory: cwd = os.path.dirname(__file__) @@ -47,10 +52,9 @@ def test_create_http_connection(self): f = client.EsClientFactory(hosts, client_options) - assert f.hosts == hosts + assert f.hosts == ["http://localhost:9200"] assert f.ssl_context is None - assert f.client_options["scheme"] == "http" - assert "http_auth" not in f.client_options + assert "basic_auth" not in f.client_options assert client_options == original_client_options @@ -60,7 +64,7 @@ def test_create_https_connection_verify_server(self, mocked_load_cert_chain): client_options = { "use_ssl": True, "verify_certs": True, - "http_auth": ("user", "password"), + "basic_auth": ("user", "password"), } # make a copy so we can verify later that the factory did not modify it original_client_options = deepcopy(client_options) @@ -80,14 +84,13 @@ def test_create_https_connection_verify_server(self, mocked_load_cert_chain): not mocked_load_cert_chain.called 
), "ssl_context.load_cert_chain should not have been called as we have not supplied client certs" - assert f.hosts == hosts + assert f.hosts == ["https://localhost:9200"] assert f.ssl_context.check_hostname assert f.ssl_context.verify_mode == ssl.CERT_REQUIRED - assert f.client_options["scheme"] == "https" - assert f.client_options["http_auth"] == ("user", "password") + assert f.client_options["basic_auth"] == ("user", "password") + assert f.client_options["verify_certs"] assert "use_ssl" not in f.client_options - assert "verify_certs" not in f.client_options assert "ca_certs" not in f.client_options assert client_options == original_client_options @@ -98,7 +101,7 @@ def test_create_https_connection_verify_self_signed_server_and_client_certificat client_options = { "use_ssl": True, "verify_certs": True, - "http_auth": ("user", "password"), + "basic_auth": ("user", "password"), "ca_certs": os.path.join(self.cwd, "../utils/resources/certs/ca.crt"), "client_cert": os.path.join(self.cwd, "../utils/resources/certs/client.crt"), "client_key": os.path.join(self.cwd, "../utils/resources/certs/client.key"), @@ -122,14 +125,14 @@ def test_create_https_connection_verify_self_signed_server_and_client_certificat keyfile=client_options["client_key"], ) - assert f.hosts == hosts + assert f.hosts == ["https://localhost:9200"] assert f.ssl_context.check_hostname assert f.ssl_context.verify_mode == ssl.CERT_REQUIRED - assert f.client_options["scheme"] == "https" - assert f.client_options["http_auth"] == ("user", "password") + assert f.client_options["basic_auth"] == ("user", "password") + assert f.client_options["verify_certs"] + assert "use_ssl" not in f.client_options - assert "verify_certs" not in f.client_options assert "ca_certs" not in f.client_options assert "client_cert" not in f.client_options assert "client_key" not in f.client_options @@ -142,7 +145,7 @@ def test_create_https_connection_only_verify_self_signed_server_certificate(self client_options = { "use_ssl": True, "verify_certs": True, - "http_auth": ("user", "password"), + "basic_auth": ("user", "password"), "ca_certs": os.path.join(self.cwd, "../utils/resources/certs/ca.crt"), } # make a copy so we can verify later that the factory did not modify it @@ -162,14 +165,13 @@ def test_create_https_connection_only_verify_self_signed_server_certificate(self assert ( not mocked_load_cert_chain.called ), "ssl_context.load_cert_chain should not have been called as we have not supplied client certs" - assert f.hosts == hosts + assert f.hosts == ["https://localhost:9200"] assert f.ssl_context.check_hostname assert f.ssl_context.verify_mode == ssl.CERT_REQUIRED - assert f.client_options["scheme"] == "https" - assert f.client_options["http_auth"] == ("user", "password") + assert f.client_options["basic_auth"] == ("user", "password") + assert f.client_options["verify_certs"] assert "use_ssl" not in f.client_options - assert "verify_certs" not in f.client_options assert "ca_certs" not in f.client_options assert client_options == original_client_options @@ -179,7 +181,7 @@ def test_raises_error_when_only_one_of_client_cert_and_client_key_defined(self): client_options = { "use_ssl": True, "verify_certs": True, - "http_auth": ("user", "password"), + "basic_auth": ("user", "password"), "ca_certs": os.path.join(self.cwd, "../utils/resources/certs/ca.crt"), } @@ -234,14 +236,14 @@ def test_create_https_connection_unverified_certificate(self, mocked_load_cert_c not mocked_load_cert_chain.called ), "ssl_context.load_cert_chain should not have been called as we 
have not supplied client certs" - assert f.hosts == hosts + assert f.hosts == ["https://localhost:9200"] assert not f.ssl_context.check_hostname assert f.ssl_context.verify_mode == ssl.CERT_NONE - assert f.client_options["scheme"] == "https" - assert f.client_options["http_auth"] == ("user", "password") + assert f.client_options["basic_auth"] == ("user", "password") + assert not f.client_options["verify_certs"] + assert "use_ssl" not in f.client_options - assert "verify_certs" not in f.client_options assert "basic_auth_user" not in f.client_options assert "basic_auth_password" not in f.client_options @@ -253,7 +255,7 @@ def test_create_https_connection_unverified_certificate_present_client_certifica client_options = { "use_ssl": True, "verify_certs": False, - "http_auth": ("user", "password"), + "basic_auth": ("user", "password"), "client_cert": os.path.join(self.cwd, "../utils/resources/certs/client.crt"), "client_key": os.path.join(self.cwd, "../utils/resources/certs/client.key"), } @@ -275,14 +277,13 @@ def test_create_https_connection_unverified_certificate_present_client_certifica keyfile=client_options["client_key"], ) - assert f.hosts == hosts + assert f.hosts == ["https://localhost:9200"] assert not f.ssl_context.check_hostname assert f.ssl_context.verify_mode == ssl.CERT_NONE - assert f.client_options["scheme"] == "https" - assert f.client_options["http_auth"] == ("user", "password") + assert f.client_options["basic_auth"] == ("user", "password") assert "use_ssl" not in f.client_options - assert "verify_certs" not in f.client_options + assert not f.client_options["verify_certs"] assert "basic_auth_user" not in f.client_options assert "basic_auth_password" not in f.client_options assert "ca_certs" not in f.client_options @@ -314,7 +315,7 @@ def test_check_hostname_false_when_host_is_ip(self): } f = client.EsClientFactory(hosts, client_options) - assert f.hosts == hosts + assert f.hosts == ["https://127.0.0.1:9200"] assert f.ssl_context.check_hostname is False assert f.ssl_context.verify_mode == ssl.CERT_REQUIRED @@ -325,6 +326,7 @@ def test_create_async_client_with_api_key_auth_override(self, es): "use_ssl": True, "verify_certs": True, "http_auth": ("user", "password"), + "max_connections": 600, } # make a copy so we can verify later that the factory did not modify it original_client_options = deepcopy(client_options) @@ -335,16 +337,17 @@ def test_create_async_client_with_api_key_auth_override(self, es): assert f.create_async(api_key=api_key) assert "http_auth" not in f.client_options assert f.client_options["api_key"] == api_key + assert client_options["max_connections"] == f.max_connections assert client_options == original_client_options es.assert_called_once_with( - hosts=hosts, - transport_class=VerifiedAsyncTransport, - connection_class=AIOHttpConnection, + distribution_version=None, + hosts=["https://localhost:9200"], + transport_class=RallyAsyncTransport, ssl_context=f.ssl_context, - scheme="https", + maxsize=f.max_connections, + verify_certs=True, serializer=f.client_options["serializer"], - trace_config=f.client_options["trace_config"], api_key=api_key, ) @@ -444,10 +447,7 @@ async def test_propagates_nested_context(self): class TestRestLayer: @mock.patch("elasticsearch.Elasticsearch") def test_successfully_waits_for_rest_layer(self, es): - es.transport.hosts = [ - {"host": "node-a.example.org", "port": 9200}, - {"host": "node-b.example.org", "port": 9200}, - ] + es.transport.node_pool.__len__ = mock.Mock(return_value=2) assert client.wait_for_rest_layer(es, 
max_attempts=3) es.cluster.health.assert_has_calls( [ @@ -460,10 +460,10 @@ def test_successfully_waits_for_rest_layer(self, es): @mock.patch("elasticsearch.Elasticsearch") def test_retries_on_transport_errors(self, es, sleep): es.cluster.health.side_effect = [ - elasticsearch.TransportError(503, "Service Unavailable"), - elasticsearch.TransportError(401, "Unauthorized"), - elasticsearch.TransportError(408, "Timed Out"), - elasticsearch.TransportError(408, "Timed Out"), + _api_error(503, "Service Unavailable"), + _api_error(401, "Unauthorized"), + elasticsearch.TransportError("Connection timed out"), + elasticsearch.TransportError("Connection timed out"), {"version": {"number": "5.0.0", "build_hash": "abc123"}}, ] assert client.wait_for_rest_layer(es, max_attempts=5) @@ -471,18 +471,45 @@ def test_retries_on_transport_errors(self, es, sleep): # don't sleep in realtime @mock.patch("time.sleep") @mock.patch("elasticsearch.Elasticsearch") - def test_dont_retry_eternally_on_transport_errors(self, es, sleep): - es.cluster.health.side_effect = elasticsearch.TransportError(401, "Unauthorized") - assert not client.wait_for_rest_layer(es, max_attempts=3) + def test_dont_retry_eternally_on_api_errors(self, es, sleep): + es.cluster.health.side_effect = _api_error(401, "Unauthorized") + es.transport.node_pool = ["node_1"] + with pytest.raises(elasticsearch.ApiError, match=r"Unauthorized"): + client.wait_for_rest_layer(es, max_attempts=3) + es.cluster.health.assert_has_calls( + [mock.call(wait_for_nodes=">=1"), mock.call(wait_for_nodes=">=1"), mock.call(wait_for_nodes=">=1")] + ) + + @mock.patch("elasticsearch.Elasticsearch") + def test_ssl_serialization_error(self, es): + es.cluster.health.side_effect = elasticsearch.SerializationError(message="Client sent an HTTP request to an HTTPS server") + with pytest.raises( + exceptions.SystemSetupError, + match="Rally sent an HTTP request to an HTTPS server. Are you sure this is an HTTP endpoint?", + ): + client.wait_for_rest_layer(es, max_attempts=3) @mock.patch("elasticsearch.Elasticsearch") - def test_ssl_error(self, es): + def test_connection_ssl_error(self, es): + es.cluster.health.side_effect = elasticsearch.SSLError( + message="SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1131)", + ) + with pytest.raises( + exceptions.SystemSetupError, + match="Could not connect to cluster via HTTPS. Are you sure this is an HTTPS endpoint?", + ): + client.wait_for_rest_layer(es, max_attempts=3) + + @mock.patch("elasticsearch.Elasticsearch") + def test_connection_protocol_error(self, es): es.cluster.health.side_effect = elasticsearch.ConnectionError( - "N/A", - "[SSL: UNKNOWN_PROTOCOL] unknown protocol (_ssl.c:719)", - urllib3.exceptions.SSLError("[SSL: UNKNOWN_PROTOCOL] unknown protocol (_ssl.c:719)"), + message="N/A", + errors=[urllib3.exceptions.ProtocolError("Connection aborted.")], ) - with pytest.raises(exceptions.SystemSetupError, match="Could not connect to cluster via https. Is this an https endpoint?"): + with pytest.raises( + exceptions.SystemSetupError, + match="Received a protocol error. 
Are you sure you're using the correct scheme (HTTP or HTTPS)?", + ): client.wait_for_rest_layer(es, max_attempts=3) @@ -492,32 +519,32 @@ def test_successfully_creates_api_keys(self, es): client_id = 0 assert client.create_api_key(es, client_id, max_attempts=3) # even though max_attempts is 3, this should only be called once - es.security.create_api_key.assert_called_once_with({"name": f"rally-client-{client_id}"}) + es.security.create_api_key.assert_called_once_with(name=f"rally-client-{client_id}") @mock.patch("elasticsearch.Elasticsearch") def test_api_key_creation_fails_on_405_and_raises_system_setup_error(self, es): client_id = 0 - es.security.create_api_key.side_effect = elasticsearch.TransportError(405, "Incorrect HTTP method") + es.security.create_api_key.side_effect = _api_error(405, "Incorrect HTTP method") with pytest.raises( exceptions.SystemSetupError, match=re.escape("Got status code 405 when attempting to create API keys. Is Elasticsearch Security enabled?"), ): client.create_api_key(es, client_id, max_attempts=5) - es.security.create_api_key.assert_called_once_with({"name": f"rally-client-{client_id}"}) + es.security.create_api_key.assert_called_once_with(name=f"rally-client-{client_id}") @mock.patch("time.sleep") @mock.patch("elasticsearch.Elasticsearch") def test_retries_api_key_creation_on_transport_errors(self, es, sleep): client_id = 0 es.security.create_api_key.side_effect = [ - elasticsearch.TransportError(503, "Service Unavailable"), - elasticsearch.TransportError(401, "Unauthorized"), - elasticsearch.TransportError(408, "Timed Out"), - elasticsearch.TransportError(500, "Internal Server Error"), + _api_error(503, "Service Unavailable"), + _api_error(401, "Unauthorized"), + elasticsearch.TransportError("Connection timed out"), + _api_error(500, "Internal Server Error"), {"id": "abc", "name": f"rally-client-{client_id}", "api_key": "123"}, ] - calls = [mock.call({"name": "rally-client-0"}) for _ in range(5)] + calls = [mock.call(name="rally-client-0") for _ in range(5)] assert client.create_api_key(es, client_id, max_attempts=5) assert es.security.create_api_key.call_args_list == calls @@ -540,7 +567,7 @@ def test_successfully_deletes_api_keys(self, es, version): ] else: es.security.invalidate_api_key.return_value = {"invalidated_api_keys": ["foo", "bar", "baz"], "error_count": 0} - calls = [mock.call({"ids": ids})] + calls = [mock.call(ids=ids)] assert client.delete_api_keys(es, ids, max_attempts=3) assert es.security.invalidate_api_key.has_calls(calls, any_order=True) @@ -556,28 +583,28 @@ def test_retries_api_keys_deletion_on_transport_errors(self, es, sleep, version) es.security.invalidate_api_key.side_effect = [ {"invalidated_api_keys": ["foo"]}, {"invalidated_api_keys": ["bar"]}, - elasticsearch.TransportError(401, "Unauthorized"), - elasticsearch.TransportError(503, "Service Unavailable"), + _api_error(401, "Unauthorized"), + _api_error(503, "Service Unavailable"), {"invalidated_api_keys": ["baz"]}, ] calls = [ # foo and bar are deleted successfully, leaving only baz - mock.call({"id": "foo"}), - mock.call({"id": "bar"}), + mock.call(id="foo"), + mock.call(id="bar"), # two exceptions are thrown, so it should take 3 attempts to delete baz - mock.call({"id": "baz"}), - mock.call({"id": "baz"}), - mock.call({"id": "baz"}), + mock.call(id="baz"), + mock.call(id="baz"), + mock.call(id="baz"), ] else: es.security.invalidate_api_key.side_effect = [ - elasticsearch.TransportError(503, "Service Unavailable"), - elasticsearch.TransportError(401, "Unauthorized"), - 
elasticsearch.TransportError(408, "Timed Out"), - elasticsearch.TransportError(500, "Internal Server Error"), + _api_error(503, "Service Unavailable"), + _api_error(401, "Unauthorized"), + elasticsearch.TransportError("Connection timed out"), + _api_error(500, "Internal Server Error"), {"invalidated_api_keys": ["foo", "bar", "baz"], "error_count": 0}, ] - calls = [mock.call({"ids": ids}) for _ in range(max_attempts)] + calls = [mock.call(ids=ids) for _ in range(max_attempts)] assert client.delete_api_keys(es, ids, max_attempts=max_attempts) assert es.security.invalidate_api_key.call_args_list == calls @@ -592,13 +619,13 @@ def test_raises_exception_when_api_key_deletion_fails(self, es, version): es.security.invalidate_api_key.side_effect = [ {"invalidated_api_keys": ["foo"]}, {"invalidated_api_keys": ["bar"]}, - elasticsearch.TransportError(500, "Internal Server Error"), + _api_error(500, "Internal Server Error"), ] calls = [ - mock.call({"id": "foo"}), - mock.call({"id": "bar"}), - mock.call({"id": "baz"}), + mock.call(id="foo"), + mock.call(id="bar"), + mock.call(id="baz"), ] else: # Since there are two ways this version can fail, we interleave them @@ -607,33 +634,21 @@ "invalidated_api_keys": ["foo"], "error_count": 3, }, - elasticsearch.TransportError(500, "Internal Server Error"), + _api_error(500, "Internal Server Error"), { "invalidated_api_keys": ["bar"], "error_count": 2, }, - elasticsearch.TransportError(500, "Internal Server Error"), + _api_error(500, "Internal Server Error"), ] calls = [ - mock.call({"ids": ["foo", "bar", "baz", "qux"]}), - mock.call({"ids": ["bar", "baz", "qux"]}), - mock.call({"ids": ["bar", "baz", "qux"]}), + mock.call(ids=["foo", "bar", "baz", "qux"]), + mock.call(ids=["bar", "baz", "qux"]), + mock.call(ids=["bar", "baz", "qux"]), ] with pytest.raises(exceptions.RallyError, match=re.escape(f"Could not delete API keys with the following IDs: {failed_to_delete}")): client.delete_api_keys(es, ids, max_attempts=3) es.security.invalidate_api_key.assert_has_calls(calls) - - -class TestAsyncConnection: - @pytest.mark.asyncio - async def test_enable_cleanup_close(self): - connection = AIOHttpConnection() - # pylint: disable=protected-access - assert connection._enable_cleanup_closed is True - - connection = AIOHttpConnection(enable_cleanup_closed=False) - # pylint: disable=protected-access - assert connection._enable_cleanup_closed is False diff --git a/tests/driver/driver_test.py b/tests/driver/driver_test.py index d40fab67e..b317cecb6 100644 --- a/tests/driver/driver_test.py +++ b/tests/driver/driver_test.py @@ -23,6 +23,7 @@ from datetime import datetime from unittest import mock +import elastic_transport import elasticsearch import pytest @@ -1814,30 +1815,35 @@ async def test_execute_single_dict(self): async def test_execute_single_with_connection_error_always_aborts(self, on_error): es = None params = None - # ES client uses pseudo-status "N/A" in this case... - runner = mock.AsyncMock(side_effect=elasticsearch.ConnectionError("N/A", "no route to host", None)) + runner = mock.AsyncMock(side_effect=elasticsearch.ConnectionError(message="Connection error")) with pytest.raises(exceptions.RallyAssertionError) as exc: await driver.execute_single(self.context_managed(runner), es, params, on_error=on_error) - assert exc.value.args[0] == "Request returned an error. Error type: transport, Description: no route to host" + assert exc.value.args[0] == "Request returned an error.
Error type: transport, Description: Connection error" @pytest.mark.asyncio async def test_execute_single_with_http_400_aborts_when_specified(self): es = None params = None - runner = mock.AsyncMock(side_effect=elasticsearch.NotFoundError(404, "not found", "the requested document could not be found")) + error_meta = elastic_transport.ApiResponseMeta(status=404, http_version="1.1", headers={}, duration=0.0, node=None) + runner = mock.AsyncMock( + side_effect=elasticsearch.NotFoundError(message="not found", meta=error_meta, body="the requested document could not be found") + ) with pytest.raises(exceptions.RallyAssertionError) as exc: await driver.execute_single(self.context_managed(runner), es, params, on_error="abort") assert exc.value.args[0] == ( - "Request returned an error. Error type: transport, Description: not found (the requested document could not be found)" + "Request returned an error. Error type: api, Description: not found (the requested document could not be found)" ) @pytest.mark.asyncio async def test_execute_single_with_http_400(self): es = None params = None - runner = mock.AsyncMock(side_effect=elasticsearch.NotFoundError(404, "not found", "the requested document could not be found")) + error_meta = elastic_transport.ApiResponseMeta(status=404, http_version="1.1", headers={}, duration=0.0, node=None) + runner = mock.AsyncMock( + side_effect=elasticsearch.NotFoundError(message="not found", meta=error_meta, body="the requested document could not be found") + ) ops, unit, request_meta_data = await driver.execute_single(self.context_managed(runner), es, params, on_error="continue") @@ -1845,7 +1851,7 @@ async def test_execute_single_with_http_400(self): assert unit == "ops" assert request_meta_data == { "http-status": 404, - "error-type": "transport", + "error-type": "api", "error-description": "not found (the requested document could not be found)", "success": False, } @@ -1854,7 +1860,8 @@ async def test_execute_single_with_http_400(self): async def test_execute_single_with_http_413(self): es = None params = None - runner = mock.AsyncMock(side_effect=elasticsearch.NotFoundError(413, b"", b"")) + error_meta = elastic_transport.ApiResponseMeta(status=413, http_version="1.1", headers={}, duration=0.0, node=None) + runner = mock.AsyncMock(side_effect=elasticsearch.NotFoundError(message="", meta=error_meta, body="")) ops, unit, request_meta_data = await driver.execute_single(self.context_managed(runner), es, params, on_error="continue") @@ -1862,7 +1869,7 @@ async def test_execute_single_with_http_413(self): assert unit == "ops" assert request_meta_data == { "http-status": 413, - "error-type": "transport", + "error-type": "api", "error-description": "", "success": False, } diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 7bf7589a1..c34878764 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -24,10 +24,12 @@ import random from unittest import mock +import elastic_transport import elasticsearch import pytest from esrally import client, exceptions +from esrally.client.asynchronous import RallyAsyncElasticsearch from esrally.driver import runner @@ -1295,7 +1297,7 @@ async def test_force_merge_with_polling_no_timeout(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_polling(self, es): - es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout()) + es.indices.forcemerge = 
mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) es.tasks.list = mock.AsyncMock( side_effect=[ { @@ -1342,7 +1344,7 @@ async def test_force_merge_with_polling(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_polling_and_params(self, es): - es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout()) + es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout("connection timeout")) es.tasks.list = mock.AsyncMock( side_effect=[ { @@ -1643,7 +1645,7 @@ async def test_query_with_timeout_and_headers(self, es): es.perform_request.assert_awaited_once_with( method="GET", path="/_all/_search", - params={"request_timeout": 3.0, "request_cache": "true"}, + params={"request_cache": "true"}, body=params["body"], headers={"header1": "value1", "x-opaque-id": "test-id1"}, ) @@ -2204,7 +2206,7 @@ async def test_scroll_query_cannot_clear_scroll(self, es): } es.perform_request = mock.AsyncMock(return_value=io.BytesIO(json.dumps(search_response).encode())) - es.clear_scroll = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout()) + es.clear_scroll = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) query_runner = runner.Query() @@ -2726,7 +2728,7 @@ class TestDeleteIndexRunner: @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_existing_indices(self, es): - es.indices.get = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) + es.indices.exists = mock.AsyncMock(side_effect=[False, True]) es.indices.delete = mock.AsyncMock() es.cluster.get_settings = mock.AsyncMock(return_value={"persistent": {}, "transient": {"action.destructive_requires_name": True}}) es.cluster.put_settings = mock.AsyncMock() @@ -2791,7 +2793,7 @@ class TestDeleteDataStreamRunner: @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_existing_data_streams(self, es): - es.indices.get = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) + es.indices.exists = mock.AsyncMock(side_effect=[False, True]) es.indices.delete_data_stream = mock.AsyncMock() r = runner.DeleteDataStream() @@ -2921,7 +2923,7 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_only_existing_index_templates(self, es): - es.indices.get_template = mock.AsyncMock(side_effect=[False, True]) + es.indices.exists_template = mock.AsyncMock(side_effect=[False, True]) es.indices.delete_template = mock.AsyncMock() es.indices.delete = mock.AsyncMock() @@ -3051,8 +3053,8 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio - async def test_deletes_only_existing_index_templates(self, es): - es.cluster.get_component_template = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) + async def test_deletes_only_existing_component_templates(self, es): + es.cluster.exists_component_template = mock.AsyncMock(side_effect=[False, True]) es.cluster.delete_component_template = mock.AsyncMock() r = runner.DeleteComponentTemplate() @@ -3212,7 +3214,7 @@ async def test_deletes_all_index_templates(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_deletes_only_existing_index_templates(self, es): - es.indices.get_index_template = mock.AsyncMock(side_effect=[{"status": 404}, {"status": 200}]) + 
es.indices.exists_index_template = mock.AsyncMock(side_effect=[False, True]) es.indices.delete_index_template = mock.AsyncMock() r = runner.DeleteComposableTemplate() @@ -3271,7 +3273,8 @@ async def test_create_ml_datafeed(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_create_ml_datafeed_fallback(self, es): - es.ml.put_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) + es.ml.put_datafeed = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) es.perform_request = mock.AsyncMock() datafeed_id = "some-data-feed" body = {"job_id": "total-requests", "indices": ["server-metrics"]} @@ -3300,7 +3303,8 @@ async def test_delete_ml_datafeed(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_delete_ml_datafeed_fallback(self, es): - es.ml.delete_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) + es.ml.delete_datafeed = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) es.perform_request = mock.AsyncMock() datafeed_id = "some-data-feed" @@ -3333,7 +3337,8 @@ async def test_start_ml_datafeed_with_body(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_start_ml_datafeed_with_body_fallback(self, es): - es.ml.start_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) + es.ml.start_datafeed = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) es.perform_request = mock.AsyncMock() body = {"end": "now"} params = {"datafeed-id": "some-data-feed", "body": body} @@ -3381,7 +3386,8 @@ async def test_stop_ml_datafeed(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_stop_ml_datafeed_fallback(self, es): - es.ml.stop_datafeed = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) + es.ml.stop_datafeed = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) es.perform_request = mock.AsyncMock() params = { @@ -3432,7 +3438,8 @@ async def test_create_ml_job(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_create_ml_job_fallback(self, es): - es.ml.put_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) + es.ml.put_job = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) es.perform_request = mock.AsyncMock() body = { @@ -3474,7 +3481,8 @@ async def test_delete_ml_job(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_delete_ml_job_fallback(self, es): - es.ml.delete_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + error_meta = 
elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) + es.ml.delete_job = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) es.perform_request = mock.AsyncMock() job_id = "an-ml-job" @@ -3505,7 +3513,8 @@ async def test_open_ml_job(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_open_ml_job_fallback(self, es): - es.ml.open_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) + es.ml.open_job = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) es.perform_request = mock.AsyncMock() job_id = "an-ml-job" @@ -3536,7 +3545,8 @@ async def test_close_ml_job(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_close_ml_job_fallback(self, es): - es.ml.close_job = mock.AsyncMock(side_effect=elasticsearch.TransportError(400, "Bad Request")) + error_meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None) + es.ml.close_job = mock.AsyncMock(side_effect=elasticsearch.BadRequestError(message=400, meta=error_meta, body="Bad Request")) es.perform_request = mock.AsyncMock() params = { @@ -3756,7 +3766,7 @@ async def test_create_snapshot_repository(self, es): await r(es, params) es.snapshot.create_repository.assert_called_once_with( - repository="backups", body={"type": "fs", "settings": {"location": "/var/backups"}}, params={} + name="backups", body={"type": "fs", "settings": {"location": "/var/backups"}}, params={} ) @@ -4149,7 +4159,7 @@ async def test_wait_for_current_snapshots_create_after_8_3_0(self, es): "completion-recheck-wait-period": 0, } - es.perform_request = mock.AsyncMock( + es.snapshot.get = mock.AsyncMock( side_effect=[ { "snapshots": [ @@ -4192,14 +4202,9 @@ async def test_wait_for_current_snapshots_create_after_8_3_0(self, es): r = runner.WaitForCurrentSnapshotsCreate() result = await r(es, task_params) - es.perform_request.assert_awaited_with( - method="GET", - path=f"_snapshot/{repository}/_current", - headers={"Content-Type": "application/json"}, - params={"index_names": "false", "verbose": "false"}, - ) + es.snapshot.get.assert_awaited_with(repository=repository, snapshot="_current", verbose=False, index_names=False) - assert es.perform_request.await_count == 2 + assert es.snapshot.get.await_count == 2 assert result is None @@ -5092,7 +5097,6 @@ async def test_transform_stats_with_non_existing_path(self, es): class TestCreateIlmPolicyRunner: - params = { "policy-name": "my-ilm-policy", "request-params": {"master_timeout": "30s", "timeout": "30s"}, @@ -5117,7 +5121,12 @@ async def test_create_ilm_policy_with_request_params(self, es): } es.ilm.put_lifecycle.assert_awaited_once_with( - policy=self.params["policy-name"], body=self.params["body"], params=self.params["request-params"] + name=self.params["policy-name"], + policy=self.params["body"]["policy"], + master_timeout=self.params["request-params"].get("master_timeout"), + timeout=self.params["request-params"].get("timeout"), + error_trace=None, + filter_path=None, ) @mock.patch("elasticsearch.Elasticsearch") @@ -5134,11 +5143,33 @@ async def test_create_ilm_policy_without_request_params(self, es): "success": True, } - es.ilm.put_lifecycle.assert_awaited_once_with(policy=params["policy-name"], 
body=params["body"], params={}) + es.ilm.put_lifecycle.assert_awaited_once_with( + name=params["policy-name"], + policy=self.params["body"]["policy"], + master_timeout=None, + timeout=None, + error_trace=None, + filter_path=None, + ) + @mock.patch("esrally.client.asynchronous.IlmClient") + @pytest.mark.asyncio + async def test_RallyIlmClient_rewrites_kwargs(self, es_ilm): + es = RallyAsyncElasticsearch(hosts=["http://localhost:9200"]) + es_ilm.put_lifecycle = mock.AsyncMock(return_value={}) -class TestDeleteIlmPolicyRunner: + # simulating a custom runner that hasn't been refactored + # to suit the new 'elasticsearch-py' 8.x kwarg only method signature + await es.ilm.put_lifecycle("test-name", body=self.params["body"]) + + es_ilm.put_lifecycle.assert_awaited_once_with( + es.ilm, + policy=self.params["body"]["policy"], + name="test-name", + ) + +class TestDeleteIlmPolicyRunner: params = {"policy-name": "my-ilm-policy", "request-params": {"master_timeout": "30s", "timeout": "30s"}} @mock.patch("elasticsearch.Elasticsearch") @@ -5153,7 +5184,13 @@ async def test_delete_ilm_policy_with_request_params(self, es): "success": True, } - es.ilm.delete_lifecycle.assert_awaited_once_with(policy=self.params["policy-name"], params=self.params["request-params"]) + es.ilm.delete_lifecycle.assert_awaited_once_with( + name=self.params["policy-name"], + master_timeout=self.params["request-params"].get("master_timeout"), + timeout=self.params["request-params"].get("timeout"), + error_trace=None, + filter_path=None, + ) @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio @@ -5169,7 +5206,13 @@ async def test_delete_ilm_policy_without_request_params(self, es): "success": True, } - es.ilm.delete_lifecycle.assert_awaited_once_with(policy=params["policy-name"], params={}) + es.ilm.delete_lifecycle.assert_awaited_once_with( + name=self.params["policy-name"], + master_timeout=None, + timeout=None, + error_trace=None, + filter_path=None, + ) class TestSqlRunner: @@ -6749,10 +6792,10 @@ async def test_is_does_not_retry_on_success(self): async def test_retries_on_timeout_if_wanted_and_raises_if_no_recovery(self): delegate = mock.AsyncMock( side_effect=[ - elasticsearch.ConnectionError("N/A", "no route to host"), - elasticsearch.ConnectionError("N/A", "no route to host"), - elasticsearch.ConnectionError("N/A", "no route to host"), - elasticsearch.ConnectionError("N/A", "no route to host"), + elasticsearch.ConnectionError(message="no route to host"), + elasticsearch.ConnectionError(message="no route to host"), + elasticsearch.ConnectionError(message="no route to host"), + elasticsearch.ConnectionError(message="no route to host"), ] ) es = None @@ -6776,7 +6819,7 @@ async def test_retries_on_timeout_if_wanted_and_returns_first_call(self): delegate = mock.AsyncMock( side_effect=[ - elasticsearch.ConnectionError("N/A", "no route to host"), + elasticsearch.ConnectionError(message="no route to host"), failed_return_value, ] ) @@ -6798,7 +6841,7 @@ async def test_retries_on_timeout_if_wanted_and_returns_first_call(self): @pytest.mark.asyncio async def test_retries_mixed_timeout_and_application_errors(self): - connection_error = elasticsearch.ConnectionError("N/A", "no route to host") + connection_error = elasticsearch.ConnectionError(message="no route to host") failed_return_value = {"weight": 1, "unit": "ops", "success": False} success_return_value = {"weight": 1, "unit": "ops", "success": False} diff --git a/tests/log_test.py b/tests/log_test.py new file mode 100644 index 000000000..f78579512 --- /dev/null +++ 
b/tests/log_test.py @@ -0,0 +1,55 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import copy +import json +import os +from unittest import mock +from unittest.mock import mock_open, patch + +import pytest + +from esrally import log + + +class TestLog: + def _load_logging_template(self, p): + with open(p) as f: + return json.load(f) + + @pytest.fixture(autouse=True) + def set_configuration(self): + p = os.path.join(os.path.dirname(__file__), "..", "esrally", "resources", "logging.json") + self.configuration = self._load_logging_template(p) + + @mock.patch("json.load") + def test_add_missing_loggers_to_config_missing(self, mock_json_load, caplog): + source_template = copy.deepcopy(self.configuration) + existing_configuration = copy.deepcopy(self.configuration) + # change existing to differ from source template, showing that we don't overwrite any existing loggers config + existing_configuration["loggers"]["rally.profile"]["level"] = "DEBUG" + expected_configuration = json.dumps(copy.deepcopy(existing_configuration), indent=2) + # simulate user missing 'elastic_transport' in logging.json + del existing_configuration["loggers"]["elastic_transport"] + + # first loads template, then existing configuration + mock_json_load.side_effect = [source_template, existing_configuration] + + with patch("builtins.open", mock_open()) as mock_file: + log.add_missing_loggers_to_config() + handle = mock_file() + handle.write.assert_called_once_with(expected_configuration) diff --git a/tests/metrics_test.py b/tests/metrics_test.py index b15ccff17..f94ca88cd 100644 --- a/tests/metrics_test.py +++ b/tests/metrics_test.py @@ -24,6 +24,7 @@ import string import tempfile import uuid +from dataclasses import dataclass from unittest import mock import elasticsearch.exceptions @@ -83,37 +84,28 @@ def total_time(self): return 0 -class TransportErrors: - err_return_codes = { - 502: "Bad Gateway", - 503: "Service Unavailable", - 504: "Gateway Timeout", - 429: "Too Many Requests", - } - - def __init__(self, max_err_responses=10): - self.max_err_responses = max_err_responses - # allow duplicates in list of error codes - self.rnd_err_codes = [random.choice(list(TransportErrors.err_return_codes)) for _ in range(self.max_err_responses)] - - @property - def code_list(self): - return self.rnd_err_codes - - @property - def side_effects(self): - side_effect_list = [ - elasticsearch.exceptions.TransportError(rnd_code, TransportErrors.err_return_codes[rnd_code]) for rnd_code in self.rnd_err_codes - ] - side_effect_list.append("success") +class TestEsClient: + class NodeMock: + def __init__(self, host, port): + self.host = host + self.port = port - return side_effect_list + class NodePoolMock: + def __init__(self, hosts): + self.nodes = [] + for h in hosts: + 
self.nodes.append(TestEsClient.NodeMock(host=h["host"], port=h["port"])) + def get(self): + return self.nodes[0] -class TestEsClient: class TransportMock: def __init__(self, hosts): - self.hosts = hosts + self.node_pool = TestEsClient.NodePoolMock(hosts) + + @dataclass + class ApiResponseMeta: + status: int class ClientMock: def __init__(self, hosts): @@ -168,22 +160,110 @@ def test_config_opts_parsing(self, client_esclientfactory, password_configuration): hosts=[{"host": _datastore_host, "port": _datastore_port}], client_options=expected_client_options ) - def test_raises_sytem_setup_error_on_connection_problems(self): - def raise_connection_error(): - raise elasticsearch.exceptions.ConnectionError("unit-test") + @mock.patch("random.random") + @mock.patch("esrally.time.sleep") + def test_retries_on_various_errors(self, mocked_sleep, mocked_random, caplog): + class ConnectionError: + def logging_statements(self, retries): + logging_statements = [] + for i in range(retries): + logging_statements.append( + "Connection error [%s] in attempt [%d/%d]. Sleeping for [%f] seconds." + % ( + "unit-test", + i + 1, + max_retry, + sleep_slots[i], + ) + ) + logging_statements.append( + "Could not connect to your Elasticsearch metrics store. Please check that it is running on host [127.0.0.1] at " + f"port [9200] or fix the configuration in [{paths.rally_confdir()}/rally.ini]." + ) + return logging_statements + + def raise_error(self): + raise elasticsearch.exceptions.ConnectionError("unit-test") + + class ConnectionTimeout: + def logging_statements(self, retries): + logging_statements = [] + for i in range(retries): + logging_statements.append( + "Connection timeout [%s] in attempt [%d/%d]. Sleeping for [%f] seconds." + % ( + "unit-test", + i + 1, + max_retry, + sleep_slots[i], + ) + ) + logging_statements.append(f"Connection timeout while running [raise_error] (retried {retries} times).") + return logging_statements + + def raise_error(self): + raise elasticsearch.exceptions.ConnectionTimeout("unit-test") + + class ApiError: + def __init__(self, status_code): + self.status_code = status_code + + def logging_statements(self, retries): + logging_statements = [] + for i in range(retries): + logging_statements.append( + "%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds." + % ( + "unit-test", + self.status_code, + i + 1, + max_retry, + sleep_slots[i], + ) + ) + logging_statements.append( + "An error [unit-test] occurred while running the operation [raise_error] against your Elasticsearch " + "metrics store on host [127.0.0.1] at port [9200]." + ) + return logging_statements + + def raise_error(self): + err = elasticsearch.exceptions.ApiError("unit-test", meta=TestEsClient.ApiResponseMeta(status=self.status_code), body={}) + raise err + + retriable_errors = [ApiError(429), ApiError(502), ApiError(503), ApiError(504), ConnectionError(), ConnectionTimeout()] + + max_retry = 10 + + # The seconds to sleep for 10 retries are + # [1, 2, 4, 8, 16, 32, 64, 128, 256, 512] ~> 17.05min in total + sleep_slots = [float(2**i) for i in range(0, max_retry)] + + # we want deterministic timings to assess logging statements + mocked_random.return_value = 0 client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9200"}])) - with pytest.raises(exceptions.SystemSetupError) as ctx: - client.guarded(raise_connection_error) - assert ctx.value.args[0] == ( - "Could not connect to your Elasticsearch metrics store. 
Please check that it is running on host [127.0.0.1] at " - f"port [9200] or fix the configuration in [{paths.rally_confdir()}/rally.ini]." - ) + expected_logger_calls = [] + expected_sleep_calls = [] + + for e in retriable_errors: + expected_logger_calls += e.logging_statements(max_retry) + expected_sleep_calls += [mock.call(int(sleep_slots[i])) for i in range(0, max_retry)] + + with pytest.raises(exceptions.RallyError): + with caplog.at_level(logging.DEBUG): + client.guarded(e.raise_error) + + actual_logger_calls = [r.message for r in caplog.records] + actual_sleep_calls = mocked_sleep.call_args_list + + assert actual_sleep_calls == expected_sleep_calls + assert actual_logger_calls == expected_logger_calls def test_raises_sytem_setup_error_on_authentication_problems(self): def raise_authentication_error(): - raise elasticsearch.exceptions.AuthenticationException("unit-test") + raise elasticsearch.exceptions.AuthenticationException(meta=None, body=None, message="unit-test") client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}])) @@ -196,7 +276,7 @@ def raise_authentication_error(): def test_raises_sytem_setup_error_on_authorization_problems(self): def raise_authorization_error(): - raise elasticsearch.exceptions.AuthorizationException("unit-test") + raise elasticsearch.exceptions.AuthorizationException(meta=None, body=None, message="unit-test") client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}])) @@ -210,77 +290,15 @@ def raise_authorization_error(): def test_raises_rally_error_on_unknown_problems(self): def raise_unknown_error(): - raise elasticsearch.exceptions.SerializationError("unit-test") + exc = elasticsearch.exceptions.TransportError(message="unit-test") + raise exc client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}])) with pytest.raises(exceptions.RallyError) as ctx: client.guarded(raise_unknown_error) assert ctx.value.args[0] == ( - "An unknown error occurred while running the operation [raise_unknown_error] against your Elasticsearch metrics " - "store on host [127.0.0.1] at port [9243]." 
- ) - - def test_retries_on_various_transport_errors(self): - @mock.patch("random.random") - @mock.patch("esrally.time.sleep") - def test_transport_error_retries(side_effect, expected_logging_calls, expected_sleep_calls, mocked_sleep, mocked_random): - # should return on first success - operation = mock.Mock(side_effect=side_effect) - - # Disable additional randomization time in exponential backoff calls - mocked_random.return_value = 0 - - client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}])) - - logger = logging.getLogger("esrally.metrics") - with mock.patch.object(logger, "debug") as mocked_debug_logger: - test_result = client.guarded(operation) - mocked_sleep.assert_has_calls(expected_sleep_calls) - mocked_debug_logger.assert_has_calls(expected_logging_calls, any_order=True) - assert test_result == "success" - - max_retry = 10 - all_err_codes = TransportErrors.err_return_codes - transport_errors = TransportErrors(max_err_responses=max_retry) - rnd_err_codes = transport_errors.code_list - rnd_side_effects = transport_errors.side_effects - rnd_mocked_logger_calls = [] - - # The sec to sleep for 10 transport errors is - # [1, 2, 4, 8, 16, 32, 64, 128, 256, 512] ~> 17.05min in total - sleep_slots = [float(2**i) for i in range(0, max_retry)] - mocked_sleep_calls = [mock.call(sleep_slots[i]) for i in range(0, max_retry)] - - for rnd_err_idx, rnd_err_code in enumerate(rnd_err_codes): - # List of logger.debug calls to expect - rnd_mocked_logger_calls.append( - mock.call( - "%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds.", - all_err_codes[rnd_err_code], - rnd_err_code, - rnd_err_idx + 1, - max_retry + 1, - sleep_slots[rnd_err_idx], - ) - ) - # pylint: disable=no-value-for-parameter - test_transport_error_retries(rnd_side_effects, rnd_mocked_logger_calls, mocked_sleep_calls) - - @mock.patch("esrally.time.sleep") - def test_fails_after_too_many_errors(self, mocked_sleep): - def random_transport_error(rnd_resp_code): - raise elasticsearch.exceptions.TransportError(rnd_resp_code, TransportErrors.err_return_codes[rnd_resp_code]) - - client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}])) - rnd_code = random.choice(list(TransportErrors.err_return_codes)) - - with pytest.raises(exceptions.RallyError) as ctx: - client.guarded(random_transport_error, rnd_code) - - assert ctx.value.args[0] == ( - "A transport error occurred while running the operation " - "[random_transport_error] against your Elasticsearch metrics " + "Transport error(s) [unit-test] occurred while running the operation [raise_unknown_error] against your Elasticsearch metrics " "store on host [127.0.0.1] at port [9243]." 
) diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py index c77b2ef65..5d2498b4d 100644 --- a/tests/telemetry_test.py +++ b/tests/telemetry_test.py @@ -215,7 +215,7 @@ def __call__(self, *args, **kwargs): class TransportClient: - def __init__(self, *, response=None, force_error=False, error=elasticsearch.TransportError): + def __init__(self, response=None, force_error=False, error=elasticsearch.TransportError(message="transport error")): self._response = response self._force_error = force_error self._error = error @@ -4357,171 +4357,152 @@ class TestDiskUsageStats: def test_uses_indices_by_default(self, es): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - tc = TransportClient(response={"_shards": {"failed": 0}}) - es = Client(transport_client=tc) + es.indices.disk_usage.return_value = {"_shards": {"failed": 0}} device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() - assert tc.kwargs == [ - {"method": "POST", "path": "/foo/_disk_usage", "params": {"run_expensive_tasks": "true"}}, - {"method": "POST", "path": "/bar/_disk_usage", "params": {"run_expensive_tasks": "true"}}, - ] + es.indices.disk_usage.assert_has_calls( + [ + call(index="foo", run_expensive_tasks=True), + call(index="bar", run_expensive_tasks=True), + ] + ) @mock.patch("elasticsearch.Elasticsearch") def test_uses_data_streams_by_default(self, es): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - tc = TransportClient(response={"_shards": {"failed": 0}}) - es = Client(transport_client=tc) + es.indices.disk_usage.return_value = {"_shards": {"failed": 0}} device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=[], data_stream_names=["foo", "bar"]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() - assert tc.kwargs == [ - {"method": "POST", "path": "/foo/_disk_usage", "params": {"run_expensive_tasks": "true"}}, - {"method": "POST", "path": "/bar/_disk_usage", "params": {"run_expensive_tasks": "true"}}, - ] + es.indices.disk_usage.assert_has_calls( + [ + call(index="foo", run_expensive_tasks=True), + call(index="bar", run_expensive_tasks=True), + ] + ) @mock.patch("elasticsearch.Elasticsearch") def test_uses_indices_param_if_specified_instead_of_index_names(self, es): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - tc = TransportClient(response={"_shards": {"failed": 0}}) - es = Client(transport_client=tc) + es.indices.disk_usage.return_value = {"_shards": {"failed": 0}} device = telemetry.DiskUsageStats( {"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, index_names=["baz"], data_stream_names=[] ) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() - assert tc.kwargs == [ - {"method": "POST", "path": "/foo/_disk_usage", "params": {"run_expensive_tasks": "true"}}, - {"method": "POST", "path": "/bar/_disk_usage", "params": {"run_expensive_tasks": "true"}}, - ] + es.indices.disk_usage.assert_has_calls( + [ + call(index="foo", run_expensive_tasks=True), + call(index="bar", run_expensive_tasks=True), + ] + ) @mock.patch("elasticsearch.Elasticsearch") def test_uses_indices_param_if_specified_instead_of_data_stream_names(self, es): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - tc = TransportClient(response={"_shards": 
{"failed": 0}}) - es = Client(transport_client=tc) + es.indices.disk_usage.return_value = {"_shards": {"failed": 0}} device = telemetry.DiskUsageStats( {"disk-usage-stats-indices": "foo,bar"}, es, metrics_store, index_names=[], data_stream_names=["baz"] ) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() - assert tc.kwargs == [ - {"method": "POST", "path": "/foo/_disk_usage", "params": {"run_expensive_tasks": "true"}}, - {"method": "POST", "path": "/bar/_disk_usage", "params": {"run_expensive_tasks": "true"}}, - ] + es.indices.disk_usage.assert_has_calls( + [ + call(index="foo", run_expensive_tasks=True), + call(index="bar", run_expensive_tasks=True), + ] + ) @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_error_on_retrieval_does_not_store_metrics(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_error_on_retrieval_does_not_store_metrics(self, es, metrics_store_cluster_level, caplog): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es = Client( - transport_client=TransportClient( - force_error=True, - error=elasticsearch.RequestError, - ) - ) + es.indices.disk_usage.side_effect = elasticsearch.RequestError(message="error", meta=None, body=None) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() with pytest.raises(exceptions.RallyError): t.on_benchmark_stop() + assert "A transport error occurred while collecting disk usage for foo" in caplog.text assert metrics_store_cluster_level.call_count == 0 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_no_indices_fails(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_no_indices_fails(self, es, metrics_store_cluster_level, caplog): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es = Client( - transport_client=TransportClient( - force_error=True, - error=elasticsearch.RequestError, - ) - ) + es.indices.disk_usage.return_value = {"_shards": {"failed": 0}} device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=[], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) with pytest.raises(exceptions.RallyError): t.on_benchmark_start() + msg = ( + "No indices defined for disk-usage-stats. Set disk-usage-stats-indices " + "telemetry param or add indices or data streams to the track config." 
+ ) + assert msg in caplog.text assert metrics_store_cluster_level.call_count == 0 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_missing_all_fails(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_missing_all_fails(self, es, metrics_store_cluster_level, caplog): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es = Client( - transport_client=TransportClient( - force_error=True, - error=elasticsearch.RequestError, - ) - ) + es.indices.disk_usage.side_effect = elasticsearch.NotFoundError(message="error", meta=None, body=None) device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() with pytest.raises(exceptions.RallyError): t.on_benchmark_stop() + + assert "Requested disk usage for missing index foo" in caplog.text + assert "Requested disk usage for missing index bar" in caplog.text + assert "Couldn't find any indices for disk usage foo,bar" in caplog.text assert metrics_store_cluster_level.call_count == 0 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_some_mising_succeeds(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_some_missing_succeeds(self, es, metrics_store_cluster_level, caplog): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - - class TwoTransportClients: - def __init__(self, first, rest): - self.first = first - self.rest = rest - - def perform_request(self, *args, **kwargs): - if self.first: - first = self.first - self.first = None - return first.perform_request(args, kwargs) - else: - return self.rest.perform_request(args, kwargs) - - not_found_transport_client = TransportClient( - force_error=True, - error=elasticsearch.NotFoundError, - ) - successful_client = TransportClient( - response={ - "_shards": {"failed": 0}, - "foo": { - "fields": { - "_id": { - "total_in_bytes": 21079, - "inverted_index": {"total_in_bytes": 17110}, - "stored_fields_in_bytes": 3969, - } + not_found_response = elasticsearch.NotFoundError(message="error", meta=None, body=None) + successful_response = { + "_shards": {"failed": 0}, + "foo": { + "fields": { + "_id": { + "total_in_bytes": 21079, + "inverted_index": {"total_in_bytes": 17110}, + "stored_fields_in_bytes": 3969, + } } - }, - } - ) - - es = Client(transport_client=TwoTransportClients(not_found_transport_client, successful_client)) + } + }, + } + es.indices.disk_usage.side_effect = [not_found_response, successful_response] device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo", "bar"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() t.on_benchmark_stop() + assert "Requested disk usage for missing index foo" in caplog.text assert metrics_store_cluster_level.call_count == 3 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_successful_shards(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_successful_shards(self, es, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es = Client( - transport_client=TransportClient( - response={ - "_shards": {"total": 1, "successful": 1, "failed": 0}, - } - ) - ) + es.indices.disk_usage.return_value = { + "_shards": {"total": 1, "successful": 1, "failed": 0}, + } device = 
telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4529,16 +4510,13 @@ def test_successful_shards(self, metrics_store_cluster_level): assert metrics_store_cluster_level.call_count == 0 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_unsuccessful_shards(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_unsuccessful_shards(self, es, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es = Client( - transport_client=TransportClient( - response={ - "_shards": {"total": 1, "successful": 0, "failed": 1, "failures": "hello there!"}, - } - ) - ) + es.indices.disk_usage.return_value = { + "_shards": {"total": 1, "successful": 0, "failed": 1, "failures": "hello there!"}, + } device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4547,24 +4525,21 @@ def test_unsuccessful_shards(self, metrics_store_cluster_level): assert metrics_store_cluster_level.call_count == 0 @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_source(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_source(self, es, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es = Client( - transport_client=TransportClient( - response={ - "_shards": {"failed": 0}, - "foo": { - "fields": { - "_source": { - "total_in_bytes": 40676, - "stored_fields_in_bytes": 40676, - } - } - }, + es.indices.disk_usage.return_value = { + "_shards": {"failed": 0}, + "foo": { + "fields": { + "_source": { + "total_in_bytes": 40676, + "stored_fields_in_bytes": 40676, + } } - ) - ) + }, + } device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4575,25 +4550,22 @@ def test_source(self, metrics_store_cluster_level): ] @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_id(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_id(self, es, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es = Client( - transport_client=TransportClient( - response={ - "_shards": {"failed": 0}, - "foo": { - "fields": { - "_id": { - "total_in_bytes": 21079, - "inverted_index": {"total_in_bytes": 17110}, - "stored_fields_in_bytes": 3969, - } - } - }, + es.indices.disk_usage.return_value = { + "_shards": {"failed": 0}, + "foo": { + "fields": { + "_id": { + "total_in_bytes": 21079, + "inverted_index": {"total_in_bytes": 17110}, + "stored_fields_in_bytes": 3969, + } } - ) - ) + }, + } device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4605,7 +4577,8 @@ def test_id(self, metrics_store_cluster_level): ] @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_empty_field(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_empty_field(self, es, metrics_store_cluster_level): """ Fields like 
_primary_term commonly take 0 bytes at all. But they are declared fields so we return their total size just so no one @@ -4613,24 +4586,20 @@ """ cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es = Client( - transport_client=TransportClient( - response={ - "_shards": {"failed": 0}, - "foo": { - "fields": { - "_primary_term": { - "total_in_bytes": 0, - "inverted_index": {"total_in_bytes": 0}, - "stored_fields_in_bytes": 0, - "doc_values_in_bytes": 0, - "points_in_bytes": 0, - } - } - }, + es.indices.disk_usage.return_value = { + "_shards": {"failed": 0}, + "foo": { + "fields": { + "_primary_term": { + "total_in_bytes": 0, + "inverted_index": {"total_in_bytes": 0}, + "stored_fields_in_bytes": 0, + "doc_values_in_bytes": 0, + "points_in_bytes": 0, + } } - ) - ) + }, + } device = telemetry.DiskUsageStats({"disk-usage-stats-indices": "foo"}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4640,25 +4609,22 @@ def test_empty_field(self, metrics_store_cluster_level): ] @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_number(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_number(self, es, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es = Client( - transport_client=TransportClient( - response={ - "_shards": {"failed": 0}, - "foo": { - "fields": { - "prcp": { - "total_in_bytes": 1498, - "doc_values_in_bytes": 748, - "points_in_bytes": 750, - } - } - }, + es.indices.disk_usage.return_value = { + "_shards": {"failed": 0}, + "foo": { + "fields": { + "prcp": { + "total_in_bytes": 1498, + "doc_values_in_bytes": 748, + "points_in_bytes": 750, + } } - ) - ) + }, + } device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4670,25 +4636,22 @@ def test_number(self, metrics_store_cluster_level): ] @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_keyword(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_keyword(self, es, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es = Client( - transport_client=TransportClient( - response={ - "_shards": {"failed": 0}, - "foo": { - "fields": { - "station.country_code": { - "total_in_bytes": 346, - "doc_values_in_bytes": 328, - "points_in_bytes": 18, - } - } - }, + es.indices.disk_usage.return_value = { + "_shards": {"failed": 0}, + "foo": { + "fields": { + "station.country_code": { + "total_in_bytes": 346, + "doc_values_in_bytes": 328, + "points_in_bytes": 18, + } } - ) - ) + }, + } device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() @@ -4700,19 +4663,14 @@ def test_keyword(self, metrics_store_cluster_level): ] @mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level") - def test_indexed_vector(self, metrics_store_cluster_level): + @mock.patch("elasticsearch.Elasticsearch") + def test_indexed_vector(self, es, metrics_store_cluster_level): cfg = create_config() metrics_store = metrics.EsMetricsStore(cfg) - es = Client( - 
transport_client=TransportClient( - response={ - "_shards": {"failed": 0}, - "foo": { - "fields": {"title_vector": {"total_in_bytes": 64179820, "doc_values_in_bytes": 0, "knn_vectors_in_bytes": 64179820}} - }, - } - ) - ) + es.indices.disk_usage.return_value = { + "_shards": {"failed": 0}, + "foo": {"fields": {"title_vector": {"total_in_bytes": 64179820, "doc_values_in_bytes": 0, "knn_vectors_in_bytes": 64179820}}}, + } device = telemetry.DiskUsageStats({}, es, metrics_store, index_names=["foo"], data_stream_names=[]) t = telemetry.Telemetry(enabled_devices=[device.command], devices=[device]) t.on_benchmark_start() diff --git a/tests/tracker/corpus_test.py b/tests/tracker/corpus_test.py index 007fd833d..6ff514217 100644 --- a/tests/tracker/corpus_test.py +++ b/tests/tracker/corpus_test.py @@ -31,6 +31,9 @@ def test_extract(client, mo): doc = {"field1": "stuff", "field2": "things"} doc_data = serialize_doc(doc) client.count.return_value = {"count": 1001} + # the scan helper calls client.options(), which returns a new client instance + # we override this behavior here to facilitate mocking + client.options.return_value = client client.search.return_value = { "_scroll_id": "uohialjrknf", "_shards": { diff --git a/tests/utils/opts_test.py b/tests/utils/opts_test.py index ed9dfbe4a..e3c6a057c 100644 --- a/tests/utils/opts_test.py +++ b/tests/utils/opts_test.py @@ -182,8 +182,6 @@ def test_csv_hosts_parses(self): assert opts.TargetHosts(target_hosts).default == [{"host": "127.0.0.1", "port": 9200}, {"host": "10.17.0.5", "port": 19200}] - assert opts.TargetHosts(target_hosts).default == [{"host": "127.0.0.1", "port": 9200}, {"host": "10.17.0.5", "port": 19200}] - def test_jsonstring_parses_as_dict_of_clusters(self): target_hosts = ( '{"default": ["127.0.0.1:9200","10.17.0.5:19200"],' @@ -192,14 +190,14 @@ def test_jsonstring_parses_as_dict_of_clusters(self): ) assert opts.TargetHosts(target_hosts).all_hosts == { - "default": ["127.0.0.1:9200", "10.17.0.5:19200"], - "remote_1": ["88.33.22.15:19200"], - "remote_2": ["10.18.0.6:19200", "10.18.0.7:19201"], + "default": [{"host": "127.0.0.1", "port": 9200}, {"host": "10.17.0.5", "port": 19200}], + "remote_1": [{"host": "88.33.22.15", "port": 19200}], + "remote_2": [{"host": "10.18.0.6", "port": 19200}, {"host": "10.18.0.7", "port": 19201}], } def test_json_file_parameter_parses(self): assert opts.TargetHosts(os.path.join(os.path.dirname(__file__), "resources", "target_hosts_1.json")).all_hosts == { - "default": ["127.0.0.1:9200", "10.127.0.3:19200"] + "default": [{"host": "127.0.0.1", "port": 9200, "use_ssl": True}, {"host": "10.127.0.3", "port": 19200}] } assert opts.TargetHosts(os.path.join(os.path.dirname(__file__), "resources", "target_hosts_2.json")).all_hosts == { @@ -277,8 +275,7 @@ def test_no_client_option_parses_to_default(self): def test_no_client_option_parses_to_default_with_multicluster(self): client_options_string = opts.ClientOptions.DEFAULT_CLIENT_OPTIONS - target_hosts = opts.TargetHosts('{"default": ["127.0.0.1:9200,10.17.0.5:19200"], "remote": ["88.33.22.15:19200"]}') - + target_hosts = opts.TargetHosts('{"default": ["127.0.0.1:9200", "10.17.0.5:19200"], "remote": ["88.33.22.15:19200"]}') assert opts.ClientOptions(client_options_string, target_hosts=target_hosts).default == {"timeout": 60} assert opts.ClientOptions(client_options_string, target_hosts=target_hosts).all_client_options == { diff --git a/tests/utils/resources/target_hosts_1.json b/tests/utils/resources/target_hosts_1.json index c65ea169e..2dfe79a77 100644 --- 
a/tests/utils/resources/target_hosts_1.json +++ b/tests/utils/resources/target_hosts_1.json @@ -1 +1 @@ -{ "default": ["127.0.0.1:9200","10.127.0.3:19200"] } +{ "default": ["https://127.0.0.1:9200","10.127.0.3:19200"] }
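
The final two hunks belong together: ``opts_test.py`` now expects a host string carrying an ``https://`` scheme to expand into a dict with ``use_ssl`` set, which is why ``target_hosts_1.json`` gains the scheme. Roughly, that expansion behaves as in the sketch below, where ``parse_host`` is a hypothetical illustration of the asserted behavior rather than Rally's actual parser::

    from urllib.parse import urlparse

    def parse_host(host: str) -> dict:
        # Mirrors the expectation in test_json_file_parameter_parses:
        #   "https://127.0.0.1:9200" -> {"host": "127.0.0.1", "port": 9200, "use_ssl": True}
        #   "10.127.0.3:19200"       -> {"host": "10.127.0.3", "port": 19200}
        url = urlparse(host if "://" in host else f"//{host}")
        parsed = {"host": url.hostname, "port": url.port}
        if url.scheme == "https":
            parsed["use_ssl"] = True
        return parsed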
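``test_retries_on_various_errors`` in ``metrics_test.py`` pins down the retry timing of the metrics store client: with ``random.random()`` mocked to return 0, attempt ``i`` sleeps ``2**i`` seconds, roughly 17 minutes in total across the ten attempts before giving up. Under the additive-jitter assumption implied by those mocked values, the schedule the test asserts reduces to::

    import random

    def backoff_seconds(attempt: int) -> float:
        # 0-based attempt; with jitter pinned to 0 this yields 1, 2, 4, ..., 512
        # seconds over ten attempts, matching the sleep_slots list in the test.
        return float(2**attempt) + random.random()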
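One pattern recurs throughout the test hunks above: ``elasticsearch-py`` 8.x exceptions are constructed with keyword arguments (``message``, ``meta``, ``body``) instead of the positional ``(status_code, message)`` style of 7.x. A minimal sketch, assuming only the constructor shapes that already appear in this diff::

    import elastic_transport
    import elasticsearch

    # 7.x style (removed above): elasticsearch.TransportError(400, "Bad Request")
    # 8.x style: API errors carry the response metadata explicitly.
    meta = elastic_transport.ApiResponseMeta(status=400, http_version="1.1", headers=None, duration=0, node=None)
    bad_request = elasticsearch.BadRequestError(message="Bad Request", meta=meta, body="Bad Request")

    # Connection-level errors take only a message; there is no HTTP status to attach.
    conn_err = elasticsearch.ConnectionError(message="no route to host")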