Add support for big-ann ground truth file format (#528)
Signed-off-by: Vijayan Balasubramanian <[email protected]>
VijayanB authored May 6, 2024
1 parent 103bfdd commit 0786c1a
Showing 3 changed files with 203 additions and 64 deletions.
182 changes: 127 additions & 55 deletions osbenchmark/utils/dataset.py
@@ -77,7 +77,7 @@ def get_data_set(data_set_format: str, path: str, context: Context):
     if data_set_format == HDF5DataSet.FORMAT_NAME:
         return HDF5DataSet(path, context)
     if data_set_format == BigANNVectorDataSet.FORMAT_NAME:
-        return BigANNVectorDataSet(path)
+        return create_big_ann_dataset(path)
     raise ConfigurationError("Invalid data set format")
 
 
@@ -144,50 +144,32 @@ def parse_context(context: Context) -> str:
     raise Exception("Unsupported context")
 
 
-class BigANNVectorDataSet(DataSet):
-    """ Data-set format for vector data-sets for `Big ANN Benchmarks
-    <https://big-ann-benchmarks.com/index.html#bench-datasets>`_
-    """
+class BigANNDataSet(DataSet):
 
     DATA_SET_HEADER_LENGTH = 8
-    U8BIN_EXTENSION = "u8bin"
-    FBIN_EXTENSION = "fbin"
     FORMAT_NAME = "bigann"
-    SUPPORTED_EXTENSION = [
-        FBIN_EXTENSION, U8BIN_EXTENSION
-    ]
-
-    BYTES_PER_U8INT = 1
-    BYTES_PER_FLOAT = 4
 
     def __init__(self, dataset_path: str):
         self.dataset_path = dataset_path
         self.file = None
-        self.current = BigANNVectorDataSet.BEGINNING
-        self.num_points = 0
-        self.dimension = 0
+        self.num_bytes = 0
+        self.current = DataSet.BEGINNING
         self.bytes_per_num = 0
+        self.rows = 0
+        self.row_length = 0
 
     def _init_internal_params(self):
         self.file = open(self.dataset_path, 'rb')
-        self.file.seek(BigANNVectorDataSet.BEGINNING, os.SEEK_END)
-        num_bytes = self.file.tell()
-        self.file.seek(BigANNVectorDataSet.BEGINNING)
-
-        if num_bytes < BigANNVectorDataSet.DATA_SET_HEADER_LENGTH:
+        self.file.seek(DataSet.BEGINNING, os.SEEK_END)
+        self.num_bytes = self.file.tell()
+        if self.num_bytes < BigANNDataSet.DATA_SET_HEADER_LENGTH:
             raise Exception("Invalid file: file size cannot be less than {} bytes".format(
-                BigANNVectorDataSet.DATA_SET_HEADER_LENGTH))
-
-        self.num_points = int.from_bytes(self.file.read(4), "little")
-        self.dimension = int.from_bytes(self.file.read(4), "little")
-        self.bytes_per_num = self._get_data_size(self.dataset_path)
-
-        if (num_bytes - BigANNVectorDataSet.DATA_SET_HEADER_LENGTH) != (
-                self.num_points * self.dimension * self.bytes_per_num):
-            raise Exception("Invalid file. File size is not matching with expected estimated "
-                            "value based on number of points, dimension and bytes per point")
-
-        self.reader = self._value_reader(self.dataset_path)
+                BigANNDataSet.DATA_SET_HEADER_LENGTH))
+        self.file.seek(BigANNDataSet.BEGINNING)
+        self.rows = int.from_bytes(self.file.read(4), "little")
+        self.row_length = int.from_bytes(self.file.read(4), "little")
+        self.bytes_per_num = self._get_data_size()
+        self.reader = self._value_reader()
 
     def _load(self):
         # load file if it is not loaded yet
@@ -210,6 +192,10 @@ def read(self, chunk_size: int):
         self.current = end_offset
         return vectors
 
+    def _get_file_byte_offset(self, offset):
+        """Return file byte offset for given offset"""
+        return BigANNDataSet.DATA_SET_HEADER_LENGTH + (self.row_length * self.bytes_per_num * offset)
+
     def seek(self, offset: int):
         # load file first before seek
         self._load()
@@ -219,52 +205,138 @@ def seek(self, offset: int):
         if offset >= self.size():
             raise Exception("Offset must be less than the data set size")
 
-        bytes_offset = BigANNVectorDataSet.DATA_SET_HEADER_LENGTH + (
-                self.dimension * self.bytes_per_num * offset)
+        bytes_offset = self._get_file_byte_offset(offset)
         self.file.seek(bytes_offset)
         self.current = offset
 
     def _read_vector(self):
-        return np.asarray([self.reader(self.file) for _ in
-                           range(self.dimension)])
+        return np.asarray([self.reader(self.file) for _ in range(self.row_length)])
 
     def size(self):
         # load file first before return size
         self._load()
-        return self.num_points
+        return self.rows
 
     def reset(self):
         if self.file:
-            self.file.seek(BigANNVectorDataSet.DATA_SET_HEADER_LENGTH)
-        self.current = BigANNVectorDataSet.BEGINNING
+            self.file.seek(BigANNDataSet.DATA_SET_HEADER_LENGTH)
+        self.current = BigANNDataSet.BEGINNING
 
     def __del__(self):
         if self.file:
             self.file.close()
 
-    @staticmethod
-    def _get_extension(file_name):
-        ext = file_name.split('.')[-1]
-        if ext not in BigANNVectorDataSet.SUPPORTED_EXTENSION:
+    @abstractmethod
+    def _get_supported_extension(self):
+        """Return list of supported extension by this dataset"""
+
+    def _get_extension(self):
+        ext = self.dataset_path.split('.')[-1]
+        supported_extension = self._get_supported_extension()
+        if ext not in supported_extension:
             raise InvalidExtensionException(
                 "Unknown extension :{}, supported extensions are: {}".format(
-                    ext, str(BigANNVectorDataSet.SUPPORTED_EXTENSION)))
+                    ext, str(supported_extension)))
         return ext
 
-    @staticmethod
-    def _get_data_size(file_name):
-        ext = BigANNVectorDataSet._get_extension(file_name)
-        if ext == BigANNVectorDataSet.U8BIN_EXTENSION:
+    @abstractmethod
+    def get_data_size(self, extension):
+        """Return data size based on extension"""
+
+    def _get_data_size(self):
+        """Return data size"""
+        ext = self._get_extension()
+        return self.get_data_size(ext)
+
+    @abstractmethod
+    def _get_value_reader(self, extension):
+        """Return value reader based on extension"""
+
+    def _value_reader(self):
+        ext = self._get_extension()
+        return self._get_value_reader(ext)
+
+
+class BigANNVectorDataSet(BigANNDataSet):
+    """ Data-set format for vector data-sets for `Big ANN Benchmarks
+    <https://big-ann-benchmarks.com/index.html#bench-datasets>`
+    """
+
+    U8BIN_EXTENSION = "u8bin"
+    FBIN_EXTENSION = "fbin"
+    SUPPORTED_EXTENSION = [
+        FBIN_EXTENSION, U8BIN_EXTENSION
+    ]
+
+    BYTES_PER_U8INT = 1
+    BYTES_PER_FLOAT = 4
+
+    def _init_internal_params(self):
+        super()._init_internal_params()
+        if (self.num_bytes - BigANNDataSet.DATA_SET_HEADER_LENGTH) != (
+                self.rows * self.row_length * self.bytes_per_num):
+            raise Exception("Invalid file. File size is not matching with expected estimated "
+                            "value based on number of points, dimension and bytes per point")
+
+    def _get_supported_extension(self):
+        return BigANNVectorDataSet.SUPPORTED_EXTENSION
+
+    def get_data_size(self, extension):
+        if extension == BigANNVectorDataSet.U8BIN_EXTENSION:
             return BigANNVectorDataSet.BYTES_PER_U8INT
 
-        if ext == BigANNVectorDataSet.FBIN_EXTENSION:
+        if extension == BigANNVectorDataSet.FBIN_EXTENSION:
             return BigANNVectorDataSet.BYTES_PER_FLOAT
 
-    @staticmethod
-    def _value_reader(file_name):
-        ext = BigANNVectorDataSet._get_extension(file_name)
-        if ext == BigANNVectorDataSet.U8BIN_EXTENSION:
+        return None
+
+    def _get_value_reader(self, extension):
+        if extension == BigANNVectorDataSet.U8BIN_EXTENSION:
             return lambda file: float(int.from_bytes(file.read(BigANNVectorDataSet.BYTES_PER_U8INT), "little"))
 
-        if ext == BigANNVectorDataSet.FBIN_EXTENSION:
+        if extension == BigANNVectorDataSet.FBIN_EXTENSION:
             return lambda file: struct.unpack('<f', file.read(BigANNVectorDataSet.BYTES_PER_FLOAT))
+
+        return None
+
+
+class BigANNGroundTruthDataSet(BigANNDataSet):
+    """ Data-set format for neighbor data-sets for `Big ANN Benchmarks
+    <https://big-ann-benchmarks.com/index.html#bench-datasets>`"""
+
+    BIN_EXTENSION = "bin"
+    SUPPORTED_EXTENSION = [BIN_EXTENSION]
+
+    BYTES_PER_UNSIGNED_INT32 = 4
+
+    def _init_internal_params(self):
+        super()._init_internal_params()
+        # The ground truth binary files consist of the following information:
+        # num_queries(uint32_t) K-NN(uint32) followed by num_queries X K x sizeof(uint32_t) bytes of data
+        # representing the IDs of the K-nearest neighbors of the queries, followed by num_queries X K x sizeof(float)
+        # bytes of data representing the distances to the corresponding points.
+        if (self.num_bytes - BigANNDataSet.DATA_SET_HEADER_LENGTH) != 2 * (
+                self.rows * self.row_length * self.bytes_per_num):
+            raise Exception("Invalid file. File size is not matching with expected estimated "
+                            "value based on number of queries, k and bytes per query")
+
+    def _get_supported_extension(self):
+        return BigANNGroundTruthDataSet.SUPPORTED_EXTENSION
+
+    def get_data_size(self, extension):
+        return BigANNGroundTruthDataSet.BYTES_PER_UNSIGNED_INT32
+
+    def _get_value_reader(self, extension):
+        return lambda file: int.from_bytes(
+            file.read(BigANNGroundTruthDataSet.BYTES_PER_UNSIGNED_INT32), "little")
+
+
+def create_big_ann_dataset(file_path: str):
+    if not file_path:
+        raise Exception("Invalid file path")
+    extension = file_path.split('.')[-1]
+    if extension in BigANNGroundTruthDataSet.SUPPORTED_EXTENSION:
+        return BigANNGroundTruthDataSet(file_path)
+    if extension in BigANNVectorDataSet.SUPPORTED_EXTENSION:
        return BigANNVectorDataSet(file_path)
+    raise Exception("Unsupported file")
68 changes: 60 additions & 8 deletions tests/utils/dataset_helper.py
@@ -11,7 +11,7 @@
 import h5py
 import numpy as np
 
-from osbenchmark.utils.dataset import Context, BigANNVectorDataSet, HDF5DataSet
+from osbenchmark.utils.dataset import Context, BigANNVectorDataSet, HDF5DataSet, BigANNGroundTruthDataSet
 
 DEFAULT_RANDOM_STRING_LENGTH = 8
 
@@ -30,10 +30,10 @@ def __init__(self, data_set_context: Context, vectors: np.ndarray, path: str):
         self.vectors: np.ndarray = vectors  # TODO: Validate shape
         self.path: str = path
 
-    def get_num_vectors(self) -> int:
+    def get_num_rows(self) -> int:
         return self.vectors.shape[0]
 
-    def get_dimension(self) -> int:
+    def get_row_length(self) -> int:
         return self.vectors.shape[1]
 
     def get_type(self) -> np.dtype:
@@ -126,7 +126,7 @@ def _build_data_set(self, context: DataSetBuildContext):
         )
 
 
-class BigANNBuilder(DataSetBuilder):
+class BigANNVectorBuilder(DataSetBuilder):
 
     def _validate_data_set_context(self, context: DataSetBuildContext):
         self._validate_extension(context)
@@ -152,20 +152,48 @@ def _validate_extension(context: DataSetBuildContext):
 
         if ext == BigANNVectorDataSet.FBIN_EXTENSION and context.get_type() != \
                 np.float32:
             print(context.get_type())
             raise IllegalDataSetBuildContext("Invalid data type for {} ext."
                                              .format(BigANNVectorDataSet
                                                      .FBIN_EXTENSION))
 
     def _build_data_set(self, context: DataSetBuildContext):
-        num_vectors = context.get_num_vectors()
-        dimension = context.get_dimension()
+        num_vectors = context.get_num_rows()
+        dimension = context.get_row_length()
         with open(context.path, 'wb') as f:
             f.write(int.to_bytes(num_vectors, 4, "little"))
             f.write(int.to_bytes(dimension, 4, "little"))
             context.vectors.tofile(f)
 
 
+class BigANNGroundTruthBuilder(BigANNVectorBuilder):
+
+    @staticmethod
+    def _validate_extension(context: DataSetBuildContext):
+        ext = context.path.split('.')[-1]
+
+        if ext not in [BigANNGroundTruthDataSet.BIN_EXTENSION]:
+            raise IllegalDataSetBuildContext("Invalid file extension: {}".format(ext))
+
+        if context.get_type() != np.float32:
+            raise IllegalDataSetBuildContext("Invalid data type for {} ext."
+                                             .format(BigANNGroundTruthDataSet
+                                                     .BIN_EXTENSION))
+
+    def _build_data_set(self, context: DataSetBuildContext):
+        num_queries = context.get_num_rows()
+        k = context.get_row_length()
+        with open(context.path, 'wb') as f:
+            # Writing number of queries
+            f.write(int.to_bytes(num_queries, 4, "little"))
+            # Writing number of neighbors in a query
+            f.write(int.to_bytes(k, 4, "little"))
+            # Writing ids of neighbors
+            context.vectors.tofile(f)
+            # Writing distance of neighbors. For simplicity, we are rewriting the ids to fill the
+            # file with distance.
+            context.vectors.tofile(f)
+
+
 def create_random_2d_array(num_vectors: int, dimension: int) -> np.ndarray:
     rng = np.random.default_rng()
     return rng.random(size=(num_vectors, dimension), dtype=np.float32)
@@ -206,6 +234,30 @@ def create_data_set(
     if extension == HDF5DataSet.FORMAT_NAME:
         HDF5Builder().add_data_set_build_context(context).build()
     else:
-        BigANNBuilder().add_data_set_build_context(context).build()
+        BigANNVectorBuilder().add_data_set_build_context(context).build()
 
     return data_set_path
+
+
+def create_ground_truth(
+        num_queries: int,
+        k: int,
+        extension: str,
+        data_set_context: Context,
+        data_set_dir,
+        file_path: str = None
+) -> str:
+    if file_path:
+        data_set_path = file_path
+    else:
+        file_name_base = ''.join(random.choice(string.ascii_letters) for _ in
+                                 range(DEFAULT_RANDOM_STRING_LENGTH))
+        data_set_file_name = "{}.{}".format(file_name_base, extension)
+        data_set_path = os.path.join(data_set_dir, data_set_file_name)
+    context = DataSetBuildContext(
+        data_set_context,
+        create_random_2d_array(num_queries, k),
+        data_set_path)
+
+    BigANNGroundTruthBuilder().add_data_set_build_context(context).build()
+    return data_set_path
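
As a usage sketch (not part of the commit), the new helper and the factory in dataset.py compose like this — assuming the package layout shown in the imports above; the temporary directory and sizes are illustrative:

import tempfile

from osbenchmark.utils.dataset import Context, get_data_set
from tests.utils.dataset_helper import create_ground_truth

# Build a random ground-truth file (100 queries, k = 10) and load it back.
# The "bigann" format now dispatches on extension: .bin selects
# BigANNGroundTruthDataSet, while .fbin/.u8bin select BigANNVectorDataSet.
data_set_dir = tempfile.mkdtemp()
path = create_ground_truth(100, 10, "bin", Context.NEIGHBORS, data_set_dir)

ground_truth = get_data_set("bigann", path, Context.NEIGHBORS)
assert ground_truth.size() == 100  # rows == num_queries
neighbors = ground_truth.read(10)  # first 10 rows of neighbor IDs
print(neighbors.shape)             # expected (10, 10): chunk size x k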
17 changes: 16 additions & 1 deletion tests/utils/dataset_test.py
@@ -8,7 +8,7 @@
 
 from osbenchmark.utils.dataset import Context, get_data_set, HDF5DataSet, BigANNVectorDataSet
 from osbenchmark.utils.parse import ConfigurationError
-from tests.utils.dataset_helper import create_data_set
+from tests.utils.dataset_helper import create_data_set, create_ground_truth
 
 DEFAULT_INDEX_NAME = "test-index"
 DEFAULT_FIELD_NAME = "test-field"
@@ -48,6 +48,21 @@ def testBigANNAsAcceptableDataSetFormatWithFloatExtension(self):
         self.assertEqual(data_set_instance.FORMAT_NAME, BigANNVectorDataSet.FORMAT_NAME)
         self.assertEqual(data_set_instance.size(), DEFAULT_NUM_VECTORS)
 
+    def testBigANNGroundTruthAsAcceptableDataSetFormat(self):
+        bin_extension = "bin"
+        data_set_dir = tempfile.mkdtemp()
+
+        valid_data_set_path = create_ground_truth(
+            100,
+            10,
+            bin_extension,
+            Context.NEIGHBORS,
+            data_set_dir
+        )
+        data_set_instance = get_data_set("bigann", valid_data_set_path, Context.NEIGHBORS)
+        self.assertEqual(data_set_instance.FORMAT_NAME, BigANNVectorDataSet.FORMAT_NAME)
+        self.assertEqual(data_set_instance.size(), 100)
+
     def testUnSupportedDataSetFormat(self):
         with self.assertRaises(ConfigurationError) as _:
             get_data_set("random", "/some/path", Context.INDEX)

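A quick arithmetic check of the validation that the new test exercises: with 100 queries and k = 10, a well-formed ground-truth file is exactly 8 + 2 * 100 * 10 * 4 = 8008 bytes (header, then uint32 IDs, then float32 distances). A minimal sketch; `path` below is hypothetical:

num_queries, k = 100, 10
# 8-byte header + uint32 ID block + float32 distance block (4 bytes per value each)
expected_bytes = 8 + 2 * num_queries * k * 4
print(expected_bytes)  # 8008
# For a generated file: os.path.getsize(path) == expected_bytes  ('path' is hypothetical)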