From fb97191e2fa95b3fe5d9d4a4b8fa05ead1b7e2ae Mon Sep 17 00:00:00 2001
From: kira
Date: Tue, 16 Jan 2024 15:39:50 +0800
Subject: [PATCH] zilliz streaming

---
 .github/workflows/neurips23.yml        |   3 +
 neurips23/streaming/zilliz/Dockerfile  |  11 ++
 neurips23/streaming/zilliz/config.yaml |  60 +++++++++
 neurips23/streaming/zilliz/zilliz.py   | 165 +++++++++++++++++++++++++
 4 files changed, 239 insertions(+)
 create mode 100644 neurips23/streaming/zilliz/Dockerfile
 create mode 100644 neurips23/streaming/zilliz/config.yaml
 create mode 100755 neurips23/streaming/zilliz/zilliz.py

diff --git a/.github/workflows/neurips23.yml b/.github/workflows/neurips23.yml
index c69ccede1..7e58e0f90 100644
--- a/.github/workflows/neurips23.yml
+++ b/.github/workflows/neurips23.yml
@@ -105,6 +105,9 @@ jobs:
           - algorithm: pinecone
             dataset: random-filter-s
             track: filter
+          - algorithm: zilliz
+            dataset: random-xs
+            track: streaming
       fail-fast: false
 
     steps:
