From 6d09dba3390e24a1758e068ac66e5ee1bf13d347 Mon Sep 17 00:00:00 2001
From: Vijayan Balasubramanian
Date: Sun, 17 Dec 2023 16:56:59 -0800
Subject: [PATCH] Add vector search param source

Added a new param source to partition the vector dataset and neighbors.
It is passed to the runner to perform the search and compare the response
with the neighbors for recall calculation. This param source extends
SearchParamSource to inherit search's other query parameters, and adds the
additional parameters that are required for the vector-search operation
type.

Signed-off-by: Vijayan Balasubramanian
---
 osbenchmark/worker_coordinator/runner.py |   2 +-
 osbenchmark/workload/params.py           | 203 ++++++++++++++++++-
 tests/workload/params_test.py            | 244 ++++++++++++++++++++++-
 3 files changed, 444 insertions(+), 5 deletions(-)

diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py
index 2cff1216f..a313d90f0 100644
--- a/osbenchmark/worker_coordinator/runner.py
+++ b/osbenchmark/worker_coordinator/runner.py
@@ -1041,7 +1041,7 @@ def calculate_recall(predictions, neighbors, top_k):
         if _is_empty_search_results(response_json):
             self.logger.info("Vector search query returned no results.")
             return result
-        id_field = params.get("id_field_name", "_id")
+        id_field = params.get("id-field-name", "_id")
         candidates = []
         for hit in response_json['hits']['hits']:
             if id_field in hit:  # Will add to candidates if field value is present
diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py
index 5cf0bd59a..ec04f320c 100644
--- a/osbenchmark/workload/params.py
+++ b/osbenchmark/workload/params.py
@@ -23,20 +23,22 @@
 # under the License.
 
 import collections
+import copy
 import inspect
 import logging
 import math
 import numbers
 import operator
 import random
-from abc import ABC
-
 import time
+from abc import ABC, abstractmethod
 from enum import Enum
 
 from osbenchmark import exceptions
-from osbenchmark.workload import workload
 from osbenchmark.utils import io
+from osbenchmark.utils.dataset import DataSet, get_data_set, Context
+from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter
+from osbenchmark.workload import workload
 
 __PARAM_SOURCES_BY_OP = {}
 __PARAM_SOURCES_BY_NAME = {}
@@ -797,6 +799,200 @@ def params(self):
         parsed_params.update(self._client_params())
         return parsed_params
 
