
Commit

Add support to parse parquet files
This commit adds support for accepting a Parquet file as
1) the target index dataset, 2) the neighbor dataset, and
3) the query dataset. It also supports partitioning;
one iterator is created per partition.
Since a Parquet file can be very large and does not
support index-based loading, we use a batch iterator and
load one batch at a time, extracting the vectors for the
given start and end offsets.
A variable holds the current batch when it has not been
fully processed; once the batch is consumed, the variable
is cleared.

Signed-off-by: Vijayan Balasubramanian <[email protected]>
VijayanB committed Feb 19, 2024
1 parent aced2b3 commit f379487
Showing 4 changed files with 196 additions and 7 deletions.
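
As a minimal sketch of the batching approach described in the commit message (not part of the commit itself; the file path, column name, and helper name are illustrative), the window [start, end) can be collected from a streamed Parquet file roughly like this:

import pyarrow.parquet as pq

def read_window(path, column, start, end, batch_size=500):
    """Collect rows with indices in [start, end) without loading the whole file."""
    rows_seen = 0
    out = []
    for batch in pq.ParquetFile(path).iter_batches(batch_size=batch_size, columns=[column]):
        batch_start, batch_end = rows_seen, rows_seen + len(batch)
        rows_seen = batch_end
        if batch_end <= start:
            continue                      # window has not started yet
        lo = max(start, batch_start) - batch_start
        hi = min(end, batch_end) - batch_start
        out.extend(batch.column(0)[lo:hi].to_pylist())
        if batch_end >= end:
            break                         # window fully covered
    return out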
115 changes: 114 additions & 1 deletion osbenchmark/utils/dataset.py
@@ -12,6 +12,7 @@

import h5py
import numpy as np
import pyarrow.parquet as pq

from osbenchmark.exceptions import InvalidExtensionException
from osbenchmark.utils.parse import ConfigurationError
@@ -65,19 +66,22 @@ def reset(self):
"""


def get_data_set(data_set_format: str, path: str, context: Context):
def get_data_set(data_set_format: str, path: str, context: Context = None, column_name: str = None):
"""
Factory method to get an instance of DataSet for the given format.
Args:
data_set_format: File format like hdf5, bigann
path: Data set file path
context: Dataset Context Enum
column_name: Name of the column that contains the dataset (required for parquet)
Returns: DataSet instance
"""
if data_set_format == HDF5DataSet.FORMAT_NAME:
return HDF5DataSet(path, context)
if data_set_format == BigANNVectorDataSet.FORMAT_NAME:
return BigANNVectorDataSet(path)
if data_set_format == ParquetDataSet.FORMAT_NAME:
return ParquetDataSet(path, column_name)
raise ConfigurationError("Invalid data set format")


@@ -144,6 +148,115 @@ def parse_context(context: Context) -> str:
raise Exception("Unsupported context")


class ParquetDataSet(DataSet):
""" Column-oriented data storage format designed for efficient data storage and retrieval
"""

FORMAT_NAME = "parquet"
DEFAULT_BATCH_SIZE = 500

def __init__(self, dataset_path: str, column: str):
if not dataset_path:
raise ConfigurationError("dataset path cannot be None or empty")
if not column:
raise ConfigurationError("dataset column name cannot be None or empty")
self.dataset_path = dataset_path
self.file = None
self.column = column
self.current = self.BEGINNING
self.batch_iterator = None
self.prev_batch = []
self.file_batch_start = 0
self.file_batch_end = 0

def get_batch(self):
if self.prev_batch:
return self.prev_batch
batch = next(self.batch_iterator)
self.file_batch_start = self.file_batch_end
self.file_batch_end = self.file_batch_start + len(batch)
return batch

def _extract_vectors(self, start_offset, end_offset):
"""
Since Parquet files can be huge, it is not advisable to load the complete file into memory.
Hence, we iterate over the file in batches and extract vectors from one or more batches
based on the start and end offsets.
@param start_offset: start offset (inclusive) of the vectors to extract from the parquet file
@param end_offset: end offset (exclusive) of the vectors to extract from the parquet file
@return: array of vectors
"""
vectors = []
while True:
batch = self.get_batch()
if not batch:
break
batch_length = len(batch)
if start_offset > self.file_batch_end:
# skip batches until we reach the first batch that contains start_offset
continue
if start_offset < self.file_batch_start:
# update start offset if we have to extract vectors across batches
start_offset = self.file_batch_start
# mark the beginning of extract
copy_begin_offset = batch_length - (self.file_batch_end - start_offset)
if end_offset <= self.file_batch_end:
# mark the end of extract if end offset is within current batch
copy_end_offset = batch_length - (self.file_batch_end - end_offset)
vectors.extend(batch[copy_begin_offset:copy_end_offset][0].to_pandas())
# since we haven't exhausted the current batch, keep it around
# and reuse it as the current batch for the next read
self.prev_batch = batch
break
# end offset is greater than current batch offset, hence, copy till end of the batch.
vectors.extend(batch[copy_begin_offset:][0].to_pandas())
# remove previously saved batch once complete batch is read
self.prev_batch = []
return np.array(vectors)

def _load(self):
if not self.file:
self.file = pq.ParquetFile(self.dataset_path)
self.batch_iterator = self.file.iter_batches(batch_size=self.DEFAULT_BATCH_SIZE, columns=[self.column])
self.prev_batch = []
self.file_batch_start = 0
self.file_batch_end = 0

def read(self, chunk_size: int):
# load the file before reading
self._load()
if self.current >= self.size():
return None
end_offset = self.current + chunk_size
if end_offset > self.size():
end_offset = self.size()

vectors = self._extract_vectors(self.current, end_offset)
self.current = end_offset
return vectors

def seek(self, offset: int):
if offset < self.BEGINNING:
raise Exception("Offset must be greater than or equal to 0")

if offset >= self.size():
raise Exception("Offset must be less than the data set size")

self.current = offset

def size(self):
# load the file before returning its size
self._load()
return self.file.metadata.num_rows

def reset(self):
self.current = self.BEGINNING
# Force all attributes to reset by clearing the file handle. Subsequent
# dataset calls will then load the file again from the beginning.
self.file = None


class BigANNVectorDataSet(DataSet):
""" Data-set format for vector data-sets for `Big ANN Benchmarks
<https://big-ann-benchmarks.com/index.html#bench-datasets>`_
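For context, a hedged usage sketch of the new reader (not part of the diff; the path and column name are placeholders), mirroring how the tests below drive it:

from osbenchmark.utils.dataset import Context, get_data_set

data_set = get_data_set("parquet", "/tmp/vectors.parquet", Context.INDEX, "emb")
total = data_set.size()            # opens the file and returns metadata.num_rows
vectors = []
while True:
    chunk = data_set.read(200)     # at most 200 vectors; None once exhausted
    if chunk is None:
        break
    vectors.extend(chunk)
assert len(vectors) == total
data_set.reset()                   # drops the cached handle; the next call reloads the file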
11 changes: 8 additions & 3 deletions osbenchmark/workload/params.py
@@ -888,6 +888,7 @@ def __init__(self, workload, params, context: Context, **kwargs):
self.data_set_format = parse_string_parameter("data_set_format", params)
self.data_set_path = parse_string_parameter("data_set_path", params, "")
self.data_set_corpus = parse_string_parameter("data_set_corpus", params, "")
self.data_set_name = params.get("data_set_name")
self._validate_data_set(self.data_set_path, self.data_set_corpus)
self.total_num_vectors: int = parse_int_parameter("num_vectors", params, -1)
self.num_vectors = 0
@@ -952,7 +953,7 @@ def partition(self, partition_index, total_partitions):
self.data_set_path = data_set_path[0]
if self.data_set is None:
self.data_set: DataSet = get_data_set(
self.data_set_format, self.data_set_path, self.context)
self.data_set_format, self.data_set_path, self.context, self.data_set_name)
# if value is -1 or greater than dataset size, use dataset size as num_vectors
if self.total_num_vectors < 0 or self.total_num_vectors > self.data_set.size():
self.total_num_vectors = self.data_set.size()
@@ -973,7 +974,8 @@ def partition(self, partition_index, total_partitions):
partition_x.data_set = get_data_set(
self.data_set_format,
self.data_set_path,
self.context
self.context,
self.data_set_name,
)
partition_x.data_set.seek(partition_x.offset)
partition_x.current = partition_x.offset
@@ -1033,6 +1035,7 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
PARAMS_NAME_FILTER = "filter"
PARAMS_NAME_REPETITIONS = "repetitions"
PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT = "neighbors_data_set_format"
PARAMS_NAME_NEIGHBORS_DATA_SET_NAME = "neighbors_data_set_name"
PARAMS_NAME_NEIGHBORS_DATA_SET_PATH = "neighbors_data_set_path"
PARAMS_NAME_NEIGHBORS_DATA_SET_CORPUS = "neighbors_data_set_corpus"
PARAMS_NAME_OPERATION_TYPE = "operation-type"
@@ -1051,6 +1054,7 @@ def __init__(self, workloads, params, query_params, **kwargs):
self.neighbors_data_set_format = parse_string_parameter(
self.PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT, params, self.data_set_format)
self.neighbors_data_set_path = params.get(self.PARAMS_NAME_NEIGHBORS_DATA_SET_PATH)
self.neighbors_data_set_name = params.get(self.PARAMS_NAME_NEIGHBORS_DATA_SET_NAME)
self.neighbors_data_set_corpus = params.get(self.PARAMS_NAME_NEIGHBORS_DATA_SET_CORPUS)
self._validate_data_set(self.neighbors_data_set_path, self.neighbors_data_set_corpus)
self.neighbors_data_set = None
@@ -1112,7 +1116,8 @@ def partition(self, partition_index, total_partitions):
self.neighbors_data_set_path = self.data_set_path
# add neighbor instance to partition
partition.neighbors_data_set = get_data_set(
self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS)
self.neighbors_data_set_format, self.neighbors_data_set_path,
Context.NEIGHBORS, self.neighbors_data_set_name)
partition.neighbors_data_set.seek(partition.offset)
return partition

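A hedged sketch of the workload parameters that would exercise the new column-name plumbing above (keys match the parameters parsed in params.py; the values are placeholders):

params = {
    "data_set_format": "parquet",
    "data_set_path": "/tmp/train-vectors.parquet",
    "data_set_name": "emb",                              # column holding the vectors
    "neighbors_data_set_format": "parquet",
    "neighbors_data_set_path": "/tmp/neighbors.parquet",
    "neighbors_data_set_name": "neighbors",              # column holding ground-truth neighbors
}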
41 changes: 39 additions & 2 deletions tests/utils/dataset_helper.py
@@ -10,8 +10,9 @@

import h5py
import numpy as np
import pyarrow as pa

from osbenchmark.utils.dataset import Context, BigANNVectorDataSet, HDF5DataSet
from osbenchmark.utils.dataset import Context, BigANNVectorDataSet, HDF5DataSet, ParquetDataSet

DEFAULT_RANDOM_STRING_LENGTH = 8

@@ -166,6 +167,39 @@ def _build_data_set(self, context: DataSetBuildContext):
context.vectors.tofile(f)


class ParquetBuilder(DataSetBuilder):

def __init__(self, column_name):
super().__init__()
self.data_set_meta_data = dict()
self.column_name = column_name

def _validate_data_set_context(self, context: DataSetBuildContext):
if context.path not in self.data_set_meta_data.keys():
self.data_set_meta_data[context.path] = {
context.data_set_context: context
}
return

if context.data_set_context in \
self.data_set_meta_data[context.path].keys():
raise IllegalDataSetBuildContext("Path and context for data set "
"are already present in builder.")

self.data_set_meta_data[context.path][context.data_set_context] = context

@staticmethod
def _validate_extension(context: DataSetBuildContext):
ext = context.path.split('.')[-1]

if ext != ParquetDataSet.FORMAT_NAME:
raise IllegalDataSetBuildContext("Invalid file extension")

def _build_data_set(self, context: DataSetBuildContext):
pa_table = pa.Table.from_arrays(arrays=[context.vectors.tolist()], names=[self.column_name])
pa.parquet.write_table(pa_table, context.path)


def create_random_2d_array(num_vectors: int, dimension: int) -> np.ndarray:
rng = np.random.default_rng()
return rng.random(size=(num_vectors, dimension), dtype=np.float32)
@@ -188,7 +222,8 @@ def create_data_set(
dimension: int,
extension: str,
data_set_context: Context,
data_set_dir
data_set_dir,
column_name="",
) -> str:
file_name_base = ''.join(random.choice(string.ascii_letters) for _ in
range(DEFAULT_RANDOM_STRING_LENGTH))
@@ -201,6 +236,8 @@

if extension == HDF5DataSet.FORMAT_NAME:
HDF5Builder().add_data_set_build_context(context).build()
elif extension == ParquetDataSet.FORMAT_NAME:
ParquetBuilder(column_name).add_data_set_build_context(context).build()
else:
BigANNBuilder().add_data_set_build_context(context).build()

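For reference, a self-contained sketch (illustrative only; the column name and path are placeholders) of the kind of single-column Parquet file the builder above produces:

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

vectors = np.random.default_rng().random(size=(10, 4), dtype=np.float32)
table = pa.Table.from_arrays([pa.array(vectors.tolist())], names=["emb"])
pq.write_table(table, "/tmp/example.parquet")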
36 changes: 35 additions & 1 deletion tests/utils/dataset_test.py
@@ -6,12 +6,13 @@
import tempfile
from unittest import TestCase

from osbenchmark.utils.dataset import Context, get_data_set, HDF5DataSet, BigANNVectorDataSet
from osbenchmark.utils.dataset import Context, get_data_set, HDF5DataSet, BigANNVectorDataSet, ParquetDataSet
from osbenchmark.utils.parse import ConfigurationError
from tests.utils.dataset_helper import create_data_set

DEFAULT_INDEX_NAME = "test-index"
DEFAULT_FIELD_NAME = "test-field"
DEFAULT_DATASET_NAME = "emb"
DEFAULT_CONTEXT = Context.INDEX
DEFAULT_NUM_VECTORS = 10
DEFAULT_DIMENSION = 10
@@ -33,6 +34,39 @@ def testHDF5AsAcceptableDataSetFormat(self):
self.assertEqual(data_set_instance.FORMAT_NAME, HDF5DataSet.FORMAT_NAME)
self.assertEqual(data_set_instance.size(), DEFAULT_NUM_VECTORS)

def testParquetAsAcceptableDataSetFormat(self):
with tempfile.TemporaryDirectory() as data_set_dir:
valid_data_set_path = create_data_set(
700,
DEFAULT_DIMENSION,
ParquetDataSet.FORMAT_NAME,
DEFAULT_CONTEXT,
data_set_dir,
DEFAULT_DATASET_NAME,
)
data_set_instance = get_data_set("parquet", valid_data_set_path, column_name=DEFAULT_DATASET_NAME)
self.assertEqual(data_set_instance.FORMAT_NAME, ParquetDataSet.FORMAT_NAME)
self.assertEqual(data_set_instance.size(), 700)
actual_vectors = []
while True:
partial = data_set_instance.read(200)
if partial is None:
self.assertEqual(len(actual_vectors), 700)
break
# last fetch will have 100 records
self.assertLessEqual(len(partial), 200)
actual_vectors.extend(partial)
# Try with reset
data_set_instance.reset()
actual_vectors = []
while True:
partial = data_set_instance.read(100)
if partial is None:
self.assertEqual(len(actual_vectors), 700)
break
self.assertEqual(len(partial), 100)
actual_vectors.extend(partial)

def testBigANNAsAcceptableDataSetFormatWithFloatExtension(self):
float_extension = "fbin"
data_set_dir = tempfile.mkdtemp()
