From f7eb80021bffa8fbd26f5344e43e37f2caf6531b Mon Sep 17 00:00:00 2001 From: Vijayan Balasubramanian Date: Wed, 13 Mar 2024 14:18:25 -0700 Subject: [PATCH 1/2] Update force merge polling to use async operation Change force merge polling logic to use wait_for_completion to false, to make it async and use task's get api to check whether task is completed or not to exit from force merge. Here, request end time is calcuated only after task is completed. This request end time will not be 100% accurate, since, we use polling to check whether task status is completed or not. Signed-off-by: Vijayan Balasubramanian --- osbenchmark/worker_coordinator/runner.py | 40 +++--- tests/worker_coordinator/runner_test.py | 167 ++++++++++------------- 2 files changed, 94 insertions(+), 113 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index f629d7f6f..c6acce5a2 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -682,30 +682,38 @@ class ForceMerge(Runner): Runs a force merge operation against OpenSearch. """ + PARAM_WAIT_FOR_COMPLETION = "wait_for_completion" + async def __call__(self, opensearch, params): - # pylint: disable=import-outside-toplevel - import opensearchpy max_num_segments = params.get("max-num-segments") mode = params.get("mode") merge_params = self._default_kw_params(params) if max_num_segments: merge_params["max_num_segments"] = max_num_segments if mode == "polling": - complete = False - try: - request_context_holder.on_client_request_start() - await opensearch.indices.forcemerge(**merge_params) - request_context_holder.on_client_request_end() - complete = True - except opensearchpy.ConnectionTimeout: - pass - while not complete: - await asyncio.sleep(params.get("poll-period")) - tasks = await opensearch.tasks.list(params={"actions": "indices:admin/forcemerge"}) - if len(tasks["nodes"]) == 0: - # empty nodes response indicates no tasks + if params.get(self.PARAM_WAIT_FOR_COMPLETION): + self.logger.warning( + "%s is set for polling. It will be updated to false", self.PARAM_WAIT_FOR_COMPLETION) + merge_params[self.PARAM_WAIT_FOR_COMPLETION] = "false" + request_context_holder.on_client_request_start() + response_task = await opensearch.indices.forcemerge(**merge_params) + while True: + force_merge_task_id = response_task['task'] + task = await opensearch.tasks.get(task_id=force_merge_task_id) + if not task: + self.logger.error("Failed to get task for task id: [%s]", force_merge_task_id) + request_context_holder.on_client_request_end() + raise exceptions.BenchmarkAssertionError( + "Force merge request failure: task was expected but not found in the get tasks api response.") + if 'completed' not in task: request_context_holder.on_client_request_end() - complete = True + raise exceptions.BenchmarkAssertionError( + "Force merge request failure: 'completed' was expected but not found " + "in the get task api response.") + if task['completed']: + request_context_holder.on_client_request_end() + break + await asyncio.sleep(params.get("poll-period")) else: request_context_holder.on_client_request_start() await opensearch.indices.forcemerge(**merge_params) diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index ecbde21a1..6969937c1 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -1154,122 +1154,95 @@ async def test_force_merge_with_params(self, opensearch, on_client_request_start opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') - @mock.patch("opensearchpy.OpenSearch") - @run_async - async def test_force_merge_with_polling_no_timeout(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future() - - force_merge = runner.ForceMerge() - await force_merge(opensearch, params={"index" : "_all", "mode": "polling", 'poll-period': 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all") - @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async async def test_force_merge_with_polling(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout()) - opensearch.tasks.list.side_effect = [ - as_future({ - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": [ - "data", - "ingest", - "master", - "remote_cluster_client", - "transform" - ], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region" - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {} - } - } - } - } - }), - as_future({ - "nodes": {} - }) - ] + opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"}) + opensearch.tasks.get.return_value = as_future({ + "completed": True, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": False, + "cancelled": False, + "headers": {} + }, + "response": { + "_shards": { + "total": 10, + "successful": 10, + "failed": 0 + } + } + }) force_merge = runner.ForceMerge() - await force_merge(opensearch, params={"index": "_all", "mode": "polling", "poll-period": 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all") + await force_merge(opensearch, params={ + "index": "_all", "mode": "polling", 'poll-period': 10, "wait_for_completion": True}) + opensearch.indices.forcemerge.assert_called_once_with(index="_all", wait_for_completion='false') + opensearch.tasks.get.assert_called_once_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798") @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @run_async async def test_force_merge_with_polling_and_params(self, opensearch, on_client_request_start, on_client_request_end): - opensearch.indices.forcemerge.return_value = as_future(exception=opensearchpy.ConnectionTimeout()) - opensearch.tasks.list.side_effect = [ + opensearch.indices.forcemerge.return_value = as_future({"task": "7PtzISisT5SiwlBGUi2GzQ:2820798"}) + opensearch.tasks.get.side_effect = [ as_future({ - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": [ - "data", - "ingest", - "master", - "remote_cluster_client", - "transform" - ], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region" - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {} - } - } - } - } + "completed": False, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": False, + "cancelled": False, + "headers": {} + }, + "response": {} }), as_future({ - "nodes": {} + "completed": True, + "task": { + "node": "7PtzISisT5SiwlBGUi2GzQ", + "id": 2820798, + "type": "transport", + "action": "indices:admin/forcemerge", + "description": "Force-merge indices [_all], , onlyExpungeDeletes[false], flush[true]", + "start_time_in_millis": 1711389911601, + "running_time_in_nanos": 2806258, + "cancellable": "false", + "cancelled": "false", + "headers": {} + }, + "response": { + "_shards": { + "total": 10, + "successful": 10, + "failed": 0 + } + } }) ] force_merge = runner.ForceMerge() # request-timeout should be ignored as mode:polling - await force_merge(opensearch, params={"index" : "_all", "mode": "polling", "max-num-segments": 1, - "request-timeout": 50000, "poll-period": 0}) - opensearch.indices.forcemerge.assert_called_once_with(index="_all", max_num_segments=1, request_timeout=50000) + await force_merge(opensearch, params={ + "index": "_all", "mode": "polling", "max-num-segments": 1, "request-timeout": 50000, "poll-period": 10 + }) + opensearch.indices.forcemerge.assert_called_once_with( + index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion='false') + opensearch.tasks.get.assert_called_with(task_id="7PtzISisT5SiwlBGUi2GzQ:2820798") + self.assertEqual(opensearch.tasks.get.call_count, 2) class IndicesStatsRunnerTests(TestCase): From 698a06883cbe3a6d72de164266263e1a5fffea54 Mon Sep 17 00:00:00 2001 From: Vijayan Balasubramanian Date: Fri, 19 Apr 2024 12:16:16 -0700 Subject: [PATCH 2/2] Fixed review comments Signed-off-by: Vijayan Balasubramanian --- osbenchmark/worker_coordinator/runner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index c6acce5a2..100e39e84 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -690,10 +690,11 @@ async def __call__(self, opensearch, params): merge_params = self._default_kw_params(params) if max_num_segments: merge_params["max_num_segments"] = max_num_segments + # Request end time will not be 100% accurate, since we are using polling + # to check whether task status is completed or not. if mode == "polling": - if params.get(self.PARAM_WAIT_FOR_COMPLETION): - self.logger.warning( - "%s is set for polling. It will be updated to false", self.PARAM_WAIT_FOR_COMPLETION) + self.logger.warning( + "%s will be updated to false to run force merge in asynchronous way", self.PARAM_WAIT_FOR_COMPLETION) merge_params[self.PARAM_WAIT_FOR_COMPLETION] = "false" request_context_holder.on_client_request_start() response_task = await opensearch.indices.forcemerge(**merge_params)