diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 2cff1216f..a313d90f0 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -1041,7 +1041,7 @@ def calculate_recall(predictions, neighbors, top_k): if _is_empty_search_results(response_json): self.logger.info("Vector search query returned no results.") return result - id_field = params.get("id_field_name", "_id") + id_field = params.get("id-field-name", "_id") candidates = [] for hit in response_json['hits']['hits']: if id_field in hit: # Will add to candidates if field value is present diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py index 5cf0bd59a..ec04f320c 100644 --- a/osbenchmark/workload/params.py +++ b/osbenchmark/workload/params.py @@ -23,20 +23,22 @@ # under the License. import collections +import copy import inspect import logging import math import numbers import operator import random -from abc import ABC - import time +from abc import ABC, abstractmethod from enum import Enum from osbenchmark import exceptions -from osbenchmark.workload import workload from osbenchmark.utils import io +from osbenchmark.utils.dataset import DataSet, get_data_set, Context +from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter +from osbenchmark.workload import workload __PARAM_SOURCES_BY_OP = {} __PARAM_SOURCES_BY_NAME = {} @@ -797,6 +799,200 @@ def params(self): parsed_params.update(self._client_params()) return parsed_params + +class VectorSearchParamSource(SearchParamSource): + def __init__(self, workload, params, **kwargs): + super().__init__(workload, params, **kwargs) + self.delegate_param_source = VectorSearchPartitionParamSource(params, self.query_params) + + def partition(self, partition_index, total_partitions): + return self.delegate_param_source.partition(partition_index, total_partitions) + + def params(self): + raise exceptions.WorkloadConfigError("Do not use a VectorSearchParamSource without partitioning") + + +class VectorDataSetPartitionParamSource(ABC): + """ Abstract class that can read vectors from a data set and partition the + vectors across multiple clients. + + Attributes: + field_name: Name of the field to generate the query for + data_set_format: Format data set is serialized with. bigann or hdf5 + data_set_path: Path to data set + context: Context the data set will be used in. + data_set: Structure containing meta data about data and ability to read + num_vectors: Number of vectors to use from the data set + total: Number of vectors for the partition + current: Current vector offset in data set + infinite: Property of param source signalling that it can be exhausted + percent_completed: Progress indicator for how exhausted data set is + offset: Offset into the data set to start at. Relevant when there are + multiple partitions + """ + + def __init__(self, params, context: Context): + self.field_name: str = parse_string_parameter("field", params) + + self.context = context + self.data_set_format = parse_string_parameter("data_set_format", params) + self.data_set_path = parse_string_parameter("data_set_path", params) + self.data_set: DataSet = get_data_set( + self.data_set_format, self.data_set_path, self.context) + + num_vectors: int = parse_int_parameter( + "num_vectors", params, self.data_set.size()) + # if value is -1 or greater than dataset size, use dataset size as num_vectors + self.num_vectors = self.data_set.size() if ( + num_vectors < 0 or num_vectors > self.data_set.size()) else num_vectors + self.total = self.num_vectors + self.current = 0 + self.infinite = False + self.percent_completed = 0 + self.offset = 0 + + def _is_last_partition(self, partition_index, total_partitions): + return partition_index == total_partitions - 1 + + def partition(self, partition_index, total_partitions): + """ + Splits up the parameters source so that multiple clients can read data + from it. + Args: + partition_index: index of one particular partition + total_partitions: total number of partitions data set is split into + + Returns: + The parameter source for this particular partition + """ + partition_x = copy.copy(self) + + num_vectors = int(self.num_vectors / total_partitions) + + # if partition is not divided equally, add extra docs to the last partition + if self.num_vectors % total_partitions != 0 and self._is_last_partition(partition_index, total_partitions): + num_vectors += self.num_vectors - (num_vectors * total_partitions) + + partition_x.num_vectors = num_vectors + partition_x.offset = int(partition_index * partition_x.num_vectors) + # We need to create a new instance of the data set for each client + partition_x.data_set = get_data_set( + self.data_set_format, + self.data_set_path, + self.context + ) + partition_x.data_set.seek(partition_x.offset) + partition_x.current = partition_x.offset + return partition_x + + @abstractmethod + def params(self): + """ + Returns: A single parameter from this source + """ + + +class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource): + """ Parameter source for k-NN. Queries are created from data set + provided. + + Attributes: + k: The number of results to return for the search + repetitions: Number of times to re-run query dataset from beginning + neighbors_data_set_format: neighbor's dataset format type like hdf5, bigann + neighbors_data_set_path: neighbor's dataset file path + operation-type: search method type + id-field-name: field name that will have unique identifier id in document + request-params: query parameters that can be passed to search request + """ + PARAMS_NAME_K = "k" + PARAMS_NAME_BODY = "body" + PARAMS_NAME_REPETITIONS = "repetitions" + PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT = "neighbors_data_set_format" + PARAMS_NAME_NEIGHBORS_DATA_SET_PATH = "neighbors_data_set_path" + PARAMS_NAME_OPERATION_TYPE = "operation-type" + PARAMS_VALUE_VECTOR_SEARCH = "vector-search" + PARAMS_NAME_ID_FIELD_NAME = "id-field-name" + PARAMS_NAME_REQUEST_PARAMS = "request-params" + PARAMS_NAME_SOURCE = "_source" + PARAMS_NAME_ALLOW_PARTIAL_RESULTS = "allow_partial_search_results" + + def __init__(self, params, query_params): + super().__init__(params, Context.QUERY) + self.k = parse_int_parameter(self.PARAMS_NAME_K, params) + self.repetitions = parse_int_parameter(self.PARAMS_NAME_REPETITIONS, params, 1) + self.current_rep = 1 + self.neighbors_data_set_format = parse_string_parameter( + self.PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT, params, self.data_set_format) + self.neighbors_data_set_path = parse_string_parameter( + self.PARAMS_NAME_NEIGHBORS_DATA_SET_PATH, params, self.data_set_path) + self.neighbors_data_set: DataSet = get_data_set( + self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS) + operation_type = parse_string_parameter(self.PARAMS_NAME_OPERATION_TYPE, params, + self.PARAMS_VALUE_VECTOR_SEARCH) + self.query_params = query_params + self.query_params.update({ + self.PARAMS_NAME_K: self.k, + self.PARAMS_NAME_OPERATION_TYPE: operation_type, + self.PARAMS_NAME_ID_FIELD_NAME: params.get(self.PARAMS_NAME_ID_FIELD_NAME), + }) + + def _update_request_params(self): + request_params = self.query_params.get(self.PARAMS_NAME_REQUEST_PARAMS, {}) + request_params[self.PARAMS_NAME_SOURCE] = request_params.get( + self.PARAMS_NAME_SOURCE, "false") + request_params[self.PARAMS_NAME_ALLOW_PARTIAL_RESULTS] = request_params.get( + self.PARAMS_NAME_ALLOW_PARTIAL_RESULTS, "false") + self.query_params.update({self.PARAMS_NAME_REQUEST_PARAMS: request_params}) + + def params(self): + """ + Returns: A query parameter with a vector and neighbor from a data set + """ + is_dataset_exhausted = self.current >= self.num_vectors + self.offset + + if is_dataset_exhausted and self.current_rep < self.repetitions: + self.data_set.seek(self.offset) + self.current = self.offset + self.current_rep += 1 + elif is_dataset_exhausted: + raise StopIteration + vector = self.data_set.read(1)[0] + neighbor = self.neighbors_data_set.read(1)[0] + true_neighbors = list(map(str, neighbor[:self.k])) + self.query_params.update({ + "neighbors": true_neighbors, + }) + self._update_request_params() + + self.query_params.update({ + self.PARAMS_NAME_BODY: self._build_vector_search_query_body(self.field_name, vector)}) + self.current += 1 + self.percent_completed = self.current / self.total + return self.query_params + + def _build_vector_search_query_body(self, field_name: str, vector) -> dict: + """Builds a k-NN request that can be used to execute an approximate nearest + neighbor search against a k-NN plugin index + Args: + field_name: name of field to search + vector: vector used for query + Returns: + A dictionary containing the body used for search query + """ + return { + "size": self.k, + "query": { + "knn": { + field_name: { + "vector": vector, + "k": self.k + } + } + } + } + + def get_target(workload, params): if len(workload.indices) == 1: default_target = workload.indices[0].name @@ -1215,6 +1411,7 @@ def read_bulk(self): register_param_source_for_operation(workload.OperationType.Bulk, BulkIndexParamSource) register_param_source_for_operation(workload.OperationType.Search, SearchParamSource) +register_param_source_for_operation(workload.OperationType.VectorSearch, VectorSearchParamSource) register_param_source_for_operation(workload.OperationType.CreateIndex, CreateIndexParamSource) register_param_source_for_operation(workload.OperationType.DeleteIndex, DeleteIndexParamSource) register_param_source_for_operation(workload.OperationType.CreateDataStream, CreateDataStreamParamSource) diff --git a/tests/workload/params_test.py b/tests/workload/params_test.py index 6505f8648..211cfbed5 100644 --- a/tests/workload/params_test.py +++ b/tests/workload/params_test.py @@ -24,11 +24,20 @@ # pylint: disable=protected-access import random +import shutil +import tempfile from unittest import TestCase +import numpy as np + from osbenchmark import exceptions -from osbenchmark.workload import params, workload from osbenchmark.utils import io +from osbenchmark.utils.dataset import Context, HDF5DataSet +from osbenchmark.utils.parse import ConfigurationError +from osbenchmark.workload import params, workload +from osbenchmark.workload.params import VectorDataSetPartitionParamSource, VectorSearchPartitionParamSource +from tests.utils.dataset_helper import create_data_set +from tests.utils.dataset_test import DEFAULT_NUM_VECTORS class StaticBulkReader: @@ -2476,3 +2485,236 @@ def test_force_merge_all_params(self): self.assertEqual(30, p["request-timeout"]) self.assertEqual(1, p["max-num-segments"]) self.assertEqual("polling", p["mode"]) + + +class VectorSearchParamSourceTests(TestCase): + DEFAULT_INDEX_NAME = "test-index" + DEFAULT_FIELD_NAME = "test-field" + DEFAULT_CONTEXT = Context.INDEX + DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME + DEFAULT_NUM_VECTORS = 10 + DEFAULT_DIMENSION = 10 + DEFAULT_RANDOM_STRING_LENGTH = 8 + + def setUp(self) -> None: + self.data_set_dir = tempfile.mkdtemp() + + # Create a data set we know to be valid for convenience + self.valid_data_set_path = create_data_set( + self.DEFAULT_NUM_VECTORS, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + self.DEFAULT_CONTEXT, + self.data_set_dir + ) + + def tearDown(self): + shutil.rmtree(self.data_set_dir) + + def test_missing_params(self): + empty_params = dict() + self.assertRaises( + ConfigurationError, + lambda: self.TestVectorsFromDataSetParamSource( + empty_params, VectorSearchParamSourceTests.DEFAULT_CONTEXT) + ) + + def test_invalid_data_set_format(self): + invalid_data_set_format = "invalid-data-set-format" + + test_param_source_params = { + "index": VectorSearchParamSourceTests.DEFAULT_INDEX_NAME, + "field": VectorSearchParamSourceTests.DEFAULT_FIELD_NAME, + "data_set_format": invalid_data_set_format, + "data_set_path": self.valid_data_set_path, + } + self.assertRaises( + ConfigurationError, + lambda: self.TestVectorsFromDataSetParamSource( + test_param_source_params, + self.DEFAULT_CONTEXT + ) + ) + + def test_invalid_data_set_path(self): + invalid_data_set_path = "invalid-data-set-path" + test_param_source_params = { + "index": self.DEFAULT_INDEX_NAME, + "field": self.DEFAULT_FIELD_NAME, + "data_set_format": HDF5DataSet.FORMAT_NAME, + "data_set_path": invalid_data_set_path, + } + self.assertRaises( + FileNotFoundError, + lambda: self.TestVectorsFromDataSetParamSource( + test_param_source_params, + self.DEFAULT_CONTEXT + ) + ) + + def test_partition_hdf5(self): + num_vectors = 100 + + hdf5_data_set_path = create_data_set( + num_vectors, + self.DEFAULT_DIMENSION, + HDF5DataSet.FORMAT_NAME, + self.DEFAULT_CONTEXT, + self.data_set_dir + ) + + test_param_source_params = { + "index": self.DEFAULT_INDEX_NAME, + "field": self.DEFAULT_FIELD_NAME, + "data_set_format": HDF5DataSet.FORMAT_NAME, + "data_set_path": hdf5_data_set_path, + } + test_param_source = self.TestVectorsFromDataSetParamSource( + test_param_source_params, + self.DEFAULT_CONTEXT + ) + + num_partitions = 10 + vectors_per_partition = test_param_source.num_vectors // num_partitions + + self._test_partition( + test_param_source, + num_partitions, + vectors_per_partition + ) + + def test_partition_bigann(self): + num_vectors = 100 + float_extension = "fbin" + + bigann_data_set_path = create_data_set( + num_vectors, + self.DEFAULT_DIMENSION, + float_extension, + self.DEFAULT_CONTEXT, + self.data_set_dir + ) + + test_param_source_params = { + "index": self.DEFAULT_INDEX_NAME, + "field": self.DEFAULT_FIELD_NAME, + "data_set_format": "bigann", + "data_set_path": bigann_data_set_path, + } + test_param_source = self.TestVectorsFromDataSetParamSource( + test_param_source_params, + self.DEFAULT_CONTEXT + ) + + num_partitions = 10 + vecs_per_partition = test_param_source.num_vectors // num_partitions + + self._test_partition( + test_param_source, + num_partitions, + vecs_per_partition + ) + + def _test_partition( + self, + test_param_source: VectorDataSetPartitionParamSource, + num_partitions: int, + vec_per_partition: int + ): + for i in range(num_partitions): + test_param_source_i = test_param_source.partition(i, num_partitions) + self.assertEqual(test_param_source_i.num_vectors, vec_per_partition) + self.assertEqual(test_param_source_i.offset, i * vec_per_partition) + + class TestVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource): + """ + Empty implementation of ABC VectorsFromDataSetParamSource so that we can + test the concrete methods. + """ + + def params(self): + pass + + +class VectorSearchPartitionPartitionParamSourceTestCase(TestCase): + + DEFAULT_INDEX_NAME = "test-partition-index" + DEFAULT_FIELD_NAME = "test-vector-field" + DEFAULT_CONTEXT = Context.INDEX + DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME + DEFAULT_NUM_VECTORS = 10 + DEFAULT_DIMENSION = 10 + DEFAULT_RANDOM_STRING_LENGTH = 8 + + def setUp(self) -> None: + self.data_set_dir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.data_set_dir) + + def test_params(self): + # Create a data set + k = 12 + data_set_path = create_data_set( + self.DEFAULT_NUM_VECTORS, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.QUERY, + self.data_set_dir + ) + neighbors_data_set_path = create_data_set( + self.DEFAULT_NUM_VECTORS, + self.DEFAULT_DIMENSION, + self.DEFAULT_TYPE, + Context.NEIGHBORS, + self.data_set_dir + ) + + # Create a QueryVectorsFromDataSetParamSource with relevant params + test_param_source_params = { + "field": self.DEFAULT_FIELD_NAME, + "data_set_format": self.DEFAULT_TYPE, + "data_set_path": data_set_path, + "neighbors_data_set_path": neighbors_data_set_path, + "k": k, + } + query_param_source = VectorSearchPartitionParamSource( + test_param_source_params, {"index": self.DEFAULT_INDEX_NAME, "request-params": {}} + ) + + # Check each + for _ in range(DEFAULT_NUM_VECTORS): + self._check_params( + query_param_source.params(), + self.DEFAULT_FIELD_NAME, + self.DEFAULT_DIMENSION, + k + ) + + # Assert last call creates stop iteration + with self.assertRaises(StopIteration): + query_param_source.params() + + def _check_params( + self, + params: dict, + expected_field: str, + expected_dimension: int, + expected_k: int + ): + body = params.get("body") + self.assertIsInstance(body, dict) + query = body.get("query") + self.assertIsInstance(query, dict) + query_knn = query.get("knn") + self.assertIsInstance(query_knn, dict) + field = query_knn.get(expected_field) + self.assertIsInstance(field, dict) + vector = field.get("vector") + self.assertIsInstance(vector, np.ndarray) + self.assertEqual(len(list(vector)), expected_dimension) + k = field.get("k") + self.assertEqual(k, expected_k) + neighbor = params.get("neighbors") + self.assertIsInstance(neighbor, list) + self.assertEqual(len(neighbor), expected_dimension)