diff --git a/src/common/stl.cppm b/src/common/stl.cppm index 39758e923a..804b77eaf0 100644 --- a/src/common/stl.cppm +++ b/src/common/stl.cppm @@ -196,9 +196,9 @@ export namespace std { using std::is_floating_point_v; using std::common_type_t; - using std::monostate; using std::function; - + using std::monostate; + using std::thread; } // namespace std namespace infinity { diff --git a/src/storage/invertedindex/column_indexer.cpp b/src/storage/invertedindex/column_indexer.cpp index d85a7ef520..11a567f86d 100644 --- a/src/storage/invertedindex/column_indexer.cpp +++ b/src/storage/invertedindex/column_indexer.cpp @@ -36,43 +36,47 @@ import fst; namespace infinity { -ColumnIndexer::ColumnIndexer(Indexer *indexer, - u64 column_id, +ColumnIndexer::ColumnIndexer(u64 column_id, + const String directory, const InvertedIndexConfig &index_config, SharedPtr byte_slice_pool, - SharedPtr buffer_pool) { - active_memory_indexer_ = MakeUnique(indexer, this, column_id, index_config, byte_slice_pool, buffer_pool); - index_name_ = indexer->GetDirectory(); - Path path = Path(index_name_) / std::to_string(column_id); - index_name_ = path.string(); + SharedPtr buffer_pool, + ThreadPool &thread_pool) + : thread_pool_(thread_pool) { + active_memory_indexer_ = MakeUnique(column_id, index_config, byte_slice_pool, buffer_pool, thread_pool); + standby_memory_indexer_ = MakeUnique(column_id, index_config, byte_slice_pool, buffer_pool, thread_pool); + directory_ = directory; std::error_code ec; - bool path_exists = std::filesystem::exists(path); + bool path_exists = std::filesystem::exists(directory); if (!path_exists) { - std::filesystem::create_directories(path, ec); + std::filesystem::create_directories(directory, ec); } } ColumnIndexer::~ColumnIndexer() {} -void ColumnIndexer::Insert(RowID row_id, String &data) { active_memory_indexer_->Insert(row_id, data); } - -void ColumnIndexer::Insert(const ColumnVector &column_vector, RowID start_row_id) { active_memory_indexer_->Insert(column_vector, start_row_id); } - -void ColumnIndexer::PreCommit() { active_memory_indexer_->PreCommit(); } +void ColumnIndexer::Insert(const ColumnVector &column_vector, u32 row_offset, u32 row_count, RowID row_id_begin) { + active_memory_indexer_->Insert(column_vector, row_offset, row_count, row_id_begin); +} void ColumnIndexer::Commit() { active_memory_indexer_->Commit(); } void ColumnIndexer::Dump() { - active_memory_indexer_->DisableCommit(); - Path path = Path(index_name_) / std::to_string(current_segment_id_); + { + std::unique_lock lock(mutex_); + std::swap(active_memory_indexer_, standby_memory_indexer_); + } + standby_memory_indexer_->WaitInflightTasks(); + + Path path = Path(directory_) / std::to_string(current_segment_id_); String index_prefix = path.string(); LocalFileSystem fs; String posting_file = index_prefix + POSTING_SUFFIX; SharedPtr posting_file_writer = MakeShared(fs, posting_file, 128000); String dict_file = index_prefix + DICT_SUFFIX; SharedPtr dict_file_writer = MakeShared(fs, dict_file, 128000); - MemoryIndexer::PostingTable *posting_table = active_memory_indexer_->GetPostingTable(); - TermMetaDumper term_meta_dumpler(active_memory_indexer_->index_config_.GetPostingFormatOption()); + MemoryIndexer::PostingTable *posting_table = standby_memory_indexer_->GetPostingTable(); + TermMetaDumper term_meta_dumpler(standby_memory_indexer_->index_config_.GetPostingFormatOption()); String fst_file = index_prefix + DICT_SUFFIX + ".fst"; std::ofstream ofs(fst_file.c_str(), std::ios::binary | std::ios::trunc); @@ -95,7 +99,7 @@ void ColumnIndexer::Dump() { fs.AppendFile(dict_file, fst_file); fs.DeleteFile(fst_file); } - active_memory_indexer_->Reset(); + standby_memory_indexer_->Reset(); } } // namespace infinity diff --git a/src/storage/invertedindex/column_indexer.cppm b/src/storage/invertedindex/column_indexer.cppm index 2cd3976fa2..9c8a223401 100644 --- a/src/storage/invertedindex/column_indexer.cppm +++ b/src/storage/invertedindex/column_indexer.cppm @@ -32,29 +32,36 @@ namespace infinity { class Indexer; export class ColumnIndexer { public: - ColumnIndexer(Indexer *indexer, - u64 column_id, + ColumnIndexer(u64 column_id, + const String directory, const InvertedIndexConfig &index_config, SharedPtr byte_slice_pool, - SharedPtr buffer_pool); + SharedPtr buffer_pool, + ThreadPool &thread_pool); ~ColumnIndexer(); - MemoryIndexer *GetMemoryIndexer() { return active_memory_indexer_.get(); } - // realtime insert - void Insert(RowID row_id, String &data); + MemoryIndexer *GetMemoryIndexer() { + std::unique_lock lock(mutex_); + return active_memory_indexer_.get(); + } - void Insert(const ColumnVector &column_vector, RowID start_row_id); - - void PreCommit(); + // One thread can call this method. This's non-blocking. + void Insert(const ColumnVector &column_vector, u32 row_offset, u32 row_count, RowID row_id_begin); + // A background thread of Indexer calls this method regularly (for example, every 2 seconds). This's non-blocking. + // Other threads can also call this method. void Commit(); + // One thread can call this method when memory limit reach (for example, 200MB). This's blocking. void Dump(); private: - UniquePtr active_memory_indexer_; - String index_name_; + std::mutex mutex_; + UniquePtr active_memory_indexer_, standby_memory_indexer_; + + String directory_; + ThreadPool &thread_pool_; u32 current_segment_id_{0}; }; diff --git a/src/storage/invertedindex/column_inverter.cpp b/src/storage/invertedindex/column_inverter.cpp index 26f6acaa63..fe6fc94d01 100644 --- a/src/storage/invertedindex/column_inverter.cpp +++ b/src/storage/invertedindex/column_inverter.cpp @@ -14,39 +14,165 @@ module; -module column_inverter; +#include +#include +#include +module column_inverter; import stl; -import column_vector; -import internal_types; - +import analyzer; +import memory_pool; +import pool_allocator; +import string_ref; +import term; +import radix_sort; +import index_defines; +import memory_indexer; namespace infinity { -RefCount::RefCount() : lock_(), cv_(), ref_count_(0u) {} +template +static u32 Align(u32 unaligned) { + return (unaligned + T - 1) & (-T); +} + +ColumnInverter::ColumnInverter(MemoryIndexer &memory_indexer) + : memory_indexer_(memory_indexer), analyzer_(memory_indexer.GetAnalyzer()), jieba_specialize_(memory_indexer.IsJiebaSpecialize()), + alloc_(memory_indexer.GetPool()), terms_(alloc_), positions_(alloc_), term_refs_(alloc_) {} + +bool ColumnInverter::CompareTermRef::operator()(const u32 lhs, const u32 rhs) const { return std::strcmp(GetTerm(lhs), GetTerm(rhs)) < 0; } + +void ColumnInverter::InvertColumn(const ColumnVector &column_vector, u32 row_offset, u32 row_count, RowID row_id_begin) { + docid_t start_doc_id = RowID2DocID(row_id_begin); + for (SizeT i = 0; i < row_count; ++i) { + String data = column_vector.ToString(row_offset + i); + InvertColumn(start_doc_id + i, data); + } +} + +void ColumnInverter::InvertColumn(u32 doc_id, const String &val) { + auto terms_once_ = MakeUnique(); + analyzer_->Analyze(val, *terms_once_, jieba_specialize_); + terms_per_doc_.push_back(Pair>(doc_id, std::move(terms_once_))); +} + +u32 ColumnInverter::AddTerm(StringRef term) { + const u32 terms_size = terms_.size(); + const u32 unpadded_size = terms_size + 4 + term.size() + 1; + const u32 fully_padded_size = Align<4>(unpadded_size); + terms_.resize(fully_padded_size); -RefCount::~RefCount() {} + char *buf = &terms_[0] + terms_size; + memset(buf, 0, 4); + memcpy(buf + 4, term.data(), term.size()); + memset(buf + 4 + term.size(), 0, fully_padded_size - unpadded_size + 1); -void RefCount::Retain() noexcept { - std::lock_guard guard(lock_); - ++ref_count_; + u32 term_ref = (terms_size + 4) >> 2; + term_refs_.push_back(term_ref); + return term_ref; } -void RefCount::Release() noexcept { - std::lock_guard guard(lock_); - --ref_count_; - if (ref_count_ == 0u) { - cv_.notify_all(); +void ColumnInverter::Merge(ColumnInverter &rhs) { + if (positions_.empty()) { + for (auto &doc_terms : terms_per_doc_) { + u32 doc_id = doc_terms.first; + auto &terms_once_ = doc_terms.second; + for (auto it = terms_once_->begin(); it != terms_once_->end(); ++it) { + StringRef term(it->text_); + u32 term_ref = AddTerm(term); + positions_.emplace_back(term_ref, doc_id, it->word_offset_); + } + } + terms_per_doc_.clear(); + } + for (auto &doc_terms : rhs.terms_per_doc_) { + u32 doc_id = doc_terms.first; + auto &terms_once_ = doc_terms.second; + for (auto it = terms_once_->begin(); it != terms_once_->end(); ++it) { + StringRef term(it->text_); + u32 term_ref = AddTerm(term); + positions_.emplace_back(term_ref, doc_id, it->word_offset_); + } } + rhs.terms_per_doc_.clear(); } -void RefCount::WaitForZeroRefCount() { - std::unique_lock guard(lock_); - cv_.wait(guard, [this] { return (ref_count_ == 0u); }); +struct TermRefRadix { + u32 operator()(const u64 v) { return v >> 32; } +}; + +void ColumnInverter::SortTerms() { + Vector first_four_bytes(term_refs_.size()); + for (u32 i = 1; i < term_refs_.size(); ++i) { + u64 first_four = ntohl(*reinterpret_cast(GetTermFromRef(term_refs_[i]))); + first_four_bytes[i] = (first_four << 32) | term_refs_[i]; + } + ShiftBasedRadixSorter::RadixSort(TermRefRadix(), + CompareTermRef(terms_), + &first_four_bytes[1], + first_four_bytes.size() - 1, + 16); + for (u32 i(1); i < first_four_bytes.size(); i++) { + term_refs_[i] = first_four_bytes[i] & 0xffffffffl; + } + auto term_ref_begin(term_refs_.begin() + 1); + uint32_t term_num = 1; // First valid term number + const char *last_term = GetTermFromRef(*term_ref_begin); + UpdateTermNum(*term_ref_begin, term_num); + for (++term_ref_begin; term_ref_begin != term_refs_.end(); ++term_ref_begin) { + const char *term = GetTermFromRef(*term_ref_begin); + int cmpres = strcmp(last_term, term); + // assert(cmpres <= 0); + if (cmpres < 0) { + ++term_num; + term_refs_[term_num] = *term_ref_begin; + last_term = term; + } + UpdateTermNum(*term_ref_begin, term_num); + } + // assert(term_refs_.size() >= term_num + 1); + term_refs_.resize(term_num + 1); + // Replace initial word reference by word number. + for (auto &p : positions_) { + p.term_num_ = GetTermNum(p.term_num_); + } } -bool RefCount::ZeroRefCount() { - std::unique_lock guard(lock_); - return (ref_count_ == 0u); +struct FullRadix { + u64 operator()(const ColumnInverter::PosInfo &p) const { return (static_cast(p.term_num_) << 32) | p.doc_id_; } +}; + +void ColumnInverter::Sort() { + SortTerms(); + ShiftBasedRadixSorter, 56, true>::RadixSort(FullRadix(), + std::less(), + &positions_[0], + positions_.size(), + 16); +} + +void ColumnInverter::GeneratePosting() { + u32 last_term_num = 0; + u32 last_term_pos = 0; + u32 last_doc_id = 0; + StringRef term; + MemoryIndexer::PostingPtr posting = nullptr; + for (auto &i : positions_) { + if (last_term_num != i.term_num_ || last_doc_id != i.doc_id_) { + if (last_term_num != i.term_num_) { + last_term_num = i.term_num_; + term = GetTermFromNum(last_term_num); + posting = memory_indexer_.GetOrAddPosting(String(term.data())); + } + last_doc_id = i.doc_id_; + if (last_doc_id != 0) { + posting->EndDocument(last_doc_id, 0); + } + } + if (i.term_pos_ != last_term_pos) { + last_term_pos = i.term_pos_; + posting->AddPosition(last_term_pos); + } + } } } // namespace infinity \ No newline at end of file diff --git a/src/storage/invertedindex/column_inverter.cppm b/src/storage/invertedindex/column_inverter.cppm index db3e5f0254..eb148a3b63 100644 --- a/src/storage/invertedindex/column_inverter.cppm +++ b/src/storage/invertedindex/column_inverter.cppm @@ -15,47 +15,94 @@ module; export module column_inverter; + import stl; +import analyzer; + import column_vector; +import memory_pool; +import pool_allocator; +import term; +import string_ref; import internal_types; namespace infinity { -class RefCount { - std::mutex lock_; - std::condition_variable cv_; - u32 ref_count_; - +class MemoryIndexer; +export class ColumnInverter { public: - RefCount(); - virtual ~RefCount(); - void Retain() noexcept; - void Release() noexcept; - void WaitForZeroRefCount(); - bool ZeroRefCount(); -}; + ColumnInverter(MemoryIndexer &memory_indexer); + ColumnInverter(const ColumnInverter &) = delete; + ColumnInverter(const ColumnInverter &&) = delete; + ColumnInverter &operator=(const ColumnInverter &) = delete; + ColumnInverter &operator=(const ColumnInverter &&) = delete; -export struct ColumnCommitter { + void InvertColumn(const ColumnVector &column_vector, u32 row_offset, u32 row_count, RowID row_id_begin); - void Retain() { ref_count_.Retain(); } + void InvertColumn(u32 doc_id, const String &val); - void Release() { ref_count_.Release(); } + void Merge(ColumnInverter &rhs); - bool ZeroRefCount() { return ref_count_.ZeroRefCount(); } + void Sort(); - void WaitForZeroRefCount() { return ref_count_.WaitForZeroRefCount(); } + void GeneratePosting(); - RefCount &GetRefCount() { return ref_count_; } + struct PosInfo { + u32 term_num_{0}; + u32 doc_id_{0}; + u32 term_pos_{0}; - virtual void Commit() = 0; + bool operator<(const PosInfo &rhs) const { + if (term_num_ != rhs.term_num_) { + return term_num_ < rhs.term_num_; + } + if (doc_id_ != rhs.doc_id_) { + return doc_id_ < rhs.doc_id_; + } + return term_pos_ < rhs.term_pos_; + } + }; - RefCount ref_count_; -}; +private: + using TermBuffer = Vector>; + using PosInfoVec = Vector>; + using U32Vec = Vector>; -export class ColumnInverter { -public: - virtual void InvertColumn(const ColumnVector &column_vector, RowID start_row_id) = 0; + struct CompareTermRef { + const char *const term_buffer_; + + CompareTermRef(const TermBuffer &term_buffer) : term_buffer_(&term_buffer[0]) {} + + const char *GetTerm(u32 term_ref) const { return &term_buffer_[term_ref << 2]; } + + bool operator()(const u32 lhs, const u32 rhs) const; + }; + + const char *GetTermFromRef(u32 term_ref) const { return &terms_[term_ref << 2]; } + + const char *GetTermFromNum(u32 term_num) const { return GetTermFromRef(term_refs_[term_num]); } + + u32 GetTermNum(u32 term_ref) const { + const char *p = &terms_[(term_ref - 1) << 2]; + return *reinterpret_cast(p); + } + + void UpdateTermNum(u32 term_ref, u32 term_num) { + char *p = &terms_[(term_ref - 1) << 2]; + *reinterpret_cast(p) = term_num; + } + + u32 AddTerm(StringRef term); + + void SortTerms(); - virtual void InvertColumn(u32 doc_id, const String &val) = 0; + MemoryIndexer &memory_indexer_; + Analyzer *analyzer_{nullptr}; + bool jieba_specialize_{false}; + PoolAllocator alloc_; + TermBuffer terms_; + PosInfoVec positions_; + U32Vec term_refs_; + Vector>> terms_per_doc_; }; -} // namespace infinity \ No newline at end of file +} // namespace infinity diff --git a/src/storage/invertedindex/commit_task.cpp b/src/storage/invertedindex/commit_task.cpp deleted file mode 100644 index 5f5a46edbf..0000000000 --- a/src/storage/invertedindex/commit_task.cpp +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -module; - -module commit_task; - -import stl; -import column_inverter; - -namespace infinity { - -CommitTask::CommitTask(ColumnCommitter *inverter) : inverter_(inverter) { inverter_->Retain(); } - -CommitTask::~CommitTask() { inverter_->Release(); } - -void CommitTask::Run() { inverter_->Commit(); } - -} // namespace infinity diff --git a/src/storage/invertedindex/commit_task.cppm b/src/storage/invertedindex/commit_task.cppm deleted file mode 100644 index 60d95215c8..0000000000 --- a/src/storage/invertedindex/commit_task.cppm +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -module; - -export module commit_task; - -import stl; -import task_executor; -import column_inverter; - -namespace infinity { -export class CommitTask : public TaskExecutor::Task { -public: - CommitTask(ColumnCommitter *inverter); - - ~CommitTask(); - - void Run() override; - -private: - ColumnCommitter *inverter_{nullptr}; -}; -} // namespace infinity diff --git a/src/storage/invertedindex/index_config.cppm b/src/storage/invertedindex/index_config.cppm index fee5c8219c..9eb30c32fe 100644 --- a/src/storage/invertedindex/index_config.cppm +++ b/src/storage/invertedindex/index_config.cppm @@ -68,7 +68,7 @@ public: private: String index_name_{""}; u64 memory_quota_{200 * 1024 * 1024}; - u32 indexing_threads_{1}; + u32 indexing_threads_{2}; PostingFormatOption posting_format_option_; optionflag_t flag_{OPTION_FLAG_ALL}; bool is_short_list_vbyte_compress_{false}; diff --git a/src/storage/invertedindex/indexer.cpp b/src/storage/invertedindex/indexer.cpp index 614766b9c5..fd87f1858a 100644 --- a/src/storage/invertedindex/indexer.cpp +++ b/src/storage/invertedindex/indexer.cpp @@ -38,13 +38,25 @@ import column_indexer; import logical_type; import internal_types; - +#include namespace infinity { -Indexer::Indexer() {} +Indexer::Indexer() { + thd_commit_ = std::thread([this] { + while (!stopped_.load()) { + sleep(2); + this->Commit(); + } + }); + thd_commit_.detach(); +} -Indexer::~Indexer() {} +Indexer::~Indexer() { + stopped_.store(true); + thd_commit_.join(); + thread_pool_.stop(); +} void Indexer::Open(const InvertedIndexConfig &index_config, const String &directory) { index_config_ = index_config; @@ -62,7 +74,10 @@ void Indexer::Open(const InvertedIndexConfig &index_config, const String &direct index_config_.GetColumnIDs(column_ids_); for (SizeT i = 0; i < column_ids_.size(); ++i) { u64 column_id = column_ids_[i]; - column_indexers_[column_id] = MakeUnique(this, column_id, index_config_, byte_slice_pool_, buffer_pool_); + String index_dir = GetDirectory(); + Path path = Path(index_dir) / std::to_string(column_id); + index_dir = path.string(); + column_indexers_[column_id] = MakeUnique(column_id, index_dir, index_config_, byte_slice_pool_, buffer_pool_, thread_pool_); } id_generator_ = MakeShared(); @@ -72,6 +87,9 @@ void Indexer::Open(const InvertedIndexConfig &index_config, const String &direct AddSegment(); dump_ref_count_ = column_ids_.size(); + + u32 num_threads = index_config.GetIndexingParallelism(); + thread_pool_.resize(num_threads); } void Indexer::AddSegment() { @@ -101,38 +119,22 @@ void Indexer::BatchInsert(const BlockEntry *block_entry, u32 row_offset, u32 row ctx->SetBaseDocId(doc_id_begin); } else { if (ctx->GetNextDocId() != doc_id_begin) { - // TODO: doc id is not continuous + // doc id gap + AddSegment(); + ctx = active_segment_.load(std::memory_order_relaxed); + ctx->SetBaseDocId(doc_id_begin); } } for (auto &[column_id, column_indexer] : column_indexers_) { BlockColumnEntry *block_column_entry = block_entry->GetColumnBlockEntry(column_id); ColumnVector column_vector = block_column_entry->GetColumnVector(buffer_mgr); - column_indexers_[column_id]->Insert(column_vector, row_id_begin); + column_indexers_[column_id]->Insert(column_vector, row_offset, row_count, row_id_begin); } ctx->IncDocCount(row_count); } -void Indexer::Insert(RowID row_id, String &data) { - UpdateSegment(row_id); - for (SizeT i = 0; i < column_ids_.size(); ++i) { - u64 column_id = column_ids_[i]; - column_indexers_[column_id]->Insert(row_id, data); - } -} - void Indexer::Commit() { - // Commit is called in a dedicate thread periodically - { - // PreCommit is used to switch internal inverters such that all - // data after the Commit is called from other threads will not - // be out of sync - std::unique_lock lock(flush_mutex_); - for (SizeT i = 0; i < column_ids_.size(); ++i) { - u64 column_id = column_ids_[i]; - column_indexers_[column_id]->PreCommit(); - } - } for (SizeT i = 0; i < column_ids_.size(); ++i) { u64 column_id = column_ids_[i]; column_indexers_[column_id]->Commit(); @@ -142,7 +144,7 @@ void Indexer::Commit() { void Indexer::Dump() { auto *flush_segment = active_segment_.load(std::memory_order_relaxed); AddSegment(); - flush_segment->SetSegmentStatus(Segment::DUMPLING); + flush_segment->SetSegmentStatus(Segment::DUMPING); for (SizeT i = 0; i < column_ids_.size(); ++i) { u64 column_id = column_ids_[i]; @@ -155,17 +157,5 @@ SharedPtr Indexer::CreateInMemSegmentReader(u64 column_ return MakeShared(column_indexers_[column_id]->GetMemoryIndexer()); } -void Indexer::TryDump() { - dump_ref_count_.fetch_sub(1, std::memory_order_acq_rel); - - if (dump_ref_count_ == 0) { - // TODO, using a global resource manager to control the memory quota - if (byte_slice_pool_->GetUsedBytes() >= index_config_.GetMemoryQuota()) { - Dump(); - } - dump_ref_count_ = column_ids_.size(); - } -} - void Indexer::GetSegments(Vector &segments) { segments_ = segments; } } // namespace infinity diff --git a/src/storage/invertedindex/indexer.cppm b/src/storage/invertedindex/indexer.cppm index ff844eb15b..4f247633fd 100644 --- a/src/storage/invertedindex/indexer.cppm +++ b/src/storage/invertedindex/indexer.cppm @@ -50,14 +50,14 @@ public: void Open(const InvertedIndexConfig &index_config, const String &directory); + // One thread can call this method. This's non-blocking. void BatchInsert(const BlockEntry *block_entry, u32 row_offset, u32 row_count, BufferManager *buffer_mgr); - void Insert(RowID row_id, String &data); - + // A background thread of Indexer calls this method regularly (for example, every 2 seconds). This's non-blocking. + // Other threads can also call this method. void Commit(); - void TryDump(); - + // One thread can call this method when memory limit reach (for example, 200MB). This's blocking. void Dump(); SharedPtr CreateInMemSegmentReader(u64 column_id); @@ -90,6 +90,10 @@ private: SharedPtr id_generator_; Vector segments_; Atomic active_segment_; + + ThreadPool thread_pool_{2}; + std::thread thd_commit_; + Atomic stopped_; Atomic dump_ref_count_; mutable std::shared_mutex flush_mutex_; }; diff --git a/src/storage/invertedindex/invert_task.cpp b/src/storage/invertedindex/invert_task.cpp deleted file mode 100644 index c621cc2d7a..0000000000 --- a/src/storage/invertedindex/invert_task.cpp +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -module; - -module invert_task; - -import stl; -import column_inverter; -import column_vector; - -import internal_types; - -namespace infinity { - -InvertTask::InvertTask(ColumnInverter *inverter, const String &value, u32 row_id) : inverter_(inverter), value_(value), row_id_(row_id) {} - -void InvertTask::Run() { inverter_->InvertColumn(row_id_, value_); } - -BatchInvertTask::BatchInvertTask(ColumnInverter *inverter, const ColumnVector &column_vector, RowID start_row_id) - : inverter_(inverter), column_vector_(column_vector), start_row_id_(start_row_id) {} - -void BatchInvertTask::Run() { inverter_->InvertColumn(column_vector_, start_row_id_); } - -} // namespace infinity diff --git a/src/storage/invertedindex/invert_task.cppm b/src/storage/invertedindex/invert_task.cppm index 35a842feab..097a9b7701 100644 --- a/src/storage/invertedindex/invert_task.cppm +++ b/src/storage/invertedindex/invert_task.cppm @@ -17,43 +17,20 @@ module; export module invert_task; import stl; -import task_executor; -import column_inverter; import column_vector; - import internal_types; namespace infinity { -export class InvertTask : public TaskExecutor::Task { -public: - explicit InvertTask(ColumnInverter *inverter, const String &value, u32 row_id); - - ~InvertTask() = default; - - void Run() override; - -private: - ColumnInverter *inverter_{nullptr}; - - const String &value_; - - u32 row_id_; -}; - -export class BatchInvertTask : public TaskExecutor::Task { +export struct BatchInvertTask { public: - BatchInvertTask(ColumnInverter *inverter, const ColumnVector &column_vector, RowID start_row_id); - - ~BatchInvertTask() = default; - - void Run() override; - -private: - ColumnInverter *inverter_{nullptr}; - - ColumnVector column_vector_; - - RowID start_row_id_; + BatchInvertTask(u64 task_seq, const ColumnVector &column_vector, u32 row_offset, u32 row_count, RowID row_id_begin) + : task_seq_(task_seq), column_vector_(column_vector), row_offset_(row_offset), row_count_(row_count), row_id_begin_(row_id_begin) {} + + u64 task_seq_; + const ColumnVector &column_vector_; + u32 row_offset_; + u32 row_count_; + RowID row_id_begin_; }; } // namespace infinity diff --git a/src/storage/invertedindex/memory_indexer.cpp b/src/storage/invertedindex/memory_indexer.cpp index 8e59755460..4daca0f80c 100644 --- a/src/storage/invertedindex/memory_indexer.cpp +++ b/src/storage/invertedindex/memory_indexer.cpp @@ -14,6 +14,7 @@ module; +#include #include #include #include @@ -47,52 +48,29 @@ import column_vector; import analyzer; import analyzer_pool; import term; -import sequential_column_inverter; -import parallel_column_inverter; +import column_inverter; import invert_task; -import commit_task; -import task_executor; import indexer; import third_party; +import ring; namespace infinity { -int IndexInverter(Runnable &worker) { - worker.Run(); - return 1; -} - -int IndexCommiter(Runnable &worker) { - worker.Run(); - return 1; -} - bool MemoryIndexer::KeyComp::operator()(const String &lhs, const String &rhs) const { int ret = strcmp(lhs.c_str(), rhs.c_str()); return ret < 0; } -// bool MemoryIndexer::KeyComp::operator()(const TermKey &lhs, const TermKey &rhs) const { return lhs < rhs; } - -MemoryIndexer::MemoryIndexer(Indexer *indexer, - ColumnIndexer *column_indexer, - u64 column_id, +MemoryIndexer::MemoryIndexer(u64 column_id, const InvertedIndexConfig &index_config, SharedPtr byte_slice_pool, - SharedPtr buffer_pool) - : indexer_(indexer), column_indexer_(column_indexer), column_id_(column_id), index_config_(index_config), byte_slice_pool_(byte_slice_pool), - buffer_pool_(buffer_pool), num_inverters_(1), max_inverters_(4) { + SharedPtr buffer_pool, + ThreadPool &thread_pool) + : column_id_(column_id), index_config_(index_config), byte_slice_pool_(byte_slice_pool), buffer_pool_(buffer_pool), thread_pool_(thread_pool), + ring_inverted_(10UL), ring_sorted_(10UL) { memory_allocator_ = MakeShared(GetPool()); posting_store_ = MakeUnique(memory_allocator_.get()); SetAnalyzer(); - if (index_config_.GetIndexingParallelism() > 1) { - inverter_ = MakeUnique(this); - } else { - parallel_inverter_ = MakeUnique(this, index_config_.GetIndexingParallelism()); - } - invert_executor_ = - SequencedTaskExecutor::Create(IndexInverter, index_config_.GetIndexingParallelism(), index_config_.GetIndexingParallelism() * 1000); - commit_executor_ = SequencedTaskExecutor::Create(IndexCommiter, 1, 1000); } MemoryIndexer::~MemoryIndexer() { Reset(); } @@ -105,93 +83,42 @@ void MemoryIndexer::SetAnalyzer() { jieba_specialize_ = analyzer.compare("chinese") == 0 ? true : false; } -void MemoryIndexer::Insert(RowID row_id, String &data) { inverter_->InvertColumn(RowID2DocID(row_id), data); } - -void MemoryIndexer::Insert(const ColumnVector &column_vector, RowID start_row_id) { - if (index_config_.GetIndexingParallelism() > 1) { - docid_t start_doc_id = RowID2DocID(start_row_id); - - u32 row_size = column_vector.Size(); - for (u32 i = 0; i < row_size; i += parallel_inverter_->Size()) { - for (u32 j = 0; j < parallel_inverter_->inverters_.size(); ++j) { - auto task = MakeUnique(parallel_inverter_->inverters_[j].get(), column_vector.ToString(i + j), start_doc_id + i + j); - invert_executor_->Execute(j, std::move(task)); - } - } - } else { - auto task = MakeUnique(inverter_.get(), column_vector, start_row_id); - invert_executor_->Execute(0, std::move(task)); - } -} - -void MemoryIndexer::SwitchActiveInverter() { - inflight_inverters_.emplace_back(std::move(inverter_)); - while (!inflight_inverters_.empty() && inflight_inverters_.front()->ZeroRefCount()) { - free_inverters_.emplace_back(std::move(inflight_inverters_.front())); - inflight_inverters_.pop_front(); - } - if (!free_inverters_.empty()) { - inverter_ = std::move(free_inverters_.back()); - free_inverters_.pop_back(); - return; - } - if (num_inverters_ >= max_inverters_) { - // assert(!inflight_inverters_.empty()); - inverter_ = std::move(inflight_inverters_.front()); - inflight_inverters_.pop_front(); - inverter_->WaitForZeroRefCount(); - return; - } - inverter_ = MakeUnique(this); - ++num_inverters_; -} - -void MemoryIndexer::SwitchActiveParallelInverters() { - inflight_parallel_inverters_.emplace_back(std::move(parallel_inverter_)); - while (!inflight_parallel_inverters_.empty() && inflight_inverters_.front()->ZeroRefCount()) { - free_parallel_inverters_.emplace_back(std::move(inflight_parallel_inverters_.front())); - inflight_inverters_.pop_front(); - } - if (!free_parallel_inverters_.empty()) { - parallel_inverter_ = std::move(free_parallel_inverters_.back()); - free_parallel_inverters_.pop_back(); - return; - } - if (num_inverters_ >= max_inverters_) { - parallel_inverter_ = std::move(inflight_parallel_inverters_.front()); - inflight_parallel_inverters_.pop_front(); - return; - } - - parallel_inverter_ = MakeUnique(this, index_config_.GetIndexingParallelism()); - - ++num_inverters_; -} - -void MemoryIndexer::PreCommit() { - if (index_config_.GetIndexingParallelism() > 1) { - inflight_commit_task_ = MakeUnique(parallel_inverter_.get()); - SwitchActiveParallelInverters(); - } else { - auto task = MakeUnique(inverter_.get()); - commit_executor_->Execute(0, std::move(task)); - SwitchActiveInverter(); +void MemoryIndexer::Insert(const ColumnVector &column_vector, u32 row_offset, u32 row_count, RowID row_id_begin) { + { + std::unique_lock lock(mutex_); + inflight_tasks_++; } + u64 seq_inserted = seq_inserted_++; + auto task = MakeShared(seq_inserted, column_vector, row_offset, row_count, row_id_begin); + auto func = [this, &task](int id) { + auto inverter = MakeShared(*this); + inverter->InvertColumn(task->column_vector_, task->row_offset_, task->row_count_, task->row_id_begin_); + this->ring_inverted_.Put(task->task_seq_, inverter); + }; + thread_pool_.push(func); } void MemoryIndexer::Commit() { - std::unique_lock lck(mutex_); - while (disable_commit_) - cv_.wait(lck); - - if (index_config_.GetIndexingParallelism() > 1) { - invert_executor_->SyncAll(); - commit_executor_->Execute(0, std::move(inflight_commit_task_)); - } + thread_pool_.push([this](int id) { + Vector> inverters; + u64 seq_commit = this->ring_inverted_.GetMulti(inverters); + SizeT num = inverters.size(); + for (SizeT i = 1; i < num; i++) { + inverters[0]->Merge(*inverters[i]); + } + inverters[0]->Sort(); + this->ring_sorted_.Put(seq_commit, inverters[0]); + this->ring_sorted_.Iterate([](SharedPtr &inverter) { inverter->GeneratePosting(); }); + { + std::unique_lock lock(mutex_); + inflight_tasks_ -= num; + if (inflight_tasks_ == 0) { + cv_.notify_all(); + } + } + }); } -void MemoryIndexer::TryDump() { indexer_->TryDump(); } - MemoryIndexer::PostingPtr MemoryIndexer::GetOrAddPosting(const TermKey &term) { MemoryIndexer::PostingTable::Iterator iter = posting_store_->find(term); if (iter.valid()) @@ -225,7 +152,7 @@ void MemoryIndexer::Reset() { } posting_store_->clear(); } - disable_commit_ = false; + thread_pool_.stop(true); cv_.notify_all(); } diff --git a/src/storage/invertedindex/memory_indexer.cppm b/src/storage/invertedindex/memory_indexer.cppm index 7c0d83f111..866c86d6c7 100644 --- a/src/storage/invertedindex/memory_indexer.cppm +++ b/src/storage/invertedindex/memory_indexer.cppm @@ -28,19 +28,16 @@ import data_block; import column_vector; import analyzer; -import sequential_column_inverter; -import parallel_column_inverter; -import task_executor; +import column_inverter; import third_party; import internal_types; -import commit_task; +import ring; namespace vespalib::alloc { class MemoryPoolAllocator; } namespace infinity { -class Indexer; class ColumnIndexer; export class MemoryIndexer { public: @@ -50,37 +47,31 @@ public: struct KeyComp { bool operator()(const String &lhs, const String &rhs) const; - // bool operator()(const TermKey &lhs, const TermKey &rhs) const; }; enum IndexMode { - REAL_TIME, NEAR_REAL_TIME, OFFLINE, }; - MemoryIndexer(Indexer *indexer, - ColumnIndexer *column_indexer, - u64 column_id, + MemoryIndexer(u64 column_id, const InvertedIndexConfig &index_config, SharedPtr byte_slice_pool, - SharedPtr buffer_pool); + SharedPtr buffer_pool, + ThreadPool &thread_pool); ~MemoryIndexer(); - void SetIndexMode(IndexMode index_mode); - - bool IsRealTime() { return index_mode_ == REAL_TIME; } - // realtime insert - void Insert(RowID row_id, String &data); - - void Insert(const ColumnVector &column_vector, RowID start_row_id); - - void PreCommit(); + void Insert(const ColumnVector &column_vector, u32 row_offset, u32 row_count, RowID row_id_begin); void Commit(); - void TryDump(); + void WaitInflightTasks() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this] { return inflight_tasks_ == 0; }); + } + + void SetIndexMode(IndexMode index_mode); Analyzer *GetAnalyzer() { return analyzer_.get(); } @@ -96,20 +87,12 @@ public: void Reset(); - void DisableCommit() { disable_commit_ = true; } - private: void SetAnalyzer(); - void SwitchActiveInverter(); - - void SwitchActiveParallelInverters(); - private: friend class ColumnIndexer; - Indexer *indexer_{nullptr}; - ColumnIndexer *column_indexer_{nullptr}; IndexMode index_mode_{NEAR_REAL_TIME}; u64 column_id_; InvertedIndexConfig index_config_; @@ -120,22 +103,13 @@ private: UniquePtr posting_store_; UniquePtr analyzer_; bool jieba_specialize_{false}; - Vector> free_inverters_; - Deque> inflight_inverters_; - UniquePtr inverter_; - - UniquePtr parallel_inverter_; - Vector> free_parallel_inverters_; - Deque> inflight_parallel_inverters_; - - UniquePtr inflight_commit_task_; - u32 num_inverters_; - u32 max_inverters_; - UniquePtr invert_executor_; - UniquePtr commit_executor_; + ThreadPool &thread_pool_; + Ring> ring_inverted_; + Ring> ring_sorted_; + u64 seq_inserted_{0}; + u64 inflight_tasks_{0}; - bool disable_commit_{false}; std::condition_variable cv_; std::mutex mutex_; }; diff --git a/src/storage/invertedindex/parallel_column_inverter.cpp b/src/storage/invertedindex/parallel_column_inverter.cpp deleted file mode 100644 index d26a13a187..0000000000 --- a/src/storage/invertedindex/parallel_column_inverter.cpp +++ /dev/null @@ -1,178 +0,0 @@ -// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -module; - -#include -#include - -module parallel_column_inverter; - -import stl; -import analyzer; -import memory_pool; -import pool_allocator; -import string_ref; -import term; -import radix_sort; -import index_defines; -import memory_indexer; -import third_party; - -namespace infinity { - -TermPostings::TermPostings(MemoryIndexer *memory_indexer) : memory_indexer_(memory_indexer), terms_{0, ValueRefHash{}, TermEq{postings_}} {} - -void TermPostings::GetSorted(Vector &term_postings) { - SizeT num = postings_.size(); - term_postings.resize(num); - for (SizeT i = 0; i < num; i++) { - term_postings[i] = &postings_[i]; - } - std::sort(term_postings.begin(), term_postings.end(), [](const auto lhs, const auto rhs) { return StringViewCompare(lhs->term_, rhs->term_); }); -} - -TermPosting *TermPostings::Emplace(std::string_view term) { - const HashedStringView hashed_term{term}; - - bool is_new = false; - const auto it = terms_.lazy_emplace(hashed_term, [&, size = terms_.size()](const auto &ctor) { - ctor(size, hashed_term.hash_); - is_new = true; - }); - if (!is_new) { - return &postings_[it->ref_]; - } - const auto term_size = term.size(); - try { - // TODO using a dedicate object pool - MemoryPool *pool = memory_indexer_->GetPool(); - auto *start = (char *)pool->Allocate(term_size); - memcpy(start, term.data(), term_size); - return &postings_.emplace_back(start, term_size); - } catch (...) { - terms_.erase(it); - return nullptr; - } -} - -ParallelColumnInverter::ParallelColumnInverter(MemoryIndexer *memory_indexer) - : memory_indexer_(memory_indexer), analyzer_(memory_indexer->GetAnalyzer()), jieba_specialize_(memory_indexer->IsJiebaSpecialize()), - alloc_(memory_indexer->GetPool()) { - term_postings_ = MakeUnique(memory_indexer_); -} - -ParallelColumnInverter::~ParallelColumnInverter() {} - -void ParallelColumnInverter::InvertColumn(u32 doc_id, const String &val) { - terms_once_.clear(); - analyzer_->Analyze(val, terms_once_, jieba_specialize_); - for (auto it = terms_once_.begin(); it != terms_once_.end(); ++it) { - std::string_view term(it->text_); - TermPosting *term_posting = term_postings_->Emplace(term); - term_posting->values_.emplace_back(doc_id, it->word_offset_); - } -} - -void ParallelColumnInverter::InvertColumn(const ColumnVector &column_vector, RowID start_row_id) {} - -ParallelColumnInverters::ParallelColumnInverters(MemoryIndexer *memory_indexer, u32 size) : memory_indexer_(memory_indexer), size_(size) { - inverters_.resize(size_); - for (u32 i = 0; i < size_; ++i) { - inverters_[i] = MakeUnique(memory_indexer); - } -} - -void ParallelColumnInverters::Commit() { - Vector> all_postings(size_); - for (u32 i = 0; i < size_; ++i) { - TermPostings *term_postings_slice = inverters_[i]->GetTermPostings(); - Vector &term_postings = all_postings[i]; - term_postings.resize(term_postings_slice->Size()); - for (u32 j = 0; j < term_postings_slice->Size(); ++j) { - term_postings.push_back(&(term_postings_slice->postings_[i])); - } - term_postings_slice->GetSorted(term_postings); - } - auto cmp = [](const Pair &lhs, const Pair &rhs) { - return StringViewCompare(lhs.first->term_, rhs.first->term_); - }; - Heap, decltype(cmp)> pq(cmp); - for (u32 i = 0; i < size_; ++i) { - if (!all_postings[i].empty()) { - pq.push({all_postings[i][0], i}); - all_postings[i].erase(all_postings[i].begin()); - } - } - - Vector to_merge(size_); - std::string_view prev_term = pq.top().first->term_; - while (!pq.empty()) { - auto [term_posting, array_index] = pq.top(); - pq.pop(); - { - if (!StringViewCompare(prev_term, term_posting->term_)) { - DoMerge(to_merge); - prev_term = term_posting->term_; - to_merge.clear(); - } else { - to_merge.push_back(term_posting); - } - } - if (!all_postings[array_index].empty()) { - pq.push({all_postings[array_index][0], array_index}); - all_postings[array_index].erase(all_postings[array_index].begin()); - } - } - DoMerge(to_merge); - - for (u32 i = 0; i < size_; ++i) { - inverters_[i]->GetTermPostings()->Clear(); - } - - memory_indexer_->TryDump(); -} - -void ParallelColumnInverters::DoMerge(Vector &to_merge) { - Vector values; - u32 total_size = 0; - for (u32 i = 0; i < to_merge.size(); ++i) { - total_size += to_merge[i]->values_.size(); - } - values.resize(total_size); - TermPosting::PosInfo *ptr = values.data(); - for (u32 i = 0; i < to_merge.size(); ++i) { - memcpy(ptr, to_merge[i]->values_.data(), to_merge[i]->values_.size() * sizeof(TermPosting::PosInfo)); - ptr += to_merge[i]->values_.size(); - } - std::sort(values.begin(), values.end(), [](const auto lhs, const auto rhs) { - if (lhs.doc_id_ != rhs.doc_id_) { - return lhs.doc_id_ < rhs.doc_id_; - } - return lhs.term_pos_ < rhs.term_pos_; - }); - - MemoryIndexer::PostingPtr posting = memory_indexer_->GetOrAddPosting(String(to_merge[0]->term_)); - docid_t curr_doc_id = values[0].doc_id_; - for (u32 i = 0; i < values.size(); ++i) { - if (values[i].doc_id_ != curr_doc_id) { - posting->EndDocument(curr_doc_id, 0); - curr_doc_id = values[i].doc_id_; - } - posting->AddPosition(values[i].term_pos_); - } - posting->EndDocument(curr_doc_id, 0); -} - -} // namespace infinity \ No newline at end of file diff --git a/src/storage/invertedindex/parallel_column_inverter.cppm b/src/storage/invertedindex/parallel_column_inverter.cppm deleted file mode 100644 index 10dac30aec..0000000000 --- a/src/storage/invertedindex/parallel_column_inverter.cppm +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -module; - -#include -#include - -export module parallel_column_inverter; -import stl; -import analyzer; - -import column_vector; -import memory_pool; -import pool_allocator; -import term; -import index_defines; -import third_party; -import internal_types; -import column_inverter; - -namespace infinity { - -struct Hasher { - SizeT operator()(const std::string_view &value) const { return std::hash{}(value); } -}; - -export inline bool StringViewCompare(std::string_view lhs, std::string_view rhs) { - const SizeT size = std::min(lhs.size(), rhs.size()); - const auto res = ::memcmp(lhs.data(), rhs.data(), size); - - if (0 == res) { - return lhs.size() < rhs.size(); - } - - return res < 0; -} - -export struct HashedStringView : public std::string_view { - explicit HashedStringView(std::string_view ref, const Hasher &hasher = Hasher{}) : HashedStringView{ref, hasher(ref)} {} - - HashedStringView(std::string_view ref, SizeT hash) : std::string_view(ref), hash_(hash) {} - - SizeT hash_; -}; - -export struct ValueRef { - explicit ValueRef(SizeT ref, SizeT hash) : ref_{ref}, hash_{hash} {} - - SizeT ref_; - SizeT hash_; -}; - -struct ValueRefHash { - using is_transparent = void; - - SizeT operator()(const ValueRef &value) const noexcept { return value.hash_; } - - SizeT operator()(const HashedStringView &value) const noexcept { return value.hash_; } -}; - -export struct TermPosting { - TermPosting(const char *data, u32 size) : term_(data, size) {} - - struct PosInfo { - u32 doc_id_{0}; - u32 term_pos_{0}; - - bool operator<(const PosInfo &rhs) const { - if (doc_id_ != rhs.doc_id_) { - return doc_id_ < rhs.doc_id_; - } - return term_pos_ < rhs.term_pos_; - } - }; - std::string_view term_; - Vector values_; -}; - -class MemoryIndexer; - -export class TermPostings { -public: - TermPostings(const TermPostings &) = delete; - TermPostings(const TermPostings &&) = delete; - TermPostings &operator=(const TermPostings &) = delete; - TermPostings &operator=(const TermPostings &&) = delete; - explicit TermPostings(MemoryIndexer *memory_indexer); - ~TermPostings() {} - - void Clear() { - terms_.clear(); - postings_.clear(); - } - - void GetSorted(Vector &term_postings); - - TermPosting *Emplace(std::string_view term); - - bool Empty() const { return terms_.empty(); } - - SizeT Size() const { return terms_.size(); } - - struct TermEq { - using is_transparent = void; - explicit TermEq(const Vector &data) : data_{&data} {} - bool operator()(const ValueRef &lhs, const ValueRef &rhs) const noexcept { return lhs.ref_ == rhs.ref_; } - bool operator()(const ValueRef &lhs, const HashedStringView &rhs) const { return (*data_)[lhs.ref_].term_ == rhs; } - bool operator()(const HashedStringView &lhs, const ValueRef &rhs) const { return this->operator()(rhs, lhs); } - - const Vector *data_; - }; - MemoryIndexer *memory_indexer_{nullptr}; - Vector postings_; - FlatHashSet terms_; -}; - -export class ParallelColumnInverter : public ColumnInverter { -public: - explicit ParallelColumnInverter(MemoryIndexer *memory_indexer); - ParallelColumnInverter(const ParallelColumnInverter &) = delete; - ParallelColumnInverter(const ParallelColumnInverter &&) = delete; - ParallelColumnInverter &operator=(const ParallelColumnInverter &) = delete; - ParallelColumnInverter &operator=(const ParallelColumnInverter &&) = delete; - virtual ~ParallelColumnInverter(); - - void InvertColumn(const ColumnVector &column_vector, RowID start_row_id) override; - - void InvertColumn(u32 doc_id, const String &val) override; - - TermPostings *GetTermPostings() { return term_postings_.get(); } - -private: - MemoryIndexer *memory_indexer_{nullptr}; - UniquePtr term_postings_; - Analyzer *analyzer_{nullptr}; - bool jieba_specialize_{false}; - PoolAllocator alloc_; - TermList terms_once_; -}; - -export class ParallelColumnInverters : public ColumnCommitter { -public: - ParallelColumnInverters(MemoryIndexer *memory_indexer, u32 size); - virtual ~ParallelColumnInverters() {} - - u32 Size() { return size_; } - - void Commit() override; - - Vector> inverters_; - -private: - void DoMerge(Vector &to_merge); - - MemoryIndexer *memory_indexer_{nullptr}; - u32 size_; -}; - -} // namespace infinity \ No newline at end of file diff --git a/src/storage/invertedindex/ring.cppm b/src/storage/invertedindex/ring.cppm new file mode 100644 index 0000000000..750007f534 --- /dev/null +++ b/src/storage/invertedindex/ring.cppm @@ -0,0 +1,89 @@ +module; + +export module ring; + +import stl; +#include + +namespace infinity { + +export template +class Ring { +private: + std::condition_variable cv_full_; + std::condition_variable cv_empty_; + std::mutex mutex_; + Vector ring_buf_; // An element is allowed to insert into the ring if its offset in inside [off_ground_, off_ground_+2^cap_mask_) + u64 cap_shift_; + u64 cap_mask_; + u64 off_ground_; // min element offset inside the ring + u64 off_ceiling_; // 1 + max element offset inside the ring + u64 off_filled_; // elements with offset inside range [off_ground_, off_filled_) is a consecutive slice of ring, which should be + // ready to next stage + u64 seq_get_; // sequence number of get operation +public: + Ring(u64 cap_shift) : cap_shift_(cap_shift), cap_mask_((1 << cap_shift) - 1), off_ground_(0), off_ceiling_(0), off_filled_(0), seq_get_(0) { + ring_buf_.resize(1 << cap_shift); + } + + void Put(u64 off, T elem) { + T zero{}; + std::unique_lock lock(mutex_); + assert(off > off_filled_); + if (off >= off_ground_ + (1 << cap_mask_)) + cv_full_.wait(lock, [this, off] { return off < off_ground_ + (1 << cap_shift_); }); + ring_buf_[off & cap_mask_] = elem; + if (off_ceiling_ < off + 1) { + off_ceiling_ = off + 1; + } + if (off == off_filled_) { + off_filled_++; + while (off_filled_ < off_ceiling_ && ring_buf_[off_filled_ & cap_mask_] != zero) { + off_filled_++; + } + } + cv_empty_.notify_one(); + } + + u64 Size() { + std::unique_lock lock(mutex_); + return off_ceiling_ - off_ground_; + } + + u64 GetSigle(T &elem) { + std::unique_lock lock(mutex_); + if (off_ground_ == off_filled_) { + cv_empty_.wait(lock, [this] { return off_ground_ < off_filled_; }); + } + elem = ring_buf_[off_ground_ & cap_mask_]; + off_ground_++; + u64 seq = seq_get_++; + cv_full_.notify_one(); + return seq; + } + + u64 GetMulti(Vector &batch) { + std::unique_lock lock(mutex_); + if (off_ground_ == off_filled_) { + cv_empty_.wait(lock, [this] { return off_ground_ < off_filled_; }); + } + batch.clear(); + while (off_ground_ < off_filled_) { + T elem = ring_buf_[off_ground_ & cap_mask_]; + batch.push_back(elem); + off_ground_++; + } + u64 seq = seq_get_++; + cv_full_.notify_one(); + return seq; + } + + void Iterate(std::function func) { + std::unique_lock lock(mutex_); + for (u64 off = off_ground_; off < off_filled_; off++) { + func(ring_buf_[off & cap_mask_]); + } + off_ground_ = off_filled_; + } +}; +} // namespace infinity \ No newline at end of file diff --git a/src/storage/invertedindex/segment.cppm b/src/storage/invertedindex/segment.cppm index 9e3b1c175a..59e1883320 100644 --- a/src/storage/invertedindex/segment.cppm +++ b/src/storage/invertedindex/segment.cppm @@ -55,7 +55,7 @@ export class Segment { public: enum SegmentStatus { BUILDING, // In memory segment - DUMPLING, // Flush + DUMPING, // Flush BUILT // Disk segment }; Segment(SegmentStatus segment_status) : segment_status_(segment_status) {} diff --git a/src/storage/invertedindex/sequential_column_inverter.cpp b/src/storage/invertedindex/sequential_column_inverter.cpp deleted file mode 100644 index d7f35449ae..0000000000 --- a/src/storage/invertedindex/sequential_column_inverter.cpp +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -module; - -#include -#include -#include - -module sequential_column_inverter; -import stl; -import analyzer; -import memory_pool; -import pool_allocator; -import string_ref; -import term; -import radix_sort; -import index_defines; -import memory_indexer; -namespace infinity { - -template -static u32 Align(u32 unaligned) { - return (unaligned + T - 1) & (-T); -} - -SequentialColumnInverter::SequentialColumnInverter(MemoryIndexer *memory_indexer) - : memory_indexer_(memory_indexer), analyzer_(memory_indexer->GetAnalyzer()), jieba_specialize_(memory_indexer->IsJiebaSpecialize()), - alloc_(memory_indexer->GetPool()), terms_(alloc_), positions_(alloc_), term_refs_(alloc_) {} - -SequentialColumnInverter::~SequentialColumnInverter() {} - -bool SequentialColumnInverter::CompareTermRef::operator()(const u32 lhs, const u32 rhs) const { return std::strcmp(GetTerm(lhs), GetTerm(rhs)) < 0; } - -void SequentialColumnInverter::InvertColumn(const ColumnVector &column_vector, RowID start_row_id) { - docid_t start_doc_id = RowID2DocID(start_row_id); - for (SizeT i = 0; i < column_vector.Size(); ++i) { - String data = column_vector.ToString(i); - InvertColumn(start_doc_id + i, data); - } -} - -void SequentialColumnInverter::InvertColumn(u32 doc_id, const String &val) { - terms_once_.clear(); - analyzer_->Analyze(val, terms_once_, jieba_specialize_); - for (auto it = terms_once_.begin(); it != terms_once_.end(); ++it) { - StringRef term(it->text_); - u32 term_ref = AddTerm(term); - positions_.emplace_back(term_ref, doc_id, it->word_offset_); - } -} - -u32 SequentialColumnInverter::AddTerm(StringRef term) { - const u32 terms_size = terms_.size(); - const u32 unpadded_size = terms_size + 4 + term.size() + 1; - const u32 fully_padded_size = Align<4>(unpadded_size); - terms_.resize(fully_padded_size); - - char *buf = &terms_[0] + terms_size; - memset(buf, 0, 4); - memcpy(buf + 4, term.data(), term.size()); - memset(buf + 4 + term.size(), 0, fully_padded_size - unpadded_size + 1); - - u32 term_ref = (terms_size + 4) >> 2; - term_refs_.push_back(term_ref); - return term_ref; -} - -struct TermRefRadix { - u32 operator()(const u64 v) { return v >> 32; } -}; - -void SequentialColumnInverter::SortTerms() { - Vector first_four_bytes(term_refs_.size()); - for (u32 i = 1; i < term_refs_.size(); ++i) { - u64 first_four = ntohl(*reinterpret_cast(GetTermFromRef(term_refs_[i]))); - first_four_bytes[i] = (first_four << 32) | term_refs_[i]; - } - ShiftBasedRadixSorter::RadixSort(TermRefRadix(), - CompareTermRef(terms_), - &first_four_bytes[1], - first_four_bytes.size() - 1, - 16); - for (u32 i(1); i < first_four_bytes.size(); i++) { - term_refs_[i] = first_four_bytes[i] & 0xffffffffl; - } - auto term_ref_begin(term_refs_.begin() + 1); - uint32_t term_num = 1; // First valid term number - const char *last_term = GetTermFromRef(*term_ref_begin); - UpdateTermNum(*term_ref_begin, term_num); - for (++term_ref_begin; term_ref_begin != term_refs_.end(); ++term_ref_begin) { - const char *term = GetTermFromRef(*term_ref_begin); - int cmpres = strcmp(last_term, term); - // assert(cmpres <= 0); - if (cmpres < 0) { - ++term_num; - term_refs_[term_num] = *term_ref_begin; - last_term = term; - } - UpdateTermNum(*term_ref_begin, term_num); - } - // assert(term_refs_.size() >= term_num + 1); - term_refs_.resize(term_num + 1); - // Replace initial word reference by word number. - for (auto &p : positions_) { - p.term_num_ = GetTermNum(p.term_num_); - } -} - -struct FullRadix { - u64 operator()(const SequentialColumnInverter::PosInfo &p) const { return (static_cast(p.term_num_) << 32) | p.doc_id_; } -}; - -void SequentialColumnInverter::Commit() { - SortTerms(); - ShiftBasedRadixSorter, 56, true>::RadixSort(FullRadix(), - std::less(), - &positions_[0], - positions_.size(), - 16); - DoInsert(); - memory_indexer_->TryDump(); -} - -void SequentialColumnInverter::DoInsert() { - u32 last_term_num = 0; - u32 last_term_pos = 0; - u32 last_doc_id = 0; - StringRef term; - MemoryIndexer::PostingPtr posting = nullptr; - for (auto &i : positions_) { - if (last_term_num != i.term_num_ || last_doc_id != i.doc_id_) { - if (last_term_num != i.term_num_) { - last_term_num = i.term_num_; - term = GetTermFromNum(last_term_num); - posting = memory_indexer_->GetOrAddPosting(String(term.data())); - } - last_doc_id = i.doc_id_; - if (last_doc_id != 0) { - posting->EndDocument(last_doc_id, 0); - } - } - if (i.term_pos_ != last_term_pos) { - last_term_pos = i.term_pos_; - posting->AddPosition(last_term_pos); - } - } -} - -} // namespace infinity \ No newline at end of file diff --git a/src/storage/invertedindex/sequential_column_inverter.cppm b/src/storage/invertedindex/sequential_column_inverter.cppm deleted file mode 100644 index a705d5510a..0000000000 --- a/src/storage/invertedindex/sequential_column_inverter.cppm +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -module; - -export module sequential_column_inverter; - -import stl; -import analyzer; - -import column_vector; -import memory_pool; -import pool_allocator; -import term; -import string_ref; -import internal_types; -import column_inverter; - -namespace infinity{ - -class MemoryIndexer; -export class SequentialColumnInverter : public ColumnInverter, public ColumnCommitter { -public: - SequentialColumnInverter(MemoryIndexer *memory_indexer); - SequentialColumnInverter(const SequentialColumnInverter &) = delete; - SequentialColumnInverter(const SequentialColumnInverter &&) = delete; - SequentialColumnInverter &operator=(const SequentialColumnInverter &) = delete; - SequentialColumnInverter &operator=(const SequentialColumnInverter &&) = delete; - virtual ~SequentialColumnInverter(); - - void InvertColumn(const ColumnVector &column_vector, RowID start_row_id) override; - - void InvertColumn(u32 doc_id, const String &val) override; - - void Commit() override; - - struct PosInfo { - u32 term_num_{0}; - u32 doc_id_{0}; - u32 term_pos_{0}; - - bool operator<(const PosInfo &rhs) const { - if (term_num_ != rhs.term_num_) { - return term_num_ < rhs.term_num_; - } - if (doc_id_ != rhs.doc_id_) { - return doc_id_ < rhs.doc_id_; - } - return term_pos_ < rhs.term_pos_; - } - }; - -private: - using TermBuffer = Vector>; - using PosInfoVec = Vector>; - using U32Vec = Vector>; - - struct CompareTermRef { - const char *const term_buffer_; - - CompareTermRef(const TermBuffer &term_buffer) : term_buffer_(&term_buffer[0]) {} - - const char *GetTerm(u32 term_ref) const { return &term_buffer_[term_ref << 2]; } - - bool operator()(const u32 lhs, const u32 rhs) const; - }; - - const char *GetTermFromRef(u32 term_ref) const { return &terms_[term_ref << 2]; } - - const char *GetTermFromNum(u32 term_num) const { return GetTermFromRef(term_refs_[term_num]); } - - u32 GetTermNum(u32 term_ref) const { - const char *p = &terms_[(term_ref - 1) << 2]; - return *reinterpret_cast(p); - } - - void UpdateTermNum(u32 term_ref, u32 term_num) { - char *p = &terms_[(term_ref - 1) << 2]; - *reinterpret_cast(p) = term_num; - } - - u32 AddTerm(StringRef term); - - void SortTerms(); - - void DoInsert(); - - MemoryIndexer *memory_indexer_{nullptr}; - Analyzer *analyzer_{nullptr}; - bool jieba_specialize_{false}; - PoolAllocator alloc_; - TermBuffer terms_; - PosInfoVec positions_; - U32Vec term_refs_; - TermList terms_once_; -}; -} // namespace infinity diff --git a/src/storage/meta/entry/table_index_entry.cpp b/src/storage/meta/entry/table_index_entry.cpp index 082c41c585..c165775381 100644 --- a/src/storage/meta/entry/table_index_entry.cpp +++ b/src/storage/meta/entry/table_index_entry.cpp @@ -269,8 +269,9 @@ Status TableIndexEntry::CreateIndexPrepare(TableEntry *table_entry, BlockIndex * for (const auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) { fulltext_index_entry->indexer_->BatchInsert(block_entry, 0, block_entry->row_count(), buffer_mgr); } + fulltext_index_entry->indexer_->Commit(); + fulltext_index_entry->indexer_->Dump(); } - // Don't need to commit explictly here since it's done in the indexer_ thread. } } for (const auto &[column_id, column_index_entry] : column_index_map_) {