Skip to content

Commit

Permalink
Remove epoch calculation code
Browse files Browse the repository at this point in the history
  • Loading branch information
seckcoder committed Oct 2, 2023
1 parent 832da87 commit 7898de3
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 202 deletions.
28 changes: 1 addition & 27 deletions cloud/replication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -455,11 +455,6 @@ size_t ReplicationTest::catchUpFollower(
DB::ApplyReplicationLogRecordInfo info;
size_t ret = 0;
unsigned flags = DB::AR_EVICT_OBSOLETE_FILES;
flags |= DB::AR_RESET_IF_EPOCH_MISMATCH;
if (replicate_epoch_number_) {
flags |= DB::AR_REPLICATE_EPOCH_NUM;
flags |= DB::AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION;
}
for (; followerSequence_ < (int)log_records_.size(); ++followerSequence_) {
if (num_records && ret >= *num_records) {
break;
Expand Down Expand Up @@ -1139,19 +1134,7 @@ TEST_F(ReplicationTest, EvictObsoleteFiles) {
static_cast_with_check<DBImpl>(follower)->TEST_table_cache()->GetUsage());
}

class ReplicationTestWithParam : public ReplicationTest,
public testing::WithParamInterface<std::pair<bool, bool>> {
public:
ReplicationTestWithParam()
: ReplicationTest() {}

void SetUp() override {
std::tie(replicate_epoch_number_, consistency_check_on_epoch_replication) =
GetParam();
}
};

TEST_P(ReplicationTestWithParam, Stress) {
TEST_F(ReplicationTest, Stress) {
std::string val;
auto leader = openLeader();
openFollower();
Expand Down Expand Up @@ -1232,15 +1215,6 @@ TEST_P(ReplicationTestWithParam, Stress) {
verifyNextEpochNumber();
}

INSTANTIATE_TEST_CASE_P(ReplicationTest, ReplicationTestWithParam,
::testing::ValuesIn(std::vector<std::pair<bool, bool>>{
// don't replicate epoch
{false, true},
// replicate epoch but no consistency check
{true, false},
// replicate epoch and do consistency check
{true, true}}));

TEST_F(ReplicationTest, DeleteRange) {
auto leader = openLeader();
openFollower();
Expand Down
162 changes: 10 additions & 152 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1430,48 +1430,18 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
if (!s.ok()) {
break;
}
if (flags & AR_REPLICATE_EPOCH_NUM) {
// replicate epoch number on follower

s = CheckNextEpochNumberConsistency(e, cfd);
if (!s.ok()) {
break;
}

auto& newFiles = e.GetNewFiles();
auto& deletedFiles = e.GetDeletedFiles();

if (flags & AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION) {
if (deletedFiles.empty() && !newFiles.empty()) {
// Set next epoch number properly before epoch number consistency check.
// This is necessary if next_epoch_number changes during db reopen.
cfd->SetNextEpochNumber(newFiles.begin()->second.epoch_number);
}
s = CheckNextEpochNumberConsistency(e, cfd);
if (!s.ok()) {
break;
}

// do consistency check by comparing the replicated epoch number
// against inferred epoch number No need to
// `reset_next_epoch_number` here since we have already done it
s = InferEpochNumber(&e, cfd, info,
false /* reset_next_epoch_number */);
if (s.ok() && info->mismatched_epoch_num > 0) {
s = Status::Poison("epoch number consistency check fails");
}
if (!s.ok()) {
break;
}
}
auto& newFiles = e.GetNewFiles();
auto& deletedFiles = e.GetDeletedFiles();

// Maintain next epoch number on follower
if (deletedFiles.empty() && !newFiles.empty()) {
cfd->SetNextEpochNumber(newFiles.rbegin()->second.epoch_number + 1);
}
} else {
// infer epoch number on follower
s = InferEpochNumber(&e, cfd, info,
flags & AR_RESET_IF_EPOCH_MISMATCH);
if (!s.ok()) {
break;
}
// Maintain next epoch number on follower
if (deletedFiles.empty() && !newFiles.empty()) {
cfd->SetNextEpochNumber(newFiles.rbegin()->second.epoch_number + 1);
}
}
if (!s.ok()) {
Expand Down Expand Up @@ -1548,119 +1518,7 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
return s;
}

Status DBImpl::InferEpochNumber(VersionEdit* e, ColumnFamilyData* cfd,
ApplyReplicationLogRecordInfo* info,
bool reset_next_epoch_number) {
auto& newFiles = e->GetNewFiles();
// Epoch number calculation on the fly.
// There are two cases in which we need to calculate epoch number
// when applying `kManifestWrite`
// 1. flush which generates L0 files. epoch number is allocated
// based on `next_epoch_number` of each CF. The L0 files are sorted
// based on `largest seqno`.
// 2. compaction which merges files in lower levels to higher
// levels. epoch number = min epoch number of input files.
const auto& deletedFiles = e->GetDeletedFiles();
if (deletedFiles.empty() && !newFiles.empty()) {
// case 1: flush into L0 files. New files must be level 0

for (auto& p : newFiles) {
if (p.first != 0) {
ROCKS_LOG_ERROR(
immutable_db_options_.info_log,
"[%s] newly flushed file: %" PRIu64 " < is not at L0 but Level: %d",
cfd->GetName().c_str(), p.second.fd.GetNumber(), p.first);
return Status::Corruption("Newly flushed file is not at L0");
}
}

// sort added files by largest seqno
std::vector<FileMetaData*> added_files;
for (auto& p : newFiles) {
added_files.push_back(&p.second);
}

NewestFirstBySeqNo cmp;
std::sort(added_files.begin(), added_files.end(), cmp);
auto first_file = added_files[0];
// Rewind/advance next_epoch_number. This is necessary if next_epoch_number
// mismtaches due to db reopen.
if (first_file->epoch_number != kUnknownEpochNumber &&
first_file->epoch_number != cfd->GetNextEpochNumber() &&
reset_next_epoch_number) {
auto max_epoch_number =
cfd->current()->storage_info()->GetMaxEpochNumberOfFiles();
if (first_file->epoch_number < cfd->GetNextEpochNumber() &&
(first_file->epoch_number == max_epoch_number + 1)) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] rewind next_epoch_number from: %" PRIu64
" to %" PRIu64,
cfd->GetName().c_str(), cfd->GetNextEpochNumber(),
max_epoch_number + 1);
cfd->SetNextEpochNumber(max_epoch_number + 1);
} else if (first_file->epoch_number > cfd->GetNextEpochNumber() &&
(cfd->GetNextEpochNumber() == max_epoch_number + 1)) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] advance next_epoch_number from: %" PRIu64
" to %" PRIu64,
cfd->GetName().c_str(), cfd->GetNextEpochNumber(),
first_file->epoch_number);
cfd->SetNextEpochNumber(first_file->epoch_number);
} else {
// Not safe to rewind/advance `next_epoch_number`. This can happen
// when we do epoch recovery during db open (i.e., nodes run
// with different rocksdb versions and nodes upgrading from old version
// to new version need to recover epoch). Poison is the best we can do
return Status::Poison("Poison due to diverged next epoch number");
}
}

for (auto meta : added_files) {
auto replicated_epoch_number = meta->epoch_number;
auto inferred_epoch_number = cfd->NewEpochNumber();
if (replicated_epoch_number != inferred_epoch_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] mismatched epoch for file: %" PRIu64
"; incoming: %" PRIu64 ", calculated: %" PRIu64,
cfd->GetName().c_str(), meta->fd.GetNumber(),
replicated_epoch_number, inferred_epoch_number);
info->mismatched_epoch_num += 1;
meta->epoch_number = inferred_epoch_number;
}
}
} else if (!deletedFiles.empty() && !newFiles.empty()) {
// case 2: compaction
uint64_t min_input_epoch_number = std::numeric_limits<uint64_t>::max();
const auto& storage_info = cfd->current()->storage_info();
for (auto [level, file_number] : deletedFiles) {
auto meta = storage_info->GetFileMetaDataByNumber(file_number);
if (!meta) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"[%s] deleted file: %" PRIu64 " at level: %d not found",
cfd->GetName().c_str(), file_number, level);
return Status::Corruption("Deleted file not found");
}
min_input_epoch_number =
std::min(meta->epoch_number, min_input_epoch_number);
}

