From 7a7fa3a8e776813d99311e261966f6efccdbc48a Mon Sep 17 00:00:00 2001 From: Ian Hoang <51065478+IanHoang@users.noreply.github.com> Date: Tue, 14 Nov 2023 15:00:25 -0600 Subject: [PATCH] =?UTF-8?q?Merge=20into=20Renaming=20Components:=20Rename?= =?UTF-8?q?=20parameters=20for=20Distributed=20Workload=20Generation=20-?= =?UTF-8?q?=20Issue=20258=20=20(#=E2=80=A6=20(#414)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: vivek palakkat Co-authored-by: dosa_chammandi Co-authored-by: Maddox Schmidlkofer --- osbenchmark/benchmark.py | 6 +++--- osbenchmark/worker_coordinator/runner.py | 6 +++--- .../worker_coordinator/worker_coordinator.py | 10 +++++----- osbenchmark/workload/workload.py | 6 +++--- tests/worker_coordinator/runner_test.py | 17 +++++++++++------ .../worker_coordinator_test.py | 4 ++-- 6 files changed, 27 insertions(+), 22 deletions(-) diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py index ff5527e89..73ee46527 100644 --- a/osbenchmark/benchmark.py +++ b/osbenchmark/benchmark.py @@ -470,7 +470,7 @@ def add_workload_source(subparser): "(default: localhost:9200).", default="") # actually the default is pipeline specific and it is set later test_execution_parser.add_argument( - "--load-worker-coordinator-hosts", + "--worker-ips", help="Define a comma-separated list of hosts which should generate load (default: localhost).", default="localhost") test_execution_parser.add_argument( @@ -859,8 +859,8 @@ def dispatch_sub_command(arg_parser, args, cfg): cfg.add( config.Scope.applicationOverride, "worker_coordinator", - "load_worker_coordinator_hosts", - opts.csv_to_list(args.load_worker_coordinator_hosts)) + "worker_ips", + opts.csv_to_list(args.worker_ips)) cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode) configure_workload_params(arg_parser, args, cfg) configure_connection_params(arg_parser, args, cfg) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 92ba65d9a..7837f1826 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -72,7 +72,7 @@ def register_default_runners(): register_runner(workload.OperationType.RestoreSnapshot, RestoreSnapshot(), async_runner=True) # We treat the following as administrative commands and thus already start to wrap them in a retry. register_runner(workload.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True) - register_runner(workload.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True) + register_runner(workload.OperationType.CreateIngestPipeline, Retry(CreateIngestPipeline()), async_runner=True) register_runner(workload.OperationType.Refresh, Retry(Refresh()), async_runner=True) register_runner(workload.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True) register_runner(workload.OperationType.DeleteIndex, Retry(DeleteIndex()), async_runner=True) @@ -1092,7 +1092,7 @@ def __repr__(self, *args, **kwargs): return "cluster-health" -class PutPipeline(Runner): +class CreateIngestPipeline(Runner): async def __call__(self, opensearch, params): await opensearch.ingest.put_pipeline(id=mandatory(params, "id", self), body=mandatory(params, "body", self), @@ -1101,7 +1101,7 @@ async def __call__(self, opensearch, params): ) def __repr__(self, *args, **kwargs): - return "put-pipeline" + return "create-ingest-pipeline" # TODO: refactor it after python client support search pipeline https://github.com/opensearch-project/opensearch-py/issues/474 class CreateSearchPipeline(Runner): diff --git a/osbenchmark/worker_coordinator/worker_coordinator.py b/osbenchmark/worker_coordinator/worker_coordinator.py index 98b6f551e..05ac03535 100644 --- a/osbenchmark/worker_coordinator/worker_coordinator.py +++ b/osbenchmark/worker_coordinator/worker_coordinator.py @@ -528,7 +528,7 @@ def __init__(self, target, config, os_client_factory_class=client.OsClientFactor self.workload = None self.test_procedure = None self.metrics_store = None - self.load_worker_coordinator_hosts = [] + self.worker_ips = [] self.workers = [] # which client ids are assigned to which workers? self.clients_per_worker = {} @@ -636,7 +636,7 @@ def prepare_benchmark(self, t): # are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs. self.prepare_telemetry(os_clients, enable=not uses_static_responses) - for host in self.config.opts("worker_coordinator", "load_worker_coordinator_hosts"): + for host in self.config.opts("worker_coordinator", "worker_ips"): host_config = { # for simplicity we assume that all benchmark machines have the same specs "cores": num_cores(self.config) @@ -646,9 +646,9 @@ def prepare_benchmark(self, t): else: host_config["host"] = host - self.load_worker_coordinator_hosts.append(host_config) + self.worker_ips.append(host_config) - self.target.prepare_workload([h["host"] for h in self.load_worker_coordinator_hosts], self.config, self.workload) + self.target.prepare_workload([h["host"] for h in self.worker_ips], self.config, self.workload) def start_benchmark(self): self.logger.info("Benchmark is about to start.") @@ -669,7 +669,7 @@ def start_benchmark(self): if allocator.clients < 128: self.logger.info("Allocation matrix:\n%s", "\n".join([str(a) for a in self.allocations])) - worker_assignments = calculate_worker_assignments(self.load_worker_coordinator_hosts, allocator.clients) + worker_assignments = calculate_worker_assignments(self.worker_ips, allocator.clients) worker_id = 0 for assignment in worker_assignments: host = assignment["host"] diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 4a1e20cf8..38ae6d24d 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -588,7 +588,7 @@ class OperationType(Enum): # administrative actions ForceMerge = 1001 ClusterHealth = 1002 - PutPipeline = 1003 + CreateIngestPipeline = 1003 Refresh = 1004 CreateIndex = 1005 DeleteIndex = 1006 @@ -647,8 +647,8 @@ def from_hyphenated_string(cls, v): return OperationType.Bulk elif v == "raw-request": return OperationType.RawRequest - elif v == "put-pipeline": - return OperationType.PutPipeline + elif v == "create-ingest-pipeline": + return OperationType.CreateIngestPipeline elif v == "refresh": return OperationType.Refresh elif v == "create-index": diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 699865323..d01ef5828 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2207,13 +2207,13 @@ async def test_search_pipeline_using_request_params(self, opensearch): ) opensearch.clear_scroll.assert_not_called() -class PutPipelineRunnerTests(TestCase): +class CreateIngestPipelineRunnerTests(TestCase): @mock.patch("opensearchpy.OpenSearch") @run_async async def test_create_pipeline(self, opensearch): opensearch.ingest.put_pipeline.return_value = as_future() - r = runner.PutPipeline() + r = runner.CreateIngestPipeline() params = { "id": "rename", @@ -2239,13 +2239,16 @@ async def test_create_pipeline(self, opensearch): async def test_param_body_mandatory(self, opensearch): opensearch.ingest.put_pipeline.return_value = as_future() - r = runner.PutPipeline() + r = runner.CreateIngestPipeline() params = { "id": "rename" } with self.assertRaisesRegex(exceptions.DataError, - "Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'body'. " + "Parameter source " + "for operation 'create-ingest-pipeline' " + "did not provide the " + "mandatory parameter 'body'. " "Add it to your parameter source and try again."): await r(opensearch, params) @@ -2256,13 +2259,15 @@ async def test_param_body_mandatory(self, opensearch): async def test_param_id_mandatory(self, opensearch): opensearch.ingest.put_pipeline.return_value = as_future() - r = runner.PutPipeline() + r = runner.CreateIngestPipeline() params = { "body": {} } with self.assertRaisesRegex(exceptions.DataError, - "Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'id'. " + "Parameter source for " + "operation 'create-ingest-pipeline' did" + " not provide the mandatory parameter 'id'. " "Add it to your parameter source and try again."): await r(opensearch, params) diff --git a/tests/worker_coordinator/worker_coordinator_test.py b/tests/worker_coordinator/worker_coordinator_test.py index bf7865288..31391f3c4 100644 --- a/tests/worker_coordinator/worker_coordinator_test.py +++ b/tests/worker_coordinator/worker_coordinator_test.py @@ -110,7 +110,7 @@ def setUp(self): self.cfg.add(config.Scope.application, "client", "hosts", WorkerCoordinatorTests.Holder(all_hosts={"default": ["localhost:9200"]})) self.cfg.add(config.Scope.application, "client", "options", WorkerCoordinatorTests.Holder(all_client_options={"default": {}})) - self.cfg.add(config.Scope.application, "worker_coordinator", "load_worker_coordinator_hosts", ["localhost"]) + self.cfg.add(config.Scope.application, "worker_coordinator", "worker_ips", ["localhost"]) self.cfg.add(config.Scope.application, "results_publishing", "datastore.type", "in-memory") default_test_procedure = workload.TestProcedure("default", default=True, schedule=[ @@ -135,7 +135,7 @@ def create_test_worker_coordinator_target(self): @mock.patch("osbenchmark.utils.net.resolve") def test_start_benchmark_and_prepare_workload(self, resolve): # override load worker_coordinator host - self.cfg.add(config.Scope.applicationOverride, "worker_coordinator", "load_worker_coordinator_hosts", ["10.5.5.1", "10.5.5.2"]) + self.cfg.add(config.Scope.applicationOverride, "worker_coordinator", "worker_ips", ["10.5.5.1", "10.5.5.2"]) resolve.side_effect = ["10.5.5.1", "10.5.5.2"] target = self.create_test_worker_coordinator_target()