Skip to content

Commit

Permalink
Clean up use of StorageProvider
Browse files Browse the repository at this point in the history
There were a lot of places where the code would make two calls into the StorageProvider -- one for the dest bucket and one for source.  Some -- but not all -- of the calls were guarded by a "SrcMatchesDest" check to prevent an extra call.  This checkin moves most of the calls into a single method that will check the two buckets and perform the proper checks.

Also, makes sure the storage provider and log controller are deleted prior to the cloud env completely being deleted.  This can help guard against some cases where the provider attempts to access some cloud env resources during detruction.
  • Loading branch information
mrambacher committed May 5, 2020
1 parent d82aa33 commit 002a4eb
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 83 deletions.
5 changes: 4 additions & 1 deletion cloud/cloud_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ CloudEnv::CloudEnv(const CloudEnvOptions& options, Env* base,
const std::shared_ptr<Logger>& logger)
: cloud_env_options(options), base_env_(base), info_log_(logger) {}

CloudEnv::~CloudEnv() {}
CloudEnv::~CloudEnv() {
cloud_env_options.cloud_log_controller.reset();
cloud_env_options.storage_provider.reset();
}

Status CloudEnv::NewAwsEnv(
Env* base_env, const std::string& src_cloud_bucket,
Expand Down
192 changes: 110 additions & 82 deletions cloud/cloud_env_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,104 @@ CloudEnvImpl::~CloudEnvImpl() {
StopPurger();
}

Status CloudEnvImpl::ExistsCloudObject(const std::string& fname) {
Status st = Status::NotFound();
if (HasDestBucket()) {
st = cloud_env_options.storage_provider->ExistsCloudObject(
GetDestBucketName(), destname(fname));
}
if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) {
st = cloud_env_options.storage_provider->ExistsCloudObject(
GetSrcBucketName(), srcname(fname));
}
return st;
}

Status CloudEnvImpl::GetCloudObject(const std::string& fname) {
Status st = Status::NotFound();
if (HasDestBucket()) {
st = cloud_env_options.storage_provider->GetCloudObject(
GetDestBucketName(), destname(fname), fname);
}
if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) {
st = cloud_env_options.storage_provider->GetCloudObject(
GetSrcBucketName(), srcname(fname), fname);
}
return st;
}

Status CloudEnvImpl::GetCloudObjectSize(const std::string& fname,
uint64_t* remote_size) {
Status st = Status::NotFound();
if (HasDestBucket()) {
st = cloud_env_options.storage_provider->GetCloudObjectSize(
GetDestBucketName(), destname(fname), remote_size);
}
if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) {
st = cloud_env_options.storage_provider->GetCloudObjectSize(
GetSrcBucketName(), srcname(fname), remote_size);
}
return st;
}

Status CloudEnvImpl::GetCloudObjectModificationTime(const std::string& fname,
uint64_t* time) {
Status st = Status::NotFound();
if (HasDestBucket()) {
st = cloud_env_options.storage_provider->GetCloudObjectModificationTime(
GetDestBucketName(), destname(fname), time);
}
if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) {
st = cloud_env_options.storage_provider->GetCloudObjectModificationTime(
GetSrcBucketName(), srcname(fname), time);
}
return st;
}

Status CloudEnvImpl::ListCloudObjects(const std::string& path,
std::vector<std::string>* result) {
Status st;
// Fetch the list of children from both cloud buckets
if (HasSrcBucket()) {
st = cloud_env_options.storage_provider->ListCloudObjects(
GetSrcBucketName(), GetSrcObjectPath(), result);
if (!st.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
"[%s] GetChildren src bucket %s %s error from S3 %s", Name(),
GetSrcBucketName().c_str(), path.c_str(), st.ToString().c_str());
return st;
}
}
if (HasDestBucket() && !SrcMatchesDest()) {
st = cloud_env_options.storage_provider->ListCloudObjects(
GetDestBucketName(), GetDestObjectPath(), result);
if (!st.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
"[%s] GetChildren dest bucket %s %s error from S3 %s", Name(),
GetDestBucketName().c_str(), path.c_str(), st.ToString().c_str());
}
}
return st;
}

Status CloudEnvImpl::NewCloudReadableFile(
const std::string& fname, std::unique_ptr<CloudStorageReadableFile>* result,
const EnvOptions& options) {
Status st = Status::NotFound();
if (HasDestBucket()) { // read from destination
st = cloud_env_options.storage_provider->NewCloudReadableFile(
GetDestBucketName(), destname(fname), result, options);
if (st.ok()) {
return st;
}
}
if (HasSrcBucket() && !SrcMatchesDest()) { // read from src bucket
st = cloud_env_options.storage_provider->NewCloudReadableFile(
GetSrcBucketName(), srcname(fname), result, options);
}
return st;
}

