Skip to content

Commit

Permalink
initial implementation of should_calculate_recall
Browse files Browse the repository at this point in the history
Signed-off-by: Finn Roblin <[email protected]>
  • Loading branch information
finnroblin committed Aug 26, 2024
1 parent 2532d77 commit 5b21773
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 18 deletions.
59 changes: 41 additions & 18 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from functools import total_ordering
from io import BytesIO
from os.path import commonprefix
from os import cpu_count as os_cpu_count
from typing import List, Optional

import ijson
Expand Down Expand Up @@ -1320,29 +1321,47 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=F

return correct / min_num_of_results

def _set_initial_recall_values(params: dict, result: dict) -> None:
# Add recall@k and recall@1 to the initial result only if k is present in the params and calculate_recall is true
if "k" in params:
result.update({
"recall@k": 0,
"recall@1": 0
})
# Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params
elif "max_distance" in params:
result.update({
"recall@max_distance": 0,
"recall@max_distance_1": 0
})
# Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params
elif "min_score" in params:
result.update({
"recall@min_score": 0,
"recall@min_score_1": 0
})

def _get_should_calculate_recall(params: dict) -> bool:
num_clients = params.get("num_clients", 0)
if num_clients == 0:
self.logger.debug("Expected num_clients to be specified but was not.")
cpu_count = os_cpu_count()
if cpu_count < num_clients:
self.logger.warning("Number of clients, %s, specified is greater than the number of CPUs, %s, available."\
"This will lead to unperformant context switching on load generation host. Performance "\
"metrics may not be accurate. Skipping recall calculation.", num_clients, cpu_count)
return False
return params.get("calculate-recall", True)

result = {
"weight": 1,
"unit": "ops",
"success": True,
}
# Add recall@k and recall@1 to the initial result only if k is present in the params
if "k" in params:
result.update({
"recall@k": 0,
"recall@1": 0
})
# Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params
elif "max_distance" in params:
result.update({
"recall@max_distance": 0,
"recall@max_distance_1": 0
})
# Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params
elif "min_score" in params:
result.update({
"recall@min_score": 0,
"recall@min_score_1": 0
})
# deal with clients here. Need to get num_clients
should_calculate_recall = _get_should_calculate_recall(params)
if should_calculate_recall:
_set_initial_recall_values(params, result)

doc_type = params.get("type")
response = await self._raw_search(opensearch, doc_type, index, body, request_params, headers=headers)
Expand All @@ -1366,6 +1385,10 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=F
if _is_empty_search_results(response_json):
self.logger.info("Vector search query returned no results.")
return result

if not should_calculate_recall:
return result

id_field = parse_string_parameter("id-field-name", params, "_id")
candidates = []
for hit in response_json['hits']['hits']:
Expand Down
3 changes: 3 additions & 0 deletions osbenchmark/worker_coordinator/worker_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,9 @@ async def __call__(self, *args, **kwargs):
processing_start = time.perf_counter()
self.schedule_handle.before_request(processing_start)
async with self.opensearch["default"].new_request_context() as request_context:
# add num_clients to the parameter so that vector search runner can skip calculating recall
# if num_clients > cpu_count().
params.update({"num_clients": self.task.clients})
total_ops, total_ops_unit, request_meta_data = await execute_single(runner, self.opensearch, params, self.on_error)
request_start = request_context.request_start
request_end = request_context.request_end
Expand Down
2 changes: 2 additions & 0 deletions osbenchmark/workload/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ def __init__(self, workload, params, **kwargs):
f"'type' not supported with 'data-stream' for operation '{kwargs.get('operation_name')}'")
request_cache = params.get("cache", None)
detailed_results = params.get("detailed-results", False)
calculate_recall = params.get("calculate-recall", True)
query_body = params.get("body", None)
pages = params.get("pages", None)
results_per_page = params.get("results-per-page", None)
Expand All @@ -561,6 +562,7 @@ def __init__(self, workload, params, **kwargs):
"type": type_name,
"cache": request_cache,
"detailed-results": detailed_results,
"calculate-recall": calculate_recall,
"request-params": request_params,
"response-compression-enabled": response_compression_enabled,
"body": query_body
Expand Down

0 comments on commit 5b21773

Please sign in to comment.