Skip to content

Commit

Permalink
snapshot num_point_reads before compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
littlepig2013 committed Oct 25, 2024
1 parent ad4b9c3 commit 973a776
Show file tree
Hide file tree
Showing 15 changed files with 305 additions and 144 deletions.
7 changes: 7 additions & 0 deletions db/bpk_alloc_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
tmp_num_entries_in_filter_by_file;
workload_aware_num_entries_with_empty_queries_ +=
tmp_num_entries_in_filter_by_file;
if (file_meta->filter_size != 0) {
file_meta->bpk =
file_meta->filter_size * 8 /
(file_meta->num_entries - file_meta->num_range_deletions);
} else {
file_meta->bpk = 0;
}
file_workload_state_pq_.push(
FileWorkloadState(tmp_num_entries_in_filter_by_file,
num_empty_point_reads, file_meta));
Expand Down
89 changes: 62 additions & 27 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,46 +376,81 @@ Compaction::Compaction(
// calculate max_num_entries_in_output_levels used in monkey allocation
max_num_entries_in_compaction_ = 0;
max_num_entries_in_output_level_ = 0;
min_avg_num_point_reads_ = std::numeric_limits<double>::max();
existing_entries_in_output_level_ = 0;
min_avg_num_point_reads_from_upper_level_ = 0;
max_avg_num_point_reads_ = 0;
agg_total_num_point_reads_ = vstorage->GetAccumulatedNumPointReads();

std::vector<size_t> num_entries_per_level(num_input_levels(), 0);
double avg_num_point_reads_per_lvl0_file = 0.0;
for (size_t which = 0; which < num_input_levels(); which++) {
if (inputs_[which].level == 0) {
avg_num_point_reads_per_lvl0_file =
vstorage->GetAvgNumPointReadsPerLvl0File();
} else {
avg_num_point_reads_per_lvl0_file = 0.0;
}
DoGenerateLevelFilesBrief(&input_levels_[which], inputs_[which].files,
&arena_);
&arena_, agg_total_num_point_reads_,
immutable_options_.point_read_learning_rate, -1,
avg_num_point_reads_per_lvl0_file);
num_input_files_ += inputs_[which].files.size();
for (FileMetaData* meta : inputs_[which].files) {
num_entries_per_level[which] +=
meta->num_entries - meta->num_range_deletions;
}
max_num_entries_in_compaction_ += num_entries_per_level[which];
}

