new classes, methods and config
harsha-simhadri committed Oct 13, 2023
1 parent f5bb90b commit a5f0970
Showing 4 changed files with 234 additions and 7 deletions.
30 changes: 30 additions & 0 deletions benchmark/datasets.py
@@ -193,6 +193,13 @@ def get_dataset_iterator(self, bs=512, split=(1,0)):
            j1 = min(j0 + bs, i1)
            yield sanitize(x[j0:j1])

    def get_data_in_range(self, start, end):
        assert start >= 0
        assert end <= self.nb
        filename = self.get_dataset_fn()
        x = xbin_mmap(filename, dtype=self.dtype, maxn=self.nb)
        return x[start:end]

    def search_type(self):
        return "knn"

@@ -434,6 +441,28 @@ def distance(self):

    def prepare(self, skip_data=False, original_size=10 ** 9):
        return super().prepare(skip_data, original_size = self.nb)

class MSTuringClustered30M(DatasetCompetitionFormat):
    def __init__(self):
        self.nb = 29998994
        self.d = 100
        self.nq = 10000
        self.dtype = "float32"
        self.ds_fn = "msturing-29998994-clustered.fbin"
        self.qs_fn = "testQuery10K.fbin"
        self.gt_fn = "clu_msturing30M_gt100"

        self.base_url = ""
        self.basedir = os.path.join(BASEDIR, "MSTuring-30M-clustered")

        self.private_gt_url = None
        self.private_qs_url = None

    def distance(self):
        return "euclidean"

    def prepare(self, skip_data=False, original_size=10 ** 9):
        return super().prepare(skip_data, original_size = self.nb)

class MSSPACEV1B(DatasetCompetitionFormat):
    def __init__(self, nb_M=1000):
@@ -984,6 +1013,7 @@ def __str__(self):
    'msturing-1M': lambda : MSTuringANNS(1),

    'msturing-10M-clustered': lambda: MSTuringClustered10M(),
    'msturing-30M-clustered': lambda: MSTuringClustered30M(),

    'msspacev-1B': lambda : MSSPACEV1B(1000),
    'msspacev-100M': lambda : MSSPACEV1B(100),
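A minimal sketch (not part of the commit) of how the new pieces fit together; it assumes the msturing-30M-clustered files have already been downloaded and prepared under BASEDIR:

from benchmark.datasets import DATASETS

# The new registry entry constructs MSTuringClustered30M.
ds = DATASETS['msturing-30M-clustered']()
print(ds.nb, ds.d, ds.distance())    # 29998994 100 euclidean

# get_data_in_range() memory-maps the base file and returns only the rows in
# [start, end), so a large dataset can be consumed slice by slice; the
# streaming runner change further below relies on this for insert steps.
batch = ds.get_data_in_range(0, 100000)
print(batch.shape)                   # (100000, 100)
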
14 changes: 13 additions & 1 deletion neurips23/streaming/diskann/config.yaml
@@ -83,4 +83,16 @@ msturing-10M-clustered:
        args: |
          [{"R":64, "L":50, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":100, "T":16}]
msturing-30M-clustered:
  diskann:
    docker-tag: neurips23-streaming-diskann
    module: neurips23.streaming.diskann.diskann-str
    constructor: diskann
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"R":32, "L":30, "insert_threads":16, "consolidate_threads":16}]
        query-args: |
          [{"Ls":50, "T":16}]
183 changes: 183 additions & 0 deletions neurips23/streaming/final_runbook_gen.py
@@ -0,0 +1,183 @@
import argparse
import os
import numpy as np
import random
import yaml

from scipy.cluster.vq import vq, kmeans2
from typing import Tuple
from benchmark.datasets import DATASETS

def cluster_and_permute(
    data, num_clusters
) -> Tuple[np.ndarray[int], np.ndarray[int]]:
    """
    Cluster the data and return permutation of row indices
    that would group indices of the same cluster together
    """
    npts = np.shape(data)[0]
    sample_size = min(100000, npts)
    sample_indices = np.random.choice(range(npts), size=sample_size, replace=False)
    sampled_data = data[sample_indices, :]
    centroids, sample_labels = kmeans2(sampled_data, num_clusters, minit="++", iter=10)
    labels, dist = vq(data, centroids)

    count = np.zeros(num_clusters)
    for i in range(npts):
        count[labels[i]] += 1
    print("Cluster counts")
    print(count)

    offsets = np.zeros(num_clusters + 1, dtype=int)
    for i in range(0, num_clusters, 1):
        offsets[i + 1] = offsets[i] + count[i]

    permutation = np.zeros(npts, dtype=int)
    counters = np.zeros(num_clusters, dtype=int)
    for i in range(npts):
        label = labels[i]
        row = offsets[label] + counters[label]
        counters[label] += 1
        permutation[row] = i

    return offsets, permutation


def write_permuated_data(
        data,
        permutation:np.ndarray[int],
        output_data_file:str
):
    permuted_data = data[permutation,:]

    shape = np.shape(permuted_data)
    with open(output_data_file, 'wb') as df:
        df.write(shape[0].to_bytes(4, 'little'))
        df.write(shape[1].to_bytes(4, 'little'))
        df.write(permuted_data)


def create_runbook(
        dataset_str:str,
        offsets:np.ndarray[int],
        permutation:np.ndarray[int],
        num_clusters:int,
        output_yaml_file:str
):
    ins_cursor_start = offsets.copy()
    ins_cursor_end = offsets.copy()

    del_cursor_start = offsets.copy()
    del_cursor_end = offsets.copy()

    operation_list = []
    num_operations = 1
    active_points = 0
    max_pts = 0
    active_points_in_cluster = np.zeros(num_clusters)

    num_rounds = 5
    search_entry = [{'operation': str('search')}]

    for round in range(num_rounds):
        #insertions
        for c in range(num_clusters):
            delta = ((int)((offsets[c+1]-offsets[c])/num_rounds)
                     if round < num_rounds-1
                     else offsets[c+1]-ins_cursor_end[c])
            ins_cursor_end[c] = ins_cursor_start[c] + delta
            active_points += delta
            max_pts = max(max_pts, active_points)
            active_points_in_cluster[c] += delta
            print('ins [', ins_cursor_start[c], ', ', ins_cursor_end[c],
                  ') active:', int(active_points_in_cluster[c]),
                  'total:', active_points)
            entry = [{'operation': 'insert'}, {'start': int(ins_cursor_start[c])}, {'end': int(ins_cursor_end[c])}]
            operation_list.append((num_operations, entry))
            num_operations += 1
            operation_list.append((num_operations, search_entry))
            num_operations += 1
            ins_cursor_start[c] = ins_cursor_end[c]

        #deletions
        for c in range(num_clusters):
            fraction = random.uniform(0,0.9)
            delta = (int)(fraction*(ins_cursor_end[c]-del_cursor_start[c]))
            del_cursor_end[c] = del_cursor_start[c] + delta
            active_points -= delta
            active_points_in_cluster[c] -= delta
            print('del [', del_cursor_start[c], ',', del_cursor_end[c],
                  ') active:', int(active_points_in_cluster[c]),
                  'total:', active_points)
            entry = [{'operation': 'delete'}, {'start': int(del_cursor_start[c])}, {'end': int(del_cursor_end[c])}]
            operation_list.append((num_operations, entry))
            num_operations += 1
            operation_list.append((num_operations, search_entry))
            num_operations += 1
            del_cursor_start[c] = del_cursor_end[c]

    # #queries
    # for c in range(num_clusters):
    #     cluster_index_range = range(offsets[c], offsets[c + 1])
    #     cluster_indices = np.array(permutation[cluster_index_range], dtype=np.uintc)
    #     print(cluster_index_range)
    #     entry = [{'operation': 'insert'}, {'start': int(offsets[c])}, {'end': int(offsets[c+1])}]
    #     operation_list.append((c+1, entry))

    with open(output_yaml_file, 'w') as yf:
        operation_list.sort(key = lambda x: x[0])
        sorted_dict = {}
        sorted_dict['max_pts'] = int(max_pts)
        for (k, v) in operation_list:
            sorted_dict[k]=v
        yaml_object = {}
        yaml_object[dataset_str] = sorted_dict
        yaml.dump(yaml_object, yf)


def main():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument(
        '--dataset',
        choices=DATASETS.keys(),
        required=True)
    parser.add_argument(
        '-c', '--num_clusters',
        type=int,
        required=True
    )
    parser.add_argument(
        '-o', '--output_data_file',
        required=True
    )
    parser.add_argument(
        '-y', '--output_yaml_file',
        required=True
    )
    args = parser.parse_args()

    ds = DATASETS[args.dataset]()
    if ds.nb <= 10**7:
        data = ds.get_dataset()
    else:
        data = next(ds.get_dataset_iterator(bs=ds.nb))
    print(np.shape(data))

    offsets, permutation = cluster_and_permute(data, args.num_clusters)
    print(permutation)

    write_permuated_data(data=data,
                         permutation=permutation,
                         output_data_file=args.output_data_file)

    create_runbook(dataset_str=args.dataset,
                   offsets=offsets,
                   permutation=permutation,
                   num_clusters=args.num_clusters,
                   output_yaml_file=args.output_yaml_file)


if __name__ == '__main__':
    main()
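The script is driven by the argparse flags defined above: it loads the chosen dataset, clusters it, writes the cluster-permuted vectors to --output_data_file and an insert/search/delete schedule to --output_yaml_file. A toy sketch (hypothetical data; assumes the repository root is on PYTHONPATH so the module imports) exercising the same helpers directly:

import numpy as np
from neurips23.streaming.final_runbook_gen import (
    cluster_and_permute, write_permuated_data, create_runbook)

# Toy stand-in for a real base set: 1000 random 16-d float32 vectors.
rng = np.random.default_rng(0)
data = rng.standard_normal((1000, 16)).astype(np.float32)

# Group rows by cluster, then emit the permuted base file and a runbook with
# five rounds of per-cluster inserts, deletes and interleaved searches.
offsets, permutation = cluster_and_permute(data, 4)
write_permuated_data(data, permutation, 'toy-clustered.fbin')
create_runbook('toy-dataset', offsets, permutation, 4, 'toy_runbook.yaml')
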
14 changes: 8 additions & 6 deletions neurips23/streaming/run.py
@@ -24,8 +24,8 @@ def run_task(algo, ds, distance, count, run_count, search_type, private_query, r
    search_times = []
    all_results = []

-    data = ds.get_dataset()
-    ids = np.arange(1, ds.nb+1, dtype=np.uint32)
+    # data = ds.get_dataset()
+    # ids = np.arange(1, ds.nb+1, dtype=np.uint32)

    Q = ds.get_queries() if not private_query else ds.get_private_queries()
    print(fr"Got {Q.shape[0]} queries")
@@ -34,11 +34,13 @@ def run_task(algo, ds, distance, count, run_count, search_type, private_query, r
    result_map = {}
    num_searches = 0
    for step, entry in enumerate(runbook):
-        start = time.time()
+        start_time = time.time()
        match entry['operation']:
            case 'insert':
-                ids = np.arange(entry['start'], entry['end'], dtype=np.uint32)
-                algo.insert(data[ids,:], ids)
+                start = entry['start']
+                end = entry['end']
+                ids = np.arange(start, end, dtype=np.uint32)
+                algo.insert(ds.get_data_in_range(start, end), ids)
            case 'delete':
                ids = np.arange(entry['start'], entry['end'], dtype=np.uint32)
                algo.delete(ids)
@@ -56,7 +58,7 @@ def run_task(algo, ds, distance, count, run_count, search_type, private_query, r
                num_searches += 1
            case _:
                raise NotImplementedError('Invalid runbook operation.')
-        step_time = (time.time() - start)
+        step_time = (time.time() - start_time)
        print(f"Step {step+1} took {step_time}s.")

    attrs = {
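For context, each runbook entry consumed by this loop carries an 'operation' field plus 'start'/'end' ranges for insert and delete steps; a minimal illustration (field names taken from the code above, values hypothetical):

runbook = [
    {'operation': 'insert', 'start': 0, 'end': 100000},  # algo.insert(ds.get_data_in_range(0, 100000), ids)
    {'operation': 'search'},                              # run the current query set and record results
    {'operation': 'delete', 'start': 0, 'end': 50000},    # algo.delete(ids)
]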
