Skip to content

Commit

Permalink
Implement and test S3 Sink, with usage in runtime. (#278)
Browse files Browse the repository at this point in the history
  • Loading branch information
aliddell authored Jul 20, 2024
1 parent 9f392b1 commit 597eed5
Show file tree
Hide file tree
Showing 22 changed files with 1,314 additions and 124 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- Support for writing to S3 buckets.

### Changed

- Calling `acquire_get_configuration` with a Zarr storage device now returns a URI of the storage device, with file://
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ add_library(${tgt} MODULE
writers/sink.creator.cpp
writers/file.sink.hh
writers/file.sink.cpp
writers/s3.sink.hh
writers/s3.sink.cpp
writers/writer.hh
writers/writer.cpp
writers/zarrv2.writer.hh
Expand Down
8 changes: 1 addition & 7 deletions src/common/s3.connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,7 @@ common::S3Connection::create_multipart_object(std::string_view bucket_name,
args.object = object_name;

auto response = client_->CreateMultipartUpload(args);
if (!response) {
LOGE("Failed to create multipart object %s in bucket %s: %s",
object_name.data(),
bucket_name.data(),
response.Error().String().c_str());
return {};
}
CHECK(!response.upload_id.empty());

return response.upload_id;
}
Expand Down
112 changes: 103 additions & 9 deletions src/common/utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,23 +197,47 @@ common::split_uri(const std::string& uri)
{
const char delim = '/';

size_t begin = 0;
auto end = uri.find_first_of(delim);

std::vector<std::string> out;
size_t begin = 0, end = uri.find_first_of(delim);

while (end != std::string::npos) {
if (end > begin) {
out.emplace_back(uri.substr(begin, end - begin));
}
std::string part = uri.substr(begin, end - begin);
if (!part.empty())
out.push_back(part);

begin = end + 1;
end = uri.find_first_of(delim, begin);
}

// Add the last segment of the URI (if any) after the last '/'
std::string last_part = uri.substr(begin);
if (!last_part.empty()) {
out.push_back(last_part);
}

return out;
}

void
common::parse_path_from_uri(std::string_view uri,
std::string& bucket_name,
std::string& path)
{
auto parts = split_uri(uri.data());
EXPECT(parts.size() > 2, "Invalid URI: %s", uri.data());

bucket_name = parts[2];
path = "";
for (size_t i = 3; i < parts.size(); ++i) {
path += parts[i];
if (i < parts.size() - 1) {
path += "/";
}
}
}

bool
common::is_s3_uri(const std::string& uri)
common::is_web_uri(std::string_view uri)
{
return uri.starts_with("s3://") || uri.starts_with("http://") ||
uri.starts_with("https://");
Expand Down Expand Up @@ -480,6 +504,76 @@ extern "C"

return retval;
}
}

#endif
acquire_export int unit_test__split_uri()
{
try {
auto parts = common::split_uri("s3://bucket/key");
CHECK(parts.size() == 3);
CHECK(parts[0] == "s3:");
CHECK(parts[1] == "bucket");
CHECK(parts[2] == "key");

parts = common::split_uri("s3://bucket/key/");
CHECK(parts.size() == 3);
CHECK(parts[0] == "s3:");
CHECK(parts[1] == "bucket");
CHECK(parts[2] == "key");

parts = common::split_uri("s3://bucket/key/with/slashes");
CHECK(parts.size() == 5);
CHECK(parts[0] == "s3:");
CHECK(parts[1] == "bucket");
CHECK(parts[2] == "key");
CHECK(parts[3] == "with");
CHECK(parts[4] == "slashes");

parts = common::split_uri("s3://bucket");
CHECK(parts.size() == 2);
CHECK(parts[0] == "s3:");
CHECK(parts[1] == "bucket");

parts = common::split_uri("s3://");
CHECK(parts.size() == 1);
CHECK(parts[0] == "s3:");

parts = common::split_uri("s3:///");
CHECK(parts.size() == 1);
CHECK(parts[0] == "s3:");

parts = common::split_uri("s3://bucket/");
CHECK(parts.size() == 2);
CHECK(parts[0] == "s3:");
CHECK(parts[1] == "bucket");

parts = common::split_uri("s3://bucket/");
CHECK(parts.size() == 2);
CHECK(parts[0] == "s3:");
CHECK(parts[1] == "bucket");

parts = common::split_uri("s3://bucket/key/with/slashes/");
CHECK(parts.size() == 5);
CHECK(parts[0] == "s3:");
CHECK(parts[1] == "bucket");
CHECK(parts[2] == "key");
CHECK(parts[3] == "with");
CHECK(parts[4] == "slashes");

parts = common::split_uri("s3://bucket/key/with/slashes//");
CHECK(parts.size() == 5);
CHECK(parts[0] == "s3:");
CHECK(parts[1] == "bucket");
CHECK(parts[2] == "key");
CHECK(parts[3] == "with");
CHECK(parts[4] == "slashes");
return 1;
} catch (const std::exception& exc) {
LOGE("Exception: %s\n", exc.what());
} catch (...) {
LOGE("Exception: (unknown)");
}

return 0;
}
}
#endif
18 changes: 16 additions & 2 deletions src/common/utilities.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ namespace acquire::sink::zarr {

struct Zarr;

namespace common {
enum class ZarrVersion
{
V2 = 2,
V3
};

namespace common {
/// @brief Get the number of chunks along a dimension.
/// @param dimension A dimension.
/// @return The number of, possibly ragged, chunks along the dimension, given
Expand Down Expand Up @@ -99,11 +104,20 @@ align_up(size_t n, size_t align);
std::vector<std::string>
split_uri(const std::string& uri);

/// @brief Get the endpoint and bucket name from a URI.
/// @param[in] uri String to parse.
/// @param[out] endpoint The endpoint of the URI.
/// @param[out] bucket_name The bucket name of the URI.
void
parse_path_from_uri(std::string_view uri,
std::string& bucket_name,
std::string& path);

/// @brief Check if a URI is an S3 URI.
/// @param uri String to check.
/// @return True if the URI is an S3 URI, false otherwise.
bool
is_s3_uri(const std::string& uri);
is_web_uri(std::string_view uri);
} // namespace acquire::sink::zarr::common
} // namespace acquire::sink::zarr

Expand Down
5 changes: 2 additions & 3 deletions src/writers/file.sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ zarr::FileSink::FileSink(const std::string& uri)
bool
zarr::FileSink::write(size_t offset, const uint8_t* buf, size_t bytes_of_buf)
{
if (!file_) {
return false;
}
CHECK(buf);
CHECK(bytes_of_buf);

return file_write(file_.get(), offset, buf, buf + bytes_of_buf);
}
170 changes: 170 additions & 0 deletions src/writers/s3.sink.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#include "s3.sink.hh"

#include "common/utilities.hh"
#include "logger.h"

#include <miniocpp/client.h>

namespace zarr = acquire::sink::zarr;

zarr::S3Sink::S3Sink(std::string_view bucket_name,
std::string_view object_key,
std::shared_ptr<common::S3ConnectionPool> connection_pool)
: bucket_name_{ bucket_name }
, object_key_{ object_key }
, connection_pool_{ connection_pool }
{
CHECK(!bucket_name_.empty());
CHECK(!object_key_.empty());
CHECK(connection_pool_);
}

zarr::S3Sink::~S3Sink()
{
if (!is_multipart_upload_() && n_bytes_buffered_ > 0) {
if (!put_object_()) {
LOGE("Failed to upload object: %s", object_key_.c_str());
}
} else if (is_multipart_upload_()) {
if (n_bytes_buffered_ > 0 && !flush_part_()) {
LOGE("Failed to upload part %zu of object %s",
parts_.size() + 1,
object_key_.c_str());
}
if (!finalize_multipart_upload_()) {
LOGE("Failed to finalize multipart upload of object %s",
object_key_.c_str());
}
}
}

bool
zarr::S3Sink::write(size_t _, const uint8_t* data, size_t bytes_of_data)
{
CHECK(data);
CHECK(bytes_of_data);

while (bytes_of_data > 0) {
const auto bytes_to_write =
std::min(bytes_of_data, part_buffer_.size() - n_bytes_buffered_);

if (bytes_to_write) {
std::copy_n(
data, bytes_to_write, part_buffer_.begin() + n_bytes_buffered_);
n_bytes_buffered_ += bytes_to_write;
data += bytes_to_write;
bytes_of_data -= bytes_to_write;
}

if (n_bytes_buffered_ == part_buffer_.size() && !flush_part_()) {
return false;
}
}

return true;
}

bool
zarr::S3Sink::put_object_()
{
if (n_bytes_buffered_ == 0) {
return false;
}

auto connection = connection_pool_->get_connection();

bool retval = false;
try {
std::string etag =
connection->put_object(bucket_name_,
object_key_,
{ part_buffer_.data(), n_bytes_buffered_ });
EXPECT(
!etag.empty(), "Failed to upload object: %s", object_key_.c_str());

retval = true;
} catch (const std::exception& exc) {
LOGE("Error: %s", exc.what());
}

// cleanup
connection_pool_->return_connection(std::move(connection));
n_bytes_buffered_ = 0;

return retval;
}

bool
zarr::S3Sink::is_multipart_upload_() const
{
return !upload_id_.empty() && !parts_.empty();
}

std::string
zarr::S3Sink::get_multipart_upload_id_()
{
if (upload_id_.empty()) {
upload_id_ =
connection_pool_->get_connection()->create_multipart_object(
bucket_name_, object_key_);
}

return upload_id_;
}

bool
zarr::S3Sink::flush_part_()
{
if (n_bytes_buffered_ == 0) {
return false;
}

auto connection = connection_pool_->get_connection();

bool retval = false;
try {
minio::s3::Part part;
part.number = static_cast<unsigned int>(parts_.size()) + 1;

std::span<uint8_t> data(part_buffer_.data(), n_bytes_buffered_);
part.etag =
connection->upload_multipart_object_part(bucket_name_,
object_key_,
get_multipart_upload_id_(),
data,
part.number);
EXPECT(!part.etag.empty(),
"Failed to upload part %u of object %s",
part.number,
object_key_.c_str());

parts_.push_back(part);

retval = true;
} catch (const std::exception& exc) {
LOGE("Error: %s", exc.what());
}

// cleanup
connection_pool_->return_connection(std::move(connection));
n_bytes_buffered_ = 0;

return retval;
}

bool
zarr::S3Sink::finalize_multipart_upload_()
{
if (!is_multipart_upload_()) {
return false;
}

auto connection = connection_pool_->get_connection();

bool retval = connection->complete_multipart_object(
bucket_name_, object_key_, upload_id_, parts_);

connection_pool_->return_connection(std::move(connection));

return retval;
}
Loading

0 comments on commit 597eed5

Please sign in to comment.