+
+class VectorSearchParamSource(SearchParamSource):
+    def __init__(self, workload, params, **kwargs):
+        super().__init__(workload, params, **kwargs)
+        self.delegate_param_source = VectorSearchPartitionParamSource(params, self.query_params)
+
+    def partition(self, partition_index, total_partitions):
+        return self.delegate_param_source.partition(partition_index, total_partitions)
+
+    def params(self):
+        raise exceptions.WorkloadConfigError("Do not use a VectorSearchParamSource without partitioning")
+
+
+class VectorDataSetPartitionParamSource(ABC):
+    """ Abstract class that can read vectors from a data set and partition the
+    vectors across multiple clients.
+
+    Attributes:
+        field_name: Name of the field to generate the query for
+        data_set_format: Format data set is serialized with. bigann or hdf5
+        data_set_path: Path to data set
+        context: Context the data set will be used in.
+        data_set: Structure containing meta data about data and ability to read
+        num_vectors: Number of vectors to use from the data set
+        total: Number of vectors for the partition
+        current: Current vector offset in data set
+        infinite: Property of param source signalling that it can be exhausted
+        percent_completed: Progress indicator for how exhausted data set is
+        offset: Offset into the data set to start at. Relevant when there are
+            multiple partitions
+    """
+
+    def __init__(self, params, context: Context):
+        self.field_name: str = parse_string_parameter("field", params)
+
+        self.context = context
+        self.data_set_format = parse_string_parameter("data_set_format", params)
+        self.data_set_path = parse_string_parameter("data_set_path", params)
+        self.data_set: DataSet = get_data_set(
+            self.data_set_format, self.data_set_path, self.context)
+
+        num_vectors: int = parse_int_parameter(
+            "num_vectors", params, self.data_set.size())
+        # if value is -1 or greater than dataset size, use dataset size as num_vectors
+        self.num_vectors = self.data_set.size() if (
+            num_vectors < 0 or num_vectors > self.data_set.size()) else num_vectors
+        self.total = self.num_vectors
+        self.current = 0
+        self.infinite = False
+        self.percent_completed = 0
+        self.offset = 0
+
+    def _is_last_partition(self, partition_index, total_partitions):
+        return partition_index == total_partitions - 1
+
+    def partition(self, partition_index, total_partitions):
+        """
+        Splits up the parameters source so that multiple clients can read data
+        from it.
+        Args:
+            partition_index: index of one particular partition
+            total_partitions: total number of partitions data set is split into
+
+        Returns:
+            The parameter source for this particular partition
+        """
+        partition_x = copy.copy(self)
+
+        num_vectors = int(self.num_vectors / total_partitions)
+
+        # if partition is not divided equally, add extra docs to the last partition
+        if self.num_vectors % total_partitions != 0 and self._is_last_partition(partition_index, total_partitions):
+            num_vectors += self.num_vectors - (num_vectors * total_partitions)
+
+        partition_x.num_vectors = num_vectors
+        partition_x.offset = int(partition_index * partition_x.num_vectors)
+        # We need to create a new instance of the data set for each client
+        partition_x.data_set = get_data_set(
+            self.data_set_format,
+            self.data_set_path,
+            self.context
+        )
+        partition_x.data_set.seek(partition_x.offset)
+        partition_x.current = partition_x.offset
+        return partition_x
+
+    @abstractmethod
+    def params(self):
+        """
+        Returns: A single parameter from this source
+        """
+
+
+class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
+    """ Parameter source for k-NN. Queries are created from data set
+    provided.
+
+    Attributes:
+        k: The number of results to return for the search
+        repetitions: Number of times to re-run query dataset from beginning
+        neighbors_data_set_format: neighbor's dataset format type like hdf5, bigann
+        neighbors_data_set_path: neighbor's dataset file path
+        operation-type: search method type
+        id-field-name: field name that will have unique identifier id in document
+        request-params: query parameters that can be passed to search request
+    """
+    PARAMS_NAME_K = "k"
+    PARAMS_NAME_BODY = "body"
+    PARAMS_NAME_REPETITIONS = "repetitions"
+    PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT = "neighbors_data_set_format"
+    PARAMS_NAME_NEIGHBORS_DATA_SET_PATH = "neighbors_data_set_path"
+    PARAMS_NAME_OPERATION_TYPE = "operation-type"
+    PARAMS_VALUE_VECTOR_SEARCH = "vector-search"
+    PARAMS_NAME_ID_FIELD_NAME = "id-field-name"
+    PARAMS_NAME_REQUEST_PARAMS = "request-params"
+    PARAMS_NAME_SOURCE = "_source"
+    PARAMS_NAME_ALLOW_PARTIAL_RESULTS = "allow_partial_search_results"
+
+    def __init__(self, params, query_params):
+        super().__init__(params, Context.QUERY)
+        self.k = parse_int_parameter(self.PARAMS_NAME_K, params)
+        self.repetitions = parse_int_parameter(self.PARAMS_NAME_REPETITIONS, params, 1)
+        self.current_rep = 1
+        self.neighbors_data_set_format = parse_string_parameter(
+            self.PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT, params, self.data_set_format)
+        self.neighbors_data_set_path = parse_string_parameter(
+            self.PARAMS_NAME_NEIGHBORS_DATA_SET_PATH, params, self.data_set_path)
+        self.neighbors_data_set: DataSet = get_data_set(
+            self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS)
+        operation_type = parse_string_parameter(self.PARAMS_NAME_OPERATION_TYPE, params,
+                                                self.PARAMS_VALUE_VECTOR_SEARCH)
+        self.query_params = query_params
+        self.query_params.update({
+            self.PARAMS_NAME_K: self.k,
+            self.PARAMS_NAME_OPERATION_TYPE: operation_type,
+            self.PARAMS_NAME_ID_FIELD_NAME: params.get(self.PARAMS_NAME_ID_FIELD_NAME),
+        })
+
+    def _update_request_params(self):
+        request_params = self.query_params.get(self.PARAMS_NAME_REQUEST_PARAMS, {})
+        request_params[self.PARAMS_NAME_SOURCE] = request_params.get(
+            self.PARAMS_NAME_SOURCE, "false")
+        request_params[self.PARAMS_NAME_ALLOW_PARTIAL_RESULTS] = request_params.get(
+            self.PARAMS_NAME_ALLOW_PARTIAL_RESULTS, "false")
+        self.query_params.update({self.PARAMS_NAME_REQUEST_PARAMS: request_params})
+
+    def params(self):
+        """
+        Returns: A query parameter with a vector and neighbor from a data set
+        """
+        is_dataset_exhausted = self.current >= self.num_vectors + self.offset
+
+        if is_dataset_exhausted and self.current_rep < self.repetitions:
+            self.data_set.seek(self.offset)
+            self.current = self.offset
+            self.current_rep += 1
+        elif is_dataset_exhausted:
+            raise StopIteration
+        vector = self.data_set.read(1)[0]
+        neighbor = self.neighbors_data_set.read(1)[0]
+        true_neighbors = list(map(str, neighbor[:self.k]))
+        self.query_params.update({
+            "neighbors": true_neighbors,
+        })
+        self._update_request_params()
+
+        self.query_params.update({
+            self.PARAMS_NAME_BODY: self._build_vector_search_query_body(self.field_name, vector)})
+        self.current += 1
+        self.percent_completed = self.current / self.total
+        return self.query_params
+
+    def _build_vector_search_query_body(self, field_name: str, vector) -> dict:
+        """Builds a k-NN request that can be used to execute an approximate nearest
+        neighbor search against a k-NN plugin index
+        Args:
+            field_name: name of field to search
+            vector: vector used for query
+        Returns:
+            A dictionary containing the body used for search query
+        """
+        return {
+            "size": self.k,
+            "query": {
+                "knn": {
+                    field_name: {
+                        "vector": vector,
+                        "k": self.k
+                    }
+                }
+            }
+        }
+
+
 def get_target(workload, params):
     if len(workload.indices) == 1:
         default_target = workload.indices[0].name
