Skip to content

Commit

Permalink
add storage retry policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Jinming-Hu committed May 26, 2024
1 parent edfca94 commit 20e7243
Show file tree
Hide file tree
Showing 20 changed files with 683 additions and 629 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,14 @@ namespace Azure { namespace Storage { namespace Blobs {

class BlobBatchAccessHelper;

std::shared_ptr<Azure::Core::Http::_internal::HttpPipeline> ConstructBatchRequestPolicy(
const std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>>&
servicePerRetryPolicies,
const std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>>&
servicePerOperationPolicies,
const BlobClientOptions& options);
std::shared_ptr<Azure::Core::Http::_internal::HttpPipeline> ConstructBatchRequestPipeline(
std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>&& tokenAuthPolicy,
std::shared_ptr<StorageSharedKeyCredential> sharedKeyCredential,
BlobClientOptions options);

std::shared_ptr<Azure::Core::Http::_internal::HttpPipeline> ConstructBatchSubrequestPolicy(
std::shared_ptr<Azure::Core::Http::_internal::HttpPipeline> ConstructBatchSubrequestPipeline(
std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>&& tokenAuthPolicy,
std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>&& sharedKeyAuthPolicy,
std::shared_ptr<StorageSharedKeyCredential> sharedKeyCredential,
const BlobClientOptions& options);
} // namespace _detail

Expand Down
36 changes: 21 additions & 15 deletions sdk/storage/azure-storage-blobs/src/blob_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <azure/storage/common/crypt.hpp>
#include <azure/storage/common/internal/constants.hpp>
#include <azure/storage/common/internal/shared_key_policy.hpp>
#include <azure/storage/common/internal/storage_service_version_policy.hpp>

#include <algorithm>
#include <cstring>
Expand Down Expand Up @@ -494,13 +495,17 @@ namespace Azure { namespace Storage { namespace Blobs {

BatchSubrequest::~BatchSubrequest() {}

std::shared_ptr<Azure::Core::Http::_internal::HttpPipeline> ConstructBatchRequestPolicy(
const std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>>&
servicePerRetryPolicies,
const std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>>&
servicePerOperationPolicies,
const BlobClientOptions& options)
std::shared_ptr<Azure::Core::Http::_internal::HttpPipeline> ConstructBatchRequestPipeline(
std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>&& tokenAuthPolicy,
std::shared_ptr<StorageSharedKeyCredential> sharedKeyCredential,
BlobClientOptions options)
{
if (sharedKeyCredential)
{
auto sharedKeyAuthPolicy
= std::make_unique<_internal::SharedKeyPolicy>(sharedKeyCredential);
options.PerRetryPolicies.emplace_back(std::move(sharedKeyAuthPolicy));
}
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perRetryPolicies;
perRetryPolicies.push_back(std::make_unique<ConstructBatchRequestBodyPolicy>(
[](Core::Http::Request& request, const Core::Context& context) {
Expand All @@ -509,15 +514,14 @@ namespace Azure { namespace Storage { namespace Blobs {
[](std::unique_ptr<Core::Http::RawResponse>& rawResponse, const Core::Context& context) {
ParseSubresponses(rawResponse, context);
}));
for (auto& policy : servicePerRetryPolicies)
perRetryPolicies.emplace_back(std::make_unique<_internal::StoragePerRetryPolicy>());
if (tokenAuthPolicy)
{
perRetryPolicies.push_back(policy->Clone());
perRetryPolicies.emplace_back(std::move(tokenAuthPolicy));
}
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perOperationPolicies;
for (auto& policy : servicePerOperationPolicies)
{
perOperationPolicies.push_back(policy->Clone());
}
perOperationPolicies.emplace_back(
std::make_unique<_internal::StorageServiceVersionPolicy>(options.ApiVersion));
return std::make_shared<Azure::Core::Http::_internal::HttpPipeline>(
options,
_internal::BlobServicePackageName,
Expand All @@ -526,9 +530,9 @@ namespace Azure { namespace Storage { namespace Blobs {
std::move(perOperationPolicies));
}

std::shared_ptr<Azure::Core::Http::_internal::HttpPipeline> ConstructBatchSubrequestPolicy(
std::shared_ptr<Azure::Core::Http::_internal::HttpPipeline> ConstructBatchSubrequestPipeline(
std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>&& tokenAuthPolicy,
std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>&& sharedKeyAuthPolicy,
std::shared_ptr<StorageSharedKeyCredential> sharedKeyCredential,
const BlobClientOptions& options)
{
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> policies;
Expand All @@ -551,8 +555,10 @@ namespace Azure { namespace Storage { namespace Blobs {
policies.emplace_back(policy->Clone());
}
policies.emplace_back(std::make_unique<RemoveXMsVersionPolicy>());
if (sharedKeyAuthPolicy)
if (sharedKeyCredential)
{
auto sharedKeyAuthPolicy
= std::make_unique<_internal::SharedKeyPolicy>(sharedKeyCredential);
policies.emplace_back(std::move(sharedKeyAuthPolicy));
}
policies.push_back(std::make_unique<NoopTransportPolicy>());
Expand Down
92 changes: 41 additions & 51 deletions sdk/storage/azure-storage-blobs/src/blob_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
#include <azure/storage/common/internal/constants.hpp>
#include <azure/storage/common/internal/file_io.hpp>
#include <azure/storage/common/internal/reliable_stream.hpp>
#include <azure/storage/common/internal/shared_key_policy.hpp>
#include <azure/storage/common/internal/storage_bearer_token_auth.hpp>
#include <azure/storage/common/internal/storage_per_retry_policy.hpp>
#include <azure/storage/common/internal/storage_service_version_policy.hpp>
#include <azure/storage/common/internal/storage_pipeline.hpp>
#include <azure/storage/common/internal/storage_switch_to_secondary_policy.hpp>
#include <azure/storage/common/storage_common.hpp>
#include <azure/storage/common/storage_exception.hpp>
Expand Down Expand Up @@ -58,23 +55,20 @@ namespace Azure { namespace Storage { namespace Blobs {
const BlobClientOptions& options)
: BlobClient(blobUrl, options)
{
BlobClientOptions newOptions = options;
newOptions.PerRetryPolicies.emplace_back(
std::make_unique<_internal::SharedKeyPolicy>(credential));

std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perRetryPolicies;
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perOperationPolicies;
perRetryPolicies.emplace_back(std::make_unique<_internal::StorageSwitchToSecondaryPolicy>(
m_blobUrl.GetHost(), newOptions.SecondaryHostForRetryReads));
perRetryPolicies.emplace_back(std::make_unique<_internal::StoragePerRetryPolicy>());
perOperationPolicies.emplace_back(
std::make_unique<_internal::StorageServiceVersionPolicy>(newOptions.ApiVersion));
m_pipeline = std::make_shared<Azure::Core::Http::_internal::HttpPipeline>(
newOptions,
_internal::StorageHttpPipelineOptions serviceOptions;
serviceOptions.SharedKeyCredential = credential;
if (!options.SecondaryHostForRetryReads.empty())
{
serviceOptions.PrimaryHost = m_blobUrl.GetHost();
serviceOptions.SecondaryHostForRetryReads = options.SecondaryHostForRetryReads;
}

m_pipeline = _internal::BuildStorageHttpPipeline(
options.ApiVersion,
_internal::BlobServicePackageName,
_detail::PackageVersion::ToString(),
std::move(perRetryPolicies),
std::move(perOperationPolicies));
serviceOptions,
options);
}

BlobClient::BlobClient(
Expand All @@ -83,48 +77,44 @@ namespace Azure { namespace Storage { namespace Blobs {
const BlobClientOptions& options)
: BlobClient(blobUrl, options)
{
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perRetryPolicies;
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perOperationPolicies;
perRetryPolicies.emplace_back(std::make_unique<_internal::StorageSwitchToSecondaryPolicy>(
m_blobUrl.GetHost(), options.SecondaryHostForRetryReads));
perRetryPolicies.emplace_back(std::make_unique<_internal::StoragePerRetryPolicy>());
{
Azure::Core::Credentials::TokenRequestContext tokenContext;
tokenContext.Scopes.emplace_back(
options.Audience.HasValue()
? _internal::GetDefaultScopeForAudience(options.Audience.Value().ToString())
: _internal::StorageScope);
perRetryPolicies.emplace_back(
std::make_unique<_internal::StorageBearerTokenAuthenticationPolicy>(
credential, tokenContext, options.EnableTenantDiscovery));
}
perOperationPolicies.emplace_back(
std::make_unique<_internal::StorageServiceVersionPolicy>(options.ApiVersion));
m_pipeline = std::make_shared<Azure::Core::Http::_internal::HttpPipeline>(
options,
_internal::StorageHttpPipelineOptions serviceOptions;
if (!options.SecondaryHostForRetryReads.empty())
{
serviceOptions.PrimaryHost = m_blobUrl.GetHost();
serviceOptions.SecondaryHostForRetryReads = options.SecondaryHostForRetryReads;
}
serviceOptions.TokenCredential = credential;
serviceOptions.EnableTenantDiscovery = options.EnableTenantDiscovery;
if (options.Audience.HasValue())
{
serviceOptions.TokenAudience = options.Audience.Value().ToString();
}

m_pipeline = _internal::BuildStorageHttpPipeline(
options.ApiVersion,
_internal::BlobServicePackageName,
_detail::PackageVersion::ToString(),
std::move(perRetryPolicies),
std::move(perOperationPolicies));
serviceOptions,
options);
}

BlobClient::BlobClient(const std::string& blobUrl, const BlobClientOptions& options)
: m_blobUrl(blobUrl), m_customerProvidedKey(options.CustomerProvidedKey),
m_encryptionScope(options.EncryptionScope)
{
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perRetryPolicies;
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perOperationPolicies;
perRetryPolicies.emplace_back(std::make_unique<_internal::StorageSwitchToSecondaryPolicy>(
m_blobUrl.GetHost(), options.SecondaryHostForRetryReads));
perRetryPolicies.emplace_back(std::make_unique<_internal::StoragePerRetryPolicy>());
perOperationPolicies.emplace_back(
std::make_unique<_internal::StorageServiceVersionPolicy>(options.ApiVersion));
m_pipeline = std::make_shared<Azure::Core::Http::_internal::HttpPipeline>(
options,
_internal::StorageHttpPipelineOptions serviceOptions;
if (!options.SecondaryHostForRetryReads.empty())
{
serviceOptions.PrimaryHost = m_blobUrl.GetHost();
serviceOptions.SecondaryHostForRetryReads = options.SecondaryHostForRetryReads;
}

m_pipeline = _internal::BuildStorageHttpPipeline(
options.ApiVersion,
_internal::BlobServicePackageName,
_detail::PackageVersion::ToString(),
std::move(perRetryPolicies),
std::move(perOperationPolicies));
serviceOptions,
options);
}

BlockBlobClient BlobClient::AsBlockBlobClient() const { return BlockBlobClient(*this); }
Expand Down
120 changes: 58 additions & 62 deletions sdk/storage/azure-storage-blobs/src/blob_container_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
#include <azure/core/http/policies/policy.hpp>
#include <azure/storage/common/crypt.hpp>
#include <azure/storage/common/internal/constants.hpp>
#include <azure/storage/common/internal/shared_key_policy.hpp>
#include <azure/storage/common/internal/storage_bearer_token_auth.hpp>
#include <azure/storage/common/internal/storage_per_retry_policy.hpp>
#include <azure/storage/common/internal/storage_pipeline.hpp>
#include <azure/storage/common/internal/storage_service_version_policy.hpp>
#include <azure/storage/common/internal/storage_switch_to_secondary_policy.hpp>
#include <azure/storage/common/storage_common.hpp>
Expand Down Expand Up @@ -129,30 +129,24 @@ namespace Azure { namespace Storage { namespace Blobs {
const BlobClientOptions& options)
: BlobContainerClient(blobContainerUrl, options)
{
BlobClientOptions newOptions = options;
auto sharedKeyAuthPolicy = std::make_unique<_internal::SharedKeyPolicy>(credential);
newOptions.PerRetryPolicies.emplace_back(sharedKeyAuthPolicy->Clone());

std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perRetryPolicies;
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perOperationPolicies;
perRetryPolicies.emplace_back(std::make_unique<_internal::StorageSwitchToSecondaryPolicy>(
m_blobContainerUrl.GetHost(), newOptions.SecondaryHostForRetryReads));
perRetryPolicies.emplace_back(std::make_unique<_internal::StoragePerRetryPolicy>());
perOperationPolicies.emplace_back(
std::make_unique<_internal::StorageServiceVersionPolicy>(newOptions.ApiVersion));

m_batchRequestPipeline
= _detail::ConstructBatchRequestPolicy(perRetryPolicies, perOperationPolicies, newOptions);

m_batchSubrequestPipeline
= _detail::ConstructBatchSubrequestPolicy(nullptr, std::move(sharedKeyAuthPolicy), options);
_internal::StorageHttpPipelineOptions serviceOptions;
serviceOptions.SharedKeyCredential = credential;
if (!options.SecondaryHostForRetryReads.empty())
{
serviceOptions.PrimaryHost = m_blobContainerUrl.GetHost();
serviceOptions.SecondaryHostForRetryReads = options.SecondaryHostForRetryReads;
}

m_pipeline = std::make_shared<Azure::Core::Http::_internal::HttpPipeline>(
newOptions,
m_pipeline = _internal::BuildStorageHttpPipeline(
options.ApiVersion,
_internal::BlobServicePackageName,
_detail::PackageVersion::ToString(),
std::move(perRetryPolicies),
std::move(perOperationPolicies));
serviceOptions,
options);

m_batchRequestPipeline = _detail::ConstructBatchRequestPipeline(nullptr, credential, options);
m_batchSubrequestPipeline
= _detail::ConstructBatchSubrequestPipeline(nullptr, credential, options);
}

BlobContainerClient::BlobContainerClient(
Expand All @@ -161,37 +155,41 @@ namespace Azure { namespace Storage { namespace Blobs {
const BlobClientOptions& options)
: BlobContainerClient(blobContainerUrl, options)
{
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perRetryPolicies;
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perOperationPolicies;
perRetryPolicies.emplace_back(std::make_unique<_internal::StorageSwitchToSecondaryPolicy>(
m_blobContainerUrl.GetHost(), options.SecondaryHostForRetryReads));
perRetryPolicies.emplace_back(std::make_unique<_internal::StoragePerRetryPolicy>());
std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy> tokenAuthPolicy;
_internal::StorageHttpPipelineOptions serviceOptions;
if (!options.SecondaryHostForRetryReads.empty())
{
serviceOptions.PrimaryHost = m_blobContainerUrl.GetHost();
serviceOptions.SecondaryHostForRetryReads = options.SecondaryHostForRetryReads;
}
serviceOptions.TokenCredential = credential;
serviceOptions.EnableTenantDiscovery = options.EnableTenantDiscovery;
if (options.Audience.HasValue())
{
serviceOptions.TokenAudience = options.Audience.Value().ToString();
}

m_pipeline = _internal::BuildStorageHttpPipeline(
options.ApiVersion,
_internal::BlobServicePackageName,
_detail::PackageVersion::ToString(),
serviceOptions,
options);

{
Azure::Core::Credentials::TokenRequestContext tokenContext;
tokenContext.Scopes.emplace_back(
options.Audience.HasValue()
? _internal::GetDefaultScopeForAudience(options.Audience.Value().ToString())
: _internal::StorageScope);
tokenAuthPolicy = std::make_unique<_internal::StorageBearerTokenAuthenticationPolicy>(
credential, tokenContext, options.EnableTenantDiscovery);
perRetryPolicies.emplace_back(tokenAuthPolicy->Clone());
std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy> tokenPolicy
= std::make_unique<_internal::StorageBearerTokenAuthenticationPolicy>(
credential, tokenContext, options.EnableTenantDiscovery);

m_batchRequestPipeline
= _detail::ConstructBatchRequestPipeline(tokenPolicy->Clone(), nullptr, options);
m_batchSubrequestPipeline
= _detail::ConstructBatchSubrequestPipeline(std::move(tokenPolicy), nullptr, options);
}
perOperationPolicies.emplace_back(
std::make_unique<_internal::StorageServiceVersionPolicy>(options.ApiVersion));

m_batchRequestPipeline
= _detail::ConstructBatchRequestPolicy(perRetryPolicies, perOperationPolicies, options);

m_batchSubrequestPipeline
= _detail::ConstructBatchSubrequestPolicy(std::move(tokenAuthPolicy), nullptr, options);

m_pipeline = std::make_shared<Azure::Core::Http::_internal::HttpPipeline>(
options,
_internal::BlobServicePackageName,
_detail::PackageVersion::ToString(),
std::move(perRetryPolicies),
std::move(perOperationPolicies));
}

BlobContainerClient::BlobContainerClient(
Expand All @@ -200,25 +198,23 @@ namespace Azure { namespace Storage { namespace Blobs {
: m_blobContainerUrl(blobContainerUrl), m_customerProvidedKey(options.CustomerProvidedKey),
m_encryptionScope(options.EncryptionScope)
{
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perRetryPolicies;
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perOperationPolicies;
perRetryPolicies.emplace_back(std::make_unique<_internal::StorageSwitchToSecondaryPolicy>(
m_blobContainerUrl.GetHost(), options.SecondaryHostForRetryReads));
perRetryPolicies.emplace_back(std::make_unique<_internal::StoragePerRetryPolicy>());
perOperationPolicies.emplace_back(
std::make_unique<_internal::StorageServiceVersionPolicy>(options.ApiVersion));

m_batchRequestPipeline
= _detail::ConstructBatchRequestPolicy(perRetryPolicies, perOperationPolicies, options);

m_batchSubrequestPipeline = _detail::ConstructBatchSubrequestPolicy(nullptr, nullptr, options);
_internal::StorageHttpPipelineOptions serviceOptions;
if (!options.SecondaryHostForRetryReads.empty())
{
serviceOptions.PrimaryHost = m_blobContainerUrl.GetHost();
serviceOptions.SecondaryHostForRetryReads = options.SecondaryHostForRetryReads;
}

m_pipeline = std::make_shared<Azure::Core::Http::_internal::HttpPipeline>(
options,
m_pipeline = _internal::BuildStorageHttpPipeline(
options.ApiVersion,
_internal::BlobServicePackageName,
_detail::PackageVersion::ToString(),
std::move(perRetryPolicies),
std::move(perOperationPolicies));
serviceOptions,
options);

m_batchRequestPipeline = _detail::ConstructBatchRequestPipeline(nullptr, nullptr, options);
m_batchSubrequestPipeline
= _detail::ConstructBatchSubrequestPipeline(nullptr, nullptr, options);
}

BlobClient BlobContainerClient::GetBlobClient(const std::string& blobName) const
Expand Down
Loading

0 comments on commit 20e7243

Please sign in to comment.