Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sys-4815] consistency check when replicating epoch number #283

Merged
merged 1 commit into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions cloud/replication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ class ReplicationTest : public testing::Test {
protected:
std::shared_ptr<Logger> info_log_;
bool replicate_epoch_number_{true};
bool consistency_check_on_epoch_replication{true};
void resetFollowerSequence(int new_seq) {
followerSequence_ = new_seq;
}
Expand Down Expand Up @@ -454,10 +455,10 @@ size_t ReplicationTest::catchUpFollower(
DB::ApplyReplicationLogRecordInfo info;
size_t ret = 0;
unsigned flags = DB::AR_EVICT_OBSOLETE_FILES;
flags |= DB::AR_RESET_IF_EPOCH_MISMATCH;
if (replicate_epoch_number_) {
flags |= DB::AR_REPLICATE_EPOCH_NUM;
} else {
flags |= DB::AR_RESET_IF_EPOCH_MISMATCH;
flags |= DB::AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION;
}
for (; followerSequence_ < (int)log_records_.size(); ++followerSequence_) {
if (num_records && ret >= *num_records) {
Expand Down Expand Up @@ -1139,13 +1140,14 @@ TEST_F(ReplicationTest, EvictObsoleteFiles) {
}

class ReplicationTestWithParam : public ReplicationTest,
public testing::WithParamInterface<bool> {
public testing::WithParamInterface<std::pair<bool, bool>> {
public:
ReplicationTestWithParam()
: ReplicationTest() {}

void SetUp() override {
replicate_epoch_number_ = GetParam();
std::tie(replicate_epoch_number_, consistency_check_on_epoch_replication) =
GetParam();
}
};

Expand Down Expand Up @@ -1231,7 +1233,13 @@ TEST_P(ReplicationTestWithParam, Stress) {
}

INSTANTIATE_TEST_CASE_P(ReplicationTest, ReplicationTestWithParam,
::testing::Values(false, true));
::testing::ValuesIn(std::vector<std::pair<bool, bool>>{
// don't replicate epoch
{false, true},
// replicate epoch but no consistency check
{true, false},
// replicate epoch and do consistency check
{true, true}}));

TEST_F(ReplicationTest, DeleteRange) {
auto leader = openLeader();
Expand Down
314 changes: 200 additions & 114 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1427,129 +1427,50 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
edit_lists.push_back(std::move(el));
ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
DescribeVersionEdit(e, cfd).c_str());
auto& newFiles = e.GetNewFiles();
bool epoch_recovery_succeeded = true;
std::ostringstream err_oss;
if (!(flags & AR_REPLICATE_EPOCH_NUM)) {
// Epoch number calculation on the fly.
// There are two cases in which we need to calculate epoch number
// when applying `kManifestWrite`
// 1. flush which generates L0 files. epoch number is allocated
// based on `next_epoch_number` of each CF. The L0 files are sorted
// based on `largest seqno`.
// 2. compaction which merges files in lower levels to higher
// levels. epoch number = min epoch number of input files.
const auto& deletedFiles = e.GetDeletedFiles();
if (deletedFiles.empty() && !newFiles.empty()) {
// case 1: flush into L0 files. New files must be level 0

for (auto& p : newFiles) {
if (p.first != 0) {
epoch_recovery_succeeded = false;
err_oss << "newly flushed file: " << p.first << " is not at L0";
break;
}
}
if (!s.ok()) {
break;
}
if (flags & AR_REPLICATE_EPOCH_NUM) {
// replicate epoch number on follower

// sort added files by largest seqno
std::vector<FileMetaData*> added_files;
for(auto& p: newFiles) {
added_files.push_back(&p.second);
}
s = CheckNextEpochNumberConsistency(e, cfd);
if (!s.ok()) {
break;
}

NewestFirstBySeqNo cmp;
std::sort(added_files.begin(), added_files.end(), cmp);
auto first_file = added_files[0];
// Rewind/advance next_epoch_number. This is necessary if epoch_number
// mismtaches due to db reopen.
if (first_file->epoch_number != kUnknownEpochNumber &&
first_file->epoch_number != cfd->GetNextEpochNumber() &&
(flags & AR_RESET_IF_EPOCH_MISMATCH)) {
auto max_epoch_number =
cfd->current()->storage_info()->GetMaxEpochNumberOfFiles();
if (first_file->epoch_number < cfd->GetNextEpochNumber() &&
(first_file->epoch_number == max_epoch_number + 1)) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] rewind next_epoch_number from: %" PRIu64
" to %" PRIu64,
cfd->GetName().c_str(),
cfd->GetNextEpochNumber(),
max_epoch_number + 1);
cfd->SetNextEpochNumber(max_epoch_number + 1);
} else if (first_file->epoch_number >
cfd->GetNextEpochNumber() &&
(cfd->GetNextEpochNumber() ==
max_epoch_number + 1)) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] advance next_epoch_number from: %" PRIu64
" to %" PRIu64,
cfd->GetName().c_str(),
cfd->GetNextEpochNumber(),
first_file->epoch_number);
cfd->SetNextEpochNumber(first_file->epoch_number);
} else {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"[%s] unexpected epoch number: %" PRIu64
" for file: %" PRIu64
" ; max epoch number: %" PRIu64,
cfd->GetName().c_str(),
first_file->epoch_number,
first_file->fd.GetNumber(),
max_epoch_number);
s = Status::Corruption("unexpected epoch number for added file");
break;
}
}
auto& newFiles = e.GetNewFiles();
auto& deletedFiles = e.GetDeletedFiles();