// open a file for sequential reading
Status CloudEnvImpl::NewSequentialFile(const std::string& logical_fname,
std::unique_ptr<SequentialFile>* result,
Expand All @@ -58,28 +156,14 @@ Status CloudEnvImpl::NewSequentialFile(const std::string& logical_fname,
if (!st.ok()) {
if (cloud_env_options.keep_local_sst_files || !sstfile) {
// copy the file to the local storage if keep_local_sst_files is true
if (HasDestBucket()) {
st = cloud_env_options.storage_provider->GetCloudObject(
GetDestBucketName(), destname(fname), fname);
}
if (!st.ok() && HasSrcBucket() && !SrcMatchesDest()) {
st = cloud_env_options.storage_provider->GetCloudObject(
GetSrcBucketName(), srcname(fname), fname);
}
st = GetCloudObject(fname);
if (st.ok()) {
// we successfully copied the file, try opening it locally now
st = base_env_->NewSequentialFile(fname, result, options);
}
} else {
std::unique_ptr<CloudStorageReadableFile> file;
if (!st.ok() && HasDestBucket()) { // read from destination
st = cloud_env_options.storage_provider->NewCloudReadableFile(
GetDestBucketName(), destname(fname), &file, options);
}
if (!st.ok() && HasSrcBucket()) { // read from src bucket
st = cloud_env_options.storage_provider->NewCloudReadableFile(
GetSrcBucketName(), srcname(fname), &file, options);
}
st = NewCloudReadableFile(fname, &file, options);
if (st.ok()) {
result->reset(file.release());
}
Expand Down Expand Up @@ -145,14 +229,7 @@ Status CloudEnvImpl::NewRandomAccessFile(
if (cloud_env_options.keep_local_sst_files || !sstfile) {
if (!st.ok()) {
// copy the file to the local storage if keep_local_sst_files is true
if (HasDestBucket()) {
st = cloud_env_options.storage_provider->GetCloudObject(
GetDestBucketName(), destname(fname), fname);
}
if (!st.ok() && HasSrcBucket() && !SrcMatchesDest()) {
st = cloud_env_options.storage_provider->GetCloudObject(
GetSrcBucketName(), srcname(fname), fname);
}
st = GetCloudObject(fname);
if (st.ok()) {
// we successfully copied the file, try opening it locally now
st = base_env_->NewRandomAccessFile(fname, result, options);
Expand All @@ -169,12 +246,7 @@ Status CloudEnvImpl::NewRandomAccessFile(
}
stax = Status::NotFound();
if (HasDestBucket()) {
stax = cloud_env_options.storage_provider->GetCloudObjectSize(
GetDestBucketName(), destname(fname), &remote_size);
}
if (stax.IsNotFound() && HasSrcBucket()) {
stax = cloud_env_options.storage_provider->GetCloudObjectSize(
GetSrcBucketName(), srcname(fname), &remote_size);
stax = GetCloudObjectSize(fname, &remote_size);
}
if (stax.IsNotFound() && !HasDestBucket()) {
// It is legal for file to not be present in S3 if destination bucket
Expand All @@ -193,14 +265,7 @@ Status CloudEnvImpl::NewRandomAccessFile(
// true, we will never use CloudReadableFile to read; we copy the file
// locally and read using base_env.
std::unique_ptr<CloudStorageReadableFile> file;
if (!st.ok() && HasDestBucket()) {
st = cloud_env_options.storage_provider->NewCloudReadableFile(
GetDestBucketName(), destname(fname), &file, options);
}
if (!st.ok() && HasSrcBucket()) {
st = cloud_env_options.storage_provider->NewCloudReadableFile(
GetSrcBucketName(), srcname(fname), &file, options);
}
st = NewCloudReadableFile(fname, &file, options);
if (st.ok()) {
result->reset(file.release());
}
Expand Down Expand Up @@ -294,13 +359,8 @@ Status CloudEnvImpl::FileExists(const std::string& logical_fname) {
if (sstfile || manifest || identity) {
// We read first from local storage and then from cloud storage.
st = base_env_->FileExists(fname);
if (st.IsNotFound() && HasDestBucket()) {
st = cloud_env_options.storage_provider->ExistsCloudObject(
GetDestBucketName(), destname(fname));
}
if (!st.ok() && HasSrcBucket()) {
st = cloud_env_options.storage_provider->ExistsCloudObject(
GetSrcBucketName(), srcname(fname));
if (st.IsNotFound()) {
st = ExistsCloudObject(fname);
}
} else if (logfile && !cloud_env_options.keep_local_log_files) {
// read from Kinesis
Expand All @@ -319,26 +379,11 @@ Status CloudEnvImpl::GetChildren(const std::string& path,
Name(), path.c_str());
result->clear();

// Fetch the list of children from both buckets in S3
Status st;
if (HasSrcBucket() && !cloud_env_options.skip_cloud_files_in_getchildren) {
st = cloud_env_options.storage_provider->ListCloudObjects(
GetSrcBucketName(), GetSrcObjectPath(), result);
if (!cloud_env_options.skip_cloud_files_in_getchildren) {
// Fetch the list of children from the cloud
st = ListCloudObjects(path, result);
if (!st.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
"[%s] GetChildren src bucket %s %s error from S3 %s", Name(),
GetSrcBucketName().c_str(), path.c_str(), st.ToString().c_str());
return st;
}
}
if (HasDestBucket() && !SrcMatchesDest() &&
!cloud_env_options.skip_cloud_files_in_getchildren) {
st = cloud_env_options.storage_provider->ListCloudObjects(
GetDestBucketName(), GetDestObjectPath(), result);
if (!st.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
"[%s] GetChildren dest bucket %s %s error from S3 %s", Name(),
GetDestBucketName().c_str(), path.c_str(), st.ToString().c_str());
return st;
}
}
Expand Down Expand Up @@ -399,16 +444,7 @@ Status CloudEnvImpl::GetFileSize(const std::string& logical_fname,
if (base_env_->FileExists(fname).ok()) {
st = base_env_->GetFileSize(fname, size);
} else {
st = Status::NotFound();
// Get file length from CloudStorage
if (HasDestBucket()) {
st = cloud_env_options.storage_provider->GetCloudObjectSize(
GetDestBucketName(), destname(fname), size);
}
if (st.IsNotFound() && HasSrcBucket()) {
st = cloud_env_options.storage_provider->GetCloudObjectSize(
GetSrcBucketName(), srcname(fname), size);
}
st = GetCloudObjectSize(fname, size);
}
} else if (logfile && !cloud_env_options.keep_local_log_files) {
st = cloud_env_options.cloud_log_controller->GetFileSize(fname, size);
Expand All @@ -435,15 +471,7 @@ Status CloudEnvImpl::GetFileModificationTime(const std::string& logical_fname,
if (base_env_->FileExists(fname).ok()) {
st = base_env_->GetFileModificationTime(fname, time);
} else {
st = Status::NotFound();
if (HasDestBucket()) {
st = cloud_env_options.storage_provider->GetCloudObjectModificationTime(
GetDestBucketName(), destname(fname), time);
}
if (st.IsNotFound() && HasSrcBucket()) {
st = cloud_env_options.storage_provider->GetCloudObjectModificationTime(
GetSrcBucketName(), srcname(fname), time);
}
st = GetCloudObjectModificationTime(fname, time);
}
} else if (logfile && !cloud_env_options.keep_local_log_files) {
st = cloud_env_options.cloud_log_controller->GetFileModificationTime(fname,
Expand Down
24 changes: 24 additions & 0 deletions cloud/cloud_env_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "rocksdb/status.h"

namespace rocksdb {
class CloudStorageReadableFile;

//
// The Cloud environment
Expand Down Expand Up @@ -218,6 +219,29 @@ class CloudEnvImpl : public CloudEnv {
}

protected:
// Checks to see if the input fname exists in the dest or src bucket
Status ExistsCloudObject(const std::string& fname);

// Gets the cloud object fname from the dest or src bucket
Status GetCloudObject(const std::string& fname);

// Gets the size of the named cloud object from the dest or src bucket
Status GetCloudObjectSize(const std::string& fname, uint64_t* remote_size);

// Gets the modification time of the named cloud object from the dest or src
// bucket
Status GetCloudObjectModificationTime(const std::string& fname,
uint64_t* time);

// Returns the list of cloud objects from the src and dest buckets.
Status ListCloudObjects(const std::string& path,
std::vector<std::string>* result);

// Returns a CloudStorageReadableFile from the dest or src bucket
Status NewCloudReadableFile(const std::string& fname,
std::unique_ptr<CloudStorageReadableFile>* result,
const EnvOptions& options);

// Copy IDENTITY file to cloud storage. Update dbid registry.
Status SaveIdentityToCloud(const std::string& localfile,
const std::string& idfile);
Expand Down

0 comments on commit 002a4eb

Please sign in to comment.