Skip to content

Commit

Permalink
Introduce SuperSnapshot
Browse files Browse the repository at this point in the history
Summary:
A SuperSnapshot is a combination of a snapshot and a super-version. It
fulfills the same goal as a snapshot, i.e. it provides a consistent view of
the database. However, unlike a snapshot, it also pins the current
super-version (memtable, immutable memtable list and file LSM tree). In
that way it is similar to an iterator, which also pins the current
super-version for the duration of its lifetime.

Test Plan:
Added a basic unit test.

Reviewers:
  • Loading branch information
igorcanadi committed Oct 18, 2023
1 parent 5ff3b23 commit 3664001
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 11 deletions.
61 changes: 60 additions & 1 deletion cloud/replication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,6 @@ size_t ReplicationTest::catchUpFollower(
},
allow_new_manifest_writes, &info, flags);
assert(s.ok());
assert(info.mismatched_epoch_num == 0);
++ret;
}
if (info.has_new_manifest_writes) {
Expand Down Expand Up @@ -1279,6 +1278,66 @@ TEST_F(ReplicationTest, EpochNumberSimple) {
verifyEqual();
}

// Takes one super snapshot per column family on the follower, then has the
// leader overwrite, flush and compact the same keys. After the follower
// catches up, plain reads must see the new values while reads through the
// super snapshots (Get and iterator) must still see the old ones. Reading a
// column family through another column family's super snapshot must fail.
TEST_F(ReplicationTest, SuperSnapshot) {
  // NOTE(review): `options` is configured but never handed to openLeader(),
  // so disable_auto_compactions presumably has no effect on the leader opened
  // below -- confirm whether openLeader() was meant to receive it.
  auto options = leaderOptions();
  options.disable_auto_compactions = true;
  auto leader = openLeader();
  auto follower = openFollower();

  createColumnFamily("cf1");

  ASSERT_OK(leader->Put(wo(), "k1", "v1"));
  ASSERT_OK(leader->Put(wo(), leaderCF("cf1"), "cf1k1", "cf1v1"));
  ASSERT_OK(leader->Flush({}));
  catchUpFollower();

  // snapshots[0] belongs to the default column family, snapshots[1] to cf1.
  std::vector<ColumnFamilyHandle*> cf;
  cf.push_back(follower->DefaultColumnFamily());
  cf.push_back(followerCF("cf1"));
  std::vector<const Snapshot*> snapshots;
  ASSERT_OK(follower->GetSuperSnapshots(cf, &snapshots));

  // Overwrite both keys and compact, so the old values survive only through
  // whatever the super snapshots keep pinned.
  ASSERT_OK(leader->Put(wo(), "k1", "v2"));
  ASSERT_OK(leader->Put(wo(), leaderCF("cf1"), "cf1k1", "cf1v2"));
  ASSERT_OK(leader->Flush({}));
  auto leaderFull = static_cast_with_check<DBImpl>(leader);
  ASSERT_OK(leaderFull->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
  ASSERT_OK(leaderFull->TEST_CompactRange(0, nullptr, nullptr, leaderCF("cf1"),
                                          true));

  catchUpFollower();

  ReadOptions ro;
  std::string val;
  ASSERT_OK(follower->Get(ro, "k1", &val));
  EXPECT_EQ(val, "v2");
  ro.snapshot = snapshots[0];
  ASSERT_OK(follower->Get(ro, "k1", &val));
  EXPECT_EQ(val, "v1");

  auto iter = follower->NewIterator(ro, follower->DefaultColumnFamily());
  iter->SeekToFirst();
  EXPECT_TRUE(iter->Valid());
  EXPECT_EQ(iter->key(), "k1");
  EXPECT_EQ(iter->value(), "v1");
  iter->Next();
  EXPECT_FALSE(iter->Valid());
  // The iterator was created through a super snapshot, so it relies on the
  // snapshot's super-version reference. Destroy it before the snapshot is
  // released (the original never deleted it).
  delete iter;

  ro.snapshot = nullptr;
  ASSERT_OK(follower->Get(ro, followerCF("cf1"), "cf1k1", &val));
  EXPECT_EQ(val, "cf1v2");
  ro.snapshot = snapshots[1];
  ASSERT_OK(follower->Get(ro, followerCF("cf1"), "cf1k1", &val));
  EXPECT_EQ(val, "cf1v1");

  // Column family <-> snapshot mismatch
  ro.snapshot = snapshots[0];
  ASSERT_FALSE(follower->Get(ro, followerCF("cf1"), "cf1k1", &val).ok());

  follower->ReleaseSnapshot(snapshots[0]);
  follower->ReleaseSnapshot(snapshots[1]);
}

} // namespace ROCKSDB_NAMESPACE

// A black-box test for the cloud wrapper around rocksdb
Expand Down
155 changes: 145 additions & 10 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2332,6 +2332,11 @@ InternalIterator* DBImpl::NewInternalIterator(
!read_options.ignore_range_deletions);
}
TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);

