Skip to content

Commit

Permalink
Refactor ColumnInverter (#756)
Browse files Browse the repository at this point in the history
Refactor ColumnInverter

Issue link: #551

### Type of change

- [x] Refactoring
  • Loading branch information
yuzhichang authored Mar 11, 2024
1 parent de951a1 commit 61a8c6a
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 33 deletions.
6 changes: 4 additions & 2 deletions src/common/analyzer/analyzer.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public:
convert_to_placeholder_ = convert_to_placeholder;
}

int Analyze(const Term &input, TermList &output, bool jieba_specialize = false) {
int Analyze(const Term &input, TermList &output) {
void *array[2] = {&output, this};
if (jieba_specialize)
if (IsJiebaSpecialize())
return AnalyzeImpl(input, &array, &Analyzer::AppendTermListForJieba);
else
return AnalyzeImpl(input, &array, &Analyzer::AppendTermList);
Expand All @@ -49,6 +49,8 @@ protected:

typedef void (*HookTypeForJieba)(void *data, cppjieba::Word &cut_words);

// Tells Analyze() which hook to dispatch with: true selects AppendTermListForJieba,
// false selects AppendTermList. The base implementation always answers false.
virtual bool IsJiebaSpecialize() {
    return false;
}

// Default AnalyzeImpl for the HookType callback: ignores its arguments and returns -1.
// NOTE(review): -1 presumably means "not implemented by this analyzer" — confirm with callers.
virtual int AnalyzeImpl(const Term &input, void *data, HookType func) {
    return -1;
}

// Default AnalyzeImpl for the HookTypeForJieba callback: ignores its arguments and returns -1.
// NOTE(review): -1 presumably means "not implemented by this analyzer" — confirm with callers.
virtual int AnalyzeImpl(const Term &input, void *data, HookTypeForJieba func) {
    return -1;
}
Expand Down
1 change: 1 addition & 0 deletions src/common/analyzer/chinese_analyzer.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public:

protected:
// Segments `input` by calling CutForSearch on jieba_, writing the result into cut_words_.
// NOTE(review): the trailing `true` is presumably cppjieba's HMM switch — confirm against the cppjieba API.
inline void Parse(const String &input) {
    jieba_->CutForSearch(input, cut_words_, true);
}
// Overrides the base-class query to always answer true.
// NOTE(review): presumably this routes the base Analyze() to its Jieba-specific hook — confirm.
bool IsJiebaSpecialize() override {
    return true;
}
int AnalyzeImpl(const Term &input, void *data, HookTypeForJieba func) override;

private:
Expand Down
15 changes: 9 additions & 6 deletions src/storage/invertedindex/column_inverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,26 @@ module;
module column_inverter;
import stl;
import analyzer;
import analyzer_pool;
import memory_pool;
import pool_allocator;
import string_ref;
import term;
import radix_sort;
import index_defines;
import memory_indexer;
import posting_writer;

namespace infinity {

// Rounds `unaligned` up to the nearest multiple of T.
//
// T must be a non-zero power of two: the bit-mask below is only a valid
// "round up" for such values, so this is enforced at compile time instead of
// silently producing a wrong result.
// As with any unsigned arithmetic, `unaligned + T - 1` wraps around on
// overflow (values within T - 1 of the maximum of u32).
template <u32 T>
static u32 Align(u32 unaligned) {
    static_assert(T != 0 && (T & (T - 1)) == 0, "Align<T>: T must be a non-zero power of two");
    // ~(T - 1) clears the low log2(T) bits; it is the same mask as -T without negating an unsigned value.
    return (unaligned + T - 1) & ~(T - 1);
}

ColumnInverter::ColumnInverter(MemoryIndexer &memory_indexer)
: memory_indexer_(memory_indexer), analyzer_(memory_indexer.GetAnalyzer()), jieba_specialize_(memory_indexer.IsJiebaSpecialize()),
alloc_(memory_indexer.GetPool()), terms_(alloc_), positions_(alloc_), term_refs_(alloc_) {}
// Constructs an inverter.
//   analyzer                - name passed to AnalyzerPool::instance().Get() to obtain analyzer_.
//   memory_pool             - pool used to build alloc_, which in turn backs terms_, positions_ and term_refs_.
//   posting_writer_provider - callback stored in posting_writer_provider_; taken by value as a sink
//                             and moved into the member to avoid a second copy of the std::function.
ColumnInverter::ColumnInverter(const String &analyzer, MemoryPool *memory_pool, PostingWriterProvider posting_writer_provider)
    : analyzer_(AnalyzerPool::instance().Get(analyzer)), alloc_(memory_pool), terms_(alloc_), positions_(alloc_), term_refs_(alloc_),
      posting_writer_provider_(std::move(posting_writer_provider)) {}

// Less-than comparator over term references: resolves both references through GetTerm()
// and orders them by std::strcmp on the resulting C strings.
bool ColumnInverter::CompareTermRef::operator()(const u32 lhs, const u32 rhs) const {
    const auto left_term = GetTerm(lhs);
    const auto right_term = GetTerm(rhs);
    const int order = std::strcmp(left_term, right_term);
    return order < 0;
}

Expand All @@ -51,7 +54,7 @@ void ColumnInverter::InvertColumn(const ColumnVector &column_vector, u32 row_off

void ColumnInverter::InvertColumn(u32 doc_id, const String &val) {
auto terms_once_ = MakeUnique<TermList>();
analyzer_->Analyze(val, *terms_once_, jieba_specialize_);
analyzer_->Analyze(val, *terms_once_);
terms_per_doc_.push_back(Pair<u32, UniquePtr<TermList>>(doc_id, std::move(terms_once_)));
}

Expand Down Expand Up @@ -155,13 +158,13 @@ void ColumnInverter::GeneratePosting() {
u32 last_term_pos = 0;
u32 last_doc_id = INVALID_DOCID;
StringRef term;
MemoryIndexer::PostingPtr posting = nullptr;
SharedPtr<PostingWriter> posting = nullptr;
for (auto &i : positions_) {
if (last_term_num != i.term_num_ || last_doc_id != i.doc_id_) {
if (last_term_num != i.term_num_) {
last_term_num = i.term_num_;
term = GetTermFromNum(last_term_num);
posting = memory_indexer_.GetOrAddPosting(String(term.data()));
posting = posting_writer_provider_(String(term.data()));
}
last_doc_id = i.doc_id_;
if (last_doc_id != INVALID_DOCID) {
Expand Down
9 changes: 4 additions & 5 deletions src/storage/invertedindex/column_inverter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import pool_allocator;
import term;
import string_ref;
import internal_types;
import posting_writer;

namespace infinity {

class MemoryIndexer;
export class ColumnInverter {
public:
ColumnInverter(MemoryIndexer &memory_indexer);
ColumnInverter(const String &analyzer, MemoryPool *memory_pool, PostingWriterProvider posting_writer_provider);
ColumnInverter(const ColumnInverter &) = delete;
ColumnInverter(const ColumnInverter &&) = delete;
ColumnInverter &operator=(const ColumnInverter &) = delete;
Expand Down Expand Up @@ -100,13 +100,12 @@ private:

void SortTerms();

MemoryIndexer &memory_indexer_;
Analyzer *analyzer_{nullptr};
bool jieba_specialize_{false};
UniquePtr<Analyzer> analyzer_{nullptr};
PoolAllocator<char> alloc_;
TermBuffer terms_;
PosInfoVec positions_;
U32Vec term_refs_;
Vector<Pair<u32, UniquePtr<TermList>>> terms_per_doc_;
PostingWriterProvider posting_writer_provider_{};
};
} // namespace infinity
15 changes: 7 additions & 8 deletions src/storage/invertedindex/memory_indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,17 @@ bool MemoryIndexer::KeyComp::operator()(const String &lhs, const String &rhs) co
MemoryIndexer::MemoryIndexer(const String &index_dir,
const String &base_name,
docid_t base_doc_id,
const String &analyzer,
optionflag_t flag,
const String &analyzer,
MemoryPool &byte_slice_pool,
RecyclePool &buffer_pool,
ThreadPool &thread_pool)
: index_dir_(index_dir), base_name_(base_name), base_doc_id_(base_doc_id), flag_(flag), byte_slice_pool_(byte_slice_pool),
: index_dir_(index_dir), base_name_(base_name), base_doc_id_(base_doc_id), flag_(flag), analyzer_(analyzer), byte_slice_pool_(byte_slice_pool),
buffer_pool_(buffer_pool), thread_pool_(thread_pool), ring_inverted_(10UL), ring_sorted_(10UL) {
posting_store_ = MakeUnique<PostingTable>(KeyComp(), &byte_slice_pool_);

Path path = Path(index_dir) / "tmp.merge";
spill_full_path_ = path.string();
analyzer_ = AnalyzerPool::instance().Get(analyzer);
jieba_specialize_ = analyzer.compare("chinese") == 0 ? true : false;
}

MemoryIndexer::~MemoryIndexer() { Reset(); }
Expand All @@ -83,8 +81,9 @@ void MemoryIndexer::Insert(const ColumnVector &column_vector, u32 row_offset, u3
}
u64 seq_inserted = seq_inserted_++;
auto task = MakeShared<BatchInvertTask>(seq_inserted, column_vector, row_offset, row_count, doc_count_);
auto func = [this, &task](int id) {
auto inverter = MakeShared<ColumnInverter>(*this);
PostingWriterProvider provider = [this](const String &term) -> SharedPtr<PostingWriter> { return GetOrAddPosting(term); };
// Inversion job for one batch: builds a ColumnInverter, inverts the task's rows and
// publishes the inverter into ring_inverted_ under the task's sequence number.
//
// `task` and `provider` are locals of the enclosing function, so they are captured BY VALUE:
// a by-reference capture dangles as soon as that function returns while func is still pending.
// Copying is cheap (a SharedPtr and a std::function) and keeps both alive for the job's lifetime.
// NOTE(review): the code that dispatches func is outside this view — presumably it is queued on
// thread_pool_ and runs asynchronously; confirm. `this` must still outlive the job either way.
auto func = [this, task, provider](int id) {
    auto inverter = MakeShared<ColumnInverter>(analyzer_, &byte_slice_pool_, provider);
    inverter->InvertColumn(task->column_vector_, task->row_offset_, task->row_count_, task->start_doc_id_);
    this->ring_inverted_.Put(task->task_seq_, inverter);
};
Expand Down Expand Up @@ -149,12 +148,12 @@ void MemoryIndexer::Dump() {
Reset();
}

MemoryIndexer::PostingPtr MemoryIndexer::GetOrAddPosting(const TermKey &term) {
SharedPtr<PostingWriter> MemoryIndexer::GetOrAddPosting(const String &term) {
MemoryIndexer::PostingTable::Iterator iter = posting_store_->Find(term);
if (iter != posting_store_->End())
return iter.Value();
else {
MemoryIndexer::PostingPtr posting = MakeShared<PostingWriter>(&byte_slice_pool_, &buffer_pool_, PostingFormatOption(flag_));
SharedPtr<PostingWriter> posting = MakeShared<PostingWriter>(&byte_slice_pool_, &buffer_pool_, PostingFormatOption(flag_));
posting_store_->Insert(term, posting);
return posting;
}
Expand Down
15 changes: 4 additions & 11 deletions src/storage/invertedindex/memory_indexer.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import memory_pool;
import index_defines;
import posting_writer;
import column_vector;
import analyzer;
import column_inverter;
import third_party;
import internal_types;
Expand All @@ -36,15 +35,14 @@ public:
bool operator()(const String &lhs, const String &rhs) const;
};

using TermKey = String;
using PostingPtr = SharedPtr<PostingWriter>;
using PostingTable = SkipList<TermKey, PostingPtr, KeyComp>;
using PostingTable = SkipList<String, PostingPtr, KeyComp>;

MemoryIndexer(const String &index_dir,
const String &base_name,
docid_t base_doc_id,
const String &analyzer,
optionflag_t flag,
const String &analyzer,
MemoryPool &byte_slice_pool,
RecyclePool &buffer_pool,
ThreadPool &thread_pool);
Expand All @@ -66,15 +64,11 @@ public:

u32 GetDocCount() const { return doc_count_; }

Analyzer *GetAnalyzer() { return analyzer_.get(); }

bool IsJiebaSpecialize() { return jieba_specialize_; }

MemoryPool *GetPool() { return &byte_slice_pool_; }

PostingTable *GetPostingTable() { return posting_store_.get(); }

PostingPtr GetOrAddPosting(const TermKey &term);
SharedPtr<PostingWriter> GetOrAddPosting(const String &term);

void Reset();

Expand All @@ -95,11 +89,10 @@ private:
String base_name_;
docid_t base_doc_id_{INVALID_DOCID};
optionflag_t flag_;
String analyzer_;
MemoryPool &byte_slice_pool_;
RecyclePool &buffer_pool_;
ThreadPool &thread_pool_;
UniquePtr<Analyzer> analyzer_;
bool jieba_specialize_{false};
u32 doc_count_{0};
UniquePtr<PostingTable> posting_store_;

Expand Down
3 changes: 3 additions & 0 deletions src/storage/invertedindex/posting_writer.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,7 @@ private:
DocListEncoder *doc_list_encoder_ = nullptr;
PositionListEncoder *position_list_encoder_ = nullptr;
};

// Callback type: given a term string, returns a shared pointer to the PostingWriter for that term.
export using PostingWriterProvider = std::function<SharedPtr<PostingWriter>(const String &)>;

} // namespace infinity
2 changes: 1 addition & 1 deletion src/storage/meta/entry/segment_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ Status SegmentIndexEntry::CreateIndexPrepare(const IndexBase *index_base,
memory_indexer_ = MakeUnique<MemoryIndexer>(*table_index_entry_->index_dir(),
base_name,
ToDocID(base_row_id),
index_fulltext->analyzer_,
index_fulltext->flag_,
index_fulltext->analyzer_,
table_index_entry_->GetFulltextByteSlicePool(),
table_index_entry_->GetFulltextBufferPool(),
table_index_entry_->GetFulltextThreadPool());
Expand Down

0 comments on commit 61a8c6a

Please sign in to comment.