Skip to content

Commit

Permalink
Generalized to update settings and added max slice param
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Gaievski <[email protected]>
  • Loading branch information
martin-gaievski committed Jul 23, 2024
1 parent d25b145 commit 41b0429
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 8 deletions.
10 changes: 7 additions & 3 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ def register_default_runners():
register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True)
register_runner(workload.OperationType.TrainKnnModel, Retry(TrainKnnModel()), async_runner=True)
register_runner(workload.OperationType.DeleteKnnModel, Retry(DeleteKnnModel()), async_runner=True)
register_runner(workload.OperationType.EnableConcurrentSegmentSearch, Retry(EnableConcurrentSegmentSearch()), async_runner=True)
register_runner(workload.OperationType.UpdateConcurrentSegmentSearchSettings,
Retry(UpdateConcurrentSegmentSearchSettings()), async_runner=True)

def runner_for(operation_type):
try:
Expand Down Expand Up @@ -2698,16 +2699,19 @@ async def __call__(self, opensearch, params):
def __repr__(self, *args, **kwargs):
return "deploy-ml-model"

class EnableConcurrentSegmentSearch(Runner):
class UpdateConcurrentSegmentSearchSettings(Runner):
@time_func
async def __call__(self, opensearch, params):
enable_setting = params.get("enable", "false")
max_slice_count = params.get("max_slice_count", None)
body = {
"persistent": {
"search.concurrent_segment_search.enabled": enable_setting
}
}
if max_slice_count is not None:
body["persistent"]["search.concurrent_segment_search.max_slice_count"] = max_slice_count
await opensearch.cluster.put_settings(body=body)

def __repr__(self, *args, **kwargs):
return "enable-concurrent-segment-search"
return "update-concurrent-segment-search-settings"
6 changes: 3 additions & 3 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ class OperationType(Enum):
DeleteMlModel = 1041
RegisterMlModel = 1042
DeployMlModel = 1043
EnableConcurrentSegmentSearch = 1044
UpdateConcurrentSegmentSearchSettings = 1044

@property
def admin_op(self):
Expand Down Expand Up @@ -753,8 +753,8 @@ def from_hyphenated_string(cls, v):
return OperationType.TrainKnnModel
elif v == "delete-knn-model":
return OperationType.DeleteKnnModel
elif v == "enable-concurrent-segment-search":
return OperationType.EnableConcurrentSegmentSearch
elif v == "update-concurrent-segment-search-settings":
return OperationType.UpdateConcurrentSegmentSearchSettings
else:
raise KeyError(f"No enum value for [{v}]")

Expand Down
45 changes: 43 additions & 2 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6786,7 +6786,7 @@ async def test_param_id_mandatory(self, opensearch, on_client_request_start, on_

self.assertEqual(0, opensearch.transport.perform_request.call_count)

class EnableConcurrentSegmentSearchTests(TestCase):
class UpdateConcurrentSegmentSearchSettingsTests(TestCase):
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
@mock.patch("opensearchpy.OpenSearch")
Expand All @@ -6797,11 +6797,52 @@ async def test_enable_concurrent_segment_search(self, opensearch, on_client_requ
"enable": "true"
}

r = runner.EnableConcurrentSegmentSearch()
r = runner.UpdateConcurrentSegmentSearchSettings()
await r(opensearch, params)

opensearch.cluster.put_settings.assert_called_once_with(body={
"persistent": {
"search.concurrent_segment_search.enabled": "true"
}
})

@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_max_slice_count(self, opensearch, on_client_request_start, on_client_request_end):
opensearch.cluster.put_settings.return_value = as_future()
params = {
"max_slice_count": 2
}

r = runner.UpdateConcurrentSegmentSearchSettings()
await r(opensearch, params)

opensearch.cluster.put_settings.assert_called_once_with(body={
"persistent": {
"search.concurrent_segment_search.enabled": "false",
"search.concurrent_segment_search.max_slice_count": 2
}
})

@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
@mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_concurrent_segment_search_settings(self, opensearch, on_client_request_start, on_client_request_end):
opensearch.cluster.put_settings.return_value = as_future()
params = {
"enable": "true",
"max_slice_count": 2
}

r = runner.UpdateConcurrentSegmentSearchSettings()
await r(opensearch, params)

opensearch.cluster.put_settings.assert_called_once_with(body={
"persistent": {
"search.concurrent_segment_search.enabled": "true",
"search.concurrent_segment_search.max_slice_count": 2
}
})

0 comments on commit 41b0429

Please sign in to comment.