// Code related to super_snapshot in this function was contributed by
// RocksDB-Cloud
auto super_snapshot =
dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot);
if (s.ok()) {
// Collect iterators for files in L0 - Ln
if (read_options.read_tier != kMemtableTier) {
Expand All @@ -2341,14 +2346,19 @@ InternalIterator* DBImpl::NewInternalIterator(
}
internal_iter = merge_iter_builder.Finish(
read_options.ignore_range_deletions ? nullptr : db_iter);
SuperVersionHandle* cleanup = new SuperVersionHandle(
this, &mutex_, super_version,
read_options.background_purge_on_iterator_cleanup ||
immutable_db_options_.avoid_unnecessary_blocking_io);
internal_iter->RegisterCleanup(CleanupSuperVersionHandle, cleanup, nullptr);
// Do not clean up the super version if super snapshot owns it
if (!super_snapshot) {
SuperVersionHandle* cleanup = new SuperVersionHandle(
this, &mutex_, super_version,
read_options.background_purge_on_iterator_cleanup ||
immutable_db_options_.avoid_unnecessary_blocking_io);
internal_iter->RegisterCleanup(CleanupSuperVersionHandle, cleanup,
nullptr);
}

return internal_iter;
} else {
} else if (!super_snapshot) { // Do not clean up the super version if super
// snapshot owns it
CleanupSuperVersion(super_version);
}
return NewErrorInternalIterator<Slice>(s, arena);
Expand Down Expand Up @@ -2487,8 +2497,20 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
}
}

// RocksDB-Cloud contribution begin
auto super_snapshot =
dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot);
if (super_snapshot && cfd->GetID() != super_snapshot->cfd()->GetID()) {
std::ostringstream oss;
oss << "SuperSnapshot column family " << super_snapshot->cfd()->GetName()
<< " doesn't match provided column family " << cfd->GetName();
return Status::InvalidArgument(oss.str());
}

// Acquire SuperVersion
SuperVersion* sv = GetAndRefSuperVersion(cfd);
SuperVersion* sv =
super_snapshot ? super_snapshot->sv() : GetAndRefSuperVersion(cfd);
// RocksDB-Cloud contribution end

TEST_SYNC_POINT("DBImpl::GetImpl:1");
TEST_SYNC_POINT("DBImpl::GetImpl:2");
Expand Down Expand Up @@ -2612,7 +2634,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
RecordTick(stats_, MEMTABLE_HIT);
}
}
if (!done && !s.ok() && !s.IsMergeInProgress()) {
// RocksDB-Cloud contribution begin
if (!super_snapshot && !done && !s.ok() && !s.IsMergeInProgress()) {
// RocksDB-Cloud contribution end
ReturnAndCleanupSuperVersion(cfd, sv);
return s;
}
Expand Down Expand Up @@ -2715,7 +2739,11 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
PERF_COUNTER_ADD(get_read_bytes, size);
}

ReturnAndCleanupSuperVersion(cfd, sv);
// RocksDB-Cloud contribution begin
if (!super_snapshot) {
ReturnAndCleanupSuperVersion(cfd, sv);
}
// RocksDB-Cloud contribution end

RecordInHistogram(stats_, BYTES_PER_READ, size);
}
Expand Down Expand Up @@ -2743,6 +2771,11 @@ std::vector<Status> DBImpl::MultiGet(
assert(column_family.size() == num_keys);
std::vector<Status> stat_list(num_keys);

// RocksDB-Cloud contribution begin
auto super_snapshot =
dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot);
// RocksDB-Cloud contribution end

