diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 92ba65d9a..7837f1826 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -72,7 +72,7 @@ def register_default_runners(): register_runner(workload.OperationType.RestoreSnapshot, RestoreSnapshot(), async_runner=True) # We treat the following as administrative commands and thus already start to wrap them in a retry. register_runner(workload.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True) - register_runner(workload.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True) + register_runner(workload.OperationType.CreateIngestPipeline, Retry(CreateIngestPipeline()), async_runner=True) register_runner(workload.OperationType.Refresh, Retry(Refresh()), async_runner=True) register_runner(workload.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True) register_runner(workload.OperationType.DeleteIndex, Retry(DeleteIndex()), async_runner=True) @@ -1092,7 +1092,7 @@ def __repr__(self, *args, **kwargs): return "cluster-health" -class PutPipeline(Runner): +class CreateIngestPipeline(Runner): async def __call__(self, opensearch, params): await opensearch.ingest.put_pipeline(id=mandatory(params, "id", self), body=mandatory(params, "body", self), @@ -1101,7 +1101,7 @@ async def __call__(self, opensearch, params): ) def __repr__(self, *args, **kwargs): - return "put-pipeline" + return "create-ingest-pipeline" # TODO: refactor it after python client support search pipeline https://github.com/opensearch-project/opensearch-py/issues/474 class CreateSearchPipeline(Runner): diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 4a1e20cf8..38ae6d24d 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -588,7 +588,7 @@ class OperationType(Enum): # administrative actions ForceMerge = 1001 ClusterHealth = 1002 - PutPipeline = 1003 + CreateIngestPipeline = 1003 Refresh = 1004 CreateIndex = 1005 DeleteIndex = 1006 @@ -647,8 +647,8 @@ def from_hyphenated_string(cls, v): return OperationType.Bulk elif v == "raw-request": return OperationType.RawRequest - elif v == "put-pipeline": - return OperationType.PutPipeline + elif v == "create-ingest-pipeline": + return OperationType.CreateIngestPipeline elif v == "refresh": return OperationType.Refresh elif v == "create-index": diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 699865323..bfcd39040 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2207,13 +2207,13 @@ async def test_search_pipeline_using_request_params(self, opensearch): ) opensearch.clear_scroll.assert_not_called() -class PutPipelineRunnerTests(TestCase): +class CreateIngestPipelineRunnerTests(TestCase): @mock.patch("opensearchpy.OpenSearch") @run_async async def test_create_pipeline(self, opensearch): opensearch.ingest.put_pipeline.return_value = as_future() - r = runner.PutPipeline() + r = runner.CreateIngestPipeline() params = { "id": "rename", @@ -2239,7 +2239,7 @@ async def test_create_pipeline(self, opensearch): async def test_param_body_mandatory(self, opensearch): opensearch.ingest.put_pipeline.return_value = as_future() - r = runner.PutPipeline() + r = runner.CreateIngestPipeline() params = { "id": "rename" @@ -2256,7 +2256,7 @@ async def test_param_body_mandatory(self, opensearch): async def test_param_id_mandatory(self, opensearch): opensearch.ingest.put_pipeline.return_value = as_future() - r = runner.PutPipeline() + r = runner.CreateIngestPipeline() params = { "body": {}