for (auto meta: added_files) {
auto old_epoch_number = meta->epoch_number;
meta->epoch_number = cfd->NewEpochNumber();
if (old_epoch_number != meta->epoch_number) {
info->mismatched_epoch_num += 1;
}
}
} else if (!deletedFiles.empty() && !newFiles.empty()) {
// case 2: compaction
uint64_t min_input_epoch_number =
std::numeric_limits<uint64_t>::max();
const auto& storage_info = cfd->current()->storage_info();
for (auto [level, file_number] : deletedFiles) {
auto meta = storage_info->GetFileMetaDataByNumber(file_number);
if (!meta) {
err_oss << "deleted file: " << file_number
<< " at level: " << level << " not found";
break;
}
min_input_epoch_number =
std::min(meta->epoch_number, min_input_epoch_number);
if (flags & AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION) {
if (deletedFiles.empty() && !newFiles.empty()) {
// Set next epoch number properly before epoch number consistency check.
// This is necessary if next_epoch_number changes during db reopen.
cfd->SetNextEpochNumber(newFiles.begin()->second.epoch_number);
}

for (auto& p: newFiles) {
auto old_epoch_number = p.second.epoch_number;
p.second.epoch_number = min_input_epoch_number;
if (old_epoch_number != p.second.epoch_number) {
info->mismatched_epoch_num += 1;
}
// do consistency check by comparing the replicated epoch number against
// inferred epoch number
s = InferEpochNumber(&e, cfd, info,
false /* reset_next_epoch_number */);
if (s.ok() && info->mismatched_epoch_num > 0) {
s = Status::Corruption("epoch number consistency check fails");
}
}
} else if (newFiles.size() > 0) {
// Maintain next epoch number on follower
auto next_epoch_number = cfd->GetNextEpochNumber();
for (auto& p : newFiles) {
auto epoch_number = p.second.epoch_number;
// advance next epoch number. next_epoch_number never goes
// backwards
if (epoch_number != kUnknownEpochNumber &&
(epoch_number >= next_epoch_number)) {
next_epoch_number = epoch_number + 1;
if (!s.ok()) {
break;
}
}
cfd->SetNextEpochNumber(next_epoch_number);
}

if (!epoch_recovery_succeeded) {
s = Status::Corruption(err_oss.str());
break;
// Maintain next epoch number on follower
if (deletedFiles.empty() && !newFiles.empty()) {
cfd->SetNextEpochNumber(newFiles.rbegin()->second.epoch_number + 1);
}
} else {
// infer epoch number on follower
s = InferEpochNumber(&e, cfd, info,
flags & AR_RESET_IF_EPOCH_MISMATCH);
if (!s.ok()) {
break;
}
}
}
if (!s.ok()) {
Expand Down Expand Up @@ -1626,6 +1547,171 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
return s;
}

// Recomputes the epoch number of every file added by `e` from the state of
// `cfd` and writes the computed value back into the edit whenever it differs
// from the epoch number already carried by the file.
//
// Two kinds of edits are handled:
//   1. Flush (no deleted files, some new files): every new file must be at
//      level 0. Files are ordered with `NewestFirstBySeqNo` and each one is
//      assigned `cfd->NewEpochNumber()` in that order.
//   2. Compaction (both deleted and new files): every new file gets the
//      minimum epoch number among the deleted (input) files, which are looked
//      up in the current version of `cfd`.
// Edits without new files are left untouched.
//
// @param e     Version edit whose new files are inspected and possibly
//              rewritten.
// @param cfd   Column family the edit applies to. Its next epoch number is
//              consumed in the flush case and may be rewound/advanced first
//              when `reset_next_epoch_number` is set.
// @param info  `info->mismatched_epoch_num` is incremented once per file whose
//              incoming epoch number differs from the computed one.
// @param reset_next_epoch_number  If true, a flush whose newest file carries a
//              known epoch number different from `cfd`'s next epoch number
//              first tries to realign `cfd`'s next epoch number with it.
// @return OK on success; Corruption if a flushed file is not at L0 or a
//         deleted file is not found in the current version; Poison if the
//         next epoch number cannot be safely realigned.
Status DBImpl::InferEpochNumber(VersionEdit* e, ColumnFamilyData* cfd,
                                ApplyReplicationLogRecordInfo* info,
                                bool reset_next_epoch_number) {
  auto& newFiles = e->GetNewFiles();
  // Epoch number calculation on the fly.
  // There are two cases in which we need to calculate epoch number
  // when applying `kManifestWrite`
  // 1. flush which generates L0 files. epoch number is allocated
  // based on `next_epoch_number` of each CF. The L0 files are sorted
  // based on `largest seqno`.
  // 2. compaction which merges files in lower levels to higher
  // levels. epoch number = min epoch number of input files.
  const auto& deletedFiles = e->GetDeletedFiles();
  if (deletedFiles.empty() && !newFiles.empty()) {
    // case 1: flush into L0 files. New files must be level 0

    for (auto& p : newFiles) {
      if (p.first != 0) {
        ROCKS_LOG_ERROR(
            immutable_db_options_.info_log,
            "[%s] newly flushed file: %" PRIu64 " is not at L0 but Level: %d",
            cfd->GetName().c_str(), p.second.fd.GetNumber(), p.first);
        return Status::Corruption("Newly flushed file is not at L0");
      }
    }

    // sort added files by largest seqno
    std::vector<FileMetaData*> added_files;
    added_files.reserve(newFiles.size());
    for (auto& p : newFiles) {
      added_files.push_back(&p.second);
    }

    NewestFirstBySeqNo cmp;
    std::sort(added_files.begin(), added_files.end(), cmp);
    // Non-empty: guarded by `!newFiles.empty()` above.
    auto first_file = added_files[0];
    // Rewind/advance next_epoch_number. This is necessary if next_epoch_number
    // mismatches due to db reopen.
    if (first_file->epoch_number != kUnknownEpochNumber &&
        first_file->epoch_number != cfd->GetNextEpochNumber() &&
        reset_next_epoch_number) {
      auto max_epoch_number =
          cfd->current()->storage_info()->GetMaxEpochNumberOfFiles();
      if (first_file->epoch_number < cfd->GetNextEpochNumber() &&
          (first_file->epoch_number == max_epoch_number + 1)) {
        // Incoming epoch directly follows the largest epoch among existing
        // files, so moving next_epoch_number backwards cannot collide.
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "[%s] rewind next_epoch_number from: %" PRIu64
                       " to %" PRIu64,
                       cfd->GetName().c_str(), cfd->GetNextEpochNumber(),
                       max_epoch_number + 1);
        cfd->SetNextEpochNumber(max_epoch_number + 1);
      } else if (first_file->epoch_number > cfd->GetNextEpochNumber() &&
                 (cfd->GetNextEpochNumber() == max_epoch_number + 1)) {
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "[%s] advance next_epoch_number from: %" PRIu64
                       " to %" PRIu64,
                       cfd->GetName().c_str(), cfd->GetNextEpochNumber(),
                       first_file->epoch_number);
        cfd->SetNextEpochNumber(first_file->epoch_number);
      } else {
        // Not safe to rewind/advance `next_epoch_number`. This can happen
        // when we do epoch recovery during db open (i.e., nodes run
        // with different rocksdb versions and nodes upgrading from old version
        // to new version need to recover epoch). Poison is the best we can do
        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                        "[%s] cannot reconcile next_epoch_number: %" PRIu64
                        " with epoch number: %" PRIu64 " of file: %" PRIu64
                        "; max epoch number: %" PRIu64,
                        cfd->GetName().c_str(), cfd->GetNextEpochNumber(),
                        first_file->epoch_number, first_file->fd.GetNumber(),
                        max_epoch_number);
        return Status::Poison("Poison due to diverged next epoch number");
      }
    }

    for (auto meta : added_files) {
      auto replicated_epoch_number = meta->epoch_number;
      // NewEpochNumber() is called for every file, matched or not, so the
      // column family's epoch counter moves once per flushed file.
      auto inferred_epoch_number = cfd->NewEpochNumber();
      if (replicated_epoch_number != inferred_epoch_number) {
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "[%s] mismatched epoch for file: %" PRIu64
                       "; incoming: %" PRIu64 ", calculated: %" PRIu64,
                       cfd->GetName().c_str(), meta->fd.GetNumber(),
                       replicated_epoch_number, inferred_epoch_number);
        info->mismatched_epoch_num += 1;
        meta->epoch_number = inferred_epoch_number;
      }
    }
  } else if (!deletedFiles.empty() && !newFiles.empty()) {
    // case 2: compaction
    uint64_t min_input_epoch_number = std::numeric_limits<uint64_t>::max();
    const auto& storage_info = cfd->current()->storage_info();
    for (auto [level, file_number] : deletedFiles) {
      auto meta = storage_info->GetFileMetaDataByNumber(file_number);
      if (!meta) {
        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                        "[%s] deleted file: %" PRIu64 " at level: %d not found",
                        cfd->GetName().c_str(), file_number, level);
        return Status::Corruption("Deleted file not found");
      }
      min_input_epoch_number =
          std::min(meta->epoch_number, min_input_epoch_number);
    }

    for (auto& p : newFiles) {
      auto replicated_epoch_number = p.second.epoch_number;
      if (replicated_epoch_number != min_input_epoch_number) {
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "[%s] mismatched epoch for file: %" PRIu64
                       "; incoming: %" PRIu64 ", calculated: %" PRIu64,
                       cfd->GetName().c_str(), p.second.fd.GetNumber(),
                       replicated_epoch_number, min_input_epoch_number);
        info->mismatched_epoch_num += 1;
        p.second.epoch_number = min_input_epoch_number;
      }
    }
  }
  return Status::OK();
}