bool should_fail = false;
for (size_t i = 0; i < num_keys; ++i) {
assert(column_family[i]);
Expand All @@ -2758,6 +2791,18 @@ std::vector<Status> DBImpl::MultiGet(
should_fail = true;
}
}
// RocksDB-Cloud contribution begin
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family[i]);
auto cfd = cfh->cfd();
if (super_snapshot && cfd->GetID() != super_snapshot->cfd()->GetID()) {
std::ostringstream oss;
oss << "[MultiGet] SuperSnapshot column family "
<< super_snapshot->cfd()->GetName()
<< " doesn't match provided column family " << cfd->GetName();
stat_list[i] = Status::InvalidArgument(oss.str());
should_fail = true;
}
// RocksDB-Cloud contribution end
}

if (should_fail) {
Expand Down Expand Up @@ -3070,6 +3115,15 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
return;
}

// RocksDB-Cloud contribution begin
// Super snapshots are not supported by this MultiGet overload. Report
// NotSupported for every key and stop here; without the return, execution
// would fall through into the code that follows this guard.
if (dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot)) {
  for (size_t i = 0; i < num_keys; ++i) {
    statuses[i] = Status::NotSupported(
        "MultiGet with timestamps does not support super snapshot");
  }
  return;
}
// RocksDB-Cloud contribution end

bool should_fail = false;
for (size_t i = 0; i < num_keys; ++i) {
ColumnFamilyHandle* cfh = column_families[i];
Expand Down Expand Up @@ -3770,6 +3824,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
result = nullptr;

#else
if (dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot)) {
return NewErrorIterator(Status::NotSupported(
"Tailing iterator not supported with super snapshot"));
}

SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
auto iter = new ForwardIterator(this, read_options, cfd, sv,
/* allow_unprepared_value */ true);
Expand All @@ -3780,6 +3839,19 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
this, cfd);
#endif
} else {
// RocksDB-Cloud contribution begin
auto super_snapshot =
dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot);
if (super_snapshot && cfd->GetID() != super_snapshot->cfd()->GetID()) {
std::ostringstream oss;
oss << "SuperSnapshot column family " << super_snapshot->cfd()->GetName()
<< " doesn't match provided column family " << cfd->GetName();
// We do a check here instead of in NewIteratorImpl because
// NewIteratorImpl returns ArenaWrappedDBIter, which ErrorIterator does
// not subclass
return NewErrorIterator(Status::InvalidArgument(oss.str()));
}
// RocksDB-Cloud contribution end
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterator is overridden in
// WritePreparedTxnDB
Expand All @@ -3798,7 +3870,14 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
ReadCallback* read_callback,
bool expose_blob_index,
bool allow_refresh) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
// RocksDB-Cloud contribution begin
auto super_snapshot =
dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot);

// Acquire SuperVersion
SuperVersion* sv = super_snapshot ? super_snapshot->sv()
: cfd->GetReferencedSuperVersion(this);
// RocksDB-Cloud contribution end

TEST_SYNC_POINT("DBImpl::NewIterator:1");
TEST_SYNC_POINT("DBImpl::NewIterator:2");
Expand Down Expand Up @@ -3913,6 +3992,10 @@ Status DBImpl::NewIterators(
return Status::InvalidArgument(
"Tailing iterator not supported in RocksDB lite");
#else
if (dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot)) {
return Status::NotSupported(
"Tailing iterator not supported with super snapshot");
}
for (auto cfh : column_families) {
auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
Expand All @@ -3936,6 +4019,18 @@ Status DBImpl::NewIterators(
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(column_families[i])
->cfd();
// RocksDB-Cloud contribution begin
auto super_snapshot =
dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot);
if (super_snapshot && cfd->GetID() != super_snapshot->cfd()->GetID()) {
std::ostringstream oss;
oss << "SuperSnapshot column family " << super_snapshot->cfd()->GetName()
<< " doesn't match provided column family " << cfd->GetName();
// We do a check here instead of in NewIteratorImpl because
// NewIteratorImpl returns ArenaWrappedDBIter, which ErrorIterator does
// not subclass
return Status::InvalidArgument(oss.str());
}
iterators->push_back(
NewIteratorImpl(read_options, cfd, snapshot, read_callback));
}
Expand All @@ -3954,6 +4049,40 @@ SequenceNumber DBImpl::GetIteratorSequenceNumber(Iterator* it) {

const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }

