diff --git a/Changelog.txt b/Changelog.txt
index bf12f3d3..1ec81e8b 100644
--- a/Changelog.txt
+++ b/Changelog.txt
@@ -1,6 +1,10 @@
 Azure Storage Client Library for C++ History of Changes
 
+Changes in v2.6:
+- Supported parallel download for blobs and files
+- Supported installation from Vcpkg
+
 Changes in v2.5:
 - Upgraded Casablanca dependency to 2.9.1
 - Default Rest API version is 2015-12-11
diff --git a/Doxyfile b/Doxyfile
index bb1de401..0501ed8b 100644
--- a/Doxyfile
+++ b/Doxyfile
@@ -38,7 +38,7 @@ PROJECT_NAME = "Microsoft Azure Storage Client Library for C++"
 # could be handy for archiving the generated documentation or if some version
 # control system is used.
 
-PROJECT_NUMBER = 2.5.0
+PROJECT_NUMBER = 2.6.0
 
 # Using the PROJECT_BRIEF tag one can provide an optional one line description
 # for a project that appears at the top of each page and should give viewer a
diff --git a/Microsoft.WindowsAzure.Storage/CMakeLists.txt b/Microsoft.WindowsAzure.Storage/CMakeLists.txt
index f037c7fc..258a0387 100644
--- a/Microsoft.WindowsAzure.Storage/CMakeLists.txt
+++ b/Microsoft.WindowsAzure.Storage/CMakeLists.txt
@@ -115,7 +115,7 @@ set(AZURESTORAGE_LIBRARIES ${AZURESTORAGE_LIBRARY} ${CASABLANCA_LIBRARIES} ${Boo
 
 # Set version numbers centralized
 set (AZURESTORAGE_VERSION_MAJOR 2)
-set (AZURESTORAGE_VERSION_MINOR 5)
+set (AZURESTORAGE_VERSION_MINOR 6)
 set (AZURESTORAGE_VERSION_REVISION 0)
 
 # Add sources per configuration
diff --git a/Microsoft.WindowsAzure.Storage/includes/was/blob.h b/Microsoft.WindowsAzure.Storage/includes/was/blob.h
index a5aaded0..8a21b390 100644
--- a/Microsoft.WindowsAzure.Storage/includes/was/blob.h
+++ b/Microsoft.WindowsAzure.Storage/includes/was/blob.h
@@ -1680,20 +1680,20 @@ namespace azure { namespace storage {
        }

        ///
-       /// Gets the number of blocks or pages that may be simultaneously uploaded when uploading a blob that is greater than
+       /// Gets the number of blocks or pages that may be simultaneously uploaded or downloaded when uploading or downloading a blob that is greater than
        /// the value specified by the property in size.
        ///
-       /// The number of parallel block or page upload operations that may proceed.
+       /// The number of parallel block or page upload or download operations that may proceed.
        int parallelism_factor() const
        {
            return m_parallelism_factor;
        }

        ///
-       /// Sets the number of blocks or pages that may be simultaneously uploaded when uploading a blob that is greater than
+       /// Sets the number of blocks or pages that may be simultaneously uploaded or downloaded when uploading or downloading a blob that is greater than
        /// the value specified by the property in size.
        ///
-       /// The number of parallel block or page upload operations that may proceed.
+       /// The number of parallel block or page upload or download operations that may proceed.
void set_parallelism_factor(int value) { utility::assert_in_bounds(_XPLATSTR("value"), value, 0); @@ -5074,6 +5074,7 @@ namespace azure { namespace storage { void init(utility::string_t snapshot_time, storage_credentials credentials); WASTORAGE_API pplx::task exists_async(bool primary_only, const blob_request_options& options, operation_context context); + WASTORAGE_API pplx::task download_single_range_to_stream_async(concurrency::streams::ostream target, utility::size64_t offset, utility::size64_t length, const access_condition& condition, const blob_request_options& options, operation_context context, bool update_properties = false); utility::string_t m_name; utility::string_t m_snapshot_time; diff --git a/Microsoft.WindowsAzure.Storage/includes/was/file.h b/Microsoft.WindowsAzure.Storage/includes/was/file.h index 602005fd..2cbcc83d 100644 --- a/Microsoft.WindowsAzure.Storage/includes/was/file.h +++ b/Microsoft.WindowsAzure.Storage/includes/was/file.h @@ -527,20 +527,20 @@ namespace azure { namespace storage { } /// - /// Gets the number of blocks or pages that may be simultaneously uploaded when uploading a blob that is greater than + /// Gets the number of ranges that may be simultaneously uploaded or downloaded when uploading or downloading a file that is greater than /// the value specified by the property in size. /// - /// The number of parallel block or page upload operations that may proceed. + /// The number of parallel range upload or download operations that may proceed. int parallelism_factor() const { return m_parallelism_factor; } /// - /// Sets the number of ranges that may be simultaneously uploaded when uploading a file that is greater than + /// Sets the number of ranges that may be simultaneously uploaded or downloaded when uploading or downloading a file that is greater than /// the value specified by the property in size. /// - /// The number of parallel range upload operations that may proceed. + /// The number of parallel range upload or download operations that may proceed. void set_parallelism_factor(int value) { m_parallelism_factor = value; @@ -2211,6 +2211,15 @@ namespace azure { namespace storage { return m_length; } + /// + /// Gets the size of the file, in bytes. + /// + /// The file's size in bytes. + utility::size64_t size() const + { + return m_length; + } + /// /// Gets the file's ETag value. 
/// @@ -3913,6 +3922,7 @@ namespace azure { namespace storage { void init(storage_credentials credentials); WASTORAGE_API pplx::task exists_async(bool primary_only, const file_access_condition& condition, const file_request_options& options, operation_context context) const; + WASTORAGE_API pplx::task download_single_range_to_stream_async(concurrency::streams::ostream target, utility::size64_t offset, utility::size64_t length, const file_access_condition& condition, const file_request_options& options, operation_context context, bool update_properties = false, bool validate_last_modify = false) const; utility::string_t m_name; cloud_file_directory m_directory; diff --git a/Microsoft.WindowsAzure.Storage/includes/wascore/constants.dat b/Microsoft.WindowsAzure.Storage/includes/wascore/constants.dat index 8776f012..75229155 100644 --- a/Microsoft.WindowsAzure.Storage/includes/wascore/constants.dat +++ b/Microsoft.WindowsAzure.Storage/includes/wascore/constants.dat @@ -309,19 +309,19 @@ DAT(xml_share, _XPLATSTR("Share")) DAT(xml_shares, _XPLATSTR("Shares")) #define STR(x) #x -#define VER(x) _XPLATSTR("Azure-Storage/2.4.0 (Native; Windows; MSC_VER " STR(x) ")") +#define VER(x) _XPLATSTR("Azure-Storage/2.6.0 (Native; Windows; MSC_VER " STR(x) ")") #if defined(_WIN32) #if defined(_MSC_VER) #if _MSC_VER == 1800 - DAT(header_value_user_agent, _XPLATSTR("Azure-Storage/2.4.0 (Native; Windows; MSC_VER 1800 )")) + DAT(header_value_user_agent, _XPLATSTR("Azure-Storage/2.6.0 (Native; Windows; MSC_VER 1800 )")) #else DAT(header_value_user_agent, VER(_MSC_VER)) #endif #else - DAT(header_value_user_agent, _XPLATSTR("Azure-Storage/2.4.0 (Native; Windows)")) + DAT(header_value_user_agent, _XPLATSTR("Azure-Storage/2.6.0 (Native; Windows)")) #endif #else - DAT(header_value_user_agent, _XPLATSTR("Azure-Storage/2.4.0 (Native)")) + DAT(header_value_user_agent, _XPLATSTR("Azure-Storage/2.6.0 (Native)")) #endif #endif // _CONSTANTS diff --git a/Microsoft.WindowsAzure.Storage/includes/wascore/constants.h b/Microsoft.WindowsAzure.Storage/includes/wascore/constants.h index 4605a5b8..9eaa1c05 100644 --- a/Microsoft.WindowsAzure.Storage/includes/wascore/constants.h +++ b/Microsoft.WindowsAzure.Storage/includes/wascore/constants.h @@ -25,8 +25,11 @@ namespace azure { namespace storage { namespace protocol { // size constants const size_t max_block_size = 4 * 1024 * 1024; + const size_t single_block_size = 4 * 1024 * 1024; const size_t default_buffer_size = 64 * 1024; const utility::size64_t default_single_blob_upload_threshold = 32 * 1024 * 1024; + const utility::size64_t default_single_blob_download_threshold = 32 * 1024 * 1024; + const utility::size64_t default_single_block_download_threshold = 4 * 1024 * 1024; // duration constants const std::chrono::seconds default_retry_interval(3); diff --git a/Microsoft.WindowsAzure.Storage/includes/wascore/protocol.h b/Microsoft.WindowsAzure.Storage/includes/wascore/protocol.h index 21a1f997..f08382ed 100644 --- a/Microsoft.WindowsAzure.Storage/includes/wascore/protocol.h +++ b/Microsoft.WindowsAzure.Storage/includes/wascore/protocol.h @@ -216,6 +216,8 @@ namespace azure { namespace storage { namespace protocol { class file_response_parsers { public: + static utility::size64_t parse_file_size(const web::http::http_response& response); + static cloud_file_share_properties parse_file_share_properties(const web::http::http_response& response); static cloud_file_directory_properties parse_file_directory_properties(const web::http::http_response& response); static cloud_file_properties 
parse_file_properties(const web::http::http_response& response); diff --git a/Microsoft.WindowsAzure.Storage/src/cloud_blob.cpp b/Microsoft.WindowsAzure.Storage/src/cloud_blob.cpp index 5a8b92a0..71aadb7b 100644 --- a/Microsoft.WindowsAzure.Storage/src/cloud_blob.cpp +++ b/Microsoft.WindowsAzure.Storage/src/cloud_blob.cpp @@ -16,12 +16,16 @@ // ----------------------------------------------------------------------------------------- #include "stdafx.h" + +#include + #include "was/blob.h" #include "was/error_code_strings.h" #include "wascore/protocol.h" #include "wascore/resources.h" #include "wascore/blobstreams.h" #include "wascore/util.h" +#include "wascore/async_semaphore.h" namespace azure { namespace storage { @@ -386,7 +390,7 @@ namespace azure { namespace storage { concurrency::streams::ostream::pos_type m_target_offset; }; - pplx::task cloud_blob::download_range_to_stream_async(concurrency::streams::ostream target, utility::size64_t offset, utility::size64_t length, const access_condition& condition, const blob_request_options& options, operation_context context) + pplx::task cloud_blob::download_single_range_to_stream_async(concurrency::streams::ostream target, utility::size64_t offset, utility::size64_t length, const access_condition& condition, const blob_request_options& options, operation_context context, bool update_properties) { blob_request_options modified_options(options); modified_options.apply_defaults(service_client().default_request_options(), blob_type::unspecified); @@ -401,7 +405,7 @@ namespace azure { namespace storage { download_info->m_total_written_to_destination_stream = 0; download_info->m_response_length = std::numeric_limits::max(); download_info->m_reset_target = false; - download_info->m_target_offset = target.can_seek() ? target.tell() : (Concurrency::streams::basic_ostream::pos_type)0; + download_info->m_target_offset = target.can_seek() ? target.tell() : static_cast::pos_type>(0); std::shared_ptr> command = std::make_shared>(uri()); std::weak_ptr> weak_command(command); @@ -476,7 +480,7 @@ namespace azure { namespace storage { return true; }); - command->set_preprocess_response([weak_command, offset, modified_options, properties, metadata, copy_state, download_info](const web::http::http_response& response, const request_result& result, operation_context context) + command->set_preprocess_response([weak_command, offset, modified_options, properties, metadata, copy_state, download_info, update_properties](const web::http::http_response& response, const request_result& result, operation_context context) { std::shared_ptr> command(weak_command); @@ -499,9 +503,12 @@ namespace azure { namespace storage { if (!download_info->m_are_properties_populated) { - properties->update_all(protocol::blob_response_parsers::parse_blob_properties(response), offset != std::numeric_limits::max()); - *metadata = protocol::parse_metadata(response); - *copy_state = protocol::response_parsers::parse_copy_state(response); + if (update_properties == true) + { + properties->update_all(protocol::blob_response_parsers::parse_blob_properties(response), offset != std::numeric_limits::max()); + *metadata = protocol::parse_metadata(response); + *copy_state = protocol::response_parsers::parse_copy_state(response); + } download_info->m_response_length = result.content_length(); download_info->m_response_md5 = result.content_md5(); @@ -515,7 +522,6 @@ namespace azure { namespace storage { // early before the retry policy has the opportunity to change the storage location. 
command->set_location_mode(core::command_location_mode::primary_or_secondary, result.target_location()); - download_info->m_locked_etag = properties->etag(); download_info->m_are_properties_populated = true; } }); @@ -540,6 +546,180 @@ namespace azure { namespace storage { return core::executor::execute_async(command, modified_options, context); } + pplx::task cloud_blob::download_range_to_stream_async(concurrency::streams::ostream target, utility::size64_t offset, utility::size64_t length, const access_condition& condition, const blob_request_options& options, operation_context context) + { + if (options.parallelism_factor() > 1) + { + auto instance = std::make_shared(*this); + // if download a whole blob, enable download strategy(download 32MB first). + utility::size64_t single_blob_download_threshold(protocol::default_single_blob_download_threshold); + // If tranactional md5 validation is set, first range should be 4MB. + if (options.use_transactional_md5()) + { + single_blob_download_threshold = protocol::default_single_block_download_threshold; + } + + // download first range. + // if 416 thrown, it's an empty blob. need to download attributes. + // otherwise, properties must be updated for further parallel download. + return instance->download_single_range_to_stream_async(target, 0, single_blob_download_threshold, condition, options, context, true).then([=](pplx::task download_task) + { + try + { + download_task.wait(); + } + catch (storage_exception &e) + { + // For empty blob, swallow the exception and update the attributes. + if (e.result().http_status_code() == web::http::status_codes::RangeNotSatisfiable + && offset >= std::numeric_limits::max()) + { + return instance->download_attributes_async(condition, options, context); + } + else + { + throw; + } + } + + if ((offset >= std::numeric_limits::max() && instance->properties().size() <= single_blob_download_threshold) + || (offset < std::numeric_limits::max() && length <= single_blob_download_threshold)) + { + return pplx::task_from_result(); + } + + // download the rest data in parallel. + utility::size64_t target_offset; + utility::size64_t target_length; + + if (offset >= std::numeric_limits::max()) + { + target_offset = single_blob_download_threshold; + target_length = instance->properties().size() - single_blob_download_threshold; + } + else + { + target_offset = offset + single_blob_download_threshold; + target_length = length - single_blob_download_threshold; + } + + access_condition modified_condition(condition); + if (condition.if_match_etag().empty()) + { + modified_condition.set_if_match_etag(instance->properties().etag()); + } + + return pplx::task_from_result().then([instance, target, target_offset, target_length, single_blob_download_threshold, modified_condition, options, context]() + { + auto semaphore = std::make_shared(options.parallelism_factor()); + // lock to the target ostream + pplx::extensibility::reader_writer_lock_t mutex; + + // limit the number of parallel writer(maximum number is options.parallelism_factor()) to write to target stream. prevent OOM. 
+ pplx::details::atomic_long writer(0); + + auto smallest_offset = std::make_shared(target_offset); + auto condition_variable = std::make_shared(); + std::mutex condition_variable_mutex; + for (utility::size64_t current_offset = target_offset; current_offset < target_offset + target_length; current_offset += protocol::single_block_size) + { + utility::size64_t current_length = protocol::single_block_size; + if (current_offset + current_length > target_offset + target_length) + { + current_length = target_offset + target_length - current_offset; + } + semaphore->lock_async().then([instance, &mutex, semaphore, condition_variable, &condition_variable_mutex, &writer, target, smallest_offset, current_offset, current_length, modified_condition, options, context]() + { + concurrency::streams::container_buffer> buffer; + auto segment_ostream = buffer.create_ostream(); + // if trasaction MD5 is enabled, it will be checked inside each download_single_range_to_stream_async. + instance->download_single_range_to_stream_async(segment_ostream, current_offset, current_length, modified_condition, options, context).then([buffer, segment_ostream, semaphore, condition_variable, &condition_variable_mutex, smallest_offset, current_offset, current_length, &mutex, target, &writer, options](pplx::task download_task) + { + segment_ostream.close().then([download_task](pplx::task close_task) + { + download_task.wait(); + close_task.wait(); + }).wait(); + + // status of current semaphore. + bool released = false; + // target stream is seekable, could write to target stream once the download finished. + if (target.can_seek()) + { + pplx::extensibility::scoped_rw_lock_t guard(mutex); + target.streambuf().seekpos(current_offset, std::ios_base::out); + target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait(); + *smallest_offset += protocol::single_block_size; + released = true; + semaphore->unlock(); + } + else + { + { + pplx::extensibility::scoped_rw_lock_t guard(mutex); + if (*smallest_offset == current_offset) + { + target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait(); + *smallest_offset += protocol::single_block_size; + condition_variable->notify_all(); + released = true; + semaphore->unlock(); + } + } + if (!released) + { + pplx::details::atomic_increment(writer); + if (writer < options.parallelism_factor()) + { + released = true; + semaphore->unlock(); + } + std::unique_lock locker(condition_variable_mutex); + condition_variable->wait(locker, [smallest_offset, current_offset, &mutex]() + { + pplx::extensibility::scoped_rw_lock_t guard(mutex); + return *smallest_offset == current_offset; + }); + { + pplx::extensibility::scoped_rw_lock_t guard(mutex); + + if (*smallest_offset == current_offset) + { + target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait(); + *smallest_offset += protocol::single_block_size; + } + else if (*smallest_offset > current_offset) + { + throw std::runtime_error("Out of order in parallel downloading blob."); + } + } + condition_variable->notify_all(); + pplx::details::atomic_decrement(writer); + if (!released) + { + semaphore->unlock(); + } + } + } + }); + }); + } + semaphore->wait_all_async().wait(); + std::unique_lock locker(condition_variable_mutex); + condition_variable->wait(locker, [smallest_offset, &mutex, target_offset, target_length]() + { + pplx::extensibility::scoped_rw_lock_t guard(mutex); + return *smallest_offset >= target_offset + target_length; + }); + }); + }); + 
} + else + { + return download_single_range_to_stream_async(target, offset, length, condition, options, context, true); + } + } + pplx::task cloud_blob::download_to_file_async(const utility::string_t &path, const access_condition& condition, const blob_request_options& options, operation_context context) { auto instance = std::make_shared(*this); diff --git a/Microsoft.WindowsAzure.Storage/src/cloud_file.cpp b/Microsoft.WindowsAzure.Storage/src/cloud_file.cpp index a7f5d970..7a447a53 100644 --- a/Microsoft.WindowsAzure.Storage/src/cloud_file.cpp +++ b/Microsoft.WindowsAzure.Storage/src/cloud_file.cpp @@ -16,6 +16,9 @@ // ----------------------------------------------------------------------------------------- #include "stdafx.h" + +#include + #include "was/file.h" #include "was/error_code_strings.h" #include "wascore/protocol.h" @@ -407,9 +410,9 @@ namespace azure { namespace storage { concurrency::streams::ostream::pos_type m_target_offset; }; - pplx::task cloud_file::download_range_to_stream_async(concurrency::streams::ostream target, utility::size64_t start_offset, utility::size64_t length, const file_access_condition& access_condition, const file_request_options& options, operation_context context) const + pplx::task cloud_file::download_single_range_to_stream_async(concurrency::streams::ostream target, utility::size64_t offset, utility::size64_t length, const file_access_condition& condition, const file_request_options& options, operation_context context, bool update_properties, bool validate_last_modify) const { - UNREFERENCED_PARAMETER(access_condition); + UNREFERENCED_PARAMETER(condition); file_request_options modified_options(options); modified_options.apply_defaults(service_client().default_request_options()); @@ -426,13 +429,13 @@ namespace azure { namespace storage { std::shared_ptr> command = std::make_shared>(uri()); std::weak_ptr> weak_command(command); - command->set_build_request([start_offset, length, modified_options, download_info](web::http::uri_builder uri_builder, const std::chrono::seconds& timeout, operation_context context) -> web::http::http_request + command->set_build_request([offset, length, modified_options, download_info](web::http::uri_builder uri_builder, const std::chrono::seconds& timeout, operation_context context) -> web::http::http_request { - utility::size64_t current_offset = start_offset; + utility::size64_t current_offset = offset; utility::size64_t current_length = length; if (download_info->m_total_written_to_destination_stream > 0) { - if (start_offset == std::numeric_limits::max()) + if (offset == std::numeric_limits::max()) { current_offset = 0; } @@ -482,7 +485,7 @@ namespace azure { namespace storage { return true; }); - command->set_preprocess_response([weak_command, start_offset, modified_options, properties, metadata, copy_state, download_info](const web::http::http_response& response, const request_result& result, operation_context context) + command->set_preprocess_response([weak_command, offset, modified_options, properties, metadata, copy_state, download_info, update_properties, validate_last_modify](const web::http::http_response& response, const request_result& result, operation_context context) { std::shared_ptr> command(weak_command); @@ -505,9 +508,18 @@ namespace azure { namespace storage { if (!download_info->m_are_properties_populated) { - *properties = protocol::file_response_parsers::parse_file_properties(response); - *metadata = protocol::parse_metadata(response); - *copy_state = 
protocol::response_parsers::parse_copy_state(response); + if (validate_last_modify == true + && properties->last_modified() != protocol::file_response_parsers::parse_file_properties(response).last_modified()) + { + throw std::runtime_error("File is modified during downloading."); + } + + if (update_properties == true) + { + *properties = protocol::file_response_parsers::parse_file_properties(response); + *metadata = protocol::parse_metadata(response); + *copy_state = protocol::response_parsers::parse_copy_state(response); + } download_info->m_response_length = result.content_length(); download_info->m_response_md5 = result.content_md5(); @@ -515,7 +527,7 @@ namespace azure { namespace storage { if (modified_options.use_transactional_md5() && !modified_options.disable_content_md5_validation() && download_info->m_response_md5.empty() // If range is not set and the file has no MD5 hash, no content md5 will not be returned. // Consider the file has no MD5 hash in default. - && start_offset < std::numeric_limits::max()) + && offset < std::numeric_limits::max()) { throw storage_exception(protocol::error_missing_md5); } @@ -549,6 +561,174 @@ namespace azure { namespace storage { return core::executor::execute_async(command, modified_options, context); } + pplx::task cloud_file::download_range_to_stream_async(concurrency::streams::ostream target, utility::size64_t offset, utility::size64_t length, const file_access_condition& condition, const file_request_options& options, operation_context context) const + { + if (options.parallelism_factor() > 1) + { + auto instance = std::make_shared(*this); + // if download a whole blob, enable download strategy(download 32MB first). + utility::size64_t single_file_download_threshold(protocol::default_single_blob_download_threshold); + // If tranactional md5 validation is set, first range should be 4MB. + if (options.use_transactional_md5()) + { + single_file_download_threshold = protocol::default_single_block_download_threshold; + } + + // download first range. + // if 416 thrown, it's an empty blob. need to download attributes. + // otherwise, properties must be updated for further parallel download. + return instance->download_single_range_to_stream_async(target, 0, single_file_download_threshold, condition, options, context, true).then([=](pplx::task download_task) + { + try + { + download_task.wait(); + } + catch (storage_exception &e) + { + // For empty blob, swallow the exception and update the attributes. + if (e.result().http_status_code() == web::http::status_codes::RangeNotSatisfiable + && offset >= std::numeric_limits::max()) + { + return instance->download_attributes_async(condition, options, context); + } + else + { + throw; + } + } + + if ((offset >= std::numeric_limits::max() && instance->properties().size() <= single_file_download_threshold) + || (offset < std::numeric_limits::max() && length <= single_file_download_threshold)) + { + return pplx::task_from_result(); + } + + // download the rest data in parallel. 
+ utility::size64_t target_offset; + utility::size64_t target_length; + + if (offset >= std::numeric_limits::max()) + { + target_offset = single_file_download_threshold; + target_length = instance->properties().size() - single_file_download_threshold; + } + else + { + target_offset = offset + single_file_download_threshold; + target_length = length - single_file_download_threshold; + } + + return pplx::task_from_result().then([instance, target, target_offset, target_length, single_file_download_threshold, condition, options, context]() + { + auto semaphore = std::make_shared(options.parallelism_factor()); + // lock to the target ostream + pplx::extensibility::reader_writer_lock_t mutex; + + // limit the number of parallel writer(maximum number is options.parallelism_factor()) to write to target stream. prevent OOM. + pplx::details::atomic_long writer(0); + + auto smallest_offset = std::make_shared(target_offset); + auto condition_variable = std::make_shared(); + std::mutex condition_variable_mutex; + for (utility::size64_t current_offset = target_offset; current_offset < target_offset + target_length; current_offset += protocol::single_block_size) + { + utility::size64_t current_length = protocol::single_block_size; + if (current_offset + current_length > target_offset + target_length) + { + current_length = target_offset + target_length - current_offset; + } + semaphore->lock_async().then([instance, &mutex, semaphore, condition_variable, &condition_variable_mutex, &writer, target, smallest_offset, current_offset, current_length, condition, options, context]() + { + concurrency::streams::container_buffer> buffer; + auto segment_ostream = buffer.create_ostream(); + // if trasaction MD5 is enabled, it will be checked inside each download_single_range_to_stream_async. + instance->download_single_range_to_stream_async(segment_ostream, current_offset, current_length, condition, options, context, false, true).then([buffer, segment_ostream, semaphore, condition_variable, &condition_variable_mutex, smallest_offset, current_offset, current_length, &mutex, target, &writer, options](pplx::task download_task) + { + segment_ostream.close().then([download_task](pplx::task close_task) + { + download_task.wait(); + close_task.wait(); + }).wait(); + + // status of current semaphore. + bool released = false; + // target stream is seekable, could write to target stream once the download finished. 
+ if (target.can_seek()) + { + pplx::extensibility::scoped_rw_lock_t guard(mutex); + target.streambuf().seekpos(current_offset, std::ios_base::out); + target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait(); + *smallest_offset += protocol::single_block_size; + released = true; + semaphore->unlock(); + } + else + { + { + pplx::extensibility::scoped_rw_lock_t guard(mutex); + if (*smallest_offset == current_offset) + { + target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait(); + *smallest_offset += protocol::single_block_size; + condition_variable->notify_all(); + released = true; + semaphore->unlock(); + } + } + if (!released) + { + pplx::details::atomic_increment(writer); + if (writer < options.parallelism_factor()) + { + released = true; + semaphore->unlock(); + } + std::unique_lock locker(condition_variable_mutex); + condition_variable->wait(locker, [smallest_offset, current_offset, &mutex]() + { + pplx::extensibility::scoped_rw_lock_t guard(mutex); + return *smallest_offset == current_offset; + }); + { + pplx::extensibility::scoped_rw_lock_t guard(mutex); + + if (*smallest_offset == current_offset) + { + target.streambuf().putn_nocopy(buffer.collection().data(), buffer.collection().size()).wait(); + *smallest_offset += protocol::single_block_size; + } + else if (*smallest_offset > current_offset) + { + throw std::runtime_error("Out of order in parallel downloading blob."); + } + } + condition_variable->notify_all(); + pplx::details::atomic_decrement(writer); + if (!released) + { + semaphore->unlock(); + } + } + } + }); + }); + } + semaphore->wait_all_async().wait(); + std::unique_lock locker(condition_variable_mutex); + condition_variable->wait(locker, [smallest_offset, &mutex, target_offset, target_length]() + { + pplx::extensibility::scoped_rw_lock_t guard(mutex); + return *smallest_offset >= target_offset + target_length; + }); + }); + }); + } + else + { + return download_single_range_to_stream_async(target, offset, length, condition, options, context, true); + } + } + pplx::task cloud_file::download_to_file_async(const utility::string_t &path, const file_access_condition& access_condition, const file_request_options& options, operation_context context) const { auto instance = std::make_shared(*this); diff --git a/Microsoft.WindowsAzure.Storage/src/file_response_parsers.cpp b/Microsoft.WindowsAzure.Storage/src/file_response_parsers.cpp index 9c65838e..e0ec5d64 100644 --- a/Microsoft.WindowsAzure.Storage/src/file_response_parsers.cpp +++ b/Microsoft.WindowsAzure.Storage/src/file_response_parsers.cpp @@ -37,12 +37,27 @@ namespace azure { namespace storage { namespace protocol { return properties; } + utility::size64_t file_response_parsers::parse_file_size(const web::http::http_response& response) + { + auto& headers = response.headers(); + utility::string_t value; + + if (headers.match(web::http::header_names::content_range, value)) + { + auto slash = value.find(_XPLATSTR('/')); + value = value.substr(slash + 1); + return utility::conversions::scan_string(value); + } + + return headers.content_length(); + } + cloud_file_properties file_response_parsers::parse_file_properties(const web::http::http_response& response) { cloud_file_properties properties; properties.m_etag = parse_etag(response); properties.m_last_modified = parse_last_modified(response); - properties.m_length = response.headers().content_length(); + properties.m_length = parse_file_size(response); auto& headers = response.headers(); 
properties.m_cache_control = get_header_value(headers, web::http::header_names::cache_control); diff --git a/Microsoft.WindowsAzure.Storage/tests/blob_test_base.cpp b/Microsoft.WindowsAzure.Storage/tests/blob_test_base.cpp index 4a6bb536..b68e7463 100644 --- a/Microsoft.WindowsAzure.Storage/tests/blob_test_base.cpp +++ b/Microsoft.WindowsAzure.Storage/tests/blob_test_base.cpp @@ -52,7 +52,7 @@ utility::string_t blob_service_test_base::get_random_container_name(size_t lengt return utility::conversions::print_string(utility::datetime::utc_now().to_interval()) + name; } -void blob_service_test_base::check_parallelism(const azure::storage::operation_context& context, int expected_parallelism) +void test_base::check_parallelism(const azure::storage::operation_context& context, int expected_parallelism) { typedef std::pair request; diff --git a/Microsoft.WindowsAzure.Storage/tests/blob_test_base.h b/Microsoft.WindowsAzure.Storage/tests/blob_test_base.h index d5748538..ea92ee37 100644 --- a/Microsoft.WindowsAzure.Storage/tests/blob_test_base.h +++ b/Microsoft.WindowsAzure.Storage/tests/blob_test_base.h @@ -46,7 +46,6 @@ class blob_service_test_base : public test_base static utility::string_t fill_buffer_and_get_md5(std::vector& buffer); static utility::string_t fill_buffer_and_get_md5(std::vector& buffer, size_t offset, size_t count); static utility::string_t get_random_container_name(size_t length = 10); - static void check_parallelism(const azure::storage::operation_context& context, int expected_parallelism); static void check_blob_equal(const azure::storage::cloud_blob& expected, const azure::storage::cloud_blob& actual); static void check_blob_copy_state_equal(const azure::storage::copy_state& expected, const azure::storage::copy_state& actual); static void check_blob_properties_equal(const azure::storage::cloud_blob_properties& expected, const azure::storage::cloud_blob_properties& actual); diff --git a/Microsoft.WindowsAzure.Storage/tests/cloud_blob_test.cpp b/Microsoft.WindowsAzure.Storage/tests/cloud_blob_test.cpp index bd8a9337..e87f10e7 100644 --- a/Microsoft.WindowsAzure.Storage/tests/cloud_blob_test.cpp +++ b/Microsoft.WindowsAzure.Storage/tests/cloud_blob_test.cpp @@ -537,7 +537,7 @@ SUITE(Blob) CHECK(m_blob.snapshot_qualified_uri().secondary_uri() != snapshot1.snapshot_qualified_uri().secondary_uri()); CHECK(snapshot1.snapshot_qualified_uri().primary_uri().query().find(_XPLATSTR("snapshot")) != utility::string_t::npos); CHECK(snapshot1.snapshot_qualified_uri().secondary_uri().query().find(_XPLATSTR("snapshot")) != utility::string_t::npos); - + CHECK_THROW(snapshot1.upload_properties(azure::storage::access_condition(), azure::storage::blob_request_options(), m_context), std::logic_error); CHECK_THROW(snapshot1.upload_metadata(azure::storage::access_condition(), azure::storage::blob_request_options(), m_context), std::logic_error); CHECK_THROW(snapshot1.create_snapshot(azure::storage::cloud_metadata(), azure::storage::access_condition(), azure::storage::blob_request_options(), m_context), std::logic_error); @@ -700,4 +700,130 @@ SUITE(Blob) CHECK_EQUAL(web::http::status_codes::Conflict, m_context.request_results().back().http_status_code()); } } + + /// + /// Test parallel download + /// + TEST_FIXTURE(blob_test_base, parallel_download) + { + // download blob smaller than 32MB. 
+ { + auto blob_name = get_random_string(20); + auto blob = m_container.get_block_blob_reference(blob_name); + size_t target_length = 31 * 1024 * 1024; + azure::storage::blob_request_options option; + option.set_parallelism_factor(2); + std::vector data; + data.resize(target_length); + concurrency::streams::container_buffer> upload_buffer(data); + blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context); + + // download target blob in parallel. + azure::storage::operation_context context; + concurrency::streams::container_buffer> download_buffer; + blob.download_to_stream(download_buffer.create_ostream(), azure::storage::access_condition(), option, context); + + check_parallelism(context, 1); + CHECK(blob.properties().size() == target_length); + CHECK(download_buffer.collection().size() == target_length); + CHECK(std::equal(data.begin(), data.end(), download_buffer.collection().begin())); + } + + // blob with size larger than 32MB. + { + auto blob_name = get_random_string(20); + auto blob = m_container.get_block_blob_reference(blob_name); + size_t target_length = 100 * 1024 * 1024; + azure::storage::blob_request_options option; + option.set_parallelism_factor(2); + std::vector data; + data.resize(target_length); + concurrency::streams::container_buffer> upload_buffer(data); + blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context); + + // download target blob in parallel. + azure::storage::operation_context context; + concurrency::streams::container_buffer> download_buffer; + blob.download_to_stream(download_buffer.create_ostream(), azure::storage::access_condition(), option, context); + + check_parallelism(context, 2); + CHECK(blob.properties().size() == target_length); + CHECK(download_buffer.collection().size() == target_length); + CHECK(std::equal(data.begin(), data.end(), download_buffer.collection().begin())); + } + } + + TEST_FIXTURE(blob_test_base, parallel_download_with_md5) + { + // transactional md5 enabled. + // download blob smaller than 4MB. + { + auto blob_name = get_random_string(20); + auto blob = m_container.get_block_blob_reference(blob_name); + size_t target_length = 1 * 1024 * 1024; + azure::storage::blob_request_options option; + option.set_parallelism_factor(2); + option.set_use_transactional_md5(true); + std::vector data; + data.resize(target_length); + concurrency::streams::container_buffer> upload_buffer(data); + blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context); + + // download target blob in parallel. + azure::storage::operation_context context; + concurrency::streams::container_buffer> download_buffer; + blob.download_to_stream(download_buffer.create_ostream(), azure::storage::access_condition(), option, context); + + check_parallelism(context, 1); + CHECK(blob.properties().size() == target_length); + CHECK(download_buffer.collection().size() == target_length); + CHECK(std::equal(data.begin(), data.end(), download_buffer.collection().begin())); + } + + // download blob larger than 4MB. 
+ { + auto blob_name = get_random_string(20); + auto blob = m_container.get_block_blob_reference(blob_name); + size_t target_length = 21 * 1024 * 1024; + azure::storage::blob_request_options option; + option.set_parallelism_factor(2); + option.set_use_transactional_md5(true); + std::vector data; + data.resize(target_length); + concurrency::streams::container_buffer> upload_buffer(data); + blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context); + + // download target blob in parallel. + azure::storage::operation_context context; + concurrency::streams::container_buffer> download_buffer; + blob.download_to_stream(download_buffer.create_ostream(), azure::storage::access_condition(), option, context); + + check_parallelism(context, 2); + CHECK(blob.properties().size() == target_length); + CHECK(download_buffer.collection().size() == target_length); + CHECK(std::equal(data.begin(), data.end(), download_buffer.collection().begin())); + } + } + + TEST_FIXTURE(blob_test_base, parallel_download_empty_blob) + { + auto blob_name = get_random_string(20); + auto blob = m_container.get_block_blob_reference(blob_name); + size_t target_length = 0; + azure::storage::blob_request_options option; + option.set_parallelism_factor(2); + option.set_use_transactional_md5(true); + std::vector data; + data.resize(target_length); + concurrency::streams::container_buffer> upload_buffer(data); + blob.upload_from_stream(upload_buffer.create_istream(), azure::storage::access_condition(), option, m_context); + + // download target blob in parallel. + azure::storage::operation_context context; + concurrency::streams::container_buffer> download_buffer; + blob.download_to_stream(download_buffer.create_ostream(), azure::storage::access_condition(), option, context); + + check_parallelism(context, 1); + CHECK(blob.properties().size() == target_length); + } } diff --git a/Microsoft.WindowsAzure.Storage/tests/cloud_file_test.cpp b/Microsoft.WindowsAzure.Storage/tests/cloud_file_test.cpp index b81eef40..375c2b03 100644 --- a/Microsoft.WindowsAzure.Storage/tests/cloud_file_test.cpp +++ b/Microsoft.WindowsAzure.Storage/tests/cloud_file_test.cpp @@ -394,7 +394,7 @@ SUITE(File) ranges_clear = m_file.list_ranges(0, 2048, azure::storage::file_access_condition(), azure::storage::file_request_options(), m_context); CHECK(ranges_clear.size() == 0); - // verify write range with start offset not zero. + // verify write range with start start_offset not zero. m_file.create(1024, azure::storage::file_access_condition(), azure::storage::file_request_options(), m_context); ranges0 = m_file.list_ranges(azure::storage::file_access_condition(), azure::storage::file_request_options(), m_context); CHECK(ranges0.size() == 0); @@ -474,4 +474,132 @@ SUITE(File) CHECK(!file.properties().content_md5().empty()); } } + + /// + /// Test parallel download + /// + TEST_FIXTURE(file_test_base, parallel_download) + { + // download file smaller than 32MB. 
+ { + auto file_name = get_random_string(20); + auto file = m_share.get_root_directory_reference().get_file_reference(file_name); + size_t target_length = 31 * 1024 * 1024; + azure::storage::file_request_options option; + option.set_parallelism_factor(2); + option.set_use_transactional_md5(false); + std::vector data; + data.resize(target_length); + concurrency::streams::container_buffer> upload_buffer(data); + file.upload_from_stream(upload_buffer.create_istream(), azure::storage::file_access_condition(), option, m_context); + + // download target file in parallel. + azure::storage::operation_context context; + concurrency::streams::container_buffer> download_buffer; + file.download_to_stream(download_buffer.create_ostream(), azure::storage::file_access_condition(), option, context); + + check_parallelism(context, 1); + CHECK(file.properties().size() == target_length); + CHECK(download_buffer.collection().size() == target_length); + CHECK(std::equal(data.begin(), data.end(), download_buffer.collection().begin())); + } + + // file with size larger than 32MB. + { + auto file_name = get_random_string(20); + auto file = m_share.get_root_directory_reference().get_file_reference(file_name); + size_t target_length = 100 * 1024 * 1024; + azure::storage::file_request_options option; + option.set_parallelism_factor(2); + option.set_use_transactional_md5(false); + std::vector data; + data.resize(target_length); + concurrency::streams::container_buffer> upload_buffer(data); + file.upload_from_stream(upload_buffer.create_istream(), azure::storage::file_access_condition(), option, m_context); + + // download target file in parallel. + azure::storage::operation_context context; + concurrency::streams::container_buffer> download_buffer; + file.download_to_stream(download_buffer.create_ostream(), azure::storage::file_access_condition(), option, context); + + check_parallelism(context, 2); + CHECK(file.properties().size() == target_length); + CHECK(download_buffer.collection().size() == target_length); + CHECK(std::equal(data.begin(), data.end(), download_buffer.collection().begin())); + } + } + + TEST_FIXTURE(file_test_base, parallel_download_with_md5) + { + // transactional md5 enabled. + // download file smaller than 4MB. + { + auto file_name = get_random_string(20); + auto file = m_share.get_root_directory_reference().get_file_reference(file_name); + size_t target_length = 1 * 1024 * 1024; + azure::storage::file_request_options option; + option.set_parallelism_factor(2); + option.set_use_transactional_md5(true); + std::vector data; + data.resize(target_length); + concurrency::streams::container_buffer> upload_buffer(data); + file.upload_from_stream(upload_buffer.create_istream(), azure::storage::file_access_condition(), option, m_context); + + // download target file in parallel. + azure::storage::operation_context context; + concurrency::streams::container_buffer> download_buffer; + file.download_to_stream(download_buffer.create_ostream(), azure::storage::file_access_condition(), option, context); + + check_parallelism(context, 1); + CHECK(file.properties().size() == target_length); + CHECK(download_buffer.collection().size() == target_length); + CHECK(std::equal(data.begin(), data.end(), download_buffer.collection().begin())); + } + + // download file larger than 4MB. 
+ { + auto file_name = get_random_string(20); + auto file = m_share.get_root_directory_reference().get_file_reference(file_name); + size_t target_length = 21 * 1024 * 1024; + azure::storage::file_request_options option; + option.set_parallelism_factor(2); + option.set_use_transactional_md5(true); + std::vector data; + data.resize(target_length); + concurrency::streams::container_buffer> upload_buffer(data); + file.upload_from_stream(upload_buffer.create_istream(), azure::storage::file_access_condition(), option, m_context); + + // download target file in parallel. + azure::storage::operation_context context; + concurrency::streams::container_buffer> download_buffer; + file.download_to_stream(download_buffer.create_ostream(), azure::storage::file_access_condition(), option, context); + + check_parallelism(context, 2); + CHECK(file.properties().size() == target_length); + CHECK(download_buffer.collection().size() == target_length); + CHECK(std::equal(data.begin(), data.end(), download_buffer.collection().begin())); + } + } + + TEST_FIXTURE(file_test_base, parallel_download_empty_file) + { + auto file_name = get_random_string(20); + auto file = m_share.get_root_directory_reference().get_file_reference(file_name); + size_t target_length = 0; + azure::storage::file_request_options option; + option.set_parallelism_factor(2); + option.set_use_transactional_md5(true); + std::vector data; + data.resize(target_length); + concurrency::streams::container_buffer> upload_buffer(data); + file.upload_from_stream(upload_buffer.create_istream(), azure::storage::file_access_condition(), option, m_context); + + // download target file in parallel. + azure::storage::operation_context context; + concurrency::streams::container_buffer> download_buffer; + file.download_to_stream(download_buffer.create_ostream(), azure::storage::file_access_condition(), option, context); + + check_parallelism(context, 1); + CHECK(file.properties().size() == target_length); + } } \ No newline at end of file diff --git a/Microsoft.WindowsAzure.Storage/tests/test_base.h b/Microsoft.WindowsAzure.Storage/tests/test_base.h index 6617ccb1..db12d8b5 100644 --- a/Microsoft.WindowsAzure.Storage/tests/test_base.h +++ b/Microsoft.WindowsAzure.Storage/tests/test_base.h @@ -58,7 +58,8 @@ class test_base protected: static void print_client_request_id(const azure::storage::operation_context& context, const utility::string_t& purpose); - + static void check_parallelism(const azure::storage::operation_context& context, int expected_parallelism); + azure::storage::operation_context m_context; static utility::string_t object_name_prefix; diff --git a/Microsoft.WindowsAzure.Storage/version.rc b/Microsoft.WindowsAzure.Storage/version.rc index 91ed4e1c..6d40764b 100644 Binary files a/Microsoft.WindowsAzure.Storage/version.rc and b/Microsoft.WindowsAzure.Storage/version.rc differ diff --git a/README.md b/README.md index 90e40d07..926454c2 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Azure Storage Client Library for C++ (2.5.0) +# Azure Storage Client Library for C++ (2.6.0) The Azure Storage Client Library for C++ allows you to build applications against Microsoft Azure Storage. For an overview of Azure Storage, see [Introduction to Microsoft Azure Storage](http://azure.microsoft.com/en-us/documentation/articles/storage-introduction/). 
@@ -23,6 +23,8 @@ The Azure Storage Client Library for C++ allows you to build applications agains
 
 For the best development experience, we recommend that developers use the official Microsoft NuGet packages for libraries. NuGet packages are regularly updated with new functionality and hotfixes. Download the [NuGet Package](http://www.nuget.org/packages/wastorage).
 
+Azure Storage Client Library for C++ is also available on Vcpkg since v2.5.0. To learn more about Vcpkg, please visit https://github.com/Microsoft/vcpkg.
+
 ## Requirements
 
 To call Azure services, you must first have an Azure subscription. Sign up for a [free trial](https://azure.microsoft.com/en-us/pricing/free-trial/) or use your [MSDN subscriber benefits](https://azure.microsoft.com/en-us/pricing/member-offers/msdn-benefits-details/).
@@ -58,6 +60,15 @@ To install the binaries for the Azure Storage Client Library for C++, type the f
 
 `Install-Package wastorage`
 
+### Via Vcpkg
+
+To install the Azure Storage Client Library for C++ through Vcpkg, you need Vcpkg installed first. Please follow the [quick-start instructions](https://github.com/Microsoft/vcpkg#quick-start) to install Vcpkg.
+
+Install the package with:
+```
+C:\src\vcpkg> .\vcpkg install azure-storage-cpp
+```
+
 #### Visual Studio Version
 
 Starting from version 2.1.0, Azure Storage Client Library for C++ supports Visual Studio 2013 and Visual Studio 2015. In case you have the need to use Visual Studio 2012, please get [version 2.0.0](http://www.nuget.org/packages/wastorage/2.0.0).
diff --git a/wastorage.nuspec b/wastorage.nuspec
index 10b415a6..a0afd94d 100644
--- a/wastorage.nuspec
+++ b/wastorage.nuspec
@@ -2,7 +2,7 @@
    wastorage
-   2.5.0
+   2.6.0
    Microsoft Azure Storage Client Library for C++
    Microsoft Corporation
    Microsoft Corporation
@@ -14,8 +14,8 @@
    Public release
    Microsoft Azure Storage Table Blob Queue File Scalable windowsazureofficial
-
-
+
+
diff --git a/wastorage.v120.nuspec b/wastorage.v120.nuspec
index 157b5e85..3f13a176 100644
--- a/wastorage.v120.nuspec
+++ b/wastorage.v120.nuspec
@@ -2,7 +2,7 @@
    wastorage.v120
-   2.5.0
+   2.6.0
    Microsoft Azure Storage Client Library for C++
    Microsoft Corporation
    Microsoft Corporation
diff --git a/wastorage.v140.nuspec b/wastorage.v140.nuspec
index 4f95d41d..06dc44cb 100644
--- a/wastorage.v140.nuspec
+++ b/wastorage.v140.nuspec
@@ -2,7 +2,7 @@
    wastorage.v140
-   2.5.0
+   2.6.0
    Microsoft Azure Storage Client Library for C++
    Microsoft Corporation
    Microsoft Corporation
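
For reviewers who want to exercise the headline change, here is a quick usage sketch of the new parallel download path. It is not part of the change set: the connection string, container name, and blob name are placeholders, and error handling is omitted. The calls mirror the new `parallel_download` test fixtures above (`blob_request_options::set_parallelism_factor` plus `download_to_stream`); per the implementation, parallelism applies only to the data beyond the first 32 MB range (4 MB when transactional MD5 is enabled), so small blobs are still fetched with a single request.

```cpp
#include <vector>

#include <cpprest/containerstream.h>
#include <was/blob.h>
#include <was/storage_account.h>

int main()
{
    // Placeholder connection string, container, and blob names.
    auto account = azure::storage::cloud_storage_account::parse(
        _XPLATSTR("DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=mykey"));
    auto client = account.create_cloud_blob_client();
    auto container = client.get_container_reference(_XPLATSTR("mycontainer"));
    auto blob = container.get_block_blob_reference(_XPLATSTR("myblob"));

    // Opt in to parallel download: up to 4 ranges are fetched concurrently
    // once the blob is larger than the 32 MB first-range threshold.
    azure::storage::blob_request_options options;
    options.set_parallelism_factor(4);

    azure::storage::operation_context context;
    concurrency::streams::container_buffer<std::vector<uint8_t>> buffer;
    blob.download_to_stream(buffer.create_ostream(),
        azure::storage::access_condition(), options, context);

    // buffer.collection() now holds the blob contents.
    return 0;
}
```

The same pattern applies to Azure Files: set `parallelism_factor` on `file_request_options` and call `cloud_file::download_to_stream`, as exercised by the new file test fixtures.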