Skip to content

Commit

Permalink
[sys-4815] consistency check when replicating epoch number
Browse files Browse the repository at this point in the history
  • Loading branch information
seckcoder committed Sep 1, 2023
1 parent 112d771 commit 18d4334
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 120 deletions.
18 changes: 13 additions & 5 deletions cloud/replication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ class ReplicationTest : public testing::Test {
protected:
std::shared_ptr<Logger> info_log_;
bool replicate_epoch_number_{true};
bool consistency_check_on_epoch_replication{true};
void resetFollowerSequence(int new_seq) {
followerSequence_ = new_seq;
}
Expand Down Expand Up @@ -454,10 +455,10 @@ size_t ReplicationTest::catchUpFollower(
DB::ApplyReplicationLogRecordInfo info;
size_t ret = 0;
unsigned flags = DB::AR_EVICT_OBSOLETE_FILES;
flags |= DB::AR_RESET_IF_EPOCH_MISMATCH;
if (replicate_epoch_number_) {
flags |= DB::AR_REPLICATE_EPOCH_NUM;
} else {
flags |= DB::AR_RESET_IF_EPOCH_MISMATCH;
flags |= DB::AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION;
}
for (; followerSequence_ < (int)log_records_.size(); ++followerSequence_) {
if (num_records && ret >= *num_records) {
Expand Down Expand Up @@ -1139,13 +1140,14 @@ TEST_F(ReplicationTest, EvictObsoleteFiles) {
}

class ReplicationTestWithParam : public ReplicationTest,
public testing::WithParamInterface<bool> {
public testing::WithParamInterface<std::pair<bool, bool>> {
public:
ReplicationTestWithParam()
: ReplicationTest() {}

void SetUp() override {
replicate_epoch_number_ = GetParam();
std::tie(replicate_epoch_number_, consistency_check_on_epoch_replication) =
GetParam();
}
};

Expand Down Expand Up @@ -1231,7 +1233,13 @@ TEST_P(ReplicationTestWithParam, Stress) {
}

INSTANTIATE_TEST_CASE_P(ReplicationTest, ReplicationTestWithParam,
::testing::Values(false, true));
::testing::ValuesIn(std::vector<std::pair<bool, bool>>{
// don't replicate epoch
{false, true},
// replicate epoch but no consistency check
{true, false},
// replicate epoch and do consistency check
{true, true}}));

TEST_F(ReplicationTest, DeleteRange) {
auto leader = openLeader();
Expand Down
314 changes: 200 additions & 114 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1427,129 +1427,50 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
edit_lists.push_back(std::move(el));
ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
DescribeVersionEdit(e, cfd).c_str());
auto& newFiles = e.GetNewFiles();
bool epoch_recovery_succeeded = true;
std::ostringstream err_oss;
if (!(flags & AR_REPLICATE_EPOCH_NUM)) {
// Epoch number calculation on the fly.
// There are two cases in which we need to calculate epoch number
// when applying `kManifestWrite`
// 1. flush which generates L0 files. epoch number is allocated
// based on `next_epoch_number` of each CF. The L0 files are sorted
// based on `largest seqno`.
// 2. compaction which merges files in lower levels to higher
// levels. epoch number = min epoch number of input files.
const auto& deletedFiles = e.GetDeletedFiles();
if (deletedFiles.empty() && !newFiles.empty()) {
// case 1: flush into L0 files. New files must be level 0

for (auto& p : newFiles) {
if (p.first != 0) {
epoch_recovery_succeeded = false;
err_oss << "newly flushed file: " << p.first << " is not at L0";
break;
}
}
if (!s.ok()) {
break;
}
if (flags & AR_REPLICATE_EPOCH_NUM) {
// replicate epoch number on follower

// sort added files by largest seqno
std::vector<FileMetaData*> added_files;
for(auto& p: newFiles) {
added_files.push_back(&p.second);
}
s = CheckNextEpochNumberConsistency(e, cfd);
if (!s.ok()) {
break;
}

NewestFirstBySeqNo cmp;
std::sort(added_files.begin(), added_files.end(), cmp);
auto first_file = added_files[0];
// Rewind/advance next_epoch_number. This is necessary if epoch_number
// mismtaches due to db reopen.
if (first_file->epoch_number != kUnknownEpochNumber &&
first_file->epoch_number != cfd->GetNextEpochNumber() &&
(flags & AR_RESET_IF_EPOCH_MISMATCH)) {
auto max_epoch_number =
cfd->current()->storage_info()->GetMaxEpochNumberOfFiles();
if (first_file->epoch_number < cfd->GetNextEpochNumber() &&
(first_file->epoch_number == max_epoch_number + 1)) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] rewind next_epoch_number from: %" PRIu64
" to %" PRIu64,
cfd->GetName().c_str(),
cfd->GetNextEpochNumber(),
max_epoch_number + 1);
cfd->SetNextEpochNumber(max_epoch_number + 1);
} else if (first_file->epoch_number >
cfd->GetNextEpochNumber() &&
(cfd->GetNextEpochNumber() ==
max_epoch_number + 1)) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] advance next_epoch_number from: %" PRIu64
" to %" PRIu64,
cfd->GetName().c_str(),
cfd->GetNextEpochNumber(),
first_file->epoch_number);
cfd->SetNextEpochNumber(first_file->epoch_number);
} else {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"[%s] unexpected epoch number: %" PRIu64
" for file: %" PRIu64
" ; max epoch number: %" PRIu64,
cfd->GetName().c_str(),
first_file->epoch_number,
first_file->fd.GetNumber(),
max_epoch_number);
s = Status::Corruption("unexpected epoch number for added file");
break;
}
}
auto& newFiles = e.GetNewFiles();
auto& deletedFiles = e.GetDeletedFiles();

