Skip to content

Commit

Permalink
Revert "renamed put pipeline to create ingest pipeline (#399)"
Browse files Browse the repository at this point in the history
This reverts commit e73664a.
  • Loading branch information
Ian Hoang authored and gkamat committed Jan 10, 2024
1 parent 6a073e7 commit 1e46fce
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 17 deletions.
6 changes: 3 additions & 3 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def register_default_runners():
register_runner(workload.OperationType.RestoreSnapshot, RestoreSnapshot(), async_runner=True)
# We treat the following as administrative commands and thus already start to wrap them in a retry.
register_runner(workload.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True)
register_runner(workload.OperationType.CreateIngestPipeline, Retry(CreateIngestPipeline()), async_runner=True)
register_runner(workload.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True)
register_runner(workload.OperationType.Refresh, Retry(Refresh()), async_runner=True)
register_runner(workload.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True)
register_runner(workload.OperationType.DeleteIndex, Retry(DeleteIndex()), async_runner=True)
Expand Down Expand Up @@ -1231,7 +1231,7 @@ def __repr__(self, *args, **kwargs):
return "cluster-health"


class CreateIngestPipeline(Runner):
class PutPipeline(Runner):
async def __call__(self, opensearch, params):
await opensearch.ingest.put_pipeline(id=mandatory(params, "id", self),
body=mandatory(params, "body", self),
Expand All @@ -1240,7 +1240,7 @@ async def __call__(self, opensearch, params):
)

def __repr__(self, *args, **kwargs):
return "create-ingest-pipeline"
return "put-pipeline"

# TODO: refactor it after python client support search pipeline https://github.com/opensearch-project/opensearch-py/issues/474
class CreateSearchPipeline(Runner):
Expand Down
6 changes: 3 additions & 3 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ class OperationType(Enum):
# administrative actions
ForceMerge = 1001
ClusterHealth = 1002
CreateIngestPipeline = 1003
PutPipeline = 1003
Refresh = 1004
CreateIndex = 1005
DeleteIndex = 1006
Expand Down Expand Up @@ -653,8 +653,8 @@ def from_hyphenated_string(cls, v):
return OperationType.Bulk
elif v == "raw-request":
return OperationType.RawRequest
elif v == "create-ingest-pipeline":
return OperationType.CreateIngestPipeline
elif v == "put-pipeline":
return OperationType.PutPipeline
elif v == "refresh":
return OperationType.Refresh
elif v == "create-index":
Expand Down
17 changes: 6 additions & 11 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2737,13 +2737,13 @@ async def test_query_vector_search_with_custom_id_field_inside_source(self, open
)


class CreateIngestPipelineRunnerTests(TestCase):
class PutPipelineRunnerTests(TestCase):
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_create_pipeline(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.CreateIngestPipeline()
r = runner.PutPipeline()

params = {
"id": "rename",
Expand All @@ -2769,16 +2769,13 @@ async def test_create_pipeline(self, opensearch):
async def test_param_body_mandatory(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.CreateIngestPipeline()
r = runner.PutPipeline()

params = {
"id": "rename"
}
with self.assertRaisesRegex(exceptions.DataError,
"Parameter source "
"for operation 'create-ingest-pipeline' "
"did not provide the "
"mandatory parameter 'body'. "
"Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'body'. "
"Add it to your parameter source and try again."):
await r(opensearch, params)

Expand All @@ -2789,15 +2786,13 @@ async def test_param_body_mandatory(self, opensearch):
async def test_param_id_mandatory(self, opensearch):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.CreateIngestPipeline()
r = runner.PutPipeline()

params = {
"body": {}
}
with self.assertRaisesRegex(exceptions.DataError,
"Parameter source for "
"operation 'create-ingest-pipeline' did"
" not provide the mandatory parameter 'id'. "
"Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'id'. "
"Add it to your parameter source and try again."):
await r(opensearch, params)

Expand Down

0 comments on commit 1e46fce

Please sign in to comment.