diff --git a/osbenchmark/utils/dataset.py b/osbenchmark/utils/dataset.py
index 85aa0cd25..88854f118 100644
--- a/osbenchmark/utils/dataset.py
+++ b/osbenchmark/utils/dataset.py
@@ -12,6 +12,7 @@
 
 import h5py
 import numpy as np
+import pyarrow.parquet as pq
 
 from osbenchmark.exceptions import InvalidExtensionException
 from osbenchmark.utils.parse import ConfigurationError
@@ -65,19 +66,22 @@ def reset(self):
         """
 
 
-def get_data_set(data_set_format: str, path: str, context: Context):
+def get_data_set(data_set_format: str, path: str, context: Context = None, column_name: str = None):
     """
     Factory method to get instance of Dataset for given format.
     Args:
        data_set_format: File format like hdf5, bigann
        path: Data set file path
        context: Dataset Context Enum
+       column_name: Name of the dataset column that contains the expected data
     Returns: DataSet instance
     """
     if data_set_format == HDF5DataSet.FORMAT_NAME:
         return HDF5DataSet(path, context)
     if data_set_format == BigANNVectorDataSet.FORMAT_NAME:
         return BigANNVectorDataSet(path)
+    if data_set_format == ParquetDataSet.FORMAT_NAME:
+        return ParquetDataSet(path, column_name)
     raise ConfigurationError("Invalid data set format")
 
 
@@ -144,6 +148,115 @@ def parse_context(context: Context) -> str:
     raise Exception("Unsupported context")
 
 
+class ParquetDataSet(DataSet):
+    """ Data set backed by Parquet, a column-oriented storage format designed for efficient data storage and retrieval
+    """
+
+    FORMAT_NAME = "parquet"
+    DEFAULT_BATCH_SIZE = 500
+
+    def __init__(self, dataset_path: str, column: str):
+        if not dataset_path:
+            raise ConfigurationError("dataset path cannot be None or empty")
+        if not column:
+            raise ConfigurationError("dataset column name cannot be None or empty")
+        self.dataset_path = dataset_path
+        self.file = None
+        self.column = column
+        self.current = self.BEGINNING
+        self.batch_iterator = None
+        self.prev_batch = []
+        self.file_batch_start = 0
+        self.file_batch_end = 0
+
+    def get_batch(self):
+        if self.prev_batch:
+            return self.prev_batch
+        batch = next(self.batch_iterator)
+        self.file_batch_start = self.file_batch_end
+        self.file_batch_end = self.file_batch_start + len(batch)
+        return batch
+
+    def _extract_vectors(self, start_offset, end_offset):
+        """
+        Since Parquet files can be huge, it is not advisable to load the complete file into memory.
+        Hence, we iterate over the file in batches and extract vectors from one or more batches
+        based on the start and end offsets.
+
+        @param start_offset: start offset to extract vectors from the parquet file
+        @param end_offset: end offset to extract vectors from the parquet file
+        @return: array of vectors
+        """
+        vectors = []
+        while True:
+            batch = self.get_batch()
+            if not batch:
+                break
+            batch_length = len(batch)
+            if start_offset > self.file_batch_end:
+                # skip batches until we reach the first batch that contains start_offset
+                continue
+            if start_offset < self.file_batch_start:
+                # move the start offset forward when the extract spans multiple batches
+                start_offset = self.file_batch_start
+            # mark the beginning of the extract within the current batch
+            copy_begin_offset = batch_length - (self.file_batch_end - start_offset)
+            if end_offset <= self.file_batch_end:
+                # mark the end of the extract if the end offset falls within the current batch
+                copy_end_offset = batch_length - (self.file_batch_end - end_offset)
+                vectors.extend(batch[copy_begin_offset:copy_end_offset][0].to_pandas())
+                # since the current batch is not exhausted, keep it in memory
+                # and reuse it as the current batch for the next read
+                self.prev_batch = batch
+                break
+            # the end offset lies beyond the current batch, hence copy till the end of the batch
+            vectors.extend(batch[copy_begin_offset:][0].to_pandas())
+            # drop the previously saved batch once the complete batch has been consumed
+            self.prev_batch = []
+        return np.array(vectors)
+
+    def _load(self):
+        if not self.file:
+            self.file = pq.ParquetFile(self.dataset_path)
+            self.batch_iterator = self.file.iter_batches(batch_size=self.DEFAULT_BATCH_SIZE, columns=[self.column])
+            self.prev_batch = []
+            self.file_batch_start = 0
+            self.file_batch_end = 0
+
+    def read(self, chunk_size: int):
+        # load the file before reading
+        self._load()
+        if self.current >= self.size():
+            return None
+        end_offset = self.current + chunk_size
+        if end_offset > self.size():
+            end_offset = self.size()
+
+        vectors = self._extract_vectors(self.current, end_offset)
+        self.current = end_offset
+        return vectors
+
+    def seek(self, offset: int):
+        if offset < self.BEGINNING:
+            raise Exception("Offset must be greater than or equal to 0")
+
+        if offset >= self.size():
+            raise Exception("Offset must be less than the data set size")
+
+        self.current = offset
+
+    def size(self):
+        # load the file before returning its size
+        self._load()
+        return self.file.metadata.num_rows
+
+    def reset(self):
+        self.current = self.BEGINNING
+        # Force all attributes to reset. This can be achieved by resetting the file handle. Hence,
+        # subsequent dataset calls load the file again from the beginning
+        self.file = None
+
+
 class BigANNVectorDataSet(DataSet):
     """ Data-set format for vector data-sets for `Big ANN Benchmarks
     `_
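The ParquetDataSet added above streams pyarrow record batches instead of loading the whole file. A minimal usage sketch follows (not part of the patch; the file path and the column name "emb" are made-up examples):

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

from osbenchmark.utils.dataset import ParquetDataSet

# build a small parquet file with a single list-typed column (hypothetical path and column)
vectors = np.random.rand(700, 10).astype(np.float32)
pq.write_table(pa.Table.from_arrays([vectors.tolist()], names=["emb"]), "/tmp/vectors.parquet")

data_set = ParquetDataSet("/tmp/vectors.parquet", "emb")
assert data_set.size() == 700          # row count comes from the parquet metadata
while True:
    chunk = data_set.read(200)         # numpy array of up to 200 vectors, or None at the end
    if chunk is None:
        break
data_set.reset()                       # the next read() reloads the file from the beginning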
diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py
index a8e95793c..06838820c 100644
--- a/osbenchmark/workload/params.py
+++ b/osbenchmark/workload/params.py
@@ -888,6 +888,7 @@ def __init__(self, workload, params, context: Context, **kwargs):
         self.data_set_format = parse_string_parameter("data_set_format", params)
         self.data_set_path = parse_string_parameter("data_set_path", params, "")
         self.data_set_corpus = parse_string_parameter("data_set_corpus", params, "")
+        self.data_set_name = params.get("data_set_name")
         self._validate_data_set(self.data_set_path, self.data_set_corpus)
         self.total_num_vectors: int = parse_int_parameter("num_vectors", params, -1)
         self.num_vectors = 0
@@ -952,7 +953,7 @@ def partition(self, partition_index, total_partitions):
             self.data_set_path = data_set_path[0]
         if self.data_set is None:
             self.data_set: DataSet = get_data_set(
-                self.data_set_format, self.data_set_path, self.context)
+                self.data_set_format, self.data_set_path, self.context, self.data_set_name)
         # if value is -1 or greater than dataset size, use dataset size as num_vectors
         if self.total_num_vectors < 0 or self.total_num_vectors > self.data_set.size():
             self.total_num_vectors = self.data_set.size()
@@ -973,7 +974,8 @@ def partition(self, partition_index, total_partitions):
         partition_x.data_set = get_data_set(
             self.data_set_format,
             self.data_set_path,
-            self.context
+            self.context,
+            self.data_set_name,
         )
         partition_x.data_set.seek(partition_x.offset)
         partition_x.current = partition_x.offset
@@ -1033,6 +1035,7 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
     PARAMS_NAME_FILTER = "filter"
     PARAMS_NAME_REPETITIONS = "repetitions"
     PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT = "neighbors_data_set_format"
+    PARAMS_NAME_NEIGHBORS_DATA_SET_NAME = "neighbors_data_set_name"
     PARAMS_NAME_NEIGHBORS_DATA_SET_PATH = "neighbors_data_set_path"
     PARAMS_NAME_NEIGHBORS_DATA_SET_CORPUS = "neighbors_data_set_corpus"
     PARAMS_NAME_OPERATION_TYPE = "operation-type"
@@ -1051,6 +1054,7 @@ def __init__(self, workloads, params, query_params, **kwargs):
         self.neighbors_data_set_format = parse_string_parameter(
             self.PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT, params, self.data_set_format)
         self.neighbors_data_set_path = params.get(self.PARAMS_NAME_NEIGHBORS_DATA_SET_PATH)
+        self.neighbors_data_set_name = params.get(self.PARAMS_NAME_NEIGHBORS_DATA_SET_NAME)
         self.neighbors_data_set_corpus = params.get(self.PARAMS_NAME_NEIGHBORS_DATA_SET_CORPUS)
         self._validate_data_set(self.neighbors_data_set_path, self.neighbors_data_set_corpus)
         self.neighbors_data_set = None
@@ -1112,7 +1116,8 @@ def partition(self, partition_index, total_partitions):
             self.neighbors_data_set_path = self.data_set_path
         # add neighbor instance to partition
         partition.neighbors_data_set = get_data_set(
-            self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS)
+            self.neighbors_data_set_format, self.neighbors_data_set_path,
+            Context.NEIGHBORS, self.neighbors_data_set_name)
         partition.neighbors_data_set.seek(partition.offset)
         return partition
 
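For context, a sketch of where the new name parameters surface in a vector-search workload's parameter source. Only parameter keys that appear in the diff above are listed; the values are made-up examples and all other vector-search parameters are omitted:

params = {
    "data_set_format": "parquet",
    "data_set_path": "/tmp/queries.parquet",
    "data_set_name": "emb",                           # parquet column holding the query vectors
    "neighbors_data_set_format": "parquet",
    "neighbors_data_set_path": "/tmp/neighbors.parquet",
    "neighbors_data_set_name": "neighbors",           # parquet column holding the ground truth
}
# partition() then resolves the data sets via
#   get_data_set(self.data_set_format, self.data_set_path, self.context, self.data_set_name)
# and, for the ground-truth set,
#   get_data_set(self.neighbors_data_set_format, self.neighbors_data_set_path,
#                Context.NEIGHBORS, self.neighbors_data_set_name)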
diff --git a/tests/utils/dataset_helper.py b/tests/utils/dataset_helper.py
index f638d2859..87e2a79c6 100644
--- a/tests/utils/dataset_helper.py
+++ b/tests/utils/dataset_helper.py
@@ -10,8 +10,9 @@
 
 import h5py
 import numpy as np
+import pyarrow as pa
 
-from osbenchmark.utils.dataset import Context, BigANNVectorDataSet, HDF5DataSet
+from osbenchmark.utils.dataset import Context, BigANNVectorDataSet, HDF5DataSet, ParquetDataSet
 
 DEFAULT_RANDOM_STRING_LENGTH = 8
 
@@ -166,6 +167,39 @@ def _build_data_set(self, context: DataSetBuildContext):
             context.vectors.tofile(f)
 
 
+class ParquetBuilder(DataSetBuilder):
+
+    def __init__(self, column_name):
+        super().__init__()
+        self.data_set_meta_data = dict()
+        self.column_name = column_name
+
+    def _validate_data_set_context(self, context: DataSetBuildContext):
+        if context.path not in self.data_set_meta_data.keys():
+            self.data_set_meta_data[context.path] = {
+                context.data_set_context: context
+            }
+            return
+
+        if context.data_set_context in \
+                self.data_set_meta_data[context.path].keys():
+            raise IllegalDataSetBuildContext("Path and context for data set "
+                                             "are already present in builder.")
+
+        self.data_set_meta_data[context.path][context.data_set_context] = context
+
+    @staticmethod
+    def _validate_extension(context: DataSetBuildContext):
+        ext = context.path.split('.')[-1]
+
+        if ext != ParquetDataSet.FORMAT_NAME:
+            raise IllegalDataSetBuildContext("Invalid file extension")
+
+    def _build_data_set(self, context: DataSetBuildContext):
+        pa_table = pa.Table.from_arrays(arrays=[context.vectors.tolist()], names=[self.column_name])
+        pa.parquet.write_table(pa_table, context.path)
+
+
 def create_random_2d_array(num_vectors: int, dimension: int) -> np.ndarray:
     rng = np.random.default_rng()
     return rng.random(size=(num_vectors, dimension), dtype=np.float32)
@@ -188,7 +222,8 @@ def create_data_set(
         dimension: int,
         extension: str,
         data_set_context: Context,
-        data_set_dir
+        data_set_dir,
+        column_name="",
 ) -> str:
     file_name_base = ''.join(random.choice(string.ascii_letters) for _ in
                              range(DEFAULT_RANDOM_STRING_LENGTH))
@@ -201,6 +236,8 @@ def create_data_set(
 
     if extension == HDF5DataSet.FORMAT_NAME:
         HDF5Builder().add_data_set_build_context(context).build()
+    elif extension == ParquetDataSet.FORMAT_NAME:
+        ParquetBuilder(column_name).add_data_set_build_context(context).build()
     else:
         BigANNBuilder().add_data_set_build_context(context).build()
 
diff --git a/tests/utils/dataset_test.py b/tests/utils/dataset_test.py
index a864d2372..f173b7d31 100644
--- a/tests/utils/dataset_test.py
+++ b/tests/utils/dataset_test.py
@@ -6,12 +6,13 @@
 import tempfile
 from unittest import TestCase
 
-from osbenchmark.utils.dataset import Context, get_data_set, HDF5DataSet, BigANNVectorDataSet
+from osbenchmark.utils.dataset import Context, get_data_set, HDF5DataSet, BigANNVectorDataSet, ParquetDataSet
 from osbenchmark.utils.parse import ConfigurationError
 from tests.utils.dataset_helper import create_data_set
 
 DEFAULT_INDEX_NAME = "test-index"
 DEFAULT_FIELD_NAME = "test-field"
+DEFAULT_DATASET_NAME = "emb"
 DEFAULT_CONTEXT = Context.INDEX
 DEFAULT_NUM_VECTORS = 10
 DEFAULT_DIMENSION = 10
@@ -33,6 +34,39 @@ def testHDF5AsAcceptableDataSetFormat(self):
         self.assertEqual(data_set_instance.FORMAT_NAME, HDF5DataSet.FORMAT_NAME)
         self.assertEqual(data_set_instance.size(), DEFAULT_NUM_VECTORS)
 
+    def testParquetAsAcceptableDataSetFormat(self):
+        with tempfile.TemporaryDirectory() as data_set_dir:
+            valid_data_set_path = create_data_set(
+                700,
+                DEFAULT_DIMENSION,
+                ParquetDataSet.FORMAT_NAME,
+                DEFAULT_CONTEXT,
+                data_set_dir,
+                DEFAULT_DATASET_NAME,
+            )
+            data_set_instance = get_data_set("parquet", valid_data_set_path, column_name=DEFAULT_DATASET_NAME)
+            self.assertEqual(data_set_instance.FORMAT_NAME, ParquetDataSet.FORMAT_NAME)
+            self.assertEqual(data_set_instance.size(), 700)
+            actual_vectors = []
+            while True:
+                partial = data_set_instance.read(200)
+                if partial is None:
+                    self.assertEqual(len(actual_vectors), 700)
+                    break
+                # the last fetch returns only the remaining 100 records (700 % 200)
+                self.assertLessEqual(len(partial), 200)
+                actual_vectors.extend(partial)
+            # read again after a reset to verify the file is reloaded from the beginning
+            data_set_instance.reset()
+            actual_vectors = []
+            while True:
+                partial = data_set_instance.read(100)
+                if partial is None:
+                    self.assertEqual(len(actual_vectors), 700)
+                    break
+                self.assertEqual(len(partial), 100)
+                actual_vectors.extend(partial)
+
     def testBigANNAsAcceptableDataSetFormatWithFloatExtension(self):
         float_extension = "fbin"
         data_set_dir = tempfile.mkdtemp()