for (auto meta: added_files) {
auto old_epoch_number = meta->epoch_number;
meta->epoch_number = cfd->NewEpochNumber();
if (old_epoch_number != meta->epoch_number) {
info->mismatched_epoch_num += 1;
}
}
} else if (!deletedFiles.empty() && !newFiles.empty()) {
// case 2: compaction
uint64_t min_input_epoch_number =
std::numeric_limits<uint64_t>::max();
const auto& storage_info = cfd->current()->storage_info();
for (auto [level, file_number] : deletedFiles) {
auto meta = storage_info->GetFileMetaDataByNumber(file_number);
if (!meta) {
err_oss << "deleted file: " << file_number
<< " at level: " << level << " not found";
break;
}
min_input_epoch_number =
std::min(meta->epoch_number, min_input_epoch_number);
if (flags & AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION) {
if (deletedFiles.empty() && !newFiles.empty()) {
// Set next epoch number properly before epoch number consistency check.
// This is necessary if next_epoch_number changes during db reopen.
cfd->SetNextEpochNumber(newFiles.begin()->second.epoch_number);
}

for (auto& p: newFiles) {
auto old_epoch_number = p.second.epoch_number;
p.second.epoch_number = min_input_epoch_number;
if (old_epoch_number != p.second.epoch_number) {
info->mismatched_epoch_num += 1;
}
// do consistency check by comparing the replicated epoch number against
// inferred epoch number
s = InferEpochNumber(&e, cfd, info,
false /* reset_next_epoch_number */);
if (s.ok() && info->mismatched_epoch_num > 0) {
s = Status::Corruption("epoch number consistency check fails");
}
}
} else if (newFiles.size() > 0) {
// Maintain next epoch number on follower
auto next_epoch_number = cfd->GetNextEpochNumber();
for (auto& p : newFiles) {
auto epoch_number = p.second.epoch_number;
// advance next epoch number. next_epoch_number never goes
// backwards
if (epoch_number != kUnknownEpochNumber &&
(epoch_number >= next_epoch_number)) {
next_epoch_number = epoch_number + 1;
if (!s.ok()) {
break;
}
}
cfd->SetNextEpochNumber(next_epoch_number);
}

if (!epoch_recovery_succeeded) {
s = Status::Corruption(err_oss.str());
break;
// Maintain next epoch number on follower
if (deletedFiles.empty() && !newFiles.empty()) {
cfd->SetNextEpochNumber(newFiles.rbegin()->second.epoch_number + 1);
}
} else {
// infer epoch number on follower
s = InferEpochNumber(&e, cfd, info,
flags & AR_RESET_IF_EPOCH_MISMATCH);
if (!s.ok()) {
break;
}
}
}
if (!s.ok()) {
Expand Down Expand Up @@ -1626,6 +1547,171 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
return s;
}

Status DBImpl::InferEpochNumber(VersionEdit* e, ColumnFamilyData* cfd,
ApplyReplicationLogRecordInfo* info,
bool reset_next_epoch_number) {
auto& newFiles = e->GetNewFiles();
// Epoch number calculation on the fly.
// There are two cases in which we need to calculate epoch number
// when applying `kManifestWrite`
// 1. flush which generates L0 files. epoch number is allocated
// based on `next_epoch_number` of each CF. The L0 files are sorted
// based on `largest seqno`.
// 2. compaction which merges files in lower levels to higher
// levels. epoch number = min epoch number of input files.
const auto& deletedFiles = e->GetDeletedFiles();
if (deletedFiles.empty() && !newFiles.empty()) {
// case 1: flush into L0 files. New files must be level 0

for (auto& p : newFiles) {
if (p.first != 0) {
ROCKS_LOG_ERROR(
immutable_db_options_.info_log,
"[%s] newly flushed file: %" PRIu64 " < is not at L0 but Level: %d",
cfd->GetName().c_str(), p.second.fd.GetNumber(), p.first);
return Status::Corruption("Newly flushed file is not at L0");
}
}

// sort added files by largest seqno
std::vector<FileMetaData*> added_files;
for (auto& p : newFiles) {
added_files.push_back(&p.second);
}

NewestFirstBySeqNo cmp;
std::sort(added_files.begin(), added_files.end(), cmp);
auto first_file = added_files[0];
// Rewind/advance next_epoch_number. This is necessary if next_epoch_number
// mismtaches due to db reopen.
if (first_file->epoch_number != kUnknownEpochNumber &&
first_file->epoch_number != cfd->GetNextEpochNumber() &&
reset_next_epoch_number) {
auto max_epoch_number =
cfd->current()->storage_info()->GetMaxEpochNumberOfFiles();
if (first_file->epoch_number < cfd->GetNextEpochNumber() &&
(first_file->epoch_number == max_epoch_number + 1)) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] rewind next_epoch_number from: %" PRIu64
" to %" PRIu64,
cfd->GetName().c_str(), cfd->GetNextEpochNumber(),
max_epoch_number + 1);
cfd->SetNextEpochNumber(max_epoch_number + 1);
} else if (first_file->epoch_number > cfd->GetNextEpochNumber() &&
(cfd->GetNextEpochNumber() == max_epoch_number + 1)) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] advance next_epoch_number from: %" PRIu64
" to %" PRIu64,
cfd->GetName().c_str(), cfd->GetNextEpochNumber(),
first_file->epoch_number);
cfd->SetNextEpochNumber(first_file->epoch_number);
} else {
// Not safe to rewind/advance `next_epoch_number`. This can happen
// when we do epoch recovery during db open (i.e., nodes run
// with different rocksdb versions and nodes upgrading from old version
// to new version need to recover epoch). Poison is the best we can do
return Status::Poison("Poison due to diverged next epoch number");
}
}

