Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

renamed put pipeline to create ingest pipeline #399

Merged
merged 8 commits into from
Nov 10, 2023
6 changes: 3 additions & 3 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def register_default_runners():
register_runner(workload.OperationType.RestoreSnapshot, RestoreSnapshot(), async_runner=True)
# We treat the following as administrative commands and thus already start to wrap them in a retry.
register_runner(workload.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True)
register_runner(workload.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True)
register_runner(workload.OperationType.CreateIngestPipeline, Retry(CreateIngestPipeline()), async_runner=True)
register_runner(workload.OperationType.Refresh, Retry(Refresh()), async_runner=True)
register_runner(workload.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True)
register_runner(workload.OperationType.DeleteIndex, Retry(DeleteIndex()), async_runner=True)
Expand Down Expand Up @@ -1092,7 +1092,7 @@ def __repr__(self, *args, **kwargs):
return "cluster-health"


class PutPipeline(Runner):
class CreateIngestPipeline(Runner):
async def __call__(self, opensearch, params):
await opensearch.ingest.put_pipeline(id=mandatory(params, "id", self),
body=mandatory(params, "body", self),
Expand All @@ -1101,7 +1101,7 @@ async def __call__(self, opensearch, params):
)

def __repr__(self, *args, **kwargs):
return "put-pipeline"
return "create-ingest-pipeline"

# TODO: refactor it after python client support search pipeline https://github.com/opensearch-project/opensearch-py/issues/474
class CreateSearchPipeline(Runner):
Expand Down
6 changes: 3 additions & 3 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ class OperationType(Enum):
# administrative actions
ForceMerge = 1001
ClusterHealth = 1002
PutPipeline = 1003
CreateIngestPipeline = 1003
Refresh = 1004
CreateIndex = 1005
DeleteIndex = 1006
Expand Down Expand Up @@ -647,8 +647,8 @@ def from_hyphenated_string(cls, v):
return OperationType.Bulk
elif v == "raw-request":
return OperationType.RawRequest
elif v == "put-pipeline":
return OperationType.PutPipeline
elif v == "create-ingest-pipeline":
return OperationType.CreateIngestPipeline
elif v == "refresh":
return OperationType.Refresh
elif v == "create-index":
Expand Down
17 changes: 11 additions & 6 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2207,13 +2207,13 @@ async def test_search_pipeline_using_request_params(self, opensearch):
)
opensearch.clear_scroll.assert_not_called()

class PutPipelineRunnerTests(TestCase):
class CreateIngestPipelineRunnerTests(TestCase):
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_create_pipeline(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.PutPipeline()
r = runner.CreateIngestPipeline()

params = {
"id": "rename",
Expand All @@ -2239,13 +2239,16 @@ async def test_create_pipeline(self, opensearch):
async def test_param_body_mandatory(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.PutPipeline()
r = runner.CreateIngestPipeline()

params = {
"id": "rename"
}
with self.assertRaisesRegex(exceptions.DataError,
"Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'body'. "
"Parameter source "
"for operation 'create-ingest-pipeline' "
"did not provide the "
"mandatory parameter 'body'. "
"Add it to your parameter source and try again."):
await r(opensearch, params)

Expand All @@ -2256,13 +2259,15 @@ async def test_param_body_mandatory(self, opensearch):
async def test_param_id_mandatory(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.PutPipeline()
r = runner.CreateIngestPipeline()

params = {
"body": {}
}
with self.assertRaisesRegex(exceptions.DataError,
"Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'id'. "
"Parameter source for "
"operation 'create-ingest-pipeline' did"
" not provide the mandatory parameter 'id'. "
"Add it to your parameter source and try again."):
await r(opensearch, params)

Expand Down
Loading