From 7485acc58c4b485a102f22ccf54e83626cb6f226 Mon Sep 17 00:00:00 2001 From: Vijayan Balasubramanian Date: Sun, 17 Dec 2023 17:04:57 -0800 Subject: [PATCH] Add dataset parser Added Hdf5, Bigann dataset parser. Added test cases for dataset and parser. Signed-off-by: Vijayan Balasubramanian --- osbenchmark/utils/dataset.py | 221 ++++++++++++++++++++++++++++++++++ osbenchmark/utils/parse.py | 59 +++++++++ tests/utils/dataset_helper.py | 219 +++++++++++++++++++++++++++++++++ tests/utils/dataset_test.py | 53 ++++++++ tests/utils/parse_test.py | 34 ++++++ 5 files changed, 586 insertions(+) create mode 100644 osbenchmark/utils/dataset.py create mode 100644 osbenchmark/utils/parse.py create mode 100644 tests/utils/dataset_helper.py create mode 100644 tests/utils/dataset_test.py create mode 100644 tests/utils/parse_test.py diff --git a/osbenchmark/utils/dataset.py b/osbenchmark/utils/dataset.py new file mode 100644 index 000000000..e26940572 --- /dev/null +++ b/osbenchmark/utils/dataset.py @@ -0,0 +1,221 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +import os +import struct +from abc import ABC, ABCMeta, abstractmethod +from enum import Enum +from typing import cast + +import h5py +import numpy as np + +from osbenchmark.utils.parse import ConfigurationError + + +class Context(Enum): + """DataSet context enum. Can be used to add additional context for how a + data-set should be interpreted. + """ + INDEX = 1 + QUERY = 2 + NEIGHBORS = 3 + + +class DataSet(ABC): + """DataSet interface. Used for reading data-sets from files. + + Methods: + read: Read a chunk of data from the data-set + seek: Get to position in the data-set + size: Gets the number of items in the data-set + reset: Resets internal state of data-set to beginning + """ + __metaclass__ = ABCMeta + + BEGINNING = 0 + + @abstractmethod + def read(self, chunk_size: int): + pass + + @abstractmethod + def seek(self, offset: int): + pass + + @abstractmethod + def size(self): + pass + + @abstractmethod + def reset(self): + pass + + +def get_data_set(data_set_format: str, path: str, context: Context): + """ + Factory method to get instance of Dataset for given format. + Args: + data_set_format: File format like hdf5, bigann + path: Data set file path + context: Dataset Context Enum + Returns: DataSet instance + """ + if data_set_format == HDF5DataSet.FORMAT_NAME: + return HDF5DataSet(path, context) + if data_set_format == BigANNVectorDataSet.FORMAT_NAME: + return BigANNVectorDataSet(path) + raise ConfigurationError("Invalid data set format") + + +class HDF5DataSet(DataSet): + """ Data-set format corresponding to `ANN Benchmarks + `_ + """ + + FORMAT_NAME = "hdf5" + + def __init__(self, dataset_path: str, context: Context): + file = h5py.File(dataset_path) + self.data = cast(h5py.Dataset, file[self.parse_context(context)]) + self.current = self.BEGINNING + + def read(self, chunk_size: int): + if self.current >= self.size(): + return None + + end_offset = self.current + chunk_size + if end_offset > self.size(): + end_offset = self.size() + + v = cast(np.ndarray, self.data[self.current:end_offset]) + self.current = end_offset + return v + + def seek(self, offset: int): + + if offset < self.BEGINNING: + raise Exception("Offset must be greater than or equal to 0") + + if offset >= self.size(): + raise Exception("Offset must be less than the data set size") + + self.current = offset + + def size(self): + return self.data.len() + + def reset(self): + self.current = self.BEGINNING + + @staticmethod + def parse_context(context: Context) -> str: + if context == Context.NEIGHBORS: + return "neighbors" + + if context == Context.INDEX: + return "train" + + if context == Context.QUERY: + return "test" + + raise Exception("Unsupported context") + + +class BigANNVectorDataSet(DataSet): + """ Data-set format for vector data-sets for `Big ANN Benchmarks + `_ + """ + + DATA_SET_HEADER_LENGTH = 8 + U8BIN_EXTENSION = "u8bin" + FBIN_EXTENSION = "fbin" + FORMAT_NAME = "bigann" + + BYTES_PER_U8INT = 1 + BYTES_PER_FLOAT = 4 + + def __init__(self, dataset_path: str): + self.file = open(dataset_path, 'rb') + self.file.seek(BigANNVectorDataSet.BEGINNING, os.SEEK_END) + num_bytes = self.file.tell() + self.file.seek(BigANNVectorDataSet.BEGINNING) + + if num_bytes < BigANNVectorDataSet.DATA_SET_HEADER_LENGTH: + raise Exception("File is invalid") + + self.num_points = int.from_bytes(self.file.read(4), "little") + self.dimension = int.from_bytes(self.file.read(4), "little") + self.bytes_per_num = self._get_data_size(dataset_path) + + if (num_bytes - BigANNVectorDataSet.DATA_SET_HEADER_LENGTH) != self.num_points * \ + self.dimension * self.bytes_per_num: + raise Exception("File is invalid") + + self.reader = self._value_reader(dataset_path) + self.current = BigANNVectorDataSet.BEGINNING + + def read(self, chunk_size: int): + if self.current >= self.size(): + return None + + end_offset = self.current + chunk_size + if end_offset > self.size(): + end_offset = self.size() + + v = np.asarray([self._read_vector() for _ in + range(end_offset - self.current)]) + self.current = end_offset + return v + + def seek(self, offset: int): + + if offset < self.BEGINNING: + raise Exception("Offset must be greater than or equal to 0") + + if offset >= self.size(): + raise Exception("Offset must be less than the data set size") + + bytes_offset = BigANNVectorDataSet.DATA_SET_HEADER_LENGTH + \ + self.dimension * self.bytes_per_num * offset + self.file.seek(bytes_offset) + self.current = offset + + def _read_vector(self): + return np.asarray([self.reader(self.file) for _ in + range(self.dimension)]) + + def size(self): + return self.num_points + + def reset(self): + self.file.seek(BigANNVectorDataSet.DATA_SET_HEADER_LENGTH) + self.current = BigANNVectorDataSet.BEGINNING + + def __del__(self): + self.file.close() + + @staticmethod + def _get_data_size(file_name): + ext = file_name.split('.')[-1] + if ext == BigANNVectorDataSet.U8BIN_EXTENSION: + return BigANNVectorDataSet.BYTES_PER_U8INT + + if ext == BigANNVectorDataSet.FBIN_EXTENSION: + return BigANNVectorDataSet.BYTES_PER_FLOAT + + raise Exception("Unknown extension") + + @staticmethod + def _value_reader(file_name): + ext = file_name.split('.')[-1] + if ext == BigANNVectorDataSet.U8BIN_EXTENSION: + return lambda file: float(int.from_bytes(file.read(BigANNVectorDataSet.BYTES_PER_U8INT), "little")) + + if ext == BigANNVectorDataSet.FBIN_EXTENSION: + return lambda file: struct.unpack(' str: + if key not in params: + if default is not None: + return default + raise ConfigurationError( + "Value cannot be None for param {}".format(key) + ) + + if type(params[key]) is str: + return params[key] + + raise ConfigurationError("Value must be a string for param {}".format(key)) + + +def parse_int_parameter(key: str, params: dict, default: int = None) -> int: + if key not in params: + if default: + return default + raise ConfigurationError( + "Value cannot be None for param {}".format(key) + ) + + if type(params[key]) is int: + return params[key] + + raise ConfigurationError("Value must be a int for param {}".format(key)) + + +def parse_float_parameter(key: str, params: dict, default: float = None) -> float: + if key not in params: + if default: + return default + raise ConfigurationError( + "Value cannot be None for param {}".format(key) + ) + + if type(params[key]) is float: + return params[key] + + raise ConfigurationError("Value must be a float for param {}".format(key)) + + +class ConfigurationError(Exception): + """Exception raised for errors configuration. + + Attributes: + message -- explanation of the error + """ + + def __init__(self, message: str): + self.message = message + super().__init__(self.message) diff --git a/tests/utils/dataset_helper.py b/tests/utils/dataset_helper.py new file mode 100644 index 000000000..462e46243 --- /dev/null +++ b/tests/utils/dataset_helper.py @@ -0,0 +1,219 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +import os +import random +import string +from abc import ABC, abstractmethod + +import h5py +import numpy as np + +from osbenchmark.utils.dataset import Context, BigANNVectorDataSet, HDF5DataSet + +DEFAULT_RANDOM_STRING_LENGTH = 8 + + +class DataSetBuildContext: + """ Data class capturing information needed to build a particular data set + + Attributes: + data_set_context: Indicator of what the data set is used for, + vectors: A 2D array containing vectors that are used to build data set. + path: string representing path where data set should be serialized to. + """ + + def __init__(self, data_set_context: Context, vectors: np.ndarray, path: str): + self.data_set_context: Context = data_set_context + self.vectors: np.ndarray = vectors # TODO: Validate shape + self.path: str = path + + def get_num_vectors(self) -> int: + return self.vectors.shape[0] + + def get_dimension(self) -> int: + return self.vectors.shape[1] + + def get_type(self) -> np.dtype: + return self.vectors.dtype + + +class DataSetBuilder(ABC): + """ Abstract builder used to create a build a collection of data sets + + Attributes: + data_set_build_contexts: list of data set build contexts that builder + will build. + """ + + def __init__(self): + self.data_set_build_contexts = list() + + def add_data_set_build_context(self, data_set_build_context: DataSetBuildContext): + """ Adds a data set build context to list of contexts to be built. + + Args: + data_set_build_context: DataSetBuildContext to be added to list + + Returns: Updated DataSetBuilder + + """ + self._validate_data_set_context(data_set_build_context) + self.data_set_build_contexts.append(data_set_build_context) + return self + + def build(self): + """ Builds and serializes all data sets build contexts + + Returns: + + """ + [self._build_data_set(data_set_build_context) for data_set_build_context + in self.data_set_build_contexts] + + @abstractmethod + def _build_data_set(self, context: DataSetBuildContext): + """ Builds an individual data set + + Args: + context: DataSetBuildContext of data set to be built + + Returns: + + """ + pass + + @abstractmethod + def _validate_data_set_context(self, context: DataSetBuildContext): + """ Validates that data set context can be added to this builder + + Args: + context: DataSetBuildContext to be validated + + Returns: + + """ + pass + + +class HDF5Builder(DataSetBuilder): + + def __init__(self): + super(HDF5Builder, self).__init__() + self.data_set_meta_data = dict() + + def _validate_data_set_context(self, context: DataSetBuildContext): + if context.path not in self.data_set_meta_data.keys(): + self.data_set_meta_data[context.path] = { + context.data_set_context: context + } + return + + if context.data_set_context in \ + self.data_set_meta_data[context.path].keys(): + raise IllegalDataSetBuildContext("Path and context for data set " + "are already present in builder.") + + self.data_set_meta_data[context.path][context.data_set_context] = \ + context + + @staticmethod + def _validate_extension(context: DataSetBuildContext): + ext = context.path.split('.')[-1] + + if ext != HDF5DataSet.FORMAT_NAME: + raise IllegalDataSetBuildContext("Invalid file extension") + + def _build_data_set(self, context: DataSetBuildContext): + # For HDF5, because multiple data sets can be grouped in the same file, + # we will build data sets in memory and not write to disk until + # _flush_data_sets_to_disk is called + with h5py.File(context.path, 'a') as hf: + hf.create_dataset( + HDF5DataSet.parse_context(context.data_set_context), + data=context.vectors + ) + + +class BigANNBuilder(DataSetBuilder): + + def _validate_data_set_context(self, context: DataSetBuildContext): + self._validate_extension(context) + + # prevent the duplication of paths for data sets + data_set_paths = [c.path for c in self.data_set_build_contexts] + if any(data_set_paths.count(x) > 1 for x in data_set_paths): + raise IllegalDataSetBuildContext("Build context paths have to be " + "unique.") + + @staticmethod + def _validate_extension(context: DataSetBuildContext): + ext = context.path.split('.')[-1] + + if ext != BigANNVectorDataSet.U8BIN_EXTENSION and ext != \ + BigANNVectorDataSet.FBIN_EXTENSION: + raise IllegalDataSetBuildContext("Invalid file extension") + + if ext == BigANNVectorDataSet.U8BIN_EXTENSION and context.get_type() != \ + np.uint8: + raise IllegalDataSetBuildContext("Invalid data type for {} ext." + .format(BigANNVectorDataSet + .U8BIN_EXTENSION)) + + if ext == BigANNVectorDataSet.FBIN_EXTENSION and context.get_type() != \ + np.float32: + print(context.get_type()) + raise IllegalDataSetBuildContext("Invalid data type for {} ext." + .format(BigANNVectorDataSet + .FBIN_EXTENSION)) + + def _build_data_set(self, context: DataSetBuildContext): + num_vectors = context.get_num_vectors() + dimension = context.get_dimension() + with open(context.path, 'wb') as f: + f.write(int.to_bytes(num_vectors, 4, "little")) + f.write(int.to_bytes(dimension, 4, "little")) + context.vectors.tofile(f) + + +def create_random_2d_array(num_vectors: int, dimension: int) -> np.ndarray: + rng = np.random.default_rng() + return rng.random(size=(num_vectors, dimension), dtype=np.float32) + + +class IllegalDataSetBuildContext(Exception): + """Exception raised when passed in DataSetBuildContext is illegal + + Attributes: + message -- explanation of the error + """ + + def __init__(self, message: str): + self.message = f'{message}' + super().__init__(self.message) + + +def create_data_set( + num_vectors: int, + dimension: int, + extension: str, + data_set_context: Context, + data_set_dir +) -> str: + file_name_base = ''.join(random.choice(string.ascii_letters) for _ in + range(DEFAULT_RANDOM_STRING_LENGTH)) + data_set_file_name = "{}.{}".format(file_name_base, extension) + data_set_path = os.path.join(data_set_dir, data_set_file_name) + context = DataSetBuildContext( + data_set_context, + create_random_2d_array(num_vectors, dimension), + data_set_path) + + if extension == HDF5DataSet.FORMAT_NAME: + HDF5Builder().add_data_set_build_context(context).build() + else: + BigANNBuilder().add_data_set_build_context(context).build() + + return data_set_path diff --git a/tests/utils/dataset_test.py b/tests/utils/dataset_test.py new file mode 100644 index 000000000..a864d2372 --- /dev/null +++ b/tests/utils/dataset_test.py @@ -0,0 +1,53 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +import tempfile +from unittest import TestCase + +from osbenchmark.utils.dataset import Context, get_data_set, HDF5DataSet, BigANNVectorDataSet +from osbenchmark.utils.parse import ConfigurationError +from tests.utils.dataset_helper import create_data_set + +DEFAULT_INDEX_NAME = "test-index" +DEFAULT_FIELD_NAME = "test-field" +DEFAULT_CONTEXT = Context.INDEX +DEFAULT_NUM_VECTORS = 10 +DEFAULT_DIMENSION = 10 +DEFAULT_RANDOM_STRING_LENGTH = 8 + + +class DataSetTestCase(TestCase): + + def testHDF5AsAcceptableDataSetFormat(self): + with tempfile.TemporaryDirectory() as data_set_dir: + valid_data_set_path = create_data_set( + DEFAULT_NUM_VECTORS, + DEFAULT_DIMENSION, + HDF5DataSet.FORMAT_NAME, + DEFAULT_CONTEXT, + data_set_dir + ) + data_set_instance = get_data_set("hdf5", valid_data_set_path, Context.INDEX) + self.assertEqual(data_set_instance.FORMAT_NAME, HDF5DataSet.FORMAT_NAME) + self.assertEqual(data_set_instance.size(), DEFAULT_NUM_VECTORS) + + def testBigANNAsAcceptableDataSetFormatWithFloatExtension(self): + float_extension = "fbin" + data_set_dir = tempfile.mkdtemp() + + valid_data_set_path = create_data_set( + DEFAULT_NUM_VECTORS, + DEFAULT_DIMENSION, + float_extension, + DEFAULT_CONTEXT, + data_set_dir + ) + data_set_instance = get_data_set("bigann", valid_data_set_path, Context.INDEX) + self.assertEqual(data_set_instance.FORMAT_NAME, BigANNVectorDataSet.FORMAT_NAME) + self.assertEqual(data_set_instance.size(), DEFAULT_NUM_VECTORS) + + def testUnSupportedDataSetFormat(self): + with self.assertRaises(ConfigurationError) as _: + get_data_set("random", "/some/path", Context.INDEX) diff --git a/tests/utils/parse_test.py b/tests/utils/parse_test.py new file mode 100644 index 000000000..efa721216 --- /dev/null +++ b/tests/utils/parse_test.py @@ -0,0 +1,34 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +from unittest import TestCase + +from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter, parse_float_parameter + + +class ParseParamsFunctionalTests(TestCase): + params = { + "string-value": "hello-world", + "int-value": 1000, + "float-value": 1.234, + } + + def test_parse_string_parameter_from_params(self): + self.assertEqual("hello-world", parse_string_parameter("string-value", self.params)) + + def test_parse_string_parameter_default(self): + self.assertEqual("vector-search", parse_string_parameter("default-value", self.params, "vector-search")) + + def test_parse_int_parameter_from_params(self): + self.assertEqual(1000, parse_int_parameter("int-value", self.params)) + + def test_parse_int_parameter_default(self): + self.assertEqual(1111, parse_int_parameter("default-value", self.params, 1111)) + + def test_parse_float_parameter_from_params(self): + self.assertEqual(1.234, parse_float_parameter("float-value", self.params)) + + def test_parse_float_parameter_default(self): + self.assertEqual(0.1, parse_float_parameter("default-value", self.params, 0.1))