Skip to content

Commit

Permalink
Merge into Renaming Components: Rename parameters for Distributed Workload Generation - Issue 258 (#414)
Browse files Browse the repository at this point in the history

Signed-off-by: vivek palakkat <[email protected]>
Co-authored-by: dosa_chammandi <[email protected]>
Co-authored-by: Maddox Schmidlkofer <[email protected]>
  • Loading branch information
3 people authored Nov 14, 2023
1 parent e57d521 commit 7a7fa3a
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 22 deletions.
6 changes: 3 additions & 3 deletions osbenchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ def add_workload_source(subparser):
"(default: localhost:9200).",
default="") # actually the default is pipeline specific and it is set later
test_execution_parser.add_argument(
"--load-worker-coordinator-hosts",
"--worker-ips",
help="Define a comma-separated list of hosts which should generate load (default: localhost).",
default="localhost")
test_execution_parser.add_argument(
Expand Down Expand Up @@ -859,8 +859,8 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(
config.Scope.applicationOverride,
"worker_coordinator",
"load_worker_coordinator_hosts",
opts.csv_to_list(args.load_worker_coordinator_hosts))
"worker_ips",
opts.csv_to_list(args.worker_ips))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
configure_workload_params(arg_parser, args, cfg)
configure_connection_params(arg_parser, args, cfg)
Expand Down
6 changes: 3 additions & 3 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def register_default_runners():
register_runner(workload.OperationType.RestoreSnapshot, RestoreSnapshot(), async_runner=True)
# We treat the following as administrative commands and thus already start to wrap them in a retry.
register_runner(workload.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True)
register_runner(workload.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True)
register_runner(workload.OperationType.CreateIngestPipeline, Retry(CreateIngestPipeline()), async_runner=True)
register_runner(workload.OperationType.Refresh, Retry(Refresh()), async_runner=True)
register_runner(workload.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True)
register_runner(workload.OperationType.DeleteIndex, Retry(DeleteIndex()), async_runner=True)
Expand Down Expand Up @@ -1092,7 +1092,7 @@ def __repr__(self, *args, **kwargs):
return "cluster-health"


class PutPipeline(Runner):
class CreateIngestPipeline(Runner):
async def __call__(self, opensearch, params):
await opensearch.ingest.put_pipeline(id=mandatory(params, "id", self),
body=mandatory(params, "body", self),
Expand All @@ -1101,7 +1101,7 @@ async def __call__(self, opensearch, params):
)

def __repr__(self, *args, **kwargs):
return "put-pipeline"
return "create-ingest-pipeline"

