From 002a4eb04fdf9e659e2171a32ec6ae8f674f77d9 Mon Sep 17 00:00:00 2001 From: mrambacher Date: Tue, 5 May 2020 15:42:37 -0400 Subject: [PATCH] Clean up use of StorageProvider There were a lot of places where the code would make two calls into the StorageProvider -- one for the dest bucket and one for source. Some -- but not all -- of the calls were guarded by a "SrcMatchesDest" check to prevent an extra call. This checkin moves most of the calls into a single method that will check the two buckets and perform the proper checks. Also, makes sure the storage provider and log controller are deleted prior to the cloud env completely being deleted. This can help guard against some cases where the provider attempts to access some cloud env resources during detruction. --- cloud/cloud_env.cc | 5 +- cloud/cloud_env_impl.cc | 192 +++++++++++++++++++++++----------------- cloud/cloud_env_impl.h | 24 +++++ 3 files changed, 138 insertions(+), 83 deletions(-) diff --git a/cloud/cloud_env.cc b/cloud/cloud_env.cc index 4bf5c160996..3d6be9c710e 100644 --- a/cloud/cloud_env.cc +++ b/cloud/cloud_env.cc @@ -89,7 +89,10 @@ CloudEnv::CloudEnv(const CloudEnvOptions& options, Env* base, const std::shared_ptr& logger) : cloud_env_options(options), base_env_(base), info_log_(logger) {} -CloudEnv::~CloudEnv() {} +CloudEnv::~CloudEnv() { + cloud_env_options.cloud_log_controller.reset(); + cloud_env_options.storage_provider.reset(); +} Status CloudEnv::NewAwsEnv( Env* base_env, const std::string& src_cloud_bucket, diff --git a/cloud/cloud_env_impl.cc b/cloud/cloud_env_impl.cc index 1b4291b4e82..e5432899526 100644 --- a/cloud/cloud_env_impl.cc +++ b/cloud/cloud_env_impl.cc @@ -33,6 +33,104 @@ CloudEnvImpl::~CloudEnvImpl() { StopPurger(); } +Status CloudEnvImpl::ExistsCloudObject(const std::string& fname) { + Status st = Status::NotFound(); + if (HasDestBucket()) { + st = cloud_env_options.storage_provider->ExistsCloudObject( + GetDestBucketName(), destname(fname)); + } + if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) { + st = cloud_env_options.storage_provider->ExistsCloudObject( + GetSrcBucketName(), srcname(fname)); + } + return st; +} + +Status CloudEnvImpl::GetCloudObject(const std::string& fname) { + Status st = Status::NotFound(); + if (HasDestBucket()) { + st = cloud_env_options.storage_provider->GetCloudObject( + GetDestBucketName(), destname(fname), fname); + } + if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) { + st = cloud_env_options.storage_provider->GetCloudObject( + GetSrcBucketName(), srcname(fname), fname); + } + return st; +} + +Status CloudEnvImpl::GetCloudObjectSize(const std::string& fname, + uint64_t* remote_size) { + Status st = Status::NotFound(); + if (HasDestBucket()) { + st = cloud_env_options.storage_provider->GetCloudObjectSize( + GetDestBucketName(), destname(fname), remote_size); + } + if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) { + st = cloud_env_options.storage_provider->GetCloudObjectSize( + GetSrcBucketName(), srcname(fname), remote_size); + } + return st; +} + +Status CloudEnvImpl::GetCloudObjectModificationTime(const std::string& fname, + uint64_t* time) { + Status st = Status::NotFound(); + if (HasDestBucket()) { + st = cloud_env_options.storage_provider->GetCloudObjectModificationTime( + GetDestBucketName(), destname(fname), time); + } + if (st.IsNotFound() && HasSrcBucket() && !SrcMatchesDest()) { + st = cloud_env_options.storage_provider->GetCloudObjectModificationTime( + GetSrcBucketName(), srcname(fname), time); + } + return st; +} + +Status CloudEnvImpl::ListCloudObjects(const std::string& path, + std::vector* result) { + Status st; + // Fetch the list of children from both cloud buckets + if (HasSrcBucket()) { + st = cloud_env_options.storage_provider->ListCloudObjects( + GetSrcBucketName(), GetSrcObjectPath(), result); + if (!st.ok()) { + Log(InfoLogLevel::ERROR_LEVEL, info_log_, + "[%s] GetChildren src bucket %s %s error from S3 %s", Name(), + GetSrcBucketName().c_str(), path.c_str(), st.ToString().c_str()); + return st; + } + } + if (HasDestBucket() && !SrcMatchesDest()) { + st = cloud_env_options.storage_provider->ListCloudObjects( + GetDestBucketName(), GetDestObjectPath(), result); + if (!st.ok()) { + Log(InfoLogLevel::ERROR_LEVEL, info_log_, + "[%s] GetChildren dest bucket %s %s error from S3 %s", Name(), + GetDestBucketName().c_str(), path.c_str(), st.ToString().c_str()); + } + } + return st; +} + +Status CloudEnvImpl::NewCloudReadableFile( + const std::string& fname, std::unique_ptr* result, + const EnvOptions& options) { + Status st = Status::NotFound(); + if (HasDestBucket()) { // read from destination + st = cloud_env_options.storage_provider->NewCloudReadableFile( + GetDestBucketName(), destname(fname), result, options); + if (st.ok()) { + return st; + } + } + if (HasSrcBucket() && !SrcMatchesDest()) { // read from src bucket + st = cloud_env_options.storage_provider->NewCloudReadableFile( + GetSrcBucketName(), srcname(fname), result, options); + } + return st; +} + // open a file for sequential reading Status CloudEnvImpl::NewSequentialFile(const std::string& logical_fname, std::unique_ptr* result, @@ -58,28 +156,14 @@ Status CloudEnvImpl::NewSequentialFile(const std::string& logical_fname, if (!st.ok()) { if (cloud_env_options.keep_local_sst_files || !sstfile) { // copy the file to the local storage if keep_local_sst_files is true - if (HasDestBucket()) { - st = cloud_env_options.storage_provider->GetCloudObject( - GetDestBucketName(), destname(fname), fname); - } - if (!st.ok() && HasSrcBucket() && !SrcMatchesDest()) { - st = cloud_env_options.storage_provider->GetCloudObject( - GetSrcBucketName(), srcname(fname), fname); - } + st = GetCloudObject(fname); if (st.ok()) { // we successfully copied the file, try opening it locally now st = base_env_->NewSequentialFile(fname, result, options); } } else { std::unique_ptr file; - if (!st.ok() && HasDestBucket()) { // read from destination - st = cloud_env_options.storage_provider->NewCloudReadableFile( - GetDestBucketName(), destname(fname), &file, options); - } - if (!st.ok() && HasSrcBucket()) { // read from src bucket - st = cloud_env_options.storage_provider->NewCloudReadableFile( - GetSrcBucketName(), srcname(fname), &file, options); - } + st = NewCloudReadableFile(fname, &file, options); if (st.ok()) { result->reset(file.release()); } @@ -145,14 +229,7 @@ Status CloudEnvImpl::NewRandomAccessFile( if (cloud_env_options.keep_local_sst_files || !sstfile) { if (!st.ok()) { // copy the file to the local storage if keep_local_sst_files is true - if (HasDestBucket()) { - st = cloud_env_options.storage_provider->GetCloudObject( - GetDestBucketName(), destname(fname), fname); - } - if (!st.ok() && HasSrcBucket() && !SrcMatchesDest()) { - st = cloud_env_options.storage_provider->GetCloudObject( - GetSrcBucketName(), srcname(fname), fname); - } + st = GetCloudObject(fname); if (st.ok()) { // we successfully copied the file, try opening it locally now st = base_env_->NewRandomAccessFile(fname, result, options); @@ -169,12 +246,7 @@ Status CloudEnvImpl::NewRandomAccessFile( } stax = Status::NotFound(); if (HasDestBucket()) { - stax = cloud_env_options.storage_provider->GetCloudObjectSize( - GetDestBucketName(), destname(fname), &remote_size); - } - if (stax.IsNotFound() && HasSrcBucket()) { - stax = cloud_env_options.storage_provider->GetCloudObjectSize( - GetSrcBucketName(), srcname(fname), &remote_size); + stax = GetCloudObjectSize(fname, &remote_size); } if (stax.IsNotFound() && !HasDestBucket()) { // It is legal for file to not be present in S3 if destination bucket @@ -193,14 +265,7 @@ Status CloudEnvImpl::NewRandomAccessFile( // true, we will never use CloudReadableFile to read; we copy the file // locally and read using base_env. std::unique_ptr file; - if (!st.ok() && HasDestBucket()) { - st = cloud_env_options.storage_provider->NewCloudReadableFile( - GetDestBucketName(), destname(fname), &file, options); - } - if (!st.ok() && HasSrcBucket()) { - st = cloud_env_options.storage_provider->NewCloudReadableFile( - GetSrcBucketName(), srcname(fname), &file, options); - } + st = NewCloudReadableFile(fname, &file, options); if (st.ok()) { result->reset(file.release()); } @@ -294,13 +359,8 @@ Status CloudEnvImpl::FileExists(const std::string& logical_fname) { if (sstfile || manifest || identity) { // We read first from local storage and then from cloud storage. st = base_env_->FileExists(fname); - if (st.IsNotFound() && HasDestBucket()) { - st = cloud_env_options.storage_provider->ExistsCloudObject( - GetDestBucketName(), destname(fname)); - } - if (!st.ok() && HasSrcBucket()) { - st = cloud_env_options.storage_provider->ExistsCloudObject( - GetSrcBucketName(), srcname(fname)); + if (st.IsNotFound()) { + st = ExistsCloudObject(fname); } } else if (logfile && !cloud_env_options.keep_local_log_files) { // read from Kinesis @@ -319,26 +379,11 @@ Status CloudEnvImpl::GetChildren(const std::string& path, Name(), path.c_str()); result->clear(); - // Fetch the list of children from both buckets in S3 Status st; - if (HasSrcBucket() && !cloud_env_options.skip_cloud_files_in_getchildren) { - st = cloud_env_options.storage_provider->ListCloudObjects( - GetSrcBucketName(), GetSrcObjectPath(), result); + if (!cloud_env_options.skip_cloud_files_in_getchildren) { + // Fetch the list of children from the cloud + st = ListCloudObjects(path, result); if (!st.ok()) { - Log(InfoLogLevel::ERROR_LEVEL, info_log_, - "[%s] GetChildren src bucket %s %s error from S3 %s", Name(), - GetSrcBucketName().c_str(), path.c_str(), st.ToString().c_str()); - return st; - } - } - if (HasDestBucket() && !SrcMatchesDest() && - !cloud_env_options.skip_cloud_files_in_getchildren) { - st = cloud_env_options.storage_provider->ListCloudObjects( - GetDestBucketName(), GetDestObjectPath(), result); - if (!st.ok()) { - Log(InfoLogLevel::ERROR_LEVEL, info_log_, - "[%s] GetChildren dest bucket %s %s error from S3 %s", Name(), - GetDestBucketName().c_str(), path.c_str(), st.ToString().c_str()); return st; } } @@ -399,16 +444,7 @@ Status CloudEnvImpl::GetFileSize(const std::string& logical_fname, if (base_env_->FileExists(fname).ok()) { st = base_env_->GetFileSize(fname, size); } else { - st = Status::NotFound(); - // Get file length from CloudStorage - if (HasDestBucket()) { - st = cloud_env_options.storage_provider->GetCloudObjectSize( - GetDestBucketName(), destname(fname), size); - } - if (st.IsNotFound() && HasSrcBucket()) { - st = cloud_env_options.storage_provider->GetCloudObjectSize( - GetSrcBucketName(), srcname(fname), size); - } + st = GetCloudObjectSize(fname, size); } } else if (logfile && !cloud_env_options.keep_local_log_files) { st = cloud_env_options.cloud_log_controller->GetFileSize(fname, size); @@ -435,15 +471,7 @@ Status CloudEnvImpl::GetFileModificationTime(const std::string& logical_fname, if (base_env_->FileExists(fname).ok()) { st = base_env_->GetFileModificationTime(fname, time); } else { - st = Status::NotFound(); - if (HasDestBucket()) { - st = cloud_env_options.storage_provider->GetCloudObjectModificationTime( - GetDestBucketName(), destname(fname), time); - } - if (st.IsNotFound() && HasSrcBucket()) { - st = cloud_env_options.storage_provider->GetCloudObjectModificationTime( - GetSrcBucketName(), srcname(fname), time); - } + st = GetCloudObjectModificationTime(fname, time); } } else if (logfile && !cloud_env_options.keep_local_log_files) { st = cloud_env_options.cloud_log_controller->GetFileModificationTime(fname, diff --git a/cloud/cloud_env_impl.h b/cloud/cloud_env_impl.h index 00ad68a8a3d..4184909a005 100644 --- a/cloud/cloud_env_impl.h +++ b/cloud/cloud_env_impl.h @@ -12,6 +12,7 @@ #include "rocksdb/status.h" namespace rocksdb { +class CloudStorageReadableFile; // // The Cloud environment @@ -218,6 +219,29 @@ class CloudEnvImpl : public CloudEnv { } protected: + // Checks to see if the input fname exists in the dest or src bucket + Status ExistsCloudObject(const std::string& fname); + + // Gets the cloud object fname from the dest or src bucket + Status GetCloudObject(const std::string& fname); + + // Gets the size of the named cloud object from the dest or src bucket + Status GetCloudObjectSize(const std::string& fname, uint64_t* remote_size); + + // Gets the modification time of the named cloud object from the dest or src + // bucket + Status GetCloudObjectModificationTime(const std::string& fname, + uint64_t* time); + + // Returns the list of cloud objects from the src and dest buckets. + Status ListCloudObjects(const std::string& path, + std::vector* result); + + // Returns a CloudStorageReadableFile from the dest or src bucket + Status NewCloudReadableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options); + // Copy IDENTITY file to cloud storage. Update dbid registry. Status SaveIdentityToCloud(const std::string& localfile, const std::string& idfile);