GH-40036: [C++] Azure file system write buffering & async writes #43096
Conversation
No problem. Sorry for not reviewing this and #43098.
The code in DoWrite is very similar to the code in the S3 FS. Maybe this could be unified? I didn't see this in the scope of the PR though.
It would be better if it improves maintainability. We can work on it as a follow-up task.
cpp/src/arrow/filesystem/azurefs.cc (outdated):

```cpp
  Future<> FlushAsync() {
    RETURN_NOT_OK(CheckClosed("flush"));
```
Can we move

arrow/cpp/src/arrow/filesystem/azurefs.cc, lines 1063 to 1067 in 0bae073:

```cpp
    RETURN_NOT_OK(CheckClosed("flush"));
    if (!initialised_) {
      // If the stream has not been successfully initialized then there is nothing to
      // flush. This also avoids some unhandled errors when flushing in the destructor.
      return Status::OK();
```

to `Flush()`? It seems that we don't need to execute `CheckClosed("flush")` and `if (!initialised_)` in both `Flush()` and `FlushAsync()`.
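In other words, roughly this shape (a sketch of the suggestion, not the final code; whether `Flush()` then delegates to `FlushAsync()` is left open here):

```cpp
Status Flush() override {
  RETURN_NOT_OK(CheckClosed("flush"));
  if (!initialised_) {
    // Nothing was initialized yet, so there is nothing to flush; this also avoids
    // unhandled errors when flushing from the destructor.
    return Status::OK();
  }
  // ... perform the actual flush (or delegate to FlushAsync()) ...
  return Status::OK();
}
```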
I moved this to `Flush`, since this is the public API.
I think I agree with @kou. Also, the S3 filesystem implements `Flush()` as

```cpp
Status Flush() override {
  auto fut = FlushAsync();
  return fut.status();
}
```

I think it would be nice if we could do the same, because it ensures that `Flush` and `FlushAsync` have the same behaviour.
I tried to unify the sync/async flush implementations; unfortunately that did not work out due to lifetime issues in the async case. When an `ObjectAppendStream` is destructed the RAII way, `Close()` (and therefore `Flush()`) is called. If we call `FlushAsync()` in the close, we need to create a `shared_ptr` from `this` (to ensure the lifetime of `this` when the lambda is actually called), but we cannot create a `shared_ptr` of `this` while it is being destructed. Hence, in the `Close()` call we must always do a synchronous `Flush`, where we do not have to give these lifetime guarantees.

TL;DR: the `Flush()` and `FlushAsync()` implementations are similar, but slightly different and decoupled.
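To make the constraint concrete, here is a minimal sketch (illustrative only; `DoFlushAsync()` is a hypothetical helper and the class layout is simplified):

```cpp
class ObjectAppendStream : public std::enable_shared_from_this<ObjectAppendStream> {
 public:
  Future<> FlushAsync() {
    // The continuation must keep this object alive until the upload finishes,
    // so it captures a shared_ptr obtained via shared_from_this().
    auto self = shared_from_this();
    return DoFlushAsync().Then([self] { /* commit the staged block list */ });
  }

  Status Close() {
    // Close() also runs from the destructor (RAII usage). At that point no
    // shared_ptr owns the object anymore, so shared_from_this() would throw
    // std::bad_weak_ptr. Hence Close() must take the synchronous Flush() path.
    return Flush();
  }

  Status Flush();           // synchronous implementation, safe to call from Close()
 private:
  Future<> DoFlushAsync();  // hypothetical helper that performs the actual upload
};
```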
cpp/src/arrow/filesystem/azurefs.cc (outdated):

```cpp
auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) {
  data_ptr += offset;
  nbytes -= offset;
};
```
How about also updating `pos_` and `content_length_` in this lambda, and improving the variable name to reflect the change?
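Something along these lines, perhaps (a sketch of the suggestion inside `DoWrite`; the name `advance` and the exact bookkeeping are proposals, not the merged code):

```cpp
// Advance both the input cursor and the stream's position bookkeeping.
auto advance = [&](const int64_t nbytes_written) {
  data_ptr += nbytes_written;
  nbytes -= nbytes_written;
  pos_ += nbytes_written;
  content_length_ += nbytes_written;  // append-only stream: length grows with pos_
};
```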
```cpp
std::shared_ptr<Buffer> buffer;
do {
  ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
  ASSERT_TRUE(buffer);
```
Why do we need this?
I guess it asserts that reading the input in fact worked (because that also uses the underlying Azure file system implementation). But we could also get rid of it; I don't have a strong opinion on it, but I also don't see any harm.
OK. Let's remove it. The previous `ASSERT_OK_AND_ASSIGN(buffer, ...)` must already detect any invalid situation.
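The read-back loop would then look roughly like this (a sketch; the accumulation into `contents` and the termination condition are assumed from a typical read-back test, not copied from the PR):

```cpp
std::string contents;
std::shared_ptr<Buffer> buffer;
do {
  ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
  contents.append(buffer->ToString());
} while (buffer->size() != 0);
```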
```cpp
auto data = SetUpPreexistingData();
const auto path = data.ContainerPath("test-write-object");
ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
std::array<std::int64_t, 3> sizes{2570 * 1024, 258 * 1024, 259 * 1024};
```
Could you add a comment explaining why we should use these sizes?
These sizes were kind of arbitrary (I did not introduce them), so I changed them to more reasonable sizes. The rationale of the test is just to issue writes of different sizes that trigger different mechanisms (e.g. buffering, uploading directly, etc.).
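For illustration, the intent is roughly this (the sizes and the 256 KiB threshold below are placeholders, not the values actually used in the PR):

```cpp
// Assume a hypothetical block-upload threshold of 256 KiB for this sketch.
std::array<std::int64_t, 3> sizes{
    1 * 1024,    // small write: stays in the in-memory buffer
    256 * 1024,  // fills exactly one block: triggers a block upload
    300 * 1024,  // larger than a block: uploads a block and buffers the remainder
};
```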
```cpp
    std::string(sizes[2], 'C'),
};
auto expected = std::int64_t{0};
for (auto i = 0; i != 3; ++i) {
```
Can we use `i < buffers.size()` here?
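That is, something like (a sketch; the loop body is assumed from the surrounding test, and the index type is widened to match `buffers.size()`):

```cpp
for (std::size_t i = 0; i < buffers.size(); ++i) {
  ASSERT_OK(output->Write(buffers[i]));
  expected += static_cast<std::int64_t>(buffers[i].size());
  ASSERT_OK_AND_ASSIGN(auto position, output->Tell());
  ASSERT_EQ(expected, position);
}
```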
```cpp
std::shared_ptr<io::OutputStream> stream;
auto data = SetUpPreexistingData();
const std::string path = data.ContainerPath("test-write-object");
constexpr auto payload = PreexistingData::kLoremIpsum;

ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream(path));
```
Suggested change:

```diff
-  std::shared_ptr<io::OutputStream> stream;
   auto data = SetUpPreexistingData();
   const std::string path = data.ContainerPath("test-write-object");
   constexpr auto payload = PreexistingData::kLoremIpsum;
-  ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream(path));
+  ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
```
```cpp
ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream(path));
ASSERT_OK(stream->Write(payload));
// Destructor implicitly closes stream and completes the multipart upload.
// GH-37670: Testing it doesn't matter whether flush is triggered asynchronously
```
It seems that GH-37670 is for the S3 filesystem. Is this true for the Azure filesystem too?
Yes, though not with that exact wording, as we don't use a "multipart upload". But we also trigger the upload of the last data, if any, and then commit it, so there is some I/O when the stream is closed which we need to make sure we wait on. I clarified the comment and removed the reference to the GitHub issue.
```cpp
std::shared_ptr<io::OutputStream> stream;
constexpr auto* payload = "new data";
auto data = SetUpPreexistingData();
const std::string path = data.ContainerPath("test-write-object");

ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream(path));
```
Suggested change:

```diff
-  std::shared_ptr<io::OutputStream> stream;
   constexpr auto* payload = "new data";
   auto data = SetUpPreexistingData();
   const std::string path = data.ContainerPath("test-write-object");
-  ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream(path));
+  ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
```
cpp/src/arrow/filesystem/azurefs.cc (outdated):

```cpp
ARROW_ASSIGN_OR_RAISE(current_block_, io::BufferOutputStream::Create(
                                          kBlockUploadSize, io_context_.pool()));
```
Can we reuse a `Reset()`-ed `current_block_` instead of creating a new one?
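A sketch of what that reuse could look like (assuming `io::BufferOutputStream::Reset()` taking a capacity and memory pool, and that `current_block_` may still be null on the first write):

```cpp
if (current_block_ == nullptr) {
  ARROW_ASSIGN_OR_RAISE(current_block_, io::BufferOutputStream::Create(
                                            kBlockUploadSize, io_context_.pool()));
} else {
  // Reuse the existing stream object instead of allocating a new one.
  RETURN_NOT_OK(current_block_->Reset(kBlockUploadSize, io_context_.pool()));
}
```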
Yeah, I just have to admit that I haven't come up with a good idea to factor this out and provide a good abstraction on top, because this functionality may be needed by any file system that implements write buffering.
This is a complicated change (at least for me) and I don't currently have time to give a full review.
A more detailed leak trace (obtained using …):

The leak manifests in …
Interesting, I don't really see where this could come from, actually. From the tests that are affected, it looks to me like only the path where we go through …
@OliLay There's already a skip related to a libxml2 leak with threaded operation, so I added another one. This might actually be fixed by Azure/azure-sdk-for-cpp#5767, which was recently merged into the Azure C++ SDK. Also cc @felipecrv
Actually, no, we may need something like this hack from ClickHouse; it was added in this PR: ClickHouse/ClickHouse#45796
@github-actions crossbow submit -g cpp
Force-pushed from ed47d2c to d337e09.
Hmm, I rebased for CI.
@github-actions crossbow submit -g cpp
Revision: d337e09

Submitted crossbow builds: ursacomputing/crossbow @ actions-0a8f77add1
The macOS build failure is unrelated, as it has happened elsewhere too: https://github.com/apache/arrow/actions/runs/10481257647/job/29030430952
After merging your PR, Conbench analyzed the 4 benchmarking runs that have been run so far on merge-commit e1e7c50. There were no benchmark performance regressions. 🎉 The full Conbench report has more details.
Rationale for this change
See #40036.
What changes are included in this PR?
Write buffering and async writes (similar to what the S3 file system does) in the `ObjectAppendStream` for the Azure file system.

With write buffering and async writes, the input-scenario creation runtime in the tests (which uses the `ObjectAppendStream` against Azurite) decreased from ~25s (see here) to ~800ms.

Are these changes tested?
Added some tests with background writes enabled and disabled (some were taken from the S3 tests). Everything changed should be covered.
Are there any user-facing changes?
`AzureOptions` now allows `background_writes` to be set (default: true). No breaking changes.
is very similar to the code in the S3 FS. Maybe this could be unified? I didn't see this in the scope of the PR though.ObjectAppendStream::DoAppend
in the case of many small appends #40036