@@ -1215,6 +1411,7 @@ def read_bulk(self):
 
 register_param_source_for_operation(workload.OperationType.Bulk, BulkIndexParamSource)
 register_param_source_for_operation(workload.OperationType.Search, SearchParamSource)
+register_param_source_for_operation(workload.OperationType.VectorSearch, VectorSearchParamSource)
 register_param_source_for_operation(workload.OperationType.CreateIndex, CreateIndexParamSource)
 register_param_source_for_operation(workload.OperationType.DeleteIndex, DeleteIndexParamSource)
 register_param_source_for_operation(workload.OperationType.CreateDataStream, CreateDataStreamParamSource)
diff --git a/tests/workload/params_test.py b/tests/workload/params_test.py
index 6505f8648..211cfbed5 100644
--- a/tests/workload/params_test.py
+++ b/tests/workload/params_test.py
@@ -24,11 +24,20 @@
 # pylint: disable=protected-access
 
 import random
+import shutil
+import tempfile
 from unittest import TestCase
 
+import numpy as np
+
 from osbenchmark import exceptions
-from osbenchmark.workload import params, workload
 from osbenchmark.utils import io
+from osbenchmark.utils.dataset import Context, HDF5DataSet
+from osbenchmark.utils.parse import ConfigurationError
+from osbenchmark.workload import params, workload
+from osbenchmark.workload.params import VectorDataSetPartitionParamSource, VectorSearchPartitionParamSource
+from tests.utils.dataset_helper import create_data_set
+from tests.utils.dataset_test import DEFAULT_NUM_VECTORS
 
 
 class StaticBulkReader:
