Skip to content

Commit

Permalink
Cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
aliddell committed Jul 17, 2024
1 parent 31e525d commit 4ad13fd
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 101 deletions.
5 changes: 2 additions & 3 deletions src/writers/file.sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ zarr::FileSink::FileSink(const std::string& uri)
bool
zarr::FileSink::write(size_t offset, const uint8_t* buf, size_t bytes_of_buf)
{
if (!file_) {
return false;
}
CHECK(buf);
CHECK(bytes_of_buf);

return file_write(file_.get(), offset, buf, buf + bytes_of_buf);
}
151 changes: 67 additions & 84 deletions src/writers/s3.sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,70 +15,71 @@ zarr::S3Sink::S3Sink(const std::string& bucket_name,
, connection_pool_{ connection_pool }
, buf_(5 << 20, 0) // 5 MiB is the minimum multipart upload size
{
CHECK(!bucket_name_.empty());
CHECK(!object_key_.empty());
CHECK(connection_pool_);
}

zarr::S3Sink::~S3Sink()
{
close_();
}

bool
zarr::S3Sink::write(size_t offset, const uint8_t* buf, size_t bytes_of_buf)
{
CHECK(buf);

try {
return write_to_buffer_(buf, bytes_of_buf);
// upload_id_ is only populated after successfully uploading a part
if (upload_id_.empty() && buf_size_ > 0) {
CHECK(put_object_());
} else if (!upload_id_.empty()) {
if (buf_size_ > 0) {
CHECK(flush_part_());
}
CHECK(finalize_multipart_upload_());
}
} catch (const std::exception& exc) {
LOGE("Error: %s", exc.what());
} catch (...) {
LOGE("Error: (unknown)");
}

return false;
}

bool
zarr::S3Sink::write_to_buffer_(const uint8_t* buf, size_t bytes_of_buf)
zarr::S3Sink::write(size_t offset, const uint8_t* buf, size_t bytes_of_buf)
{
CHECK(buf);
CHECK(bytes_of_buf);

while (bytes_of_buf > 0) {
const auto n = std::min(bytes_of_buf, buf_.size() - buf_size_);
std::copy(buf, buf + n, buf_.begin() + buf_size_);
buf_size_ += n;
buf += n;
bytes_of_buf -= n;
if (n) {
std::copy(buf, buf + n, buf_.begin() + buf_size_);
buf_size_ += n;
buf += n;
bytes_of_buf -= n;
}

if (buf_size_ == buf_.size() && !flush_part_()) {
return false;
if (buf_size_ == buf_.size()) {
CHECK(flush_part_());
}
}

return true;
}

bool
zarr::S3Sink::put_object_()
zarr::S3Sink::put_object_() noexcept
{
if (buf_size_ == 0) {
return true;
}

std::unique_ptr<common::S3Connection> connection;
if (!(connection = connection_pool_->get_connection())) {
return false;
}

bool retval = false;

std::span<uint8_t> data{ buf_.data(), buf_size_ };
auto connection = connection_pool_->get_connection();

bool retval = false;
try {
std::span<uint8_t> data(buf_.data(), buf_size_);
std::string etag =
connection->put_object(bucket_name_, object_key_, data);
CHECK(!etag.empty());
EXPECT(
!etag.empty(), "Failed to upload object: %s", object_key_.c_str());

retval = true;
buf_size_ = 0;
} catch (const std::exception& exc) {
LOGE("Error: %s", exc.what());
} catch (...) {
Expand All @@ -87,91 +88,73 @@ zarr::S3Sink::put_object_()

// cleanup
connection_pool_->return_connection(std::move(connection));
buf_size_ = 0;

return retval;
}

bool
zarr::S3Sink::flush_part_()
zarr::S3Sink::flush_part_() noexcept
{
if (buf_size_ == 0) {
return true;
}

std::unique_ptr<common::S3Connection> connection;
if (!(connection = connection_pool_->get_connection())) {
return false;
}

std::string upload_id = upload_id_;
if (upload_id.empty()) {
upload_id =
connection->create_multipart_object(bucket_name_, object_key_);
CHECK(!upload_id.empty());
}
auto connection = connection_pool_->get_connection();

minio::s3::Part part;
part.number = (unsigned int)parts_.size() + 1;
bool retval = false;
try {
std::string upload_id = upload_id_;
if (upload_id.empty()) {
upload_id =
connection->create_multipart_object(bucket_name_, object_key_);
EXPECT(!upload_id.empty(),
"Failed to create multipart object: %s",
object_key_.c_str());
}

std::span<uint8_t> data(buf_.data(), buf_size_);
part.etag = connection->upload_multipart_object_part(
bucket_name_, object_key_, upload_id, data, part.number);
CHECK(!part.etag.empty());
minio::s3::Part part;
part.number = static_cast<unsigned int>(parts_.size()) + 1;

parts_.push_back(part);
std::span<uint8_t> data(buf_.data(), buf_size_);
part.etag = connection->upload_multipart_object_part(
bucket_name_, object_key_, upload_id, data, part.number);
EXPECT(!part.etag.empty(),
"Failed to upload part %u of object %s",
part.number,
object_key_.c_str());

// set only when the part is successfully uploaded
upload_id_ = upload_id;
// set these only when the part is successfully uploaded
parts_.push_back(part);
upload_id_ = upload_id;

retval = true;
} catch (const std::exception& exc) {
LOGE("Error: %s", exc.what());
} catch (...) {
LOGE("Error: (unknown)");
}

// cleanup
connection_pool_->return_connection(std::move(connection));
buf_size_ = 0;

return true;
return retval;
}

bool
zarr::S3Sink::finalize_multipart_upload_()
zarr::S3Sink::finalize_multipart_upload_() noexcept
{
if (upload_id_.empty()) {
return true;
}

std::unique_ptr<common::S3Connection> connection;
if (!(connection = connection_pool_->get_connection())) {
return false;
}

bool retval = false;
auto connection = connection_pool_->get_connection();

try {
retval = connection->complete_multipart_object(
bucket_name_, object_key_, upload_id_, parts_);
} catch (const std::exception& exc) {
LOGE("Error: %s", exc.what());
} catch (...) {
LOGE("Error: (unknown)");
}
bool retval = connection->complete_multipart_object(
bucket_name_, object_key_, upload_id_, parts_);

connection_pool_->return_connection(std::move(connection));

return retval;
}

void
zarr::S3Sink::close_()
{
try {
// upload_id_ is populated after successfully uploading a part
if (upload_id_.empty()) {
CHECK(put_object_());
} else {
CHECK(flush_part_());
CHECK(finalize_multipart_upload_());
}
} catch (const std::exception& exc) {
LOGE("Error: %s", exc.what());
} catch (...) {
LOGE("Error: (unknown)");
}
}
22 changes: 10 additions & 12 deletions src/writers/s3.sink.hh
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#ifndef H_ACQUIRE_STORAGE_ZARR_WRITERS_S3_SINK_V0
#define H_ACQUIRE_STORAGE_ZARR_WRITERS_S3_SINK_V0
#pragma once

#include "sink.hh"
#include "platform.h"
Expand Down Expand Up @@ -34,18 +33,17 @@ struct S3Sink final : public Sink
std::string upload_id_;
std::list<minio::s3::Part> parts_;

[[nodiscard]] bool write_to_buffer_(const uint8_t* buf,
size_t bytes_of_buf);

// single-part upload
[[nodiscard]] bool put_object_();
/// @brief Upload the object to S3.
/// @returns True if the object was successfully uploaded, otherwise false.
[[nodiscard]] bool put_object_() noexcept;

// multipart upload
[[nodiscard]] bool flush_part_();
[[nodiscard]] bool finalize_multipart_upload_();

void close_();
/// @brief Flush the current part to S3.
/// @returns True if the part was successfully flushed, otherwise false.
[[nodiscard]] bool flush_part_() noexcept;
/// @brief Finalize the multipart upload.
/// @returns True if a multipart upload was successfully finalized, otherwise false.
[[nodiscard]] bool finalize_multipart_upload_() noexcept;
};
} // namespace acquire::sink::zarr

#endif // H_ACQUIRE_STORAGE_ZARR_WRITERS_S3_SINK_V0
2 changes: 2 additions & 0 deletions src/writers/sink.hh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ struct Sink
/// @param buf The buffer to write to the sink.
/// @param bytes_of_buf The number of bytes to write from @p buf.
/// @return True if the write was successful, false otherwise.
/// @throws std::runtime_error if @p buf is nullptr, @p bytes_of_buf is 0,
/// or the write fails.
[[nodiscard]] virtual bool write(size_t offset,
const uint8_t* buf,
size_t bytes_of_buf) = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/zarr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ validate_props(const StorageProperties* props)

if (common::is_s3_uri(uri)) {
std::vector<std::string> tokens = common::split_uri(uri);
CHECK(tokens.size() > 2); // http://bucket/key
CHECK(tokens.size() > 2); // http://endpoint/bucket
} else {
const fs::path path = as_path(*props);
fs::path parent_path = path.parent_path().string();
Expand Down Expand Up @@ -363,7 +363,7 @@ zarr::Zarr::set(const StorageProperties* props)

if (common::is_s3_uri(uri)) {
std::vector<std::string> tokens = common::split_uri(uri);
CHECK(tokens.size() > 2); // http://bucket/key
CHECK(tokens.size() > 2); // http://endpoint/bucket
dataset_root_ = uri;
} else {
dataset_root_ = as_path(*props).string();
Expand Down

0 comments on commit 4ad13fd

Please sign in to comment.