From 61a8c6a6da30e59b66d92708bbb072f11729bcb5 Mon Sep 17 00:00:00 2001 From: Zhichang Yu Date: Mon, 11 Mar 2024 20:40:24 +0800 Subject: [PATCH] Refactor ColumnInverter (#756) Refactor ColumnInverter Issue link:#551 ### Type of change - [x] Refactoring --- src/common/analyzer/analyzer.cppm | 6 ++++-- src/common/analyzer/chinese_analyzer.cppm | 1 + src/storage/invertedindex/column_inverter.cpp | 15 +++++++++------ src/storage/invertedindex/column_inverter.cppm | 9 ++++----- src/storage/invertedindex/memory_indexer.cpp | 15 +++++++-------- src/storage/invertedindex/memory_indexer.cppm | 15 ++++----------- src/storage/invertedindex/posting_writer.cppm | 3 +++ src/storage/meta/entry/segment_index_entry.cpp | 2 +- 8 files changed, 33 insertions(+), 33 deletions(-) diff --git a/src/common/analyzer/analyzer.cppm b/src/common/analyzer/analyzer.cppm index 13c46fd721..e8d78f930d 100644 --- a/src/common/analyzer/analyzer.cppm +++ b/src/common/analyzer/analyzer.cppm @@ -35,9 +35,9 @@ public: convert_to_placeholder_ = convert_to_placeholder; } - int Analyze(const Term &input, TermList &output, bool jieba_specialize = false) { + int Analyze(const Term &input, TermList &output) { void *array[2] = {&output, this}; - if (jieba_specialize) + if (IsJiebaSpecialize()) return AnalyzeImpl(input, &array, &Analyzer::AppendTermListForJieba); else return AnalyzeImpl(input, &array, &Analyzer::AppendTermList); @@ -49,6 +49,8 @@ protected: typedef void (*HookTypeForJieba)(void *data, cppjieba::Word &cut_words); + virtual bool IsJiebaSpecialize() { return false; } + virtual int AnalyzeImpl(const Term &input, void *data, HookType func) { return -1; } virtual int AnalyzeImpl(const Term &input, void *data, HookTypeForJieba func) { return -1; } diff --git a/src/common/analyzer/chinese_analyzer.cppm b/src/common/analyzer/chinese_analyzer.cppm index 194367cc33..fbba2e1311 100644 --- a/src/common/analyzer/chinese_analyzer.cppm +++ b/src/common/analyzer/chinese_analyzer.cppm @@ -36,6 +36,7 @@ public: 
protected: inline void Parse(const String &input) { jieba_->CutForSearch(input, cut_words_, true); } + bool IsJiebaSpecialize() override { return true; } int AnalyzeImpl(const Term &input, void *data, HookTypeForJieba func) override; private: diff --git a/src/storage/invertedindex/column_inverter.cpp b/src/storage/invertedindex/column_inverter.cpp index 3dca76e4c6..e9420a6857 100644 --- a/src/storage/invertedindex/column_inverter.cpp +++ b/src/storage/invertedindex/column_inverter.cpp @@ -22,6 +22,7 @@ module; module column_inverter; import stl; import analyzer; +import analyzer_pool; import memory_pool; import pool_allocator; import string_ref; @@ -29,6 +30,8 @@ import term; import radix_sort; import index_defines; import memory_indexer; +import posting_writer; + namespace infinity { template @@ -36,9 +39,9 @@ static u32 Align(u32 unaligned) { return (unaligned + T - 1) & (-T); } -ColumnInverter::ColumnInverter(MemoryIndexer &memory_indexer) - : memory_indexer_(memory_indexer), analyzer_(memory_indexer.GetAnalyzer()), jieba_specialize_(memory_indexer.IsJiebaSpecialize()), - alloc_(memory_indexer.GetPool()), terms_(alloc_), positions_(alloc_), term_refs_(alloc_) {} +ColumnInverter::ColumnInverter(const String &analyzer, MemoryPool *memory_pool, PostingWriterProvider posting_writer_provider) + : analyzer_(AnalyzerPool::instance().Get(analyzer)), alloc_(memory_pool), terms_(alloc_), positions_(alloc_), term_refs_(alloc_), + posting_writer_provider_(posting_writer_provider) {} bool ColumnInverter::CompareTermRef::operator()(const u32 lhs, const u32 rhs) const { return std::strcmp(GetTerm(lhs), GetTerm(rhs)) < 0; } @@ -51,7 +54,7 @@ void ColumnInverter::InvertColumn(const ColumnVector &column_vector, u32 row_off void ColumnInverter::InvertColumn(u32 doc_id, const String &val) { auto terms_once_ = MakeUnique(); - analyzer_->Analyze(val, *terms_once_, jieba_specialize_); + analyzer_->Analyze(val, *terms_once_); terms_per_doc_.push_back(Pair>(doc_id, 
std::move(terms_once_))); } @@ -155,13 +158,13 @@ void ColumnInverter::GeneratePosting() { u32 last_term_pos = 0; u32 last_doc_id = INVALID_DOCID; StringRef term; - MemoryIndexer::PostingPtr posting = nullptr; + SharedPtr posting = nullptr; for (auto &i : positions_) { if (last_term_num != i.term_num_ || last_doc_id != i.doc_id_) { if (last_term_num != i.term_num_) { last_term_num = i.term_num_; term = GetTermFromNum(last_term_num); - posting = memory_indexer_.GetOrAddPosting(String(term.data())); + posting = posting_writer_provider_(String(term.data())); } last_doc_id = i.doc_id_; if (last_doc_id != INVALID_DOCID) { diff --git a/src/storage/invertedindex/column_inverter.cppm b/src/storage/invertedindex/column_inverter.cppm index 38d0ddc34f..662cf572e8 100644 --- a/src/storage/invertedindex/column_inverter.cppm +++ b/src/storage/invertedindex/column_inverter.cppm @@ -27,13 +27,13 @@ import pool_allocator; import term; import string_ref; import internal_types; +import posting_writer; namespace infinity { -class MemoryIndexer; export class ColumnInverter { public: - ColumnInverter(MemoryIndexer &memory_indexer); + ColumnInverter(const String &analyzer, MemoryPool *memory_pool, PostingWriterProvider posting_writer_provider); ColumnInverter(const ColumnInverter &) = delete; ColumnInverter(const ColumnInverter &&) = delete; ColumnInverter &operator=(const ColumnInverter &) = delete; @@ -100,13 +100,12 @@ private: void SortTerms(); - MemoryIndexer &memory_indexer_; - Analyzer *analyzer_{nullptr}; - bool jieba_specialize_{false}; + UniquePtr analyzer_{nullptr}; PoolAllocator alloc_; TermBuffer terms_; PosInfoVec positions_; U32Vec term_refs_; Vector>> terms_per_doc_; + PostingWriterProvider posting_writer_provider_{}; }; } // namespace infinity diff --git a/src/storage/invertedindex/memory_indexer.cpp b/src/storage/invertedindex/memory_indexer.cpp index ef35e14805..9a6d47ddb4 100644 --- a/src/storage/invertedindex/memory_indexer.cpp +++ 
b/src/storage/invertedindex/memory_indexer.cpp @@ -59,19 +59,17 @@ bool MemoryIndexer::KeyComp::operator()(const String &lhs, const String &rhs) co MemoryIndexer::MemoryIndexer(const String &index_dir, const String &base_name, docid_t base_doc_id, - const String &analyzer, optionflag_t flag, + const String &analyzer, MemoryPool &byte_slice_pool, RecyclePool &buffer_pool, ThreadPool &thread_pool) - : index_dir_(index_dir), base_name_(base_name), base_doc_id_(base_doc_id), flag_(flag), byte_slice_pool_(byte_slice_pool), + : index_dir_(index_dir), base_name_(base_name), base_doc_id_(base_doc_id), flag_(flag), analyzer_(analyzer), byte_slice_pool_(byte_slice_pool), buffer_pool_(buffer_pool), thread_pool_(thread_pool), ring_inverted_(10UL), ring_sorted_(10UL) { posting_store_ = MakeUnique(KeyComp(), &byte_slice_pool_); Path path = Path(index_dir) / "tmp.merge"; spill_full_path_ = path.string(); - analyzer_ = AnalyzerPool::instance().Get(analyzer); - jieba_specialize_ = analyzer.compare("chinese") == 0 ? 
true : false; } MemoryIndexer::~MemoryIndexer() { Reset(); } @@ -83,8 +81,9 @@ void MemoryIndexer::Insert(const ColumnVector &column_vector, u32 row_offset, u3 } u64 seq_inserted = seq_inserted_++; auto task = MakeShared(seq_inserted, column_vector, row_offset, row_count, doc_count_); - auto func = [this, &task](int id) { - auto inverter = MakeShared(*this); + PostingWriterProvider provider = [this](const String &term) -> SharedPtr { return GetOrAddPosting(term); }; + auto func = [this, &task, &provider](int id) { + auto inverter = MakeShared(analyzer_, &byte_slice_pool_, provider); inverter->InvertColumn(task->column_vector_, task->row_offset_, task->row_count_, task->start_doc_id_); this->ring_inverted_.Put(task->task_seq_, inverter); }; @@ -149,12 +148,12 @@ void MemoryIndexer::Dump() { Reset(); } -MemoryIndexer::PostingPtr MemoryIndexer::GetOrAddPosting(const TermKey &term) { +SharedPtr MemoryIndexer::GetOrAddPosting(const String &term) { MemoryIndexer::PostingTable::Iterator iter = posting_store_->Find(term); if (iter != posting_store_->End()) return iter.Value(); else { - MemoryIndexer::PostingPtr posting = MakeShared(&byte_slice_pool_, &buffer_pool_, PostingFormatOption(flag_)); + SharedPtr posting = MakeShared(&byte_slice_pool_, &buffer_pool_, PostingFormatOption(flag_)); posting_store_->Insert(term, posting); return posting; } diff --git a/src/storage/invertedindex/memory_indexer.cppm b/src/storage/invertedindex/memory_indexer.cppm index ea1720d359..3286bf3692 100644 --- a/src/storage/invertedindex/memory_indexer.cppm +++ b/src/storage/invertedindex/memory_indexer.cppm @@ -22,7 +22,6 @@ import memory_pool; import index_defines; import posting_writer; import column_vector; -import analyzer; import column_inverter; import third_party; import internal_types; @@ -36,15 +35,14 @@ public: bool operator()(const String &lhs, const String &rhs) const; }; - using TermKey = String; using PostingPtr = SharedPtr; - using PostingTable = SkipList; + using PostingTable = 
SkipList; MemoryIndexer(const String &index_dir, const String &base_name, docid_t base_doc_id, - const String &analyzer, optionflag_t flag, + const String &analyzer, MemoryPool &byte_slice_pool, RecyclePool &buffer_pool, ThreadPool &thread_pool); @@ -66,15 +64,11 @@ public: u32 GetDocCount() const { return doc_count_; } - Analyzer *GetAnalyzer() { return analyzer_.get(); } - - bool IsJiebaSpecialize() { return jieba_specialize_; } - MemoryPool *GetPool() { return &byte_slice_pool_; } PostingTable *GetPostingTable() { return posting_store_.get(); } - PostingPtr GetOrAddPosting(const TermKey &term); + SharedPtr GetOrAddPosting(const String &term); void Reset(); @@ -95,11 +89,10 @@ private: String base_name_; docid_t base_doc_id_{INVALID_DOCID}; optionflag_t flag_; + String analyzer_; MemoryPool &byte_slice_pool_; RecyclePool &buffer_pool_; ThreadPool &thread_pool_; - UniquePtr analyzer_; - bool jieba_specialize_{false}; u32 doc_count_{0}; UniquePtr posting_store_; diff --git a/src/storage/invertedindex/posting_writer.cppm b/src/storage/invertedindex/posting_writer.cppm index ec89737b27..10bcd0afd6 100644 --- a/src/storage/invertedindex/posting_writer.cppm +++ b/src/storage/invertedindex/posting_writer.cppm @@ -49,4 +49,7 @@ private: DocListEncoder *doc_list_encoder_ = nullptr; PositionListEncoder *position_list_encoder_ = nullptr; }; + +export using PostingWriterProvider = std::function(const String &)>; + } // namespace infinity \ No newline at end of file diff --git a/src/storage/meta/entry/segment_index_entry.cpp b/src/storage/meta/entry/segment_index_entry.cpp index 4ed651d28b..f923838c15 100644 --- a/src/storage/meta/entry/segment_index_entry.cpp +++ b/src/storage/meta/entry/segment_index_entry.cpp @@ -268,8 +268,8 @@ Status SegmentIndexEntry::CreateIndexPrepare(const IndexBase *index_base, memory_indexer_ = MakeUnique(*table_index_entry_->index_dir(), base_name, ToDocID(base_row_id), - index_fulltext->analyzer_, index_fulltext->flag_, + 
index_fulltext->analyzer_, table_index_entry_->GetFulltextByteSlicePool(), table_index_entry_->GetFulltextBufferPool(), table_index_entry_->GetFulltextThreadPool());