diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py
index a4dfea42b..bd77107a3 100644
--- a/osbenchmark/worker_coordinator/runner.py
+++ b/osbenchmark/worker_coordinator/runner.py
@@ -56,10 +56,12 @@ def register_default_runners():
     register_runner(workload.OperationType.Search, Query(), async_runner=True)
     register_runner(workload.OperationType.PaginatedSearch, Query(), async_runner=True)
     register_runner(workload.OperationType.ScrollSearch, Query(), async_runner=True)
+    register_runner(workload.OperationType.PointInTimeSearch, Query(), async_runner=True)
     register_runner(workload.OperationType.RawRequest, RawRequest(), async_runner=True)
     register_runner(workload.OperationType.Composite, Composite(), async_runner=True)
     register_runner(workload.OperationType.SubmitAsyncSearch, SubmitAsyncSearch(), async_runner=True)
-    register_runner(workload.OperationType.GetAsyncSearch, Retry(GetAsyncSearch(), retry_until_success=True), async_runner=True)
+    register_runner(workload.OperationType.GetAsyncSearch, Retry(GetAsyncSearch(), retry_until_success=True),
+                    async_runner=True)
     register_runner(workload.OperationType.DeleteAsyncSearch, DeleteAsyncSearch(), async_runner=True)
     register_runner(workload.OperationType.CreatePointInTime, CreatePointInTime(), async_runner=True)
     register_runner(workload.OperationType.DeletePointInTime, DeletePointInTime(), async_runner=True)
@@ -78,15 +80,19 @@
     register_runner(workload.OperationType.DeleteIndex, Retry(DeleteIndex()), async_runner=True)
     register_runner(workload.OperationType.CreateComponentTemplate, Retry(CreateComponentTemplate()), async_runner=True)
     register_runner(workload.OperationType.DeleteComponentTemplate, Retry(DeleteComponentTemplate()), async_runner=True)
-    register_runner(workload.OperationType.CreateComposableTemplate, Retry(CreateComposableTemplate()), async_runner=True)
-    register_runner(workload.OperationType.DeleteComposableTemplate, Retry(DeleteComposableTemplate()), async_runner=True)
+    register_runner(workload.OperationType.CreateComposableTemplate, Retry(CreateComposableTemplate()),
+                    async_runner=True)
+    register_runner(workload.OperationType.DeleteComposableTemplate, Retry(DeleteComposableTemplate()),
+                    async_runner=True)
     register_runner(workload.OperationType.CreateDataStream, Retry(CreateDataStream()), async_runner=True)
     register_runner(workload.OperationType.DeleteDataStream, Retry(DeleteDataStream()), async_runner=True)
     register_runner(workload.OperationType.CreateIndexTemplate, Retry(CreateIndexTemplate()), async_runner=True)
     register_runner(workload.OperationType.DeleteIndexTemplate, Retry(DeleteIndexTemplate()), async_runner=True)
     register_runner(workload.OperationType.ShrinkIndex, Retry(ShrinkIndex()), async_runner=True)
-    register_runner(workload.OperationType.DeleteSnapshotRepository, Retry(DeleteSnapshotRepository()), async_runner=True)
-    register_runner(workload.OperationType.CreateSnapshotRepository, Retry(CreateSnapshotRepository()), async_runner=True)
+    register_runner(workload.OperationType.DeleteSnapshotRepository, Retry(DeleteSnapshotRepository()),
+                    async_runner=True)
+    register_runner(workload.OperationType.CreateSnapshotRepository, Retry(CreateSnapshotRepository()),
+                    async_runner=True)
     register_runner(workload.OperationType.WaitForSnapshotCreate, Retry(WaitForSnapshotCreate()), async_runner=True)
     register_runner(workload.OperationType.WaitForRecovery, Retry(IndicesRecovery()), async_runner=True)
     register_runner(workload.OperationType.PutSettings, Retry(PutSettings()), async_runner=True)
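Note for readers unfamiliar with the registry: every line above binds an OperationType to a runner instance, and `register_runner()` (reformatted below) rejects anything not declared async. A minimal sketch of the contract it enforces — the class name and operation string here are hypothetical, not part of this change:

```python
# Hypothetical custom runner following the shape the built-in runners above use:
# an async __call__ returning the weight/unit/success triple OSB expects.
class PingCluster:
    async def __call__(self, opensearch, params):
        await opensearch.cluster.health()
        return {"weight": 1, "unit": "ops", "success": True}

    def __repr__(self, *args, **kwargs):
        return "ping-cluster"

# registered the same way as the built-ins, with async_runner=True:
# register_runner("ping-cluster", PingCluster(), async_runner=True)
```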
@@ -120,7 +126,8 @@ def register_runner(operation_type, runner, **kwargs):
 
     if not async_runner:
         raise exceptions.BenchmarkAssertionError(
-            "Runner [{}] must be implemented as async runner and registered with async_runner=True.".format(str(runner)))
+            "Runner [{}] must be implemented as async runner and registered with async_runner=True.".format(
+                str(runner)))
 
     if getattr(runner, "multi_cluster", False):
         if "__aenter__" in dir(runner) and "__aexit__" in dir(runner):
@@ -129,7 +136,8 @@ def register_runner(operation_type, runner, **kwargs):
             cluster_aware_runner = _multi_cluster_runner(runner, str(runner), context_manager_enabled=True)
         else:
             if logger.isEnabledFor(logging.DEBUG):
-                logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner), str(operation_type))
+                logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner),
+                             str(operation_type))
             cluster_aware_runner = _multi_cluster_runner(runner, str(runner))
     # we'd rather use callable() but this will erroneously also classify a class as callable...
     elif isinstance(runner, types.FunctionType):
@@ -138,7 +146,8 @@ def register_runner(operation_type, runner, **kwargs):
         cluster_aware_runner = _single_cluster_runner(runner, runner.__name__)
     elif "__aenter__" in dir(runner) and "__aexit__" in dir(runner):
         if logger.isEnabledFor(logging.DEBUG):
-            logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner), str(operation_type))
+            logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner),
+                         str(operation_type))
         cluster_aware_runner = _single_cluster_runner(runner, str(runner), context_manager_enabled=True)
     else:
         if logger.isEnabledFor(logging.DEBUG):
@@ -190,7 +199,7 @@ def _default_kw_params(self, params):
             "params": "request-params",
             "request_timeout": "request-timeout",
         }
-        full_result = {k: params.get(v) for (k, v) in kw_dict.items()}
+        full_result = {k: params.get(v) for (k, v) in kw_dict.items()}  # filter Nones
         return dict(filter(lambda kv: kv[1] is not None, full_result.items()))
 
 
@@ -210,6 +219,7 @@ class Delegator:
     """
    Mixin to unify delegate handling
     """
+
     def __init__(self, delegate, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.delegate = delegate
@@ -491,7 +501,8 @@ async def __call__(self, opensearch, params):
         else:
             response = await opensearch.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs)
 
-        stats = self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response)
+        stats = self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit,
+                                                                                                 response)
 
         meta_data = {
             "index": params.get("index"),
@@ -848,7 +859,7 @@ async def _search_after_query(opensearch, params):
                 if pit_op:
                     pit_id = CompositeContext.get(pit_op)
                     body["pit"] = {"id": pit_id,
-                                   "keep_alive": "1m" }
+                                   "keep_alive": "1m"}
                 response = await self._raw_search(
                     opensearch, doc_type=None, index=index, body=body.copy(),
                     params=request_params, headers=headers)
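For orientation: the `keep_alive` literal touched above sits on the `with-point-in-time-from` path, where a paginated search inside a composite operation reuses a PIT opened by an earlier request. A sketch of such a composite request list — operation names, index and query are invented for illustration:

```python
# Illustrative composite request structure exercising the path above:
# "with-point-in-time-from" names the earlier create-point-in-time request,
# whose pit_id is resolved via CompositeContext.get() at query time.
requests = [
    {"name": "open-pit", "operation-type": "create-point-in-time", "index": "logs"},
    {"name": "page-through", "operation-type": "paginated-search",
     "with-point-in-time-from": "open-pit",
     "pages": 5, "results-per-page": 100,
     "body": {"query": {"match_all": {}}, "sort": [{"_id": "asc"}]}},
    {"name": "close-pit", "operation-type": "delete-point-in-time",
     "with-point-in-time-from": "open-pit"},
]
```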
"timed_out": False, + "took": 0 + } + + for item in ["index", "routing", "preference"]: + body.pop(item, None) + index = None + + # explicitly convert to int to provoke an error otherwise + total_pages = sys.maxsize if params.get("pages") == "all" else int(mandatory(params, "pages", self)) + try: + for page in range(1, total_pages + 1): + body["pit"] = {"id": pit_id, + "keep_alive": keep_alive} + + response = await self._raw_search( + opensearch, doc_type=None, index=index, body=body.copy(), + params=request_params, headers=headers) + parsed, last_sort = self._extractor(response, True, results.get("hits")) + results["pages"] = page + results["weight"] = page + if results.get("hits") is None: + results["hits"] = parsed.get("hits.total.value") + results["hits_relation"] = parsed.get("hits.total.relation") + results["took"] += parsed.get("took") + # when this evaluates to True, keep it for the final result + if not results["timed_out"]: + results["timed_out"] = parsed.get("timed_out") + + if results.get("hits") / size > page: + body["search_after"] = last_sort + else: + # body needs to be un-mutated for the next iteration (preferring to do this over a deepcopy at the start) + for item in ["pit", "search_after"]: + body.pop(item, None) + break + finally: + if pit_id: + body = { + "pit_id": [pit_id] + } + await opensearch.delete_point_in_time(body=body, params=request_params, headers=None) + + return results + async def _scroll_query(opensearch, params): hits = 0 hits_relation = None @@ -936,15 +1003,15 @@ async def _scroll_query(opensearch, params): all_results_collected = (size is not None and hits < size) or hits == 0 else: r = await opensearch.transport.perform_request("GET", "/_search/scroll", - body={"scroll_id": scroll_id, "scroll": "10s"}, - params=request_params, - headers=headers) + body={"scroll_id": scroll_id, "scroll": "10s"}, + params=request_params, + headers=headers) props = parse(r, ["timed_out", "took"], ["hits.hits"]) timed_out = timed_out or props.get("timed_out", False) took += props.get("took", 0) # is the list of hits empty? all_results_collected = props.get("hits.hits", False) - retrieved_pages +=1 + retrieved_pages += 1 if all_results_collected: break finally: @@ -953,8 +1020,9 @@ async def _scroll_query(opensearch, params): try: await opensearch.clear_scroll(body={"scroll_id": [scroll_id]}) except BaseException: - self.logger.exception("Could not clear scroll [%s]. This will lead to excessive resource usage in " - "OpenSearch and will skew your benchmark results.", scroll_id) + self.logger.exception( + "Could not clear scroll [%s]. This will lead to excessive resource usage in " + "OpenSearch and will skew your benchmark results.", scroll_id) return { "weight": retrieved_pages, @@ -969,6 +1037,8 @@ async def _scroll_query(opensearch, params): search_method = params.get("operation-type") if search_method == "paginated-search": return await _search_after_query(opensearch, params) + elif search_method == "point-in-time-search": + return await _point_in_time_query(opensearch, params) elif search_method == "scroll-search": return await _scroll_query(opensearch, params) elif "pages" in params: @@ -1017,7 +1087,7 @@ def __call__(self, response: BytesIO, get_point_in_time: bool, hits_total: Optio if get_point_in_time and not parsed.get("pit_id"): raise exceptions.BenchmarkAssertionError("Paginated query failure: " - "pit_id was expected but not found in the response.") + "pit_id was expected but not found in the response.") # standardize these before returning... 
parsed["hits.total.value"] = parsed.pop("hits.total.value", parsed.pop("hits.total", hits_total)) parsed["hits.total.relation"] = parsed.get("hits.total.relation", "eq") @@ -1079,7 +1149,8 @@ def status(v): result = { "weight": 1, "unit": "ops", - "success": status(cluster_status) >= status(expected_cluster_status) and relocating_shards <= expected_relocating_shards, + "success": status(cluster_status) >= status( + expected_cluster_status) and relocating_shards <= expected_relocating_shards, "cluster-status": cluster_status, "relocating-shards": relocating_shards } @@ -1094,10 +1165,10 @@ def __repr__(self, *args, **kwargs): class PutPipeline(Runner): async def __call__(self, opensearch, params): await opensearch.ingest.put_pipeline(id=mandatory(params, "id", self), - body=mandatory(params, "body", self), - master_timeout=params.get("master-timeout"), - timeout=params.get("timeout"), - ) + body=mandatory(params, "body", self), + master_timeout=params.get("master-timeout"), + timeout=params.get("timeout"), + ) def __repr__(self, *args, **kwargs): return "put-pipeline" @@ -1206,7 +1277,7 @@ async def __call__(self, opensearch, params): request_params = mandatory(params, "request-params", self) for template, body in templates: await opensearch.cluster.put_component_template(name=template, body=body, - params=request_params) + params=request_params) return { "weight": len(templates), "unit": "ops", @@ -1234,7 +1305,8 @@ async def _exists(name): ops_count = 0 for template_name in template_names: if not only_if_exists: - await opensearch.cluster.delete_component_template(name=template_name, params=request_params, ignore=[404]) + await opensearch.cluster.delete_component_template(name=template_name, params=request_params, + ignore=[404]) ops_count += 1 elif only_if_exists and await _exists(template_name): self.logger.info("Component Index template [%s] already exists. Deleting it.", template_name) @@ -1246,7 +1318,6 @@ async def _exists(name): "success": True } - def __repr__(self, *args, **kwargs): return "delete-component-template" @@ -1304,8 +1375,8 @@ async def __call__(self, opensearch, params): request_params = params.get("request-params", {}) for template, body in templates: await opensearch.indices.put_template(name=template, - body=body, - params=request_params) + body=body, + params=request_params) return { "weight": len(templates), "unit": "ops", @@ -1386,7 +1457,8 @@ async def __call__(self, opensearch, params): if "data" in node["roles"]: node_names.append(node["name"]) if not node_names: - raise exceptions.BenchmarkAssertionError("Could not choose a suitable shrink-node automatically. Specify it explicitly.") + raise exceptions.BenchmarkAssertionError( + "Could not choose a suitable shrink-node automatically. 
Specify it explicitly.") for source_index in source_indices: shrink_node = random.choice(node_names) @@ -1395,13 +1467,13 @@ async def __call__(self, opensearch, params): # prepare index for shrinking await opensearch.indices.put_settings(index=source_index, - body={ - "settings": { - "index.routing.allocation.require._name": shrink_node, - "index.blocks.write": "true" - } - }, - preserve_existing=True) + body={ + "settings": { + "index.routing.allocation.require._name": shrink_node, + "index.blocks.write": "true" + } + }, + preserve_existing=True) self.logger.info("Waiting for relocation to finish for index [%s] ...", source_index) await self._wait_for(opensearch, source_index, f"shard relocation for index [{source_index}]") @@ -1412,7 +1484,7 @@ async def __call__(self, opensearch, params): target_body["settings"]["index.blocks.write"] = None # kick off the shrink operation index_suffix = remove_prefix(source_index, source_indices_stem) - final_target_index = target_index if len(index_suffix) == 0 else target_index+index_suffix + final_target_index = target_index if len(index_suffix) == 0 else target_index + index_suffix await opensearch.indices.shrink(index=source_index, target=final_target_index, body=target_body) self.logger.info("Waiting for shrink to finish for index [%s] ...", source_index) @@ -1437,16 +1509,17 @@ async def __call__(self, opensearch, params): path = mandatory(params, "path", self) if not path.startswith("/"): self.logger.error("RawRequest failed. Path parameter: [%s] must begin with a '/'.", path) - raise exceptions.BenchmarkAssertionError(f"RawRequest [{path}] failed. Path parameter must begin with a '/'.") + raise exceptions.BenchmarkAssertionError( + f"RawRequest [{path}] failed. Path parameter must begin with a '/'.") if not bool(headers): - #counter-intuitive, but preserves prior behavior + # counter-intuitive, but preserves prior behavior headers = None await opensearch.transport.perform_request(method=params.get("method", "GET"), - url=path, - headers=headers, - body=params.get("body"), - params=request_params) + url=path, + headers=headers, + body=params.get("body"), + params=request_params) def __repr__(self, *args, **kwargs): return "raw-request" @@ -1472,6 +1545,7 @@ class DeleteSnapshotRepository(Runner): """ Deletes a snapshot repository """ + async def __call__(self, opensearch, params): await opensearch.snapshot.delete_repository(repository=mandatory(params, "repository", repr(self))) @@ -1483,11 +1557,12 @@ class CreateSnapshotRepository(Runner): """ Creates a new snapshot repository """ + async def __call__(self, opensearch, params): request_params = params.get("request-params", {}) await opensearch.snapshot.create_repository(repository=mandatory(params, "repository", repr(self)), - body=mandatory(params, "body", repr(self)), - params=request_params) + body=mandatory(params, "body", repr(self)), + params=request_params) def __repr__(self, *args, **kwargs): return "create-snapshot-repository" @@ -1497,6 +1572,7 @@ class CreateSnapshot(Runner): """ Creates a new snapshot repository """ + async def __call__(self, opensearch, params): wait_for_completion = params.get("wait-for-completion", False) repository = mandatory(params, "repository", repr(self)) @@ -1505,9 +1581,9 @@ async def __call__(self, opensearch, params): mandatory(params, "body", repr(self)) api_kwargs = self._default_kw_params(params) await opensearch.snapshot.create(repository=repository, - snapshot=snapshot, - wait_for_completion=wait_for_completion, - **api_kwargs) + 
@@ -1505,9 +1581,9 @@ async def __call__(self, opensearch, params):
         mandatory(params, "body", repr(self))
         api_kwargs = self._default_kw_params(params)
         await opensearch.snapshot.create(repository=repository,
-                                 snapshot=snapshot,
-                                 wait_for_completion=wait_for_completion,
-                                 **api_kwargs)
+                                         snapshot=snapshot,
+                                         wait_for_completion=wait_for_completion,
+                                         **api_kwargs)
 
     def __repr__(self, *args, **kwargs):
         return "create-snapshot"
@@ -1524,8 +1600,8 @@ async def __call__(self, opensearch, params):
 
         while not snapshot_done:
             response = await opensearch.snapshot.status(repository=repository,
-                                                snapshot=snapshot,
-                                                ignore_unavailable=True)
+                                                        snapshot=snapshot,
+                                                        ignore_unavailable=True)
 
             if "snapshots" in response:
                 response_state = response["snapshots"][0]["state"]
@@ -1563,12 +1639,13 @@ class RestoreSnapshot(Runner):
     """
     Restores a snapshot from an already registered repository
     """
+
     async def __call__(self, opensearch, params):
         api_kwargs = self._default_kw_params(params)
         await opensearch.snapshot.restore(repository=mandatory(params, "repository", repr(self)),
-                                  snapshot=mandatory(params, "snapshot", repr(self)),
-                                  wait_for_completion=params.get("wait-for-completion", False),
-                                  **api_kwargs)
+                                          snapshot=mandatory(params, "snapshot", repr(self)),
+                                          wait_for_completion=params.get("wait-for-completion", False),
+                                          **api_kwargs)
 
     def __repr__(self, *args, **kwargs):
         return "restore-snapshot"
@@ -1640,7 +1717,8 @@ async def __call__(self, opensearch, params):
         transform_id = mandatory(params, "transform-id", self)
         body = mandatory(params, "body", self)
         defer_validation = params.get("defer-validation", False)
-        await opensearch.transform.put_transform(transform_id=transform_id, body=body, defer_validation=defer_validation)
+        await opensearch.transform.put_transform(transform_id=transform_id, body=body,
+                                                 defer_validation=defer_validation)
 
     def __repr__(self, *args, **kwargs):
         return "create-transform"
@@ -1708,10 +1786,10 @@ async def __call__(self, opensearch, params):
         if not self._start_time:
             self._start_time = time.monotonic()
             await opensearch.transform.stop_transform(transform_id=transform_id,
-                                              force=force,
-                                              timeout=timeout,
-                                              wait_for_completion=False,
-                                              wait_for_checkpoint=wait_for_checkpoint)
+                                                      force=force,
+                                                      timeout=timeout,
+                                                      wait_for_completion=False,
+                                                      wait_for_checkpoint=wait_for_checkpoint)
 
         while True:
             stats_response = await opensearch.transform.get_transform_stats(transform_id=transform_id)
@@ -1786,8 +1864,8 @@ class SubmitAsyncSearch(Runner):
     async def __call__(self, opensearch, params):
         request_params = params.get("request-params", {})
         response = await opensearch.async_search.submit(body=mandatory(params, "body", self),
-                                                index=params.get("index"),
-                                                params=request_params)
+                                                        index=params.get("index"),
+                                                        params=request_params)
 
         op_name = mandatory(params, "name", self)
         # id may be None if the operation has already returned
@@ -1815,7 +1893,7 @@ async def __call__(self, opensearch, params):
         stats = {}
         for search_id, search in async_search_ids(searches):
             response = await opensearch.async_search.get(id=search_id,
-                                                 params=request_params)
+                                                         params=request_params)
             is_running = response["is_running"]
             success = success and not is_running
             if not is_running:
@@ -1869,7 +1947,7 @@ async def __call__(self, opensearch, params):
         pit_op = params.get("with-point-in-time-from", None)
         request_params = params.get("request-params", {})
         if pit_op is None:
-            await opensearch.delete_point_in_time(all="_all", params=request_params)
+            await opensearch.delete_point_in_time(all=True, params=request_params)
         else:
            pit_id = CompositeContext.get(pit_op)
             body = {
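Worth calling out: the `all="_all"` to `all=True` change just above is a type fix rather than a behavior change. As I understand the opensearch-py client, `all` is a boolean flag that routes the call to the delete-all endpoint, so the old string value only worked by being truthy. A sketch of both call shapes (client construction and helper name are illustrative):

```python
from opensearchpy import AsyncOpenSearch

async def cleanup_pits(client: AsyncOpenSearch, pit_id=None):
    if pit_id is None:
        # the boolean flag selects DELETE /_search/point_in_time/_all
        await client.delete_point_in_time(all=True)
    else:
        # specific PITs are deleted by id via the request body
        await client.delete_point_in_time(body={"pit_id": [pit_id]})
```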
@@ -1930,13 +2008,15 @@ def _ctx():
         try:
             return CompositeContext.ctx.get()
         except LookupError:
-            raise exceptions.BenchmarkAssertionError("This operation is only allowed inside a composite operation.") from None
+            raise exceptions.BenchmarkAssertionError(
+                "This operation is only allowed inside a composite operation.") from None
 
 
 class Composite(Runner):
     """
     Executes a complex request structure which is measured by Benchmark as one composite operation.
     """
+
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.supported_op_types = [
@@ -1979,7 +2059,8 @@ async def run_stream(self, opensearch, stream, connection_limit):
 
                         timings.append(timing)
                     else:
-                        raise exceptions.BenchmarkAssertionError("Requests structure must contain [stream] or [operation-type].")
+                        raise exceptions.BenchmarkAssertionError(
+                            "Requests structure must contain [stream] or [operation-type].")
         except BaseException:
             # stop all already created tasks in case of exceptions
             for s in streams:
diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py
index b854178d1..a1f533529 100644
--- a/osbenchmark/workload/workload.py
+++ b/osbenchmark/workload/workload.py
@@ -581,9 +581,10 @@ class OperationType(Enum):
     DeleteAsyncSearch = 11
     PaginatedSearch = 12
     ScrollSearch = 13
-    CreatePointInTime = 14
-    DeletePointInTime = 15
-    ListAllPointInTime = 16
+    PointInTimeSearch = 14
+    CreatePointInTime = 15
+    DeletePointInTime = 16
+    ListAllPointInTime = 17
 
     # administrative actions
     ForceMerge = 1001
@@ -640,6 +641,8 @@ def from_hyphenated_string(cls, v):
             return OperationType.ScrollSearch
         elif v == "paginated-search":
             return OperationType.PaginatedSearch
+        elif v == "point-in-time-search":
+            return OperationType.PointInTimeSearch
         elif v == "cluster-health":
             return OperationType.ClusterHealth
         elif v == "bulk":
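A quick sanity check of the enum wiring across both files — the import path follows the file header above. Note that the renumbering shifts the pre-existing point-in-time members from 14-16 to 15-17, so nothing should rely on those integers being stable:

```python
from osbenchmark.workload.workload import OperationType

# "point-in-time-search" now resolves to the new enum member...
op = OperationType.from_hyphenated_string("point-in-time-search")
assert op is OperationType.PointInTimeSearch
assert op.value == 14
# ...while the existing PIT administrative operations each moved down by one.
assert OperationType.CreatePointInTime.value == 15
assert OperationType.ListAllPointInTime.value == 17
```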