Skip to content

Commit

Permalink
Fix RowID for index merger (#768)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix RowID handling in the inverted index merger: use RowID instead of docid_t.

Issue link: #366

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Refactoring
  • Loading branch information
yingfeng authored Mar 12, 2024
1 parent efab874 commit 449206b
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 19 deletions.
5 changes: 3 additions & 2 deletions src/storage/invertedindex/column_index_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import index_full_text;
import column_index_iterator;
import segment_term_posting;
import fst;
import internal_types;

namespace infinity {
ColumnIndexMerger::ColumnIndexMerger(const String &index_dir, optionflag_t flag, MemoryPool *memory_pool, RecyclePool *buffer_pool)
Expand All @@ -28,7 +29,7 @@ ColumnIndexMerger::~ColumnIndexMerger() {}

// Constructs a new PostingMerger from this merger's memory_pool_ and buffer_pool_ and returns it as a SharedPtr.
SharedPtr<PostingMerger> ColumnIndexMerger::CreatePostingMerger() { return MakeShared<PostingMerger>(memory_pool_, buffer_pool_); }

void ColumnIndexMerger::Merge(const Vector<String> &base_names, const Vector<docid_t> &base_docids, const String &dst_base_name) {
void ColumnIndexMerger::Merge(const Vector<String> &base_names, const Vector<RowID> &base_rowids, const String &dst_base_name) {
Path path = Path(index_dir_) / dst_base_name;
String index_prefix = path.string();
String dict_file = index_prefix + DICT_SUFFIX;
Expand All @@ -43,7 +44,7 @@ void ColumnIndexMerger::Merge(const Vector<String> &base_names, const Vector<doc
OstreamWriter wtr(ofs);
FstBuilder fst_builder(wtr);

SegmentTermPostingQueue term_posting_queue(index_dir_, base_names, base_docids, flag_);
SegmentTermPostingQueue term_posting_queue(index_dir_, base_names, base_rowids, flag_);
String term;
TermMeta term_meta;
SizeT term_meta_offset = 0;
Expand Down
3 changes: 2 additions & 1 deletion src/storage/invertedindex/column_index_merger.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ import index_full_text;
import column_index_iterator;
import segment_term_posting;
import local_file_system;
import internal_types;

namespace infinity {
export class ColumnIndexMerger {
public:
ColumnIndexMerger(const String &index_dir, optionflag_t flag, MemoryPool *memory_pool, RecyclePool *buffer_pool);
~ColumnIndexMerger();

void Merge(const Vector<String> &base_names, const Vector<docid_t> &base_docids, const String &dst_base_name);
void Merge(const Vector<String> &base_names, const Vector<RowID> &base_rowids, const String &dst_base_name);

private:
SharedPtr<PostingMerger> CreatePostingMerger();
Expand Down
10 changes: 5 additions & 5 deletions src/storage/invertedindex/posting_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import posting_decoder;
import term_meta;
import column_index_iterator;
import segment_term_posting;
import internal_types;

namespace infinity {

Expand Down Expand Up @@ -146,7 +147,7 @@ class PostingDumper {

class SortedPosting {
public:
SortedPosting(const PostingFormatOption &format_option, docid_t base_doc_id, PostingDecoder *posting_decoder)
SortedPosting(const PostingFormatOption &format_option, RowID base_doc_id, PostingDecoder *posting_decoder)
: format_option_(format_option), base_doc_id_(base_doc_id), doc_merger_(format_option, posting_decoder) {}
~SortedPosting() {}

Expand All @@ -167,12 +168,11 @@ class SortedPosting {
doc_merger_.Merge(current_doc_id_, posting_writer.get());
}

docid_t GetCurrentDocID() const { return current_doc_id_; }
RowID GetCurrentDocID() const { return base_doc_id_ + current_doc_id_; }

private:
PostingFormatOption format_option_;
docid_t base_doc_id_;
segmentid_t current_segment_{INVALID_SEGMENTID};
RowID base_doc_id_;
docid_t current_doc_id_{INVALID_DOCID};
DocMerger doc_merger_;
};
Expand All @@ -193,7 +193,7 @@ void PostingMerger::Merge(const Vector<SegmentTermPosting *> &segment_term_posti
PriorityQueue queue;
for (u32 i = 0; i < segment_term_postings.size(); ++i) {
SegmentTermPosting *term_posting = segment_term_postings[i];
u64 base_doc_id = term_posting->GetBaesDocId();
RowID base_doc_id = term_posting->GetBaesDocId();
PostingDecoder *decoder = term_posting->GetPostingDecoder();
SortedPosting *sorted_posting = new SortedPosting(format_option_, base_doc_id, decoder);
queue.push(sorted_posting);
Expand Down
10 changes: 5 additions & 5 deletions src/storage/invertedindex/segment_term_posting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import column_index_iterator;

namespace infinity {

SegmentTermPosting::SegmentTermPosting(const String &index_dir, const String &base_name, docid_t base_doc_id, optionflag_t flag)
: base_doc_id_(base_doc_id) {
SegmentTermPosting::SegmentTermPosting(const String &index_dir, const String &base_name, RowID base_doc_id, optionflag_t flag)
: base_row_id_(base_doc_id) {
column_index_iterator_ = MakeShared<ColumnIndexIterator>(index_dir, base_name, flag);
}

Expand All @@ -25,11 +25,11 @@ bool SegmentTermPosting::HasNext() {

SegmentTermPostingQueue::SegmentTermPostingQueue(const String &index_dir,
const Vector<String> &base_names,
const Vector<docid_t> &base_docids,
const Vector<RowID> &base_rowids,
optionflag_t flag)
: index_dir_(index_dir), base_names_(base_names), base_docids_(base_docids) {
: index_dir_(index_dir), base_names_(base_names), base_docids_(base_rowids) {
for (u32 i = 0; i < base_names.size(); ++i) {
SegmentTermPosting *segment_term_posting = new SegmentTermPosting(index_dir, base_names[i], base_docids[i], flag);
SegmentTermPosting *segment_term_posting = new SegmentTermPosting(index_dir, base_names[i], base_rowids[i], flag);
if (segment_term_posting->HasNext()) {
segment_term_postings_.push(segment_term_posting);
} else
Expand Down
13 changes: 7 additions & 6 deletions src/storage/invertedindex/segment_term_posting.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,24 @@ import index_defines;
import term_meta;
import column_index_iterator;
import index_defines;
import internal_types;

namespace infinity {
// Utility class for posting merging
export class SegmentTermPosting {
public:
SegmentTermPosting();

SegmentTermPosting(const String &index_dir, const String &base_name, docid_t base_doc_id, optionflag_t flag);
SegmentTermPosting(const String &index_dir, const String &base_name, RowID base_doc_id, optionflag_t flag);

docid_t GetBaesDocId() const { return base_doc_id_; }
RowID GetBaesDocId() const { return base_row_id_; }

bool HasNext();

// Returns the posting_decoder_ member unchanged; it is initialized to nullptr and this accessor performs no null check.
PostingDecoder *GetPostingDecoder() { return posting_decoder_; }

public:
docid_t base_doc_id_{};
RowID base_row_id_{};
String term_{};
PostingDecoder *posting_decoder_{nullptr};
SharedPtr<ColumnIndexIterator> column_index_iterator_{};
Expand All @@ -37,13 +38,13 @@ public:
int ret = item1->term_.compare(item2->term_);
if (ret != 0)
return ret > 0;
return item1->base_doc_id_ > item2->base_doc_id_;
return item1->base_row_id_ > item2->base_row_id_;
}
};

export class SegmentTermPostingQueue {
public:
SegmentTermPostingQueue(const String &index_dir, const Vector<String> &base_names, const Vector<docid_t> &base_docids, optionflag_t flag);
SegmentTermPostingQueue(const String &index_dir, const Vector<String> &base_names, const Vector<RowID> &base_rowids, optionflag_t flag);

~SegmentTermPostingQueue();

Expand All @@ -57,7 +58,7 @@ private:
using PriorityQueue = Heap<SegmentTermPosting *, SegmentTermPostingComparator>;
const String &index_dir_;
const Vector<String> &base_names_;
const Vector<docid_t> &base_docids_;
const Vector<RowID> &base_docids_;

PriorityQueue segment_term_postings_;
Vector<SegmentTermPosting *> merging_term_postings_;
Expand Down

0 comments on commit 449206b

Please sign in to comment.