Skip to content

Commit

Permalink
remove skipped filter when calculating average bits-per-key
Browse files Browse the repository at this point in the history
  • Loading branch information
littlepig2013 committed Oct 10, 2024
1 parent fc9d46b commit 416614f
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 33 deletions.
56 changes: 36 additions & 20 deletions db/bpk_alloc_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
const BlockBasedTableOptions tbo =
std::static_pointer_cast<BlockBasedTableFactory>(ioptions_->table_factory)
->GetBlockBasedTableOptions();
if (tbo.modular_filters) {
max_bits_per_key_ =
tbo.max_modulars * tbo.max_bits_per_key_granularity + 1.0;
}
if (tbo.filter_policy == nullptr) return;
overall_bits_per_key_ =
std::static_pointer_cast<const BloomLikeFilterPolicy>(tbo.filter_policy)
Expand Down Expand Up @@ -54,7 +58,7 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
!flush_flag_) { // skip the preparation phase for flush
return;
}

uint64_t skipped_filter_size = 0;
std::unordered_map<LevelState, uint64_t, LevelState::HashFunction>
level2removed_num_entries;
uint64_t added_entries_in_max_level = 0;
Expand All @@ -70,18 +74,17 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
}
if (level != 0) {
uint64_t num_entries = 0;
if (level2removed_num_entries.find(LevelState(level, 0, 0)) !=
if (level2removed_num_entries.find(LevelState(level, 0, 0, 0)) !=
level2removed_num_entries.end()) {
num_entries = level2removed_num_entries[LevelState(level, 0, 0)];
num_entries = level2removed_num_entries[LevelState(level, 0, 0, 0)];
}
uint64_t num_entries_in_bf = 0;
for (const FileMetaData* meta : input_files->at(i).files) {
num_entries_in_bf = meta->num_entries - meta->num_range_deletions;
num_entries += num_entries_in_bf;
num_entries_in_compaction_ += num_entries_in_bf;
if (meta->bpk != -1) {
num_bits_for_filter_to_be_removed_ +=
num_entries_in_bf * meta->bpk;
num_bits_for_filter_to_be_removed_ += meta->filter_size * 8;
} else {
num_bits_for_filter_to_be_removed_ +=
num_entries_in_bf * overall_bits_per_key_;
Expand Down Expand Up @@ -125,13 +128,16 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
if (tbo.no_filter_optimize_for_level0) {
start_level = 1;
}
uint64_t agg_filter_size = 0;
for (int level = start_level; level < vstorage_->num_levels(); ++level) {
num_entries_in_filter_by_level = 0;
agg_filter_size = 0;
for (auto* file_meta : vstorage_->LevelFiles(level)) {
tmp_num_entries_in_filter_by_file =
file_meta->num_entries - file_meta->num_range_deletions;
tmp_fd_number = file_meta->fd.GetNumber();
max_fd_number = std::max(max_fd_number, tmp_fd_number);
agg_filter_size += file_meta->filter_size;
if (tmp_num_entries_in_filter_by_file == 0) continue;
if (level == 0) {
if (compaction != nullptr && !level2removed_num_entries.empty()) {
Expand Down Expand Up @@ -169,8 +175,8 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
// skip the current level if the current level has no entries
if (num_entries_in_filter_by_level == 0) continue;

level_states_pq_.push(
LevelState(level, num_entries_in_filter_by_level, 0));
level_states_pq_.push(LevelState(level, num_entries_in_filter_by_level,
0, agg_filter_size));
temp_sum_in_bpk_optimization_ +=
std::log(num_entries_in_filter_by_level) *
num_entries_in_filter_by_level;
Expand Down Expand Up @@ -199,11 +205,13 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
for (FileMetaData* meta : vstorage_->LevelFiles(0)) {
if (meta->fd.GetNumber() == level_states_pq_.top().file_number) {
meta->bpk = 0.0;
skipped_filter_size += meta->filter_size;
}
}
} else {
levelIDs_with_bpk0_in_dynamic_monkey.insert(
level_states_pq_.top().level);
skipped_filter_size += level_states_pq_.top().agg_filter_size;
}
tmp_num_entries_in_filter_by_file = level_states_pq_.top().num_entries;
temp_sum_in_bpk_optimization_ -=
Expand All @@ -219,6 +227,7 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {

vstorage_->SetLevelIDsWithEmptyBpkInDynamicMonkey(
levelIDs_with_bpk0_in_dynamic_monkey);
vstorage_->UpdateSkippedFilterSize(skipped_filter_size);

if (!level_states_pq_.empty()) {
dynamic_monkey_bpk_num_entries_threshold_ =
Expand All @@ -240,6 +249,7 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
uint64_t num_empty_queries_in_compaction = 0;
uint64_t num_files_in_compaction = 0;
int max_level = -1;
uint64_t skipped_filter_size = 0;
if (compaction != nullptr) {
const std::vector<CompactionInputFiles>* input_files =
compaction->inputs();
Expand Down Expand Up @@ -272,7 +282,7 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
num_entries_in_bf = meta->num_entries - meta->num_range_deletions;
num_entries_in_compaction_ += num_entries_in_bf;
if (meta->bpk != -1) {
num_bits_for_filter_to_be_removed_ += num_entries_in_bf * meta->bpk;
num_bits_for_filter_to_be_removed_ += meta->filter_size;
} else {
num_bits_for_filter_to_be_removed_ +=
num_entries_in_bf * overall_bits_per_key_;
Expand Down Expand Up @@ -340,6 +350,9 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
file_workload_state_pq_.push(
FileWorkloadState(tmp_num_entries_in_filter_by_file,
num_empty_point_reads, file_meta));
} else {
file_meta->bpk = 0.0;
skipped_filter_size += file_meta->filter_size;
}
}
}
Expand Down Expand Up @@ -392,9 +405,10 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
std::log(file_workload_state_pq_.top().weight * total_empty_queries_) +
common_constant_in_bpk_optimization_ >
-log_2_squared) {
// if (file_workload_state_pq_.top().meta != nullptr) {
// file_workload_state_pq_.top().meta->bpk = 0.0;
// }
if (file_workload_state_pq_.top().meta != nullptr) {
file_workload_state_pq_.top().meta->bpk = 0.0;
skipped_filter_size += file_workload_state_pq_.top().meta->filter_size;
}
weight = file_workload_state_pq_.top().weight;
temp_sum_in_bpk_optimization_ -=
std::log(weight * total_empty_queries_) *
Expand All @@ -421,6 +435,7 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
}

vstorage_->UpdateNumEmptyPointReads(total_empty_queries_);
vstorage_->UpdateSkippedFilterSize(skipped_filter_size);
}

bpk_optimization_prepared_flag_ = true;
Expand Down Expand Up @@ -512,11 +527,15 @@ bool BitsPerKeyAllocHelper::IfNeedAllocateBitsPerKey(
}
}

uint64_t old_total_bits = vstorage_->GetCurrentTotalFilterSize() * 8.0 -
tmp_bits_per_key = std::min(tmp_bits_per_key, max_bits_per_key_);
uint64_t old_total_bits = vstorage_->GetCurrentTotalFilterSize() * 8 -
num_bits_for_filter_to_be_removed_;
if (old_total_bits > vstorage_->GetSkippedFilterSize() * 8) {
old_total_bits -= vstorage_->GetSkippedFilterSize() * 8;
}
uint64_t old_total_entries =
vstorage_->GetCurrentTotalNumEntries() - num_entries_in_compaction_;
const double overused_percentage = 0.01;
const double overused_percentage = 0.2;
std::string stats_log = "";
if (old_total_entries == 0 ||
old_total_bits > old_total_entries * overall_bits_per_key_ *
Expand All @@ -533,13 +552,10 @@ bool BitsPerKeyAllocHelper::IfNeedAllocateBitsPerKey(
(1 +
overused_percentage)) { // if the bits-per-key is overused

tmp_bits_per_key =
((overall_bits_per_key_ * 1.01) * (old_total_entries + num_entries) -
old_total_bits) /
num_entries;
if (tmp_bits_per_key < overall_bits_per_key_) {
return false;
}
tmp_bits_per_key = ((overall_bits_per_key_ * (1 + overused_percentage)) *
(old_total_entries + num_entries) -
old_total_bits) /
num_entries;
}
*bits_per_key = tmp_bits_per_key;
return true;
Expand Down
7 changes: 6 additions & 1 deletion db/bpk_alloc_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,19 @@ class BitsPerKeyAllocHelper {
double overall_bits_per_key_ = 0.0;
uint64_t num_entries_in_compaction_ = 0;
uint64_t num_bits_for_filter_to_be_removed_ = 0;

double max_bits_per_key_ = 100;
struct LevelState {
int level;
uint64_t num_entries;
uint64_t file_number;
LevelState(int _level, uint64_t _num_entries, uint64_t _file_number) {
uint64_t agg_filter_size;
LevelState(int _level, uint64_t _num_entries, uint64_t _file_number,
uint64_t _agg_filter_size = 0) {
level = _level;
num_entries = _num_entries;
file_number = _file_number;
agg_filter_size = _agg_filter_size;
}
bool operator<(const LevelState& tmp) const {
if (num_entries < tmp.num_entries) return true;
Expand Down
6 changes: 6 additions & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,12 @@ Status BuildTable(
" with reset bits-per-key %.4f",
tboptions.column_family_name.c_str(),
meta->fd.GetNumber(), new_bits_per_key);
} else {
ROCKS_LOG_INFO(ioptions.info_log,
"[%s] Flushes generates new file %" PRIu64
" with no reset bits-per-key",
tboptions.column_family_name.c_str(),
meta->fd.GetNumber());
}
version->storage_info()->SetBpkCommonConstant(
bpk_alloc_helper.bpk_alloc_type_,
Expand Down
14 changes: 13 additions & 1 deletion db/compaction/compaction_outputs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,25 @@ Status CompactionOutputs::Finish(
ROCKS_LOG_INFO(
compaction_->immutable_options()->info_log,
"[%s] Compaction generates new file %" PRIu64
" in level %d"
" (num_point_reads=%" PRIu64 ", num_existing_point_reads=%" PRIu64
") with reset bits-per-key %.4f",
compaction_->column_family_data()->GetName().c_str(),
meta->fd.GetNumber(),
meta->fd.GetNumber(), compaction_->output_level(),
meta->stats.num_point_reads.load(std::memory_order_relaxed),
meta->stats.num_existing_point_reads.load(std::memory_order_relaxed),
new_bits_per_key);
} else {
ROCKS_LOG_INFO(
compaction_->immutable_options()->info_log,
"[%s] Compaction generates new file %" PRIu64
" in level %d"
" (num_point_reads=%" PRIu64 ", num_existing_point_reads=%" PRIu64
") with no reset bits-per-key",
compaction_->column_family_data()->GetName().c_str(),
meta->fd.GetNumber(), compaction_->output_level(),
meta->stats.num_point_reads.load(std::memory_order_relaxed),
meta->stats.num_existing_point_reads.load(std::memory_order_relaxed));
}

s = builder_->Finish();
Expand Down
9 changes: 7 additions & 2 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2270,6 +2270,7 @@ VersionStorageInfo::VersionStorageInfo(
accumulated_num_non_deletions_ =
ref_vstorage->accumulated_num_non_deletions_;
accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
skipped_filter_size_ = ref_vstorage->GetSkippedFilterSize();
current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
current_num_deletions_ = ref_vstorage->current_num_deletions_;
current_num_samples_ = ref_vstorage->current_num_samples_;
Expand Down Expand Up @@ -3280,6 +3281,7 @@ bool Version::IsFilterSkipped(int level, bool is_file_last_in_level,
storage_info_.GetBitsPerKeyCommonConstant() == 0 ||
storage_info_.GetBitsPerKeyAllocationType() ==
BitsPerKeyAllocationType::kDefaultBpkAlloc) {
if (max_accessed_modulars) *max_accessed_modulars = max_modulars_;
return result;
}

Expand Down Expand Up @@ -3347,6 +3349,7 @@ bool Version::IsFilterSkipped(int level, bool is_file_last_in_level,
if (num_point_reads <= num_existing_point_reads ||
storage_info_.accumulated_num_empty_point_reads_by_file_ == 0) {
// all queries are existing queries, we can skip all the modules

return true;
}

Expand All @@ -3357,15 +3360,17 @@ bool Version::IsFilterSkipped(int level, bool is_file_last_in_level,

uint64_t num_empty_point_reads = num_point_reads - num_existing_point_reads;

*max_accessed_modulars = max_modulars_;
result =
(std::log((meta->num_entries - meta->num_range_deletions) * 1.0 /
num_empty_point_reads *
storage_info_.accumulated_num_empty_point_reads_by_file_.load(
std::memory_order_relaxed)) +
storage_info_.GetBitsPerKeyCommonConstant()) > 0;
// skip this filter if no bpk should be assigned
if (result || max_accessed_modulars == NULL) return result;
*max_accessed_modulars = max_modulars_;
if (result || max_accessed_modulars == NULL) {
return result;
}
return false;
// size_t filter_size = 0;
// double bpk = 0.0;
Expand Down
9 changes: 9 additions & 0 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ class VersionStorageInfo {
num_flushes_++;
}

void UpdateSkippedFilterSize(uint64_t _skipped_filter_size) const {
skipped_filter_size_.store(_skipped_filter_size, std::memory_order_relaxed);
}

void UpdateNumEmptyPointReads(
uint64_t estimated_num_empty_point_reads) const {
accumulated_num_empty_point_reads_by_file_.store(
Expand Down Expand Up @@ -712,6 +716,10 @@ class VersionStorageInfo {
return current_total_filter_size_;
}

uint64_t GetSkippedFilterSize() const {
return skipped_filter_size_.load(std::memory_order_relaxed);
}

uint64_t GetCurrentTotalNumEntries() const {
return current_num_deletions_ + current_num_non_deletions_;
}
Expand Down Expand Up @@ -847,6 +855,7 @@ class VersionStorageInfo {
mutable std::atomic<uint64_t> accumulated_num_empty_point_reads_by_file_;
mutable uint64_t point_reads_num_when_last_flush_ = 0;
mutable std::atomic<uint64_t> num_flushes_ = 0;
mutable std::atomic<uint64_t> skipped_filter_size_ = 0;
mutable std::thread::id leader_thread_id_;
mutable std::mutex thread_ids_mutex_;

Expand Down
20 changes: 11 additions & 9 deletions table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2155,13 +2155,7 @@ bool BlockBasedTable::FullFilterKeyMayMatch(
may_match = filter->KeyMayMatch(user_key_without_ts, no_io, const_ikey_ptr,
get_context, lookup_context, read_options,
modular_filter_index);
if (may_match) {
RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_POSITIVE);
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_positive, 1, rep_->level);
} else {
RecordTick(rep_->ioptions.stats, BLOOM_FILTER_USEFUL);
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level);
}

} else if (!PrefixExtractorChanged(prefix_extractor) &&
prefix_extractor->InDomain(user_key_without_ts)) {
// FIXME ^^^: there should be no reason for Get() to depend on current
Expand Down Expand Up @@ -2354,7 +2348,7 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
modular_filter_index >= get_context->max_accessed_modulars_) {
tmp_no_io = true;
}
may_match = FullFilterKeyMayMatch(filter, key, no_io, prefix_extractor,
may_match = FullFilterKeyMayMatch(filter, key, tmp_no_io, prefix_extractor,
get_context, &lookup_context,
read_options, modular_filter_index);
if (!use_origin_filter && !skip_filters && may_match &&
Expand All @@ -2371,9 +2365,17 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
}
get_context->filter_second_high_priority_cache_ = false;
get_context->hash_digest_ = origin_hash_digest;
get_context->max_accessed_modulars_ = 1;

TEST_SYNC_POINT("BlockBasedTable::Get:AfterFilterMatch");
if (rep_->whole_key_filtering) {
if (may_match) {
RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_POSITIVE);
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_positive, 1, rep_->level);
} else {
RecordTick(rep_->ioptions.stats, BLOOM_FILTER_USEFUL);
PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_useful, 1, rep_->level);
}
}
if (may_match) {
IndexBlockIter iiter_on_stack;
// if prefix_extractor found in block differs from options, disable
Expand Down

0 comments on commit 416614f

Please sign in to comment.