From 973a776f973ddd6a3baea66ae26f3a0bccf22765 Mon Sep 17 00:00:00 2001 From: littlepig2013 Date: Fri, 25 Oct 2024 17:21:30 -0400 Subject: [PATCH] snaphot num_point_reads before compaction --- db/bpk_alloc_helper.cc | 7 + db/compaction/compaction.cc | 89 +++++++++---- db/compaction/compaction.h | 25 +++- db/compaction/compaction_iterator.h | 2 + db/compaction/compaction_outputs.cc | 39 +++++- db/compaction/compaction_outputs.h | 6 + db/table_cache.cc | 28 +--- db/table_cache.h | 4 +- db/version_edit.h | 26 +++- db/version_set.cc | 192 ++++++++++++++++----------- db/version_set.h | 8 +- include/rocksdb/options.h | 2 +- table/compaction_merging_iterator.cc | 2 + table/internal_iterator.h | 7 + table/iterator_wrapper.h | 12 ++ 15 files changed, 305 insertions(+), 144 deletions(-) diff --git a/db/bpk_alloc_helper.cc b/db/bpk_alloc_helper.cc index 13e70a2..5062736 100644 --- a/db/bpk_alloc_helper.cc +++ b/db/bpk_alloc_helper.cc @@ -347,6 +347,13 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) { tmp_num_entries_in_filter_by_file; workload_aware_num_entries_with_empty_queries_ += tmp_num_entries_in_filter_by_file; + if (file_meta->filter_size != 0) { + file_meta->bpk = + file_meta->filter_size * 8 / + (file_meta->num_entries - file_meta->num_range_deletions); + } else { + file_meta->bpk = 0; + } file_workload_state_pq_.push( FileWorkloadState(tmp_num_entries_in_filter_by_file, num_empty_point_reads, file_meta)); diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 693172e..03f7816 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -376,46 +376,81 @@ Compaction::Compaction( // calculate max_num_entries_in_output_levels used in monkey allocation max_num_entries_in_compaction_ = 0; max_num_entries_in_output_level_ = 0; - min_avg_num_point_reads_ = std::numeric_limits::max(); + existing_entries_in_output_level_ = 0; + min_avg_num_point_reads_from_upper_level_ = 0; + max_avg_num_point_reads_ = 0; + agg_total_num_point_reads_ = vstorage->GetAccumulatedNumPointReads(); + + std::vector num_entries_per_level(num_input_levels(), 0); + double avg_num_point_reads_per_lvl0_file = 0.0; for (size_t which = 0; which < num_input_levels(); which++) { + if (inputs_[which].level == 0) { + avg_num_point_reads_per_lvl0_file = + vstorage->GetAvgNumPointReadsPerLvl0File(); + } else { + avg_num_point_reads_per_lvl0_file = 0.0; + } DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files, - &arena_); + &arena_, agg_total_num_point_reads_, + immutable_options_.point_read_learning_rate, -1, + avg_num_point_reads_per_lvl0_file); num_input_files_ += inputs_[which].files.size(); + for (FileMetaData* meta : inputs_[which].files) { + num_entries_per_level[which] += + meta->num_entries - meta->num_range_deletions; + } + max_num_entries_in_compaction_ += num_entries_per_level[which]; } uint64_t tmp_entries = 0; + // use the minimum avg point reads per file to represent the avg point reads + // per sorted run and choose the maximum among them for (size_t which = 0; which < num_input_levels(); which++) { if (inputs_[which].level != output_level_) { - for (FileMetaData* meta : inputs_[which].files) { - tmp_entries = meta->num_entries - meta->num_range_deletions; - max_num_entries_in_compaction_ += tmp_entries; - max_num_entries_in_output_level_ += tmp_entries; - auto result = meta->stats.GetEstimatedNumPointReads( - vstorage->GetAccumulatedNumPointReads(), - immutable_options_.point_read_learning_rate); - if (inputs_[which].level == 0) { 
- min_avg_num_point_reads_ = - std::min(min_avg_num_point_reads_, - result.first * 1.0 / - (tmp_entries * inputs_[which].files.size() + - num_input_levels() - 1)); - } else { - min_avg_num_point_reads_ = std::min( - min_avg_num_point_reads_, result.first * 1.0 / tmp_entries); + max_num_entries_in_output_level_ += num_entries_per_level[which]; + if (inputs_[which].level == 0) { + for (size_t i = 0; i < input_levels_[which].num_files; i++) { + tmp_entries = inputs_[which].files[i]->num_entries - + inputs_[which].files[i]->num_range_deletions; + max_avg_num_point_reads_ = std::max( + max_avg_num_point_reads_, + input_levels_[which].files[i].snapshot_num_point_reads * 1.0 / + tmp_entries); + ; + min_avg_num_point_reads_from_upper_level_ = std::max( + min_avg_num_point_reads_from_upper_level_, + input_levels_[which].files[i].snapshot_num_point_reads * 1.0 / + tmp_entries / inputs_[which].files.size()); } + } else { + double temp_avg_num_point_reads_from_upper_level_ = + std::numeric_limits::max(); + if (inputs_[which].files.size() == 0) + temp_avg_num_point_reads_from_upper_level_ = 0; + for (size_t i = 0; i < input_levels_[which].num_files; i++) { + tmp_entries = inputs_[which].files[i]->num_entries - + inputs_[which].files[i]->num_range_deletions; + max_avg_num_point_reads_ = std::max( + max_avg_num_point_reads_, + input_levels_[which].files[i].snapshot_num_point_reads * 1.0 / + tmp_entries); + ; + temp_avg_num_point_reads_from_upper_level_ = std::min( + temp_avg_num_point_reads_from_upper_level_, + input_levels_[which].files[i].snapshot_num_point_reads * + num_entries_per_level[which] * 1.0 / + (tmp_entries * inputs_[which].files.size() * + max_num_entries_in_compaction_)); + } + min_avg_num_point_reads_from_upper_level_ = + std::max(min_avg_num_point_reads_from_upper_level_, + temp_avg_num_point_reads_from_upper_level_); } } else { - for (FileMetaData* meta : inputs_[which].files) { - tmp_entries = meta->num_entries - meta->num_range_deletions; - max_num_entries_in_compaction_ += tmp_entries; - auto result = meta->stats.GetEstimatedNumPointReads( - vstorage->GetAccumulatedNumPointReads(), - immutable_options_.point_read_learning_rate); - min_avg_num_point_reads_ = std::min(min_avg_num_point_reads_, - result.first * 1.0 / tmp_entries); - } + existing_entries_in_output_level_ = num_entries_per_level[which]; } } + if (input_vstorage_->GetBitsPerKeyAllocationType() == BitsPerKeyAllocationType::kDynamicMonkeyBpkAlloc) { for (const FileMetaData* meta : diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index cd4819d..705e7a9 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -125,6 +125,10 @@ class Compaction { return max_num_entries_in_output_level_; } + uint64_t existing_entries_in_output_level() const { + return existing_entries_in_output_level_; + } + uint64_t max_num_entries_in_compaction() const { return max_num_entries_in_compaction_; } @@ -452,7 +456,17 @@ class Compaction { return avg_num_existing_point_reads_with_naiive_track_; } - double GetMinAvgNumPointReads() const { return min_avg_num_point_reads_; } + double GetMaxAvgNumPointReads() const { return max_avg_num_point_reads_; } + + double GetMinAvgNumPointReadsUpperLevel() const { + return min_avg_num_point_reads_from_upper_level_; + } + // double GetMinAvgNumPointReadsDeepestLevel() const { return + // min_avg_num_point_reads_from_deepest_level_; } + + uint64_t GetSnapshotAggTotalNumPointReads() const { + return agg_total_num_point_reads_; + } private: void SetInputVersion(Version* 
input_version); @@ -514,6 +528,7 @@ class Compaction { Arena arena_; // Arena used to allocate space for file_levels_ uint64_t max_num_entries_in_output_level_; + uint64_t existing_entries_in_output_level_; uint64_t max_num_entries_in_compaction_; const uint32_t output_path_id_; CompressionType output_compression_; @@ -595,10 +610,14 @@ class Compaction { PenultimateOutputRangeType::kNotSupported; uint64_t num_input_files_; + uint64_t agg_total_num_point_reads_; uint64_t avg_num_point_reads_with_naiive_track_; uint64_t avg_num_existing_point_reads_with_naiive_track_; - double min_avg_num_point_reads_; // the inheritance method may underestimate - // the real number of point reads + double + min_avg_num_point_reads_from_upper_level_; // the inheritance method may + // underestimate the real + // number of point reads + double max_avg_num_point_reads_; }; #ifndef NDEBUG diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index f3d2732..3a4e693 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -44,6 +44,7 @@ class SequenceIterWrapper : public InternalIterator { inner_iter_->Next(); SetAvgNumPointReads(inner_iter_->GetAvgNumPointReads()); SetAvgNumExistingPointReads(inner_iter_->GetAvgNumExistingPointReads()); + SetIsDeepestLevel(inner_iter_->IsDeepestLevel()); } void Seek(const Slice& target) override { if (!need_count_entries_) { @@ -280,6 +281,7 @@ class CompactionIterator { double GetAvgNumExistingPointReads() const { return input_.GetAvgNumExistingPointReads(); } + bool IsDeepestLevel() const { return input_.IsDeepestLevel(); } bool IsDeleteRangeSentinelKey() const { return is_range_del_; } diff --git a/db/compaction/compaction_outputs.cc b/db/compaction/compaction_outputs.cc index bb6a29e..013ddaf 100644 --- a/db/compaction/compaction_outputs.cc +++ b/db/compaction/compaction_outputs.cc @@ -31,6 +31,7 @@ Status CompactionOutputs::Finish( assert(meta != nullptr); Status s = intput_status; uint64_t min_num_point_reads = 0; + uint64_t num_point_reads = 0; if (s.ok()) { std::string seqno_to_time_mapping_str; seqno_to_time_mapping.Encode( @@ -41,12 +42,26 @@ Status CompactionOutputs::Finish( meta->num_entries = builder_->NumEntries(); if (compaction_->immutable_options()->point_reads_track_method == kDynamicCompactionAwareTrack) { - uint64_t num_point_reads = + num_point_reads = std::min(compaction_->input_vstorage()->GetAccumulatedNumPointReads(), (uint64_t)round(current_output().agg_num_point_reads)); - min_num_point_reads = (uint64_t)round( - compaction_->GetMinAvgNumPointReads() * meta->num_entries); - num_point_reads = std::max(min_num_point_reads, num_point_reads); + uint64_t agg_num_entries_from_upper_level = + current_output().agg_num_entries_from_upper_level; + min_num_point_reads = + round(compaction_->GetMinAvgNumPointReadsUpperLevel() * + agg_num_entries_from_upper_level); + if (meta->num_entries > agg_num_entries_from_upper_level) { + min_num_point_reads = std::max( + min_num_point_reads, + (uint64_t)round( + current_output().min_curr_avg_point_reads_from_deepest_level * + (meta->num_entries - agg_num_entries_from_upper_level))); + } + num_point_reads = std::max(num_point_reads, min_num_point_reads); + double avg_num_point_reads = + std::min(num_point_reads * 1.0 / meta->num_entries, + compaction_->GetMaxAvgNumPointReads()); + num_point_reads = round(avg_num_point_reads * meta->num_entries); file_point_read_inc(meta, num_point_reads); file_existing_point_read_inc( @@ -77,13 +92,14 @@ Status 
CompactionOutputs::Finish( " in level %d" " (num_point_reads=%" PRIu64 ", num_existing_point_reads=%" PRIu64 " , min_num_point_reads=%" PRIu64 - ", avg_min_num_point_reads %.4f) with reset bits-per-key %.4f", + ", num_entries_from_upper_level=%" PRIu64 + ") with reset bits-per-key %.4f", compaction_->column_family_data()->GetName().c_str(), meta->fd.GetNumber(), compaction_->output_level(), meta->stats.num_point_reads.load(std::memory_order_relaxed), meta->stats.num_existing_point_reads.load(std::memory_order_relaxed), - min_num_point_reads, compaction_->GetMinAvgNumPointReads(), - new_bits_per_key); + min_num_point_reads, + current_output().agg_num_entries_from_upper_level, new_bits_per_key); } else { ROCKS_LOG_INFO( compaction_->immutable_options()->info_log, @@ -477,6 +493,15 @@ Status CompactionOutputs::AddToOutput( current_output().agg_num_existing_point_reads += c_iter.GetAvgNumExistingPointReads(); } + if (!c_iter.IsDeepestLevel()) { + current_output().agg_num_entries_from_upper_level++; + current_output().agg_num_point_reads_from_upper_level += + c_iter.GetAvgNumPointReads(); + } else { + current_output().min_curr_avg_point_reads_from_deepest_level = + std::min(current_output().min_curr_avg_point_reads_from_deepest_level, + c_iter.GetAvgNumPointReads()); + } if (!s.ok()) { return s; diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h index 20b7d14..9c6111d 100644 --- a/db/compaction/compaction_outputs.h +++ b/db/compaction/compaction_outputs.h @@ -10,6 +10,8 @@ #pragma once +#include + #include "db/blob/blob_garbage_meter.h" #include "db/bpk_alloc_helper.h" #include "db/compaction/compaction.h" @@ -42,7 +44,11 @@ class CompactionOutputs { bool finished; std::shared_ptr table_properties; double agg_num_point_reads = 0.0; + double agg_num_point_reads_from_upper_level = 0.0; double agg_num_existing_point_reads = 0.0; + uint64_t agg_num_entries_from_upper_level = 0; + double min_curr_avg_point_reads_from_deepest_level = + std::numeric_limits::max(); }; CompactionOutputs() = delete; diff --git a/db/table_cache.cc b/db/table_cache.cc index 881a7e3..e619521 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -225,9 +225,8 @@ InternalIterator* TableCache::NewIterator( const InternalKey* smallest_compaction_key, const InternalKey* largest_compaction_key, bool allow_unprepared_value, uint8_t block_protection_bytes_per_key, const SequenceNumber* read_seqno, - TruncatedRangeDelIterator** range_del_iter, - uint64_t current_global_num_point_reads_counter, - double avg_lvl0_num_point_reads_ratio) { + TruncatedRangeDelIterator** range_del_iter, uint64_t est_num_point_reads, + uint64_t est_num_existing_point_reads) { PERF_TIMER_GUARD(new_table_iterator_nanos); Status s; @@ -270,25 +269,12 @@ InternalIterator* TableCache::NewIterator( file_meta.num_entries - file_meta.num_range_deletions; if (num_entries_in_filter > 0 && (ioptions_.point_reads_track_method == kDynamicCompactionAwareTrack)) { - uint64_t min_num_point_reads = 0; - if (level == 0) { - min_num_point_reads = - round(file_meta.stats.start_global_point_read_number * - avg_lvl0_num_point_reads_ratio); - } - std::pair estimated_num_point_read_stats = - file_meta.stats.GetEstimatedNumPointReads( - current_global_num_point_reads_counter, - ioptions_.point_read_learning_rate, -1, min_num_point_reads); - uint64_t num_point_reads = estimated_num_point_read_stats.first; - uint64_t num_existing_point_reads = - estimated_num_point_read_stats.second; - if (num_point_reads > 0) { - 
result->SetAvgNumPointReads(num_point_reads * 1.0 / + if (est_num_point_reads > 0) { + result->SetAvgNumPointReads(est_num_point_reads * 1.0 / num_entries_in_filter); - if (num_existing_point_reads > 0) { - result->SetAvgNumExistingPointReads(num_existing_point_reads * 1.0 / - num_entries_in_filter); + if (est_num_existing_point_reads > 0) { + result->SetAvgNumExistingPointReads(est_num_existing_point_reads * + 1.0 / num_entries_in_filter); } } } diff --git a/db/table_cache.h b/db/table_cache.h index bd7e212..8ff2bea 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -102,8 +102,8 @@ class TableCache { uint8_t protection_bytes_per_key, const SequenceNumber* range_del_read_seqno = nullptr, TruncatedRangeDelIterator** range_del_iter = nullptr, - uint64_t current_global_num_point_reads_counter = 0, - double avg_lvl0_num_point_reads_ratio = 1.0); + uint64_t est_num_point_reads = 0, + uint64_t est_num_existing_point_reads = 0); // If a seek to internal key "k" in specified file finds an entry, // call get_context->SaveValue() repeatedly until diff --git a/db/version_edit.h b/db/version_edit.h index b0bf731..960b477 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -325,6 +325,8 @@ struct FileSampledStats { } est_num_point_reads = std::max(est_num_point_reads, min_num_point_reads + GetNumPointReads()); + est_num_point_reads = + std::min(est_num_point_reads, current_global_point_read_number); double est_existing_ratio = 0.0; uint64_t num_existing_point_reads_in_window = 0; if (global_point_read_number_window.size() == 64) { @@ -550,16 +552,36 @@ struct FdWithKeyRange { FileMetaData* file_metadata; // Point to all metadata Slice smallest_key; // slice that contain smallest key Slice largest_key; // slice that contain largest key + uint64_t snapshot_num_point_reads; + uint64_t snapshot_num_existing_point_reads; FdWithKeyRange() - : fd(), file_metadata(nullptr), smallest_key(), largest_key() {} + : fd(), + file_metadata(nullptr), + smallest_key(), + largest_key(), + snapshot_num_point_reads(0), + snapshot_num_existing_point_reads(0) {} FdWithKeyRange(FileDescriptor _fd, Slice _smallest_key, Slice _largest_key, FileMetaData* _file_metadata) : fd(_fd), file_metadata(_file_metadata), smallest_key(_smallest_key), - largest_key(_largest_key) {} + largest_key(_largest_key), + snapshot_num_point_reads(0), + snapshot_num_existing_point_reads(0) {} + + void InitNumPointReadStats(uint64_t global_num_point_read_counter, + double learning_rate, int est_interval, + uint64_t min_point_reads) { + if (file_metadata) { + std::tie(snapshot_num_point_reads, snapshot_num_existing_point_reads) = + file_metadata->stats.GetEstimatedNumPointReads( + global_num_point_read_counter, learning_rate, est_interval, + min_point_reads); + } + } }; // Data structure to store an array of FdWithKeyRange in one level diff --git a/db/version_set.cc b/db/version_set.cc index d7f7f35..2f076cd 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -859,7 +860,10 @@ int FindFile(const InternalKeyComparator& icmp, void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level, const std::vector& files, - Arena* arena) { + Arena* arena, + uint64_t current_global_point_read_number, + double learning_rate, int est_interval, + double min_num_point_reads_ratio) { assert(file_level); assert(arena); @@ -879,9 +883,19 @@ void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level, memcpy(mem, smallest_key.data(), smallest_size); memcpy(mem + smallest_size, 
largest_key.data(), largest_size); + uint64_t min_num_point_reads = 0; FdWithKeyRange& f = file_level->files[i]; f.fd = files[i]->fd; f.file_metadata = files[i]; + if (f.file_metadata) { + min_num_point_reads = + round(f.file_metadata->stats.start_global_point_read_number * + min_num_point_reads_ratio); + } else { + min_num_point_reads = 0; + } + f.InitNumPointReadStats(current_global_point_read_number, learning_rate, + est_interval, min_num_point_reads); f.smallest_key = Slice(mem, smallest_size); f.largest_key = Slice(mem + smallest_size, largest_size); } @@ -992,6 +1006,8 @@ class LevelIterator final : public InternalIterator { if (range_tombstone_iter_ptr_) { *range_tombstone_iter_ptr_ = &range_tombstone_iter_; } + + is_deepest_level_in_compaction_ = false; } ~LevelIterator() override { delete file_iter_.Set(nullptr); } @@ -1081,6 +1097,7 @@ class LevelIterator final : public InternalIterator { adjusted_avg_num_point_reads_ = adjusted_avg_num_point_reads; } + // set the factor as the number of sorted runs void SetNumPointReadsFactor(double factor) { num_point_reads_factor_ = factor; } @@ -1152,6 +1169,7 @@ class LevelIterator final : public InternalIterator { } CheckMayBeOutOfLowerBound(); ClearRangeTombstoneIter(); + return table_cache_->NewIterator( read_options_, file_options_, icomparator_, *file_meta.file_metadata, range_del_agg_, prefix_extractor_, @@ -1160,7 +1178,8 @@ class LevelIterator final : public InternalIterator { /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key, largest_compaction_key, allow_unprepared_value_, block_protection_bytes_per_key_, &read_seq_, range_tombstone_iter_, - current_global_point_reads_counter_); + flevel_->files[file_index_].snapshot_num_point_reads, + flevel_->files[file_index_].snapshot_num_existing_point_reads); } // Check if current file being fully within iterate_lower_bound. @@ -1557,9 +1576,17 @@ void LevelIterator::SetFileIterator(InternalIterator* iter, if (should_inherit_num_point_reads) { this->SetAvgNumPointReads(iter->GetAvgNumPointReads() / num_point_reads_factor_); + iter->SetAvgNumPointReads(iter->GetAvgNumPointReads() / + num_point_reads_factor_); + this->SetIsDeepestLevel(true); + iter->SetIsDeepestLevel(true); } else { this->SetAvgNumPointReads(iter->GetAvgNumExistingPointReads() + adjusted_avg_num_point_reads); + iter->SetAvgNumPointReads(iter->GetAvgNumExistingPointReads() + + adjusted_avg_num_point_reads); + this->SetIsDeepestLevel(false); + iter->SetIsDeepestLevel(false); } this->SetAvgNumExistingPointReads(iter->GetAvgNumExistingPointReads()); @@ -3288,9 +3315,9 @@ bool Version::IsFilterSkipped(int level, bool is_file_last_in_level, // we already mark the bpk as 0 for each FileMetaData that is supposed to be // skipped, so in most cases we can read the filter if bpk is not 0. 
if (meta->bpk == 0) { - PERF_COUNTER_ADD(num_skipped_times, 1); - return true; // bpk == 0 means we choose not to build filter, thus we - // should skip it + if (max_accessed_modulars) *max_accessed_modulars = 0; + return false; // bpk == 0 means we choose not to build filter, thus we + // should skip it } if (storage_info_.GetBitsPerKeyAllocationType() == @@ -3429,11 +3456,21 @@ bool Version::IsFilterSkipped(int level, bool is_file_last_in_level, return false; } -void VersionStorageInfo::GenerateLevelFilesBrief() { +void VersionStorageInfo::GenerateLevelFilesBrief( + const ImmutableOptions& immutable_options) { level_files_brief_.resize(num_non_empty_levels_); for (int level = 0; level < num_non_empty_levels_; level++) { - DoGenerateLevelFilesBrief(&level_files_brief_[level], files_[level], - &arena_); + if (level == 0) { + DoGenerateLevelFilesBrief(&level_files_brief_[level], files_[level], + &arena_, accumulated_num_point_reads_, + immutable_options.point_read_learning_rate, -1, + avg_num_point_reads_per_lvl0_file_); + } else { + DoGenerateLevelFilesBrief(&level_files_brief_[level], files_[level], + &arena_, accumulated_num_point_reads_, + immutable_options.point_read_learning_rate, -1, + 0); + } } } @@ -3445,7 +3482,7 @@ void VersionStorageInfo::PrepareForVersionAppend( CalculateBaseBytes(immutable_options, mutable_cf_options); UpdateFilesByCompactionPri(immutable_options, mutable_cf_options); GenerateFileIndexer(); - GenerateLevelFilesBrief(); + GenerateLevelFilesBrief(immutable_options); GenerateLevel0NonOverlapping(); if (!immutable_options.allow_ingest_behind) { GenerateBottommostFiles(); @@ -7345,7 +7382,6 @@ InternalIterator* VersionSet::MakeInputIterator( c->num_input_levels() - 1 : c->num_input_levels()); InternalIterator** list = new InternalIterator*[space]; - Slice smallest_key; Slice largest_key; uint64_t num_non_existing_point_reads_for_file_with_largest_key = 0; @@ -7359,18 +7395,19 @@ InternalIterator* VersionSet::MakeInputIterator( size_t iterator_index_with_smallest_key = 0; size_t iterator_index_with_largest_key = 0; - std::string stats_log = ""; - uint64_t num_entries_with_max_num_non_existing_point_reads = 0; uint64_t max_num_non_existing_point_reads = 0; uint64_t temp_num_existing_point_reads_in_a_level = 0; uint64_t temp_num_non_existing_point_reads = 0; uint64_t temp_num_existing_point_reads = 0; + uint64_t temp_num_point_reads = 0; size_t iterator_index_with_max_num_non_existing_point_reads = 0; + std::string stats_log = ""; + std::pair temp_estimated_num_point_read_stats; uint64_t current_global_point_reads_counter = - c->input_vstorage()->GetAccumulatedNumPointReads(); + c->GetSnapshotAggTotalNumPointReads(); // First item in the pair is a pointer to range tombstones. 
// Second item is a pointer to a member of a LevelIterator, // that will be initialized to where CompactionMergingIterator stores @@ -7380,12 +7417,20 @@ InternalIterator* VersionSet::MakeInputIterator( std::pair> range_tombstones; size_t num = 0; + double factor = 0.0; + size_t input_sorted_runs = c->num_input_levels(); for (size_t which = 0; which < c->num_input_levels(); which++) { if (c->input_levels(which)->num_files != 0) { if (c->level(which) == 0) { const LevelFilesBrief* flevel = c->input_levels(which); + input_sorted_runs = c->num_input_levels() + flevel->num_files - 1; for (size_t i = 0; i < flevel->num_files; i++) { const FileMetaData& fmd = *flevel->files[i].file_metadata; + + temp_num_point_reads = flevel->files[i].snapshot_num_point_reads; + temp_num_existing_point_reads = + flevel->files[i].snapshot_num_existing_point_reads; + if (start.has_value() && cfd->user_comparator()->CompareWithoutTimestamp( *start, fmd.largest.user_key()) > 0) { @@ -7415,28 +7460,12 @@ InternalIterator* VersionSet::MakeInputIterator( /*allow_unprepared_value=*/false, c->mutable_cf_options()->block_protection_bytes_per_key, /*range_del_read_seqno=*/nullptr, - /*range_del_iter=*/&range_tombstone_iter, - current_global_point_reads_counter, - c->input_vstorage()->GetAvgNumPointReadsPerLvl0File()); - - temp_estimated_num_point_read_stats = - fmd.stats.GetEstimatedNumPointReads( - current_global_point_reads_counter, - db_options_->point_read_learning_rate, -1, - round(fmd.stats.start_global_point_read_number * - c->input_vstorage()->GetAvgNumPointReadsPerLvl0File())); - stats_log += - "[ fileID:" + std::to_string(fmd.fd.GetNumber()) + - ", num_point_reads:" + - std::to_string(temp_estimated_num_point_read_stats.first) + - ", num_existing_point_reads:" + - std::to_string(temp_estimated_num_point_read_stats.second) + - "], "; - temp_num_existing_point_reads = - temp_estimated_num_point_read_stats.second; + /*range_del_iter=*/&range_tombstone_iter, temp_num_point_reads, + temp_num_existing_point_reads); + list[num]->SetIsDeepestLevel(false); + temp_num_non_existing_point_reads = - temp_estimated_num_point_read_stats.first - - temp_estimated_num_point_read_stats.second; + temp_num_point_reads - temp_num_existing_point_reads; if (max_num_non_existing_point_reads > temp_num_existing_point_reads) { max_num_non_existing_point_reads -= temp_num_existing_point_reads; @@ -7473,10 +7502,18 @@ InternalIterator* VersionSet::MakeInputIterator( meta_for_file_with_largest_key = flevel->files[i].file_metadata; } - if (c->num_input_levels() > 0) { - list[num]->SetAvgNumPointReads( - list[num]->GetAvgNumExistingPointReads()); - } + stats_log += + "[ fileID:" + std::to_string(fmd.fd.GetNumber()) + + ", num_point_reads:" + std::to_string(temp_num_point_reads) + + ", num_existing_point_reads:" + + std::to_string(temp_num_existing_point_reads) + + ", avg_num_point_reads:" + + std::to_string(temp_num_point_reads * 1.0 / + (fmd.num_entries - fmd.num_range_deletions)) + + "], "; + + list[num]->SetAvgNumPointReads(list[num]->GetAvgNumPointReads() * + 1.0 / input_sorted_runs); num++; range_tombstones.emplace_back(range_tombstone_iter, nullptr); } @@ -7489,25 +7526,25 @@ InternalIterator* VersionSet::MakeInputIterator( bool is_largest_key_changed = false; for (size_t i = 0; i < flevel->num_files; i++) { const FileMetaData& fmd = *flevel->files[i].file_metadata; - temp_estimated_num_point_read_stats = - fmd.stats.GetEstimatedNumPointReads( - current_global_point_reads_counter, - db_options_->point_read_learning_rate); + + 
temp_num_point_reads = flevel->files[i].snapshot_num_point_reads; + temp_num_existing_point_reads = + flevel->files[i].snapshot_num_existing_point_reads; + stats_log += "[ fileID:" + std::to_string(fmd.fd.GetNumber()) + - ", num_point_reads:" + - std::to_string(temp_estimated_num_point_read_stats.first) + + ", num_point_reads:" + std::to_string(temp_num_point_reads) + ", num_existing_point_reads:" + - std::to_string(temp_estimated_num_point_read_stats.second) + + std::to_string(temp_num_existing_point_reads) + ", avg_num_point_reads:" + - std::to_string(temp_estimated_num_point_read_stats.first * 1.0 / + std::to_string(temp_num_point_reads * 1.0 / (fmd.num_entries - fmd.num_range_deletions)) + "], "; + temp_num_non_existing_point_reads = - temp_estimated_num_point_read_stats.first - - temp_estimated_num_point_read_stats.second; + temp_num_point_reads - temp_num_existing_point_reads; temp_num_existing_point_reads_in_a_level += - temp_estimated_num_point_read_stats.second; + temp_num_existing_point_reads; temp_agg_num_non_existing_point_reads += temp_num_non_existing_point_reads; temp_num_entries_with_max_num_non_existing_point_reads += @@ -7533,14 +7570,12 @@ InternalIterator* VersionSet::MakeInputIterator( if (cfd->user_comparator()->CompareWithoutTimestamp( meta_for_file_with_smallest_key->largest.user_key(), meta->largest.user_key()) > 0) { - temp_estimated_num_point_read_stats = - meta->stats.GetEstimatedNumPointReads( - current_global_point_reads_counter, - db_options_->point_read_learning_rate); + temp_num_point_reads = + flevel->files[j].snapshot_num_point_reads; if (num_non_existing_point_reads_for_file_with_smallest_key > - temp_estimated_num_point_read_stats.first) { + temp_num_point_reads) { num_non_existing_point_reads_for_file_with_smallest_key -= - temp_estimated_num_point_read_stats.first; + temp_num_point_reads; } else { num_non_existing_point_reads_for_file_with_smallest_key = 0; break; @@ -7573,14 +7608,13 @@ InternalIterator* VersionSet::MakeInputIterator( if (cfd->user_comparator()->CompareWithoutTimestamp( meta_for_file_with_largest_key->smallest.user_key(), meta->smallest.user_key()) < 0) { - temp_estimated_num_point_read_stats = - meta->stats.GetEstimatedNumPointReads( - current_global_point_reads_counter, - db_options_->point_read_learning_rate); + temp_num_point_reads = + flevel->files[j].snapshot_num_point_reads; + if (num_non_existing_point_reads_for_file_with_largest_key > - temp_estimated_num_point_read_stats.first) { + temp_num_point_reads) { num_non_existing_point_reads_for_file_with_largest_key -= - temp_estimated_num_point_read_stats.first; + temp_num_point_reads; } else { num_non_existing_point_reads_for_file_with_largest_key = 0; break; @@ -7607,7 +7641,7 @@ InternalIterator* VersionSet::MakeInputIterator( c->mutable_cf_options()->block_protection_bytes_per_key, range_del_agg, c->boundaries(which), false, &tombstone_iter_ptr, current_global_point_reads_counter); - list[num] = tmp_level_iterator; + // non_existing point reads in shallower levels could be existing point // reads in lower level if (max_num_non_existing_point_reads > @@ -7635,9 +7669,14 @@ InternalIterator* VersionSet::MakeInputIterator( tmp_level_iterator->SetIsDeepestLevelInCompaction(true); num_non_existing_point_reads_in_last_level = temp_agg_num_non_existing_point_reads; - tmp_level_iterator->SetNumPointReadsFactor(c->num_input_files()); + factor = c->max_num_entries_in_compaction() * 1.0 * + flevel->num_files / c->existing_entries_in_output_level(); + 
tmp_level_iterator->SetNumPointReadsFactor(factor); + stats_log += " factor : " + std::to_string(factor); } + list[num] = tmp_level_iterator; + if (is_smallest_key_changed) { level_iter_with_smallest_key = tmp_level_iterator; } @@ -7706,8 +7745,7 @@ InternalIterator* VersionSet::MakeInputIterator( ((max_num_non_existing_point_reads - num_non_existing_point_reads_in_last_level) * 1.0) / - c->num_input_files() / - num_entries_with_max_num_non_existing_point_reads; + c->max_num_entries_in_compaction(); level_iter_with_max_non_existing_point_reads ->SetAdjustedAvgNumPointQueries(adjusted_avg_num_point_queries); stats_log += " adjusting level_iter with " + @@ -7721,15 +7759,13 @@ InternalIterator* VersionSet::MakeInputIterator( ((max_num_non_existing_point_reads - num_non_existing_point_reads_in_last_level) * 1.0) / - c->num_input_files() / - num_entries_with_max_num_non_existing_point_reads); + c->max_num_entries_in_compaction()); stats_log += " adjusting pure iter with " + std::to_string(((max_num_non_existing_point_reads - num_non_existing_point_reads_in_last_level) * 1.0) / - c->num_input_files() / - num_entries_with_max_num_non_existing_point_reads) + + c->max_num_entries_in_compaction()) + ". "; } } @@ -7754,10 +7790,9 @@ InternalIterator* VersionSet::MakeInputIterator( std::to_string(meta_for_file_with_smallest_key->fd.GetNumber()) + " by " + std::to_string(num_non_existing_point_reads_for_file_with_smallest_key * - 1.0 / + 1.0 / c->num_input_files() / (meta_for_file_with_smallest_key->num_entries - - meta_for_file_with_smallest_key->num_range_deletions) / - c->num_input_files()); + meta_for_file_with_smallest_key->num_range_deletions)); } else { level_iter_with_smallest_key->SetAdjustedAvgNumPointQueriesForLeftmostFile( num_non_existing_point_reads_for_file_with_smallest_key * 1.0 / @@ -7769,19 +7804,18 @@ InternalIterator* VersionSet::MakeInputIterator( std::to_string(meta_for_file_with_smallest_key->fd.GetNumber()) + " by " + std::to_string(num_non_existing_point_reads_for_file_with_smallest_key * - 1.0 / + 1.0 / c->num_input_files() / (meta_for_file_with_smallest_key->num_entries - - meta_for_file_with_smallest_key->num_range_deletions) / - c->num_input_files()); + meta_for_file_with_smallest_key->num_range_deletions)); } if (level_iter_with_largest_key == nullptr) { list[iterator_index_with_largest_key]->SetAvgNumPointReads( list[iterator_index_with_largest_key]->GetAvgNumExistingPointReads() + num_non_existing_point_reads_for_file_with_largest_key * 1.0 / + c->num_input_files() / (meta_for_file_with_largest_key->num_entries - - meta_for_file_with_largest_key->num_range_deletions) / - c->num_input_files()); + meta_for_file_with_largest_key->num_range_deletions)); stats_log += " adjusting file (pure iter) with largest key: " + @@ -7794,9 +7828,9 @@ InternalIterator* VersionSet::MakeInputIterator( } else { level_iter_with_largest_key->SetAdjustedAvgNumPointQueriesForRightmostFile( num_non_existing_point_reads_for_file_with_largest_key * 1.0 / + c->num_input_files() / (meta_for_file_with_largest_key->num_entries - - meta_for_file_with_largest_key->num_range_deletions) / - c->num_input_files()); + meta_for_file_with_largest_key->num_range_deletions)); stats_log += " adjusting file (level iter) with largestkey: " + std::to_string(meta_for_file_with_largest_key->fd.GetNumber()) + diff --git a/db/version_set.h b/db/version_set.h index edcd32e..59e552a 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -122,7 +122,11 @@ extern bool SomeFileOverlapsRange(const InternalKeyComparator& 
icmp, // arena: Arena used to allocate the memory extern void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level, const std::vector& files, - Arena* arena); + Arena* arena, + uint64_t current_global_point_read_number, + double learning_rate, + int est_interval = -1, + double min_num_point_reads_ratio = 0.0); enum EpochNumberRequirement { kMightMissing, kMustPresent, @@ -738,7 +742,7 @@ class VersionStorageInfo { file_indexer_.UpdateIndex(&arena_, num_non_empty_levels_, files_); } - void GenerateLevelFilesBrief(); + void GenerateLevelFilesBrief(const ImmutableOptions& ioptions); void GenerateLevel0NonOverlapping(); void GenerateBottommostFiles(); void GenerateFileLocationIndex(); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 1f7f5c7..b6c7d81 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1461,7 +1461,7 @@ struct DBOptions { PointReadsTrackMethod point_reads_track_method = PointReadsTrackMethod::kNoTrack; uint8_t track_point_read_number_window_size = 64; - double point_read_learning_rate = 0.8; + double point_read_learning_rate = 0.2; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/table/compaction_merging_iterator.cc b/table/compaction_merging_iterator.cc index cde806a..acc541a 100644 --- a/table/compaction_merging_iterator.cc +++ b/table/compaction_merging_iterator.cc @@ -284,6 +284,7 @@ void CompactionMergingIterator::Next() { current_->iter.Next(); SetAvgNumPointReads(current_->iter.GetAvgNumPointReads()); SetAvgNumExistingPointReads(current_->iter.GetAvgNumExistingPointReads()); + SetIsDeepestLevel(current_->iter.IsDeepestLevel()); if (current_->iter.Valid()) { // current is still valid after the Next() call above. Call // replace_top() to restore the heap property. 
When the same child @@ -329,6 +330,7 @@ void CompactionMergingIterator::FindNextVisibleKey() { current->iter.Next(); SetAvgNumPointReads(current_->iter.GetAvgNumPointReads()); SetAvgNumExistingPointReads(current_->iter.GetAvgNumExistingPointReads()); + SetIsDeepestLevel(current_->iter.IsDeepestLevel()); if (current->iter.Valid()) { assert(current->iter.status().ok()); minHeap_.replace_top(current); diff --git a/table/internal_iterator.h b/table/internal_iterator.h index 7e276f2..a6a765e 100644 --- a/table/internal_iterator.h +++ b/table/internal_iterator.h @@ -49,6 +49,12 @@ class InternalIteratorBase : public Cleanable { return avg_num_existing_point_reads_; } + bool IsDeepestLevel() const { return is_deepest_level_; } + + void SetIsDeepestLevel(bool is_deepest_level) { + is_deepest_level_ = is_deepest_level; + } + void SetAvgNumPointReads(double avg_num_point_reads) { avg_num_point_reads_ = avg_num_point_reads; } @@ -232,6 +238,7 @@ class InternalIteratorBase : public Cleanable { private: double avg_num_point_reads_ = 0.0; double avg_num_existing_point_reads_ = 0.0; + bool is_deepest_level_ = false; }; using InternalIterator = InternalIteratorBase; diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h index 7eb2363..7d66523 100644 --- a/table/iterator_wrapper.h +++ b/table/iterator_wrapper.h @@ -52,6 +52,14 @@ class IteratorWrapperBase { } } + bool IsDeepestLevel() const { + if (iter_) { + return iter_->IsDeepestLevel(); + } else { + return false; + } + } + void SetAvgNumPointReads(double avg_num_point_reads) { iter_->SetAvgNumPointReads(avg_num_point_reads); } @@ -60,6 +68,10 @@ class IteratorWrapperBase { iter_->SetAvgNumExistingPointReads(avg_num_existing_point_reads); } + void SetIsDeepestLevel(bool is_deepest_level) { + iter_->SetIsDeepestLevel(is_deepest_level); + } + // Set the underlying Iterator to _iter and return // previous underlying Iterator. InternalIteratorBase* Set(InternalIteratorBase* _iter) {
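
A minimal standalone sketch of the idea this patch implements, assuming simplified stand-in types: each input file's point-read estimate is frozen once, before the compaction starts, and the frozen value is then used to price output files, instead of re-reading the live global counter mid-compaction. Only the field name snapshot_num_point_reads mirrors the patch above; every other type and function here is illustrative and does not correspond to the RocksDB classes touched by the diff.

    // Sketch: freeze per-file point-read estimates before compaction starts,
    // then distribute them to an output file by entry count.
    #include <algorithm>
    #include <cmath>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct InputFile {
      uint64_t num_entries;
      uint64_t live_num_point_reads;      // still updated by foreground reads
      uint64_t snapshot_num_point_reads;  // frozen once, used by the compaction
    };

    // Take the snapshot once, before any output is built, so that every output
    // file is estimated against the same global point-read counter.
    void SnapshotPointReads(std::vector<InputFile>& inputs,
                            uint64_t global_point_read_counter) {
      for (InputFile& f : inputs) {
        f.snapshot_num_point_reads =
            std::min(f.live_num_point_reads, global_point_read_counter);
      }
    }

    // Estimate point reads for an output file holding `output_entries` keys
    // drawn proportionally from the inputs, using only the frozen snapshots.
    uint64_t EstimateOutputPointReads(const std::vector<InputFile>& inputs,
                                      uint64_t output_entries) {
      uint64_t total_entries = 0;
      uint64_t total_reads = 0;
      for (const InputFile& f : inputs) {
        total_entries += f.num_entries;
        total_reads += f.snapshot_num_point_reads;
      }
      if (total_entries == 0) return 0;
      double avg_reads_per_entry =
          static_cast<double>(total_reads) / static_cast<double>(total_entries);
      return static_cast<uint64_t>(
          std::llround(avg_reads_per_entry * static_cast<double>(output_entries)));
    }

    int main() {
      std::vector<InputFile> inputs = {{1000, 400, 0}, {4000, 100, 0}};
      SnapshotPointReads(inputs, /*global_point_read_counter=*/10000);
      // 500 snapshot reads over 5000 entries -> 0.1 per entry; 2500 entries -> 250
      std::cout << EstimateOutputPointReads(inputs, 2500) << "\n";
      return 0;
    }

The design choice this illustrates: once the estimates are snapshotted per file (as the patch does in FdWithKeyRange via InitNumPointReadStats), every output produced by the same compaction sees a consistent view of the read workload, even while the live counters keep advancing.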