Fix bulk size to retrieve only expected number of vectors
Previously we always fetched as many vectors as the bulk size. This adds
extra vectors when the number of vectors left to index is less than the bulk size.
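
For illustration only (numbers are assumed, not from the commit), a minimal
sketch of the over-read and of the min()-based cap this change introduces:

    # Suppose a client must index 1000 vectors with a bulk size of 300.
    num_vectors = 1000
    bulk_size = 300

    current = 0
    reads = []
    while current < num_vectors:
        remaining = num_vectors - current
        # Old behavior read bulk_size every time (300 * 4 = 1200 vectors).
        # Capping with min() reads 300, 300, 300, 100 = exactly 1000.
        read_size = min(bulk_size, remaining)
        reads.append(read_size)
        current += read_size
    print(reads)   # [300, 300, 300, 100]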

Signed-off-by: Vijayan Balasubramanian <[email protected]>
VijayanB committed Jan 31, 2024
1 parent 36ae46a commit 01b5058
Showing 1 changed file with 21 additions and 12 deletions.
osbenchmark/workload/params.py
@@ -825,7 +825,8 @@ class VectorDataSetPartitionParamSource(ParamSource):
         data_set_path: Path to data set
         context: Context the data set will be used in.
         data_set: Structure containing meta data about data and ability to read
-        num_vectors: Number of vectors to use from the data set
+        total_num_vectors: Number of vectors to use from the data set
+        num_vectors: Number of vectors to use for given partition
         total: Number of vectors for the partition
         current: Current vector offset in data set
         infinite: Property of param source signalling that it can be exhausted
@@ -840,7 +841,8 @@ def __init__(self, workload, params, context: Context, **kwargs):
         self.context = context
         self.data_set_format = parse_string_parameter("data_set_format", params)
         self.data_set_path = parse_string_parameter("data_set_path", params)
-        self.num_vectors: int = parse_int_parameter("num_vectors", params, -1)
+        self.total_num_vectors: int = parse_int_parameter("num_vectors", params, -1)
+        self.num_vectors = 0
         self.total = 1
         self.current = 0
         self.percent_completed = 0
@@ -869,20 +871,21 @@ def partition(self, partition_index, total_partitions):
         self.data_set: DataSet = get_data_set(
             self.data_set_format, self.data_set_path, self.context)
         # if value is -1 or greater than dataset size, use dataset size as num_vectors
-        if self.num_vectors < 0 or self.num_vectors > self.data_set.size():
-            self.num_vectors = self.data_set.size()
-        self.total = self.num_vectors
+        if self.total_num_vectors < 0 or self.total_num_vectors > self.data_set.size():
+            self.total_num_vectors = self.data_set.size()
+        self.total = self.total_num_vectors

         partition_x = copy.copy(self)

-        num_vectors = int(self.num_vectors / total_partitions)
+        min_num_vectors_per_partition = int(self.total_num_vectors / total_partitions)
+        partition_x.offset = int(partition_index * min_num_vectors_per_partition)
+        partition_x.num_vectors = min_num_vectors_per_partition

         # if partition is not divided equally, add extra docs to the last partition
-        if self.num_vectors % total_partitions != 0 and self._is_last_partition(partition_index, total_partitions):
-            num_vectors += self.num_vectors - (num_vectors * total_partitions)
+        if self.total_num_vectors % total_partitions != 0 and self._is_last_partition(partition_index, total_partitions):
+            remaining_vectors = self.total_num_vectors - (min_num_vectors_per_partition * total_partitions)
+            partition_x.num_vectors += remaining_vectors

-        partition_x.num_vectors = num_vectors
-        partition_x.offset = int(partition_index * partition_x.num_vectors)
         # We need to create a new instance of the data set for each client
         partition_x.data_set = get_data_set(
             self.data_set_format,
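
As a worked example of the partitioning in this hunk (assumed values; the
check below stands in for _is_last_partition()): 1000 vectors over 3 clients
give offsets 0/333/666 and sizes 333/333/334, with the remainder going to the
last partition. Note the offset is now computed from the uniform per-partition
size before the remainder is added; the old code multiplied by the already
adjusted num_vectors, which skewed the last partition's offset (2 * 334 = 668
instead of 666 here):

    total_num_vectors = 1000
    total_partitions = 3

    min_num_vectors_per_partition = total_num_vectors // total_partitions  # 333
    for partition_index in range(total_partitions):
        offset = partition_index * min_num_vectors_per_partition
        num_vectors = min_num_vectors_per_partition
        # mirrors _is_last_partition(): the last client absorbs the remainder
        if partition_index == total_partitions - 1:
            num_vectors += total_num_vectors - min_num_vectors_per_partition * total_partitions
        print(partition_index, offset, num_vectors)
    # 0 0 333
    # 1 333 333
    # 2 666 334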
@@ -936,6 +939,7 @@ def __init__(self, workloads, params, query_params, **kwargs):
         self.neighbors_data_set_format = parse_string_parameter(
             self.PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT, params, self.data_set_format)
         self.neighbors_data_set_path = params.get(self.PARAMS_NAME_NEIGHBORS_DATA_SET_PATH)
+        self.neighbors_data_set = None
         operation_type = parse_string_parameter(self.PARAMS_NAME_OPERATION_TYPE, params,
                                                 self.PARAMS_VALUE_VECTOR_SEARCH)
         self.query_params = query_params
@@ -972,6 +976,7 @@ def partition(self, partition_index, total_partitions):
         # add neighbor instance to partition
         partition.neighbors_data_set = get_data_set(
             self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS)
+        partition.neighbors_data_set.seek(partition.offset)
         return partition

     def params(self):
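
For context, a sketch of why the added seek matters (toy lists, not the
repo's DataSet API): each partition's query reader starts at partition.offset,
so the ground-truth neighbors reader must start at the same row, or queries
would be scored against another partition's neighbors:

    # Toy stand-ins for the query and neighbors data sets; row i pairs with row i.
    queries = [f"q{i}" for i in range(10)]
    neighbors = [f"n{i}" for i in range(10)]

    offset = 5            # this partition starts halfway into the data set
    q_cursor = offset     # the query data set is already seeked to the offset
    n_cursor = offset     # the added seek; previously this cursor stayed at 0
    assert queries[q_cursor][1:] == neighbors[n_cursor][1:]  # rows stay paired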
@@ -982,6 +987,7 @@ def params(self):

         if is_dataset_exhausted and self.current_rep < self.repetitions:
             self.data_set.seek(self.offset)
+            self.neighbors_data_set.seek(self.offset)
             self.current = self.offset
             self.current_rep += 1
         elif is_dataset_exhausted:
@@ -996,6 +1002,7 @@ def params(self):
         self._update_body_params(vector)
         self.current += 1
         self.percent_completed = self.current / self.total
+        print("params called ")
         return self.query_params

     def _build_vector_search_query_body(self, vector) -> dict:
@@ -1066,7 +1073,6 @@ def params(self):
         """
         Returns: A bulk index parameter with vectors from a data set.
         """
-        # TODO: Fix below logic to make sure we index only total number of documents as mentioned in the params.
         if self.current >= self.num_vectors + self.offset:
             raise StopIteration

@@ -1081,7 +1087,10 @@ def action(id_field_name, doc_id):
             metadata.update({id_field_name: doc_id})
             return {bulk_action: metadata}

-        partition = self.data_set.read(self.bulk_size)
+        remaining_vectors_in_partition = self.num_vectors + self.offset - self.current
+        # update bulk size if number of vectors to read is less than actual bulk size
+        bulk_size = min(self.bulk_size, remaining_vectors_in_partition)
+        partition = self.data_set.read(bulk_size)
         body = self.bulk_transform(partition, action)
         size = len(body) // 2
         self.current += size
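
To see the fixed hunk's arithmetic with concrete (assumed) numbers: a
partition with num_vectors = 250, offset = 500 and bulk_size = 100 now issues
reads of 100, 100, 50 and then stops, instead of three full reads of 100:

    num_vectors, offset, bulk_size = 250, 500, 100  # assumed values
    current = offset
    reads = []
    while current < num_vectors + offset:
        remaining_vectors_in_partition = num_vectors + offset - current
        size = min(bulk_size, remaining_vectors_in_partition)
        reads.append(size)
        current += size
    print(reads)   # [100, 100, 50] -- the old code produced [100, 100, 100]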