# TODO: refactor it after python client support search pipeline https://github.com/opensearch-project/opensearch-py/issues/474
class CreateSearchPipeline(Runner):
Expand Down
10 changes: 5 additions & 5 deletions osbenchmark/worker_coordinator/worker_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ def __init__(self, target, config, os_client_factory_class=client.OsClientFactor
self.workload = None
self.test_procedure = None
self.metrics_store = None
self.load_worker_coordinator_hosts = []
self.worker_ips = []
self.workers = []
# which client ids are assigned to which workers?
self.clients_per_worker = {}
Expand Down Expand Up @@ -636,7 +636,7 @@ def prepare_benchmark(self, t):
# are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs.
self.prepare_telemetry(os_clients, enable=not uses_static_responses)

for host in self.config.opts("worker_coordinator", "load_worker_coordinator_hosts"):
for host in self.config.opts("worker_coordinator", "worker_ips"):
host_config = {
# for simplicity we assume that all benchmark machines have the same specs
"cores": num_cores(self.config)
Expand All @@ -646,9 +646,9 @@ def prepare_benchmark(self, t):
else:
host_config["host"] = host

self.load_worker_coordinator_hosts.append(host_config)
self.worker_ips.append(host_config)

self.target.prepare_workload([h["host"] for h in self.load_worker_coordinator_hosts], self.config, self.workload)
self.target.prepare_workload([h["host"] for h in self.worker_ips], self.config, self.workload)

def start_benchmark(self):
self.logger.info("Benchmark is about to start.")
Expand All @@ -669,7 +669,7 @@ def start_benchmark(self):
if allocator.clients < 128:
self.logger.info("Allocation matrix:\n%s", "\n".join([str(a) for a in self.allocations]))

worker_assignments = calculate_worker_assignments(self.load_worker_coordinator_hosts, allocator.clients)
worker_assignments = calculate_worker_assignments(self.worker_ips, allocator.clients)
worker_id = 0
for assignment in worker_assignments:
host = assignment["host"]
Expand Down
6 changes: 3 additions & 3 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ class OperationType(Enum):
# administrative actions
ForceMerge = 1001
ClusterHealth = 1002
PutPipeline = 1003
CreateIngestPipeline = 1003
Refresh = 1004
CreateIndex = 1005
DeleteIndex = 1006
Expand Down Expand Up @@ -647,8 +647,8 @@ def from_hyphenated_string(cls, v):
return OperationType.Bulk
elif v == "raw-request":
return OperationType.RawRequest
elif v == "put-pipeline":
return OperationType.PutPipeline
elif v == "create-ingest-pipeline":
return OperationType.CreateIngestPipeline
elif v == "refresh":
return OperationType.Refresh
elif v == "create-index":
Expand Down
17 changes: 11 additions & 6 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2207,13 +2207,13 @@ async def test_search_pipeline_using_request_params(self, opensearch):
)
opensearch.clear_scroll.assert_not_called()

class PutPipelineRunnerTests(TestCase):
class CreateIngestPipelineRunnerTests(TestCase):
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_create_pipeline(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.PutPipeline()
r = runner.CreateIngestPipeline()

params = {
"id": "rename",
Expand All @@ -2239,13 +2239,16 @@ async def test_create_pipeline(self, opensearch):
async def test_param_body_mandatory(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.PutPipeline()
r = runner.CreateIngestPipeline()

params = {
"id": "rename"
}
with self.assertRaisesRegex(exceptions.DataError,
"Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'body'. "
"Parameter source "
"for operation 'create-ingest-pipeline' "
"did not provide the "
"mandatory parameter 'body'. "
"Add it to your parameter source and try again."):
await r(opensearch, params)

Expand All @@ -2256,13 +2259,15 @@ async def test_param_body_mandatory(self, opensearch):
async def test_param_id_mandatory(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.PutPipeline()
r = runner.CreateIngestPipeline()

params = {
"body": {}
}
with self.assertRaisesRegex(exceptions.DataError,
"Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'id'. "
"Parameter source for "
"operation 'create-ingest-pipeline' did"
" not provide the mandatory parameter 'id'. "
"Add it to your parameter source and try again."):
await r(opensearch, params)

Expand Down
4 changes: 2 additions & 2 deletions tests/worker_coordinator/worker_coordinator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def setUp(self):
self.cfg.add(config.Scope.application, "client", "hosts",
WorkerCoordinatorTests.Holder(all_hosts={"default": ["localhost:9200"]}))
self.cfg.add(config.Scope.application, "client", "options", WorkerCoordinatorTests.Holder(all_client_options={"default": {}}))
self.cfg.add(config.Scope.application, "worker_coordinator", "load_worker_coordinator_hosts", ["localhost"])
self.cfg.add(config.Scope.application, "worker_coordinator", "worker_ips", ["localhost"])
self.cfg.add(config.Scope.application, "results_publishing", "datastore.type", "in-memory")

default_test_procedure = workload.TestProcedure("default", default=True, schedule=[
Expand All @@ -135,7 +135,7 @@ def create_test_worker_coordinator_target(self):
@mock.patch("osbenchmark.utils.net.resolve")
def test_start_benchmark_and_prepare_workload(self, resolve):
# override load worker_coordinator host
self.cfg.add(config.Scope.applicationOverride, "worker_coordinator", "load_worker_coordinator_hosts", ["10.5.5.1", "10.5.5.2"])
self.cfg.add(config.Scope.applicationOverride, "worker_coordinator", "worker_ips", ["10.5.5.1", "10.5.5.2"])
resolve.side_effect = ["10.5.5.1", "10.5.5.2"]

target = self.create_test_worker_coordinator_target()
Expand Down

0 comments on commit 7a7fa3a

Please sign in to comment.