Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SuperSnapshots in one more variant of MultiGet #296

Merged
merged 1 commit into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions cloud/replication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1315,13 +1315,15 @@ TEST_F(ReplicationTest, SuperSnapshot) {
ASSERT_OK(follower->Get(ro, "k1", &val));
EXPECT_EQ(val, "v1");

auto iter = follower->NewIterator(ro, follower->DefaultColumnFamily());
auto iter = std::unique_ptr<rocksdb::Iterator>(
follower->NewIterator(ro, follower->DefaultColumnFamily()));
iter->SeekToFirst();
EXPECT_TRUE(iter->Valid());
EXPECT_EQ(iter->key(), "k1");
EXPECT_EQ(iter->value(), "v1");
iter->Next();
EXPECT_FALSE(iter->Valid());
iter.reset();

ro.snapshot = nullptr;
ASSERT_OK(follower->Get(ro, followerCF("cf1"), "cf1k1", &val));
Expand All @@ -1339,19 +1341,33 @@ TEST_F(ReplicationTest, SuperSnapshot) {
cfs.push_back(followerCF("cf1"));
std::vector<std::string> vals;

// Multiget on cf1, snapshot
ro.snapshot = snapshots[1];
auto statuses = follower->MultiGet(ro, cfs, keys, &vals);
ASSERT_OK(statuses[0]);
ASSERT_TRUE(statuses[1].IsNotFound());
ASSERT_EQ(vals[0], "cf1v1");

// Multiget on cf1, no snapshot
ro.snapshot = nullptr;
statuses = follower->MultiGet(ro, cfs, keys, &vals);
ASSERT_OK(statuses[0]);
ASSERT_TRUE(statuses[1].IsNotFound());
ASSERT_EQ(vals[0], "cf1v2");

// Column family <-> snapshot mismatch
// Multiget on default column family, snapshot, pinnable values
ro.snapshot = snapshots[0];
keys[0] = "k1";
std::vector<PinnableSlice> pinnableValues;
pinnableValues.resize(2);
follower->MultiGet(ro, follower->DefaultColumnFamily(), 2,
keys.data(), pinnableValues.data(), statuses.data(), true);
ASSERT_OK(statuses[0]);
ASSERT_TRUE(statuses[1].IsNotFound());
ASSERT_EQ(pinnableValues[0], "v1");
pinnableValues.clear();

// Column family <-> snapshot mismatch
ASSERT_FALSE(follower->Get(ro, followerCF("cf1"), "cf1k1", &val).ok());

follower->ReleaseSnapshot(snapshots[0]);
Expand Down
18 changes: 14 additions & 4 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3139,8 +3139,9 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
if (dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot)) {
for (size_t i = 0; i < num_keys; ++i) {
statuses[i] = Status::NotSupported(
"MultiGet with timestamps does not support super snapshot");
"This variant of MultiGet does not yet support super snapshots");
}
return;
}
// RocksDB-Cloud contribution end

Expand Down Expand Up @@ -3400,8 +3401,12 @@ void DBImpl::MultiGetWithCallback(
multiget_cf_data[0].super_version, consistent_seqnum,
read_callback);
assert(s.ok() || s.IsTimedOut() || s.IsAborted());
ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
multiget_cf_data[0].super_version);
// RocksDB-Cloud contribution begin
if (!dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot)) {
ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
multiget_cf_data[0].super_version);
}
// RocksDB-Cloud contribution end
}

// The actual implementation of batched MultiGet. Parameters -
Expand Down Expand Up @@ -3844,10 +3849,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
result = nullptr;

#else
// RocksDB-Cloud contribution begin
if (dynamic_cast<const SuperSnapshotImpl*>(read_options.snapshot)) {
return NewErrorIterator(Status::NotSupported(
"Tailing iterator not supported with super snapshot"));
}
// RocksDB-Cloud contribution end

SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
auto iter = new ForwardIterator(this, read_options, cfd, sv,
Expand Down Expand Up @@ -4077,6 +4084,8 @@ Status DBImpl::GetSuperSnapshots(
return Status::InvalidArgument(
"GetSuperSnapshots only supported in RocksDB compiled with USE_RTTI=1");
#endif
InstrumentedMutexLock l(&mutex_);

if (!is_snapshot_supported_) {
return Status::InvalidArgument("Snapshot not supported");
}
Expand All @@ -4092,7 +4101,8 @@ Status DBImpl::GetSuperSnapshots(
for (auto& cf : column_families) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
auto cfd = cfh->cfd();
auto sv = cfd->GetReferencedSuperVersion(this);
auto sv = cfd->GetSuperVersion();
sv->Ref();
auto ss = new SuperSnapshotImpl(cfd, sv);
snapshots_.New(ss, snapshot_seq, unix_time,
/*is_write_conflict_boundary=*/false);
Expand Down