Add Vector Search Custom Bulk
Added params and a runner to support vector search
bulk operations. This custom bulk vector search
support partitions data set files such as bigann and
hdf5 into multiple bulk requests based on bulk size.

Signed-off-by: Vijayan Balasubramanian <[email protected]>
VijayanB committed Dec 28, 2023
1 parent 6782594 commit 2a9eadb
Showing 5 changed files with 269 additions and 3 deletions.
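
For orientation, the sketch below shows the kind of parameter set the new bulk-vector-data-set operation consumes. Parameter names mirror the tests added in this commit; the index name, field name, data set path, and values are illustrative assumptions, not anything defined by the change itself.

# Hypothetical parameters for the bulk-vector-data-set operation
# (names follow the tests in this commit; values are made up).
bulk_vector_params = {
    "index": "target-index",                    # destination index (assumed name)
    "field": "target-vector-field",             # vector field to populate (assumed name)
    "data_set_format": "hdf5",                  # or "bigann"
    "data_set_path": "/path/to/data-set.hdf5",  # assumed path
    "bulk_size": 100,                           # vectors per bulk request
    "id-field-name": "_id",                     # optional; "_id" is the default
}
# "retries" is also accepted and defaults to 10 in the param source.
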
2 changes: 1 addition & 1 deletion osbenchmark/utils/parse.py
@@ -7,7 +7,7 @@


def parse_string_parameter(key: str, params: dict, default: str = None) -> str:
if key not in params:
if key not in params or not params[key]:
if default is not None:
return default
raise ConfigurationError(
34 changes: 33 additions & 1 deletion osbenchmark/worker_coordinator/runner.py
@@ -40,11 +40,13 @@
from typing import List, Optional

import ijson
from opensearchpy import ConnectionTimeout

from osbenchmark import exceptions, workload
from osbenchmark.utils import convert

# Mapping from operation type to specific runner
from osbenchmark.utils.parse import parse_int_parameter, parse_string_parameter

__RUNNERS = {}

@@ -58,6 +60,7 @@ def register_default_runners():
register_runner(workload.OperationType.PaginatedSearch, Query(), async_runner=True)
register_runner(workload.OperationType.ScrollSearch, Query(), async_runner=True)
register_runner(workload.OperationType.VectorSearch, Query(), async_runner=True)
register_runner(workload.OperationType.BulkVectorDataSet, BulkVectorDataSet(), async_runner=True)
register_runner(workload.OperationType.RawRequest, RawRequest(), async_runner=True)
register_runner(workload.OperationType.Composite, Composite(), async_runner=True)
register_runner(workload.OperationType.SubmitAsyncSearch, SubmitAsyncSearch(), async_runner=True)
@@ -625,6 +628,35 @@ def __repr__(self, *args, **kwargs):
return "bulk-index"


# TODO: Add retry logic to BulkIndex, so that we can remove BulkVectorDataSet and use BulkIndex.
class BulkVectorDataSet(Runner):
"""
Bulk inserts a vector search data set of type hdf5 or bigann.
"""

NAME = "bulk-vector-data-set"

async def __call__(self, opensearch, params):
size = parse_int_parameter("size", params)
retries = parse_int_parameter("retries", params, 0) + 1

for attempt in range(retries):
try:
await opensearch.bulk(
body=params["body"]
)

return size, "docs"
except ConnectionTimeout:
self.logger.warning("Bulk vector ingestion timed out. Retrying attempt: %d", attempt)

raise TimeoutError("Failed to submit bulk request in specified number "
"of retries: {}".format(retries))

def __repr__(self, *args, **kwargs):
return self.NAME


class ForceMerge(Runner):
"""
Runs a force merge operation against OpenSearch.
@@ -1051,7 +1083,7 @@ def calculate_recall(predictions, neighbors, top_k):
if _is_empty_search_results(response_json):
self.logger.info("Vector search query returned no results.")
return result
id_field = params.get("id-field-name", "_id")
id_field = parse_string_parameter("id-field-name", params, "_id")
candidates = []
for hit in response_json['hits']['hits']:
field_value = _get_field_value(hit, id_field)
94 changes: 94 additions & 0 deletions osbenchmark/workload/params.py
@@ -33,6 +33,9 @@
import time
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Dict, Any

import numpy as np

from osbenchmark import exceptions
from osbenchmark.utils import io
@@ -1001,6 +1004,96 @@ def _build_vector_search_query_body(self, vector) -> dict:
}


class BulkVectorsDataSetParamSource(ParamSource):
def __init__(self, workload, params, **kwargs):
super().__init__(workload, params, **kwargs)
self.delegate_param_source = BulkVectorsFromDataSetParamSource(params)

def partition(self, partition_index, total_partitions):
return self.delegate_param_source.partition(partition_index, total_partitions)

def params(self):
raise exceptions.WorkloadConfigError("Do not use a VectorSearchParamSource without partitioning")


class BulkVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource):
""" Create bulk index requests from a data set of vectors.
Attributes:
bulk_size: number of vectors per request
retries: number of times to retry the request when it fails
"""

DEFAULT_RETRIES = 10
PARAMS_NAME_ID_FIELD_NAME = "id-field-name"
DEFAULT_ID_FIELD_NAME = "_id"

def __init__(self, params):
super().__init__(params, Context.INDEX)
self.bulk_size: int = parse_int_parameter("bulk_size", params)
self.retries: int = parse_int_parameter("retries", params,
self.DEFAULT_RETRIES)
self.index_name: str = parse_string_parameter("index", params)
self.id_field_name: str = parse_string_parameter(
self.PARAMS_NAME_ID_FIELD_NAME, params, self.DEFAULT_ID_FIELD_NAME)

def bulk_transform(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
"""Partitions and transforms a list of vectors into OpenSearch's bulk
injection format.
Args:
offset: to start counting from
partition: An array of vectors to transform.
action: Bulk API action.
Returns:
An array of transformed vectors in bulk format.
"""
actions = []
_ = [
actions.extend([action(self.id_field_name, i + self.current), None])
for i in range(len(partition))
]
bulk_contents = []
add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME
for vec, identifier in zip(partition.tolist(), range(self.current, self.current + len(partition))):
row = {self.field_name: vec}
if add_id_field_to_body:
row.update({self.id_field_name: identifier})
bulk_contents.append(row)
actions[1::2] = bulk_contents
return actions

def params(self):
"""
Returns: A bulk index parameter with vectors from a data set.
"""
# TODO: Fix below logic to make sure we index only total number of documents as mentioned in the params.
if self.current >= self.num_vectors + self.offset:
raise StopIteration

def action(id_field_name, doc_id):
# support only index operation
bulk_action = 'index'
metadata = {
'_index': self.index_name
}
# Add id field to metadata only if it is _id
if id_field_name == self.DEFAULT_ID_FIELD_NAME:
metadata.update({id_field_name: doc_id})
return {bulk_action: metadata}

partition = self.data_set.read(self.bulk_size)
body = self.bulk_transform(partition, action)
size = len(body) // 2
self.current += size
self.percent_completed = self.current / self.total

return {
"body": body,
"retries": self.retries,
"size": size
}


def get_target(workload, params):
if len(workload.indices) == 1:
default_target = workload.indices[0].name
@@ -1418,6 +1511,7 @@ def read_bulk(self):


register_param_source_for_operation(workload.OperationType.Bulk, BulkIndexParamSource)
register_param_source_for_operation(workload.OperationType.BulkVectorDataSet, BulkVectorsDataSetParamSource)
register_param_source_for_operation(workload.OperationType.Search, SearchParamSource)
register_param_source_for_operation(workload.OperationType.VectorSearch, VectorSearchParamSource)
register_param_source_for_operation(workload.OperationType.CreateIndex, CreateIndexParamSource)
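
To make the payload layout concrete, here is a minimal sketch of a single params() result from BulkVectorsFromDataSetParamSource, assuming bulk_size=2, the default _id field, and made-up index name, field name, and vector values.

# Illustrative params() output (assumed names, made-up vectors).
# "body" alternates bulk action metadata and document source,
# so "size" is len(body) // 2.
{
    "body": [
        {"index": {"_index": "target-index", "_id": 0}},
        {"target-vector-field": [0.1, 0.2, 0.3]},
        {"index": {"_index": "target-index", "_id": 1}},
        {"target-vector-field": [0.4, 0.5, 0.6]},
    ],
    "retries": 10,  # DEFAULT_RETRIES unless overridden
    "size": 2,
}

With a non-default id-field-name, the identifier moves out of the action metadata and into the document source instead, which is what the custom-id test below asserts.
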
3 changes: 3 additions & 0 deletions osbenchmark/workload/workload.py
@@ -585,6 +585,7 @@ class OperationType(Enum):
DeletePointInTime = 15
ListAllPointInTime = 16
VectorSearch = 17
BulkVectorDataSet = 18

# administrative actions
ForceMerge = 1001
@@ -644,6 +645,8 @@ def from_hyphenated_string(cls, v):
return OperationType.PaginatedSearch
elif v == "vector-search":
return OperationType.VectorSearch
elif v == "bulk-vector-data-set":
return OperationType.BulkVectorDataSet
elif v == "cluster-health":
return OperationType.ClusterHealth
elif v == "bulk":
139 changes: 138 additions & 1 deletion tests/workload/params_test.py
@@ -35,7 +35,8 @@
from osbenchmark.utils.dataset import Context, HDF5DataSet
from osbenchmark.utils.parse import ConfigurationError
from osbenchmark.workload import params, workload
from osbenchmark.workload.params import VectorDataSetPartitionParamSource, VectorSearchPartitionParamSource
from osbenchmark.workload.params import VectorDataSetPartitionParamSource, VectorSearchPartitionParamSource, \
BulkVectorsFromDataSetParamSource
from tests.utils.dataset_helper import create_data_set
from tests.utils.dataset_test import DEFAULT_NUM_VECTORS

@@ -2774,3 +2775,139 @@ def _check_params(
self.assertEqual(len(neighbor), expected_dimension)
size = body.get("size")
self.assertEqual(size, expected_size if expected_size else expected_k)


class BulkVectorsFromDataSetParamSourceTestCase(TestCase):

DEFAULT_INDEX_NAME = "test-partition-index"
DEFAULT_VECTOR_FIELD_NAME = "test-vector-field"
DEFAULT_CONTEXT = Context.INDEX
DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME
DEFAULT_NUM_VECTORS = 10
DEFAULT_DIMENSION = 10
DEFAULT_RANDOM_STRING_LENGTH = 8
DEFAULT_ID_FIELD_NAME = "_id"

def setUp(self) -> None:
self.data_set_dir = tempfile.mkdtemp()

def tearDown(self):
shutil.rmtree(self.data_set_dir)

def test_params_default(self):
num_vectors = 49
bulk_size = 10
data_set_path = create_data_set(
num_vectors,
self.DEFAULT_DIMENSION,
self.DEFAULT_TYPE,
Context.INDEX,
self.data_set_dir
)

test_param_source_params = {
"index": self.DEFAULT_INDEX_NAME,
"field": self.DEFAULT_VECTOR_FIELD_NAME,
"data_set_format": self.DEFAULT_TYPE,
"data_set_path": data_set_path,
"bulk_size": bulk_size,
"id-field-name": self.DEFAULT_ID_FIELD_NAME,
}
bulk_param_source = BulkVectorsFromDataSetParamSource(test_param_source_params)

# Check each payload returned
vectors_consumed = 0
while vectors_consumed < num_vectors:
expected_num_vectors = min(num_vectors - vectors_consumed, bulk_size)
actual_params = bulk_param_source.params()
self._check_params(
actual_params,
self.DEFAULT_INDEX_NAME,
self.DEFAULT_VECTOR_FIELD_NAME,
self.DEFAULT_DIMENSION,
expected_num_vectors,
self.DEFAULT_ID_FIELD_NAME,
)
vectors_consumed += expected_num_vectors

# Assert last call creates stop iteration
self.assertRaises(
StopIteration,
lambda: bulk_param_source.params()
)

def test_params_custom(self):
num_vectors = 49
bulk_size = 10
data_set_path = create_data_set(
num_vectors,
self.DEFAULT_DIMENSION,
self.DEFAULT_TYPE,
Context.INDEX,
self.data_set_dir
)

test_param_source_params = {
"index": self.DEFAULT_INDEX_NAME,
"field": self.DEFAULT_VECTOR_FIELD_NAME,
"data_set_format": self.DEFAULT_TYPE,
"data_set_path": data_set_path,
"bulk_size": bulk_size,
"id-field-name": "id",
}
bulk_param_source = BulkVectorsFromDataSetParamSource(test_param_source_params)

# Check each payload returned
vectors_consumed = 0
while vectors_consumed < num_vectors:
expected_num_vectors = min(num_vectors - vectors_consumed, bulk_size)
actual_params = bulk_param_source.params()
self._check_params(
actual_params,
self.DEFAULT_INDEX_NAME,
self.DEFAULT_VECTOR_FIELD_NAME,
self.DEFAULT_DIMENSION,
expected_num_vectors,
"id",
)
vectors_consumed += expected_num_vectors

# Assert last call creates stop iteration
self.assertRaises(
StopIteration,
lambda: bulk_param_source.params()
)

def _check_params(
self,
actual_params: dict,
expected_index: str,
expected_vector_field: str,
expected_dimension: int,
expected_num_vectors_in_payload: int,
expected_id_field: str,
):
size = actual_params.get("size")
self.assertEqual(size, expected_num_vectors_in_payload)
body = actual_params.get("body")
self.assertIsInstance(body, list)
self.assertEqual(len(body) // 2, expected_num_vectors_in_payload)

# Each entry in the bulk payload has two parts: an action header followed
# by a document body. The header carries the index name and the body
# carries the vector.
for header, req_body in zip(*[iter(body)] * 2):
index = header.get("index")
self.assertIsInstance(index, dict)

index_name = index.get("_index")
self.assertEqual(index_name, expected_index)

vector = req_body.get(expected_vector_field)
self.assertIsInstance(vector, list)
self.assertEqual(len(vector), expected_dimension)
if expected_id_field in index:
self.assertEqual(self.DEFAULT_ID_FIELD_NAME, expected_id_field)
self.assertFalse(expected_id_field in req_body)
continue
self.assertTrue(expected_id_field in req_body)
