From f379487f84f0ac884dcb2c1a08286fe1033f8087 Mon Sep 17 00:00:00 2001
From: Vijayan Balasubramanian
Date: Wed, 14 Feb 2024 21:54:49 -0800
Subject: [PATCH] Add support for parsing parquet files

This commit adds support for accepting a Parquet file as 1) the target index
dataset, 2) the neighbor dataset, and 3) the query dataset. It also supports
partitioning: one iterator is created per partition. Since a Parquet file can
be very large and does not support index-based loading, we use a batch
iterator, loading one batch at a time and extracting vectors for the given
start and end offsets. A variable holds the current batch whenever it has not
been fully processed; once the batch is processed, the variable is cleared.

Signed-off-by: Vijayan Balasubramanian
---
 osbenchmark/utils/dataset.py   | 115 ++++++++++++++++++++++++++++++++-
 osbenchmark/workload/params.py |  11 +++-
 tests/utils/dataset_helper.py  |  41 +++++++++++-
 tests/utils/dataset_test.py    |  36 ++++++++++-
 4 files changed, 196 insertions(+), 7 deletions(-)

diff --git a/osbenchmark/utils/dataset.py b/osbenchmark/utils/dataset.py
index 85aa0cd25..88854f118 100644
--- a/osbenchmark/utils/dataset.py
+++ b/osbenchmark/utils/dataset.py
@@ -12,6 +12,7 @@
 import h5py
 import numpy as np
+import pyarrow.parquet as pq
 
 from osbenchmark.exceptions import InvalidExtensionException
 from osbenchmark.utils.parse import ConfigurationError
@@ -65,19 +66,22 @@ def reset(self):
     """
 
 
-def get_data_set(data_set_format: str, path: str, context: Context):
+def get_data_set(data_set_format: str, path: str, context: Context = None, column_name: str = None):
     """
     Factory method to get instance of Dataset for given format.
     Args:
         data_set_format: File format like hdf5, bigann
         path: Data set file path
         context: Dataset Context Enum
+        column_name: Name of the dataset column that contains the expected data
     Returns: DataSet instance
     """
     if data_set_format == HDF5DataSet.FORMAT_NAME:
         return HDF5DataSet(path, context)
     if data_set_format == BigANNVectorDataSet.FORMAT_NAME:
         return BigANNVectorDataSet(path)
+    if data_set_format == ParquetDataSet.FORMAT_NAME:
+        return ParquetDataSet(path, column_name)
     raise ConfigurationError("Invalid data set format")
 
 
@@ -144,6 +148,115 @@ def parse_context(context: Context) -> str:
         raise Exception("Unsupported context")
 
 
+class ParquetDataSet(DataSet):
+    """ Dataset backed by a Parquet file, a column-oriented storage format designed for
+    efficient data storage and retrieval
+    """
+
+    FORMAT_NAME = "parquet"
+    DEFAULT_BATCH_SIZE = 500
+
+    def __init__(self, dataset_path: str, column: str):
+        if not dataset_path:
+            raise ConfigurationError("dataset path cannot be None or empty")
+        if not column:
+            raise ConfigurationError("dataset column name cannot be None or empty")
+        self.dataset_path = dataset_path
+        self.file = None
+        self.column = column
+        self.current = self.BEGINNING
+        self.batch_iterator = None
+        self.prev_batch = []
+        self.file_batch_start = 0
+        self.file_batch_end = 0
+
+    def get_batch(self):
+        if self.prev_batch:
+            return self.prev_batch
+        batch = next(self.batch_iterator)
+        self.file_batch_start = self.file_batch_end
+        self.file_batch_end = self.file_batch_start + len(batch)
+        return batch
+
+    def _extract_vectors(self, start_offset, end_offset):
+        """
+        Since Parquet files can be huge, it is not advisable to load the complete file into
+        memory. Hence, we iterate batch by batch and extract vectors from one or more batches
+        based on the start and end offsets.
+
+        @param start_offset: start offset from which vectors are extracted from the parquet file
+        @param end_offset: end offset up to which vectors are extracted from the parquet file
+        @return: array of vectors
+        """
+        vectors = []
+        while True:
+            batch = self.get_batch()
+            if not batch:
+                break
+            batch_length = len(batch)
+            if start_offset > self.file_batch_end:
+                # run through batches until we reach the first batch that contains start_offset
+                continue
+            if start_offset < self.file_batch_start:
+                # move the start offset forward when extracting vectors across batches
+                start_offset = self.file_batch_start
+            # mark the beginning of the extract
+            copy_begin_offset = batch_length - (self.file_batch_end - start_offset)
+            if end_offset <= self.file_batch_end:
+                # mark the end of the extract if the end offset falls within the current batch
+                copy_end_offset = batch_length - (self.file_batch_end - end_offset)
+                vectors.extend(batch[copy_begin_offset:copy_end_offset][0].to_pandas())
+                # since we haven't exhausted the current batch, keep it in memory
+                # and use it as the current batch for the next read
+                self.prev_batch = batch
+                break
+            # the end offset lies beyond the current batch, so copy until the end of the batch
+            vectors.extend(batch[copy_begin_offset:][0].to_pandas())
+            # discard the previously saved batch once the complete batch has been read
+            self.prev_batch = []
+        return np.array(vectors)
+
+    def _load(self):
+        if not self.file:
+            self.file = pq.ParquetFile(self.dataset_path)
+            self.batch_iterator = self.file.iter_batches(batch_size=self.DEFAULT_BATCH_SIZE, columns=[self.column])
+            self.prev_batch = []
+            self.file_batch_start = 0
+            self.file_batch_end = 0
+
+    def read(self, chunk_size: int):
+        # load the file before reading
+        self._load()
+        if self.current >= self.size():
+            return None
+        end_offset = self.current + chunk_size
+        if end_offset > self.size():
+            end_offset = self.size()
+
+        vectors = self._extract_vectors(self.current, end_offset)
+        self.current = end_offset
+        return vectors
+
+    def seek(self, offset: int):
+        if offset < self.BEGINNING:
+            raise Exception("Offset must be greater than or equal to 0")
+
+        if offset >= self.size():
+            raise Exception("Offset must be less than the data set size")
+
+        self.current = offset
+
+    def size(self):
+        # load the file before returning its size
+        self._load()
+        return self.file.metadata.num_rows
+
+    def reset(self):
+        self.current = self.BEGINNING
+        # Force all attributes to reset. This can be achieved by resetting the file. Hence,
+        # subsequent dataset calls reload the file from the beginning.
+        self.file = None
+
+
 class BigANNVectorDataSet(DataSet):
     """ Data-set format for vector data-sets for `Big ANN Benchmarks `_
diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py
index a8e95793c..06838820c 100644
--- a/osbenchmark/workload/params.py
+++ b/osbenchmark/workload/params.py
@@ -888,6 +888,7 @@ def __init__(self, workload, params, context: Context, **kwargs):
         self.data_set_format = parse_string_parameter("data_set_format", params)
         self.data_set_path = parse_string_parameter("data_set_path", params, "")
         self.data_set_corpus = parse_string_parameter("data_set_corpus", params, "")
+        self.data_set_name = params.get("data_set_name")
         self._validate_data_set(self.data_set_path, self.data_set_corpus)
         self.total_num_vectors: int = parse_int_parameter("num_vectors", params, -1)
         self.num_vectors = 0
@@ -952,7 +953,7 @@ def partition(self, partition_index, total_partitions):
             self.data_set_path = data_set_path[0]
         if self.data_set is None:
             self.data_set: DataSet = get_data_set(
-                self.data_set_format, self.data_set_path, self.context)
+                self.data_set_format, self.data_set_path, self.context, self.data_set_name)
         # if value is -1 or greater than dataset size, use dataset size as num_vectors
         if self.total_num_vectors < 0 or self.total_num_vectors > self.data_set.size():
             self.total_num_vectors = self.data_set.size()
@@ -973,7 +974,8 @@ def partition(self, partition_index, total_partitions):
         partition_x.data_set = get_data_set(
             self.data_set_format,
             self.data_set_path,
-            self.context
+            self.context,
+            self.data_set_name,
         )
         partition_x.data_set.seek(partition_x.offset)
         partition_x.current = partition_x.offset
@@ -1033,6 +1035,7 @@ class VectorSearchPartitionParamSource(VectorDataSetPartitionParamSource):
     PARAMS_NAME_FILTER = "filter"
     PARAMS_NAME_REPETITIONS = "repetitions"
     PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT = "neighbors_data_set_format"
+    PARAMS_NAME_NEIGHBORS_DATA_SET_NAME = "neighbors_data_set_name"
     PARAMS_NAME_NEIGHBORS_DATA_SET_PATH = "neighbors_data_set_path"
     PARAMS_NAME_NEIGHBORS_DATA_SET_CORPUS = "neighbors_data_set_corpus"
     PARAMS_NAME_OPERATION_TYPE = "operation-type"
@@ -1051,6 +1054,7 @@ def __init__(self, workloads, params, query_params, **kwargs):
         self.neighbors_data_set_format = parse_string_parameter(
             self.PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT, params, self.data_set_format)
         self.neighbors_data_set_path = params.get(self.PARAMS_NAME_NEIGHBORS_DATA_SET_PATH)
+        self.neighbors_data_set_name = params.get(self.PARAMS_NAME_NEIGHBORS_DATA_SET_NAME)
         self.neighbors_data_set_corpus = params.get(self.PARAMS_NAME_NEIGHBORS_DATA_SET_CORPUS)
         self._validate_data_set(self.neighbors_data_set_path, self.neighbors_data_set_corpus)
         self.neighbors_data_set = None
@@ -1112,7 +1116,8 @@ def partition(self, partition_index, total_partitions):
             self.neighbors_data_set_path = self.data_set_path
         # add neighbor instance to partition
         partition.neighbors_data_set = get_data_set(
-            self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS)
+            self.neighbors_data_set_format, self.neighbors_data_set_path,
+            Context.NEIGHBORS, self.neighbors_data_set_name)
         partition.neighbors_data_set.seek(partition.offset)
         return partition
 
diff --git a/tests/utils/dataset_helper.py b/tests/utils/dataset_helper.py
index f638d2859..87e2a79c6 100644
--- a/tests/utils/dataset_helper.py
+++ b/tests/utils/dataset_helper.py
@@ -10,8 +10,9 @@
 import h5py
 import numpy as np
+import pyarrow as pa
 
-from osbenchmark.utils.dataset import Context, BigANNVectorDataSet, HDF5DataSet
+from osbenchmark.utils.dataset import Context, BigANNVectorDataSet, HDF5DataSet, ParquetDataSet
 
 DEFAULT_RANDOM_STRING_LENGTH = 8
 
@@ -166,6 +167,39 @@ def _build_data_set(self, context: DataSetBuildContext):
         context.vectors.tofile(f)
 
 
+class ParquetBuilder(DataSetBuilder):
+
+    def __init__(self, column_name):
+        super().__init__()
+        self.data_set_meta_data = dict()
+        self.column_name = column_name
+
+    def _validate_data_set_context(self, context: DataSetBuildContext):
+        if context.path not in self.data_set_meta_data.keys():
+            self.data_set_meta_data[context.path] = {
+                context.data_set_context: context
+            }
+            return
+
+        if context.data_set_context in \
+                self.data_set_meta_data[context.path].keys():
+            raise IllegalDataSetBuildContext("Path and context for data set "
+                                             "are already present in builder.")
+
+        self.data_set_meta_data[context.path][context.data_set_context] = context
+
+    @staticmethod
+    def _validate_extension(context: DataSetBuildContext):
+        ext = context.path.split('.')[-1]
+
+        if ext != ParquetDataSet.FORMAT_NAME:
+            raise IllegalDataSetBuildContext("Invalid file extension")
+
+    def _build_data_set(self, context: DataSetBuildContext):
+        pa_table = pa.Table.from_arrays(arrays=[context.vectors.tolist()], names=[self.column_name])
+        pa.parquet.write_table(pa_table, context.path)
+
+
 def create_random_2d_array(num_vectors: int, dimension: int) -> np.ndarray:
     rng = np.random.default_rng()
     return rng.random(size=(num_vectors, dimension), dtype=np.float32)
@@ -188,7 +222,8 @@ def create_data_set(
     dimension: int,
     extension: str,
     data_set_context: Context,
-    data_set_dir
+    data_set_dir,
+    column_name="",
 ) -> str:
     file_name_base = ''.join(random.choice(string.ascii_letters) for _ in range(DEFAULT_RANDOM_STRING_LENGTH))
@@ -201,6 +236,8 @@ def create_data_set(
 
     if extension == HDF5DataSet.FORMAT_NAME:
         HDF5Builder().add_data_set_build_context(context).build()
+    elif extension == ParquetDataSet.FORMAT_NAME:
+        ParquetBuilder(column_name).add_data_set_build_context(context).build()
     else:
         BigANNBuilder().add_data_set_build_context(context).build()
 
diff --git a/tests/utils/dataset_test.py b/tests/utils/dataset_test.py
index a864d2372..f173b7d31 100644
--- a/tests/utils/dataset_test.py
+++ b/tests/utils/dataset_test.py
@@ -6,12 +6,13 @@
 import tempfile
 from unittest import TestCase
 
-from osbenchmark.utils.dataset import Context, get_data_set, HDF5DataSet, BigANNVectorDataSet
+from osbenchmark.utils.dataset import Context, get_data_set, HDF5DataSet, BigANNVectorDataSet, ParquetDataSet
 from osbenchmark.utils.parse import ConfigurationError
 from tests.utils.dataset_helper import create_data_set
 
 DEFAULT_INDEX_NAME = "test-index"
 DEFAULT_FIELD_NAME = "test-field"
+DEFAULT_DATASET_NAME = "emb"
 DEFAULT_CONTEXT = Context.INDEX
 DEFAULT_NUM_VECTORS = 10
 DEFAULT_DIMENSION = 10
@@ -33,6 +34,39 @@ def testHDF5AsAcceptableDataSetFormat(self):
         self.assertEqual(data_set_instance.FORMAT_NAME, HDF5DataSet.FORMAT_NAME)
         self.assertEqual(data_set_instance.size(), DEFAULT_NUM_VECTORS)
 
+    def testParquetAsAcceptableDataSetFormat(self):
+        with tempfile.TemporaryDirectory() as data_set_dir:
+            valid_data_set_path = create_data_set(
+                700,
+                DEFAULT_DIMENSION,
+                ParquetDataSet.FORMAT_NAME,
+                DEFAULT_CONTEXT,
+                data_set_dir,
+                DEFAULT_DATASET_NAME,
+            )
+            data_set_instance = get_data_set("parquet", valid_data_set_path, column_name=DEFAULT_DATASET_NAME)
+            self.assertEqual(data_set_instance.FORMAT_NAME, ParquetDataSet.FORMAT_NAME)
+            self.assertEqual(data_set_instance.size(), 700)
+            actual_vectors = []
+            while True:
+                partial = data_set_instance.read(200)
+                if partial is None:
+                    self.assertEqual(len(actual_vectors), 700)
+                    break
+                # the last fetch returns only 100 records, since 700 is not a multiple of 200
+                self.assertLessEqual(len(partial), 200)
+                actual_vectors.extend(partial)
+            # Try again after a reset
+            data_set_instance.reset()
+            actual_vectors = []
+            while True:
+                partial = data_set_instance.read(100)
+                if partial is None:
+                    self.assertEqual(len(actual_vectors), 700)
+                    break
+                self.assertEqual(len(partial), 100)
+                actual_vectors.extend(partial)
+
     def testBigANNAsAcceptableDataSetFormatWithFloatExtension(self):
         float_extension = "fbin"
         data_set_dir = tempfile.mkdtemp()
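
Reviewer note (not part of the patch): the offset arithmetic in _extract_vectors is the subtle part of this change. Below is a minimal, self-contained sketch of the same batch-iteration idea using only pyarrow and numpy, independent of the osbenchmark classes. The file path "vectors.parquet", the column name "emb", and the batch size are illustrative assumptions, not values used by the patch.

# Minimal sketch of streaming a [start, end) row window out of a Parquet file,
# batch by batch, without loading the whole file into memory.
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

PATH = "vectors.parquet"   # hypothetical example file
COLUMN = "emb"             # hypothetical vector column name
BATCH_SIZE = 3             # small on purpose, so the window spans several batches

# Build a tiny example file: 10 rows of 4-dimensional vectors.
pq.write_table(pa.table({COLUMN: [[float(i)] * 4 for i in range(10)]}), PATH)

def read_window(path, column, start, end, batch_size=BATCH_SIZE):
    """Return rows [start, end) without materializing the whole file."""
    out = []
    consumed = 0  # end offset (in rows) of the last batch seen so far
    for batch in pq.ParquetFile(path).iter_batches(batch_size=batch_size, columns=[column]):
        batch_start, consumed = consumed, consumed + len(batch)
        if consumed <= start:
            continue  # the window starts in a later batch
        lo = max(start, batch_start) - batch_start
        hi = min(end, consumed) - batch_start
        out.extend(batch.column(0).to_pylist()[lo:hi])
        if consumed >= end:
            break  # the window is fully covered; stop iterating
    return np.array(out)

print(read_window(PATH, COLUMN, start=2, end=8).shape)  # -> (6, 4)

Unlike this sketch, the patch additionally caches a partially consumed batch in prev_batch so that consecutive read() calls can resume mid-batch instead of re-iterating the file.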