Skip to content

Commit

Permalink
Add new configs (#2359)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Add 3 new config parameters for dense/sparse/fulltext index building.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Documentation Update

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Dec 12, 2024
1 parent 15258ad commit 1e14308
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 21 deletions.
12 changes: 12 additions & 0 deletions docs/references/configurations.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,18 @@ mem_index_capacity = 1048576
# Range: {"local"|"minio"}
storage_type = "local"

# The number of worker threads used to build dense vector indexes. Defaults to half the number of CPU cores, but never less than 2.
# Range: [1, number of CPU cores]
dense_index_building_worker = 2

# The number of worker threads used to build sparse vector indexes. Defaults to half the number of CPU cores, but never less than 2.
# Range: [1, number of CPU cores]
sparse_index_building_worker = 2

# The number of worker threads used to build full-text indexes. Defaults to half the number of CPU cores, but never less than 2.
# Range: [1, number of CPU cores]
fulltext_index_building_worker = 2

# Object storage configuration
[storage.object_storage]
# URL of the object storage server
Expand Down
3 changes: 3 additions & 0 deletions src/common/default_values.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ export {
constexpr std::string_view MEMINDEX_MEMORY_QUOTA_OPTION_NAME = "memindex_memory_quota";
constexpr std::string_view RESULT_CACHE_OPTION_NAME = "result_cache";
constexpr std::string_view CACHE_RESULT_CAPACITY_OPTION_NAME = "cache_result_capacity";
// Option names of the index building worker count settings, one per index type
// (dense vector, sparse vector, full-text).
constexpr std::string_view DENSE_INDEX_BUILDING_WORKER_OPTION_NAME = "dense_index_building_worker";
constexpr std::string_view SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME = "sparse_index_building_worker";
constexpr std::string_view FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME = "fulltext_index_building_worker";

constexpr std::string_view WAL_DIR_OPTION_NAME = "wal_dir";
constexpr std::string_view WAL_COMPACT_THRESHOLD_OPTION_NAME = "wal_compact_threshold";
Expand Down
63 changes: 63 additions & 0 deletions src/executor/operator/physical_show.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3185,6 +3185,69 @@ void PhysicalShow::ExecuteShowConfigs(QueryContext *query_context, ShowOperatorS
}
}

{
    // Emits one output row per index building worker option. Each row fills three columns:
    // column 0 = option name, column 1 = current value rendered as text, column 2 = description.
    const auto append_config_row = [&](Value name_cell, Value setting_cell, Value description_cell) {
        ValueExpression name_expr(name_cell);
        name_expr.AppendToChunk(output_block_ptr->column_vectors[0]);

        ValueExpression setting_expr(setting_cell);
        setting_expr.AppendToChunk(output_block_ptr->column_vectors[1]);

        ValueExpression description_expr(description_cell);
        description_expr.AppendToChunk(output_block_ptr->column_vectors[2]);
    };

    // dense vector index building workers
    append_config_row(Value::MakeVarchar(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME),
                      Value::MakeVarchar(std::to_string(global_config->DenseIndexBuildingWorker())),
                      Value::MakeVarchar("Dense vector index building worker count"));

    // sparse vector index building workers
    append_config_row(Value::MakeVarchar(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME),
                      Value::MakeVarchar(std::to_string(global_config->SparseIndexBuildingWorker())),
                      Value::MakeVarchar("Sparse vector index building worker count"));

    // full-text index building workers
    append_config_row(Value::MakeVarchar(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME),
                      Value::MakeVarchar(std::to_string(global_config->FulltextIndexBuildingWorker())),
                      Value::MakeVarchar("Full-text index building worker count"));
}

{
{
// option name
Expand Down
164 changes: 162 additions & 2 deletions src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ Status Config::Init(const SharedPtr<String> &config_path, DefaultConfig *default

// Peer connect timeout
i64 peer_connect_timeout = DEFAULT_PEER_CONNECT_TIMEOUT;
UniquePtr<IntegerOption> peer_connect_timeout_option = MakeUnique<IntegerOption>(PEER_CONNECT_TIMEOUT_OPTION_NAME, peer_connect_timeout, 10000, 0);
UniquePtr<IntegerOption> peer_connect_timeout_option =
MakeUnique<IntegerOption>(PEER_CONNECT_TIMEOUT_OPTION_NAME, peer_connect_timeout, 10000, 0);
status = global_options_.AddOption(std::move(peer_connect_timeout_option));
if (!status.ok()) {
fmt::print("Fatal: {}", status.message());
Expand Down Expand Up @@ -482,6 +483,45 @@ Status Config::Init(const SharedPtr<String> &config_path, DefaultConfig *default
UnrecoverableError(status.message());
}

// Dense index building worker
i64 dense_index_building_worker = Thread::hardware_concurrency() / 2;
if (dense_index_building_worker < 2) {
dense_index_building_worker = 2;
}
UniquePtr<IntegerOption> dense_index_building_worker_option =
MakeUnique<IntegerOption>(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME, dense_index_building_worker, Thread::hardware_concurrency(), 1);
status = global_options_.AddOption(std::move(dense_index_building_worker_option));
if (!status.ok()) {
fmt::print("Fatal: {}", status.message());
UnrecoverableError(status.message());
}

// Sparse index building worker
i64 sparse_index_building_worker = Thread::hardware_concurrency() / 2;
if (sparse_index_building_worker < 2) {
sparse_index_building_worker = 2;
}
UniquePtr<IntegerOption> sparse_index_building_worker_option =
MakeUnique<IntegerOption>(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME, sparse_index_building_worker, Thread::hardware_concurrency(), 1);
status = global_options_.AddOption(std::move(sparse_index_building_worker_option));
if (!status.ok()) {
fmt::print("Fatal: {}", status.message());
UnrecoverableError(status.message());
}

// Fulltext index building worker
i64 fulltext_index_building_worker = Thread::hardware_concurrency() / 2;
if (fulltext_index_building_worker < 2) {
fulltext_index_building_worker = 2;
}
UniquePtr<IntegerOption> fulltext_index_building_worker_option =
MakeUnique<IntegerOption>(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME, fulltext_index_building_worker, Thread::hardware_concurrency(), 1);
status = global_options_.AddOption(std::move(fulltext_index_building_worker_option));
if (!status.ok()) {
fmt::print("Fatal: {}", status.message());
UnrecoverableError(status.message());
}

// Result Cache
String result_cache(DEFAULT_RESULT_CACHE);
auto result_cache_option = MakeUnique<StringOption>(RESULT_CACHE_OPTION_NAME, result_cache);
Expand All @@ -492,7 +532,8 @@ Status Config::Init(const SharedPtr<String> &config_path, DefaultConfig *default
}

i64 cache_result_num = DEFAULT_CACHE_RESULT_CAPACITY;
auto cache_result_num_option = MakeUnique<IntegerOption>(CACHE_RESULT_CAPACITY_OPTION_NAME, cache_result_num, std::numeric_limits<i64>::max(), 0);
auto cache_result_num_option =
MakeUnique<IntegerOption>(CACHE_RESULT_CAPACITY_OPTION_NAME, cache_result_num, std::numeric_limits<i64>::max(), 0);
status = global_options_.AddOption(std::move(cache_result_num_option));
if (!status.ok()) {
fmt::print("Fatal: {}", status.message());
Expand Down Expand Up @@ -1744,6 +1785,61 @@ Status Config::Init(const SharedPtr<String> &config_path, DefaultConfig *default
}
break;
}

case GlobalOptionIndex::kDenseIndexBuildingWorker: {
    // Parses 'dense_index_building_worker' from the config file.
    // The initial value only serves as the fallback handed to value_or(); a non-integer field is rejected.
    i64 dense_index_building_worker = Thread::hardware_concurrency() / 2;
    if (elem.second.is_integer()) {
        dense_index_building_worker = elem.second.value_or(dense_index_building_worker);
    } else {
        return Status::InvalidConfig("'dense_index_building_worker' field isn't integer.");
    }
    // The option is created with Thread::hardware_concurrency() and 1 as its range limits.
    UniquePtr<IntegerOption> dense_index_building_worker_option =
        MakeUnique<IntegerOption>(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME,
                                  dense_index_building_worker,
                                  Thread::hardware_concurrency(),
                                  1);
    if (!dense_index_building_worker_option->Validate()) {
        // Report the offending value so the user can see what was rejected.
        return Status::InvalidConfig(
            fmt::format("Invalid dense vector index building worker number: {}", dense_index_building_worker));
    }
    global_options_.AddOption(std::move(dense_index_building_worker_option));
    break;
}
case GlobalOptionIndex::kSparseIndexBuildingWorker: {
    // Parses 'sparse_index_building_worker' from the config file.
    // The initial value only serves as the fallback handed to value_or(); a non-integer field is rejected.
    i64 sparse_index_building_worker = Thread::hardware_concurrency() / 2;
    if (elem.second.is_integer()) {
        sparse_index_building_worker = elem.second.value_or(sparse_index_building_worker);
    } else {
        return Status::InvalidConfig("'sparse_index_building_worker' field isn't integer.");
    }
    // The option is created with Thread::hardware_concurrency() and 1 as its range limits.
    UniquePtr<IntegerOption> sparse_index_building_worker_option =
        MakeUnique<IntegerOption>(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME,
                                  sparse_index_building_worker,
                                  Thread::hardware_concurrency(),
                                  1);
    if (!sparse_index_building_worker_option->Validate()) {
        // Report the offending value so the user can see what was rejected.
        return Status::InvalidConfig(
            fmt::format("Invalid sparse vector index building worker number: {}", sparse_index_building_worker));
    }
    global_options_.AddOption(std::move(sparse_index_building_worker_option));
    break;
}
case GlobalOptionIndex::kFulltextIndexBuildingWorker: {
    // Parses 'fulltext_index_building_worker' from the config file.
    // The initial value only serves as the fallback handed to value_or(); a non-integer field is rejected.
    i64 fulltext_index_building_worker = Thread::hardware_concurrency() / 2;
    if (elem.second.is_integer()) {
        fulltext_index_building_worker = elem.second.value_or(fulltext_index_building_worker);
    } else {
        return Status::InvalidConfig("'fulltext_index_building_worker' field isn't integer.");
    }
    // The option is created with Thread::hardware_concurrency() and 1 as its range limits.
    UniquePtr<IntegerOption> fulltext_index_building_worker_option =
        MakeUnique<IntegerOption>(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME,
                                  fulltext_index_building_worker,
                                  Thread::hardware_concurrency(),
                                  1);
    if (!fulltext_index_building_worker_option->Validate()) {
        // Report the offending value so the user can see what was rejected.
        return Status::InvalidConfig(
            fmt::format("Invalid fulltext index building worker number: {}", fulltext_index_building_worker));
    }
    global_options_.AddOption(std::move(fulltext_index_building_worker_option));
    break;
}
default: {
return Status::InvalidConfig(fmt::format("Unrecognized config parameter: {} in 'storage' field", var_name));
}
Expand Down Expand Up @@ -1830,6 +1926,52 @@ Status Config::Init(const SharedPtr<String> &config_path, DefaultConfig *default
}
}

// Register the default dense index building worker count when no option is stored under
// kDenseIndexBuildingWorker yet.
if (global_options_.GetOptionByIndex(GlobalOptionIndex::kDenseIndexBuildingWorker) == nullptr) {
    // Default: half of the hardware threads, raised to 2 when that is smaller.
    const i64 hardware_threads = Thread::hardware_concurrency();
    const i64 half_of_threads = hardware_threads / 2;
    const i64 default_worker_count = half_of_threads < 2 ? 2 : half_of_threads;

    auto worker_option = MakeUnique<IntegerOption>(DENSE_INDEX_BUILDING_WORKER_OPTION_NAME, default_worker_count, hardware_threads, 1);
    if (Status add_status = global_options_.AddOption(std::move(worker_option)); !add_status.ok()) {
        UnrecoverableError(add_status.message());
    }
}
// Register the default sparse index building worker count when no option is stored under
// kSparseIndexBuildingWorker yet.
if (global_options_.GetOptionByIndex(GlobalOptionIndex::kSparseIndexBuildingWorker) == nullptr) {
    // Default: half of the hardware threads, raised to 2 when that is smaller.
    const i64 hardware_threads = Thread::hardware_concurrency();
    const i64 half_of_threads = hardware_threads / 2;
    const i64 default_worker_count = half_of_threads < 2 ? 2 : half_of_threads;

    auto worker_option = MakeUnique<IntegerOption>(SPARSE_INDEX_BUILDING_WORKER_OPTION_NAME, default_worker_count, hardware_threads, 1);
    if (Status add_status = global_options_.AddOption(std::move(worker_option)); !add_status.ok()) {
        UnrecoverableError(add_status.message());
    }
}
// Register the default fulltext index building worker count when no option is stored under
// kFulltextIndexBuildingWorker yet. The guard must test this option's own index: testing an
// unrelated option would add or skip the default based on whether that other option is present.
if (global_options_.GetOptionByIndex(GlobalOptionIndex::kFulltextIndexBuildingWorker) == nullptr) {
    // fulltext index building worker
    // Default: half of the hardware threads, raised to 2 when that is smaller.
    i64 fulltext_index_building_worker = Thread::hardware_concurrency() / 2;
    if (fulltext_index_building_worker < 2) {
        fulltext_index_building_worker = 2;
    }
    // The option is created with Thread::hardware_concurrency() and 1 as its range limits.
    UniquePtr<IntegerOption> fulltext_index_building_worker_option =
        MakeUnique<IntegerOption>(FULLTEXT_INDEX_BUILDING_WORKER_OPTION_NAME,
                                  fulltext_index_building_worker,
                                  Thread::hardware_concurrency(),
                                  1);
    Status status = global_options_.AddOption(std::move(fulltext_index_building_worker_option));
    if (!status.ok()) {
        UnrecoverableError(status.message());
    }
}
} else {
return Status::InvalidConfig("No 'storage' section in configure file.");
}
Expand Down Expand Up @@ -2543,6 +2685,21 @@ i64 Config::MemIndexCapacity() {
return global_options_.GetIntegerValue(GlobalOptionIndex::kMemIndexCapacity);
}

// Returns the integer stored for the dense index building worker option.
// The read happens while holding mutex_.
i64 Config::DenseIndexBuildingWorker() {
    std::lock_guard<std::mutex> config_lock(mutex_);
    const i64 worker_count = global_options_.GetIntegerValue(GlobalOptionIndex::kDenseIndexBuildingWorker);
    return worker_count;
}

// Returns the integer stored for the sparse index building worker option.
// The read happens while holding mutex_.
i64 Config::SparseIndexBuildingWorker() {
    std::lock_guard<std::mutex> config_lock(mutex_);
    const i64 worker_count = global_options_.GetIntegerValue(GlobalOptionIndex::kSparseIndexBuildingWorker);
    return worker_count;
}

// Returns the integer stored for the fulltext index building worker option.
// The read happens while holding mutex_.
i64 Config::FulltextIndexBuildingWorker() {
    std::lock_guard<std::mutex> config_lock(mutex_);
    const i64 worker_count = global_options_.GetIntegerValue(GlobalOptionIndex::kFulltextIndexBuildingWorker);
    return worker_count;
}

StorageType Config::StorageType() {
std::lock_guard<std::mutex> guard(mutex_);
String storage_type_str = global_options_.GetStringValue(GlobalOptionIndex::kStorageType);
Expand Down Expand Up @@ -2741,6 +2898,9 @@ void Config::PrintAll() {
fmt::print(" - compact_interval: {}\n", Utility::FormatTimeInfo(CompactInterval()));
fmt::print(" - optimize_index_interval: {}\n", Utility::FormatTimeInfo(OptimizeIndexInterval()));
fmt::print(" - memindex_capacity: {}\n", MemIndexCapacity()); // mem index capacity is line number
fmt::print(" - dense_index_building_worker: {}\n", DenseIndexBuildingWorker());
fmt::print(" - sparse_index_building_worker: {}\n", SparseIndexBuildingWorker());
fmt::print(" - fulltext_index_building_worker: {}\n", FulltextIndexBuildingWorker());
fmt::print(" - storage_type: {}\n", ToString(StorageType()));
switch (StorageType()) {
case StorageType::kLocal: {
Expand Down
3 changes: 3 additions & 0 deletions src/main/config.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ public:
void SetOptimizeInterval(i64);

i64 MemIndexCapacity();
// Index building worker counts, one accessor per index type (dense vector, sparse vector, full-text).
i64 DenseIndexBuildingWorker();
i64 SparseIndexBuildingWorker();
i64 FulltextIndexBuildingWorker();

StorageType StorageType();
String ObjectStorageUrl();
Expand Down
24 changes: 10 additions & 14 deletions src/main/infinity_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ Status InfinityContext::ChangeServerRole(NodeRole target_role, bool from_leader,
}
task_scheduler_ = MakeUnique<TaskScheduler>(config_.get());

i64 cpu_limit = config_->CPULimit();
SetIndexThreadPool(cpu_limit);
SetIndexThreadPool();
break;
}
case NodeRole::kStandalone: {
Expand Down Expand Up @@ -513,21 +512,18 @@ void InfinityContext::UnInit() {
config_.reset();
}

void InfinityContext::SetIndexThreadPool(SizeT thread_num) {
thread_num = thread_num / 2;
if (thread_num < 2)
thread_num = 2;
LOG_TRACE(fmt::format("Set index thread pool size to {}", thread_num));
inverting_thread_pool_.resize(thread_num);
commiting_thread_pool_.resize(thread_num);
hnsw_build_thread_pool_.resize(thread_num);
// Resizes the index building thread pools to the worker counts held by config_.
void InfinityContext::SetIndexThreadPool() {
    LOG_TRACE("Set index thread pool.");
    // Pair each pool with the option of the index type it serves: judging by the pool names, the
    // inverting pool builds full-text (inverted) indexes and the HNSW build pool builds dense vector
    // indexes — TODO confirm against the pools' users.
    inverting_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
    // NOTE(review): the committing pool keeps the sparse worker option; confirm this is the pool
    // that sparse index building actually runs on.
    commiting_thread_pool_.resize(config_->SparseIndexBuildingWorker());
    hnsw_build_thread_pool_.resize(config_->DenseIndexBuildingWorker());
}

void InfinityContext::RestoreIndexThreadPoolToDefault() {
LOG_TRACE("Restore index thread pool size to default");
inverting_thread_pool_.resize(4);
commiting_thread_pool_.resize(2);
hnsw_build_thread_pool_.resize(4);
LOG_TRACE("Restore index thread pool size to default.");
// Pair each pool with the option of the index type it serves: judging by the pool names, the
// inverting pool builds full-text (inverted) indexes and the HNSW build pool builds dense vector
// indexes — TODO confirm against the pools' users.
inverting_thread_pool_.resize(config_->FulltextIndexBuildingWorker());
// NOTE(review): the committing pool keeps the sparse worker option; confirm this is the pool
// that sparse index building actually runs on.
commiting_thread_pool_.resize(config_->SparseIndexBuildingWorker());
hnsw_build_thread_pool_.resize(config_->DenseIndexBuildingWorker());
}

void InfinityContext::AddThriftServerFn(std::function<void()> start_func, std::function<void()> stop_func) {
Expand Down
Loading

0 comments on commit 1e14308

Please sign in to comment.