// RocksDB-Cloud contribution begin
// Creates one super snapshot per entry of `column_families` and appends them
// to `*snapshots` in the same order. Every snapshot shares a single sequence
// number and timestamp and holds a reference to its column family's
// SuperVersion; DBImpl::ReleaseSnapshot() drops that reference.
//
// Returns InvalidArgument when built without RTTI, when `snapshots` is null,
// or when snapshots are not supported; OK otherwise.
Status DBImpl::GetSuperSnapshots(
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<const Snapshot*>* snapshots) {
#ifndef ROCKSDB_USE_RTTI
  // Super snapshots are recognised via dynamic_cast, so RTTI is required.
  (void)column_families;
  (void)snapshots;
  return Status::InvalidArgument(
      "GetSuperSnapshots only supported in RocksDB compiled with USE_RTTI=1");
#else
  if (snapshots == nullptr) {
    return Status::InvalidArgument("snapshots must not be null");
  }
  if (!is_snapshot_supported_) {
    return Status::InvalidArgument("Snapshot not supported");
  }

  int64_t unix_time = 0;
  immutable_db_options_.clock->GetCurrentTime(&unix_time)
      .PermitUncheckedError();  // Ignore error

  // NOTE(review): the sequence number is read before the snapshots are
  // registered, as in the original. Confirm that nothing can drop versions
  // visible at this sequence in between.
  const auto snapshot_seq = GetLastPublishedSequence();

  snapshots->reserve(snapshots->size() + column_families.size());

  for (auto* cf : column_families) {
    auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
    auto* cfd = cfh->cfd();
    // Acquired outside the lock, exactly as before.
    // NOTE(review): confirm whether GetReferencedSuperVersion may take mutex_
    // itself before moving it under the lock below.
    SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
    auto* ss = new SuperSnapshotImpl(cfd, sv);
    {
      // ReleaseSnapshot() takes mutex_ before touching the snapshot list, so
      // insert under the same lock instead of mutating the list unguarded.
      InstrumentedMutexLock l(&mutex_);
      snapshots_.New(ss, snapshot_seq, unix_time,
                     /*is_write_conflict_boundary=*/false);
    }
    snapshots->push_back(ss);
  }

  return Status::OK();
#endif  // ROCKSDB_USE_RTTI
}
// RocksDB-Cloud contribution end

#ifndef ROCKSDB_LITE
const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
return GetSnapshotImpl(true);
Expand Down Expand Up @@ -4159,6 +4288,12 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
// inplace_update_support enabled.
return;
}
// RocksDB-Cloud contribution begin
if (auto ss = dynamic_cast<const SuperSnapshotImpl*>(s)) {
CleanupSuperVersion(ss->sv());
}
// RocksDB-Cloud contribution end

const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
{
InstrumentedMutexLock l(&mutex_);
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ class DBImpl : public DB {
SequenceNumber GetIteratorSequenceNumber(Iterator* it) override;

virtual const Snapshot* GetSnapshot() override;
// RocksDB-Cloud contribution begin
Status GetSuperSnapshots(
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<const Snapshot*>* snapshots) override;
// RocksDB-Cloud contribution end
virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
// Create a timestamped snapshot. This snapshot can be shared by multiple
// readers. If any of them uses it for write conflict checking, then
Expand Down
6 changes: 6 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3196,6 +3196,12 @@ class ModelDB : public DB {
return snapshot;
}

// ModelDB does not implement super snapshots; always reports NotSupported.
// Parameter names are commented out because they are unused.
Status GetSuperSnapshots(
    const std::vector<ColumnFamilyHandle*>& /*column_families*/,
    std::vector<const Snapshot*>* /*snapshots*/) override {
  return Status::NotSupported("");
}

void ReleaseSnapshot(const Snapshot* snapshot) override {
delete reinterpret_cast<const ModelSnapshot*>(snapshot);
}
Expand Down
18 changes: 18 additions & 0 deletions db/snapshot_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,24 @@ class SnapshotImpl : public Snapshot {
bool is_write_conflict_boundary_;
};

// RocksDB-Cloud contribution begin
class SuperVersion;
class ColumnFamilyData;
class SuperSnapshotImpl : public SnapshotImpl {
public:
SuperSnapshotImpl(ColumnFamilyData* cfd, SuperVersion* sv)
: cfd_(cfd), sv_(sv) {}

ColumnFamilyData* cfd() const { return cfd_; }
SuperVersion* sv() const { return sv_; }

private:
ColumnFamilyData* cfd_;
// We hold a ref to sv_
SuperVersion* sv_{nullptr};
};
// RocksDB-Cloud contribution end

class SnapshotList {
public:
SnapshotList() {
Expand Down
Loading

0 comments on commit 3664001

Please sign in to comment.