uint64_t tmp_entries = 0;
// use the minimum avg point reads per file to represent the avg point reads
// per sorted run and choose the maximum among them
for (size_t which = 0; which < num_input_levels(); which++) {
if (inputs_[which].level != output_level_) {
for (FileMetaData* meta : inputs_[which].files) {
tmp_entries = meta->num_entries - meta->num_range_deletions;
max_num_entries_in_compaction_ += tmp_entries;
max_num_entries_in_output_level_ += tmp_entries;
auto result = meta->stats.GetEstimatedNumPointReads(
vstorage->GetAccumulatedNumPointReads(),
immutable_options_.point_read_learning_rate);
if (inputs_[which].level == 0) {
min_avg_num_point_reads_ =
std::min(min_avg_num_point_reads_,
result.first * 1.0 /
(tmp_entries * inputs_[which].files.size() +
num_input_levels() - 1));
} else {
min_avg_num_point_reads_ = std::min(
min_avg_num_point_reads_, result.first * 1.0 / tmp_entries);
max_num_entries_in_output_level_ += num_entries_per_level[which];
if (inputs_[which].level == 0) {
for (size_t i = 0; i < input_levels_[which].num_files; i++) {
tmp_entries = inputs_[which].files[i]->num_entries -
inputs_[which].files[i]->num_range_deletions;
max_avg_num_point_reads_ = std::max(
max_avg_num_point_reads_,
input_levels_[which].files[i].snapshot_num_point_reads * 1.0 /
tmp_entries);
;
min_avg_num_point_reads_from_upper_level_ = std::max(
min_avg_num_point_reads_from_upper_level_,
input_levels_[which].files[i].snapshot_num_point_reads * 1.0 /
tmp_entries / inputs_[which].files.size());
}
} else {
double temp_avg_num_point_reads_from_upper_level_ =
std::numeric_limits<double>::max();
if (inputs_[which].files.size() == 0)
temp_avg_num_point_reads_from_upper_level_ = 0;
for (size_t i = 0; i < input_levels_[which].num_files; i++) {
tmp_entries = inputs_[which].files[i]->num_entries -
inputs_[which].files[i]->num_range_deletions;
max_avg_num_point_reads_ = std::max(
max_avg_num_point_reads_,
input_levels_[which].files[i].snapshot_num_point_reads * 1.0 /
tmp_entries);
;
temp_avg_num_point_reads_from_upper_level_ = std::min(
temp_avg_num_point_reads_from_upper_level_,
input_levels_[which].files[i].snapshot_num_point_reads *
num_entries_per_level[which] * 1.0 /
(tmp_entries * inputs_[which].files.size() *
max_num_entries_in_compaction_));
}
min_avg_num_point_reads_from_upper_level_ =
std::max(min_avg_num_point_reads_from_upper_level_,
temp_avg_num_point_reads_from_upper_level_);
}
} else {
for (FileMetaData* meta : inputs_[which].files) {
tmp_entries = meta->num_entries - meta->num_range_deletions;
max_num_entries_in_compaction_ += tmp_entries;
auto result = meta->stats.GetEstimatedNumPointReads(
vstorage->GetAccumulatedNumPointReads(),
immutable_options_.point_read_learning_rate);
min_avg_num_point_reads_ = std::min(min_avg_num_point_reads_,
result.first * 1.0 / tmp_entries);
}
existing_entries_in_output_level_ = num_entries_per_level[which];
}
}

if (input_vstorage_->GetBitsPerKeyAllocationType() ==
BitsPerKeyAllocationType::kDynamicMonkeyBpkAlloc) {
for (const FileMetaData* meta :
Expand Down
25 changes: 22 additions & 3 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ class Compaction {
return max_num_entries_in_output_level_;
}

// Number of entries (minus range deletions) already present in the output
// level's input files, snapshotted when this Compaction was constructed.
uint64_t existing_entries_in_output_level() const {
  return existing_entries_in_output_level_;
}

// Upper bound on the number of entries (minus range deletions) this
// compaction may process, summed over all input levels.
uint64_t max_num_entries_in_compaction() const {
  return max_num_entries_in_compaction_;
}
Expand Down Expand Up @@ -452,7 +456,17 @@ class Compaction {
return avg_num_existing_point_reads_with_naiive_track_;
}

double GetMinAvgNumPointReads() const { return min_avg_num_point_reads_; }
// Maximum average number of point reads per entry observed across the
// input files of this compaction (snapshot taken at construction time).
double GetMaxAvgNumPointReads() const { return max_avg_num_point_reads_; }

// Minimum average number of point reads per entry estimated from the
// non-output ("upper") input levels. NOTE(review): per the member's
// comment, the inheritance method may underestimate the real count.
double GetMinAvgNumPointReadsUpperLevel() const {
  return min_avg_num_point_reads_from_upper_level_;
}
// double GetMinAvgNumPointReadsDeepestLevel() const { return
// min_avg_num_point_reads_from_deepest_level_; }

// Snapshot of the version's accumulated total number of point reads,
// captured from the input vstorage when this Compaction was constructed.
uint64_t GetSnapshotAggTotalNumPointReads() const {
  return agg_total_num_point_reads_;
}

private:
void SetInputVersion(Version* input_version);
Expand Down Expand Up @@ -514,6 +528,7 @@ class Compaction {
Arena arena_; // Arena used to allocate space for file_levels_

uint64_t max_num_entries_in_output_level_;
uint64_t existing_entries_in_output_level_;
uint64_t max_num_entries_in_compaction_;
const uint32_t output_path_id_;
CompressionType output_compression_;
Expand Down Expand Up @@ -595,10 +610,14 @@ class Compaction {
PenultimateOutputRangeType::kNotSupported;

uint64_t num_input_files_;
uint64_t agg_total_num_point_reads_;
uint64_t avg_num_point_reads_with_naiive_track_;
uint64_t avg_num_existing_point_reads_with_naiive_track_;
double min_avg_num_point_reads_; // the inheritance method may underestimate
// the real number of point reads
double
min_avg_num_point_reads_from_upper_level_; // the inheritance method may
// underestimate the real
// number of point reads
double max_avg_num_point_reads_;
};

#ifndef NDEBUG
Expand Down
2 changes: 2 additions & 0 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class SequenceIterWrapper : public InternalIterator {
inner_iter_->Next();
SetAvgNumPointReads(inner_iter_->GetAvgNumPointReads());
SetAvgNumExistingPointReads(inner_iter_->GetAvgNumExistingPointReads());
SetIsDeepestLevel(inner_iter_->IsDeepestLevel());
}
void Seek(const Slice& target) override {
if (!need_count_entries_) {
Expand Down Expand Up @@ -280,6 +281,7 @@ class CompactionIterator {
double GetAvgNumExistingPointReads() const {
return input_.GetAvgNumExistingPointReads();
}
// Forwards to the wrapped input iterator: true when the entry currently
// being iterated comes from the deepest input level.
bool IsDeepestLevel() const { return input_.IsDeepestLevel(); }

bool IsDeleteRangeSentinelKey() const { return is_range_del_; }

Expand Down
39 changes: 32 additions & 7 deletions db/compaction/compaction_outputs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Status CompactionOutputs::Finish(
assert(meta != nullptr);
Status s = intput_status;
uint64_t min_num_point_reads = 0;
uint64_t num_point_reads = 0;
if (s.ok()) {
std::string seqno_to_time_mapping_str;
seqno_to_time_mapping.Encode(
Expand All @@ -41,12 +42,26 @@ Status CompactionOutputs::Finish(
meta->num_entries = builder_->NumEntries();
if (compaction_->immutable_options()->point_reads_track_method ==
kDynamicCompactionAwareTrack) {
uint64_t num_point_reads =
num_point_reads =
std::min(compaction_->input_vstorage()->GetAccumulatedNumPointReads(),
(uint64_t)round(current_output().agg_num_point_reads));
min_num_point_reads = (uint64_t)round(
compaction_->GetMinAvgNumPointReads() * meta->num_entries);
num_point_reads = std::max(min_num_point_reads, num_point_reads);
uint64_t agg_num_entries_from_upper_level =
current_output().agg_num_entries_from_upper_level;
min_num_point_reads =
round(compaction_->GetMinAvgNumPointReadsUpperLevel() *
agg_num_entries_from_upper_level);
if (meta->num_entries > agg_num_entries_from_upper_level) {
min_num_point_reads = std::max(
min_num_point_reads,
(uint64_t)round(
current_output().min_curr_avg_point_reads_from_deepest_level *
(meta->num_entries - agg_num_entries_from_upper_level)));
}
num_point_reads = std::max(num_point_reads, min_num_point_reads);
double avg_num_point_reads =
std::min(num_point_reads * 1.0 / meta->num_entries,
compaction_->GetMaxAvgNumPointReads());
num_point_reads = round(avg_num_point_reads * meta->num_entries);

file_point_read_inc(meta, num_point_reads);
file_existing_point_read_inc(
Expand Down Expand Up @@ -77,13 +92,14 @@ Status CompactionOutputs::Finish(
" in level %d"
" (num_point_reads=%" PRIu64 ", num_existing_point_reads=%" PRIu64
" , min_num_point_reads=%" PRIu64
", avg_min_num_point_reads %.4f) with reset bits-per-key %.4f",
", num_entries_from_upper_level=%" PRIu64
") with reset bits-per-key %.4f",
compaction_->column_family_data()->GetName().c_str(),
meta->fd.GetNumber(), compaction_->output_level(),
meta->stats.num_point_reads.load(std::memory_order_relaxed),
meta->stats.num_existing_point_reads.load(std::memory_order_relaxed),
min_num_point_reads, compaction_->GetMinAvgNumPointReads(),
new_bits_per_key);
min_num_point_reads,
current_output().agg_num_entries_from_upper_level, new_bits_per_key);
} else {
ROCKS_LOG_INFO(
compaction_->immutable_options()->info_log,
Expand Down Expand Up @@ -477,6 +493,15 @@ Status CompactionOutputs::AddToOutput(
current_output().agg_num_existing_point_reads +=
c_iter.GetAvgNumExistingPointReads();
}
if (!c_iter.IsDeepestLevel()) {
current_output().agg_num_entries_from_upper_level++;
current_output().agg_num_point_reads_from_upper_level +=
c_iter.GetAvgNumPointReads();
} else {
current_output().min_curr_avg_point_reads_from_deepest_level =
std::min(current_output().min_curr_avg_point_reads_from_deepest_level,
c_iter.GetAvgNumPointReads());
}

if (!s.ok()) {
return s;
Expand Down
6 changes: 6 additions & 0 deletions db/compaction/compaction_outputs.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

#pragma once

#include <unordered_map>

#include "db/blob/blob_garbage_meter.h"
#include "db/bpk_alloc_helper.h"
#include "db/compaction/compaction.h"
Expand Down Expand Up @@ -42,7 +44,11 @@ class CompactionOutputs {
bool finished;
std::shared_ptr<const TableProperties> table_properties;
double agg_num_point_reads = 0.0;
double agg_num_point_reads_from_upper_level = 0.0;
double agg_num_existing_point_reads = 0.0;
uint64_t agg_num_entries_from_upper_level = 0;
double min_curr_avg_point_reads_from_deepest_level =
std::numeric_limits<double>::max();
};

CompactionOutputs() = delete;
Expand Down
28 changes: 7 additions & 21 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,8 @@ InternalIterator* TableCache::NewIterator(
const InternalKey* smallest_compaction_key,
const InternalKey* largest_compaction_key, bool allow_unprepared_value,
uint8_t block_protection_bytes_per_key, const SequenceNumber* read_seqno,
TruncatedRangeDelIterator** range_del_iter,
uint64_t current_global_num_point_reads_counter,
double avg_lvl0_num_point_reads_ratio) {
TruncatedRangeDelIterator** range_del_iter, uint64_t est_num_point_reads,
uint64_t est_num_existing_point_reads) {
PERF_TIMER_GUARD(new_table_iterator_nanos);

Status s;
Expand Down Expand Up @@ -270,25 +269,12 @@ InternalIterator* TableCache::NewIterator(
file_meta.num_entries - file_meta.num_range_deletions;
if (num_entries_in_filter > 0 && (ioptions_.point_reads_track_method ==
kDynamicCompactionAwareTrack)) {
uint64_t min_num_point_reads = 0;
if (level == 0) {
min_num_point_reads =
round(file_meta.stats.start_global_point_read_number *
avg_lvl0_num_point_reads_ratio);
}
std::pair<uint64_t, uint64_t> estimated_num_point_read_stats =
file_meta.stats.GetEstimatedNumPointReads(
current_global_num_point_reads_counter,
ioptions_.point_read_learning_rate, -1, min_num_point_reads);
uint64_t num_point_reads = estimated_num_point_read_stats.first;
uint64_t num_existing_point_reads =
estimated_num_point_read_stats.second;
if (num_point_reads > 0) {
result->SetAvgNumPointReads(num_point_reads * 1.0 /
if (est_num_point_reads > 0) {
result->SetAvgNumPointReads(est_num_point_reads * 1.0 /
num_entries_in_filter);
if (num_existing_point_reads > 0) {
result->SetAvgNumExistingPointReads(num_existing_point_reads * 1.0 /
num_entries_in_filter);
if (est_num_existing_point_reads > 0) {
result->SetAvgNumExistingPointReads(est_num_existing_point_reads *
1.0 / num_entries_in_filter);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions db/table_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class TableCache {
uint8_t protection_bytes_per_key,
const SequenceNumber* range_del_read_seqno = nullptr,
TruncatedRangeDelIterator** range_del_iter = nullptr,
uint64_t current_global_num_point_reads_counter = 0,
double avg_lvl0_num_point_reads_ratio = 1.0);
uint64_t est_num_point_reads = 0,
uint64_t est_num_existing_point_reads = 0);

// If a seek to internal key "k" in specified file finds an entry,
// call get_context->SaveValue() repeatedly until
Expand Down
26 changes: 24 additions & 2 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ struct FileSampledStats {
}
est_num_point_reads =
std::max(est_num_point_reads, min_num_point_reads + GetNumPointReads());
est_num_point_reads =
std::min(est_num_point_reads, current_global_point_read_number);
double est_existing_ratio = 0.0;
uint64_t num_existing_point_reads_in_window = 0;
if (global_point_read_number_window.size() == 64) {
Expand Down Expand Up @@ -550,16 +552,36 @@ struct FdWithKeyRange {
FileMetaData* file_metadata; // Point to all metadata
Slice smallest_key; // slice that contain smallest key
Slice largest_key; // slice that contain largest key
uint64_t snapshot_num_point_reads;
uint64_t snapshot_num_existing_point_reads;

FdWithKeyRange()
: fd(), file_metadata(nullptr), smallest_key(), largest_key() {}
: fd(),
file_metadata(nullptr),
smallest_key(),
largest_key(),
snapshot_num_point_reads(0),
snapshot_num_existing_point_reads(0) {}

FdWithKeyRange(FileDescriptor _fd, Slice _smallest_key, Slice _largest_key,
FileMetaData* _file_metadata)
: fd(_fd),
file_metadata(_file_metadata),
smallest_key(_smallest_key),
largest_key(_largest_key) {}
largest_key(_largest_key),
snapshot_num_point_reads(0),
snapshot_num_existing_point_reads(0) {}

// Captures this file's estimated point-read counters into the snapshot
// fields, using the file's sampled stats. Does nothing when this entry
// carries no file metadata.
void InitNumPointReadStats(uint64_t global_num_point_read_counter,
                           double learning_rate, int est_interval,
                           uint64_t min_point_reads) {
  if (file_metadata == nullptr) {
    return;
  }
  const auto estimated = file_metadata->stats.GetEstimatedNumPointReads(
      global_num_point_read_counter, learning_rate, est_interval,
      min_point_reads);
  snapshot_num_point_reads = estimated.first;
  snapshot_num_existing_point_reads = estimated.second;
}
};

// Data structure to store an array of FdWithKeyRange in one level
Expand Down
Loading

0 comments on commit 973a776

Please sign in to comment.