Skip to content

Commit

Permalink
Add method to extract live file list from local manifest (#286)
Browse files Browse the repository at this point in the history
Add a new method `FindLiveFilesFromLocalManifest` that mirrors FindAllLiveFiles, but doesn't resolve manifest filename or fetch it from the cloud, making it somewhat safer (in return for the need to ensure the manifest is present) and more flexible as it can operate for example on a copy of the live manifest.

### Test plan

Most of the code is already covered by tests for `FindAllLiveFiles`, new test added in `db_cloud_test` for retrieving live files from a manifest copy.
  • Loading branch information
aanq authored Sep 26, 2023
1 parent 587e968 commit feb393b
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 41 deletions.
43 changes: 34 additions & 9 deletions cloud/cloud_file_system_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2002,7 +2002,6 @@ IOStatus CloudFileSystemImpl::UploadCloudManifest(
return st;
}


IOStatus CloudFileSystemImpl::ApplyCloudManifestDelta(
const CloudManifestDelta& delta, bool* delta_applied) {
*delta_applied = cloud_manifest_->AddEpoch(delta.file_num, delta.epoch);
Expand Down Expand Up @@ -2271,6 +2270,19 @@ Status CloudFileSystemImpl::CheckValidity() const {
}
}

void CloudFileSystemImpl::RemapFileNumbers(
const std::set<uint64_t>& file_numbers,
std::vector<std::string>* sst_file_names) {
sst_file_names->resize(file_numbers.size());

size_t idx = 0;
for (auto num : file_numbers) {
std::string logical_path = MakeTableFileName("" /* path */, num);
(*sst_file_names)[idx] = RemapFilename(logical_path);
idx++;
}
}

IOStatus CloudFileSystemImpl::FindAllLiveFiles(
const std::string& local_dbname, std::vector<std::string>* live_sst_files,
std::string* manifest_file) {
Expand All @@ -2282,18 +2294,29 @@ IOStatus CloudFileSystemImpl::FindAllLiveFiles(
return st;
}

live_sst_files->resize(file_nums.size());

// filename will be remapped correctly based on current_epoch of
// cloud_manifest
*manifest_file =
RemapFilename(ManifestFileWithEpoch("" /* dbname */, "" /* epoch */));
size_t idx = 0;
for (auto num : file_nums) {
std::string logical_path = MakeTableFileName("" /* path */, num);
(*live_sst_files)[idx] = RemapFilename(logical_path);
idx++;

RemapFileNumbers(file_nums, live_sst_files);

return IOStatus::OK();
}

IOStatus CloudFileSystemImpl::FindLiveFilesFromLocalManifest(
const std::string& manifest_file,
std::vector<std::string>* live_sst_files) {
std::unique_ptr<LocalManifestReader> extractor(
new LocalManifestReader(info_log_, this));
std::set<uint64_t> file_nums;
auto st = extractor->GetManifestLiveFiles(manifest_file, &file_nums);
if (!st.ok()) {
return st;
}

RemapFileNumbers(file_nums, live_sst_files);

return IOStatus::OK();
}

Expand All @@ -2307,7 +2330,9 @@ void CloudFileSystemImpl::TEST_InitEmptyCloudManifest() {
}

size_t CloudFileSystemImpl::TEST_NumScheduledJobs() const {
return cloud_file_deletion_scheduler_ ? cloud_file_deletion_scheduler_->TEST_NumScheduledJobs() : 0;
return cloud_file_deletion_scheduler_
? cloud_file_deletion_scheduler_->TEST_NumScheduledJobs()
: 0;
}

#endif
Expand Down
18 changes: 14 additions & 4 deletions cloud/cloud_file_system_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <condition_variable>
#include <mutex>
#include <thread>
#include <set>

#include "cloud/cloud_manifest.h"
#include "port/port_posix.h"
Expand Down Expand Up @@ -151,13 +152,17 @@ class CloudFileSystemImpl : public CloudFileSystem {

// Find all live files based on cloud_manifest_ and local MANIFEST FILE
// If local MANIFEST file doesn't exist, it will pull from cloud
//
//
// REQUIRES: cloud_manifest_ is loaded
// REQUIRES: cloud_manifest_ is not updated when calling this function
IOStatus FindAllLiveFiles(const std::string& local_dbname,
std::vector<std::string>* live_sst_files,
std::string* manifest_file) override;

IOStatus FindLiveFilesFromLocalManifest(
const std::string& manifest_file,
std::vector<std::string>* live_sst_files) override;

IOStatus extractParents(const std::string& bucket_name_prefix,
const DbidList& dbid_list, DbidParents* parents);
IOStatus PreloadCloudManifest(const std::string& local_dbname) override;
Expand Down Expand Up @@ -256,8 +261,7 @@ class CloudFileSystemImpl : public CloudFileSystem {
const CloudManifestDelta& delta) const override;

IOStatus GetMaxFileNumberFromCurrentManifest(
const std::string& local_dbname,
uint64_t* max_file_number) override;
const std::string& local_dbname, uint64_t* max_file_number) override;

// Upload MANIFEST-epoch to the cloud
IOStatus UploadManifest(const std::string& local_dbname,
Expand Down Expand Up @@ -374,10 +378,15 @@ class CloudFileSystemImpl : public CloudFileSystem {
// 00010.sst-[epochX], but the real mapping for 00010.sst is [epochY], the
// file will be treated as invisible
bool IsFileInvisible(const std::vector<std::string>& active_cookies,
const std::string& fname) const;
const std::string& fname) const;

void log(InfoLogLevel level, const std::string& fname,
const std::string& msg);

// Remap SST file numbers to file names
void RemapFileNumbers(const std::set<uint64_t>& file_numbers,
std::vector<std::string>* sst_file_names);

// Fetch the cloud manifest based on the cookie
IOStatus FetchCloudManifest(const std::string& local_dbname,
const std::string& cookie);
Expand All @@ -386,6 +395,7 @@ class CloudFileSystemImpl : public CloudFileSystem {
IOStatus FetchManifest(const std::string& local_dbname,
const std::string& epoch);
std::string GenerateNewEpochId();

std::unique_ptr<CloudManifest> cloud_manifest_;
// This runs only in tests when we want to disable cloud manifest
// functionality
Expand Down
6 changes: 6 additions & 0 deletions cloud/cloud_file_system_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@ class MockCloudFileSystem : public CloudFileSystem {
return notsup_;
}

IOStatus FindLiveFilesFromLocalManifest(
const std::string& /* manifest_file */,
std::vector<std::string>* /* live_sst_files */) override {
return notsup_;
}

private:
IOStatus notsup_;
std::string empty_;
Expand Down
44 changes: 43 additions & 1 deletion cloud/db_cloud_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <atomic>
#include <chrono>
#include <cinttypes>
#include <filesystem>

#include "cloud/cloud_file_deletion_scheduler.h"
#include "cloud/cloud_file_system_impl.h"
Expand Down Expand Up @@ -429,7 +430,7 @@ class CloudTest : public testing::Test {
ASSERT_EQ(sst_files.size(), 1);
}

// check that fname existsin in src bucket/object path
// check that fname exists in in src bucket/object path
rocksdb::Status ExistsCloudObject(const std::string& filename) const {
return GetCloudFileSystem()->GetStorageProvider()->ExistsCloudObject(
GetCloudFileSystem()->GetSrcBucketName(),
Expand Down Expand Up @@ -593,6 +594,47 @@ TEST_F(CloudTest, GetChildrenTest) {
EXPECT_EQ(sst_files, 1);
}

TEST_F(CloudTest, FindLiveFilesFromLocalManifestTest) {
OpenDB();
ASSERT_OK(db_->Put(WriteOptions(), "Hello", "Universe"));
ASSERT_OK(db_->Flush(FlushOptions()));

// wait until files are persisted into s3
GetDBImpl()->TEST_WaitForBackgroundWork();

CloseDB();

// determine the manifest name and store a copy in a different location
auto cfs = GetCloudFileSystem();
auto manifest_file = cfs->RemapFilename("MANIFEST");
auto manifest_path = std::filesystem::path(dbname_) / manifest_file;

auto alt_manifest_path =
std::filesystem::temp_directory_path() / ("ALT-" + manifest_file);
std::filesystem::copy_file(manifest_path, alt_manifest_path);

DestroyDir(dbname_);

std::vector<std::string> tablefiles;
// verify the copied manifest can be processed correctly
ASSERT_OK(GetCloudFileSystem()->FindLiveFilesFromLocalManifest(
alt_manifest_path, &tablefiles));

// verify the result
EXPECT_EQ(tablefiles.size(), 1);

for (auto name : tablefiles) {
EXPECT_EQ(GetFileType(name), RocksDBFileType::kSstFile);
// verify that the sst file indeed exists in cloud
EXPECT_OK(GetCloudFileSystem()->GetStorageProvider()->ExistsCloudObject(
GetCloudFileSystem()->GetSrcBucketName(),
GetCloudFileSystem()->GetSrcObjectPath() + pathsep + name));
}

// clean up
std::filesystem::remove(alt_manifest_path);
}

//
// Create and read from a clone.
//
Expand Down
33 changes: 27 additions & 6 deletions cloud/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ IOStatus LocalManifestReader::GetLiveFilesLocally(
std::unique_ptr<SequentialFileReader> manifest_file_reader;
IOStatus s;
{
// file name here doesn't matter, it will always be mapped to the correct Manifest file.
// use empty epoch here so that it will be recognized as manifest file type
// file name here doesn't matter, it will always be mapped to the correct
// Manifest file. use empty epoch here so that it will be recognized as
// manifest file type
auto local_manifest_file = cfs_impl->RemapFilename(
ManifestFileWithEpoch(local_dbname, "" /* epoch */));

Expand All @@ -53,6 +54,27 @@ IOStatus LocalManifestReader::GetLiveFilesLocally(
return GetLiveFilesFromFileReader(std::move(manifest_file_reader), list);
}

IOStatus LocalManifestReader::GetManifestLiveFiles(
const std::string& manifest_file, std::set<uint64_t>* list) const {
auto* cfs_impl = dynamic_cast<CloudFileSystemImpl*>(cfs_);
assert(cfs_impl);

std::unique_ptr<SequentialFileReader> manifest_file_reader;
IOStatus s;
{
std::unique_ptr<FSSequentialFile> file;
s = cfs_impl->NewSequentialFile(manifest_file, FileOptions(), &file,
nullptr /*dbg*/);
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(
new SequentialFileReader(std::move(file), manifest_file));
}

return GetLiveFilesFromFileReader(std::move(manifest_file_reader), list);
}

IOStatus LocalManifestReader::GetLiveFilesFromFileReader(
std::unique_ptr<SequentialFileReader> file_reader,
std::set<uint64_t>* list) const {
Expand Down Expand Up @@ -92,8 +114,7 @@ IOStatus LocalManifestReader::GetLiveFilesFromFileReader(
uint64_t num = one.second;
// Deleted files should belong to some CF
auto it = cf_live_files.find(edit.GetColumnFamily());
if ((it == cf_live_files.end()) ||
(it->second.count(level) == 0) ||
if ((it == cf_live_files.end()) || (it->second.count(level) == 0) ||
(it->second[level].count(num) == 0)) {
return IOStatus::Corruption(
"Corrupted Manifest file with unrecognized deleted file: " +
Expand Down Expand Up @@ -158,8 +179,8 @@ IOStatus ManifestReader::GetLiveFiles(const std::string& bucket_path,
}
std::unique_ptr<SequentialFileReader> file_reader;
{
auto manifestFile = ManifestFileWithEpoch(
bucket_path, cloud_manifest->GetCurrentEpoch());
auto manifestFile =
ManifestFileWithEpoch(bucket_path, cloud_manifest->GetCurrentEpoch());
std::unique_ptr<FSSequentialFile> file;
s = cfs_->NewSequentialFileCloud(bucket_prefix_, manifestFile, file_opts,
&file, dbg);
Expand Down
14 changes: 12 additions & 2 deletions cloud/manifest_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,18 @@ class LocalManifestReader {
IOStatus GetLiveFilesLocally(const std::string& local_dbname,
std::set<uint64_t>* list) const;

// Read given local manifest file and return all live files that it
// references. This doesn't rely on CLOUDMANIFEST and just accepts (any valid)
// manifest file.
//
// Provided manifest file is not updated or pulled from cloud when calling the
// function.
IOStatus GetManifestLiveFiles(const std::string& manifest_file,
std::set<uint64_t>* list) const;

protected:
// Get all the live sst file number by reading version_edit records from file_reader
// Get all the live SST file numbers by reading version_edit records from
// file_reader
IOStatus GetLiveFilesFromFileReader(
std::unique_ptr<SequentialFileReader> file_reader,
std::set<uint64_t>* list) const;
Expand All @@ -42,7 +52,7 @@ class LocalManifestReader {
//
// Operates on MANIFEST files stored in the cloud bucket directly
//
class ManifestReader: public LocalManifestReader {
class ManifestReader : public LocalManifestReader {
public:
ManifestReader(std::shared_ptr<Logger> info_log, CloudFileSystem* cfs,
const std::string& bucket_prefix);
Expand Down
Loading

0 comments on commit feb393b

Please sign in to comment.