diff --git a/docs/references/configurations.mdx b/docs/references/configurations.mdx index 143472d01d..38be8dfc3c 100644 --- a/docs/references/configurations.mdx +++ b/docs/references/configurations.mdx @@ -153,6 +153,18 @@ mem_index_capacity = 1048576 # Range: {"local"|"minio"} storage_type = "local" +# The number of dense vector index building worker threads. Defaults to the half number of CPU cores. +# Range: [1, number of CPU cores] +dense_index_building_worker = 2 + +# The number of sparse vector index building worker threads. Defaults to the half number of CPU cores. +# Range: [1, number of CPU cores] +sparse_index_building_worker = 2 + +# The number of fulltext index building worker threads. Defaults to the half number of CPU cores. +# Range: [1, number of CPU cores] +fulltext_index_building_worker = 2 + # Object storage configuration [storage.object_storage] # URL of the object storage server diff --git a/src/common/default_values.cppm b/src/common/default_values.cppm index 49cf3430ce..d962ab620f 100644 --- a/src/common/default_values.cppm +++ b/src/common/default_values.cppm @@ -278,6 +278,9 @@ export { constexpr std::string_view MEMINDEX_MEMORY_QUOTA_OPTION_NAME = "memindex_memory_quota"; constexpr std::string_view RESULT_CACHE_OPTION_NAME = "result_cache"; constexpr std::string_view CACHE_RESULT_CAPACITY_OPTION_NAME = "cache_result_capacity"; + constexpr std::string_view DENSE_INDEX_BUILDING_WORKER_OPTION_NAME = "dense_index_building_worker"; + constexpr std::string_view SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME = "sparse_index_building_worker"; + constexpr std::string_view FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME = "fulltext_index_building_worker"; constexpr std::string_view WAL_DIR_OPTION_NAME = "wal_dir"; constexpr std::string_view WAL_COMPACT_THRESHOLD_OPTION_NAME = "wal_compact_threshold"; diff --git a/src/executor/operator/physical_show.cpp b/src/executor/operator/physical_show.cpp index e41cc17a1b..1d65e4f884 100644 --- a/src/executor/operator/physical_show.cpp +++ b/src/executor/operator/physical_show.cpp @@ -3185,6 +3185,69 @@ void PhysicalShow::ExecuteShowConfigs(QueryContext *query_context, ShowOperatorS } } + { + { + // option name + Value value = Value::MakeVarchar(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[0]); + } + { + // option name type + Value value = Value::MakeVarchar(std::to_string(global_config->DenseIndexBuildingWorker())); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[1]); + } + { + // option name type + Value value = Value::MakeVarchar("Dense vector index building worker count"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[2]); + } + } + + { + { + // option name + Value value = Value::MakeVarchar(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[0]); + } + { + // option name type + Value value = Value::MakeVarchar(std::to_string(global_config->SparseIndexBuildingWorker())); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[1]); + } + { + // option name type + Value value = Value::MakeVarchar("Sparse vector index building worker count"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[2]); + } + } + + { + { + // option name + Value value = Value::MakeVarchar(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[0]); + } + { + // option name type + Value value = Value::MakeVarchar(std::to_string(global_config->FulltextIndexBuildingWorker())); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[1]); + } + { + // option name type + Value value = Value::MakeVarchar("Full-text index building worker count"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[2]); + } + } + { { // option name diff --git a/src/main/config.cpp b/src/main/config.cpp index 8f0668ba78..dd389bcfdb 100644 --- a/src/main/config.cpp +++ b/src/main/config.cpp @@ -271,7 +271,8 @@ Status Config::Init(const SharedPtr &config_path, DefaultConfig *default // Peer connect timeout i64 peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT; - UniquePtr peer_connect_timeout_option = MakeUnique(PEER_CONNECT_TIMEOUT_OPTION_NAME, peer_connect_timeout, 10000, 0); + UniquePtr peer_connect_timeout_option = + MakeUnique(PEER_CONNECT_TIMEOUT_OPTION_NAME, peer_connect_timeout, 10000, 0); status = global_options_.AddOption(std::move(peer_connect_timeout_option)); if (!status.ok()) { fmt::print("Fatal: {}", status.message()); @@ -482,6 +483,45 @@ Status Config::Init(const SharedPtr &config_path, DefaultConfig *default UnrecoverableError(status.message()); } + // Dense index building worker + i64 dense_index_building_worker = Thread::hardware_concurrency() / 2; + if (dense_index_building_worker < 2) { + dense_index_building_worker = 2; + } + UniquePtr dense_index_building_worker_option = + MakeUnique(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME, dense_index_building_worker, Thread::hardware_concurrency(), 1); + status = global_options_.AddOption(std::move(dense_index_building_worker_option)); + if (!status.ok()) { + fmt::print("Fatal: {}", status.message()); + UnrecoverableError(status.message()); + } + + // Sparse index building worker + i64 sparse_index_building_worker = Thread::hardware_concurrency() / 2; + if (sparse_index_building_worker < 2) { + sparse_index_building_worker = 2; + } + UniquePtr sparse_index_building_worker_option = + MakeUnique(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME, sparse_index_building_worker, Thread::hardware_concurrency(), 1); + status = global_options_.AddOption(std::move(sparse_index_building_worker_option)); + if (!status.ok()) { + fmt::print("Fatal: {}", status.message()); + UnrecoverableError(status.message()); + } + + // Fulltext index building worker + i64 fulltext_index_building_worker = Thread::hardware_concurrency() / 2; + if (fulltext_index_building_worker < 2) { + fulltext_index_building_worker = 2; + } + UniquePtr fulltext_index_building_worker_option = + MakeUnique(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME, fulltext_index_building_worker, Thread::hardware_concurrency(), 1); + status = global_options_.AddOption(std::move(fulltext_index_building_worker_option)); + if (!status.ok()) { + fmt::print("Fatal: {}", status.message()); + UnrecoverableError(status.message()); + } + // Result Cache String result_cache(DEFAULT_RESULT_CACHE); auto result_cache_option = MakeUnique(RESULT_CACHE_OPTION_NAME, result_cache); @@ -492,7 +532,8 @@ Status Config::Init(const SharedPtr &config_path, DefaultConfig *default } i64 cache_result_num = DEFAULT_CACHE_RESULT_CAPACITY; - auto cache_result_num_option = MakeUnique(CACHE_RESULT_CAPACITY_OPTION_NAME, cache_result_num, std::numeric_limits::max(), 0); + auto cache_result_num_option = + MakeUnique(CACHE_RESULT_CAPACITY_OPTION_NAME, cache_result_num, std::numeric_limits::max(), 0); status = global_options_.AddOption(std::move(cache_result_num_option)); if (!status.ok()) { fmt::print("Fatal: {}", status.message()); @@ -1744,6 +1785,61 @@ Status Config::Init(const SharedPtr &config_path, DefaultConfig *default } break; } + + case GlobalOptionIndex::kDenseIndexBuildingWorker: { + i64 dense_index_building_worker = Thread::hardware_concurrency() / 2; + if (elem.second.is_integer()) { + dense_index_building_worker = elem.second.value_or(dense_index_building_worker); + } else { + return Status::InvalidConfig("'lru_num' field isn't integer."); + } + UniquePtr dense_index_building_worker_option = + MakeUnique(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME, + dense_index_building_worker, + Thread::hardware_concurrency(), + 1); + if (!dense_index_building_worker_option->Validate()) { + return Status::InvalidConfig(fmt::format("Invalid dense vector index building number: {}", 0)); + } + global_options_.AddOption(std::move(dense_index_building_worker_option)); + break; + } + case GlobalOptionIndex::kSparseIndexBuildingWorker: { + i64 sparse_index_building_worker = Thread::hardware_concurrency() / 2; + if (elem.second.is_integer()) { + sparse_index_building_worker = elem.second.value_or(sparse_index_building_worker); + } else { + return Status::InvalidConfig("'lru_num' field isn't integer."); + } + UniquePtr sparse_index_building_worker_option = + MakeUnique(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME, + sparse_index_building_worker, + Thread::hardware_concurrency(), + 1); + if (!sparse_index_building_worker_option->Validate()) { + return Status::InvalidConfig(fmt::format("Invalid sparse vector index building number: {}", 0)); + } + global_options_.AddOption(std::move(sparse_index_building_worker_option)); + break; + } + case GlobalOptionIndex::kFulltextIndexBuildingWorker: { + i64 fulltext_index_building_worker = Thread::hardware_concurrency() / 2; + if (elem.second.is_integer()) { + fulltext_index_building_worker = elem.second.value_or(fulltext_index_building_worker); + } else { + return Status::InvalidConfig("'lru_num' field isn't integer."); + } + UniquePtr fulltext_index_building_worker_option = + MakeUnique(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME, + fulltext_index_building_worker, + Thread::hardware_concurrency(), + 1); + if (!fulltext_index_building_worker_option->Validate()) { + return Status::InvalidConfig(fmt::format("Invalid fulltext vector index building number: {}", 0)); + } + global_options_.AddOption(std::move(fulltext_index_building_worker_option)); + break; + } default: { return Status::InvalidConfig(fmt::format("Unrecognized config parameter: {} in 'storage' field", var_name)); } @@ -1830,6 +1926,52 @@ Status Config::Init(const SharedPtr &config_path, DefaultConfig *default } } + if (global_options_.GetOptionByIndex(GlobalOptionIndex::kDenseIndexBuildingWorker) == nullptr) { + // dense index building worker + i64 dense_index_building_worker = Thread::hardware_concurrency() / 2; + if (dense_index_building_worker < 2) { + dense_index_building_worker = 2; + } + UniquePtr dense_index_building_worker_option = MakeUnique(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME, + dense_index_building_worker, + Thread::hardware_concurrency(), + 1); + Status status = global_options_.AddOption(std::move(dense_index_building_worker_option)); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + } + if (global_options_.GetOptionByIndex(GlobalOptionIndex::kSparseIndexBuildingWorker) == nullptr) { + // sparse index building worker + i64 sparse_index_building_worker = Thread::hardware_concurrency() / 2; + if (sparse_index_building_worker < 2) { + sparse_index_building_worker = 2; + } + UniquePtr sparse_index_building_worker_option = MakeUnique(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME, + sparse_index_building_worker, + Thread::hardware_concurrency(), + 1); + Status status = global_options_.AddOption(std::move(sparse_index_building_worker_option)); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + } + if (global_options_.GetOptionByIndex(GlobalOptionIndex::kMemIndexMemoryQuota) == nullptr) { + // fulltext index building worker + i64 fulltext_index_building_worker = Thread::hardware_concurrency() / 2; + if (fulltext_index_building_worker < 2) { + fulltext_index_building_worker = 2; + } + UniquePtr fulltext_index_building_worker_option = + MakeUnique(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME, + fulltext_index_building_worker, + Thread::hardware_concurrency(), + 1); + Status status = global_options_.AddOption(std::move(fulltext_index_building_worker_option)); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + } } else { return Status::InvalidConfig("No 'storage' section in configure file."); } @@ -2543,6 +2685,21 @@ i64 Config::MemIndexCapacity() { return global_options_.GetIntegerValue(GlobalOptionIndex::kMemIndexCapacity); } +i64 Config::DenseIndexBuildingWorker() { + std::lock_guard guard(mutex_); + return global_options_.GetIntegerValue(GlobalOptionIndex::kDenseIndexBuildingWorker); +} + +i64 Config::SparseIndexBuildingWorker() { + std::lock_guard guard(mutex_); + return global_options_.GetIntegerValue(GlobalOptionIndex::kSparseIndexBuildingWorker); +} + +i64 Config::FulltextIndexBuildingWorker() { + std::lock_guard guard(mutex_); + return global_options_.GetIntegerValue(GlobalOptionIndex::kFulltextIndexBuildingWorker); +} + StorageType Config::StorageType() { std::lock_guard guard(mutex_); String storage_type_str = global_options_.GetStringValue(GlobalOptionIndex::kStorageType); @@ -2741,6 +2898,9 @@ void Config::PrintAll() { fmt::print(" - compact_interval: {}\n", Utility::FormatTimeInfo(CompactInterval())); fmt::print(" - optimize_index_interval: {}\n", Utility::FormatTimeInfo(OptimizeIndexInterval())); fmt::print(" - memindex_capacity: {}\n", MemIndexCapacity()); // mem index capacity is line number + fmt::print(" - dense_index_building_worker: {}\n", DenseIndexBuildingWorker()); + fmt::print(" - sparse_index_building_worker: {}\n", SparseIndexBuildingWorker()); + fmt::print(" - fulltext_index_building_worker: {}\n", FulltextIndexBuildingWorker()); fmt::print(" - storage_type: {}\n", ToString(StorageType())); switch (StorageType()) { case StorageType::kLocal: { diff --git a/src/main/config.cppm b/src/main/config.cppm index 66154f2541..4836235a69 100644 --- a/src/main/config.cppm +++ b/src/main/config.cppm @@ -102,6 +102,9 @@ public: void SetOptimizeInterval(i64); i64 MemIndexCapacity(); + i64 DenseIndexBuildingWorker(); + i64 SparseIndexBuildingWorker(); + i64 FulltextIndexBuildingWorker(); StorageType StorageType(); String ObjectStorageUrl(); diff --git a/src/main/infinity_context.cpp b/src/main/infinity_context.cpp index bc849b8c02..442c6b5243 100644 --- a/src/main/infinity_context.cpp +++ b/src/main/infinity_context.cpp @@ -239,8 +239,7 @@ Status InfinityContext::ChangeServerRole(NodeRole target_role, bool from_leader, } task_scheduler_ = MakeUnique(config_.get()); - i64 cpu_limit = config_->CPULimit(); - SetIndexThreadPool(cpu_limit); + SetIndexThreadPool(); break; } case NodeRole::kStandalone: { @@ -513,21 +512,18 @@ void InfinityContext::UnInit() { config_.reset(); } -void InfinityContext::SetIndexThreadPool(SizeT thread_num) { - thread_num = thread_num / 2; - if (thread_num < 2) - thread_num = 2; - LOG_TRACE(fmt::format("Set index thread pool size to {}", thread_num)); - inverting_thread_pool_.resize(thread_num); - commiting_thread_pool_.resize(thread_num); - hnsw_build_thread_pool_.resize(thread_num); +void InfinityContext::SetIndexThreadPool() { + LOG_TRACE("Set index thread pool."); + inverting_thread_pool_.resize(config_->DenseIndexBuildingWorker()); + commiting_thread_pool_.resize(config_->SparseIndexBuildingWorker()); + hnsw_build_thread_pool_.resize(config_->FulltextIndexBuildingWorker()); } void InfinityContext::RestoreIndexThreadPoolToDefault() { - LOG_TRACE("Restore index thread pool size to default"); - inverting_thread_pool_.resize(4); - commiting_thread_pool_.resize(2); - hnsw_build_thread_pool_.resize(4); + LOG_TRACE("Restore index thread pool size to default."); + inverting_thread_pool_.resize(config_->DenseIndexBuildingWorker()); + commiting_thread_pool_.resize(config_->SparseIndexBuildingWorker()); + hnsw_build_thread_pool_.resize(config_->FulltextIndexBuildingWorker()); } void InfinityContext::AddThriftServerFn(std::function start_func, std::function stop_func) { diff --git a/src/main/infinity_context.cppm b/src/main/infinity_context.cppm index 136cc6b69d..fd0e60722e 100644 --- a/src/main/infinity_context.cppm +++ b/src/main/infinity_context.cppm @@ -64,7 +64,7 @@ public: void UnInit(); - void SetIndexThreadPool(SizeT thread_num); + void SetIndexThreadPool(); void RestoreIndexThreadPoolToDefault(); void AddThriftServerFn(std::function start_func, std::function stop_func); @@ -90,11 +90,11 @@ private: atomic_bool infinity_context_inited_{false}; // For fulltext index - ThreadPool inverting_thread_pool_{4}; + ThreadPool inverting_thread_pool_{2}; ThreadPool commiting_thread_pool_{2}; // For hnsw index - ThreadPool hnsw_build_thread_pool_{4}; + ThreadPool hnsw_build_thread_pool_{2}; mutable std::mutex mutex_; diff --git a/src/main/options.cpp b/src/main/options.cpp index d5b06c838c..113dc59ae9 100644 --- a/src/main/options.cpp +++ b/src/main/options.cpp @@ -78,6 +78,10 @@ GlobalOptions::GlobalOptions() { name2index_[String(TEMP_DIR_OPTION_NAME)] = GlobalOptionIndex::kTempDir; name2index_[String(MEMINDEX_MEMORY_QUOTA_OPTION_NAME)] = GlobalOptionIndex::kMemIndexMemoryQuota; + name2index_[String(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME)] = GlobalOptionIndex::kDenseIndexBuildingWorker; + name2index_[String(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME)] = GlobalOptionIndex::kSparseIndexBuildingWorker; + name2index_[String(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME)] = GlobalOptionIndex::kFulltextIndexBuildingWorker; + name2index_[String(RESULT_CACHE_OPTION_NAME)] = GlobalOptionIndex::kResultCache; name2index_[String(CACHE_RESULT_CAPACITY_OPTION_NAME)] = GlobalOptionIndex::kCacheResultCapacity; diff --git a/src/main/options.cppm b/src/main/options.cppm index 33b78f3a64..2ac15c9f45 100644 --- a/src/main/options.cppm +++ b/src/main/options.cppm @@ -165,8 +165,10 @@ export enum class GlobalOptionIndex : i8 { kPeerConnectTimeout = 49, kPeerRecvTimeout = 50, kPeerSendTimeout = 51, - - kInvalid = 52, + kDenseIndexBuildingWorker = 53, + kSparseIndexBuildingWorker = 54, + kFulltextIndexBuildingWorker = 55, + kInvalid = 57, }; export struct GlobalOptions {