Refactor fulltext index building into a pipeline (#664)
### What problem does this PR solve?

Refactor fulltext index building into a pipeline.
Issue link: #551

```
export class Indexer {
    // Only one thread may call this method. It is non-blocking.
    void BatchInsert(const BlockEntry *block_entry, u32 row_offset, u32 row_count, BufferManager *buffer_mgr);

    // A background thread of Indexer calls this method regularly (for example, every 2 seconds). It is non-blocking.
    // Other threads can also call this method.
    void Commit();

    // Only one thread may call this method, when the memory limit is reached (for example, 200 MB). It is blocking.
    void Dump();
}
```
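
For illustration, here is a minimal, self-contained sketch of how a caller might drive this pipeline: a background thread commits every couple of seconds, while the writer thread inserts batches and dumps once an in-memory budget is exceeded. The `Indexer` stub, `kMemoryLimit`, and the batch size below are illustrative assumptions, not the classes this PR introduces.

```
// Illustrative stand-in for the real Indexer; only the call pattern matters here.
#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>

struct Indexer {
    void BatchInsert(std::size_t /*row_offset*/, std::size_t row_count) { bytes_used_ += row_count * 64; }
    void Commit() {}                  // non-blocking: schedules inversion of buffered rows
    void Dump() { bytes_used_ = 0; }  // blocking: flushes the in-memory index to disk
    std::size_t MemoryUsed() const { return bytes_used_; }

private:
    std::atomic<std::size_t> bytes_used_{0};
};

int main() {
    constexpr std::size_t kMemoryLimit = 200u * 1024 * 1024; // assumed 200 MB budget
    Indexer indexer;
    std::atomic<bool> stop{false};

    // Background thread: commit buffered inserts every 2 seconds.
    std::thread committer([&] {
        while (!stop.load()) {
            indexer.Commit();
            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
    });

    // Writer thread: insert batches, dump when the memory budget is exceeded.
    for (int batch = 0; batch < 1000; ++batch) {
        indexer.BatchInsert(/*row_offset=*/0, /*row_count=*/8192);
        if (indexer.MemoryUsed() >= kMemoryLimit) {
            indexer.Dump(); // blocking flush; in-memory state is reset afterwards
        }
    }

    stop.store(true);
    committer.join();
    return 0;
}
```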

### Type of change

- [x] Refactoring
yuzhichang authored Feb 27, 2024
1 parent 1af0c66 commit 384e780
Showing 21 changed files with 456 additions and 1,029 deletions.
4 changes: 2 additions & 2 deletions src/common/stl.cppm
@@ -196,9 +196,9 @@ export namespace std {
using std::is_floating_point_v;
using std::common_type_t;

using std::monostate;
using std::function;

using std::monostate;
using std::thread;
} // namespace std

namespace infinity {
42 changes: 23 additions & 19 deletions src/storage/invertedindex/column_indexer.cpp
@@ -36,43 +36,47 @@ import fst;

namespace infinity {

ColumnIndexer::ColumnIndexer(Indexer *indexer,
u64 column_id,
ColumnIndexer::ColumnIndexer(u64 column_id,
const String directory,
const InvertedIndexConfig &index_config,
SharedPtr<MemoryPool> byte_slice_pool,
SharedPtr<RecyclePool> buffer_pool) {
active_memory_indexer_ = MakeUnique<MemoryIndexer>(indexer, this, column_id, index_config, byte_slice_pool, buffer_pool);
index_name_ = indexer->GetDirectory();
Path path = Path(index_name_) / std::to_string(column_id);
index_name_ = path.string();
SharedPtr<RecyclePool> buffer_pool,
ThreadPool &thread_pool)
: thread_pool_(thread_pool) {
active_memory_indexer_ = MakeUnique<MemoryIndexer>(column_id, index_config, byte_slice_pool, buffer_pool, thread_pool);
standby_memory_indexer_ = MakeUnique<MemoryIndexer>(column_id, index_config, byte_slice_pool, buffer_pool, thread_pool);
directory_ = directory;
std::error_code ec;
bool path_exists = std::filesystem::exists(path);
bool path_exists = std::filesystem::exists(directory);
if (!path_exists) {
std::filesystem::create_directories(path, ec);
std::filesystem::create_directories(directory, ec);
}
}

ColumnIndexer::~ColumnIndexer() {}

void ColumnIndexer::Insert(RowID row_id, String &data) { active_memory_indexer_->Insert(row_id, data); }

void ColumnIndexer::Insert(const ColumnVector &column_vector, RowID start_row_id) { active_memory_indexer_->Insert(column_vector, start_row_id); }

void ColumnIndexer::PreCommit() { active_memory_indexer_->PreCommit(); }
void ColumnIndexer::Insert(const ColumnVector &column_vector, u32 row_offset, u32 row_count, RowID row_id_begin) {
active_memory_indexer_->Insert(column_vector, row_offset, row_count, row_id_begin);
}

void ColumnIndexer::Commit() { active_memory_indexer_->Commit(); }

void ColumnIndexer::Dump() {
active_memory_indexer_->DisableCommit();
Path path = Path(index_name_) / std::to_string(current_segment_id_);
{
std::unique_lock<std::mutex> lock(mutex_);
std::swap(active_memory_indexer_, standby_memory_indexer_);
}
standby_memory_indexer_->WaitInflightTasks();

Path path = Path(directory_) / std::to_string(current_segment_id_);
String index_prefix = path.string();
LocalFileSystem fs;
String posting_file = index_prefix + POSTING_SUFFIX;
SharedPtr<FileWriter> posting_file_writer = MakeShared<FileWriter>(fs, posting_file, 128000);
String dict_file = index_prefix + DICT_SUFFIX;
SharedPtr<FileWriter> dict_file_writer = MakeShared<FileWriter>(fs, dict_file, 128000);
MemoryIndexer::PostingTable *posting_table = active_memory_indexer_->GetPostingTable();
TermMetaDumper term_meta_dumpler(active_memory_indexer_->index_config_.GetPostingFormatOption());
MemoryIndexer::PostingTable *posting_table = standby_memory_indexer_->GetPostingTable();
TermMetaDumper term_meta_dumpler(standby_memory_indexer_->index_config_.GetPostingFormatOption());

String fst_file = index_prefix + DICT_SUFFIX + ".fst";
std::ofstream ofs(fst_file.c_str(), std::ios::binary | std::ios::trunc);
@@ -95,7 +99,7 @@ void ColumnIndexer::Dump() {
fs.AppendFile(dict_file, fst_file);
fs.DeleteFile(fst_file);
}
active_memory_indexer_->Reset();
standby_memory_indexer_->Reset();
}

} // namespace infinity
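
For context, `Dump()` above follows a double-buffer pattern: the active in-memory indexer is swapped with a standby one under a short lock, the standby's in-flight tasks are drained, and only then is it serialized and reset, so inserts are never blocked for long. Below is a minimal standalone sketch of that swap-then-drain idea, under the assumption that it mirrors the code above; `DoubleBuffered`, `Buffer`, `WaitIdle`, and `TakeAll` are illustrative names, not this PR's types.

```
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Illustrative in-memory buffer that accepts work and can be drained.
struct Buffer {
    void Add(const std::string &term) { terms_.push_back(term); }
    void WaitIdle() { /* real code: wait for in-flight inversion tasks to finish */ }
    std::vector<std::string> TakeAll() { return std::exchange(terms_, {}); }

private:
    std::vector<std::string> terms_;
};

class DoubleBuffered {
public:
    void Insert(const std::string &term) {
        std::unique_lock<std::mutex> lock(mutex_);
        active_->Add(term); // writers always hit the active buffer
    }

    std::vector<std::string> Dump() {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            std::swap(active_, standby_); // new writes now go to the fresh buffer
        }
        standby_->WaitIdle();       // drain work already queued on the old buffer
        return standby_->TakeAll(); // serialize and reset outside the lock
    }

private:
    std::mutex mutex_;
    std::unique_ptr<Buffer> active_{std::make_unique<Buffer>()};
    std::unique_ptr<Buffer> standby_{std::make_unique<Buffer>()};
};
```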
29 changes: 18 additions & 11 deletions src/storage/invertedindex/column_indexer.cppm
@@ -32,29 +32,36 @@ namespace infinity {
class Indexer;
export class ColumnIndexer {
public:
ColumnIndexer(Indexer *indexer,
u64 column_id,
ColumnIndexer(u64 column_id,
const String directory,
const InvertedIndexConfig &index_config,
SharedPtr<MemoryPool> byte_slice_pool,
SharedPtr<RecyclePool> buffer_pool);
SharedPtr<RecyclePool> buffer_pool,
ThreadPool &thread_pool);

~ColumnIndexer();

MemoryIndexer *GetMemoryIndexer() { return active_memory_indexer_.get(); }
// realtime insert
void Insert(RowID row_id, String &data);
MemoryIndexer *GetMemoryIndexer() {
std::unique_lock<std::mutex> lock(mutex_);
return active_memory_indexer_.get();
}

void Insert(const ColumnVector &column_vector, RowID start_row_id);

void PreCommit();
// Only one thread may call this method. It is non-blocking.
void Insert(const ColumnVector &column_vector, u32 row_offset, u32 row_count, RowID row_id_begin);

// A background thread of Indexer calls this method regularly (for example, every 2 seconds). It is non-blocking.
// Other threads can also call this method.
void Commit();

// Only one thread may call this method, when the memory limit is reached (for example, 200 MB). It is blocking.
void Dump();

private:
UniquePtr<MemoryIndexer> active_memory_indexer_;
String index_name_;
std::mutex mutex_;
UniquePtr<MemoryIndexer> active_memory_indexer_, standby_memory_indexer_;

String directory_;
ThreadPool &thread_pool_;
u32 current_segment_id_{0};
};

166 changes: 146 additions & 20 deletions src/storage/invertedindex/column_inverter.cpp
@@ -14,39 +14,165 @@

module;

module column_inverter;
#include <arpa/inet.h>
#include <cstring>
#include <vector>

module column_inverter;
import stl;
import column_vector;
import internal_types;

import analyzer;
import memory_pool;
import pool_allocator;
import string_ref;
import term;
import radix_sort;
import index_defines;
import memory_indexer;
namespace infinity {

RefCount::RefCount() : lock_(), cv_(), ref_count_(0u) {}
template <u32 T>
static u32 Align(u32 unaligned) {
return (unaligned + T - 1) & (-T);
}

ColumnInverter::ColumnInverter(MemoryIndexer &memory_indexer)
: memory_indexer_(memory_indexer), analyzer_(memory_indexer.GetAnalyzer()), jieba_specialize_(memory_indexer.IsJiebaSpecialize()),
alloc_(memory_indexer.GetPool()), terms_(alloc_), positions_(alloc_), term_refs_(alloc_) {}

bool ColumnInverter::CompareTermRef::operator()(const u32 lhs, const u32 rhs) const { return std::strcmp(GetTerm(lhs), GetTerm(rhs)) < 0; }

void ColumnInverter::InvertColumn(const ColumnVector &column_vector, u32 row_offset, u32 row_count, RowID row_id_begin) {
docid_t start_doc_id = RowID2DocID(row_id_begin);
for (SizeT i = 0; i < row_count; ++i) {
String data = column_vector.ToString(row_offset + i);
InvertColumn(start_doc_id + i, data);
}
}

void ColumnInverter::InvertColumn(u32 doc_id, const String &val) {
auto terms_once_ = MakeUnique<TermList>();
analyzer_->Analyze(val, *terms_once_, jieba_specialize_);
terms_per_doc_.push_back(Pair<u32, UniquePtr<TermList>>(doc_id, std::move(terms_once_)));
}

u32 ColumnInverter::AddTerm(StringRef term) {
const u32 terms_size = terms_.size();
const u32 unpadded_size = terms_size + 4 + term.size() + 1;
const u32 fully_padded_size = Align<4>(unpadded_size);
terms_.resize(fully_padded_size);

RefCount::~RefCount() {}
char *buf = &terms_[0] + terms_size;
memset(buf, 0, 4);
memcpy(buf + 4, term.data(), term.size());
memset(buf + 4 + term.size(), 0, fully_padded_size - unpadded_size + 1);

void RefCount::Retain() noexcept {
std::lock_guard<std::mutex> guard(lock_);
++ref_count_;
u32 term_ref = (terms_size + 4) >> 2;
term_refs_.push_back(term_ref);
return term_ref;
}

void RefCount::Release() noexcept {
std::lock_guard<std::mutex> guard(lock_);
--ref_count_;
if (ref_count_ == 0u) {
cv_.notify_all();
void ColumnInverter::Merge(ColumnInverter &rhs) {
if (positions_.empty()) {
for (auto &doc_terms : terms_per_doc_) {
u32 doc_id = doc_terms.first;
auto &terms_once_ = doc_terms.second;
for (auto it = terms_once_->begin(); it != terms_once_->end(); ++it) {
StringRef term(it->text_);
u32 term_ref = AddTerm(term);
positions_.emplace_back(term_ref, doc_id, it->word_offset_);
}
}
terms_per_doc_.clear();
}
for (auto &doc_terms : rhs.terms_per_doc_) {
u32 doc_id = doc_terms.first;
auto &terms_once_ = doc_terms.second;
for (auto it = terms_once_->begin(); it != terms_once_->end(); ++it) {
StringRef term(it->text_);
u32 term_ref = AddTerm(term);
positions_.emplace_back(term_ref, doc_id, it->word_offset_);
}
}
rhs.terms_per_doc_.clear();
}

void RefCount::WaitForZeroRefCount() {
std::unique_lock<std::mutex> guard(lock_);
cv_.wait(guard, [this] { return (ref_count_ == 0u); });
struct TermRefRadix {
u32 operator()(const u64 v) { return v >> 32; }
};

void ColumnInverter::SortTerms() {
Vector<u64> first_four_bytes(term_refs_.size());
for (u32 i = 1; i < term_refs_.size(); ++i) {
u64 first_four = ntohl(*reinterpret_cast<const u32 *>(GetTermFromRef(term_refs_[i])));
first_four_bytes[i] = (first_four << 32) | term_refs_[i];
}
ShiftBasedRadixSorter<u64, TermRefRadix, CompareTermRef, 24, true>::RadixSort(TermRefRadix(),
CompareTermRef(terms_),
&first_four_bytes[1],
first_four_bytes.size() - 1,
16);
for (u32 i(1); i < first_four_bytes.size(); i++) {
term_refs_[i] = first_four_bytes[i] & 0xffffffffl;
}
auto term_ref_begin(term_refs_.begin() + 1);
uint32_t term_num = 1; // First valid term number
const char *last_term = GetTermFromRef(*term_ref_begin);
UpdateTermNum(*term_ref_begin, term_num);
for (++term_ref_begin; term_ref_begin != term_refs_.end(); ++term_ref_begin) {
const char *term = GetTermFromRef(*term_ref_begin);
int cmpres = strcmp(last_term, term);
// assert(cmpres <= 0);
if (cmpres < 0) {
++term_num;
term_refs_[term_num] = *term_ref_begin;
last_term = term;
}
UpdateTermNum(*term_ref_begin, term_num);
}
// assert(term_refs_.size() >= term_num + 1);
term_refs_.resize(term_num + 1);
// Replace initial word reference by word number.
for (auto &p : positions_) {
p.term_num_ = GetTermNum(p.term_num_);
}
}

bool RefCount::ZeroRefCount() {
std::unique_lock<std::mutex> guard(lock_);
return (ref_count_ == 0u);
struct FullRadix {
u64 operator()(const ColumnInverter::PosInfo &p) const { return (static_cast<u64>(p.term_num_) << 32) | p.doc_id_; }
};

void ColumnInverter::Sort() {
SortTerms();
ShiftBasedRadixSorter<PosInfo, FullRadix, std::less<PosInfo>, 56, true>::RadixSort(FullRadix(),
std::less<PosInfo>(),
&positions_[0],
positions_.size(),
16);
}

void ColumnInverter::GeneratePosting() {
u32 last_term_num = 0;
u32 last_term_pos = 0;
u32 last_doc_id = 0;
StringRef term;
MemoryIndexer::PostingPtr posting = nullptr;
for (auto &i : positions_) {
if (last_term_num != i.term_num_ || last_doc_id != i.doc_id_) {
if (last_term_num != i.term_num_) {
last_term_num = i.term_num_;
term = GetTermFromNum(last_term_num);
posting = memory_indexer_.GetOrAddPosting(String(term.data()));
}
last_doc_id = i.doc_id_;
if (last_doc_id != 0) {
posting->EndDocument(last_doc_id, 0);
}
}
if (i.term_pos_ != last_term_pos) {
last_term_pos = i.term_pos_;
posting->AddPosition(last_term_pos);
}
}
}

} // namespace infinity
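
As a side note on the `AddTerm` buffer layout above: each term is stored as a 4-byte header (later overwritten with its term number), the NUL-terminated text, and padding to a 4-byte boundary, and a term ref is the word index where the text starts. The following small standalone check of that arithmetic assumes it mirrors `Align`/`AddTerm`; the names below are illustrative.

```
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Round `unaligned` up to a multiple of the (power-of-two) template parameter.
template <std::uint32_t T>
static std::uint32_t Align(std::uint32_t unaligned) {
    return (unaligned + T - 1) & (-T);
}

int main() {
    std::vector<char> terms;
    auto add_term = [&](const std::string &term) -> std::uint32_t {
        const std::uint32_t old_size = static_cast<std::uint32_t>(terms.size());
        const std::uint32_t unpadded =
            old_size + 4 + static_cast<std::uint32_t>(term.size()) + 1; // header + text + NUL
        terms.resize(Align<4>(unpadded), '\0');
        std::memcpy(terms.data() + old_size + 4, term.data(), term.size());
        return (old_size + 4) >> 2; // word index of the term text
    };

    const std::uint32_t ref = add_term("hello");
    assert(std::strcmp(terms.data() + (ref << 2), "hello") == 0); // the ref points at the text
    assert(terms.size() % 4 == 0);                                // buffer stays 4-byte aligned
    return 0;
}
```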