for (auto& p : newFiles) {
auto replicated_epoch_number = p.second.epoch_number;
if (replicated_epoch_number != min_input_epoch_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] mismatched epoch for file: %" PRIu64
"; incoming: %" PRIu64 ", calculated: %" PRIu64,
cfd->GetName().c_str(), p.second.fd.GetNumber(),
replicated_epoch_number, min_input_epoch_number);
info->mismatched_epoch_num += 1;
p.second.epoch_number = min_input_epoch_number;
}
}
}
return Status::OK();
}

Status DBImpl::CheckNextEpochNumberConsistency(VersionEdit& e, ColumnFamilyData* cfd) {
Status DBImpl::CheckNextEpochNumberConsistency(const VersionEdit& e, ColumnFamilyData* cfd) {
auto& newFiles = e.GetNewFiles();
auto& deletedFiles = e.GetDeletedFiles();

Expand Down
7 changes: 1 addition & 6 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,15 +412,10 @@ class DBImpl : public DB {
ApplyReplicationLogRecordInfo* info,
unsigned flags) override;

// Calculate epoch number on follower
Status InferEpochNumber(VersionEdit* e, ColumnFamilyData* cfd,
ApplyReplicationLogRecordInfo* info,
bool reset_next_epoch_number);

// Check that replicated epoch number of newly flushed files >= cfd's next
// epoch number.
// TODO: make `VersionEdit` const
Status CheckNextEpochNumberConsistency(VersionEdit& e, ColumnFamilyData* cfd);
Status CheckNextEpochNumberConsistency(const VersionEdit& e, ColumnFamilyData* cfd);
Status GetReplicationRecordDebugString(
const ReplicationLogRecord& record, std::string* out) const override;
Status GetPersistedReplicationSequence(std::string* out) override;
Expand Down
1 change: 0 additions & 1 deletion db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ class VersionEdit {
// Retrieve the table files added as well as their associated levels.
using NewFiles = std::vector<std::pair<int, FileMetaData>>;
const NewFiles& GetNewFiles() const { return new_files_; }
NewFiles& GetNewFiles() { return new_files_; }

// Retrieve all the compact cursors
using CompactCursors = std::vector<std::pair<int, InternalKey>>;
Expand Down
16 changes: 0 additions & 16 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -1326,22 +1326,6 @@ class DB {
// REQUIRES: info needs to be provided, can't be nullptr.
enum ApplyReplicationLogRecordFlags : unsigned {
AR_EVICT_OBSOLETE_FILES = 1U << 0,
// If set, replicate epoch number instead of calculating the number when
// applying `kManifestWrite`
AR_REPLICATE_EPOCH_NUM = 1U << 1,
// If set, rewind/advance `next_epoch_number` if mismatches found.
// Rocksdb doesn't track `next_epoch_number` in manifest file. When db is
// reopened, it calculates the `next_epoch_number` based on max epoch number
// of existing live files. So it's possible for the `next_epoch_number` to
// go backwards. Following two cases are possible:
// 1. leader reopens db, causing `next_epoch_number` on leader to go
// backwards. So follower needs to rewind it.
// 2. follower reopens db, causing `next_epoch_number` on follower to go
// backwards. So follower needs to advance it
AR_RESET_IF_EPOCH_MISMATCH = 1U << 2,

// Check the consistency of epoch number during replication
AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION = 1U << 3
};
using CFOptionsFactory = std::function<ColumnFamilyOptions(Slice)>;
virtual Status ApplyReplicationLogRecord(ReplicationLogRecord record,
Expand Down

0 comments on commit 7898de3

Please sign in to comment.