Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert renaming commits from main #436

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions osbenchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ def add_workload_source(subparser):
"(default: localhost:9200).",
default="") # actually the default is pipeline specific and it is set later
test_execution_parser.add_argument(
"--worker-ips",
"--load-worker-coordinator-hosts",
help="Define a comma-separated list of hosts which should generate load (default: localhost).",
default="localhost")
test_execution_parser.add_argument(
Expand Down Expand Up @@ -859,8 +859,8 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(
config.Scope.applicationOverride,
"worker_coordinator",
"worker_ips",
opts.csv_to_list(args.worker_ips))
"load_worker_coordinator_hosts",
opts.csv_to_list(args.load_worker_coordinator_hosts))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
configure_workload_params(arg_parser, args, cfg)
configure_connection_params(arg_parser, args, cfg)
Expand Down
6 changes: 3 additions & 3 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def register_default_runners():
register_runner(workload.OperationType.RestoreSnapshot, RestoreSnapshot(), async_runner=True)
# We treat the following as administrative commands and thus already start to wrap them in a retry.
register_runner(workload.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True)
register_runner(workload.OperationType.CreateIngestPipeline, Retry(CreateIngestPipeline()), async_runner=True)
register_runner(workload.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True)
register_runner(workload.OperationType.Refresh, Retry(Refresh()), async_runner=True)
register_runner(workload.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True)
register_runner(workload.OperationType.DeleteIndex, Retry(DeleteIndex()), async_runner=True)
Expand Down Expand Up @@ -1231,7 +1231,7 @@ def __repr__(self, *args, **kwargs):
return "cluster-health"


class CreateIngestPipeline(Runner):
class PutPipeline(Runner):
async def __call__(self, opensearch, params):
await opensearch.ingest.put_pipeline(id=mandatory(params, "id", self),
body=mandatory(params, "body", self),
Expand All @@ -1240,7 +1240,7 @@ async def __call__(self, opensearch, params):
)

def __repr__(self, *args, **kwargs):
return "create-ingest-pipeline"
return "put-pipeline"

# TODO: refactor it after python client support search pipeline https://github.com/opensearch-project/opensearch-py/issues/474
class CreateSearchPipeline(Runner):
Expand Down
10 changes: 5 additions & 5 deletions osbenchmark/worker_coordinator/worker_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ def __init__(self, target, config, os_client_factory_class=client.OsClientFactor
self.workload = None
self.test_procedure = None
self.metrics_store = None
self.worker_ips = []
self.load_worker_coordinator_hosts = []
self.workers = []
# which client ids are assigned to which workers?
self.clients_per_worker = {}
Expand Down Expand Up @@ -637,7 +637,7 @@ def prepare_benchmark(self, t):
# are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs.
self.prepare_telemetry(os_clients, enable=not uses_static_responses)

for host in self.config.opts("worker_coordinator", "worker_ips"):
for host in self.config.opts("worker_coordinator", "load_worker_coordinator_hosts"):
host_config = {
# for simplicity we assume that all benchmark machines have the same specs
"cores": num_cores(self.config)
Expand All @@ -647,9 +647,9 @@ def prepare_benchmark(self, t):
else:
host_config["host"] = host

self.worker_ips.append(host_config)
self.load_worker_coordinator_hosts.append(host_config)

self.target.prepare_workload([h["host"] for h in self.worker_ips], self.config, self.workload)
self.target.prepare_workload([h["host"] for h in self.load_worker_coordinator_hosts], self.config, self.workload)

def start_benchmark(self):
self.logger.info("Benchmark is about to start.")
Expand All @@ -670,7 +670,7 @@ def start_benchmark(self):
if allocator.clients < 128:
self.logger.info("Allocation matrix:\n%s", "\n".join([str(a) for a in self.allocations]))

worker_assignments = calculate_worker_assignments(self.worker_ips, allocator.clients)
worker_assignments = calculate_worker_assignments(self.load_worker_coordinator_hosts, allocator.clients)
worker_id = 0
for assignment in worker_assignments:
host = assignment["host"]
Expand Down
6 changes: 3 additions & 3 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ class OperationType(Enum):
# administrative actions
ForceMerge = 1001
ClusterHealth = 1002
CreateIngestPipeline = 1003
PutPipeline = 1003
Refresh = 1004
CreateIndex = 1005
DeleteIndex = 1006
Expand Down Expand Up @@ -653,8 +653,8 @@ def from_hyphenated_string(cls, v):
return OperationType.Bulk
elif v == "raw-request":
return OperationType.RawRequest
elif v == "create-ingest-pipeline":
return OperationType.CreateIngestPipeline
elif v == "put-pipeline":
return OperationType.PutPipeline
elif v == "refresh":
return OperationType.Refresh
elif v == "create-index":
Expand Down
17 changes: 6 additions & 11 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2737,13 +2737,13 @@ async def test_query_vector_search_with_custom_id_field_inside_source(self, open
)


class CreateIngestPipelineRunnerTests(TestCase):
class PutPipelineRunnerTests(TestCase):
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_create_pipeline(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.CreateIngestPipeline()
r = runner.PutPipeline()

params = {
"id": "rename",
Expand All @@ -2769,16 +2769,13 @@ async def test_create_pipeline(self, opensearch):
async def test_param_body_mandatory(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.CreateIngestPipeline()
r = runner.PutPipeline()

params = {
"id": "rename"
}
with self.assertRaisesRegex(exceptions.DataError,
"Parameter source "
"for operation 'create-ingest-pipeline' "
"did not provide the "
"mandatory parameter 'body'. "
"Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'body'. "
"Add it to your parameter source and try again."):
await r(opensearch, params)

Expand All @@ -2789,15 +2786,13 @@ async def test_param_body_mandatory(self, opensearch):
async def test_param_id_mandatory(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.CreateIngestPipeline()
r = runner.PutPipeline()

params = {
"body": {}
}
with self.assertRaisesRegex(exceptions.DataError,
"Parameter source for "
"operation 'create-ingest-pipeline' did"
" not provide the mandatory parameter 'id'. "
"Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'id'. "
"Add it to your parameter source and try again."):
await r(opensearch, params)

Expand Down
4 changes: 2 additions & 2 deletions tests/worker_coordinator/worker_coordinator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def setUp(self):
self.cfg.add(config.Scope.application, "client", "hosts",
WorkerCoordinatorTests.Holder(all_hosts={"default": ["localhost:9200"]}))
self.cfg.add(config.Scope.application, "client", "options", WorkerCoordinatorTests.Holder(all_client_options={"default": {}}))
self.cfg.add(config.Scope.application, "worker_coordinator", "worker_ips", ["localhost"])
self.cfg.add(config.Scope.application, "worker_coordinator", "load_worker_coordinator_hosts", ["localhost"])
self.cfg.add(config.Scope.application, "results_publishing", "datastore.type", "in-memory")

default_test_procedure = workload.TestProcedure("default", default=True, schedule=[
Expand All @@ -135,7 +135,7 @@ def create_test_worker_coordinator_target(self):
@mock.patch("osbenchmark.utils.net.resolve")
def test_start_benchmark_and_prepare_workload(self, resolve):
# override load worker_coordinator host
self.cfg.add(config.Scope.applicationOverride, "worker_coordinator", "worker_ips", ["10.5.5.1", "10.5.5.2"])
self.cfg.add(config.Scope.applicationOverride, "worker_coordinator", "load_worker_coordinator_hosts", ["10.5.5.1", "10.5.5.2"])
resolve.side_effect = ["10.5.5.1", "10.5.5.2"]

target = self.create_test_worker_coordinator_target()
Expand Down
Loading