@@ -2476,3 +2485,236 @@ def test_force_merge_all_params(self):
         self.assertEqual(30, p["request-timeout"])
         self.assertEqual(1, p["max-num-segments"])
         self.assertEqual("polling", p["mode"])
+
+
+class VectorSearchParamSourceTests(TestCase):
+    DEFAULT_INDEX_NAME = "test-index"
+    DEFAULT_FIELD_NAME = "test-field"
+    DEFAULT_CONTEXT = Context.INDEX
+    DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME
+    DEFAULT_NUM_VECTORS = 10
+    DEFAULT_DIMENSION = 10
+    DEFAULT_RANDOM_STRING_LENGTH = 8
+
+    def setUp(self) -> None:
+        self.data_set_dir = tempfile.mkdtemp()
+
+        # Create a data set we know to be valid for convenience
+        self.valid_data_set_path = create_data_set(
+            self.DEFAULT_NUM_VECTORS,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            self.DEFAULT_CONTEXT,
+            self.data_set_dir
+        )
+
+    def tearDown(self):
+        shutil.rmtree(self.data_set_dir)
+
+    def test_missing_params(self):
+        empty_params = dict()
+        self.assertRaises(
+            ConfigurationError,
+            lambda: self.TestVectorsFromDataSetParamSource(
+                empty_params, VectorSearchParamSourceTests.DEFAULT_CONTEXT)
+        )
+
+    def test_invalid_data_set_format(self):
+        invalid_data_set_format = "invalid-data-set-format"
+
+        test_param_source_params = {
+            "index": VectorSearchParamSourceTests.DEFAULT_INDEX_NAME,
+            "field": VectorSearchParamSourceTests.DEFAULT_FIELD_NAME,
+            "data_set_format": invalid_data_set_format,
+            "data_set_path": self.valid_data_set_path,
+        }
+        self.assertRaises(
+            ConfigurationError,
+            lambda: self.TestVectorsFromDataSetParamSource(
+                test_param_source_params,
+                self.DEFAULT_CONTEXT
+            )
+        )
+
+    def test_invalid_data_set_path(self):
+        invalid_data_set_path = "invalid-data-set-path"
+        test_param_source_params = {
+            "index": self.DEFAULT_INDEX_NAME,
+            "field": self.DEFAULT_FIELD_NAME,
+            "data_set_format": HDF5DataSet.FORMAT_NAME,
"data_set_path": invalid_data_set_path, + } + self.assertRaises( + FileNotFoundError, + lambda: self.TestVectorsFromDataSetParamSource( + test_param_source_params, + self.DEFAULT_CONTEXT + ) + ) + + def test_partition_hdf5(self): + num_vectors = 100 + + hdf5_data_set_path = create_data_set( + num_vectors, + self.DEFAULT_DIMENSION, + HDF5DataSet.FORMAT_NAME, + self.DEFAULT_CONTEXT, + self.data_set_dir + ) + + test_param_source_params = { + "index": self.DEFAULT_INDEX_NAME, + "field": self.DEFAULT_FIELD_NAME, + "data_set_format": HDF5DataSet.FORMAT_NAME, + "data_set_path": hdf5_data_set_path, + } + test_param_source = self.TestVectorsFromDataSetParamSource( + test_param_source_params, + self.DEFAULT_CONTEXT + ) + + num_partitions = 10 + vectors_per_partition = test_param_source.num_vectors // num_partitions + + self._test_partition( + test_param_source, + num_partitions, + vectors_per_partition + ) + + def test_partition_bigann(self): + num_vectors = 100 + float_extension = "fbin" + + bigann_data_set_path = create_data_set( + num_vectors, + self.DEFAULT_DIMENSION, + float_extension, + self.DEFAULT_CONTEXT, + self.data_set_dir + ) + + test_param_source_params = { + "index": self.DEFAULT_INDEX_NAME, + "field": self.DEFAULT_FIELD_NAME, + "data_set_format": "bigann", + "data_set_path": bigann_data_set_path, + } + test_param_source = self.TestVectorsFromDataSetParamSource( + test_param_source_params, + self.DEFAULT_CONTEXT + ) + + num_partitions = 10 + vecs_per_partition = test_param_source.num_vectors // num_partitions + + self._test_partition( + test_param_source, + num_partitions, + vecs_per_partition + ) + + def _test_partition( + self, + test_param_source: VectorDataSetPartitionParamSource, + num_partitions: int, + vec_per_partition: int + ): + for i in range(num_partitions): + test_param_source_i = test_param_source.partition(i, num_partitions) + self.assertEqual(test_param_source_i.num_vectors, vec_per_partition) + self.assertEqual(test_param_source_i.offset, i * vec_per_partition) + + class TestVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource): + """ + Empty implementation of ABC VectorsFromDataSetParamSource so that we can + test the concrete methods. 
+ """ + + def params(self): + pass + + +class VectorSearchPartitionPartitionParamSourceTestCase(TestCase): + + DEFAULT_INDEX_NAME = "test-partition-index" + DEFAULT_FIELD_NAME = "test-vector-field" + DEFAULT_CONTEXT = Context.INDEX + DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME + DEFAULT_NUM_VECTORS = 10 + DEFAULT_DIMENSION = 10 + DEFAULT_RANDOM_STRING_LENGTH = 8 + + def setUp(self) -> None: + self.data_set_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.data_set_dir) + + def test_params(self): + # Create a data set + k = 12 + data_set_path = create_data_set( + self.DEFAULT_NUM_VECTORS, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.QUERY, + self.data_set_dir + ) + neighbors_data_set_path = create_data_set( + self.DEFAULT_NUM_VECTORS, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.NEIGHBORS, + self.data_set_dir + ) + + # Create a QueryVectorsFromDataSetParamSource with relevant params + test_param_source_params = { + "field": self.DEFAULT_FIELD_NAME, + "data_set_format": self.DEFAULT_TYPE, + "data_set_path": data_set_path, + "neighbors_data_set_path": neighbors_data_set_path, + "k": k, + } + query_param_source = VectorSearchPartitionParamSource( + test_param_source_params, {"index": self.DEFAULT_INDEX_NAME, "request-params": {}} + ) + + # Check each + for _ in range(DEFAULT_NUM_VECTORS): + self._check_params( + query_param_source.params(), + self.DEFAULT_FIELD_NAME, + self.DEFAULT_DIMENSION, + k + ) + + # Assert last call creates stop iteration + with self.assertRaises(StopIteration): + query_param_source.params() + + def _check_params( + self, + params: dict, + expected_field: str, + expected_dimension: int, + expected_k: int + ): + body = params.get("body") + self.assertIsInstance(body, dict) + query = body.get("query") + self.assertIsInstance(query, dict) + query_knn = query.get("knn") + self.assertIsInstance(query_knn, dict) + field = query_knn.get(expected_field) + self.assertIsInstance(field, dict) + vector = field.get("vector") + self.assertIsInstance(vector, np.ndarray) + self.assertEqual(len(list(vector)), expected_dimension) + k = field.get("k") + self.assertEqual(k, expected_k) + neighbor = params.get("neighbors") + self.assertIsInstance(neighbor, list) + self.assertEqual(len(neighbor), expected_dimension)