From 1818902c21d1205628392db83f04a02c4adb5e5f Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Thu, 27 Jun 2024 12:58:03 +0200 Subject: [PATCH 01/14] write buffering & asnyc writes for azure blob --- cpp/src/arrow/filesystem/azurefs.cc | 252 ++++++++++++++++++++--- cpp/src/arrow/filesystem/azurefs.h | 3 + cpp/src/arrow/filesystem/azurefs_test.cc | 164 ++++++++++----- 3 files changed, 339 insertions(+), 80 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 9b3c0c0c1d703..3ea4821505729 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -22,6 +22,7 @@ #include "arrow/filesystem/azurefs.h" #include "arrow/filesystem/azurefs_internal.h" +#include "arrow/io/memory.h" // idenfity.hpp triggers -Wattributes warnings cause -Werror builds to fail, // so disable it for this file with pragmas. @@ -933,24 +934,29 @@ Result GetBlockList( } } -Status CommitBlockList(std::shared_ptr block_blob_client, - const std::vector& block_ids, - const Blobs::CommitBlockListOptions& options) { +Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const std::string& id, + Core::IO::MemoryBodyStream& content) { try { - // CommitBlockList puts all block_ids in the latest element. That means in the case of - // overlapping block_ids the newly staged block ids will always replace the - // previously committed blocks. - // https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body - block_blob_client->CommitBlockList(block_ids, options); + block_blob_client->StageBlock(id, content); } catch (const Storage::StorageException& exception) { return ExceptionToStatus( - exception, "CommitBlockList failed for '", block_blob_client->GetUrl(), - "'. Committing is required to flush an output/append stream."); + exception, "StageBlock failed for '", block_blob_client->GetUrl(), + "' new_block_id: '", id, + "'. 
Staging new blocks is fundamental to streaming writes to blob storage."); } + return Status::OK(); } +/** + * Writes will be buffered up to this size (in bytes) before actually uploading them. + */ +static constexpr int64_t kBlockUploadSize = 10 * 1024 * 1024; + class ObjectAppendStream final : public io::OutputStream { + private: + struct UploadState; + public: ObjectAppendStream(std::shared_ptr block_blob_client, const io::IOContext& io_context, const AzureLocation& location, @@ -958,7 +964,8 @@ class ObjectAppendStream final : public io::OutputStream { const AzureOptions& options) : block_blob_client_(std::move(block_blob_client)), io_context_(io_context), - location_(location) { + location_(location), + background_writes_(options.background_writes) { if (metadata && metadata->size() != 0) { ArrowMetadataToCommitBlockListOptions(metadata, commit_block_list_options_); } else if (options.default_metadata && options.default_metadata->size() != 0) { @@ -1008,10 +1015,13 @@ class ObjectAppendStream final : public io::OutputStream { content_length_ = 0; } } + + upload_state_ = std::make_shared(); + if (content_length_ > 0) { ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_)); for (auto block : block_list.CommittedBlocks) { - block_ids_.push_back(block.Name); + upload_state_->block_ids.push_back(block.Name); } } initialised_ = true; @@ -1027,16 +1037,38 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } + Status EnsureReadyToFlushFromClose() { + if (current_block_) { + // Upload remaining buffer + RETURN_NOT_OK(WriteBuffer()); + } + + return Status::OK(); + } + Status Close() override { if (closed_) { return Status::OK(); } + + RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + RETURN_NOT_OK(Flush()); block_blob_client_ = nullptr; closed_ = true; return Status::OK(); } + Future<> CloseAsync() override { + if (closed_) { + return Status::OK(); + } + + RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + + return 
FlushAsync(); + } + bool closed() const override { return closed_; } Status CheckClosed(const char* action) const { @@ -1052,11 +1084,11 @@ class ObjectAppendStream final : public io::OutputStream { } Status Write(const std::shared_ptr& buffer) override { - return DoAppend(buffer->data(), buffer->size(), buffer); + return DoWrite(buffer->data(), buffer->size(), buffer); } Status Write(const void* data, int64_t nbytes) override { - return DoAppend(data, nbytes); + return DoWrite(data, nbytes); } Status Flush() override { @@ -1066,20 +1098,85 @@ class ObjectAppendStream final : public io::OutputStream { // flush. This also avoids some unhandled errors when flushing in the destructor. return Status::OK(); } - return CommitBlockList(block_blob_client_, block_ids_, commit_block_list_options_); + + auto fut = FlushAsync(); + RETURN_NOT_OK(fut.status()); + + return CommitBlockList(); + } + + Future<> FlushAsync() { + RETURN_NOT_OK(CheckClosed("flush")); + + // Wait for background writes to finish + std::unique_lock lock(upload_state_->mutex); + return upload_state_->pending_blocks_completed; } private: - Status DoAppend(const void* data, int64_t nbytes, - std::shared_ptr owned_buffer = nullptr) { - RETURN_NOT_OK(CheckClosed("append")); - auto append_data = reinterpret_cast(data); - Core::IO::MemoryBodyStream block_content(append_data, nbytes); - if (block_content.Length() == 0) { - return Status::OK(); + Status WriteBuffer() { + ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish()); + current_block_.reset(); + current_block_size_ = 0; + return AppendBlock(buf); + } + + Status DoWrite(const void* data, int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + + const auto* data_ptr = reinterpret_cast(data); + auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) { + data_ptr += offset; + nbytes -= offset; + }; + + // Handle case where we have some bytes buffered from prior calls. 
+ if (current_block_size_ > 0) { + // Try to fill current buffer + const int64_t to_copy = std::min(nbytes, kBlockUploadSize - current_block_size_); + RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy)); + current_block_size_ += to_copy; + advance_ptr(to_copy); + pos_ += to_copy; + + // If buffer isn't full, break + if (current_block_size_ < kBlockUploadSize) { + return Status::OK(); + } + + // Upload current buffer + RETURN_NOT_OK(WriteBuffer()); } - const auto n_block_ids = block_ids_.size(); + // We can upload chunks without copying them into a buffer + while (nbytes >= kBlockUploadSize) { + RETURN_NOT_OK(AppendBlock(data_ptr, kBlockUploadSize)); + advance_ptr(kBlockUploadSize); + pos_ += kBlockUploadSize; + } + + // Buffer remaining bytes + if (nbytes > 0) { + current_block_size_ = nbytes; + ARROW_ASSIGN_OR_RAISE(current_block_, io::BufferOutputStream::Create( + kBlockUploadSize, io_context_.pool())); + RETURN_NOT_OK(current_block_->Write(data_ptr, current_block_size_)); + pos_ += current_block_size_; + } + + return Status::OK(); + } + + Status AppendBlock(std::shared_ptr buffer) { + return AppendBlock(buffer->data(), buffer->size(), buffer); + } + + std::string CreateBlock() { + std::unique_lock lock(upload_state_->mutex); + const auto n_block_ids = upload_state_->block_ids.size(); // New block ID must always be distinct from the existing block IDs. Otherwise we // will accidentally replace the content of existing blocks, causing corruption. @@ -1093,36 +1190,123 @@ class ObjectAppendStream final : public io::OutputStream { new_block_id.insert(0, required_padding_digits, '0'); // There is a small risk when appending to a blob created by another client that // `new_block_id` may overlapping with an existing block id. Adding the `-arrow` - // suffix significantly reduces the risk, but does not 100% eliminate it. 
For example - // if the blob was previously created with one block, with id `00001-arrow` then the - // next block we append will conflict with that, and cause corruption. + // suffix significantly reduces the risk, but does not 100% eliminate it. For + // example if the blob was previously created with one block, with id `00001-arrow` + // then the next block we append will conflict with that, and cause corruption. new_block_id += "-arrow"; new_block_id = Core::Convert::Base64Encode( std::vector(new_block_id.begin(), new_block_id.end())); + upload_state_->block_ids.push_back(new_block_id); + + // We only use the future if we have background writes enabled. Without background + // writes the future is initialized as finished and not mutated any more. + if (background_writes_ && upload_state_->blocks_in_progress++ == 0) { + upload_state_->pending_blocks_completed = Future<>::Make(); + } + + return new_block_id; + } + + Status AppendBlock(const void* data, int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + RETURN_NOT_OK(CheckClosed("append")); + + if (nbytes == 0) { + return Status::OK(); + } + + const auto block_id = CreateBlock(); + + if (background_writes_) { + if (owned_buffer == nullptr) { + ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool())); + memcpy(owned_buffer->mutable_data(), data, nbytes); + } else { + DCHECK_EQ(data, owned_buffer->data()); + DCHECK_EQ(nbytes, owned_buffer->size()); + } + + // The closure keeps the buffer and the upload state alive + auto deferred = [owned_buffer, block_id, block_blob_client = block_blob_client_, + state = upload_state_]() mutable -> Status { + Core::IO::MemoryBodyStream block_content(owned_buffer->data(), + owned_buffer->size()); + + auto status = StageBlock(block_blob_client.get(), block_id, block_content); + HandleUploadOutcome(state, status); + return Status::OK(); + }; + RETURN_NOT_OK(io::internal::SubmitIO(io_context_, std::move(deferred))); + } else { + auto append_data = 
reinterpret_cast(data); + Core::IO::MemoryBodyStream block_content(append_data, nbytes); + + RETURN_NOT_OK(StageBlock(block_blob_client_.get(), block_id, block_content)); + } + + pos_ += nbytes; + content_length_ += nbytes; + + return Status::OK(); + } + + Status CommitBlockList() { + std::unique_lock lock(upload_state_->mutex); try { - block_blob_client_->StageBlock(new_block_id, block_content); + // CommitBlockList puts all block_ids in the latest element. That means in the case + // of overlapping block_ids the newly staged block ids will always replace the + // previously committed blocks. + // https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body + block_blob_client_->CommitBlockList(upload_state_->block_ids, + commit_block_list_options_); } catch (const Storage::StorageException& exception) { return ExceptionToStatus( - exception, "StageBlock failed for '", block_blob_client_->GetUrl(), - "' new_block_id: '", new_block_id, - "'. Staging new blocks is fundamental to streaming writes to blob storage."); + exception, "CommitBlockList failed for '", block_blob_client_->GetUrl(), + "'. 
Committing is required to flush an output/append stream."); } - block_ids_.push_back(new_block_id); - pos_ += nbytes; - content_length_ += nbytes; return Status::OK(); } + static void HandleUploadOutcome(const std::shared_ptr& state, + const Status& status) { + std::unique_lock lock(state->mutex); + if (!status.ok()) { + state->status &= status; + } + // Notify completion + if (--state->blocks_in_progress == 0) { + auto fut = state->pending_blocks_completed; + lock.unlock(); + fut.MarkFinished(state->status); + } + } + std::shared_ptr block_blob_client_; const io::IOContext io_context_; const AzureLocation location_; int64_t content_length_ = kNoSize; + std::shared_ptr current_block_; + int64_t current_block_size_ = 0; + + const bool background_writes_; + bool closed_ = false; bool initialised_ = false; int64_t pos_ = 0; - std::vector block_ids_; + + // This struct is kept alive through background writes to avoid problems + // in the completion handler. + struct UploadState { + std::mutex mutex; + std::vector block_ids; + int64_t blocks_in_progress = 0; + Status status; + Future<> pending_blocks_completed = Future<>::MakeFinished(Status::OK()); + }; + std::shared_ptr upload_state_; + Blobs::CommitBlockListOptions commit_block_list_options_; }; diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index 072b061eeb2a9..ebbe00c4ee784 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -112,6 +112,9 @@ struct ARROW_EXPORT AzureOptions { /// This will be ignored if non-empty metadata is passed to OpenOutputStream. std::shared_ptr default_metadata; + /// Whether OutputStream writes will be issued in the background, without blocking. 
+ bool background_writes = true; + private: enum class CredentialKind { kDefault, diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 5ff241b17ff58..73ef9df1b3ba4 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -53,6 +53,7 @@ #include "arrow/status.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" +#include "arrow/util/future.h" #include "arrow/util/io_util.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" @@ -929,8 +930,9 @@ class TestAzureFileSystem : public ::testing::Test { void UploadLines(const std::vector& lines, const std::string& path, int total_size) { ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {})); - const auto all_lines = std::accumulate(lines.begin(), lines.end(), std::string("")); - ASSERT_OK(output->Write(all_lines)); + for (auto const& line : lines) { + ASSERT_OK(output->Write(line.data(), line.size())); + } ASSERT_OK(output->Close()); } @@ -1474,6 +1476,93 @@ class TestAzureFileSystem : public ::testing::Test { arrow::fs::AssertFileInfo(fs(), data.Path("dir/file0"), FileType::File); } + void AssertObjectContents(AzureFileSystem* fs, std::string_view path, + std::string_view expected) { + ASSERT_OK_AND_ASSIGN(auto input, fs->OpenInputStream(std::string{path})); + std::string contents; + std::shared_ptr buffer; + do { + ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024)); + ASSERT_TRUE(buffer); + contents.append(buffer->ToString()); + } while (buffer->size() != 0); + + EXPECT_EQ(expected, contents); + } + + void TestOpenOutputStreamSmall() { + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + + auto data = SetUpPreexistingData(); + const auto path = data.ContainerPath("test-write-object"); + ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {})); + const std::string_view expected(PreexistingData::kLoremIpsum); + 
ASSERT_OK(output->Write(expected)); + ASSERT_OK(output->Close()); + + // Verify we can read the object back. + AssertObjectContents(fs.get(), path, expected); + } + + void TestOpenOutputStreamLarge() { + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + + auto data = SetUpPreexistingData(); + const auto path = data.ContainerPath("test-write-object"); + ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {})); + std::array sizes{257 * 1024, 258 * 1024, 259 * 1024}; + std::array buffers{ + std::string(sizes[0], 'A'), + std::string(sizes[1], 'B'), + std::string(sizes[2], 'C'), + }; + auto expected = std::int64_t{0}; + for (auto i = 0; i != 3; ++i) { + ASSERT_OK(output->Write(buffers[i])); + expected += sizes[i]; + ASSERT_EQ(expected, output->Tell()); + } + ASSERT_OK(output->Close()); + + AssertObjectContents(fs.get(), path, buffers[0] + buffers[1] + buffers[2]); + } + + void TestOpenOutputStreamCloseAsyncDestructor() { + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + std::shared_ptr stream; + auto data = SetUpPreexistingData(); + const std::string path = data.ContainerPath("test-write-object"); + constexpr auto payload = PreexistingData::kLoremIpsum; + + ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream(path)); + ASSERT_OK(stream->Write(payload)); + // Destructor implicitly closes stream and completes the multipart upload. 
+ // GH-37670: Testing it doesn't matter whether flush is triggered asynchronously + // after CloseAsync or synchronously after stream.reset() since we're just + // checking that `closeAsyncFut` keeps the stream alive until completion + // rather than segfaulting on a dangling stream + auto close_fut = stream->CloseAsync(); + stream.reset(); + ASSERT_OK(close_fut.MoveResult()); + + AssertObjectContents(fs.get(), path, payload); + } + + void TestOpenOutputStreamDestructor() { + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + std::shared_ptr stream; + constexpr auto* payload = "new data"; + auto data = SetUpPreexistingData(); + const std::string path = data.ContainerPath("test-write-object"); + + ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream(path)); + ASSERT_OK(stream->Write(payload)); + // Destructor implicitly closes stream and completes the multipart upload. + stream.reset(); + + AssertObjectContents(fs.get(), path, payload); + } + private: using StringMatcher = ::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher>; @@ -2704,55 +2793,20 @@ TEST_F(TestAzuriteFileSystem, WriteMetadataHttpHeaders) { ASSERT_EQ("text/plain", content_type); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { - auto data = SetUpPreexistingData(); - const auto path = data.ContainerPath("test-write-object"); - ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {})); - const std::string_view expected(PreexistingData::kLoremIpsum); - ASSERT_OK(output->Write(expected)); - ASSERT_OK(output->Close()); - - // Verify we can read the object back. 
- ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(path)); - - std::array inbuf{}; - ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data())); - - EXPECT_EQ(expected, std::string_view(inbuf.data(), size)); +TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmallNoBackgroundWrites) { + options_.background_writes = false; + TestOpenOutputStreamSmall(); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamLarge) { - auto data = SetUpPreexistingData(); - const auto path = data.ContainerPath("test-write-object"); - ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {})); - std::array sizes{257 * 1024, 258 * 1024, 259 * 1024}; - std::array buffers{ - std::string(sizes[0], 'A'), - std::string(sizes[1], 'B'), - std::string(sizes[2], 'C'), - }; - auto expected = std::int64_t{0}; - for (auto i = 0; i != 3; ++i) { - ASSERT_OK(output->Write(buffers[i])); - expected += sizes[i]; - ASSERT_EQ(expected, output->Tell()); - } - ASSERT_OK(output->Close()); - - // Verify we can read the object back. 
- ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(path)); +TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { TestOpenOutputStreamSmall(); } - std::string contents; - std::shared_ptr buffer; - do { - ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024)); - ASSERT_TRUE(buffer); - contents.append(buffer->ToString()); - } while (buffer->size() != 0); - - EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]); +TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeNoBackgroundWrites) { + options_.background_writes = false; + TestOpenOutputStreamLarge(); } +TEST_F(TestAzuriteFileSystem, OpenOutputStreamLarge) { TestOpenOutputStreamLarge(); } + TEST_F(TestAzuriteFileSystem, OpenOutputStreamTruncatesExistingFile) { auto data = SetUpPreexistingData(); const auto path = data.ContainerPath("test-write-object"); @@ -2820,6 +2874,24 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamClosed) { ASSERT_RAISES(Invalid, output->Tell()); } +TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructor) { + TestOpenOutputStreamCloseAsyncDestructor(); +} + +TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructorNoBackgroundWrites) { + options_.background_writes = false; + TestOpenOutputStreamCloseAsyncDestructor(); +} + +TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructor) { + TestOpenOutputStreamDestructor(); +} + +TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructorNoBackgroundWrites) { + options_.background_writes = false; + TestOpenOutputStreamDestructor(); +} + TEST_F(TestAzuriteFileSystem, OpenOutputStreamUri) { auto data = SetUpPreexistingData(); const auto path = data.ContainerPath("open-output-stream-uri.txt"); From ca617a173c92335e5ffe366c0a6e613c7ae30c76 Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Tue, 9 Jul 2024 09:35:01 +0200 Subject: [PATCH 02/14] fix: do not increment `pos_` twice --- cpp/src/arrow/filesystem/azurefs.cc | 6 +++--- cpp/src/arrow/filesystem/azurefs_test.cc | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git 
a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 3ea4821505729..f1eef870b534b 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -1141,6 +1141,7 @@ class ObjectAppendStream final : public io::OutputStream { current_block_size_ += to_copy; advance_ptr(to_copy); pos_ += to_copy; + content_length_ += to_copy; // If buffer isn't full, break if (current_block_size_ < kBlockUploadSize) { @@ -1156,6 +1157,7 @@ class ObjectAppendStream final : public io::OutputStream { RETURN_NOT_OK(AppendBlock(data_ptr, kBlockUploadSize)); advance_ptr(kBlockUploadSize); pos_ += kBlockUploadSize; + content_length_ += kBlockUploadSize; } // Buffer remaining bytes @@ -1165,6 +1167,7 @@ class ObjectAppendStream final : public io::OutputStream { kBlockUploadSize, io_context_.pool())); RETURN_NOT_OK(current_block_->Write(data_ptr, current_block_size_)); pos_ += current_block_size_; + content_length_ += current_block_size_; } return Status::OK(); @@ -1245,9 +1248,6 @@ class ObjectAppendStream final : public io::OutputStream { RETURN_NOT_OK(StageBlock(block_blob_client_.get(), block_id, block_content)); } - pos_ += nbytes; - content_length_ += nbytes; - return Status::OK(); } diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 73ef9df1b3ba4..481a1b53f90f6 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -1510,7 +1510,7 @@ class TestAzureFileSystem : public ::testing::Test { auto data = SetUpPreexistingData(); const auto path = data.ContainerPath("test-write-object"); ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {})); - std::array sizes{257 * 1024, 258 * 1024, 259 * 1024}; + std::array sizes{2570 * 1024, 258 * 1024, 259 * 1024}; std::array buffers{ std::string(sizes[0], 'A'), std::string(sizes[1], 'B'), From 6b498f83dcd8088d97e71f7fb9da2e0195df0820 Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Fri, 
19 Jul 2024 09:45:24 +0200 Subject: [PATCH 03/14] PR feedback --- cpp/src/arrow/filesystem/azurefs.cc | 21 +++++---- cpp/src/arrow/filesystem/azurefs.h | 2 +- cpp/src/arrow/filesystem/azurefs_test.cc | 54 ++++++++++++------------ 3 files changed, 40 insertions(+), 37 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index f1eef870b534b..10a38bd70387e 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -1106,8 +1106,6 @@ class ObjectAppendStream final : public io::OutputStream { } Future<> FlushAsync() { - RETURN_NOT_OK(CheckClosed("flush")); - // Wait for background writes to finish std::unique_lock lock(upload_state_->mutex); return upload_state_->pending_blocks_completed; @@ -1128,9 +1126,11 @@ class ObjectAppendStream final : public io::OutputStream { } const auto* data_ptr = reinterpret_cast(data); - auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) { + auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) { data_ptr += offset; nbytes -= offset; + pos_ += offset; + content_length_ += offset; }; // Handle case where we have some bytes buffered from prior calls. 
@@ -1140,8 +1140,6 @@ class ObjectAppendStream final : public io::OutputStream { RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy)); current_block_size_ += to_copy; advance_ptr(to_copy); - pos_ += to_copy; - content_length_ += to_copy; // If buffer isn't full, break if (current_block_size_ < kBlockUploadSize) { @@ -1156,15 +1154,20 @@ class ObjectAppendStream final : public io::OutputStream { while (nbytes >= kBlockUploadSize) { RETURN_NOT_OK(AppendBlock(data_ptr, kBlockUploadSize)); advance_ptr(kBlockUploadSize); - pos_ += kBlockUploadSize; - content_length_ += kBlockUploadSize; } // Buffer remaining bytes if (nbytes > 0) { current_block_size_ = nbytes; - ARROW_ASSIGN_OR_RAISE(current_block_, io::BufferOutputStream::Create( - kBlockUploadSize, io_context_.pool())); + + if (current_block_ == nullptr) { + ARROW_ASSIGN_OR_RAISE(current_block_, io::BufferOutputStream::Create( + kBlockUploadSize, io_context_.pool())); + } else { + // Re-use the allocation from before. + RETURN_NOT_OK(current_block_->Reset(kBlockUploadSize, io_context_.pool())); + } + RETURN_NOT_OK(current_block_->Write(data_ptr, current_block_size_)); pos_ += current_block_size_; content_length_ += current_block_size_; diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index ebbe00c4ee784..60079d3f52e24 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -113,7 +113,7 @@ struct ARROW_EXPORT AzureOptions { std::shared_ptr default_metadata; /// Whether OutputStream writes will be issued in the background, without blocking. 
- bool background_writes = true; + bool background_writes = false; private: enum class CredentialKind { diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 481a1b53f90f6..3c2c929d7be44 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -1510,37 +1510,38 @@ class TestAzureFileSystem : public ::testing::Test { auto data = SetUpPreexistingData(); const auto path = data.ContainerPath("test-write-object"); ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {})); - std::array sizes{2570 * 1024, 258 * 1024, 259 * 1024}; - std::array buffers{ - std::string(sizes[0], 'A'), - std::string(sizes[1], 'B'), - std::string(sizes[2], 'C'), - }; - auto expected = std::int64_t{0}; - for (auto i = 0; i != 3; ++i) { + + // Upload 5 MB, 4 MB und 2 MB and a very small write to test varying sizes + std::array sizes{5 * 1024 * 1024, 4 * 1024 * 1024, 2 * 1024 * 1024, + 2000}; + std::array buffers{ + std::string(sizes[0], 'A'), std::string(sizes[1], 'B'), + std::string(sizes[2], 'C'), std::string(sizes[3], 'D')}; + auto expected_size = std::int64_t{0}; + for (size_t i = 0; i < buffers.size(); ++i) { ASSERT_OK(output->Write(buffers[i])); - expected += sizes[i]; - ASSERT_EQ(expected, output->Tell()); + expected_size += sizes[i]; + ASSERT_EQ(expected_size, output->Tell()); } ASSERT_OK(output->Close()); - AssertObjectContents(fs.get(), path, buffers[0] + buffers[1] + buffers[2]); + AssertObjectContents(fs.get(), path, + buffers[0] + buffers[1] + buffers[2] + buffers[3]); } void TestOpenOutputStreamCloseAsyncDestructor() { ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); - std::shared_ptr stream; auto data = SetUpPreexistingData(); const std::string path = data.ContainerPath("test-write-object"); constexpr auto payload = PreexistingData::kLoremIpsum; - ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream(path)); + ASSERT_OK_AND_ASSIGN(auto stream, 
fs->OpenOutputStream(path)); ASSERT_OK(stream->Write(payload)); - // Destructor implicitly closes stream and completes the multipart upload. - // GH-37670: Testing it doesn't matter whether flush is triggered asynchronously + // Destructor implicitly closes stream and completes the upload. + // Testing it doesn't matter whether flush is triggered asynchronously // after CloseAsync or synchronously after stream.reset() since we're just - // checking that `closeAsyncFut` keeps the stream alive until completion - // rather than segfaulting on a dangling stream + // checking that the future keeps the stream alive until completion + // rather than segfaulting on a dangling stream. auto close_fut = stream->CloseAsync(); stream.reset(); ASSERT_OK(close_fut.MoveResult()); @@ -1550,12 +1551,11 @@ class TestAzureFileSystem : public ::testing::Test { void TestOpenOutputStreamDestructor() { ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); - std::shared_ptr stream; constexpr auto* payload = "new data"; auto data = SetUpPreexistingData(); const std::string path = data.ContainerPath("test-write-object"); - ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream(path)); + ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path)); ASSERT_OK(stream->Write(payload)); // Destructor implicitly closes stream and completes the multipart upload. 
stream.reset(); @@ -2793,15 +2793,15 @@ TEST_F(TestAzuriteFileSystem, WriteMetadataHttpHeaders) { ASSERT_EQ("text/plain", content_type); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmallNoBackgroundWrites) { - options_.background_writes = false; +TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmallBackgroundWrites) { + options_.background_writes = true; TestOpenOutputStreamSmall(); } TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { TestOpenOutputStreamSmall(); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeNoBackgroundWrites) { - options_.background_writes = false; +TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeBackgroundWrites) { + options_.background_writes = true; TestOpenOutputStreamLarge(); } @@ -2878,8 +2878,8 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructor) { TestOpenOutputStreamCloseAsyncDestructor(); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructorNoBackgroundWrites) { - options_.background_writes = false; +TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructorBackgroundWrites) { + options_.background_writes = true; TestOpenOutputStreamCloseAsyncDestructor(); } @@ -2887,8 +2887,8 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructor) { TestOpenOutputStreamDestructor(); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructorNoBackgroundWrites) { - options_.background_writes = false; +TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructorBackgroundWrites) { + options_.background_writes = true; TestOpenOutputStreamDestructor(); } From 1d0d3c87d9ee8002fcefe9fb9a972ac0131c9aa6 Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Mon, 22 Jul 2024 09:36:51 +0200 Subject: [PATCH 04/14] PR feedback --- cpp/src/arrow/filesystem/azurefs.cc | 62 +++++++++++++++-------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 10a38bd70387e..599645acff898 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ 
b/cpp/src/arrow/filesystem/azurefs.cc @@ -934,6 +934,23 @@ Result GetBlockList( } } +Status CommitBlockList(std::shared_ptr block_blob_client, + const std::vector& block_ids, + const Blobs::CommitBlockListOptions& options) { + try { + // CommitBlockList puts all block_ids in the latest element. That means in the case + // of overlapping block_ids the newly staged block ids will always replace the + // previously committed blocks. + // https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body + block_blob_client->CommitBlockList(block_ids, options); + } catch (const Storage::StorageException& exception) { + return ExceptionToStatus( + exception, "CommitBlockList failed for '", block_blob_client->GetUrl(), + "'. Committing is required to flush an output/append stream."); + } + return Status::OK(); +} + Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const std::string& id, Core::IO::MemoryBodyStream& content) { try { @@ -948,10 +965,8 @@ Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const std::string& return Status::OK(); } -/** - * Writes will be buffered up to this size (in bytes) before actually uploading them. - */ -static constexpr int64_t kBlockUploadSize = 10 * 1024 * 1024; +/// Writes will be buffered up to this size (in bytes) before actually uploading them. +static constexpr int64_t kBlockUploadSizeBytes = 10 * 1024 * 1024; class ObjectAppendStream final : public io::OutputStream { private: @@ -1102,7 +1117,9 @@ class ObjectAppendStream final : public io::OutputStream { auto fut = FlushAsync(); RETURN_NOT_OK(fut.status()); - return CommitBlockList(); + std::unique_lock lock(upload_state_->mutex); + return CommitBlockList(block_blob_client_, upload_state_->block_ids, + commit_block_list_options_); } Future<> FlushAsync() { @@ -1136,13 +1153,14 @@ class ObjectAppendStream final : public io::OutputStream { // Handle case where we have some bytes buffered from prior calls. 
if (current_block_size_ > 0) { // Try to fill current buffer - const int64_t to_copy = std::min(nbytes, kBlockUploadSize - current_block_size_); + const int64_t to_copy = + std::min(nbytes, kBlockUploadSizeBytes - current_block_size_); RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy)); current_block_size_ += to_copy; advance_ptr(to_copy); // If buffer isn't full, break - if (current_block_size_ < kBlockUploadSize) { + if (current_block_size_ < kBlockUploadSizeBytes) { return Status::OK(); } @@ -1151,9 +1169,9 @@ class ObjectAppendStream final : public io::OutputStream { } // We can upload chunks without copying them into a buffer - while (nbytes >= kBlockUploadSize) { - RETURN_NOT_OK(AppendBlock(data_ptr, kBlockUploadSize)); - advance_ptr(kBlockUploadSize); + while (nbytes >= kBlockUploadSizeBytes) { + RETURN_NOT_OK(AppendBlock(data_ptr, kBlockUploadSizeBytes)); + advance_ptr(kBlockUploadSizeBytes); } // Buffer remaining bytes @@ -1161,11 +1179,12 @@ class ObjectAppendStream final : public io::OutputStream { current_block_size_ = nbytes; if (current_block_ == nullptr) { - ARROW_ASSIGN_OR_RAISE(current_block_, io::BufferOutputStream::Create( - kBlockUploadSize, io_context_.pool())); + ARROW_ASSIGN_OR_RAISE( + current_block_, + io::BufferOutputStream::Create(kBlockUploadSizeBytes, io_context_.pool())); } else { // Re-use the allocation from before. - RETURN_NOT_OK(current_block_->Reset(kBlockUploadSize, io_context_.pool())); + RETURN_NOT_OK(current_block_->Reset(kBlockUploadSizeBytes, io_context_.pool())); } RETURN_NOT_OK(current_block_->Write(data_ptr, current_block_size_)); @@ -1254,23 +1273,6 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } - Status CommitBlockList() { - std::unique_lock lock(upload_state_->mutex); - try { - // CommitBlockList puts all block_ids in the latest element. That means in the case - // of overlapping block_ids the newly staged block ids will always replace the - // previously committed blocks. 
- // https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body - block_blob_client_->CommitBlockList(upload_state_->block_ids, - commit_block_list_options_); - } catch (const Storage::StorageException& exception) { - return ExceptionToStatus( - exception, "CommitBlockList failed for '", block_blob_client_->GetUrl(), - "'. Committing is required to flush an output/append stream."); - } - return Status::OK(); - } - static void HandleUploadOutcome(const std::shared_ptr& state, const Status& status) { std::unique_lock lock(state->mutex); From 33e90afe47b249d9d9a08937a652bb4880897acf Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Wed, 31 Jul 2024 10:33:13 +0200 Subject: [PATCH 05/14] PR feedback --- cpp/src/arrow/filesystem/azurefs.cc | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 599645acff898..4c546b85f92d5 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -1052,21 +1052,15 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } - Status EnsureReadyToFlushFromClose() { - if (current_block_) { - // Upload remaining buffer - RETURN_NOT_OK(WriteBuffer()); - } - - return Status::OK(); - } - Status Close() override { if (closed_) { return Status::OK(); } - RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + if (current_block_) { + // Upload remaining buffer + RETURN_NOT_OK(WriteBuffer()); + } RETURN_NOT_OK(Flush()); block_blob_client_ = nullptr; @@ -1079,7 +1073,10 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } - RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + if (current_block_) { + // Upload remaining buffer + RETURN_NOT_OK(WriteBuffer()); + } return FlushAsync(); } @@ -1290,13 +1287,12 @@ class ObjectAppendStream final : public io::OutputStream { std::shared_ptr block_blob_client_; const 
io::IOContext io_context_; const AzureLocation location_; + const bool background_writes_; int64_t content_length_ = kNoSize; std::shared_ptr current_block_; int64_t current_block_size_ = 0; - const bool background_writes_; - bool closed_ = false; bool initialised_ = false; int64_t pos_ = 0; From 9785a412d3e1fd6265fd41bfef006e48125b2948 Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Thu, 1 Aug 2024 09:08:09 +0200 Subject: [PATCH 06/14] Add comment indicating no thread safety of the output stream --- cpp/src/arrow/filesystem/azurefs.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 4c546b85f92d5..11c5aa9fcef40 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -968,6 +968,7 @@ Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const std::string& /// Writes will be buffered up to this size (in bytes) before actually uploading them. static constexpr int64_t kBlockUploadSizeBytes = 10 * 1024 * 1024; +/// This output stream, similar to other arrow OutputStreams, is not thread-safe. 
class ObjectAppendStream final : public io::OutputStream { private: struct UploadState; From f9e76380654018845c21aff09d7a31a0720abe78 Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Tue, 6 Aug 2024 14:54:49 +0200 Subject: [PATCH 07/14] PR feedback --- cpp/src/arrow/filesystem/azurefs.cc | 38 ++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 11c5aa9fcef40..1da1e0f30ace7 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -1044,6 +1044,10 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } + std::shared_ptr Self() { + return std::dynamic_pointer_cast(shared_from_this()); + } + Status Abort() override { if (closed_) { return Status::OK(); @@ -1079,7 +1083,10 @@ class ObjectAppendStream final : public io::OutputStream { RETURN_NOT_OK(WriteBuffer()); } - return FlushAsync(); + return FlushAsync().Then([self = Self()]() { + self->block_blob_client_ = nullptr; + self->closed_ = true; + }); } bool closed() const override { return closed_; } @@ -1112,18 +1119,37 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } - auto fut = FlushAsync(); - RETURN_NOT_OK(fut.status()); + Future<> pending_blocks_completed; + { + std::unique_lock lock(upload_state_->mutex); + pending_blocks_completed = upload_state_->pending_blocks_completed; + } + RETURN_NOT_OK(pending_blocks_completed.status()); std::unique_lock lock(upload_state_->mutex); return CommitBlockList(block_blob_client_, upload_state_->block_ids, commit_block_list_options_); } Future<> FlushAsync() { - // Wait for background writes to finish - std::unique_lock lock(upload_state_->mutex); - return upload_state_->pending_blocks_completed; + RETURN_NOT_OK(CheckClosed("flushAsync")); + if (!initialised_) { + // If the stream has not been successfully initialized then there is nothing to + // flush. 
This also avoids some unhandled errors when flushing in the destructor. + return Status::OK(); + } + + Future<> pending_blocks_completed; + { + std::unique_lock lock(upload_state_->mutex); + pending_blocks_completed = upload_state_->pending_blocks_completed; + } + + return pending_blocks_completed.Then([self = Self()] { + std::unique_lock lock(self->upload_state_->mutex); + return CommitBlockList(self->block_blob_client_, self->upload_state_->block_ids, + self->commit_block_list_options_); + }); } private: From 290ce7f499ee6ec7af819c45781e028c3deea342 Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Tue, 13 Aug 2024 11:40:12 +0200 Subject: [PATCH 08/14] add a test specifically for `CloseAsync` --- cpp/src/arrow/filesystem/azurefs_test.cc | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 3c2c929d7be44..7ae6604059e0b 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -1529,6 +1529,21 @@ class TestAzureFileSystem : public ::testing::Test { buffers[0] + buffers[1] + buffers[2] + buffers[3]); } + void TestOpenOutputStreamCloseAsync() { + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + auto data = SetUpPreexistingData(); + const std::string path = data.ContainerPath("test-write-object"); + constexpr auto payload = PreexistingData::kLoremIpsum; + + ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path)); + ASSERT_OK(stream->Write(payload)); + auto close_fut = stream->CloseAsync(); + + ASSERT_OK(close_fut.MoveResult()); + + AssertObjectContents(fs.get(), path, payload); + } + void TestOpenOutputStreamCloseAsyncDestructor() { ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); auto data = SetUpPreexistingData(); @@ -2874,6 +2889,15 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamClosed) { ASSERT_RAISES(Invalid, output->Tell()); } +TEST_F(TestAzuriteFileSystem, 
OpenOutputStreamCloseAsync) { + TestOpenOutputStreamCloseAsync(); +} + +TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsyncBackgroundWrites) { + options_.background_writes = true; + TestOpenOutputStreamCloseAsync(); +} + TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructor) { TestOpenOutputStreamCloseAsyncDestructor(); } From 3157fd941b353c9819e41470c207966a26c26d76 Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Wed, 14 Aug 2024 11:08:08 +0200 Subject: [PATCH 09/14] Further PR feedback --- cpp/src/arrow/filesystem/azurefs.cc | 13 ++++++++----- cpp/src/arrow/filesystem/azurefs_test.cc | 14 +++++++++++++- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 1da1e0f30ace7..1796a6af9aba3 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -145,6 +145,9 @@ Status AzureOptions::ExtractFromUriQuery(const Uri& uri) { blob_storage_scheme = "http"; dfs_storage_scheme = "http"; } + } else if (kv.first == "background_writes") { + ARROW_ASSIGN_OR_RAISE(background_writes, + ::arrow::internal::ParseBoolean(kv.second)); } else { return Status::Invalid( "Unexpected query parameter in Azure Blob File System URI: '", kv.first, "'"); @@ -973,6 +976,10 @@ class ObjectAppendStream final : public io::OutputStream { private: struct UploadState; + std::shared_ptr Self() { + return std::dynamic_pointer_cast(shared_from_this()); + } + public: ObjectAppendStream(std::shared_ptr block_blob_client, const io::IOContext& io_context, const AzureLocation& location, @@ -1044,10 +1051,6 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } - std::shared_ptr Self() { - return std::dynamic_pointer_cast(shared_from_this()); - } - Status Abort() override { if (closed_) { return Status::OK(); @@ -1132,7 +1135,7 @@ class ObjectAppendStream final : public io::OutputStream { } Future<> FlushAsync() { - 
RETURN_NOT_OK(CheckClosed("flushAsync")); + RETURN_NOT_OK(CheckClosed("flush async")); if (!initialised_) { // If the stream has not been successfully initialized then there is nothing to // flush. This also avoids some unhandled errors when flushing in the destructor. diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 7ae6604059e0b..f1e5855eb9431 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -631,6 +631,16 @@ class TestAzureOptions : public ::testing::Test { ASSERT_EQ(path, "container/dir/blob"); } + void TestFromUriEnableBackgroundWrites() { + std::string path; + ASSERT_OK_AND_ASSIGN(auto options, + AzureOptions::FromUri( + "abfs://account:password@127.0.0.1:10000/container/dir/blob?" + "background_writes=true", + &path)); + ASSERT_EQ(options.background_writes, true); + } + void TestFromUriCredentialDefault() { ASSERT_OK_AND_ASSIGN( auto options, @@ -774,6 +784,9 @@ TEST_F(TestAzureOptions, FromUriDfsStorage) { TestFromUriDfsStorage(); } TEST_F(TestAzureOptions, FromUriAbfs) { TestFromUriAbfs(); } TEST_F(TestAzureOptions, FromUriAbfss) { TestFromUriAbfss(); } TEST_F(TestAzureOptions, FromUriEnableTls) { TestFromUriEnableTls(); } +TEST_F(TestAzureOptions, FromUriEnableBackgroundWrites) { + TestFromUriEnableBackgroundWrites(); +} TEST_F(TestAzureOptions, FromUriCredentialDefault) { TestFromUriCredentialDefault(); } TEST_F(TestAzureOptions, FromUriCredentialAnonymous) { TestFromUriCredentialAnonymous(); } TEST_F(TestAzureOptions, FromUriCredentialStorageSharedKey) { @@ -1483,7 +1496,6 @@ class TestAzureFileSystem : public ::testing::Test { std::shared_ptr buffer; do { ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024)); - ASSERT_TRUE(buffer); contents.append(buffer->ToString()); } while (buffer->size() != 0); From 25cb6ef8ee7342e3c1a5bca5a8e129fadf9e23da Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Mon, 19 Aug 2024 16:54:16 +0200 Subject: [PATCH 
10/14] PR feedback pitrou --- cpp/src/arrow/filesystem/azurefs.cc | 16 ++++---- cpp/src/arrow/filesystem/azurefs.h | 2 +- cpp/src/arrow/filesystem/azurefs_test.cc | 50 ++++++++++++++---------- 3 files changed, 39 insertions(+), 29 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 1796a6af9aba3..27f8a10d8288b 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -1067,7 +1067,7 @@ class ObjectAppendStream final : public io::OutputStream { if (current_block_) { // Upload remaining buffer - RETURN_NOT_OK(WriteBuffer()); + RETURN_NOT_OK(AppendCurrentBlock()); } RETURN_NOT_OK(Flush()); @@ -1083,7 +1083,7 @@ class ObjectAppendStream final : public io::OutputStream { if (current_block_) { // Upload remaining buffer - RETURN_NOT_OK(WriteBuffer()); + RETURN_NOT_OK(AppendCurrentBlock()); } return FlushAsync().Then([self = Self()]() { @@ -1156,7 +1156,7 @@ class ObjectAppendStream final : public io::OutputStream { } private: - Status WriteBuffer() { + Status AppendCurrentBlock() { ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish()); current_block_.reset(); current_block_size_ = 0; @@ -1192,7 +1192,7 @@ class ObjectAppendStream final : public io::OutputStream { } // Upload current buffer - RETURN_NOT_OK(WriteBuffer()); + RETURN_NOT_OK(AppendCurrentBlock()); } // We can upload chunks without copying them into a buffer @@ -1222,10 +1222,6 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } - Status AppendBlock(std::shared_ptr buffer) { - return AppendBlock(buffer->data(), buffer->size(), buffer); - } - std::string CreateBlock() { std::unique_lock lock(upload_state_->mutex); const auto n_block_ids = upload_state_->block_ids.size(); @@ -1300,6 +1296,10 @@ class ObjectAppendStream final : public io::OutputStream { return Status::OK(); } + Status AppendBlock(std::shared_ptr buffer) { + return AppendBlock(buffer->data(), buffer->size(), buffer); + } + 
static void HandleUploadOutcome(const std::shared_ptr& state, const Status& status) { std::unique_lock lock(state->mutex); diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index 60079d3f52e24..ebbe00c4ee784 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -113,7 +113,7 @@ struct ARROW_EXPORT AzureOptions { std::shared_ptr default_metadata; /// Whether OutputStream writes will be issued in the background, without blocking. - bool background_writes = false; + bool background_writes = true; private: enum class CredentialKind { diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index f1e5855eb9431..45607932c5d0e 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -46,6 +46,7 @@ #include #include #include +#include #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/test_util.h" @@ -567,6 +568,7 @@ class TestAzureOptions : public ::testing::Test { ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme); ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault); ASSERT_EQ(path, "container/dir/blob"); + ASSERT_EQ(options.background_writes, true); } void TestFromUriDfsStorage() { @@ -583,6 +585,7 @@ class TestAzureOptions : public ::testing::Test { ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme); ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault); ASSERT_EQ(path, "file_system/dir/file"); + ASSERT_EQ(options.background_writes, true); } void TestFromUriAbfs() { @@ -598,6 +601,7 @@ class TestAzureOptions : public ::testing::Test { ASSERT_EQ(options.dfs_storage_scheme, "https"); ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey); ASSERT_EQ(path, "container/dir/blob"); + ASSERT_EQ(options.background_writes, true); } void TestFromUriAbfss() { @@ -613,6 +617,7 @@ class 
TestAzureOptions : public ::testing::Test { ASSERT_EQ(options.dfs_storage_scheme, "https"); ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey); ASSERT_EQ(path, "container/dir/blob"); + ASSERT_EQ(options.background_writes, true); } void TestFromUriEnableTls() { @@ -629,16 +634,17 @@ class TestAzureOptions : public ::testing::Test { ASSERT_EQ(options.dfs_storage_scheme, "http"); ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey); ASSERT_EQ(path, "container/dir/blob"); + ASSERT_EQ(options.background_writes, true); } - void TestFromUriEnableBackgroundWrites() { + void TestFromUriDisableBackgroundWrites() { std::string path; ASSERT_OK_AND_ASSIGN(auto options, AzureOptions::FromUri( "abfs://account:password@127.0.0.1:10000/container/dir/blob?" - "background_writes=true", + "background_writes=false", &path)); - ASSERT_EQ(options.background_writes, true); + ASSERT_EQ(options.background_writes, false); } void TestFromUriCredentialDefault() { @@ -784,8 +790,8 @@ TEST_F(TestAzureOptions, FromUriDfsStorage) { TestFromUriDfsStorage(); } TEST_F(TestAzureOptions, FromUriAbfs) { TestFromUriAbfs(); } TEST_F(TestAzureOptions, FromUriAbfss) { TestFromUriAbfss(); } TEST_F(TestAzureOptions, FromUriEnableTls) { TestFromUriEnableTls(); } -TEST_F(TestAzureOptions, FromUriEnableBackgroundWrites) { - TestFromUriEnableBackgroundWrites(); +TEST_F(TestAzureOptions, FromUriDisableBackgroundWrites) { + TestFromUriDisableBackgroundWrites(); } TEST_F(TestAzureOptions, FromUriCredentialDefault) { TestFromUriCredentialDefault(); } TEST_F(TestAzureOptions, FromUriCredentialAnonymous) { TestFromUriCredentialAnonymous(); } @@ -1524,11 +1530,15 @@ class TestAzureFileSystem : public ::testing::Test { ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {})); // Upload 5 MB, 4 MB und 2 MB and a very small write to test varying sizes - std::array sizes{5 * 1024 * 1024, 4 * 1024 * 1024, 2 * 1024 * 1024, - 2000}; - std::array 
buffers{ - std::string(sizes[0], 'A'), std::string(sizes[1], 'B'), - std::string(sizes[2], 'C'), std::string(sizes[3], 'D')}; + std::vector sizes{5 * 1024 * 1024, 4 * 1024 * 1024, 2 * 1024 * 1024, + 2000}; + + std::vector buffers{}; + char current_char = 'A'; + for (const auto size : sizes) { + buffers.emplace_back(size, current_char++); + } + auto expected_size = std::int64_t{0}; for (size_t i = 0; i < buffers.size(); ++i) { ASSERT_OK(output->Write(buffers[i])); @@ -2820,15 +2830,15 @@ TEST_F(TestAzuriteFileSystem, WriteMetadataHttpHeaders) { ASSERT_EQ("text/plain", content_type); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmallBackgroundWrites) { - options_.background_writes = true; +TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmallNoBackgroundWrites) { + options_.background_writes = false; TestOpenOutputStreamSmall(); } TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { TestOpenOutputStreamSmall(); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeBackgroundWrites) { - options_.background_writes = true; +TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeNoBackgroundWrites) { + options_.background_writes = false; TestOpenOutputStreamLarge(); } @@ -2905,8 +2915,8 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsync) { TestOpenOutputStreamCloseAsync(); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsyncBackgroundWrites) { - options_.background_writes = true; +TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsyncNoBackgroundWrites) { + options_.background_writes = false; TestOpenOutputStreamCloseAsync(); } @@ -2914,8 +2924,8 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructor) { TestOpenOutputStreamCloseAsyncDestructor(); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructorBackgroundWrites) { - options_.background_writes = true; +TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructorNoBackgroundWrites) { + options_.background_writes = false; TestOpenOutputStreamCloseAsyncDestructor(); } @@ -2923,8 
+2933,8 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructor) { TestOpenOutputStreamDestructor(); } -TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructorBackgroundWrites) { - options_.background_writes = true; +TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructorNoBackgroundWrites) { + options_.background_writes = false; TestOpenOutputStreamDestructor(); } From 147f6de610c14dd27179e60daa04de5d5267cda9 Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Tue, 20 Aug 2024 14:11:18 +0200 Subject: [PATCH 11/14] directly write large writes into a block --- cpp/src/arrow/filesystem/azurefs.cc | 8 +++++--- cpp/src/arrow/filesystem/azurefs_test.cc | 26 ++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 27f8a10d8288b..6659b147df83c 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -970,6 +970,8 @@ Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const std::string& /// Writes will be buffered up to this size (in bytes) before actually uploading them. static constexpr int64_t kBlockUploadSizeBytes = 10 * 1024 * 1024; +/// The maximum size of a block in Azure Blob (as per docs). +static constexpr int64_t kMaxBlockSizeBytes = 4UL * 1024 * 1024 * 1024; /// This output stream, similar to other arrow OutputStreams, is not thread-safe. 
class ObjectAppendStream final : public io::OutputStream { @@ -1196,9 +1198,9 @@ class ObjectAppendStream final : public io::OutputStream { } // We can upload chunks without copying them into a buffer - while (nbytes >= kBlockUploadSizeBytes) { - RETURN_NOT_OK(AppendBlock(data_ptr, kBlockUploadSizeBytes)); - advance_ptr(kBlockUploadSizeBytes); + while (nbytes >= kMaxBlockSizeBytes) { + RETURN_NOT_OK(AppendBlock(data_ptr, kMaxBlockSizeBytes)); + advance_ptr(kMaxBlockSizeBytes); } // Buffer remaining bytes diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 45607932c5d0e..cb00a73990417 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -1551,6 +1551,23 @@ class TestAzureFileSystem : public ::testing::Test { buffers[0] + buffers[1] + buffers[2] + buffers[3]); } + void TestOpenOutputStreamLargeSingleWrite() { + ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); + + auto data = SetUpPreexistingData(); + const auto path = data.ContainerPath("test-write-object"); + ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {})); + + constexpr std::int64_t size{12 * 1024 * 1024}; + const std::string large_string(size, 'X'); + + ASSERT_OK(output->Write(large_string)); + ASSERT_EQ(size, output->Tell()); + ASSERT_OK(output->Close()); + + AssertObjectContents(fs.get(), path, large_string); + } + void TestOpenOutputStreamCloseAsync() { ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); auto data = SetUpPreexistingData(); @@ -2844,6 +2861,15 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeNoBackgroundWrites) { TEST_F(TestAzuriteFileSystem, OpenOutputStreamLarge) { TestOpenOutputStreamLarge(); } +TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeSingleWriteNoBackgroundWrites) { + options_.background_writes = false; + TestOpenOutputStreamLargeSingleWrite(); +} + +TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeSingleWrite) { + 
TestOpenOutputStreamLargeSingleWrite(); +} + TEST_F(TestAzuriteFileSystem, OpenOutputStreamTruncatesExistingFile) { auto data = SetUpPreexistingData(); const auto path = data.ContainerPath("test-write-object"); From 2a37491d1bd739b9e8c515f23773dffd63a94eee Mon Sep 17 00:00:00 2001 From: Oliver Layer Date: Wed, 21 Aug 2024 09:23:50 +0200 Subject: [PATCH 12/14] PR feedback pitrou #2 --- cpp/src/arrow/filesystem/azurefs.cc | 7 ++++--- cpp/src/arrow/filesystem/azurefs_test.cc | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 6659b147df83c..0bad856339729 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -1198,9 +1198,10 @@ class ObjectAppendStream final : public io::OutputStream { } // We can upload chunks without copying them into a buffer - while (nbytes >= kMaxBlockSizeBytes) { - RETURN_NOT_OK(AppendBlock(data_ptr, kMaxBlockSizeBytes)); - advance_ptr(kMaxBlockSizeBytes); + while (nbytes >= kBlockUploadSizeBytes) { + const auto upload_size = std::min(nbytes, kMaxBlockSizeBytes); + RETURN_NOT_OK(AppendBlock(data_ptr, upload_size)); + advance_ptr(upload_size); } // Buffer remaining bytes diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index cb00a73990417..7363932edb040 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -46,7 +47,6 @@ #include #include #include -#include #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/test_util.h" From 216e8d57e8b1bd942ae16483a09e4012d567e3ef Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 21 Aug 2024 12:17:13 +0200 Subject: [PATCH 13/14] Skip CloseAsync tests due to false positive leak --- cpp/src/arrow/filesystem/azurefs_test.cc | 31 ++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff 
--git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 7363932edb040..8565b857b7613 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -1569,6 +1569,31 @@ class TestAzureFileSystem : public ::testing::Test { } void TestOpenOutputStreamCloseAsync() { +#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND) + // This false positive leak is similar to the one pinpointed in the + // have_false_positive_memory_leak_with_generator() comments above, + // though the stack trace is different. It happens when a block list + // is committed from a background thread. + // + // clang-format off + // Direct leak of 968 byte(s) in 1 object(s) allocated from: + // #0 calloc + // #1 (/lib/x86_64-linux-gnu/libxml2.so.2+0xe25a4) + // #2 __xmlDefaultBufferSize + // #3 xmlBufferCreate + // #4 Azure::Storage::_internal::XmlWriter::XmlWriter() + // #5 Azure::Storage::Blobs::_detail::BlockBlobClient::CommitBlockList + // #6 Azure::Storage::Blobs::BlockBlobClient::CommitBlockList + // #7 arrow::fs::(anonymous namespace)::CommitBlockList + // #8 arrow::fs::(anonymous namespace)::ObjectAppendStream::FlushAsync()::'lambda' + // clang-format on + // + // TODO perhaps remove this skip once we can rely on + // https://github.com/Azure/azure-sdk-for-cpp/pull/5767 + if (options_.background_writes) { + GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync"; + } +#endif ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); auto data = SetUpPreexistingData(); const std::string path = data.ContainerPath("test-write-object"); @@ -1584,6 +1609,12 @@ class TestAzureFileSystem : public ::testing::Test { } void TestOpenOutputStreamCloseAsyncDestructor() { +#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND) + // See above. 
+ if (options_.background_writes) { + GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync"; + } +#endif ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_)); auto data = SetUpPreexistingData(); const std::string path = data.ContainerPath("test-write-object"); From d337e09ba00db66fc6ab116483b8b714a14ecf37 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 21 Aug 2024 12:32:32 +0200 Subject: [PATCH 14/14] Update comment to point to the ClickHouse PR --- cpp/src/arrow/filesystem/azurefs_test.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 8565b857b7613..9d437d1f83aac 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -1590,6 +1590,9 @@ class TestAzureFileSystem : public ::testing::Test { // // TODO perhaps remove this skip once we can rely on // https://github.com/Azure/azure-sdk-for-cpp/pull/5767 + // + // Also note that ClickHouse has a workaround for a similar issue: + // https://github.com/ClickHouse/ClickHouse/pull/45796 if (options_.background_writes) { GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync"; }