for (auto meta : added_files) {
auto replicated_epoch_number = meta->epoch_number;
auto inferred_epoch_number = cfd->NewEpochNumber();
if (replicated_epoch_number != inferred_epoch_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] mismatched epoch for file: %" PRIu64
"; incoming: %" PRIu64 ", calculated: %" PRIu64,
cfd->GetName().c_str(), meta->fd.GetNumber(),
replicated_epoch_number, inferred_epoch_number);
info->mismatched_epoch_num += 1;
meta->epoch_number = inferred_epoch_number;
}
}
} else if (!deletedFiles.empty() && !newFiles.empty()) {
// case 2: compaction
uint64_t min_input_epoch_number = std::numeric_limits<uint64_t>::max();
const auto& storage_info = cfd->current()->storage_info();
for (auto [level, file_number] : deletedFiles) {
auto meta = storage_info->GetFileMetaDataByNumber(file_number);
if (!meta) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"[%s] deleted file: %" PRIu64 " at level: %d not found",
cfd->GetName().c_str(), file_number, level);
return Status::Corruption("Deleted file not found");
}
min_input_epoch_number =
std::min(meta->epoch_number, min_input_epoch_number);
}

for (auto& p : newFiles) {
auto replicated_epoch_number = p.second.epoch_number;
if (replicated_epoch_number != min_input_epoch_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] mismatched epoch for file: %" PRIu64
"; incoming: %" PRIu64 ", calculated: %" PRIu64,
cfd->GetName().c_str(), p.second.fd.GetNumber(),
replicated_epoch_number, min_input_epoch_number);
info->mismatched_epoch_num += 1;
p.second.epoch_number = min_input_epoch_number;
}
}
}
return Status::OK();
}

