diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 7d8d69de7..b879b5a19 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -37,6 +37,7 @@ from functools import total_ordering from io import BytesIO from os.path import commonprefix +import multiprocessing from typing import List, Optional import ijson @@ -1344,29 +1345,49 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=F return correct / min_num_of_results + def _set_initial_recall_values(params: dict, result: dict) -> None: + # Add recall@k and recall@1 to the initial result only if k is present in the params and calculate_recall is true + if "k" in params: + result.update({ + "recall@k": 0, + "recall@1": 0 + }) + # Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params + elif "max_distance" in params: + result.update({ + "recall@max_distance": 0, + "recall@max_distance_1": 0 + }) + # Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params + elif "min_score" in params: + result.update({ + "recall@min_score": 0, + "recall@min_score_1": 0 + }) + + def _get_should_calculate_recall(params: dict) -> bool: + # set in global config (benchmark.ini) and passed by AsyncExecutor + num_clients = params.get("num_clients", 0) + if num_clients == 0: + self.logger.debug("Expected num_clients to be specified but was not.") + # default is set for runner unit tests based on default logic for available.cores in worker_coordinator + cpu_count = params.get("num_cores", multiprocessing.cpu_count()) + if cpu_count < num_clients: + self.logger.warning("Number of clients, %s, specified is greater than the number of CPUs, %s, available."\ + "This will lead to unperformant context switching on load generation host. Performance "\ + "metrics may not be accurate. Skipping recall calculation.", num_clients, cpu_count) + return False + return params.get("calculate-recall", True) + result = { "weight": 1, "unit": "ops", "success": True, } - # Add recall@k and recall@1 to the initial result only if k is present in the params - if "k" in params: - result.update({ - "recall@k": 0, - "recall@1": 0 - }) - # Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params - elif "max_distance" in params: - result.update({ - "recall@max_distance": 0, - "recall@max_distance_1": 0 - }) - # Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params - elif "min_score" in params: - result.update({ - "recall@min_score": 0, - "recall@min_score_1": 0 - }) + # deal with clients here. Need to get num_clients + should_calculate_recall = _get_should_calculate_recall(params) + if should_calculate_recall: + _set_initial_recall_values(params, result) doc_type = params.get("type") response = await self._raw_search(opensearch, doc_type, index, body, request_params, headers=headers) @@ -1390,6 +1411,10 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=F if _is_empty_search_results(response_json): self.logger.info("Vector search query returned no results.") return result + + if not should_calculate_recall: + return result + id_field = parse_string_parameter("id-field-name", params, "_id") candidates = [] for hit in response_json['hits']['hits']: diff --git a/osbenchmark/worker_coordinator/worker_coordinator.py b/osbenchmark/worker_coordinator/worker_coordinator.py index 3d7fb18cc..7b17e4959 100644 --- a/osbenchmark/worker_coordinator/worker_coordinator.py +++ b/osbenchmark/worker_coordinator/worker_coordinator.py @@ -1525,7 +1525,7 @@ def os_clients(all_hosts, all_client_options): schedule = schedule_for(task, task_allocation.client_index_in_task, params_per_task[task]) async_executor = AsyncExecutor( client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete, - task.error_behavior(self.abort_on_error)) + task.error_behavior(self.abort_on_error), self.cfg) final_executor = AsyncProfiler(async_executor) if self.profiling_enabled else async_executor aws.append(final_executor()) run_start = time.perf_counter() @@ -1577,7 +1577,7 @@ async def __call__(self, *args, **kwargs): class AsyncExecutor: - def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, complete, on_error): + def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, complete, on_error, config=None): """ Executes tasks according to the schedule for a given operation. @@ -1599,6 +1599,7 @@ def __init__(self, client_id, task, schedule, opensearch, sampler, cancel, compl self.complete = complete self.on_error = on_error self.logger = logging.getLogger(__name__) + self.cfg = config async def __call__(self, *args, **kwargs): task_completes_parent = self.task.completes_parent @@ -1624,6 +1625,14 @@ async def __call__(self, *args, **kwargs): processing_start = time.perf_counter() self.schedule_handle.before_request(processing_start) async with self.opensearch["default"].new_request_context() as request_context: + # add num_clients to the parameter so that vector search runner can skip calculating recall + # if num_clients > cpu_count(). + if params: + if params.get("operation-type") == "vector-search": + available_cores = int(self.cfg.opts("system", "available.cores", mandatory=False, + default_value=multiprocessing.cpu_count())) + params.update({"num_clients": self.task.clients, "num_cores": available_cores}) + total_ops, total_ops_unit, request_meta_data = await execute_single(runner, self.opensearch, params, self.on_error) request_start = request_context.request_start request_end = request_context.request_end diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py index 9ba99f713..0fb179695 100644 --- a/osbenchmark/workload/params.py +++ b/osbenchmark/workload/params.py @@ -549,6 +549,7 @@ def __init__(self, workload, params, **kwargs): f"'type' not supported with 'data-stream' for operation '{kwargs.get('operation_name')}'") request_cache = params.get("cache", None) detailed_results = params.get("detailed-results", False) + calculate_recall = params.get("calculate-recall", True) query_body = params.get("body", None) pages = params.get("pages", None) results_per_page = params.get("results-per-page", None) @@ -561,6 +562,7 @@ def __init__(self, workload, params, **kwargs): "type": type_name, "cache": request_cache, "detailed-results": detailed_results, + "calculate-recall": calculate_recall, "request-params": request_params, "response-compression-enabled": response_compression_enabled, "body": query_body @@ -850,6 +852,8 @@ def params(self): class VectorSearchParamSource(SearchParamSource): def __init__(self, workload, params, **kwargs): + # print workload + logging.getLogger(__name__).info("Workload: [%s], params: [%s]", workload, params) super().__init__(workload, params, **kwargs) self.delegate_param_source = VectorSearchPartitionParamSource(workload, params, self.query_params, **kwargs) self.corpora = self.delegate_param_source.corpora diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index a33953676..b43bd53be 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -2578,7 +2578,6 @@ async def test_train_timeout(self, opensearch, sleep, on_client_request_start, o with self.assertRaisesRegex(TimeoutError, f'Failed to create model: {self.model_id} within {self.retries} retries'): await runner_under_test(opensearch, self.request) - class VectorSearchQueryRunnerTests(TestCase): @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @@ -3323,6 +3322,273 @@ async def test_calculate_recall_with_intermediate_negative_one_neighbors(self, o headers={"Accept-Encoding": "identity"} ) + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_query_vector_search_should_skip_calculate_recall(self, opensearch, on_client_request_start, on_client_request_end): + num_clients = 9 + class WorkerCoordinatorTestParamSource: + def __init__(self, workload=None, params=None, **kwargs): + if params is None: + params = {} + self._indices = workload.indices + self._params = params + self._current = 1 + self._total = params.get("size") + self.infinite = self._total is None + + def partition(self, partition_index, total_partitions): + return self + + @property + def percent_completed(self): + if self.infinite: + return None + return self._current / self._total + + def params(self): + if not self.infinite and self._current > self._total: + raise StopIteration() + self._current += 1 + return self._params + # pylint: disable=C0415 + from osbenchmark.worker_coordinator import worker_coordinator + # pylint: disable=C0415 + from osbenchmark.workload import params + # pylint: disable=C0415 + from osbenchmark import workload, config + + # create task here + # sampler is mock + # create actual schedule w new params + + opensearch.init_request_context.return_value = { + "client_request_start": 0, + "request_start": 1, + "request_end": 11, + "client_request_end": 12 + } + + search_response = { + "timed_out": False, + "took": 5, + "hits": { + "total": { + "value": 3, + "relation": "eq" + }, + "hits": [ + { + "_id": 101, + "_score": 0.95 + }, + { + "_id": 102, + "_score": 0.88 + }, + { + "_id": 103, + "_score": 0.1 + } + ] + } + } + opensearch.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) + + params.register_param_source_for_name("worker-coordinator-test-param-source", WorkerCoordinatorTestParamSource) + test_workload = workload.Workload(name="unittest", description="unittest workload", + indices=None, + test_procedures=None) + + task = workload.Task("time-based", workload.Operation("time-based", + workload.OperationType.VectorSearch.to_hyphenated_string(), + params={ + "index": "_all", + "type": None, + "operation-type": "vector-search", + "detailed-results": True, + "response-compression-enabled": False, + "k": 3, + "neighbors": [101, 102, 103], + "body": { + "query": { + "knn": { + "location": { + "vector": [ + 5, + 4 + ], + "k": 3 + } + }} + }, + "request-params": {}, + "cache": False + }, + param_source="worker-coordinator-test-param-source"), + warmup_time_period=0.5, time_period=0.5, clients=num_clients, + params={ "clients": num_clients}, + completes_parent=False) + + sampler = worker_coordinator.Sampler(start_timestamp=0) + + runner.register_runner(operation_type=workload.OperationType.VectorSearch, runner=runner.Query(), async_runner=True) + param_source = workload.operation_parameters(test_workload, task) + # pylint: disable=C0415 + import threading + schedule = worker_coordinator.schedule_for(task, 0, param_source) + # pylint: disable=C0415 + def create_config(): + cfg = config.Config() + cfg.add(config.Scope.application, "system", "available.cores", 8) + return cfg + cfg = create_config() + executor = worker_coordinator.AsyncExecutor(client_id=0, task=task, schedule=schedule, opensearch={"default": opensearch}, + sampler=sampler, cancel=threading.Event(), complete=threading.Event(), + on_error="continue", config=cfg) + # will run executor + vector search query runner. + await executor() + + # make copy of samples since they disappear once first accessed. + samples = sampler.samples + recall_k = samples[0].request_meta_data.get("recall@k") + self.assertEqual(recall_k, None) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_query_vector_search_should_actually_calculate_recall_with_default_value(self, opensearch, + on_client_request_start, on_client_request_end): + with mock.patch("os.cpu_count", return_value=8): + num_clients = 8 + class WorkerCoordinatorTestParamSource: + def __init__(self, workload=None, params=None, **kwargs): + if params is None: + params = {} + self._indices = workload.indices + self._params = params + self._current = 1 + self._total = params.get("size") + self.infinite = self._total is None + + def partition(self, partition_index, total_partitions): + return self + + @property + def percent_completed(self): + if self.infinite: + return None + return self._current / self._total + + def params(self): + if not self.infinite and self._current > self._total: + raise StopIteration() + self._current += 1 + return self._params + # pylint: disable=C0415 + from osbenchmark.worker_coordinator import worker_coordinator + # pylint: disable=C0415 + from osbenchmark.workload import params + # pylint: disable=C0415 + from osbenchmark import workload, config + + # create task here + # sampler is mock + # create actual schedule w new params + + opensearch.init_request_context.return_value = { + "client_request_start": 0, + "request_start": 1, + "request_end": 11, + "client_request_end": 12 + } + + search_response = { + "timed_out": False, + "took": 5, + "hits": { + "total": { + "value": 3, + "relation": "eq" + }, + "hits": [ + { + "_id": 101, + "_score": 0.95 + }, + { + "_id": 102, + "_score": 0.88 + }, + { + "_id": 103, + "_score": 0.1 + } + ] + } + } + opensearch.transport.perform_request = mock.AsyncMock(return_value=io.StringIO(json.dumps(search_response))) + + params.register_param_source_for_name("worker-coordinator-test-param-source", WorkerCoordinatorTestParamSource) + test_workload = workload.Workload(name="unittest", description="unittest workload", + indices=None, + test_procedures=None) + + task = workload.Task("time-based", workload.Operation("time-based", + workload.OperationType.VectorSearch.to_hyphenated_string(), + params={ + "index": "_all", + "type": None, + "operation-type": "vector-search", + "detailed-results": True, + "response-compression-enabled": False, + "k": 3, + "neighbors": [101, 102, 103], + "body": { + "query": { + "knn": { + "location": { + "vector": [ + 5, + 4 + ], + "k": 3 + } + }} + }, + "request-params": {}, + "cache": False + }, + param_source="worker-coordinator-test-param-source"), + warmup_time_period=0.5, time_period=0.5, clients=num_clients, + params={ "clients": num_clients}, + completes_parent=False) + + sampler = worker_coordinator.Sampler(start_timestamp=0) + + runner.register_runner(operation_type=workload.OperationType.VectorSearch, runner=runner.Query(), async_runner=True) + param_source = workload.operation_parameters(test_workload, task) + # pylint: disable=C0415 + import threading + schedule = worker_coordinator.schedule_for(task, 0, param_source) + def create_config(): + cfg = config.Config() + cfg.add(config.Scope.application, "system", "available.cores", 8) + return cfg + cfg = create_config() + executor = worker_coordinator.AsyncExecutor(client_id=0, task=task, schedule=schedule, opensearch={"default": opensearch}, + sampler=sampler, cancel=threading.Event(), complete=threading.Event(), + on_error="continue",config=cfg) + # will run executor + vector search query runner. + await executor() + + # make copy of samples since they disappear once first accessed. + samples = sampler.samples + recall_k = samples[0].request_meta_data.get("recall@k") + self.assertEqual(recall_k, 1.0) + class PutPipelineRunnerTests(TestCase): @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') diff --git a/tests/workload/params_test.py b/tests/workload/params_test.py index 51921f5f4..edff169af 100644 --- a/tests/workload/params_test.py +++ b/tests/workload/params_test.py @@ -2271,7 +2271,8 @@ def test_passes_cache(self): }) p = source.params() - self.assertEqual(10, len(p)) + self.assertEqual(11, len(p)) + self.assertEqual(True, p["calculate-recall"]) self.assertEqual("index1", p["index"]) self.assertIsNone(p["type"]) self.assertIsNone(p["request-timeout"]) @@ -2290,7 +2291,6 @@ def test_passes_cache(self): def test_uses_data_stream(self): ds1 = workload.DataStream(name="data-stream-1") - source = params.SearchParamSource(workload=workload.Workload(name="unit-test", data_streams=[ds1]), params={ "body": { "query": { @@ -2307,7 +2307,8 @@ def test_uses_data_stream(self): }) p = source.params() - self.assertEqual(10, len(p)) + self.assertEqual(11, len(p)) + self.assertEqual(True, p["calculate-recall"]) self.assertEqual("data-stream-1", p["index"]) self.assertIsNone(p["type"]) self.assertEqual(1.0, p["request-timeout"]) @@ -2354,7 +2355,8 @@ def test_passes_request_parameters(self): }) p = source.params() - self.assertEqual(10, len(p)) + self.assertEqual(11, len(p)) + self.assertEqual(True, p["calculate-recall"]) self.assertEqual("index1", p["index"]) self.assertIsNone(p["type"]) self.assertIsNone(p["request-timeout"]) @@ -2390,7 +2392,8 @@ def test_user_specified_overrides_defaults(self): }) p = source.params() - self.assertEqual(10, len(p)) + self.assertEqual(11, len(p)) + self.assertEqual(True, p["calculate-recall"]) self.assertEqual("_all", p["index"]) self.assertEqual("type1", p["type"]) self.assertDictEqual({}, p["request-params"]) @@ -2423,7 +2426,8 @@ def test_user_specified_data_stream_overrides_defaults(self): }) p = source.params() - self.assertEqual(10, len(p)) + self.assertEqual(11, len(p)) + self.assertEqual(True, p["calculate-recall"]) self.assertEqual("data-stream-2", p["index"]) self.assertIsNone(p["type"]) self.assertEqual(1.0, p["request-timeout"])