// Sanity-checks the epoch numbers carried by the new files of `e` against the
// epoch state of `cfd` before those epoch numbers are adopted as-is. Only
// getters are called on `e` and `cfd` here; nothing is written.
//
//   - Flush (no deleted files, some new files): epoch numbers of the new files
//     must be strictly increasing in the order they appear in the edit, and if
//     the first file's epoch number is below `cfd`'s next epoch number it must
//     still be greater than the max epoch number of existing files.
//   - Otherwise, if there are new files (treated as compaction): every new
//     file's epoch number must be smaller than `cfd`'s next epoch number.
//
// @param e    Version edit carrying replicated epoch numbers.
// @param cfd  Column family the edit applies to.
// @return OK if the checks pass, Corruption (after logging the offending
//         file) otherwise.
Status DBImpl::CheckNextEpochNumberConsistency(VersionEdit& e, ColumnFamilyData* cfd) {
  auto& newFiles = e.GetNewFiles();
  auto& deletedFiles = e.GetDeletedFiles();

  if (deletedFiles.empty() && !newFiles.empty()) {
    // Case 1: new files generated after flushing.
    // New files should be sorted by epoch number
    for (size_t i = 0; i + 1 < newFiles.size(); i++) {
      const auto& cur = newFiles[i].second;
      const auto& next = newFiles[i + 1].second;
      if (cur.epoch_number >= next.epoch_number) {
        ROCKS_LOG_ERROR(
            immutable_db_options_.info_log,
            "[%s] unexpected epoch number ordering for file: %" PRIu64
            ": %" PRIu64 " and file: %" PRIu64 ": %" PRIu64,
            cfd->GetName().c_str(), cur.fd.GetNumber(), cur.epoch_number,
            next.fd.GetNumber(), next.epoch_number);
        return Status::Corruption("New L0 files not sorted by epoch number");
      }
    }

    const auto& first_file = newFiles.begin()->second;
    if (first_file.epoch_number < cfd->GetNextEpochNumber()) {
      // If we need to rewind next epoch number during epoch replication, let's
      // make sure it doesn't break epoch number consistency
      auto max_epoch_number =
          cfd->current()->storage_info()->GetMaxEpochNumberOfFiles();
      if (first_file.epoch_number <= max_epoch_number) {
        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                        "[%s] Out of order epoch number for file: %" PRIu64
                        ":%" PRIu64 "; max epoch number: %" PRIu64,
                        cfd->GetName().c_str(), first_file.fd.GetNumber(),
                        first_file.epoch_number, max_epoch_number);
        return Status::Corruption("Out of order epoch number for flush");
      }
    }  // Otherwise, advance next epoch number
  } else if (!newFiles.empty()) {
    // Case 2: compaction.
    // New files are expected to share one epoch number that is smaller than
    // next_epoch_number. Only the upper bound is enforced here; equality
    // between the new files is not checked by this function.
    auto next_epoch_number = cfd->GetNextEpochNumber();
    for (auto& f : newFiles) {
      if (f.second.epoch_number >= next_epoch_number) {
        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                        "[%s] Out of order epoch number for file: %" PRIu64
                        ":%" PRIu64 "; next epoch number: %" PRIu64,
                        cfd->GetName().c_str(), f.second.fd.GetNumber(),
                        f.second.epoch_number, next_epoch_number);
        return Status::Corruption("Out of order epoch number for compaction");
      }
    }
  }
  return Status::OK();
}

Status DBImpl::GetReplicationRecordDebugString(
const ReplicationLogRecord& record, std::string* out) const {
std::ostringstream oss;
Expand Down
Loading