Status DBImpl::CheckNextEpochNumberConsistency(VersionEdit& e, ColumnFamilyData* cfd) {
auto& newFiles = e.GetNewFiles();
auto& deletedFiles = e.GetDeletedFiles();

if (deletedFiles.empty() && !newFiles.empty()) {
// Case 1: new files generated after flushing.
// New files should be sorted by epoch number
for (size_t i = 0; i + 1 < newFiles.size(); i++) {
if (newFiles[i].second.epoch_number >= newFiles[i+1].second.epoch_number) {
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"[%s] unexpected epoch number ordering for file: %" PRIu64
": %" PRIu64 " and file: %" PRIu64 ": %" PRIu64,
cfd->GetName().c_str(), newFiles[i].second.fd.GetNumber(),
newFiles[i].second.epoch_number,
newFiles[i + 1].second.fd.GetNumber(),
newFiles[i + 1].second.epoch_number);
return Status::Corruption("New L0 files not sorted by epoch number");
}
}

if (newFiles.begin()->second.epoch_number < cfd->GetNextEpochNumber()) {
// If we need to rewind next epoch number during epoch replication, let's
// make sure it doesn't break epoch number consistency
auto max_epoch_number = cfd->current()->storage_info()->GetMaxEpochNumberOfFiles();
if (newFiles.begin()->second.epoch_number <= max_epoch_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] Out of order epoch number for file: %" PRIu64
":%" PRIu64 "; max epoch number: %" PRIu64,
cfd->GetName().c_str(),
newFiles.begin()->second.fd.GetNumber(),
newFiles.begin()->second.epoch_number, max_epoch_number);
return Status::Corruption("Out of order epoch number for flush");
}
} // Otherwise, advance next epoch number
} else if (!newFiles.empty()) {
// Case 2: compaction.
// New files should all have the same epoch number and it's smaller than next_epoch_number
auto next_epoch_number = cfd->GetNextEpochNumber();
for (auto& f: newFiles) {
if (f.second.epoch_number >= next_epoch_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] Out of order epoch number for file: %" PRIu64
":%" PRIu64 "; next epoch number: %" PRIu64,
cfd->GetName().c_str(), f.second.fd.GetNumber(),
f.second.epoch_number, next_epoch_number);
return Status::Corruption("Out of order epoch number for compaction");
}
}
}
return Status::OK();
}

Status DBImpl::GetReplicationRecordDebugString(
const ReplicationLogRecord& record, std::string* out) const {
std::ostringstream oss;
Expand Down
Loading

0 comments on commit 18d4334

Please sign in to comment.