From 2ac540e4b49374348c0b8114e2b791daeabe067c Mon Sep 17 00:00:00 2001
From: liliu-z <105927039+liliu-z@users.noreply.github.com>
Date: Tue, 29 Nov 2022 16:49:11 +0800
Subject: [PATCH] Add Thread Pool for all indexes in Knowhere (#570)
Signed-off-by: liliu-z
Signed-off-by: liliu-z
---
knowhere/index/VecIndexFactory.cpp | 13 +-
knowhere/index/VecIndexThreadPoolWrapper.h | 121 ++++++++++++++++++
knowhere/index/vector_index/IndexAnnoy.cpp | 43 ++++---
knowhere/index/vector_index/IndexAnnoy.h | 4 +-
.../index/vector_index/IndexBinaryIDMAP.cpp | 10 --
.../index/vector_index/IndexBinaryIDMAP.h | 3 -
knowhere/index/vector_index/IndexIDMAP.cpp | 10 --
knowhere/index/vector_index/IndexIDMAP.h | 3 -
.../index/vector_index/gpu/IndexGPUIDMAP.cpp | 5 -
.../index/vector_index/gpu/IndexGPUIDMAP.h | 3 -
unittest/test_binaryidmap.cpp | 1 -
unittest/test_idmap.cpp | 5 -
12 files changed, 156 insertions(+), 65 deletions(-)
create mode 100644 knowhere/index/VecIndexThreadPoolWrapper.h
diff --git a/knowhere/index/VecIndexFactory.cpp b/knowhere/index/VecIndexFactory.cpp
index d238f1626..b59e8fd55 100644
--- a/knowhere/index/VecIndexFactory.cpp
+++ b/knowhere/index/VecIndexFactory.cpp
@@ -15,6 +15,7 @@
#include "common/Exception.h"
#include "common/Log.h"
+#include "index/VecIndexThreadPoolWrapper.h"
#include "index/vector_index/IndexAnnoy.h"
#include "index/vector_index/IndexBinaryIDMAP.h"
#include "index/vector_index/IndexBinaryIVF.h"
@@ -42,17 +43,17 @@ VecIndexFactory::CreateVecIndex(const IndexType& type, const IndexMode mode) {
switch (mode) {
case IndexMode::MODE_CPU: {
if (type == IndexEnum::INDEX_FAISS_BIN_IDMAP) {
- return std::make_shared();
+ return std::make_shared(std::make_unique());
} else if (type == IndexEnum::INDEX_FAISS_BIN_IVFFLAT) {
- return std::make_shared();
+ return std::make_shared(std::make_unique());
} else if (type == IndexEnum::INDEX_FAISS_IDMAP) {
- return std::make_shared();
+ return std::make_shared(std::make_unique());
} else if (type == IndexEnum::INDEX_FAISS_IVFFLAT) {
- return std::make_shared();
+ return std::make_shared(std::make_unique());
} else if (type == IndexEnum::INDEX_FAISS_IVFPQ) {
- return std::make_shared();
+ return std::make_shared(std::make_unique());
} else if (type == IndexEnum::INDEX_FAISS_IVFSQ8) {
- return std::make_shared();
+ return std::make_shared(std::make_unique());
} else if (type == IndexEnum::INDEX_ANNOY) {
return std::make_shared();
} else if (type == IndexEnum::INDEX_HNSW) {
diff --git a/knowhere/index/VecIndexThreadPoolWrapper.h b/knowhere/index/VecIndexThreadPoolWrapper.h
new file mode 100644
index 000000000..faa78e3a6
--- /dev/null
+++ b/knowhere/index/VecIndexThreadPoolWrapper.h
@@ -0,0 +1,121 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License
+
+#pragma once
+
+#include
+#include
+
+#include "knowhere/common/ThreadPool.h"
+#include "knowhere/index/VecIndex.h"
+
+namespace knowhere {
+
+/**
+ * @brief This class is a Wrapper for VecIndex, it will use a global thread pool for all Query and RangeQuery API calls.
+ *
+ */
+class VecIndexThreadPoolWrapper : public VecIndex {
+ public:
+ explicit VecIndexThreadPoolWrapper(std::unique_ptr index)
+ : VecIndexThreadPoolWrapper(std::move(index), ThreadPool::GetGlobalThreadPool()) {
+ }
+
+ explicit VecIndexThreadPoolWrapper(std::unique_ptr index, std::shared_ptr thread_pool)
+ : index_(std::move(index)), thread_pool_(thread_pool) {
+ }
+
+ BinarySet
+ Serialize(const Config& config) override {
+ return index_->Serialize(config);
+ }
+
+ void
+ Load(const BinarySet& index_binary) override {
+ index_->Load(index_binary);
+ }
+
+ void
+ Train(const DatasetPtr& dataset, const Config& config) override {
+ index_->Train(dataset, config);
+ }
+
+ void
+ AddWithoutIds(const DatasetPtr& dataset, const Config& config) override {
+ index_->AddWithoutIds(dataset, config);
+ }
+
+ bool
+ Prepare(const Config& config) override {
+ return index_->Prepare(config);
+ }
+
+ DatasetPtr
+ GetVectorById(const DatasetPtr& dataset, const Config& config) override {
+ return index_->GetVectorById(dataset, config);
+ }
+
+ DatasetPtr
+ Query(const DatasetPtr& dataset, const Config& config, const faiss::BitsetView bitset) override {
+ return thread_pool_->push([&]() { return this->index_->Query(dataset, config, bitset); }).get();
+ }
+
+ DatasetPtr
+ QueryByRange(const DatasetPtr& dataset, const Config& config, const faiss::BitsetView bitset) override {
+ return thread_pool_->push([&]() { return this->index_->QueryByRange(dataset, config, bitset); }).get();
+ }
+
+ DatasetPtr
+ GetIndexMeta(const Config& config) override {
+ return index_->GetIndexMeta(config);
+ }
+
+ int64_t
+ Size() override {
+ return index_->Size();
+ }
+
+ int64_t
+ Dim() override {
+ return index_->Dim();
+ }
+
+ int64_t
+ Count() override {
+ return index_->Count();
+ }
+
+ StatisticsPtr
+ GetStatistics() override {
+ return index_->GetStatistics();
+ }
+
+ void
+ ClearStatistics() override {
+ index_->ClearStatistics();
+ }
+
+ IndexType
+ index_type() const override {
+ return index_->index_type();
+ }
+
+ IndexMode
+ index_mode() const override {
+ return index_->index_mode();
+ }
+
+ private:
+ std::unique_ptr index_;
+ std::shared_ptr thread_pool_;
+};
+
+} // namespace knowhere
diff --git a/knowhere/index/vector_index/IndexAnnoy.cpp b/knowhere/index/vector_index/IndexAnnoy.cpp
index adb9600d8..c1fe5ed09 100644
--- a/knowhere/index/vector_index/IndexAnnoy.cpp
+++ b/knowhere/index/vector_index/IndexAnnoy.cpp
@@ -143,25 +143,32 @@ IndexAnnoy::Query(const DatasetPtr& dataset_ptr, const Config& config, const fai
auto p_id = new int64_t[k * rows];
auto p_dist = new float[k * rows];
-#pragma omp parallel for
+ std::vector> futures;
+ futures.reserve(rows);
for (unsigned int i = 0; i < rows; ++i) {
- std::vector result;
- result.reserve(k);
- std::vector distances;
- distances.reserve(k);
- index_->get_nns_by_vector(static_cast(p_data) + i * dim, k, search_k, &result, &distances,
- bitset);
-
- size_t result_num = result.size();
- auto local_p_id = p_id + k * i;
- auto local_p_dist = p_dist + k * i;
- memcpy(local_p_id, result.data(), result_num * sizeof(int64_t));
- memcpy(local_p_dist, distances.data(), result_num * sizeof(float));
-
- for (; result_num < k; result_num++) {
- local_p_id[result_num] = -1;
- local_p_dist[result_num] = 1.0 / 0.0;
- }
+ futures.push_back(pool_->push([&, index = i]() {
+ std::vector result;
+ result.reserve(k);
+ std::vector distances;
+ distances.reserve(k);
+ index_->get_nns_by_vector(static_cast(p_data) + index * dim, k, search_k, &result, &distances,
+ bitset);
+
+ size_t result_num = result.size();
+ auto local_p_id = p_id + k * index;
+ auto local_p_dist = p_dist + k * index;
+ memcpy(local_p_id, result.data(), result_num * sizeof(int64_t));
+ memcpy(local_p_dist, distances.data(), result_num * sizeof(float));
+
+ for (; result_num < k; result_num++) {
+ local_p_id[result_num] = -1;
+ local_p_dist[result_num] = 1.0 / 0.0;
+ }
+ }));
+ }
+
+ for (auto& future : futures) {
+ future.get();
}
return GenResultDataset(p_id, p_dist);
diff --git a/knowhere/index/vector_index/IndexAnnoy.h b/knowhere/index/vector_index/IndexAnnoy.h
index 2c61ac537..23c76ae19 100644
--- a/knowhere/index/vector_index/IndexAnnoy.h
+++ b/knowhere/index/vector_index/IndexAnnoy.h
@@ -16,8 +16,8 @@
#include "annoy/src/annoylib.h"
#include "annoy/src/kissrandom.h"
-
#include "knowhere/common/Exception.h"
+#include "knowhere/common/ThreadPool.h"
#include "knowhere/index/VecIndex.h"
namespace knowhere {
@@ -28,6 +28,7 @@ class IndexAnnoy : public VecIndex {
public:
IndexAnnoy() {
index_type_ = IndexEnum::INDEX_ANNOY;
+ pool_ = ThreadPool::GetGlobalThreadPool();
}
BinarySet
@@ -66,6 +67,7 @@ class IndexAnnoy : public VecIndex {
private:
std::string metric_type_;
+ std::shared_ptr pool_;
std::shared_ptr> index_ = nullptr;
};
diff --git a/knowhere/index/vector_index/IndexBinaryIDMAP.cpp b/knowhere/index/vector_index/IndexBinaryIDMAP.cpp
index 3bfb12d6b..47e880dc7 100644
--- a/knowhere/index/vector_index/IndexBinaryIDMAP.cpp
+++ b/knowhere/index/vector_index/IndexBinaryIDMAP.cpp
@@ -180,16 +180,6 @@ BinaryIDMAP::Train(const DatasetPtr& dataset_ptr, const Config& config) {
index_ = index;
}
-const uint8_t*
-BinaryIDMAP::GetRawVectors() {
- try {
- auto flat_index = dynamic_cast(index_.get());
- return flat_index->xb.data();
- } catch (std::exception& e) {
- KNOWHERE_THROW_MSG(e.what());
- }
-}
-
void
BinaryIDMAP::QueryImpl(int64_t n,
const uint8_t* data,
diff --git a/knowhere/index/vector_index/IndexBinaryIDMAP.h b/knowhere/index/vector_index/IndexBinaryIDMAP.h
index e099f9247..0dd97fb04 100644
--- a/knowhere/index/vector_index/IndexBinaryIDMAP.h
+++ b/knowhere/index/vector_index/IndexBinaryIDMAP.h
@@ -62,9 +62,6 @@ class BinaryIDMAP : public VecIndex, public FaissBaseBinaryIndex {
return Count() * Dim() / 8;
}
- virtual const uint8_t*
- GetRawVectors();
-
protected:
virtual void
QueryImpl(int64_t n,
diff --git a/knowhere/index/vector_index/IndexIDMAP.cpp b/knowhere/index/vector_index/IndexIDMAP.cpp
index ff3ff3a30..b654fd5fa 100644
--- a/knowhere/index/vector_index/IndexIDMAP.cpp
+++ b/knowhere/index/vector_index/IndexIDMAP.cpp
@@ -211,16 +211,6 @@ IDMAP::CopyCpuToGpu(const int64_t device_id, const Config& config) {
#endif
}
-const float*
-IDMAP::GetRawVectors() {
- try {
- auto flat_index = dynamic_cast(index_.get());
- return reinterpret_cast(flat_index->codes.data());
- } catch (std::exception& e) {
- KNOWHERE_THROW_MSG(e.what());
- }
-}
-
void
IDMAP::QueryImpl(int64_t n,
const float* data,
diff --git a/knowhere/index/vector_index/IndexIDMAP.h b/knowhere/index/vector_index/IndexIDMAP.h
index 3a5546fec..18cca27ce 100644
--- a/knowhere/index/vector_index/IndexIDMAP.h
+++ b/knowhere/index/vector_index/IndexIDMAP.h
@@ -65,9 +65,6 @@ class IDMAP : public VecIndex, public FaissBaseIndex {
VecIndexPtr
CopyCpuToGpu(const int64_t, const Config&);
- virtual const float*
- GetRawVectors();
-
protected:
virtual void
QueryImpl(int64_t, const float*, int64_t, float*, int64_t*, const Config&, const faiss::BitsetView);
diff --git a/knowhere/index/vector_index/gpu/IndexGPUIDMAP.cpp b/knowhere/index/vector_index/gpu/IndexGPUIDMAP.cpp
index 17f6e5d4e..dac2ff551 100644
--- a/knowhere/index/vector_index/gpu/IndexGPUIDMAP.cpp
+++ b/knowhere/index/vector_index/gpu/IndexGPUIDMAP.cpp
@@ -88,11 +88,6 @@ GPUIDMAP::CopyGpuToGpu(const int64_t device_id, const Config& config) {
return std::static_pointer_cast(cpu_index)->CopyCpuToGpu(device_id, config);
}
-const float*
-GPUIDMAP::GetRawVectors() {
- KNOWHERE_THROW_MSG("Not support");
-}
-
void
GPUIDMAP::QueryImpl(int64_t n,
const float* data,
diff --git a/knowhere/index/vector_index/gpu/IndexGPUIDMAP.h b/knowhere/index/vector_index/gpu/IndexGPUIDMAP.h
index 4f9f14cb8..aebbf7606 100644
--- a/knowhere/index/vector_index/gpu/IndexGPUIDMAP.h
+++ b/knowhere/index/vector_index/gpu/IndexGPUIDMAP.h
@@ -39,9 +39,6 @@ class GPUIDMAP : public IDMAP, public GPUIndex {
VecIndexPtr
CopyGpuToGpu(const int64_t, const Config&) override;
- const float*
- GetRawVectors() override;
-
void
GenGraph(const float*, const int64_t, GraphType&, const Config&);
diff --git a/unittest/test_binaryidmap.cpp b/unittest/test_binaryidmap.cpp
index 8906cad16..09dd59af1 100644
--- a/unittest/test_binaryidmap.cpp
+++ b/unittest/test_binaryidmap.cpp
@@ -70,7 +70,6 @@ TEST_P(BinaryIDMAPTest, binaryidmap_basic) {
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dim(), dim);
ASSERT_GT(index_->Size(), 0);
- ASSERT_TRUE(std::static_pointer_cast(index_)->GetRawVectors() != nullptr);
auto result = index_->GetVectorById(id_dataset, conf_);
AssertBinVec(result, base_dataset, id_dataset, nq, dim);
diff --git a/unittest/test_idmap.cpp b/unittest/test_idmap.cpp
index f4a04541a..1bcf7e6a1 100644
--- a/unittest/test_idmap.cpp
+++ b/unittest/test_idmap.cpp
@@ -88,7 +88,6 @@ TEST_P(IDMAPTest, idmap_basic) {
index_->BuildAll(base_dataset, conf_);
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dim(), dim);
- ASSERT_TRUE(index_->GetRawVectors() != nullptr);
ASSERT_GT(index_->Size(), 0);
auto result = index_->GetVectorById(id_dataset, conf_);
@@ -279,7 +278,6 @@ TEST_P(IDMAPTest, idmap_copy) {
index_->BuildAll(base_dataset, conf_);
EXPECT_EQ(index_->Count(), nb);
EXPECT_EQ(index_->Dim(), dim);
- ASSERT_TRUE(index_->GetRawVectors() != nullptr);
auto result = index_->Query(query_dataset, conf_, nullptr);
AssertAnns(result, nq, k);
// PrintResult(result, nq, k);
@@ -295,8 +293,6 @@ TEST_P(IDMAPTest, idmap_copy) {
auto clone_result = clone_index->Query(query_dataset, conf_, nullptr);
AssertAnns(clone_result, nq, k);
- ASSERT_THROW({ std::static_pointer_cast(clone_index)->GetRawVectors(); },
- knowhere::KnowhereException);
auto binary = clone_index->Serialize(conf_);
clone_index->Load(binary);
@@ -311,7 +307,6 @@ TEST_P(IDMAPTest, idmap_copy) {
auto host_index = knowhere::cloner::CopyGpuToCpu(clone_index, conf_);
auto host_result = host_index->Query(query_dataset, conf_, nullptr);
AssertAnns(host_result, nq, k);
- ASSERT_TRUE(std::static_pointer_cast(host_index)->GetRawVectors() != nullptr);
// gpu to gpu
auto device_index = knowhere::cloner::CopyCpuToGpu(index_, DEVICE_ID, conf_);