diff --git a/neurips23/streaming/zilliz/Dockerfile b/neurips23/streaming/zilliz/Dockerfile
new file mode 100644
index 000000000..901cd6745
--- /dev/null
+++ b/neurips23/streaming/zilliz/Dockerfile
@@ -0,0 +1,11 @@
+FROM neurips23
+
+RUN apt update
+RUN apt install -y software-properties-common
+RUN add-apt-repository -y ppa:git-core/ppa
+RUN apt update
+RUN DEBIAN_FRONTEND=noninteractive apt install -y git make cmake g++ libaio-dev libgoogle-perftools-dev libunwind-dev clang-format libboost-dev libboost-program-options-dev libmkl-full-dev libcpprest-dev python3.10
+
+RUN git clone https://github.com/hhy3/zilliz-bigann.git --branch streaming
+RUN pip install ./zilliz-bigann/*.whl
+
diff --git a/neurips23/streaming/zilliz/config.yaml b/neurips23/streaming/zilliz/config.yaml
new file mode 100644
index 000000000..f3d2d777a
--- /dev/null
+++ b/neurips23/streaming/zilliz/config.yaml
@@ -0,0 +1,60 @@
+random-xs-clustered:
+  zilliz:
+    docker-tag: neurips23-streaming-zilliz
+    module: neurips23.streaming.zilliz.zilliz
+    constructor: Zilliz
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":32, "L":100, "insert_threads":16, "consolidate_threads":16}]
+        query-args: |
+          [{"Ls":200, "T":8}]
+
+random-xs:
+  zilliz:
+    docker-tag: neurips23-streaming-zilliz
+    module: neurips23.streaming.zilliz.zilliz
+    constructor: Zilliz
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16}]
+        query-args: |
+          [{"Ls":50, "T":8}]
+
+msturing-10M-clustered:
+  zilliz:
+    docker-tag: neurips23-streaming-zilliz
+    module: neurips23.streaming.zilliz.zilliz
+    constructor: Zilliz
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":16, "L":10, "insert_threads":8, "consolidate_threads":8}]
+        query-args: |
+          [
+            {"Ls":100, "T":8}
+          ]
+
+msturing-30M-clustered:
+  zilliz:
+    docker-tag: neurips23-streaming-zilliz
+    module: neurips23.streaming.zilliz.zilliz
+    constructor: Zilliz
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [
+            {"R":32, "L":110, "insert_threads":8, "consolidate_threads":8}
+          ]
+        query-args: |
+          [
+            {"Ls":400, "T":8},
+            {"Ls":450, "T":8},
+            {"Ls":500, "T":8},
+            {"Ls":550, "T":8}
+          ]
diff --git a/neurips23/streaming/zilliz/zilliz.py b/neurips23/streaming/zilliz/zilliz.py
new file mode 100755
index 000000000..1b0eb079e
--- /dev/null
+++ b/neurips23/streaming/zilliz/zilliz.py
@@ -0,0 +1,165 @@
+from __future__ import absolute_import
+import psutil
+import os
+import time
+import numpy as np
+
+import diskannpy
+import fast_refine
+
+from neurips23.streaming.base import BaseStreamingANN
+
+class Zilliz(BaseStreamingANN):
+    """Streaming ANN index: a uint8-quantized diskannpy graph plus a refiner.
+
+    Vectors are quantized to uint8 before entering the diskannpy index. The
+    unquantized vectors are handed to a ``fast_refine.Refiner``, which reduces
+    the graph candidates to the final k at query time. Ids are shifted by +1
+    inside the diskannpy index and shifted back after search.
+    """
+
+    def __init__(self, metric, index_params):
+        """Store the build parameters.
+
+        Args:
+            metric: 'euclidean' or 'ip' (see ``translate_dist_fn``).
+            index_params: dict with required keys "R" and "L"; the keys
+                "insert_threads" and "consolidate_threads" are also read.
+
+        Raises:
+            ValueError: if "R" or "L" is missing or None.
+        """
+        self.name = "pyanns"
+        # Fail loudly: printing and returning would hand back a half-built
+        # object that only breaks later with an AttributeError.
+        for key in ("R", "L"):
+            if index_params.get(key) is None:
+                raise ValueError(f"Error: missing parameter {key}")
+        self._index_params = index_params
+        self._metric = metric
+
+        self.R = index_params.get("R")
+        self.L = index_params.get("L")
+        self.insert_threads = index_params.get("insert_threads")
+        self.consolidate_threads = index_params.get("consolidate_threads")
+        # Quantization bounds; set from the first batch passed to insert().
+        self.mx = None
+        self.mi = None
+
+    def index_name(self):
+        """Return a short name encoding the build parameters R and L."""
+        return f"R{self.R}_L{self.L}"
+
+    def create_index_dir(self, dataset):
+        """Create and return data/indices/streaming/pyanns/<dataset>/<index>.
+
+        Each level gets its own makedirs call so that ``mode`` is requested
+        for every directory, not only for the leaf.
+        """
+        index_dir = os.path.join(os.getcwd(), "data", "indices", "streaming")
+        os.makedirs(index_dir, mode=0o777, exist_ok=True)
+        for part in ('pyanns', dataset.short_name(), self.index_name()):
+            index_dir = os.path.join(index_dir, part)
+            os.makedirs(index_dir, mode=0o777, exist_ok=True)
+        return index_dir
+
+    def translate_dist_fn(self, metric):
+        """Map a benchmark metric name to the diskannpy metric name."""
+        if metric == 'euclidean':
+            return 'l2'
+        elif metric == 'ip':
+            return 'mips'
+        else:
+            raise Exception('Invalid metric')
+
+    def translate_dtype(self, dtype:str):
+        """Return np.uint8 for every dtype; insert() quantizes to uint8."""
+        return np.uint8
+
+    def setup(self, dtype, max_pts, ndim):
+        """Construct the empty diskannpy index and the refiner.
+
+        Args:
+            dtype: dataset dtype (ignored, see ``translate_dtype``).
+            max_pts: capacity passed to both the index and the refiner.
+            ndim: vector dimensionality.
+        """
+        self.index = diskannpy.DynamicMemoryIndex(
+            distance_metric = self.translate_dist_fn(self._metric),
+            vector_dtype = self.translate_dtype(dtype),
+            max_vectors = max_pts,
+            dimensions = ndim,
+            graph_degree = self.R,
+            complexity=self.L,
+            num_threads = self.insert_threads,
+            initial_search_complexity = 100
+        )
+        self.refiner = fast_refine.Refiner(ndim, max_pts)
+        self.max_pts = max_pts
+        print('Index class constructed and ready for update/search')
+        self.active_indices = set()
+        self.num_unprocessed_deletes = 0
+
+    def quant(self, X, mi, mx):
+        """Linearly map X from [mi, mx] to integers in 0..127, as uint8."""
+        # A constant first batch gives mx == mi; avoid dividing by zero, which
+        # would feed NaNs into the uint8 cast.
+        span = mx - mi
+        if span == 0:
+            span = 1.0
+        return np.round(np.clip((X - mi) / span * 127.0, 0.0, 127.0)).astype('uint8')
+
+    def insert(self, X, ids):
+        """Insert vectors X under the given ids.
+
+        The quantization bounds are taken from the first batch ever inserted
+        and reused for all later inserts and queries.
+        """
+        if self.mi is None:
+            self.mi = X.min()
+            self.mx = X.max()
+
+        self.refiner.batch_insert(X, ids)
+        X = self.quant(X, self.mi, self.mx)
+        self.active_indices.update(ids+1)
+        print('#active pts', len(self.active_indices), '#unprocessed deletes', self.num_unprocessed_deletes)
+        # Presumably marked-deleted points keep occupying capacity until they
+        # are consolidated, hence the combined count -- TODO confirm.
+        if len(self.active_indices) + self.num_unprocessed_deletes > self.max_pts:
+            self.index.consolidate_delete()
+            self.num_unprocessed_deletes = 0
+
+        self.index.batch_insert(X, ids+1)
+
+    def delete(self, ids):
+        """Delete the given ids from the refiner and mark them in the index."""
+        self.refiner.batch_delete(ids)
+        for vid in ids:
+            self.index.mark_deleted(vid+1)
+        self.active_indices.difference_update(ids+1)
+        self.num_unprocessed_deletes += len(ids)
+
+    def query(self, X, k):
+        """Carry out a batch query for k-NN of query set X."""
+        nq, _ = X.shape
+        Xq = self.quant(X, self.mi, self.mx)
+        # Over-fetch k * k_mul candidates from the quantized index; the
+        # refiner then reduces them to k using the unquantized queries.
+        k_mul = 5
+        k_reorder = k * k_mul
+        I, _ = self.index.batch_search(
+            Xq, k_reorder, self.Ls, self.search_threads)
+        I = I - 1
+        self.res = self.refiner.batch_refine(X, I, k).reshape(nq, k)
+
+    def set_query_arguments(self, query_args):
+        """Read the search parameter "Ls" (default 0) and thread count "T"."""
+        self._query_args = query_args
+        ls = query_args.get("Ls")
+        self.Ls = 0 if ls is None else ls
+        self.search_threads = self._query_args.get("T")
+
+    def __str__(self):
+        # NOTE(review): the braces format a tuple, e.g. zilliz(('R32_L50', {...}));
+        # left unchanged in case result names depend on it.
+        return f'zilliz({self.index_name(), self._query_args})'