Skip to content

Commit

Permalink
add storage retry policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Jinming-Hu committed May 25, 2024
1 parent edfca94 commit af5a61f
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 40 deletions.
76 changes: 36 additions & 40 deletions sdk/storage/azure-storage-blobs/src/blob_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
#include <azure/storage/common/internal/reliable_stream.hpp>
#include <azure/storage/common/internal/shared_key_policy.hpp>
#include <azure/storage/common/internal/storage_bearer_token_auth.hpp>
#include <azure/storage/common/internal/storage_per_retry_policy.hpp>
#include <azure/storage/common/internal/storage_service_version_policy.hpp>
#include <azure/storage/common/internal/storage_pipeline.hpp>
#include <azure/storage/common/internal/storage_switch_to_secondary_policy.hpp>
#include <azure/storage/common/storage_common.hpp>
#include <azure/storage/common/storage_exception.hpp>
Expand Down Expand Up @@ -58,23 +57,20 @@ namespace Azure { namespace Storage { namespace Blobs {
const BlobClientOptions& options)
: BlobClient(blobUrl, options)
{
BlobClientOptions newOptions = options;
newOptions.PerRetryPolicies.emplace_back(
std::make_unique<_internal::SharedKeyPolicy>(credential));

std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perRetryPolicies;
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perOperationPolicies;
perRetryPolicies.emplace_back(std::make_unique<_internal::StorageSwitchToSecondaryPolicy>(
m_blobUrl.GetHost(), newOptions.SecondaryHostForRetryReads));
perRetryPolicies.emplace_back(std::make_unique<_internal::StoragePerRetryPolicy>());
perOperationPolicies.emplace_back(
std::make_unique<_internal::StorageServiceVersionPolicy>(newOptions.ApiVersion));
m_pipeline = std::make_shared<Azure::Core::Http::_internal::HttpPipeline>(
newOptions,
_internal::StorageHttpPipelineOptions serviceOptions;
serviceOptions.SharedKeyCredential = credential;
if (!options.SecondaryHostForRetryReads.empty())
{
serviceOptions.PrimaryHost = m_blobUrl.GetHost();
serviceOptions.SecondaryHostForRetryReads = options.SecondaryHostForRetryReads;
}

m_pipeline = _internal::BuildStorageHttpPipeline(
options.ApiVersion,
_internal::BlobServicePackageName,
_detail::PackageVersion::ToString(),
std::move(perRetryPolicies),
std::move(perOperationPolicies));
serviceOptions,
options);
}

BlobClient::BlobClient(
Expand All @@ -83,48 +79,48 @@ namespace Azure { namespace Storage { namespace Blobs {
const BlobClientOptions& options)
: BlobClient(blobUrl, options)
{
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perRetryPolicies;
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perOperationPolicies;
perRetryPolicies.emplace_back(std::make_unique<_internal::StorageSwitchToSecondaryPolicy>(
m_blobUrl.GetHost(), options.SecondaryHostForRetryReads));
perRetryPolicies.emplace_back(std::make_unique<_internal::StoragePerRetryPolicy>());
_internal::StorageHttpPipelineOptions serviceOptions;
if (!options.SecondaryHostForRetryReads.empty())
{
serviceOptions.PrimaryHost = m_blobUrl.GetHost();
serviceOptions.SecondaryHostForRetryReads = options.SecondaryHostForRetryReads;
}
{
Azure::Core::Credentials::TokenRequestContext tokenContext;
tokenContext.Scopes.emplace_back(
options.Audience.HasValue()
? _internal::GetDefaultScopeForAudience(options.Audience.Value().ToString())
: _internal::StorageScope);
perRetryPolicies.emplace_back(
serviceOptions.ServicePerRetryPolicies.emplace_back(
std::make_unique<_internal::StorageBearerTokenAuthenticationPolicy>(
credential, tokenContext, options.EnableTenantDiscovery));
}
perOperationPolicies.emplace_back(
std::make_unique<_internal::StorageServiceVersionPolicy>(options.ApiVersion));
m_pipeline = std::make_shared<Azure::Core::Http::_internal::HttpPipeline>(
options,

m_pipeline = _internal::BuildStorageHttpPipeline(
options.ApiVersion,
_internal::BlobServicePackageName,
_detail::PackageVersion::ToString(),
std::move(perRetryPolicies),
std::move(perOperationPolicies));
serviceOptions,
options);
}

BlobClient::BlobClient(const std::string& blobUrl, const BlobClientOptions& options)
: m_blobUrl(blobUrl), m_customerProvidedKey(options.CustomerProvidedKey),
m_encryptionScope(options.EncryptionScope)
{
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perRetryPolicies;
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> perOperationPolicies;
perRetryPolicies.emplace_back(std::make_unique<_internal::StorageSwitchToSecondaryPolicy>(
m_blobUrl.GetHost(), options.SecondaryHostForRetryReads));
perRetryPolicies.emplace_back(std::make_unique<_internal::StoragePerRetryPolicy>());
perOperationPolicies.emplace_back(
std::make_unique<_internal::StorageServiceVersionPolicy>(options.ApiVersion));
m_pipeline = std::make_shared<Azure::Core::Http::_internal::HttpPipeline>(
options,
_internal::StorageHttpPipelineOptions serviceOptions;
if (!options.SecondaryHostForRetryReads.empty())
{
serviceOptions.PrimaryHost = m_blobUrl.GetHost();
serviceOptions.SecondaryHostForRetryReads = options.SecondaryHostForRetryReads;
}

m_pipeline = _internal::BuildStorageHttpPipeline(
options.ApiVersion,
_internal::BlobServicePackageName,
_detail::PackageVersion::ToString(),
std::move(perRetryPolicies),
std::move(perOperationPolicies));
serviceOptions,
options);
}

BlockBlobClient BlobClient::AsBlockBlobClient() const { return BlockBlobClient(*this); }
Expand Down
4 changes: 4 additions & 0 deletions sdk/storage/azure-storage-common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ set(
inc/azure/storage/common/internal/storage_bearer_token_auth.hpp
inc/azure/storage/common/internal/storage_bearer_token_authentication_policy.hpp
inc/azure/storage/common/internal/storage_per_retry_policy.hpp
inc/azure/storage/common/internal/storage_pipeline.hpp
inc/azure/storage/common/internal/storage_retry_policy.hpp
inc/azure/storage/common/internal/storage_service_version_policy.hpp
inc/azure/storage/common/internal/storage_switch_to_secondary_policy.hpp
inc/azure/storage/common/internal/xml_wrapper.hpp
Expand All @@ -76,6 +78,8 @@ set(
src/storage_credential.cpp
src/storage_exception.cpp
src/storage_per_retry_policy.cpp
src/storage_pipeline.cpp
src/storage_retry_policy.cpp
src/storage_switch_to_secondary_policy.cpp
src/xml_wrapper.cpp
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#pragma once

#include "../storage_credential.hpp"

#include <azure/core/internal/http/pipeline.hpp>

namespace Azure { namespace Storage { namespace _internal {

struct StorageHttpPipelineOptions
{
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> ServicePerCallPolicies;
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> ServicePerRetryPolicies;
std::shared_ptr<StorageSharedKeyCredential> SharedKeyCredential;
Azure::Nullable<std::string> PrimaryHost;
Azure::Nullable<std::string> SecondaryHostForRetryReads;
};

std::shared_ptr<Azure::Core::Http::_internal::HttpPipeline> BuildStorageHttpPipeline(
const std::string& apiVersion,
const std::string& telemetryPackageName,
const std::string& telemetryPackageVersion,
StorageHttpPipelineOptions& serviceOptions,
const Azure::Core::_internal::ClientOptions& clientOptions);

}}} // namespace Azure::Storage::_internal
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#pragma once

#include <azure/core/http/policies/policy.hpp>

namespace Azure { namespace Storage { namespace _internal {

class StorageRetryPolicy final : public Core::Http::Policies::_internal::RetryPolicy {
public:
explicit StorageRetryPolicy(Core::Http::Policies::RetryOptions options)
: RetryPolicy(std::move(options))
{
}
~StorageRetryPolicy() override {}

protected:
bool ShouldRetry(
const std::unique_ptr<Core::Http::RawResponse>& response,
const Core::Http::Policies::RetryOptions& retryOptions) const override;
};

}}} // namespace Azure::Storage::_internal
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#pragma once

#include "constants.hpp"

#include <azure/core/http/policies/policy.hpp>

#include <memory>
Expand Down
93 changes: 93 additions & 0 deletions sdk/storage/azure-storage-common/src/storage_pipeline.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#include "azure/storage/common/internal/storage_pipeline.hpp"

#include "azure/storage/common/internal/shared_key_policy.hpp"
#include "azure/storage/common/internal/storage_per_retry_policy.hpp"
#include "azure/storage/common/internal/storage_retry_policy.hpp"
#include "azure/storage/common/internal/storage_service_version_policy.hpp"
#include "azure/storage/common/internal/storage_switch_to_secondary_policy.hpp"

namespace Azure { namespace Storage { namespace _internal {

std::shared_ptr<Azure::Core::Http::_internal::HttpPipeline> BuildStorageHttpPipeline(
const std::string& apiVersion,
const std::string& telemetryPackageName,
const std::string& telemetryPackageVersion,
StorageHttpPipelineOptions& serviceOptions,
const Azure::Core::_internal::ClientOptions& clientOptions)
{
std::vector<std::unique_ptr<Azure::Core::Http::Policies::HttpPolicy>> policies;

// service-specific per call policies
for (auto& policy : serviceOptions.ServicePerCallPolicies)
{
policies.push_back(std::move(policy));
}
policies.push_back(std::make_unique<_internal::StorageServiceVersionPolicy>(apiVersion));

// Request Id
policies.push_back(std::make_unique<Azure::Core::Http::Policies::_internal::RequestIdPolicy>());

// Telemetry (user-agent header)
policies.push_back(std::make_unique<Azure::Core::Http::Policies::_internal::TelemetryPolicy>(
telemetryPackageName, telemetryPackageVersion, clientOptions.Telemetry));

// client-options per call policies.
for (auto& policy : clientOptions.PerOperationPolicies)
{
policies.push_back(policy->Clone());
}

// Retry policy
policies.push_back(std::make_unique<StorageRetryPolicy>(clientOptions.Retry));

if (serviceOptions.SecondaryHostForRetryReads.HasValue()
&& !serviceOptions.SecondaryHostForRetryReads.Value().empty())
{
auto policy = std::make_unique<_internal::StorageSwitchToSecondaryPolicy>(
serviceOptions.PrimaryHost.Value(), serviceOptions.SecondaryHostForRetryReads.Value());
policies.push_back(std::move(policy));
}

// service-specific per retry policies.
for (auto& policy : serviceOptions.ServicePerRetryPolicies)
{
policies.push_back(std::move(policy));
}
policies.push_back(std::make_unique<StoragePerRetryPolicy>());

// client options per retry policies.
for (auto& policy : clientOptions.PerRetryPolicies)
{
policies.push_back(policy->Clone());
}

if (serviceOptions.SharedKeyCredential)
{
auto policy = std::make_unique<SharedKeyPolicy>(serviceOptions.SharedKeyCredential);
policies.push_back(std::move(policy));
}

// Policies after here cannot modify the request anymore

// Add a request activity policy which will generate distributed traces for the pipeline.
Azure::Core::Http::_internal::HttpSanitizer httpSanitizer(
clientOptions.Log.AllowedHttpQueryParameters, clientOptions.Log.AllowedHttpHeaders);
policies.push_back(
std::make_unique<Azure::Core::Http::Policies::_internal::RequestActivityPolicy>(
httpSanitizer));

// logging - won't update request
policies.push_back(
std::make_unique<Azure::Core::Http::Policies::_internal::LogPolicy>(clientOptions.Log));

// transport
policies.push_back(std::make_unique<Azure::Core::Http::Policies::_internal::TransportPolicy>(
clientOptions.Transport));

return std::make_shared<Azure::Core::Http::_internal::HttpPipeline>(policies);
}

}}} // namespace Azure::Storage::_internal
44 changes: 44 additions & 0 deletions sdk/storage/azure-storage-common/src/storage_retry_policy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#include "azure/storage/common/internal/storage_retry_policy.hpp"

#include "azure/core/internal/diagnostics/log.hpp"

namespace Azure { namespace Storage { namespace _internal {

bool StorageRetryPolicy::ShouldRetry(
const std::unique_ptr<Core::Http::RawResponse>& response,
const Core::Http::Policies::RetryOptions& retryOptions) const
{
if (static_cast<std::underlying_type_t<Core::Http::HttpStatusCode>>(response->GetStatusCode())
>= 400)
{
const auto& headers = response->GetHeaders();
auto ite = headers.find("x-ms-copy-source-status-code");
if (ite != headers.end())
{
const auto innerStatusCodeInt = std::stoi(ite->second);
const auto innerStatusCode = static_cast<Core::Http::HttpStatusCode>(innerStatusCodeInt);

const bool shouldRetry
= retryOptions.StatusCodes.find(innerStatusCode) != retryOptions.StatusCodes.end();

if (Azure::Core::Diagnostics::_internal::Log::ShouldWrite(
Azure::Core::Diagnostics::Logger::Level::Informational))
{
Azure::Core::Diagnostics::_internal::Log::Write(
Azure::Core::Diagnostics::Logger::Level::Informational,
std::string("x-ms-copy-source-status-code ") + std::to_string(innerStatusCodeInt)
+ (shouldRetry ? " will be retried" : " won't be retried"));
}
if (shouldRetry)
{
return true;
}
}
}
return false;
}

}}} // namespace Azure::Storage::_internal

0 comments on commit af5a61f

Please sign in to comment.