diff --git a/cloud/replication_test.cc b/cloud/replication_test.cc index e8a932f19ec..740054735b6 100644 --- a/cloud/replication_test.cc +++ b/cloud/replication_test.cc @@ -467,7 +467,6 @@ size_t ReplicationTest::catchUpFollower( }, allow_new_manifest_writes, &info, flags); assert(s.ok()); - assert(info.mismatched_epoch_num == 0); ++ret; } if (info.has_new_manifest_writes) { @@ -1279,6 +1278,86 @@ TEST_F(ReplicationTest, EpochNumberSimple) { verifyEqual(); } +TEST_F(ReplicationTest, SuperSnapshot) { + auto options = leaderOptions(); + options.disable_auto_compactions = true; + auto leader = openLeader(); + auto follower = openFollower(); + + createColumnFamily("cf1"); + + ASSERT_OK(leader->Put(wo(), "k1", "v1")); + ASSERT_OK(leader->Put(wo(), leaderCF("cf1"), "cf1k1", "cf1v1")); + ASSERT_OK(leader->Flush({})); + catchUpFollower(); + + std::vector cf; + cf.push_back(follower->DefaultColumnFamily()); + cf.push_back(followerCF("cf1")); + std::vector snapshots; + ASSERT_OK(follower->GetSuperSnapshots(cf, &snapshots)); + + ASSERT_OK(leader->Put(wo(), "k1", "v2")); + ASSERT_OK(leader->Put(wo(), leaderCF("cf1"), "cf1k1", "cf1v2")); + ASSERT_OK(leader->Flush({})); + auto leaderFull = static_cast_with_check(leader); + ASSERT_OK(leaderFull->TEST_CompactRange(0, nullptr, nullptr, nullptr, true)); + ASSERT_OK(leaderFull->TEST_CompactRange(0, nullptr, nullptr, leaderCF("cf1"), + true)); + + catchUpFollower(); + + ReadOptions ro; + std::string val; + ASSERT_OK(follower->Get(ro, "k1", &val)); + EXPECT_EQ(val, "v2"); + ro.snapshot = snapshots[0]; + ASSERT_OK(follower->Get(ro, "k1", &val)); + EXPECT_EQ(val, "v1"); + + auto iter = follower->NewIterator(ro, follower->DefaultColumnFamily()); + iter->SeekToFirst(); + EXPECT_TRUE(iter->Valid()); + EXPECT_EQ(iter->key(), "k1"); + EXPECT_EQ(iter->value(), "v1"); + iter->Next(); + EXPECT_FALSE(iter->Valid()); + + ro.snapshot = nullptr; + ASSERT_OK(follower->Get(ro, followerCF("cf1"), "cf1k1", &val)); + EXPECT_EQ(val, "cf1v2"); + ro.snapshot = snapshots[1]; + ASSERT_OK(follower->Get(ro, followerCF("cf1"), "cf1k1", &val)); + EXPECT_EQ(val, "cf1v1"); + + // Test MultiGet + std::vector keys; + keys.push_back("cf1k1"); + keys.push_back("missing"); + std::vector cfs; + cfs.push_back(followerCF("cf1")); + cfs.push_back(followerCF("cf1")); + std::vector vals; + + auto statuses = follower->MultiGet(ro, cfs, keys, &vals); + ASSERT_OK(statuses[0]); + ASSERT_TRUE(statuses[1].IsNotFound()); + ASSERT_EQ(vals[0], "cf1v1"); + + ro.snapshot = nullptr; + statuses = follower->MultiGet(ro, cfs, keys, &vals); + ASSERT_OK(statuses[0]); + ASSERT_TRUE(statuses[1].IsNotFound()); + ASSERT_EQ(vals[0], "cf1v2"); + + // Column family <-> snapshot mismatch + ro.snapshot = snapshots[0]; + ASSERT_FALSE(follower->Get(ro, followerCF("cf1"), "cf1k1", &val).ok()); + + follower->ReleaseSnapshot(snapshots[0]); + follower->ReleaseSnapshot(snapshots[1]); +} + } // namespace ROCKSDB_NAMESPACE // A black-box test for the cloud wrapper around rocksdb diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 7a4d74c95b0..2af3fcf42be 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2332,6 +2332,16 @@ InternalIterator* DBImpl::NewInternalIterator( !read_options.ignore_range_deletions); } TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s); + + // Code related to super_snapshot in this function was contributed by + // RocksDB-Cloud + auto super_snapshot = + dynamic_cast(read_options.snapshot); + // Tricky: it's possible for NewInternalIterator to be passed super snapshot, + // but the super version given doesn't match the super snapshot. This is true + // for code paths that don't support super snapshot yet. + auto super_snapshot_owns_super_version = + super_snapshot ? super_snapshot->sv() == super_version : false; if (s.ok()) { // Collect iterators for files in L0 - Ln if (read_options.read_tier != kMemtableTier) { @@ -2341,14 +2351,19 @@ InternalIterator* DBImpl::NewInternalIterator( } internal_iter = merge_iter_builder.Finish( read_options.ignore_range_deletions ? nullptr : db_iter); - SuperVersionHandle* cleanup = new SuperVersionHandle( - this, &mutex_, super_version, - read_options.background_purge_on_iterator_cleanup || - immutable_db_options_.avoid_unnecessary_blocking_io); - internal_iter->RegisterCleanup(CleanupSuperVersionHandle, cleanup, nullptr); + // Do not clean up the super version if super snapshot owns it + if (!super_snapshot_owns_super_version) { + SuperVersionHandle* cleanup = new SuperVersionHandle( + this, &mutex_, super_version, + read_options.background_purge_on_iterator_cleanup || + immutable_db_options_.avoid_unnecessary_blocking_io); + internal_iter->RegisterCleanup(CleanupSuperVersionHandle, cleanup, + nullptr); + } return internal_iter; - } else { + } else if (!super_snapshot_owns_super_version) { + // Do not clean up the super version if super snapshot owns it CleanupSuperVersion(super_version); } return NewErrorInternalIterator(s, arena); @@ -2487,8 +2502,20 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, } } + // RocksDB-Cloud contribution begin + auto super_snapshot = + dynamic_cast(read_options.snapshot); + if (super_snapshot && cfd->GetID() != super_snapshot->cfd()->GetID()) { + std::ostringstream oss; + oss << "SuperSnapshot column family " << super_snapshot->cfd()->GetName() + << " doesn't match provided column family " << cfd->GetName(); + return Status::InvalidArgument(oss.str()); + } + // Acquire SuperVersion - SuperVersion* sv = GetAndRefSuperVersion(cfd); + SuperVersion* sv = + super_snapshot ? super_snapshot->sv() : GetAndRefSuperVersion(cfd); + // RocksDB-Cloud contribution end TEST_SYNC_POINT("DBImpl::GetImpl:1"); TEST_SYNC_POINT("DBImpl::GetImpl:2"); @@ -2612,7 +2639,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, RecordTick(stats_, MEMTABLE_HIT); } } - if (!done && !s.ok() && !s.IsMergeInProgress()) { + // RocksDB-Cloud contribution begin + if (!super_snapshot && !done && !s.ok() && !s.IsMergeInProgress()) { + // RocksDB-Cloud contribution end ReturnAndCleanupSuperVersion(cfd, sv); return s; } @@ -2715,7 +2744,11 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, PERF_COUNTER_ADD(get_read_bytes, size); } - ReturnAndCleanupSuperVersion(cfd, sv); + // RocksDB-Cloud contribution begin + if (!super_snapshot) { + ReturnAndCleanupSuperVersion(cfd, sv); + } + // RocksDB-Cloud contribution end RecordInHistogram(stats_, BYTES_PER_READ, size); } @@ -2743,6 +2776,11 @@ std::vector DBImpl::MultiGet( assert(column_family.size() == num_keys); std::vector stat_list(num_keys); + // RocksDB-Cloud contribution begin + auto super_snapshot = + dynamic_cast(read_options.snapshot); + // RocksDB-Cloud contribution end + bool should_fail = false; for (size_t i = 0; i < num_keys; ++i) { assert(column_family[i]); @@ -2758,6 +2796,18 @@ std::vector DBImpl::MultiGet( should_fail = true; } } + // RocksDB-Cloud contribution begin + auto cfh = static_cast_with_check(column_family[i]); + auto cfd = cfh->cfd(); + if (super_snapshot && cfd->GetID() != super_snapshot->cfd()->GetID()) { + std::ostringstream oss; + oss << "[MultiGet] SuperSnapshot column family " + << super_snapshot->cfd()->GetName() + << " doesn't match provided column family " << cfd->GetName(); + stat_list[i] = Status::InvalidArgument(oss.str()); + should_fail = true; + } + // RocksDB-Cloud contribution end } if (should_fail) { @@ -2914,12 +2964,18 @@ std::vector DBImpl::MultiGet( PERF_TIMER_GUARD(get_post_process_time); autovector superversions_to_delete; - for (auto mgd_iter : multiget_cf_data) { - auto mgd = mgd_iter.second; - if (!unref_only) { - ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version); - } else { - mgd.cfd->GetSuperVersion()->Unref(); + // Only cleanup the super versions if we don't have super snapshot, which + // brought its own superversion. + // RocksDB-Cloud contribution begin + if (!dynamic_cast(read_options.snapshot)) { + // RocksDB-Cloud contribution end + for (auto mgd_iter : multiget_cf_data) { + auto mgd = mgd_iter.second; + if (!unref_only) { + ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version); + } else { + mgd.cfd->GetSuperVersion()->Unref(); + } } } RecordTick(stats_, NUMBER_MULTIGET_CALLS); @@ -2947,7 +3003,12 @@ bool DBImpl::MultiCFSnapshot( // super version auto cf_iter = cf_list->begin(); auto node = iter_deref_func(cf_iter); - node->super_version = GetAndRefSuperVersion(node->cfd); + // RocksDB-Cloud contribution begin + auto super_snapshot = + dynamic_cast(read_options.snapshot); + node->super_version = super_snapshot ? super_snapshot->sv() + : GetAndRefSuperVersion(node->cfd); + // RocksDB-Cloud contribution end if (read_options.snapshot != nullptr) { // Note: In WritePrepared txns this is not necessary but not harmful // either. Because prep_seq > snapshot => commit_seq > snapshot so if @@ -2976,6 +3037,10 @@ bool DBImpl::MultiCFSnapshot( *snapshot = GetLastPublishedSequence(); } } else { + // RocksDB-Cloud contribution begin + // MultiGet across column families is not supported with super snapshot + assert(!dynamic_cast(read_options.snapshot)); + // RocksDB-Cloud contribution end // If we end up with the same issue of memtable geting sealed during 2 // consecutive retries, it means the write rate is very high. In that case // its probably ok to take the mutex on the 3rd try so we can succeed for @@ -3070,6 +3135,15 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, return; } + // RocksDB-Cloud contribution begin + if (dynamic_cast(read_options.snapshot)) { + for (size_t i = 0; i < num_keys; ++i) { + statuses[i] = Status::NotSupported( + "MultiGet with timestamps does not support super snapshot"); + } + } + // RocksDB-Cloud contribution end + bool should_fail = false; for (size_t i = 0; i < num_keys; ++i) { ColumnFamilyHandle* cfh = column_families[i]; @@ -3770,6 +3844,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, result = nullptr; #else + if (dynamic_cast(read_options.snapshot)) { + return NewErrorIterator(Status::NotSupported( + "Tailing iterator not supported with super snapshot")); + } + SuperVersion* sv = cfd->GetReferencedSuperVersion(this); auto iter = new ForwardIterator(this, read_options, cfd, sv, /* allow_unprepared_value */ true); @@ -3780,6 +3859,19 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, this, cfd); #endif } else { + // RocksDB-Cloud contribution begin + auto super_snapshot = + dynamic_cast(read_options.snapshot); + if (super_snapshot && cfd->GetID() != super_snapshot->cfd()->GetID()) { + std::ostringstream oss; + oss << "SuperSnapshot column family " << super_snapshot->cfd()->GetName() + << " doesn't match provided column family " << cfd->GetName(); + // We do a check here instead of in NewIteratorImpl because + // NewIteratorImpl returns ArenaWrappedDBIter, which ErrorIterator does + // not subclass + return NewErrorIterator(Status::InvalidArgument(oss.str())); + } + // RocksDB-Cloud contribution end // Note: no need to consider the special case of // last_seq_same_as_publish_seq_==false since NewIterator is overridden in // WritePreparedTxnDB @@ -3798,7 +3890,14 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, ReadCallback* read_callback, bool expose_blob_index, bool allow_refresh) { - SuperVersion* sv = cfd->GetReferencedSuperVersion(this); + // RocksDB-Cloud contribution begin + auto super_snapshot = + dynamic_cast(read_options.snapshot); + + // Acquire SuperVersion + SuperVersion* sv = super_snapshot ? super_snapshot->sv() + : cfd->GetReferencedSuperVersion(this); + // RocksDB-Cloud contribution end TEST_SYNC_POINT("DBImpl::NewIterator:1"); TEST_SYNC_POINT("DBImpl::NewIterator:2"); @@ -3913,6 +4012,10 @@ Status DBImpl::NewIterators( return Status::InvalidArgument( "Tailing iterator not supported in RocksDB lite"); #else + if (dynamic_cast(read_options.snapshot)) { + return Status::NotSupported( + "Tailing iterator not supported with super snapshot"); + } for (auto cfh : column_families) { auto cfd = static_cast_with_check(cfh)->cfd(); SuperVersion* sv = cfd->GetReferencedSuperVersion(this); @@ -3936,6 +4039,18 @@ Status DBImpl::NewIterators( auto* cfd = static_cast_with_check(column_families[i]) ->cfd(); + // RocksDB-Cloud contribution begin + auto super_snapshot = + dynamic_cast(read_options.snapshot); + if (super_snapshot && cfd->GetID() != super_snapshot->cfd()->GetID()) { + std::ostringstream oss; + oss << "SuperSnapshot column family " << super_snapshot->cfd()->GetName() + << " doesn't match provided column family " << cfd->GetName(); + // We do a check here instead of in NewIteratorImpl because + // NewIteratorImpl returns ArenaWrappedDBIter, which ErrorIterator does + // not subclass + return Status::InvalidArgument(oss.str()); + } iterators->push_back( NewIteratorImpl(read_options, cfd, snapshot, read_callback)); } @@ -3954,6 +4069,40 @@ SequenceNumber DBImpl::GetIteratorSequenceNumber(Iterator* it) { const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); } +// RocksDB-Cloud contribution begin +Status DBImpl::GetSuperSnapshots( + const std::vector& column_families, + std::vector* snapshots) { +#ifndef ROCKSDB_USE_RTTI + return Status::InvalidArgument( + "GetSuperSnapshots only supported in RocksDB compiled with USE_RTTI=1"); +#endif + if (!is_snapshot_supported_) { + return Status::InvalidArgument("Snapshot not supported"); + } + + int64_t unix_time = 0; + immutable_db_options_.clock->GetCurrentTime(&unix_time) + .PermitUncheckedError(); // Ignore error + + auto snapshot_seq = GetLastPublishedSequence(); + + snapshots->reserve(column_families.size()); + + for (auto& cf : column_families) { + auto cfh = static_cast_with_check(cf); + auto cfd = cfh->cfd(); + auto sv = cfd->GetReferencedSuperVersion(this); + auto ss = new SuperSnapshotImpl(cfd, sv); + snapshots_.New(ss, snapshot_seq, unix_time, + /*is_write_conflict_boundary=*/false); + snapshots->push_back(ss); + } + + return Status::OK(); +} +// RocksDB-Cloud contribution end + #ifndef ROCKSDB_LITE const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() { return GetSnapshotImpl(true); @@ -4159,6 +4308,12 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { // inplace_update_support enabled. return; } + // RocksDB-Cloud contribution begin + if (auto ss = dynamic_cast(s)) { + CleanupSuperVersion(ss->sv()); + } + // RocksDB-Cloud contribution end + const SnapshotImpl* casted_s = reinterpret_cast(s); { InstrumentedMutexLock l(&mutex_); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index c29272e1f4e..8bf9949865d 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -345,6 +345,11 @@ class DBImpl : public DB { SequenceNumber GetIteratorSequenceNumber(Iterator* it) override; virtual const Snapshot* GetSnapshot() override; + // RocksDB-Cloud contribution begin + Status GetSuperSnapshots( + const std::vector& column_families, + std::vector* snapshots) override; + // RocksDB-Cloud contribution end virtual void ReleaseSnapshot(const Snapshot* snapshot) override; // Create a timestamped snapshot. This snapshot can be shared by multiple // readers. If any of them uses it for write conflict checking, then diff --git a/db/db_test.cc b/db/db_test.cc index 8dc2adb7201..a492049d0e0 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3196,6 +3196,12 @@ class ModelDB : public DB { return snapshot; } + virtual Status GetSuperSnapshots( + const std::vector& column_families, + std::vector* snapshots) { + return Status::NotSupported(""); + } + void ReleaseSnapshot(const Snapshot* snapshot) override { delete reinterpret_cast(snapshot); } diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index 23e5e98cd2e..529486fa1a5 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -51,6 +51,24 @@ class SnapshotImpl : public Snapshot { bool is_write_conflict_boundary_; }; +// RocksDB-Cloud contribution begin +class SuperVersion; +class ColumnFamilyData; +class SuperSnapshotImpl : public SnapshotImpl { + public: + SuperSnapshotImpl(ColumnFamilyData* cfd, SuperVersion* sv) + : cfd_(cfd), sv_(sv) {} + + ColumnFamilyData* cfd() const { return cfd_; } + SuperVersion* sv() const { return sv_; } + + private: + ColumnFamilyData* cfd_; + // We hold a ref to sv_ + SuperVersion* sv_{nullptr}; +}; +// RocksDB-Cloud contribution end + class SnapshotList { public: SnapshotList() { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 2cca6a45418..197e5975da3 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -848,6 +848,23 @@ class DB { // not support snapshot (eg: inplace_update_support enabled). virtual const Snapshot* GetSnapshot() = 0; + // RocksDB-Cloud contribution begin + // SuperSnapshot is a combination of a snapshot and a super-version. They + // fulfill the same goal as snapshots, i.e. provide a consistent view of the + // database. However, unlike snapshots, they also pin the current + // super-version (memtable, immutable memtable list and file LSM tree). In + // that way they are similar to iterators, which also pin the current + // super-version for the duration of their lifetime. + // Each super-snapshot is tied to a column family and the column family in the + // read request has to match with the column family of the snapshot. + // + // Just like the regular snapshot, the caller must call ReleaseSnapshot() when + // it's done with the snapshot, and before closing the database. + virtual Status GetSuperSnapshots( + const std::vector& column_families, + std::vector* snapshots) = 0; + // RocksDB-Cloud contribution end + // Release a previously acquired snapshot. The caller must not // use "snapshot" after this call. virtual void ReleaseSnapshot(const Snapshot* snapshot) = 0; diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 2bd1961412d..7b9b8ca29f1 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -245,6 +245,14 @@ class StackableDB : public DB { virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); } + // RocksDB-Cloud contribution begin + Status GetSuperSnapshots( + const std::vector& column_families, + std::vector* snapshots) override { + return db_->GetSuperSnapshots(column_families, snapshots); + } + // RocksDB-Cloud contribution end + virtual void ReleaseSnapshot(const Snapshot* snapshot) override { return db_->ReleaseSnapshot(snapshot); }