diff --git a/cloud/replication_test.cc b/cloud/replication_test.cc
index 501805e7be6f..e8a932f19ecf 100644
--- a/cloud/replication_test.cc
+++ b/cloud/replication_test.cc
@@ -455,11 +455,6 @@ size_t ReplicationTest::catchUpFollower(
   DB::ApplyReplicationLogRecordInfo info;
   size_t ret = 0;
   unsigned flags = DB::AR_EVICT_OBSOLETE_FILES;
-  flags |= DB::AR_RESET_IF_EPOCH_MISMATCH;
-  if (replicate_epoch_number_) {
-    flags |= DB::AR_REPLICATE_EPOCH_NUM;
-    flags |= DB::AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION;
-  }
   for (; followerSequence_ < (int)log_records_.size(); ++followerSequence_) {
     if (num_records && ret >= *num_records) {
       break;
@@ -1139,19 +1134,7 @@ TEST_F(ReplicationTest, EvictObsoleteFiles) {
       static_cast_with_check<DBImpl>(follower)->TEST_table_cache()->GetUsage());
 }
 
-class ReplicationTestWithParam : public ReplicationTest,
-                                 public testing::WithParamInterface<std::pair<bool, bool>> {
- public:
-  ReplicationTestWithParam()
-      : ReplicationTest() {}
-
-  void SetUp() override {
-    std::tie(replicate_epoch_number_, consistency_check_on_epoch_replication) =
-        GetParam();
-  }
-};
-
-TEST_P(ReplicationTestWithParam, Stress) {
+TEST_F(ReplicationTest, Stress) {
   std::string val;
   auto leader = openLeader();
   openFollower();
@@ -1232,15 +1215,6 @@ TEST_P(ReplicationTestWithParam, Stress) {
   verifyNextEpochNumber();
 }
 
-INSTANTIATE_TEST_CASE_P(ReplicationTest, ReplicationTestWithParam,
-                        ::testing::ValuesIn(std::vector<std::pair<bool, bool>>{
-                            // don't replicate epoch
-                            {false, true},
-                            // replicate epoch but no consistency check
-                            {true, false},
-                            // replicate epoch and do consistency check
-                            {true, true}}));
-
 TEST_F(ReplicationTest, DeleteRange) {
   auto leader = openLeader();
   openFollower();
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index 991168dc74a2..bc198aa17818 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -1430,48 +1430,18 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
         if (!s.ok()) {
           break;
         }
-        if (flags & AR_REPLICATE_EPOCH_NUM) {
-          // replicate epoch number on follower
-          s = CheckNextEpochNumberConsistency(e, cfd);
-          if (!s.ok()) {
-            break;
-          }
-
-          auto& newFiles = e.GetNewFiles();
-          auto& deletedFiles = e.GetDeletedFiles();
-
-          if (flags & AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION) {
-            if (deletedFiles.empty() && !newFiles.empty()) {
-              // Set next epoch number properly before epoch number consistency check.
-              // This is necessary if next_epoch_number changes during db reopen.
-              cfd->SetNextEpochNumber(newFiles.begin()->second.epoch_number);
-            }
+        s = CheckNextEpochNumberConsistency(e, cfd);
+        if (!s.ok()) {
+          break;
+        }
 
-            // do consistency check by comparing the replicated epoch number
-            // against inferred epoch number No need to
-            // `reset_next_epoch_number` here since we have already done it
-            s = InferEpochNumber(&e, cfd, info,
-                                 false /* reset_next_epoch_number */);
-            if (s.ok() && info->mismatched_epoch_num > 0) {
-              s = Status::Poison("epoch number consistency check fails");
-            }
-            if (!s.ok()) {
-              break;
-            }
-          }
+        auto& newFiles = e.GetNewFiles();
+        auto& deletedFiles = e.GetDeletedFiles();
 
-          // Maintain next epoch number on follower
-          if (deletedFiles.empty() && !newFiles.empty()) {
-            cfd->SetNextEpochNumber(newFiles.rbegin()->second.epoch_number + 1);
-          }
-        } else {
-          // infer epoch number on follower
-          s = InferEpochNumber(&e, cfd, info,
-                               flags & AR_RESET_IF_EPOCH_MISMATCH);
-          if (!s.ok()) {
-            break;
-          }
+        // Maintain next epoch number on follower
+        if (deletedFiles.empty() && !newFiles.empty()) {
+          cfd->SetNextEpochNumber(newFiles.rbegin()->second.epoch_number + 1);
         }
       }
       if (!s.ok()) {
@@ -1548,119 +1518,7 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
   return s;
 }
 
-Status DBImpl::InferEpochNumber(VersionEdit* e, ColumnFamilyData* cfd,
-                                ApplyReplicationLogRecordInfo* info,
-                                bool reset_next_epoch_number) {
-  auto& newFiles = e->GetNewFiles();
-  // Epoch number calculation on the fly.
-  // There are two cases in which we need to calculate epoch number
-  // when applying `kManifestWrite`
-  // 1. flush which generates L0 files. epoch number is allocated
-  // based on `next_epoch_number` of each CF. The L0 files are sorted
-  // based on `largest seqno`.
-  // 2. compaction which merges files in lower levels to higher
-  // levels. epoch number = min epoch number of input files.
-  const auto& deletedFiles = e->GetDeletedFiles();
-  if (deletedFiles.empty() && !newFiles.empty()) {
-    // case 1: flush into L0 files. New files must be level 0
-
-    for (auto& p : newFiles) {
-      if (p.first != 0) {
-        ROCKS_LOG_ERROR(
-            immutable_db_options_.info_log,
-            "[%s] newly flushed file: %" PRIu64 " < is not at L0 but Level: %d",
-            cfd->GetName().c_str(), p.second.fd.GetNumber(), p.first);
-        return Status::Corruption("Newly flushed file is not at L0");
-      }
-    }
-
-    // sort added files by largest seqno
-    std::vector<FileMetaData*> added_files;
-    for (auto& p : newFiles) {
-      added_files.push_back(&p.second);
-    }
-
-    NewestFirstBySeqNo cmp;
-    std::sort(added_files.begin(), added_files.end(), cmp);
-    auto first_file = added_files[0];
-    // Rewind/advance next_epoch_number. This is necessary if next_epoch_number
-    // mismtaches due to db reopen.
-    if (first_file->epoch_number != kUnknownEpochNumber &&
-        first_file->epoch_number != cfd->GetNextEpochNumber() &&
-        reset_next_epoch_number) {
-      auto max_epoch_number =
-          cfd->current()->storage_info()->GetMaxEpochNumberOfFiles();
-      if (first_file->epoch_number < cfd->GetNextEpochNumber() &&
-          (first_file->epoch_number == max_epoch_number + 1)) {
-        ROCKS_LOG_INFO(immutable_db_options_.info_log,
-                       "[%s] rewind next_epoch_number from: %" PRIu64
-                       " to %" PRIu64,
-                       cfd->GetName().c_str(), cfd->GetNextEpochNumber(),
-                       max_epoch_number + 1);
-        cfd->SetNextEpochNumber(max_epoch_number + 1);
-      } else if (first_file->epoch_number > cfd->GetNextEpochNumber() &&
-                 (cfd->GetNextEpochNumber() == max_epoch_number + 1)) {
-        ROCKS_LOG_INFO(immutable_db_options_.info_log,
-                       "[%s] advance next_epoch_number from: %" PRIu64
-                       " to %" PRIu64,
-                       cfd->GetName().c_str(), cfd->GetNextEpochNumber(),
-                       first_file->epoch_number);
-        cfd->SetNextEpochNumber(first_file->epoch_number);
-      } else {
-        // Not safe to rewind/advance `next_epoch_number`. This can happen
-        // when we do epoch recovery during db open (i.e., nodes run
-        // with different rocksdb versions and nodes upgrading from old version
-        // to new version need to recover epoch). Poison is the best we can do
-        return Status::Poison("Poison due to diverged next epoch number");
-      }
-    }
-
-    for (auto meta : added_files) {
-      auto replicated_epoch_number = meta->epoch_number;
-      auto inferred_epoch_number = cfd->NewEpochNumber();
-      if (replicated_epoch_number != inferred_epoch_number) {
-        ROCKS_LOG_INFO(immutable_db_options_.info_log,
-                       "[%s] mismatched epoch for file: %" PRIu64
-                       "; incoming: %" PRIu64 ", calculated: %" PRIu64,
-                       cfd->GetName().c_str(), meta->fd.GetNumber(),
-                       replicated_epoch_number, inferred_epoch_number);
-        info->mismatched_epoch_num += 1;
-        meta->epoch_number = inferred_epoch_number;
-      }
-    }
-  } else if (!deletedFiles.empty() && !newFiles.empty()) {
-    // case 2: compaction
-    uint64_t min_input_epoch_number = std::numeric_limits<uint64_t>::max();
-    const auto& storage_info = cfd->current()->storage_info();
-    for (auto [level, file_number] : deletedFiles) {
-      auto meta = storage_info->GetFileMetaDataByNumber(file_number);
-      if (!meta) {
-        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
-                        "[%s] deleted file: %" PRIu64 " at level: %d not found",
-                        cfd->GetName().c_str(), file_number, level);
-        return Status::Corruption("Deleted file not found");
-      }
-      min_input_epoch_number =
-          std::min(meta->epoch_number, min_input_epoch_number);
-    }
-
-    for (auto& p : newFiles) {
-      auto replicated_epoch_number = p.second.epoch_number;
-      if (replicated_epoch_number != min_input_epoch_number) {
-        ROCKS_LOG_INFO(immutable_db_options_.info_log,
-                       "[%s] mismatched epoch for file: %" PRIu64
-                       "; incoming: %" PRIu64 ", calculated: %" PRIu64,
-                       cfd->GetName().c_str(), p.second.fd.GetNumber(),
-                       replicated_epoch_number, min_input_epoch_number);
-        info->mismatched_epoch_num += 1;
-        p.second.epoch_number = min_input_epoch_number;
-      }
-    }
-  }
-  return Status::OK();
-}
-
-Status DBImpl::CheckNextEpochNumberConsistency(VersionEdit& e, ColumnFamilyData* cfd) {
+Status DBImpl::CheckNextEpochNumberConsistency(const VersionEdit& e, ColumnFamilyData* cfd) {
   auto& newFiles = e.GetNewFiles();
   auto& deletedFiles = e.GetDeletedFiles();
 
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index bb9c0110858c..3e08d23036d3 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -412,15 +412,10 @@ class DBImpl : public DB {
                                    ApplyReplicationLogRecordInfo* info,
                                    unsigned flags) override;
 
-  // Calculate epoch number on follower
-  Status InferEpochNumber(VersionEdit* e, ColumnFamilyData* cfd,
-                          ApplyReplicationLogRecordInfo* info,
-                          bool reset_next_epoch_number);
-
   // Check that replicated epoch number of newly flushed files >= cfd's next
   // epoch number.
   // TODO: make `VersionEdit` const
-  Status CheckNextEpochNumberConsistency(VersionEdit& e, ColumnFamilyData* cfd);
+  Status CheckNextEpochNumberConsistency(const VersionEdit& e, ColumnFamilyData* cfd);
   Status GetReplicationRecordDebugString(
       const ReplicationLogRecord& record, std::string* out) const override;
   Status GetPersistedReplicationSequence(std::string* out) override;
diff --git a/db/version_edit.h b/db/version_edit.h
index 0f69c2b3984f..24938de0bbd2 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -504,7 +504,6 @@ class VersionEdit {
   // Retrieve the table files added as well as their associated levels.
   using NewFiles = std::vector<std::pair<int, FileMetaData>>;
   const NewFiles& GetNewFiles() const { return new_files_; }
-  NewFiles& GetNewFiles() { return new_files_; }
 
   // Retrieve all the compact cursors
   using CompactCursors = std::vector<std::pair<int, InternalKey>>;
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index c1747f179d8a..4681e4e5ae33 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -1326,22 +1326,6 @@
   // REQUIRES: info needs to be provided, can't be nullptr.
   enum ApplyReplicationLogRecordFlags : unsigned {
     AR_EVICT_OBSOLETE_FILES = 1U << 0,
-    // If set, replicate epoch number instead of calculating the number when
-    // applying `kManifestWrite`
-    AR_REPLICATE_EPOCH_NUM = 1U << 1,
-    // If set, rewind/advance `next_epoch_number` if mismatches found.
-    // Rocksdb doesn't track `next_epoch_number` in manifest file. When db is
-    // reopened, it calculates the `next_epoch_number` based on max epoch number
-    // of existing live files. So it's possible for the `next_epoch_number` to
-    // go backwards. Following two cases are possible:
-    // 1. leader reopens db, causing `next_epoch_number` on leader to go
-    // backwards. So follower needs to rewind it.
-    // 2. follower reopens db, causing `next_epoch_number` on follower to go
-    // backwards. So follower needs to advance it
-    AR_RESET_IF_EPOCH_MISMATCH = 1U << 2,
-
-    // Check the consistency of epoch number during replication
-    AR_CONSISTENCY_CHECK_ON_EPOCH_REPLICATION = 1U << 3
   };
   using CFOptionsFactory = std::function;
   virtual Status ApplyReplicationLogRecord(ReplicationLogRecord record,