From b56713b13a572243da335d6782ff0866fbd64dd2 Mon Sep 17 00:00:00 2001 From: Finn Roblin Date: Mon, 26 Aug 2024 11:05:45 -0700 Subject: [PATCH] Support filters for vector search Signed-off-by: Finn Roblin --- osbenchmark/utils/dataset.py | 5 + osbenchmark/utils/parse.py | 16 +- osbenchmark/worker_coordinator/runner.py | 26 +- osbenchmark/workload/params.py | 192 +++++++----- tests/utils/dataset_helper.py | 49 +++ tests/worker_coordinator/runner_test.py | 120 +++++--- tests/workload/params_test.py | 370 ++++++++++++++++++----- 7 files changed, 591 insertions(+), 187 deletions(-) diff --git a/osbenchmark/utils/dataset.py b/osbenchmark/utils/dataset.py index 7e773d586..0e990609a 100644 --- a/osbenchmark/utils/dataset.py +++ b/osbenchmark/utils/dataset.py @@ -27,6 +27,7 @@ class Context(Enum): MAX_DISTANCE_NEIGHBORS = 4 MIN_SCORE_NEIGHBORS = 5 PARENTS = 6 + ATTRIBUTES = 7 class DataSet(ABC): @@ -133,6 +134,7 @@ def size(self): def reset(self): self.current = self.BEGINNING + # pylint: disable=R0911 @staticmethod def parse_context(context: Context) -> str: if context == Context.NEIGHBORS: @@ -152,6 +154,9 @@ def parse_context(context: Context) -> str: if context == Context.MIN_SCORE_NEIGHBORS: return "min_score_neighbors" + if context == Context.ATTRIBUTES: + return "attributes" + raise Exception("Unsupported context") diff --git a/osbenchmark/utils/parse.py b/osbenchmark/utils/parse.py index f7ca381fe..213965ce5 100644 --- a/osbenchmark/utils/parse.py +++ b/osbenchmark/utils/parse.py @@ -22,7 +22,7 @@ def parse_string_parameter(key: str, params: dict, default: str = None) -> str: def parse_int_parameter(key: str, params: dict, default: int = None) -> int: if key not in params: - if default: + if default is not None: return default raise ConfigurationError( "Value cannot be None for param {}".format(key) @@ -46,3 +46,17 @@ def parse_float_parameter(key: str, params: dict, default: float = None) -> floa return params[key] raise ConfigurationError("Value must be a float for param {}".format(key)) + + +def parse_bool_parameter(key: str, params: dict, default: bool = None) -> bool: + if key not in params: + if default is not None: + return default + raise ConfigurationError( + "Value cannot be None for param {}".format(key) + ) + + if isinstance(params[key], bool): + return params[key] + + raise ConfigurationError("Value must be a bool for param {}".format(key)) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 21ed83b6a..7d8d69de7 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -1254,6 +1254,17 @@ def _get_field_value(content, field_name): return _get_field_value(content["_source"], field_name) return None + def binary_search_for_last_negative_1(neighbors): + low = 0 + high = len(neighbors) + while low < high: + mid = (low + high) // 2 + if neighbors[mid] == "-1": + high = mid + else: + low = mid + 1 + return low - 1 + def calculate_topk_search_recall(predictions, neighbors, top_k): """ Calculates the recall by comparing top_k neighbors with predictions. 
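Editor's note (illustration, not part of the patch): binary_search_for_last_negative_1 above is a lower-bound binary search over the string-encoded ground-truth list, which is padded with trailing "-1" entries when a filter leaves fewer than top_k true neighbors. A minimal sketch of its contract, assuming the "-1" padding is always a contiguous suffix:

    # Illustration only; assumes "-1" padding appears only as a contiguous suffix.
    def binary_search_for_last_negative_1(neighbors):
        low, high = 0, len(neighbors)
        while low < high:
            mid = (low + high) // 2
            if neighbors[mid] == "-1":
                high = mid         # first "-1" is at mid or earlier
            else:
                low = mid + 1      # first "-1" is strictly after mid
        return low - 1             # index of the last real neighbor, or -1 if none

    assert binary_search_for_last_negative_1(["101", "102", "-1", "-1"]) == 1
    assert binary_search_for_last_negative_1(["101", "102"]) == 1
    assert binary_search_for_last_negative_1(["-1", "-1"]) == -1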
@@ -1270,7 +1281,20 @@ def calculate_topk_search_recall(predictions, neighbors, top_k):
                 self.logger.info("No neighbors are provided for recall calculation")
                 return 0.0
             min_num_of_results = min(top_k, len(neighbors))
+            last_neighbor_is_negative_1 = int(neighbors[min_num_of_results-1]) == -1
             truth_set = neighbors[:min_num_of_results]
+            if last_neighbor_is_negative_1:
+                self.logger.debug("Last neighbor is -1")
+                last_neighbor_idx = binary_search_for_last_negative_1(truth_set)
+
+                # Note: we do + 1 since list slicing is exclusive and last_neighbor_idx points at the
+                # last valid neighbor; everything after it is '-1' padding and must be dropped.
+                truth_set = truth_set[:last_neighbor_idx+1]
+                if not truth_set:
+                    self.logger.info("No true neighbors after filtering, returning recall = 1.\n"
+                                     "Total neighbors in prediction: [%d].", len(predictions))
+                    return 1.0
+
             for j in range(min_num_of_results):
                 if j >= len(predictions):
                     self.logger.info("No more neighbors in prediction to compare against ground truth.\n"
@@ -1280,7 +1304,7 @@ def calculate_topk_search_recall(predictions, neighbors, top_k):
                 if predictions[j] in truth_set:
                     correct += 1.0
 
-            return correct / min_num_of_results
+            return correct / len(truth_set)
 
         def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=False):
             """
diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py
index 270fb6178..92cc61c98 100644
--- a/osbenchmark/workload/params.py
+++ b/osbenchmark/workload/params.py
@@ -40,7 +40,7 @@
 from osbenchmark import exceptions
 from osbenchmark.utils import io
 from osbenchmark.utils.dataset import DataSet, get_data_set, Context
-from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter, parse_float_parameter
+from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter
 from osbenchmark.workload import workload
 
 __PARAM_SOURCES_BY_OP = {}
@@ -1041,12 +1041,12 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
         request-params: query parameters that can be passed to search request
     """
     PARAMS_NAME_K = "k"
-    PARAMS_NAME_MAX_DISTANCE = "max_distance"
-    PARAMS_NAME_MIN_SCORE = "min_score"
     PARAMS_NAME_BODY = "body"
     PARAMS_NAME_SIZE = "size"
     PARAMS_NAME_QUERY = "query"
     PARAMS_NAME_FILTER = "filter"
+    PARAMS_NAME_FILTER_TYPE = "filter_type"
+    PARAMS_NAME_FILTER_BODY = "filter_body"
     PARAMS_NAME_REPETITIONS = "repetitions"
     PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT = "neighbors_data_set_format"
     PARAMS_NAME_NEIGHBORS_DATA_SET_PATH = "neighbors_data_set_path"
@@ -1057,27 +1057,11 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
     PARAMS_NAME_REQUEST_PARAMS = "request-params"
     PARAMS_NAME_SOURCE = "_source"
     PARAMS_NAME_ALLOW_PARTIAL_RESULTS = "allow_partial_search_results"
-    MIN_SCORE_QUERY_TYPE = "min_score"
-    MAX_DISTANCE_QUERY_TYPE = "max_distance"
-    KNN_QUERY_TYPE = "knn"
-    DEFAULT_RADIAL_SEARCH_QUERY_RESULT_SIZE = 10000
 
     def __init__(self, workloads, params, query_params, **kwargs):
         super().__init__(workloads, params, Context.QUERY, **kwargs)
         self.logger = logging.getLogger(__name__)
-        self.k = None
-        self.distance = None
-        self.score = None
-        if self.PARAMS_NAME_K in params:
-            self.k = parse_int_parameter(self.PARAMS_NAME_K, params)
-            self.query_type = self.KNN_QUERY_TYPE
-        if self.PARAMS_NAME_MAX_DISTANCE in params:
-            self.distance = parse_float_parameter(self.PARAMS_NAME_MAX_DISTANCE, params)
-            self.query_type = self.MAX_DISTANCE_QUERY_TYPE
-        if self.PARAMS_NAME_MIN_SCORE in params:
-            self.score = parse_float_parameter(self.PARAMS_NAME_MIN_SCORE, params)
-            self.query_type = self.MIN_SCORE_QUERY_TYPE
-        self._validate_query_type_parameters()
+        self.k = parse_int_parameter(self.PARAMS_NAME_K, params)
         self.repetitions = parse_int_parameter(self.PARAMS_NAME_REPETITIONS, params, 1)
         self.current_rep = 1
         self.neighbors_data_set_format = parse_string_parameter(
@@ -1090,24 +1074,28 @@ def __init__(self, workloads, params, query_params, **kwargs):
             self.PARAMS_VALUE_VECTOR_SEARCH)
         self.query_params = query_params
         self.query_params.update({
+            self.PARAMS_NAME_K: self.k,
             self.PARAMS_NAME_OPERATION_TYPE: operation_type,
             self.PARAMS_NAME_ID_FIELD_NAME: params.get(self.PARAMS_NAME_ID_FIELD_NAME),
         })
-        if self.PARAMS_NAME_K in params:
-            self.query_params.update({
-                self.PARAMS_NAME_K: self.k
-            })
-        if self.PARAMS_NAME_MAX_DISTANCE in params:
+
+        # Read the filter configuration from the operation params; it is merged
+        # into query_params below.
+        self.filter_type = params.get(self.PARAMS_NAME_FILTER_TYPE)
+        self.filter_body = params.get(self.PARAMS_NAME_FILTER_BODY)
+
+        if self.PARAMS_NAME_FILTER in params:
             self.query_params.update({
-                self.PARAMS_NAME_MAX_DISTANCE: self.distance
+                self.PARAMS_NAME_FILTER: params.get(self.PARAMS_NAME_FILTER)
             })
-        if self.PARAMS_NAME_MIN_SCORE in params:
+
+        if self.PARAMS_NAME_FILTER_TYPE in params:
             self.query_params.update({
-                self.PARAMS_NAME_MIN_SCORE: self.score
+                self.PARAMS_NAME_FILTER_TYPE: params.get(self.PARAMS_NAME_FILTER_TYPE)
             })
-        if self.PARAMS_NAME_FILTER in params:
+
+        if self.PARAMS_NAME_FILTER_BODY in params:
             self.query_params.update({
-                self.PARAMS_NAME_FILTER: params.get(self.PARAMS_NAME_FILTER)
+                self.PARAMS_NAME_FILTER_BODY: params.get(self.PARAMS_NAME_FILTER_BODY)
             })
         # if neighbors data set is defined as corpus, extract corresponding corpus from workload
         # and add it to corpora list
@@ -1115,10 +1103,6 @@ def __init__(self, workloads, params, query_params, **kwargs):
             neighbors_corpora = self.extract_corpora(self.neighbors_data_set_corpus, self.neighbors_data_set_format)
             self.corpora.extend(corpora for corpora in neighbors_corpora if corpora not in self.corpora)
 
-    def _validate_query_type_parameters(self):
-        if bool(self.k) + bool(self.distance) + bool(self.score) > 1:
-            raise ValueError("Only one of k, max_distance, or min_score can be specified in vector search.")
-
     @staticmethod
     def _validate_neighbors_data_set(file_path, corpus):
         if file_path and corpus:
@@ -1133,31 +1117,26 @@ def _update_request_params(self):
             self.PARAMS_NAME_ALLOW_PARTIAL_RESULTS, "false")
         self.query_params.update({self.PARAMS_NAME_REQUEST_PARAMS: request_params})
 
-    def _get_query_neighbors(self):
-        if self.query_type == self.KNN_QUERY_TYPE:
-            return Context.NEIGHBORS
-        if self.query_type == self.MIN_SCORE_QUERY_TYPE:
-            return Context.MIN_SCORE_NEIGHBORS
-        if self.query_type == self.MAX_DISTANCE_QUERY_TYPE:
-            return Context.MAX_DISTANCE_NEIGHBORS
-        raise Exception("Unknown query type [%s]" % self.query_type)
-
-    def _get_query_size(self):
-        if self.query_type == self.KNN_QUERY_TYPE:
-            return self.k
-        return self.DEFAULT_RADIAL_SEARCH_QUERY_RESULT_SIZE
-
     def _update_body_params(self, vector):
         # accept body params if passed from workload, else, create empty dictionary
         body_params = self.query_params.get(self.PARAMS_NAME_BODY) or dict()
         if self.PARAMS_NAME_SIZE not in body_params:
-            body_params[self.PARAMS_NAME_SIZE] = self._get_query_size()
+            body_params[self.PARAMS_NAME_SIZE] = self.k
         if self.PARAMS_NAME_QUERY in body_params:
             self.logger.warning(
                 "[%s] param from body will be replaced with vector search query.", self.PARAMS_NAME_QUERY)
-        efficient_filter = self.query_params.get(self.PARAMS_NAME_FILTER)
+
self.logger.info("Here, we have query_params: %s ", self.query_params) + filter_type=self.query_params.get(self.PARAMS_NAME_FILTER_TYPE) + filter_body=self.query_params.get(self.PARAMS_NAME_FILTER_BODY) + efficient_filter = filter_body if filter_type == "efficient" else None + # override query params with vector search query - body_params[self.PARAMS_NAME_QUERY] = self._build_vector_search_query_body(vector, efficient_filter) + body_params[self.PARAMS_NAME_QUERY] = self._build_vector_search_query_body(vector, efficient_filter, filter_type, filter_body) + + if filter_type == "post_filter": + body_params["post_filter"] = filter_body + self.query_params.update({self.PARAMS_NAME_BODY: body_params}) def partition(self, partition_index, total_partitions): @@ -1171,7 +1150,7 @@ def partition(self, partition_index, total_partitions): self.neighbors_data_set_path = self.data_set_path # add neighbor instance to partition partition.neighbors_data_set = get_data_set( - self.neighbors_data_set_format, self.neighbors_data_set_path, self._get_query_neighbors()) + self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS) partition.neighbors_data_set.seek(partition.offset) return partition @@ -1190,7 +1169,7 @@ def params(self): raise StopIteration vector = self.data_set.read(1)[0] neighbor = self.neighbors_data_set.read(1)[0] - true_neighbors = list(map(str, neighbor[:self.k] if self.k else neighbor)) + true_neighbors = list(map(str, neighbor[:self.k])) self.query_params.update({ "neighbors": true_neighbors, }) @@ -1200,35 +1179,18 @@ def params(self): self.percent_completed = self.current / self.total return self.query_params - def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict: - """Builds a vector search request that can be used to execute an approximate nearest + def _build_vector_search_query_body(self, vector, efficient_filter=None, filter_type=None, filter_body=None) -> dict: + """Builds a k-NN request that can be used to execute an approximate nearest neighbor search against a k-NN plugin index Args: vector: vector used for query - efficient_filter: efficient filter used for query Returns: A dictionary containing the body used for search query """ - query = {} - if self.query_type == self.KNN_QUERY_TYPE: - query.update({ - "k": self.k, - }) - elif self.query_type == self.MIN_SCORE_QUERY_TYPE: - query.update({ - "min_score": self.score, - }) - elif self.query_type == self.MAX_DISTANCE_QUERY_TYPE: - query.update({ - "max_distance": self.distance, - }) - else: - raise Exception("Unknown query type [%s]" % self.query_type) - - query.update({ + query = { "vector": vector, - }) - + "k": self.k, + } if efficient_filter: query.update({ "filter": efficient_filter, @@ -1241,7 +1203,7 @@ def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict } if self.is_nested: - outer_field_name, _ = self.get_split_fields() + outer_field_name, _inner_field_name = self.get_split_fields() return { "nested": { "path": outer_field_name, @@ -1249,8 +1211,36 @@ def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict } } + if filter_type and not efficient_filter and not filter_type == "post_filter": + return self._knn_query_with_filter(vector, knn_search_query, filter_type, filter_body) + return knn_search_query + def _knn_query_with_filter(self, vector, knn_query, filter_type, filter_body) -> dict: + if filter_type == "script": + return { + "script_score": { + "query": {"bool": {"filter": filter_body}}, + "script": { + 
"source": "knn_score", + "lang": "knn", + "params": { + "field": self.field_name, + "query_value": vector, + "space_type": "l2" + } + } + } + } + + if filter_type == "boolean": + return { + "bool": { + "filter": filter_body, + "must": [knn_query] + } + } + raise exceptions.ConfigurationError("Unsupported filter type: %s" % filter_type) class BulkVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource): """ Create bulk index requests from a data set of vectors. @@ -1272,6 +1262,7 @@ def __init__(self, workload, params, **kwargs): self.id_field_name: str = parse_string_parameter( self.PARAMS_NAME_ID_FIELD_NAME, params, self.DEFAULT_ID_FIELD_NAME ) + self.filter_attributes: List[Any] = params.get("filter_attributes", []) self.action_buffer = None self.num_nested_vectors = 10 @@ -1303,8 +1294,43 @@ def partition(self, partition_index, total_partitions): ) partition.parent_data_set.seek(partition.offset) + if self.filter_attributes: + partition.attributes_data_set = get_data_set( + self.parent_data_set_format, self.parent_data_set_path, Context.ATTRIBUTES + ) + partition.attributes_data_set.seek(partition.offset) + return partition + def bulk_transform_add_attributes(self, partition: np.ndarray, action, attributes: np.ndarray) -> List[Dict[str, Any]]: + """attributes is a (partition_len x 3) matrix. """ + actions = [] + + _ = [ + actions.extend([action(self.id_field_name, i + self.current), None]) + for i in range(len(partition)) + ] + bulk_contents = [] + + add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME + for vec, attribute_list, identifier in zip( + partition.tolist(), attributes.tolist(), range(self.current, self.current + len(partition)) + ): + row = {self.field_name: vec} + for idx, attribute_name in zip(range(len(self.filter_attributes)), self.filter_attributes): + attribute = attribute_list[idx].decode() + if attribute != "None": + row.update({attribute_name : attribute}) + if add_id_field_to_body: + row.update({self.id_field_name: identifier}) + bulk_contents.append(row) + + actions[1::2] = bulk_contents + + self.logger.info("Actions: %s", actions) + return actions + + def bulk_transform_non_nested(self, partition: np.ndarray, action) -> List[Dict[str, Any]]: """ Create bulk ingest actions for data with a non-nested field. @@ -1333,7 +1359,7 @@ def bulk_transform_non_nested(self, partition: np.ndarray, action) -> List[Dict[ def bulk_transform( - self, partition: np.ndarray, action, parents_ids: Optional[np.ndarray] + self, partition: np.ndarray, action, parents_ids: Optional[np.ndarray], attributes: Optional[np.ndarray] ) -> List[Dict[str, Any]]: """Partitions and transforms a list of vectors into OpenSearch's bulk injection format. @@ -1345,9 +1371,12 @@ def bulk_transform( An array of transformed vectors in bulk format. """ - if not self.is_nested: + if not self.is_nested and not self.filter_attributes: return self.bulk_transform_non_nested(partition, action) + # TODO: Assumption: we won't add attributes if we're also doing a nested query. 
 
     def bulk_transform_non_nested(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
         """
         Create bulk ingest actions for data with a non-nested field.
@@ -1333,7 +1359,7 @@ def bulk_transform_non_nested(self, partition: np.ndarray, action) -> List[Dict[
 
     def bulk_transform(
-        self, partition: np.ndarray, action, parents_ids: Optional[np.ndarray]
+        self, partition: np.ndarray, action, parents_ids: Optional[np.ndarray], attributes: Optional[np.ndarray]
     ) -> List[Dict[str, Any]]:
         """Partitions and transforms a list of vectors into OpenSearch's bulk injection format.
@@ -1345,9 +1371,12 @@ def bulk_transform(
             An array of transformed vectors in bulk format.
         """
-        if not self.is_nested:
+        if not self.is_nested and not self.filter_attributes:
             return self.bulk_transform_non_nested(partition, action)
 
+        # TODO: Assumption: we won't add attributes if we're also doing a nested query.
+        if self.filter_attributes:
+            return self.bulk_transform_add_attributes(partition, action, attributes)
+
         actions = []
 
         outer_field_name, inner_field_name = self.get_split_fields()
@@ -1430,7 +1459,12 @@ def action(id_field_name, doc_id):
         else:
             parent_ids = None
 
+        if self.filter_attributes:
+            attributes = self.attributes_data_set.read(bulk_size)
+        else:
+            attributes = None
+
         body = self.bulk_transform(partition, action, parent_ids, attributes)
         size = len(body) // 2
 
         if not self.is_nested:
diff --git a/tests/utils/dataset_helper.py b/tests/utils/dataset_helper.py
index c13a27b39..2a565b7eb 100644
--- a/tests/utils/dataset_helper.py
+++ b/tests/utils/dataset_helper.py
@@ -193,6 +193,27 @@ def _build_data_set(self, context: DataSetBuildContext):
             # file with distance.
             context.vectors.tofile(f)
 
+
+def create_attributes(num_vectors: int) -> np.ndarray:
+    rng = np.random.default_rng()
+
+    # Pool of attribute strings to draw from
+    strings = ["str1", "str2", "str3"]
+
+    # First column: random choice from strings
+    col1 = rng.choice(strings, num_vectors).astype("S10")
+
+    # Second column: random choice from strings
+    col2 = rng.choice(strings, num_vectors).astype("S10")
+
+    # Third column: random integers between 0 and 100
+    col3 = rng.integers(0, 101, num_vectors).astype("S10")
+
+    # Combine the columns into a single (num_vectors x 3) matrix
+    attributes = np.column_stack((col1, col2, col3))
+
+    return attributes
+
 def create_parent_ids(num_vectors: int, group_size: int = 10) -> np.ndarray:
     num_ids = (num_vectors + group_size - 1) // group_size  # Calculate total number of different IDs needed
     ids = np.arange(1, num_ids + 1)  # Create an array of IDs starting from 1
@@ -245,6 +266,34 @@ def create_data_set(
     return data_set_path
 
 
+def create_attributes_data_set(
+        num_vectors: int,
+        dimension: int,
+        extension: str,
+        data_set_context: Context,
+        data_set_dir,
+        file_path: str = None
+) -> str:
+    if file_path:
+        data_set_path = file_path
+    else:
+        file_name_base = ''.join(random.choice(string.ascii_letters) for _ in
+                                 range(DEFAULT_RANDOM_STRING_LENGTH))
+        data_set_file_name = "{}.{}".format(file_name_base, extension)
+        data_set_path = os.path.join(data_set_dir, data_set_file_name)
+    context = DataSetBuildContext(
+        data_set_context,
+        create_attributes(num_vectors),
+        data_set_path)
+
+    if extension == HDF5DataSet.FORMAT_NAME:
+        HDF5Builder().add_data_set_build_context(context).build()
+    else:
+        BigANNVectorBuilder().add_data_set_build_context(context).build()
+
+    return data_set_path
+
+
 def create_parent_data_set(
     num_vectors: int,
     dimension: int,
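Editor's illustration (not part of the patch): for num_vectors=2, create_attributes returns a byte-string matrix with one row per vector, for example:

    np.array([[b"str1", b"str3", b"17"],
              [b"str2", b"str1", b"84"]], dtype="S10")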
diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py
index fbf2ecff4..a33953676 100644
--- a/tests/worker_coordinator/runner_test.py
+++ b/tests/worker_coordinator/runner_test.py
@@ -3125,7 +3125,7 @@ async def test_query_vector_search_with_custom_id_field_inside_source(self, open
     @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
-    async def test_query_vector_radial_search_with_min_score(self, opensearch, on_client_request_start, on_client_request_end):
+    async def test_calculate_recall_with_negative_one_neighbors(self, opensearch, on_client_request_start, on_client_request_end):
         search_response = {
             "timed_out": False,
             "took": 5,
             "hits": {
                 "total": {
                     "value": 3,
                     "relation": "eq"
                 },
                 "hits": [
                     {
                         "_id": 101,
                         "_score": 0.95
                     },
                     {
                         "_id": 102,
                         "_score": 0.88
-                    },
-                    {
-                        "_id": 103,
-                        "_score": 0.87
                     }
                 ]
             }
         }
@@ -3159,8 +3155,8 @@ async def test_query_vector_radial_search_with_min_score(self, opensearch, on_cl
             "operation-type": "vector-search",
             "detailed-results": True,
             "response-compression-enabled": False,
-            "min_score": 0.80,
-            "neighbors": [101, 102, 103],
+            "k": 4,
+            "neighbors": [101, 102, -1, -1],
             "body": {
                 "query": {
                     "knn": {
@@ -3169,27 +3165,19 @@ async def test_query_vector_radial_search_with_min_score(self, opensearch, on_cl
                             5,
                             4
                         ],
-                        "min_score": 0.80,
+                        "k": 3
                     }
-                }
-            }
+                }}
             }
         }
 
         async with query_runner:
             result = await query_runner(opensearch, params)
 
-        self.assertEqual(1, result["weight"])
-        self.assertEqual("ops", result["unit"])
-        self.assertEqual(3, result["hits"])
-        self.assertEqual("eq", result["hits_relation"])
-        self.assertFalse(result["timed_out"])
-        self.assertEqual(5, result["took"])
+        self.assertEqual(result["recall@k"], 1.0)
         self.assertIn("recall_time_ms", result.keys())
-        self.assertIn("recall@min_score", result.keys())
-        self.assertEqual(result["recall@min_score"], 1.0)
-        self.assertIn("recall@min_score_1", result.keys())
-        self.assertEqual(result["recall@min_score_1"], 1.0)
+        self.assertIn("recall@1", result.keys())
+        self.assertEqual(result["recall@1"], 1)
         self.assertNotIn("error-type", result.keys())
 
         opensearch.transport.perform_request.assert_called_once_with(
@@ -3204,7 +3192,7 @@ async def test_query_vector_radial_search_with_min_score(self, opensearch, on_cl
     @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
     @mock.patch("opensearchpy.OpenSearch")
     @run_async
-    async def test_query_vector_radial_search_with_max_distance(self, opensearch, on_client_request_start, on_client_request_end):
+    async def test_calculate_recall_with_some_negative_one_neighbors(self, opensearch, on_client_request_start, on_client_request_end):
         search_response = {
             "timed_out": False,
             "took": 5,
@@ -3221,10 +3209,74 @@ async def test_query_vector_radial_search_with_max_distance(self, opensearch, on
                     {
                         "_id": 102,
                         "_score": 0.88
+                    }
+                ]
+            }
+        }
+        opensearch.transport.perform_request.return_value = as_future(io.StringIO(json.dumps(search_response)))
+
+        query_runner = runner.Query()
+
+        params = {
+            "index": "unittest",
+            "operation-type": "vector-search",
+            "detailed-results": True,
+            "response-compression-enabled": False,
+            "k": 6,
+            "neighbors": [101, 102, 103, 104, -1, -1],
+            "body": {
+                "query": {
+                    "knn": {
+                        "location": {
+                            "vector": [
+                                5,
+                                4
+                            ],
+                            "k": 3
+                        }
+                    }}
+            }
+        }
+
+        async with query_runner:
+            result = await query_runner(opensearch, params)
+
+        self.assertEqual(result["recall@k"], 0.5)
+        self.assertIn("recall_time_ms", result.keys())
+        self.assertIn("recall@1", result.keys())
+        self.assertEqual(result["recall@1"], 1)
+        self.assertNotIn("error-type", result.keys())
+
+        opensearch.transport.perform_request.assert_called_once_with(
+            "GET",
+            "/unittest/_search",
+            params={},
+            body=params["body"],
+            headers={"Accept-Encoding": "identity"}
+        )
+
+    @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end')
+    @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start')
+    @mock.patch("opensearchpy.OpenSearch")
+    @run_async
+    async def test_calculate_recall_with_intermediate_negative_one_neighbors(self, opensearch,
+                                                                             on_client_request_start, on_client_request_end):
+        search_response = {
+            "timed_out": False,
+            "took": 5,
+            "hits": {
+                "total": {
+                    "value": 3,
+                    "relation": "eq"
+                },
+                "hits": [
+                    {
+                        "_id": 101,
+                        "_score": 0.95
                     },
                     {
-                        "_id": 103,
-                        "_score": 0.87
+                        "_id": 102,
+                        "_score": 0.88
                     }
                 ]
             }
         }
@@ -3238,8 +3290,8 @@ async def test_query_vector_radial_search_with_max_distance(self, opensearch, on
             "operation-type": "vector-search",
             "detailed-results": True,
             "response-compression-enabled": False,
-            "max_distance": 15.0,
-            "neighbors": [101, 102, 103, 104],
+            "k": 4,
+            "neighbors": [101, 103, 102, 104, -1],
             "body": {
                 "query": {
                     "knn": {
@@ -3248,27 +3300,19 @@ async def test_query_vector_radial_search_with_max_distance(self, opensearch, on
                             5,
                             4
                         ],
-                        "max_distance": 15.0,
+                        "k": 3
                     }
-                }
-            }
+                }}
             }
         }
 
         async with query_runner:
             result = await query_runner(opensearch, params)
 
-        self.assertEqual(1, result["weight"])
-        self.assertEqual("ops", result["unit"])
-        self.assertEqual(3, result["hits"])
-        self.assertEqual("eq", result["hits_relation"])
-        self.assertFalse(result["timed_out"])
-        self.assertEqual(5, result["took"])
+        self.assertEqual(result["recall@k"], 0.5)
         self.assertIn("recall_time_ms", result.keys())
-        self.assertIn("recall@max_distance", result.keys())
-        self.assertEqual(result["recall@max_distance"], 0.75)
-        self.assertIn("recall@max_distance_1", result.keys())
-        self.assertEqual(result["recall@max_distance_1"], 1.0)
+        self.assertIn("recall@1", result.keys())
+        self.assertEqual(result["recall@1"], 1)
         self.assertNotIn("error-type", result.keys())
 
         opensearch.transport.perform_request.assert_called_once_with(
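Editor's illustration (not part of the patch): end to end, a workload drives the filtered search through the new parameters parsed above; the index, field, file names and filter below are hypothetical:

    {
        "operation-type": "vector-search",
        "index": "target_index",
        "field": "location",
        "k": 10,
        "data_set_format": "hdf5",
        "data_set_path": "queries.hdf5",
        "neighbors_data_set_path": "neighbors.hdf5",
        "filter_type": "post_filter",
        "filter_body": {"range": {"price": {"gte": 5, "lte": 10}}}
    }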
diff --git a/tests/workload/params_test.py b/tests/workload/params_test.py
index 17c904e3e..51921f5f4 100644
--- a/tests/workload/params_test.py
+++ b/tests/workload/params_test.py
@@ -13,7 +13,7 @@
 # not use this file except in compliance with the License.
 # You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#     http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
@@ -37,7 +37,7 @@
 from osbenchmark.workload import params, workload
 from osbenchmark.workload.params import VectorDataSetPartitionParamSource, VectorSearchPartitionParamSource, \
     BulkVectorsFromDataSetParamSource
-from tests.utils.dataset_helper import create_data_set, create_parent_data_set
+from tests.utils.dataset_helper import create_data_set, create_attributes_data_set, create_parent_data_set
 from tests.utils.dataset_test import DEFAULT_NUM_VECTORS
 
 
@@ -2900,7 +2900,7 @@ def test_params_default(self):
         with self.assertRaises(StopIteration):
             query_param_source_partition.params()
 
-    def test_params_custom_body(self):
+    def test_post_filter(self):
         # Create a data set
         k = 12
         data_set_path = create_data_set(
@@ -2915,134 +2915,188 @@ def test_params_custom_body(self):
             self.DEFAULT_DIMENSION,
             self.DEFAULT_TYPE,
             Context.NEIGHBORS,
-            self.data_set_dir
+            self.data_set_dir,
         )
-        filter_body = {
-            "key": "value"
-        }
         # Create a QueryVectorsFromDataSetParamSource with relevant params
+
+        POST_FILTER_BODY = {"range": {"price": {"gte": 5, "lte": 10}}}
         test_param_source_params = {
             "field": self.DEFAULT_FIELD_NAME,
             "data_set_format": self.DEFAULT_TYPE,
             "data_set_path": data_set_path,
             "neighbors_data_set_path": neighbors_data_set_path,
             "k": k,
-            "filter": filter_body,
+            "filter_type": "post_filter",
+            "filter_body": POST_FILTER_BODY,
         }
         query_param_source = VectorSearchPartitionParamSource(
             workload.Workload(name="unit-test"),
-            test_param_source_params, {
+            test_param_source_params,
+            {
                 "index": self.DEFAULT_INDEX_NAME,
                 "request-params": {},
                 "body": {
                     "size": 100,
-                }
-            }
+                },
+            },
         )
         query_param_source_partition = query_param_source.partition(0, 1)
 
         # Check each
         for _ in range(DEFAULT_NUM_VECTORS):
+            params = query_param_source_partition.params()
             self._check_params(
-                query_param_source_partition.params(),
+                params,
                 self.DEFAULT_FIELD_NAME,
                 self.DEFAULT_DIMENSION,
                 k,
                 100,
-                filter_body,
             )
+            post_filter = params.get("body").get("post_filter")
+            self.assertIsInstance(post_filter, dict)
+            self.assertEqual(post_filter, POST_FILTER_BODY)
 
         # Assert last call creates stop iteration
         with self.assertRaises(StopIteration):
             query_param_source_partition.params()
 
-    def test_params_when_multiple_query_type_provided_then_raise_exception(self):
+    def test_bool_filter(self):
         # Create a data set
+        k = 12
         data_set_path = create_data_set(
             self.DEFAULT_NUM_VECTORS,
             self.DEFAULT_DIMENSION,
             self.DEFAULT_TYPE,
             Context.QUERY,
-            self.data_set_dir
+            self.data_set_dir,
         )
         neighbors_data_set_path = create_data_set(
             self.DEFAULT_NUM_VECTORS,
             self.DEFAULT_DIMENSION,
             self.DEFAULT_TYPE,
             Context.NEIGHBORS,
-            self.data_set_dir
+            self.data_set_dir,
         )
+
         # Create a QueryVectorsFromDataSetParamSource with relevant params
-        test_param_source_params_1 = {
+        BOOL_FILTER_BODY = {
+            "bool": {
+                "must": [
+                    {"range": {"rating": {"gte": 8, "lte": 10}}},
+                    {"term": {"parking": "true"}},
+                ]
+            }
+        }
+        test_param_source_params = {
             "field": self.DEFAULT_FIELD_NAME,
             "data_set_format": self.DEFAULT_TYPE,
             "data_set_path": data_set_path,
             "neighbors_data_set_path": neighbors_data_set_path,
-            "k": 10,
-            "min_score": 0.5,
+            "k": k,
+            "filter_type": "boolean",
+            "filter_body": BOOL_FILTER_BODY,
         }
+        query_param_source = VectorSearchPartitionParamSource(
+            workload.Workload(name="unit-test"),
+            test_param_source_params,
+            {
+                "index": self.DEFAULT_INDEX_NAME,
+                "request-params": {},
+                "body": {
+                    "size": 100,
+                },
+            },
+        )
+        query_param_source_partition = query_param_source.partition(0, 1)
 
-        with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."):
-            query_param_source = VectorSearchPartitionParamSource(
-                workload.Workload(name="unit-test"),
-                test_param_source_params_1, {
-                    "index": self.DEFAULT_INDEX_NAME,
-                    "request-params": {},
-                    "body": {
-                        "size": 100,
-                    }
-                }
+        # Check each
+        for _ in range(DEFAULT_NUM_VECTORS):
+            params = query_param_source_partition.params()
+            self._check_params_bool(
+                params,
+                self.DEFAULT_FIELD_NAME,
+                self.DEFAULT_DIMENSION,
+                k,
+                100,
+                BOOL_FILTER_BODY,
             )
-            # This line won't be executed if exception is raised during initialization
-            query_param_source.partition(0, 1)
 
-        test_param_source_params_2 = {
-            "field": self.DEFAULT_FIELD_NAME,
-            "data_set_format": self.DEFAULT_TYPE,
-            "data_set_path": data_set_path,
-            "neighbors_data_set_path": neighbors_data_set_path,
-            "k": 10,
-            "max_distance": 100.0,
-        }
+        # Assert last call creates stop iteration
+        with self.assertRaises(StopIteration):
+            query_param_source_partition.params()
 
-        with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."):
-            query_param_source = VectorSearchPartitionParamSource(
-                workload.Workload(name="unit-test"),
-                test_param_source_params_2, {
-                    "index": self.DEFAULT_INDEX_NAME,
-                    "request-params": {},
-                    "body": {
-                        "size": 100,
-                    }
-                }
-            )
-            # This line won't be executed if exception is raised during initialization
-            query_param_source.partition(0, 1)
+    def test_script_score_filter(self):
+        # Create a data set
+        k = 12
+        data_set_path = create_data_set(
+            self.DEFAULT_NUM_VECTORS,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            Context.QUERY,
+            self.data_set_dir,
+        )
+        neighbors_data_set_path = create_data_set(
+            self.DEFAULT_NUM_VECTORS,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            Context.NEIGHBORS,
+            self.data_set_dir,
+        )
+
+        # Create a QueryVectorsFromDataSetParamSource with relevant params
+
+        SCRIPT_SCORE_FILTER_BODY = {
+            "bool": {
+                "must": [
+                    {"range": {"rating": {"gte": 8, "lte": 10}}},
+                    {"term": {"parking": "true"}},
+                ]
+            }
+        }
-        test_param_source_params_3 = {
+        test_param_source_params = {
             "field": self.DEFAULT_FIELD_NAME,
             "data_set_format": self.DEFAULT_TYPE,
             "data_set_path": data_set_path,
             "neighbors_data_set_path": neighbors_data_set_path,
-            "min_score": 0.5,
-            "max_distance": 100.0,
-            "k": 10,
+            "k": k,
+            "filter_type": "script",
+            "filter_body": SCRIPT_SCORE_FILTER_BODY,
         }
+        query_param_source = VectorSearchPartitionParamSource(
+            workload.Workload(name="unit-test"),
+            test_param_source_params,
+            {
+                "index": self.DEFAULT_INDEX_NAME,
+                "request-params": {},
+                "body": {
+                    "size": 100,
+                },
+            },
+        )
+        query_param_source_partition = query_param_source.partition(0, 1)
 
-        with self.assertRaisesRegex(ValueError, "Only one of k, max_distance, or min_score can be specified in vector search."):
-            query_param_source = VectorSearchPartitionParamSource(
-                workload.Workload(name="unit-test"),
-                test_param_source_params_3, {
-                    "index": self.DEFAULT_INDEX_NAME,
-                    "request-params": {},
-                    "body": {
-                        "size": 100,
-                    }
-                }
+        # Check each
+        for _ in range(DEFAULT_NUM_VECTORS):
+            params = query_param_source_partition.params()
+            self._check_params_script_score(
+                params,
+                self.DEFAULT_FIELD_NAME,
+                self.DEFAULT_DIMENSION,
+                k,
+                100,
+                SCRIPT_SCORE_FILTER_BODY,
             )
-            # This line won't be executed if exception is raised during initialization
-            query_param_source.partition(0, 1)
+
+        # Assert last call creates stop iteration
+        with self.assertRaises(StopIteration):
+            query_param_source_partition.params()
 
     def _check_params(
         self,
@@ -3073,6 +3127,78 @@ def _check_params(
         self.assertEqual(size, expected_size if expected_size else expected_k)
         self.assertEqual(field.get("filter"), expected_filter)
 
+    def _check_params_bool(
+        self,
+        actual_params: dict,
+        expected_field: str,
+        expected_dimension: int,
+        expected_k: int,
+        expected_size=None,
+        expected_bool_query=None,
+        check_vectors=True,
+    ):
+        body = actual_params.get("body")
+        self.assertIsInstance(body, dict)
+        query = body.get("query")
+        self.assertIsInstance(query, dict)
+        query_bool = query.get("bool")
+        self.assertIsInstance(query_bool, dict)
+        bool_filter = query_bool.get("filter")
+        self.assertIsInstance(bool_filter, dict)
+        self.assertEqual(bool_filter, expected_bool_query)
+
+        must_clause = query_bool.get("must")
+        self.assertIsInstance(must_clause, list)
+
+        if check_vectors:
+            knn_dict = must_clause[0]
+
+            # Repack the inner knn clause so the existing _check_params helper can validate it.
+            repacked = {
+                "body": {"query": knn_dict, "size": body.get("size")},
+                "neighbors": actual_params.get("neighbors"),
+            }
+
+            self._check_params(repacked, expected_field, expected_dimension, expected_k, expected_size)
+
+    def _check_params_script_score(
+        self,
+        actual_params: dict,
+        expected_field: str,
+        expected_dimension: int,
+        expected_k: int,
+        expected_size=None,
+        expected_script_query=None
+    ):
+        body = actual_params.get("body")
+        self.assertIsInstance(body, dict)
+        query = body.get("query")
+        self.assertIsInstance(query, dict)
+        script_score_query = query.get("script_score")
query.get("script_score") + self.assertIsInstance(script_score_query, dict) + bool_from_script_score = script_score_query.get("query").get("bool").get("filter") + + self.assertEqual(bool_from_script_score, expected_script_query) + + script = script_score_query.get("script") + self.assertIsInstance(script, dict) + + source = script.get("source") + self.assertEqual(source, "knn_score") + + lang = script.get("lang") + self.assertEqual(lang, "knn") + + params = script.get("params") + self.assertIsInstance(params, dict) + + field = params.get("field") + self.assertEqual(field, expected_field) + + vector = params.get("query_value") + self.assertIsInstance(vector, np.ndarray) + self.assertEqual(len(list(vector)), expected_dimension) + + space_type = params.get("space_type") + self.assertEqual(space_type, "l2") # TODO change this once it's all modifiable. class BulkVectorsFromDataSetParamSourceTestCase(TestCase): @@ -3208,6 +3334,114 @@ def _check_params( self.assertTrue(expected_id_field in req_body) +class BulkVectorsAttributeCase(TestCase): + DEFAULT_INDEX_NAME = "test-partition-index" + DEFAULT_VECTOR_FIELD_NAME = "test-vector-field" + DEFAULT_CONTEXT = Context.INDEX + DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME + DEFAULT_NUM_VECTORS = 10 + DEFAULT_DIMENSION = 10 + DEFAULT_RANDOM_STRING_LENGTH = 8 + DEFAULT_ID_FIELD_NAME = "_id" + ATTRIBUTES_LIST = ['taste', 'color', 'age'] + + def setUp(self) -> None: + self.data_set_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.data_set_dir) + + def test_params_efficient_filter( + self + ): + num_vectors = 49 + bulk_size = 10 + data_set_path = create_data_set( + num_vectors, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.INDEX, + self.data_set_dir + ) + parent_data_set_path = create_attributes_data_set( + num_vectors, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.ATTRIBUTES, + self.data_set_dir, + ) + + test_param_source_params = { + "index": self.DEFAULT_INDEX_NAME, + "field": self.DEFAULT_VECTOR_FIELD_NAME, + "data_set_format": self.DEFAULT_TYPE, + "data_set_path": data_set_path, + "bulk_size": bulk_size, + "id-field-name": self.DEFAULT_ID_FIELD_NAME, + "filter_attributes": self.ATTRIBUTES_LIST + } + bulk_param_source = BulkVectorsFromDataSetParamSource( + workload.Workload(name="unit-test"), test_param_source_params + ) + bulk_param_source.parent_data_set_path = parent_data_set_path + bulk_param_source_partition = bulk_param_source.partition(0, 1) + # Check each payload returned + vectors_consumed = 0 + while vectors_consumed < num_vectors: + expected_num_vectors = min(num_vectors - vectors_consumed, bulk_size) + actual_params = bulk_param_source_partition.params() + self._check_params_attributes( + actual_params, + self.DEFAULT_INDEX_NAME, + self.DEFAULT_VECTOR_FIELD_NAME, + self.DEFAULT_DIMENSION, + expected_num_vectors, + self.DEFAULT_ID_FIELD_NAME, + ) + vectors_consumed += expected_num_vectors + + # Assert last call creates stop iteration + with self.assertRaises(StopIteration): + bulk_param_source_partition.params() + + def _check_params_attributes( + self, + actual_params: dict, + expected_index: str, + expected_vector_field: str, + expected_dimension: int, + expected_num_vectors_in_payload: int, + expected_id_field: str, + ): + size = actual_params.get("size") + self.assertEqual(size, expected_num_vectors_in_payload) + body = actual_params.get("body") + self.assertIsInstance(body, list) + self.assertEqual(len(body) // 2, expected_num_vectors_in_payload) + + # Bulk payload has 2 parts: first one is the header 
and the second one + # is the body. The header will have the index name and the body will + # have the vector + for header, req_body in zip(*[iter(body)] * 2): + index = header.get("index") + self.assertIsInstance(index, dict) + + index_name = index.get("_index") + self.assertEqual(index_name, expected_index) + + vector = req_body.get(expected_vector_field) + self.assertIsInstance(vector, list) + self.assertEqual(len(vector), expected_dimension) + + for attribute in self.ATTRIBUTES_LIST: + self.assertTrue(attribute in req_body) + if expected_id_field in index: + self.assertEqual(self.DEFAULT_ID_FIELD_NAME, expected_id_field) + self.assertFalse(expected_id_field in req_body) + continue + self.assertTrue(expected_id_field in req_body) + + class VectorsNestedCase(TestCase): DEFAULT_INDEX_NAME = "test-partition-index" DEFAULT_VECTOR_FIELD_NAME = "nested.test-vector-field"