diff --git a/osbenchmark/exceptions.py b/osbenchmark/exceptions.py index 19a90a412..56fb623e2 100644 --- a/osbenchmark/exceptions.py +++ b/osbenchmark/exceptions.py @@ -110,3 +110,17 @@ class WorkloadConfigError(BenchmarkError): class NotFound(BenchmarkError): pass + + +class InvalidExtensionException(BenchmarkError): + """ + Thrown when invalid or unsupported file extension is passed in config + """ + + +class ConfigurationError(BenchmarkError): + """Exception raised for errors configuration. + + Attributes: + message -- explanation of the error + """ diff --git a/osbenchmark/utils/dataset.py b/osbenchmark/utils/dataset.py new file mode 100644 index 000000000..e2f951ad0 --- /dev/null +++ b/osbenchmark/utils/dataset.py @@ -0,0 +1,242 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +import os +import struct +from abc import ABC, ABCMeta, abstractmethod +from enum import Enum +from typing import cast + +import h5py +import numpy as np + +from osbenchmark.exceptions import InvalidExtensionException +from osbenchmark.utils.parse import ConfigurationError + + +class Context(Enum): + """DataSet context enum. Can be used to add additional context for how a + data-set should be interpreted. + """ + INDEX = 1 + QUERY = 2 + NEIGHBORS = 3 + + +class DataSet(ABC): + """DataSet interface. Used for reading data-sets from files. + + Methods: + read: Read a chunk of data from the data-set + seek: Get to position in the data-set + size: Gets the number of items in the data-set + reset: Resets internal state of data-set to beginning + """ + __metaclass__ = ABCMeta + + BEGINNING = 0 + + @abstractmethod + def read(self, chunk_size: int): + """Read vector for given chunk size + @param chunk_size: limits vector size to read + """ + + @abstractmethod + def seek(self, offset: int): + """ + Move reader to given offset + @param offset: value to move reader pointer to + """ + + @abstractmethod + def size(self): + """ + Returns size of dataset + """ + + @abstractmethod + def reset(self): + """ + Resets the dataset reader + """ + + +def get_data_set(data_set_format: str, path: str, context: Context): + """ + Factory method to get instance of Dataset for given format. + Args: + data_set_format: File format like hdf5, bigann + path: Data set file path + context: Dataset Context Enum + Returns: DataSet instance + """ + if data_set_format == HDF5DataSet.FORMAT_NAME: + return HDF5DataSet(path, context) + if data_set_format == BigANNVectorDataSet.FORMAT_NAME: + return BigANNVectorDataSet(path) + raise ConfigurationError("Invalid data set format") + + +class HDF5DataSet(DataSet): + """ Data-set format corresponding to `ANN Benchmarks + `_ + """ + + FORMAT_NAME = "hdf5" + + def __init__(self, dataset_path: str, context: Context): + file = h5py.File(dataset_path) + self.data = cast(h5py.Dataset, file[self.parse_context(context)]) + self.current = self.BEGINNING + + def read(self, chunk_size: int): + if self.current >= self.size(): + return None + + end_offset = self.current + chunk_size + if end_offset > self.size(): + end_offset = self.size() + + vectors = cast(np.ndarray, self.data[self.current:end_offset]) + self.current = end_offset + return vectors + + def seek(self, offset: int): + + if offset < self.BEGINNING: + raise Exception("Offset must be greater than or equal to 0") + + if offset >= self.size(): + raise Exception("Offset must be less than the data set size") + + self.current = offset + + def size(self): + return self.data.len() + + def reset(self): + self.current = self.BEGINNING + + @staticmethod + def parse_context(context: Context) -> str: + if context == Context.NEIGHBORS: + return "neighbors" + + if context == Context.INDEX: + return "train" + + if context == Context.QUERY: + return "test" + + raise Exception("Unsupported context") + + +class BigANNVectorDataSet(DataSet): + """ Data-set format for vector data-sets for `Big ANN Benchmarks + `_ + """ + + DATA_SET_HEADER_LENGTH = 8 + U8BIN_EXTENSION = "u8bin" + FBIN_EXTENSION = "fbin" + FORMAT_NAME = "bigann" + SUPPORTED_EXTENSION = [ + FBIN_EXTENSION, U8BIN_EXTENSION + ] + + BYTES_PER_U8INT = 1 + BYTES_PER_FLOAT = 4 + + def __init__(self, dataset_path: str): + self.file = open(dataset_path, 'rb') + self.file.seek(BigANNVectorDataSet.BEGINNING, os.SEEK_END) + num_bytes = self.file.tell() + self.file.seek(BigANNVectorDataSet.BEGINNING) + + if num_bytes < BigANNVectorDataSet.DATA_SET_HEADER_LENGTH: + raise Exception("Invalid file: file size cannot be less than {} bytes".format( + BigANNVectorDataSet.DATA_SET_HEADER_LENGTH)) + + self.num_points = int.from_bytes(self.file.read(4), "little") + self.dimension = int.from_bytes(self.file.read(4), "little") + self.bytes_per_num = self._get_data_size(dataset_path) + + if (num_bytes - BigANNVectorDataSet.DATA_SET_HEADER_LENGTH) != ( + self.num_points * self.dimension * self.bytes_per_num): + raise Exception("Invalid file. File size is not matching with expected estimated " + "value based on number of points, dimension and bytes per point") + + self.reader = self._value_reader(dataset_path) + self.current = BigANNVectorDataSet.BEGINNING + + def read(self, chunk_size: int): + if self.current >= self.size(): + return None + + end_offset = self.current + chunk_size + if end_offset > self.size(): + end_offset = self.size() + + vectors = np.asarray( + [self._read_vector() for _ in range(end_offset - self.current)] + ) + self.current = end_offset + return vectors + + def seek(self, offset: int): + + if offset < self.BEGINNING: + raise Exception("Offset must be greater than or equal to 0") + + if offset >= self.size(): + raise Exception("Offset must be less than the data set size") + + bytes_offset = BigANNVectorDataSet.DATA_SET_HEADER_LENGTH + \ + self.dimension * self.bytes_per_num * offset + self.file.seek(bytes_offset) + self.current = offset + + def _read_vector(self): + return np.asarray([self.reader(self.file) for _ in + range(self.dimension)]) + + def size(self): + return self.num_points + + def reset(self): + self.file.seek(BigANNVectorDataSet.DATA_SET_HEADER_LENGTH) + self.current = BigANNVectorDataSet.BEGINNING + + def __del__(self): + self.file.close() + + @staticmethod + def _get_extension(file_name): + ext = file_name.split('.')[-1] + if ext not in BigANNVectorDataSet.SUPPORTED_EXTENSION: + raise InvalidExtensionException( + "Unknown extension :{}, supported extensions are: {}".format( + ext, str(BigANNVectorDataSet.SUPPORTED_EXTENSION))) + return ext + + @staticmethod + def _get_data_size(file_name): + ext = BigANNVectorDataSet._get_extension(file_name) + if ext == BigANNVectorDataSet.U8BIN_EXTENSION: + return BigANNVectorDataSet.BYTES_PER_U8INT + + if ext == BigANNVectorDataSet.FBIN_EXTENSION: + return BigANNVectorDataSet.BYTES_PER_FLOAT + + @staticmethod + def _value_reader(file_name): + ext = BigANNVectorDataSet._get_extension(file_name) + if ext == BigANNVectorDataSet.U8BIN_EXTENSION: + return lambda file: float(int.from_bytes(file.read(BigANNVectorDataSet.BYTES_PER_U8INT), "little")) + + if ext == BigANNVectorDataSet.FBIN_EXTENSION: + return lambda file: struct.unpack(' str: + if key not in params: + if default is not None: + return default + raise ConfigurationError( + "Value cannot be None for param {}".format(key) + ) + + if isinstance(params[key], str): + return params[key] + + raise ConfigurationError("Value must be a string for param {}".format(key)) + + +def parse_int_parameter(key: str, params: dict, default: int = None) -> int: + if key not in params: + if default: + return default + raise ConfigurationError( + "Value cannot be None for param {}".format(key) + ) + + if isinstance(params[key], int): + return params[key] + + raise ConfigurationError("Value must be a int for param {}".format(key)) + + +def parse_float_parameter(key: str, params: dict, default: float = None) -> float: + if key not in params: + if default: + return default + raise ConfigurationError( + "Value cannot be None for param {}".format(key) + ) + + if isinstance(params[key], float): + return params[key] + + raise ConfigurationError("Value must be a float for param {}".format(key)) diff --git a/setup.py b/setup.py index 2625cc5c7..3e6e31cdb 100644 --- a/setup.py +++ b/setup.py @@ -100,6 +100,12 @@ def str_from_file(name): "boto3==1.28.62", # Licence: BSD-3-Clause "zstandard==0.22.0", + # License: BSD + # Required for knnvector workload + "h5py==3.10.0", + # License: BSD + # Required for knnvector workload + "numpy==1.24.2", ] tests_require = [ diff --git a/tests/utils/dataset_helper.py b/tests/utils/dataset_helper.py new file mode 100644 index 000000000..f638d2859 --- /dev/null +++ b/tests/utils/dataset_helper.py @@ -0,0 +1,207 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +import os +import random +import string +from abc import ABC, abstractmethod + +import h5py +import numpy as np + +from osbenchmark.utils.dataset import Context, BigANNVectorDataSet, HDF5DataSet + +DEFAULT_RANDOM_STRING_LENGTH = 8 + + +class DataSetBuildContext: + """ Data class capturing information needed to build a particular data set + + Attributes: + data_set_context: Indicator of what the data set is used for, + vectors: A 2D array containing vectors that are used to build data set. + path: string representing path where data set should be serialized to. + """ + + def __init__(self, data_set_context: Context, vectors: np.ndarray, path: str): + self.data_set_context: Context = data_set_context + self.vectors: np.ndarray = vectors # TODO: Validate shape + self.path: str = path + + def get_num_vectors(self) -> int: + return self.vectors.shape[0] + + def get_dimension(self) -> int: + return self.vectors.shape[1] + + def get_type(self) -> np.dtype: + return self.vectors.dtype + + +class DataSetBuilder(ABC): + """ Abstract builder used to create a build a collection of data sets + + Attributes: + data_set_build_contexts: list of data set build contexts that builder + will build. + """ + + def __init__(self): + self.data_set_build_contexts = list() + + def add_data_set_build_context(self, data_set_build_context: DataSetBuildContext): + """ Adds a data set build context to list of contexts to be built. + + Args: + data_set_build_context: DataSetBuildContext to be added to list + + Returns: Updated DataSetBuilder + + """ + self._validate_data_set_context(data_set_build_context) + self.data_set_build_contexts.append(data_set_build_context) + return self + + def build(self): + """ Builds and serializes all data sets build contexts + """ + for data_set_build_context in self.data_set_build_contexts: + self._build_data_set(data_set_build_context) + + @abstractmethod + def _build_data_set(self, context: DataSetBuildContext): + """ Builds an individual data set + + Args: + context: DataSetBuildContext of data set to be built + """ + + @abstractmethod + def _validate_data_set_context(self, context: DataSetBuildContext): + """ Validates that data set context can be added to this builder + + Args: + context: DataSetBuildContext to be validated + """ + + +class HDF5Builder(DataSetBuilder): + + def __init__(self): + super().__init__() + self.data_set_meta_data = dict() + + def _validate_data_set_context(self, context: DataSetBuildContext): + if context.path not in self.data_set_meta_data.keys(): + self.data_set_meta_data[context.path] = { + context.data_set_context: context + } + return + + if context.data_set_context in \ + self.data_set_meta_data[context.path].keys(): + raise IllegalDataSetBuildContext("Path and context for data set " + "are already present in builder.") + + self.data_set_meta_data[context.path][context.data_set_context] = \ + context + + @staticmethod + def _validate_extension(context: DataSetBuildContext): + ext = context.path.split('.')[-1] + + if ext != HDF5DataSet.FORMAT_NAME: + raise IllegalDataSetBuildContext("Invalid file extension") + + def _build_data_set(self, context: DataSetBuildContext): + # For HDF5, because multiple data sets can be grouped in the same file, + # we will build data sets in memory and not write to disk until + # _flush_data_sets_to_disk is called + with h5py.File(context.path, 'a') as hf: + hf.create_dataset( + HDF5DataSet.parse_context(context.data_set_context), + data=context.vectors + ) + + +class BigANNBuilder(DataSetBuilder): + + def _validate_data_set_context(self, context: DataSetBuildContext): + self._validate_extension(context) + + # prevent the duplication of paths for data sets + data_set_paths = [c.path for c in self.data_set_build_contexts] + if any(data_set_paths.count(x) > 1 for x in data_set_paths): + raise IllegalDataSetBuildContext("Build context paths have to be " + "unique.") + + @staticmethod + def _validate_extension(context: DataSetBuildContext): + ext = context.path.split('.')[-1] + + if ext not in [BigANNVectorDataSet.U8BIN_EXTENSION, BigANNVectorDataSet.FBIN_EXTENSION]: + raise IllegalDataSetBuildContext("Invalid file extension: {}".format(ext)) + + if ext == BigANNVectorDataSet.U8BIN_EXTENSION and context.get_type() != \ + np.uint8: + raise IllegalDataSetBuildContext("Invalid data type for {} ext." + .format(BigANNVectorDataSet + .U8BIN_EXTENSION)) + + if ext == BigANNVectorDataSet.FBIN_EXTENSION and context.get_type() != \ + np.float32: + print(context.get_type()) + raise IllegalDataSetBuildContext("Invalid data type for {} ext." + .format(BigANNVectorDataSet + .FBIN_EXTENSION)) + + def _build_data_set(self, context: DataSetBuildContext): + num_vectors = context.get_num_vectors() + dimension = context.get_dimension() + with open(context.path, 'wb') as f: + f.write(int.to_bytes(num_vectors, 4, "little")) + f.write(int.to_bytes(dimension, 4, "little")) + context.vectors.tofile(f) + + +def create_random_2d_array(num_vectors: int, dimension: int) -> np.ndarray: + rng = np.random.default_rng() + return rng.random(size=(num_vectors, dimension), dtype=np.float32) + + +class IllegalDataSetBuildContext(Exception): + """Exception raised when passed in DataSetBuildContext is illegal + + Attributes: + message -- explanation of the error + """ + + def __init__(self, message: str): + self.message = f'{message}' + super().__init__(self.message) + + +def create_data_set( + num_vectors: int, + dimension: int, + extension: str, + data_set_context: Context, + data_set_dir +) -> str: + file_name_base = ''.join(random.choice(string.ascii_letters) for _ in + range(DEFAULT_RANDOM_STRING_LENGTH)) + data_set_file_name = "{}.{}".format(file_name_base, extension) + data_set_path = os.path.join(data_set_dir, data_set_file_name) + context = DataSetBuildContext( + data_set_context, + create_random_2d_array(num_vectors, dimension), + data_set_path) + + if extension == HDF5DataSet.FORMAT_NAME: + HDF5Builder().add_data_set_build_context(context).build() + else: + BigANNBuilder().add_data_set_build_context(context).build() + + return data_set_path diff --git a/tests/utils/dataset_test.py b/tests/utils/dataset_test.py new file mode 100644 index 000000000..a864d2372 --- /dev/null +++ b/tests/utils/dataset_test.py @@ -0,0 +1,53 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +import tempfile +from unittest import TestCase + +from osbenchmark.utils.dataset import Context, get_data_set, HDF5DataSet, BigANNVectorDataSet +from osbenchmark.utils.parse import ConfigurationError +from tests.utils.dataset_helper import create_data_set + +DEFAULT_INDEX_NAME = "test-index" +DEFAULT_FIELD_NAME = "test-field" +DEFAULT_CONTEXT = Context.INDEX +DEFAULT_NUM_VECTORS = 10 +DEFAULT_DIMENSION = 10 +DEFAULT_RANDOM_STRING_LENGTH = 8 + + +class DataSetTestCase(TestCase): + + def testHDF5AsAcceptableDataSetFormat(self): + with tempfile.TemporaryDirectory() as data_set_dir: + valid_data_set_path = create_data_set( + DEFAULT_NUM_VECTORS, + DEFAULT_DIMENSION, + HDF5DataSet.FORMAT_NAME, + DEFAULT_CONTEXT, + data_set_dir + ) + data_set_instance = get_data_set("hdf5", valid_data_set_path, Context.INDEX) + self.assertEqual(data_set_instance.FORMAT_NAME, HDF5DataSet.FORMAT_NAME) + self.assertEqual(data_set_instance.size(), DEFAULT_NUM_VECTORS) + + def testBigANNAsAcceptableDataSetFormatWithFloatExtension(self): + float_extension = "fbin" + data_set_dir = tempfile.mkdtemp() + + valid_data_set_path = create_data_set( + DEFAULT_NUM_VECTORS, + DEFAULT_DIMENSION, + float_extension, + DEFAULT_CONTEXT, + data_set_dir + ) + data_set_instance = get_data_set("bigann", valid_data_set_path, Context.INDEX) + self.assertEqual(data_set_instance.FORMAT_NAME, BigANNVectorDataSet.FORMAT_NAME) + self.assertEqual(data_set_instance.size(), DEFAULT_NUM_VECTORS) + + def testUnSupportedDataSetFormat(self): + with self.assertRaises(ConfigurationError) as _: + get_data_set("random", "/some/path", Context.INDEX) diff --git a/tests/utils/parse_test.py b/tests/utils/parse_test.py new file mode 100644 index 000000000..efa721216 --- /dev/null +++ b/tests/utils/parse_test.py @@ -0,0 +1,34 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +from unittest import TestCase + +from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter, parse_float_parameter + + +class ParseParamsFunctionalTests(TestCase): + params = { + "string-value": "hello-world", + "int-value": 1000, + "float-value": 1.234, + } + + def test_parse_string_parameter_from_params(self): + self.assertEqual("hello-world", parse_string_parameter("string-value", self.params)) + + def test_parse_string_parameter_default(self): + self.assertEqual("vector-search", parse_string_parameter("default-value", self.params, "vector-search")) + + def test_parse_int_parameter_from_params(self): + self.assertEqual(1000, parse_int_parameter("int-value", self.params)) + + def test_parse_int_parameter_default(self): + self.assertEqual(1111, parse_int_parameter("default-value", self.params, 1111)) + + def test_parse_float_parameter_from_params(self): + self.assertEqual(1.234, parse_float_parameter("float-value", self.params)) + + def test_parse_float_parameter_default(self): + self.assertEqual(0.1, parse_float_parameter("default-value", self.params, 0.1))