// Review notes for this patch (cloud/cloud_env.cc, cloud/cloud_env_impl.cc,
// cloud/cloud_env_impl.h).
//
// What the patch does:
//  - CloudEnv::~CloudEnv() now resets cloud_env_options.cloud_log_controller
//    and cloud_env_options.storage_provider.
//  - Six protected CloudEnvImpl helpers are added and declared in the header:
//    ExistsCloudObject, GetCloudObject, GetCloudObjectSize,
//    GetCloudObjectModificationTime, ListCloudObjects, NewCloudReadableFile.
//    Each forwards to the storage_provider call of the same name.
//  - NewSequentialFile, NewRandomAccessFile, FileExists, GetChildren,
//    GetFileSize and GetFileModificationTime drop their inline dest/src
//    bucket handling and call the helpers instead.
//
// Helper behavior as written:
//  - ExistsCloudObject, GetCloudObject, GetCloudObjectSize and
//    GetCloudObjectModificationTime start from Status::NotFound(), query the
//    dest bucket when HasDestBucket(), and query the src bucket only when the
//    status is still NotFound, HasSrcBucket() is true and SrcMatchesDest() is
//    false.
//  - NewCloudReadableFile returns immediately when the dest open succeeds;
//    otherwise it tries the src bucket after ANY dest failure (not only
//    NotFound), provided HasSrcBucket() && !SrcMatchesDest().
//  - ListCloudObjects lists the src bucket first and returns on error, then
//    lists the dest bucket when it differs from src. The `path` argument is
//    used only in the log messages, which still say "GetChildren" and "S3".
//
// Behavior differences versus the removed inline code
// (NOTE(review): confirm each one is intended):
//  - GetCloudObject call sites used to fall back to src on any !st.ok(); the
//    helper falls back only on IsNotFound().
//  - FileExists used to consult src on any !st.ok(), including a local error
//    other than NotFound, and without checking SrcMatchesDest(). It now
//    consults the cloud only when the local status IsNotFound().
//  - GetFileSize, GetFileModificationTime and the NewCloudReadableFile call
//    sites used to try src without checking SrcMatchesDest(); the src lookup
//    is now skipped when SrcMatchesDest().
//  - In NewRandomAccessFile the size check still wraps GetCloudObjectSize in
//    HasDestBucket(), so with no dest bucket the src bucket is no longer asked
//    for the remote size.
//
// Formatting note: in this copy the patch's line breaks are collapsed and
// template arguments appear to be missing (e.g. "std::shared_ptr& logger",
// "std::vector* result", "std::unique_ptr* result").
// NOTE(review): presumably lost in extraction -- restore from the original
// commit before applying or compiling.
diff --git a/cloud/cloud_env.cc b/cloud/cloud_env.cc index 4bf5c160996..3d6be9c710e 100644 --- a/cloud/cloud_env.cc +++ b/cloud/cloud_env.cc @@ -89,7 +89,10 @@ CloudEnv::CloudEnv(const CloudEnvOptions& options, Env* base, const std::shared_ptr& logger) : cloud_env_options(options), base_env_(base), info_log_(logger) {} -CloudEnv::~CloudEnv() {} +CloudEnv::~CloudEnv() { + cloud_env_options.cloud_log_controller.reset(); + cloud_env_options.storage_provider.reset(); +} Status CloudEnv::NewAwsEnv( Env* base_env, const std::string& src_cloud_bucket, diff --git a/cloud/cloud_env_impl.cc b/cloud/cloud_env_impl.cc index 1b4291b4e82..e5432899526 100644 --- a/cloud/cloud_env_impl.cc +++ b/cloud/cloud_env_impl.cc @@ -33,6 +33,104 @@ CloudEnvImpl::~CloudEnvImpl() { StopPurger(); } +Status CloudEnvImpl::ExistsCloudObject(const std::string& fname) { + Status st = Status::NotFound(); + if (HasDestBucket()) { + st = cloud_env_options.storage_provider->ExistsCloudObject( + GetDestBucketName(), destname(fname)); + } + if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) { + st = cloud_env_options.storage_provider->ExistsCloudObject( + GetSrcBucketName(), srcname(fname)); + } + return st; +} + +Status CloudEnvImpl::GetCloudObject(const std::string& fname) { + Status st = Status::NotFound(); + if (HasDestBucket()) { + st = cloud_env_options.storage_provider->GetCloudObject( + GetDestBucketName(), destname(fname), fname); + } + if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) { + st = cloud_env_options.storage_provider->GetCloudObject( + GetSrcBucketName(), srcname(fname), fname); + } + return st; +} + +Status CloudEnvImpl::GetCloudObjectSize(const std::string& fname, + uint64_t* remote_size) { + Status st = Status::NotFound(); + if (HasDestBucket()) { + st = cloud_env_options.storage_provider->GetCloudObjectSize( + GetDestBucketName(), destname(fname), remote_size); + } + if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) { + st = 
cloud_env_options.storage_provider->GetCloudObjectSize( + GetSrcBucketName(), srcname(fname), remote_size); + } + return st; +} + +Status CloudEnvImpl::GetCloudObjectModificationTime(const std::string& fname, + uint64_t* time) { + Status st = Status::NotFound(); + if (HasDestBucket()) { + st = cloud_env_options.storage_provider->GetCloudObjectModificationTime( + GetDestBucketName(), destname(fname), time); + } + if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) { + st = cloud_env_options.storage_provider->GetCloudObjectModificationTime( + GetSrcBucketName(), srcname(fname), time); + } + return st; +} + +Status CloudEnvImpl::ListCloudObjects(const std::string& path, + std::vector* result) { + Status st; + // Fetch the list of children from both cloud buckets + if (HasSrcBucket()) { + st = cloud_env_options.storage_provider->ListCloudObjects( + GetSrcBucketName(), GetSrcObjectPath(), result); + if (!st.ok()) { + Log(InfoLogLevel::ERROR_LEVEL, info_log_, + "[%s] GetChildren src bucket %s %s error from S3 %s", Name(), + GetSrcBucketName().c_str(), path.c_str(), st.ToString().c_str()); + return st; + } + } + if (HasDestBucket() && !SrcMatchesDest()) { + st = cloud_env_options.storage_provider->ListCloudObjects( + GetDestBucketName(), GetDestObjectPath(), result); + if (!st.ok()) { + Log(InfoLogLevel::ERROR_LEVEL, info_log_, + "[%s] GetChildren dest bucket %s %s error from S3 %s", Name(), + GetDestBucketName().c_str(), path.c_str(), st.ToString().c_str()); + } + } + return st; +} + +Status CloudEnvImpl::NewCloudReadableFile( + const std::string& fname, std::unique_ptr* result, + const EnvOptions& options) { + Status st = Status::NotFound(); + if (HasDestBucket()) { // read from destination + st = cloud_env_options.storage_provider->NewCloudReadableFile( + GetDestBucketName(), destname(fname), result, options); + if (st.ok()) { + return st; + } + } + if (HasSrcBucket() && !SrcMatchesDest()) { // read from src bucket + st = 
cloud_env_options.storage_provider->NewCloudReadableFile( + GetSrcBucketName(), srcname(fname), result, options); + } + return st; +} + // open a file for sequential reading Status CloudEnvImpl::NewSequentialFile(const std::string& logical_fname, std::unique_ptr* result, @@ -58,28 +156,14 @@ Status CloudEnvImpl::NewSequentialFile(const std::string& logical_fname, if (!st.ok()) { if (cloud_env_options.keep_local_sst_files || !sstfile) { // copy the file to the local storage if keep_local_sst_files is true - if (HasDestBucket()) { - st = cloud_env_options.storage_provider->GetCloudObject( - GetDestBucketName(), destname(fname), fname); - } - if (!st.ok() && HasSrcBucket() && !SrcMatchesDest()) { - st = cloud_env_options.storage_provider->GetCloudObject( - GetSrcBucketName(), srcname(fname), fname); - } + st = GetCloudObject(fname); if (st.ok()) { // we successfully copied the file, try opening it locally now st = base_env_->NewSequentialFile(fname, result, options); } } else { std::unique_ptr file; - if (!st.ok() && HasDestBucket()) { // read from destination - st = cloud_env_options.storage_provider->NewCloudReadableFile( - GetDestBucketName(), destname(fname), &file, options); - } - if (!st.ok() && HasSrcBucket()) { // read from src bucket - st = cloud_env_options.storage_provider->NewCloudReadableFile( - GetSrcBucketName(), srcname(fname), &file, options); - } + st = NewCloudReadableFile(fname, &file, options); if (st.ok()) { result->reset(file.release()); } @@ -145,14 +229,7 @@ Status CloudEnvImpl::NewRandomAccessFile( if (cloud_env_options.keep_local_sst_files || !sstfile) { if (!st.ok()) { // copy the file to the local storage if keep_local_sst_files is true - if (HasDestBucket()) { - st = cloud_env_options.storage_provider->GetCloudObject( - GetDestBucketName(), destname(fname), fname); - } - if (!st.ok() && HasSrcBucket() && !SrcMatchesDest()) { - st = cloud_env_options.storage_provider->GetCloudObject( - GetSrcBucketName(), srcname(fname), fname); - } + st = 
GetCloudObject(fname); if (st.ok()) { // we successfully copied the file, try opening it locally now st = base_env_->NewRandomAccessFile(fname, result, options); @@ -169,12 +246,7 @@ Status CloudEnvImpl::NewRandomAccessFile( } stax = Status::NotFound(); if (HasDestBucket()) { - stax = cloud_env_options.storage_provider->GetCloudObjectSize( - GetDestBucketName(), destname(fname), &remote_size); - } - if (stax.IsNotFound() && HasSrcBucket()) { - stax = cloud_env_options.storage_provider->GetCloudObjectSize( - GetSrcBucketName(), srcname(fname), &remote_size); + stax = GetCloudObjectSize(fname, &remote_size); } if (stax.IsNotFound() && !HasDestBucket()) { // It is legal for file to not be present in S3 if destination bucket @@ -193,14 +265,7 @@ Status CloudEnvImpl::NewRandomAccessFile( // true, we will never use CloudReadableFile to read; we copy the file // locally and read using base_env. std::unique_ptr file; - if (!st.ok() && HasDestBucket()) { - st = cloud_env_options.storage_provider->NewCloudReadableFile( - GetDestBucketName(), destname(fname), &file, options); - } - if (!st.ok() && HasSrcBucket()) { - st = cloud_env_options.storage_provider->NewCloudReadableFile( - GetSrcBucketName(), srcname(fname), &file, options); - } + st = NewCloudReadableFile(fname, &file, options); if (st.ok()) { result->reset(file.release()); } @@ -294,13 +359,8 @@ Status CloudEnvImpl::FileExists(const std::string& logical_fname) { if (sstfile || manifest || identity) { // We read first from local storage and then from cloud storage. 
st = base_env_->FileExists(fname); - if (st.IsNotFound() && HasDestBucket()) { - st = cloud_env_options.storage_provider->ExistsCloudObject( - GetDestBucketName(), destname(fname)); - } - if (!st.ok() && HasSrcBucket()) { - st = cloud_env_options.storage_provider->ExistsCloudObject( - GetSrcBucketName(), srcname(fname)); + if (st.IsNotFound()) { + st = ExistsCloudObject(fname); } } else if (logfile && !cloud_env_options.keep_local_log_files) { // read from Kinesis @@ -319,26 +379,11 @@ Status CloudEnvImpl::GetChildren(const std::string& path, Name(), path.c_str()); result->clear(); - // Fetch the list of children from both buckets in S3 Status st; - if (HasSrcBucket() && !cloud_env_options.skip_cloud_files_in_getchildren) { - st = cloud_env_options.storage_provider->ListCloudObjects( - GetSrcBucketName(), GetSrcObjectPath(), result); + if (!cloud_env_options.skip_cloud_files_in_getchildren) { + // Fetch the list of children from the cloud + st = ListCloudObjects(path, result); if (!st.ok()) { - Log(InfoLogLevel::ERROR_LEVEL, info_log_, - "[%s] GetChildren src bucket %s %s error from S3 %s", Name(), - GetSrcBucketName().c_str(), path.c_str(), st.ToString().c_str()); - return st; - } - } - if (HasDestBucket() && !SrcMatchesDest() && - !cloud_env_options.skip_cloud_files_in_getchildren) { - st = cloud_env_options.storage_provider->ListCloudObjects( - GetDestBucketName(), GetDestObjectPath(), result); - if (!st.ok()) { - Log(InfoLogLevel::ERROR_LEVEL, info_log_, - "[%s] GetChildren dest bucket %s %s error from S3 %s", Name(), - GetDestBucketName().c_str(), path.c_str(), st.ToString().c_str()); return st; } } @@ -399,16 +444,7 @@ Status CloudEnvImpl::GetFileSize(const std::string& logical_fname, if (base_env_->FileExists(fname).ok()) { st = base_env_->GetFileSize(fname, size); } else { - st = Status::NotFound(); - // Get file length from CloudStorage - if (HasDestBucket()) { - st = cloud_env_options.storage_provider->GetCloudObjectSize( - GetDestBucketName(), 
destname(fname), size); - } - if (st.IsNotFound() && HasSrcBucket()) { - st = cloud_env_options.storage_provider->GetCloudObjectSize( - GetSrcBucketName(), srcname(fname), size); - } + st = GetCloudObjectSize(fname, size); } } else if (logfile && !cloud_env_options.keep_local_log_files) { st = cloud_env_options.cloud_log_controller->GetFileSize(fname, size); @@ -435,15 +471,7 @@ Status CloudEnvImpl::GetFileModificationTime(const std::string& logical_fname, if (base_env_->FileExists(fname).ok()) { st = base_env_->GetFileModificationTime(fname, time); } else { - st = Status::NotFound(); - if (HasDestBucket()) { - st = cloud_env_options.storage_provider->GetCloudObjectModificationTime( - GetDestBucketName(), destname(fname), time); - } - if (st.IsNotFound() && HasSrcBucket()) { - st = cloud_env_options.storage_provider->GetCloudObjectModificationTime( - GetSrcBucketName(), srcname(fname), time); - } + st = GetCloudObjectModificationTime(fname, time); } } else if (logfile && !cloud_env_options.keep_local_log_files) { st = cloud_env_options.cloud_log_controller->GetFileModificationTime(fname, diff --git a/cloud/cloud_env_impl.h b/cloud/cloud_env_impl.h index 00ad68a8a3d..4184909a005 100644 --- a/cloud/cloud_env_impl.h +++ b/cloud/cloud_env_impl.h @@ -12,6 +12,7 @@ #include "rocksdb/status.h" namespace rocksdb { +class CloudStorageReadableFile; // // The Cloud environment @@ -218,6 +219,29 @@ class CloudEnvImpl : public CloudEnv { } protected: + // Checks to see if the input fname exists in the dest or src bucket + Status ExistsCloudObject(const std::string& fname); + + // Gets the cloud object fname from the dest or src bucket + Status GetCloudObject(const std::string& fname); + + // Gets the size of the named cloud object from the dest or src bucket + Status GetCloudObjectSize(const std::string& fname, uint64_t* remote_size); + + // Gets the modification time of the named cloud object from the dest or src + // bucket + Status GetCloudObjectModificationTime(const 
std::string& fname, + uint64_t* time); + + // Returns the list of cloud objects from the src and dest buckets. + Status ListCloudObjects(const std::string& path, + std::vector* result); + + // Returns a CloudStorageReadableFile from the dest or src bucket + Status NewCloudReadableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options); + // Copy IDENTITY file to cloud storage. Update dbid registry. Status SaveIdentityToCloud(const std::string& localfile, const std::string& idfile);