From 597eed5eab10e3279ceb79f3a7a3a38adb46188d Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Sat, 20 Jul 2024 12:09:36 -0400 Subject: [PATCH] Implement and test S3 Sink, with usage in runtime. (#278) --- CHANGELOG.md | 4 + src/CMakeLists.txt | 2 + src/common/s3.connection.cpp | 8 +- src/common/utilities.cpp | 112 ++++++++++- src/common/utilities.hh | 18 +- src/writers/file.sink.cpp | 5 +- src/writers/s3.sink.cpp | 170 ++++++++++++++++ src/writers/s3.sink.hh | 60 ++++++ src/writers/sink.creator.cpp | 185 +++++++++++------ src/writers/sink.creator.hh | 40 ++-- src/writers/sink.hh | 2 + src/writers/zarrv2.writer.cpp | 2 +- src/writers/zarrv3.writer.cpp | 3 +- src/zarr.cpp | 87 ++++++-- src/zarr.hh | 2 + src/zarr.v2.cpp | 4 +- src/zarr.v3.cpp | 5 +- tests/CMakeLists.txt | 2 + tests/get-meta.cpp | 1 + tests/unit-tests.cpp | 2 + tests/write-zarr-v2-to-s3.cpp | 363 ++++++++++++++++++++++++++++++++++ tests/write-zarr-v3-to-s3.cpp | 361 +++++++++++++++++++++++++++++++++ 22 files changed, 1314 insertions(+), 124 deletions(-) create mode 100644 src/writers/s3.sink.cpp create mode 100644 src/writers/s3.sink.hh create mode 100644 tests/write-zarr-v2-to-s3.cpp create mode 100644 tests/write-zarr-v3-to-s3.cpp diff --git a/CHANGELOG.md b/CHANGELOG.md index 5af8d99c..a3c0155a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Added + +- Support for writing to S3 buckets. + ### Changed - Calling `acquire_get_configuration` with a Zarr storage device now returns a URI of the storage device, with file:// diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 952c7793..eba9a206 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,6 +19,8 @@ add_library(${tgt} MODULE writers/sink.creator.cpp writers/file.sink.hh writers/file.sink.cpp + writers/s3.sink.hh + writers/s3.sink.cpp writers/writer.hh writers/writer.cpp writers/zarrv2.writer.hh diff --git a/src/common/s3.connection.cpp b/src/common/s3.connection.cpp index 1be72ab3..bb848457 100644 --- a/src/common/s3.connection.cpp +++ b/src/common/s3.connection.cpp @@ -129,13 +129,7 @@ common::S3Connection::create_multipart_object(std::string_view bucket_name, args.object = object_name; auto response = client_->CreateMultipartUpload(args); - if (!response) { - LOGE("Failed to create multipart object %s in bucket %s: %s", - object_name.data(), - bucket_name.data(), - response.Error().String().c_str()); - return {}; - } + CHECK(!response.upload_id.empty()); return response.upload_id; } diff --git a/src/common/utilities.cpp b/src/common/utilities.cpp index 63b98f94..769468e6 100644 --- a/src/common/utilities.cpp +++ b/src/common/utilities.cpp @@ -197,23 +197,47 @@ common::split_uri(const std::string& uri) { const char delim = '/'; - size_t begin = 0; - auto end = uri.find_first_of(delim); - std::vector out; + size_t begin = 0, end = uri.find_first_of(delim); + while (end != std::string::npos) { - if (end > begin) { - out.emplace_back(uri.substr(begin, end - begin)); - } + std::string part = uri.substr(begin, end - begin); + if (!part.empty()) + out.push_back(part); begin = end + 1; end = uri.find_first_of(delim, begin); } + + // Add the last segment of the URI (if any) after the last '/' + std::string last_part = uri.substr(begin); + if (!last_part.empty()) { + out.push_back(last_part); + } + return out; } +void +common::parse_path_from_uri(std::string_view uri, + std::string& bucket_name, + std::string& path) +{ + auto parts = split_uri(uri.data()); + EXPECT(parts.size() > 2, "Invalid URI: %s", uri.data()); + + bucket_name = parts[2]; + path = ""; + for (size_t i = 3; i < parts.size(); ++i) { + path += parts[i]; + if (i < parts.size() - 1) { + path += "/"; + } + } +} + bool -common::is_s3_uri(const std::string& uri) +common::is_web_uri(std::string_view uri) { return uri.starts_with("s3://") || uri.starts_with("http://") || uri.starts_with("https://"); @@ -480,6 +504,76 @@ extern "C" return retval; } -} -#endif \ No newline at end of file + acquire_export int unit_test__split_uri() + { + try { + auto parts = common::split_uri("s3://bucket/key"); + CHECK(parts.size() == 3); + CHECK(parts[0] == "s3:"); + CHECK(parts[1] == "bucket"); + CHECK(parts[2] == "key"); + + parts = common::split_uri("s3://bucket/key/"); + CHECK(parts.size() == 3); + CHECK(parts[0] == "s3:"); + CHECK(parts[1] == "bucket"); + CHECK(parts[2] == "key"); + + parts = common::split_uri("s3://bucket/key/with/slashes"); + CHECK(parts.size() == 5); + CHECK(parts[0] == "s3:"); + CHECK(parts[1] == "bucket"); + CHECK(parts[2] == "key"); + CHECK(parts[3] == "with"); + CHECK(parts[4] == "slashes"); + + parts = common::split_uri("s3://bucket"); + CHECK(parts.size() == 2); + CHECK(parts[0] == "s3:"); + CHECK(parts[1] == "bucket"); + + parts = common::split_uri("s3://"); + CHECK(parts.size() == 1); + CHECK(parts[0] == "s3:"); + + parts = common::split_uri("s3:///"); + CHECK(parts.size() == 1); + CHECK(parts[0] == "s3:"); + + parts = common::split_uri("s3://bucket/"); + CHECK(parts.size() == 2); + CHECK(parts[0] == "s3:"); + CHECK(parts[1] == "bucket"); + + parts = common::split_uri("s3://bucket/"); + CHECK(parts.size() == 2); + CHECK(parts[0] == "s3:"); + CHECK(parts[1] == "bucket"); + + parts = common::split_uri("s3://bucket/key/with/slashes/"); + CHECK(parts.size() == 5); + CHECK(parts[0] == "s3:"); + CHECK(parts[1] == "bucket"); + CHECK(parts[2] == "key"); + CHECK(parts[3] == "with"); + CHECK(parts[4] == "slashes"); + + parts = common::split_uri("s3://bucket/key/with/slashes//"); + CHECK(parts.size() == 5); + CHECK(parts[0] == "s3:"); + CHECK(parts[1] == "bucket"); + CHECK(parts[2] == "key"); + CHECK(parts[3] == "with"); + CHECK(parts[4] == "slashes"); + return 1; + } catch (const std::exception& exc) { + LOGE("Exception: %s\n", exc.what()); + } catch (...) { + LOGE("Exception: (unknown)"); + } + + return 0; + } +} +#endif diff --git a/src/common/utilities.hh b/src/common/utilities.hh index 1290c53e..e2b4c8a5 100644 --- a/src/common/utilities.hh +++ b/src/common/utilities.hh @@ -22,8 +22,13 @@ namespace acquire::sink::zarr { struct Zarr; -namespace common { +enum class ZarrVersion +{ + V2 = 2, + V3 +}; +namespace common { /// @brief Get the number of chunks along a dimension. /// @param dimension A dimension. /// @return The number of, possibly ragged, chunks along the dimension, given @@ -99,11 +104,20 @@ align_up(size_t n, size_t align); std::vector split_uri(const std::string& uri); +/// @brief Get the endpoint and bucket name from a URI. +/// @param[in] uri String to parse. +/// @param[out] endpoint The endpoint of the URI. +/// @param[out] bucket_name The bucket name of the URI. +void +parse_path_from_uri(std::string_view uri, + std::string& bucket_name, + std::string& path); + /// @brief Check if a URI is an S3 URI. /// @param uri String to check. /// @return True if the URI is an S3 URI, false otherwise. bool -is_s3_uri(const std::string& uri); +is_web_uri(std::string_view uri); } // namespace acquire::sink::zarr::common } // namespace acquire::sink::zarr diff --git a/src/writers/file.sink.cpp b/src/writers/file.sink.cpp index d5106dcb..375bde9a 100644 --- a/src/writers/file.sink.cpp +++ b/src/writers/file.sink.cpp @@ -16,9 +16,8 @@ zarr::FileSink::FileSink(const std::string& uri) bool zarr::FileSink::write(size_t offset, const uint8_t* buf, size_t bytes_of_buf) { - if (!file_) { - return false; - } + CHECK(buf); + CHECK(bytes_of_buf); return file_write(file_.get(), offset, buf, buf + bytes_of_buf); } diff --git a/src/writers/s3.sink.cpp b/src/writers/s3.sink.cpp new file mode 100644 index 00000000..b746f70c --- /dev/null +++ b/src/writers/s3.sink.cpp @@ -0,0 +1,170 @@ +#include "s3.sink.hh" + +#include "common/utilities.hh" +#include "logger.h" + +#include + +namespace zarr = acquire::sink::zarr; + +zarr::S3Sink::S3Sink(std::string_view bucket_name, + std::string_view object_key, + std::shared_ptr connection_pool) + : bucket_name_{ bucket_name } + , object_key_{ object_key } + , connection_pool_{ connection_pool } +{ + CHECK(!bucket_name_.empty()); + CHECK(!object_key_.empty()); + CHECK(connection_pool_); +} + +zarr::S3Sink::~S3Sink() +{ + if (!is_multipart_upload_() && n_bytes_buffered_ > 0) { + if (!put_object_()) { + LOGE("Failed to upload object: %s", object_key_.c_str()); + } + } else if (is_multipart_upload_()) { + if (n_bytes_buffered_ > 0 && !flush_part_()) { + LOGE("Failed to upload part %zu of object %s", + parts_.size() + 1, + object_key_.c_str()); + } + if (!finalize_multipart_upload_()) { + LOGE("Failed to finalize multipart upload of object %s", + object_key_.c_str()); + } + } +} + +bool +zarr::S3Sink::write(size_t _, const uint8_t* data, size_t bytes_of_data) +{ + CHECK(data); + CHECK(bytes_of_data); + + while (bytes_of_data > 0) { + const auto bytes_to_write = + std::min(bytes_of_data, part_buffer_.size() - n_bytes_buffered_); + + if (bytes_to_write) { + std::copy_n( + data, bytes_to_write, part_buffer_.begin() + n_bytes_buffered_); + n_bytes_buffered_ += bytes_to_write; + data += bytes_to_write; + bytes_of_data -= bytes_to_write; + } + + if (n_bytes_buffered_ == part_buffer_.size() && !flush_part_()) { + return false; + } + } + + return true; +} + +bool +zarr::S3Sink::put_object_() +{ + if (n_bytes_buffered_ == 0) { + return false; + } + + auto connection = connection_pool_->get_connection(); + + bool retval = false; + try { + std::string etag = + connection->put_object(bucket_name_, + object_key_, + { part_buffer_.data(), n_bytes_buffered_ }); + EXPECT( + !etag.empty(), "Failed to upload object: %s", object_key_.c_str()); + + retval = true; + } catch (const std::exception& exc) { + LOGE("Error: %s", exc.what()); + } + + // cleanup + connection_pool_->return_connection(std::move(connection)); + n_bytes_buffered_ = 0; + + return retval; +} + +bool +zarr::S3Sink::is_multipart_upload_() const +{ + return !upload_id_.empty() && !parts_.empty(); +} + +std::string +zarr::S3Sink::get_multipart_upload_id_() +{ + if (upload_id_.empty()) { + upload_id_ = + connection_pool_->get_connection()->create_multipart_object( + bucket_name_, object_key_); + } + + return upload_id_; +} + +bool +zarr::S3Sink::flush_part_() +{ + if (n_bytes_buffered_ == 0) { + return false; + } + + auto connection = connection_pool_->get_connection(); + + bool retval = false; + try { + minio::s3::Part part; + part.number = static_cast(parts_.size()) + 1; + + std::span data(part_buffer_.data(), n_bytes_buffered_); + part.etag = + connection->upload_multipart_object_part(bucket_name_, + object_key_, + get_multipart_upload_id_(), + data, + part.number); + EXPECT(!part.etag.empty(), + "Failed to upload part %u of object %s", + part.number, + object_key_.c_str()); + + parts_.push_back(part); + + retval = true; + } catch (const std::exception& exc) { + LOGE("Error: %s", exc.what()); + } + + // cleanup + connection_pool_->return_connection(std::move(connection)); + n_bytes_buffered_ = 0; + + return retval; +} + +bool +zarr::S3Sink::finalize_multipart_upload_() +{ + if (!is_multipart_upload_()) { + return false; + } + + auto connection = connection_pool_->get_connection(); + + bool retval = connection->complete_multipart_object( + bucket_name_, object_key_, upload_id_, parts_); + + connection_pool_->return_connection(std::move(connection)); + + return retval; +} diff --git a/src/writers/s3.sink.hh b/src/writers/s3.sink.hh new file mode 100644 index 00000000..5b12bce5 --- /dev/null +++ b/src/writers/s3.sink.hh @@ -0,0 +1,60 @@ +#pragma once + +#include "sink.hh" +#include "platform.h" +#include "common/s3.connection.hh" + +#include + +#include +#include + +namespace acquire::sink::zarr { +class S3Sink final : public Sink +{ + public: + S3Sink() = delete; + S3Sink(std::string_view bucket_name, + std::string_view object_key, + std::shared_ptr connection_pool); + ~S3Sink() override; + + bool write(size_t offset, + const uint8_t* data, + size_t bytes_of_data) override; + + private: + std::string bucket_name_; + std::string object_key_; + + std::shared_ptr connection_pool_; + + // multipart upload + std::array part_buffer_; /// temporary 5MiB buffer for multipart upload + size_t n_bytes_buffered_ = 0; + + std::string upload_id_; + std::list parts_; + + // single-part upload + /// @brief Upload the object to S3. + /// @returns True if the object was successfully uploaded, otherwise false. + [[nodiscard]] bool put_object_(); + + // multipart upload + bool is_multipart_upload_() const; + + /// @brief Get the multipart upload ID, if it exists. Otherwise, create a new + /// multipart upload. + /// @returns The multipart upload ID. + std::string get_multipart_upload_id_(); + + /// @brief Flush the current part to S3. + /// @returns True if the part was successfully flushed, otherwise false. + [[nodiscard]] bool flush_part_(); + /// @brief Finalize the multipart upload. + /// @returns True if a multipart upload was successfully finalized, + /// otherwise false. + [[nodiscard]] bool finalize_multipart_upload_(); +}; +} // namespace acquire::sink::zarr diff --git a/src/writers/sink.creator.cpp b/src/writers/sink.creator.cpp index d0228c8c..83c7cce5 100644 --- a/src/writers/sink.creator.cpp +++ b/src/writers/sink.creator.cpp @@ -1,5 +1,6 @@ #include "sink.creator.hh" #include "file.sink.hh" +#include "s3.sink.hh" #include "common/utilities.hh" #include @@ -25,14 +26,24 @@ zarr::SinkCreator::make_data_sinks( { std::queue paths; - std::string base_dir = base_uri; - if (base_uri.starts_with("file://")) { - base_dir = base_uri.substr(7); - } - paths.emplace(base_dir); + bool is_s3 = common::is_web_uri(base_uri); + std::string bucket_name; + if (is_s3) { + std::string base_dir; + common::parse_path_from_uri(base_uri, bucket_name, base_dir); - if (!make_dirs_(paths)) { - return false; + paths.push(base_dir); + } else { + std::string base_dir = base_uri; + + if (base_uri.starts_with("file://")) { + base_dir = base_uri.substr(7); + } + paths.emplace(base_dir); + + if (!make_dirs_(paths)) { + return false; + } } // create directories @@ -52,7 +63,7 @@ zarr::SinkCreator::make_data_sinks( } } - if (!make_dirs_(paths)) { + if (!is_s3 && !make_dirs_(paths)) { return false; } } @@ -73,56 +84,47 @@ zarr::SinkCreator::make_data_sinks( } } - return make_files_(paths, part_sinks); -} - -bool -zarr::SinkCreator::create_v2_metadata_sinks( - const std::string& base_uri, - size_t n_arrays, - std::vector>& metadata_sinks) -{ - if (base_uri.empty()) { - LOGE("Base URI is empty."); - return false; - } - - std::queue dir_paths, file_paths; - - file_paths.emplace(".metadata"); // base metadata - file_paths.emplace("0/.zattrs"); // external metadata - file_paths.emplace(".zattrs"); // group metadata - - for (auto i = 0; i < n_arrays; ++i) { - const auto idx_string = std::to_string(i); - dir_paths.push(idx_string); - file_paths.push(idx_string + "/.zarray"); // array metadata - } - - return make_metadata_sinks_( - base_uri, dir_paths, file_paths, metadata_sinks); + return is_s3 ? make_s3_objects_(bucket_name, paths, part_sinks) + : make_files_(paths, part_sinks); } bool -zarr::SinkCreator::create_v3_metadata_sinks( +zarr::SinkCreator::make_metadata_sinks( + acquire::sink::zarr::ZarrVersion version, const std::string& base_uri, size_t n_arrays, std::vector>& metadata_sinks) { - if (base_uri.empty()) { - LOGE("Base URI is empty."); - return false; - } + EXPECT(!base_uri.empty(), "URI must not be empty."); std::queue dir_paths, file_paths; - dir_paths.emplace("meta"); - dir_paths.emplace("meta/root"); + switch (version) { + case ZarrVersion::V2: + file_paths.emplace(".metadata"); // base metadata + file_paths.emplace("0/.zattrs"); // external metadata + file_paths.emplace(".zattrs"); // group metadata - file_paths.emplace("zarr.json"); - file_paths.emplace("meta/root.group.json"); - for (auto i = 0; i < n_arrays; ++i) { - file_paths.push("meta/root/" + std::to_string(i) + ".array.json"); + for (auto i = 0; i < n_arrays; ++i) { + const auto idx_string = std::to_string(i); + dir_paths.push(idx_string); + file_paths.push(idx_string + "/.zarray"); // array metadata + } + break; + case ZarrVersion::V3: + dir_paths.emplace("meta"); + dir_paths.emplace("meta/root"); + + file_paths.emplace("zarr.json"); + file_paths.emplace("meta/root.group.json"); + for (auto i = 0; i < n_arrays; ++i) { + file_paths.push("meta/root/" + std::to_string(i) + + ".array.json"); + } + break; + default: + throw std::runtime_error("Invalid Zarr version " + + std::to_string(static_cast(version))); } return make_metadata_sinks_( @@ -239,39 +241,92 @@ zarr::SinkCreator::make_metadata_sinks_( std::queue& file_paths, std::vector>& metadata_sinks) { - std::string base_dir = base_uri; - if (base_uri.starts_with("file://")) { - base_dir = base_uri.substr(7); - } + bool is_s3 = common::is_web_uri(base_uri); + std::string bucket_name; + std::string base_dir; - // remove trailing slashes - if (base_uri.ends_with("/") || base_uri.ends_with("\\")) { - base_dir = base_dir.substr(0, base_dir.size() - 1); - } + if (is_s3) { + common::parse_path_from_uri(base_uri, bucket_name, base_dir); - // create the base directories if they don't already exist - // we create them in serial because - // 1. there are only a few of them; and - // 2. they may be nested - while (!dir_paths.empty()) { - const auto dir = base_dir + "/" + dir_paths.front(); - dir_paths.pop(); - if (!fs::is_directory(dir) && !fs::create_directories(dir)) { + // create the bucket if it doesn't already exist + if (!bucket_exists_(bucket_name)) { return false; } + } else { + base_dir = base_uri; + if (base_uri.starts_with("file://")) { + base_dir = base_uri.substr(7); + } + + // remove trailing slashes + if (base_uri.ends_with("/") || base_uri.ends_with("\\")) { + base_dir = base_dir.substr(0, base_dir.size() - 1); + } + + // create the base directories if they don't already exist + // we create them in serial because + // 1. there are only a few of them; and + // 2. they may be nested + while (!dir_paths.empty()) { + const auto dir = base_dir + "/" + dir_paths.front(); + dir_paths.pop(); + if (!fs::is_directory(dir) && !fs::create_directories(dir)) { + return false; + } + } } // make files size_t n_paths = file_paths.size(); + const std::string prefix = base_dir.empty() ? "" : base_dir + "/"; for (auto i = 0; i < n_paths; ++i) { - const auto path = base_dir + "/" + file_paths.front(); + const auto path = prefix + file_paths.front(); file_paths.pop(); file_paths.push(path); } - if (!make_files_(file_paths, metadata_sinks)) { + return is_s3 ? make_s3_objects_(bucket_name, file_paths, metadata_sinks) + : make_files_(file_paths, metadata_sinks); +} + +bool +zarr::SinkCreator::bucket_exists_(std::string_view bucket_name) +{ + CHECK(!bucket_name.empty()); + EXPECT(connection_pool_, "S3 connection pool not provided."); + + auto conn = connection_pool_->get_connection(); + bool bucket_exists = conn->bucket_exists(bucket_name); + + connection_pool_->return_connection(std::move(conn)); + + return bucket_exists; +} + +bool +zarr::SinkCreator::make_s3_objects_(std::string_view bucket_name, + std::queue& object_keys, + std::vector>& sinks) +{ + if (object_keys.empty()) { + return true; + } + if (bucket_name.empty()) { + LOGE("Bucket name not provided."); return false; } + if (!connection_pool_) { + LOGE("S3 connection pool not provided."); + return false; + } + + const auto n_objects = object_keys.size(); + sinks.resize(n_objects); + for (auto i = 0; i < n_objects; ++i) { + sinks[i] = std::make_unique( + bucket_name, object_keys.front(), connection_pool_); + object_keys.pop(); + } return true; } diff --git a/src/writers/sink.creator.hh b/src/writers/sink.creator.hh index 4e18bced..01c04d78 100644 --- a/src/writers/sink.creator.hh +++ b/src/writers/sink.creator.hh @@ -1,7 +1,7 @@ -#ifndef H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_CREATOR_V0 -#define H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_CREATOR_V0 +#pragma once #include "sink.hh" +#include "common/utilities.hh" #include "common/dimension.hh" #include "common/thread.pool.hh" #include "common/s3.connection.hh" @@ -24,26 +24,24 @@ struct SinkCreator final /// @param[in] parts_along_dimension Function for computing the number of /// parts (either chunk or shard) along each dimension. /// @param[out] part_sinks The sinks created. + /// @throws std::runtime_error if @p base_uri is not valid, if the number of + /// parts along a dimension cannot be computed, or if, for S3 sinks, + /// the bucket does not exist. [[nodiscard]] bool make_data_sinks( const std::string& base_uri, const std::vector& dimensions, const std::function& parts_along_dimension, std::vector>& part_sinks); - /// @brief Create a collection of metadata sinks for a Zarr V2 dataset. + /// @brief Create a collection of metadata sinks for a Zarr dataset. + /// @param[in] version The Zarr version. /// @param[in] base_uri The base URI for the dataset. /// @param[in] n_arrays The number of data arrays. /// @param[out] metadata_sinks The sinks created. - [[nodiscard]] bool create_v2_metadata_sinks( - const std::string& base_uri, - size_t n_arrays, - std::vector>& metadata_sinks); - - /// @brief Create a collection of metadata sinks for a Zarr V3 dataset. - /// @param[in] base_uri The base URI for the dataset. - /// @param[in] n_arrays The number of data arrays. - /// @param[out] metadata_sinks The sinks created. - [[nodiscard]] bool create_v3_metadata_sinks( + /// @throws std::runtime_error if @p base_uri is not valid, or if, for S3 + /// sinks, the bucket does not exist. + [[nodiscard]] bool make_metadata_sinks( + ZarrVersion version, const std::string& base_uri, size_t n_arrays, std::vector>& metadata_sinks); @@ -75,7 +73,19 @@ struct SinkCreator final std::queue& dir_paths, std::queue& file_paths, std::vector>& metadata_sinks); + + /// @brief Check whether an S3 bucket exists. + /// @param[in] bucket_name The name of the bucket to check. + /// @return True iff the bucket exists. + bool bucket_exists_(std::string_view bucket_name); + + /// @brief Create a collection of S3 objects. + /// @param[in] bucket_name The name of the bucket. + /// @param[in,out] object_keys The keys of the objects to create. + /// @param[out] sinks The sinks created. + [[nodiscard]] bool make_s3_objects_( + std::string_view bucket_name, + std::queue& object_keys, + std::vector>& sinks); }; } // namespace acquire::sink::zarr - -#endif // H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_CREATOR_V0 \ No newline at end of file diff --git a/src/writers/sink.hh b/src/writers/sink.hh index 4f64db1d..96b61d37 100644 --- a/src/writers/sink.hh +++ b/src/writers/sink.hh @@ -14,6 +14,8 @@ struct Sink /// @param buf The buffer to write to the sink. /// @param bytes_of_buf The number of bytes to write from @p buf. /// @return True if the write was successful, false otherwise. + /// @throws std::runtime_error if @p buf is nullptr, @p bytes_of_buf is 0, + /// or the write fails. [[nodiscard]] virtual bool write(size_t offset, const uint8_t* buf, size_t bytes_of_buf) = 0; diff --git a/src/writers/zarrv2.writer.cpp b/src/writers/zarrv2.writer.cpp index 1a15a71f..1d6e1b2d 100644 --- a/src/writers/zarrv2.writer.cpp +++ b/src/writers/zarrv2.writer.cpp @@ -22,7 +22,7 @@ zarr::ZarrV2Writer::flush_impl_() // create chunk files CHECK(sinks_.empty()); const std::string data_root = - (fs::path(data_root_) / std::to_string(append_chunk_index_)).string(); + data_root_ + "/" + std::to_string(append_chunk_index_); { SinkCreator creator(thread_pool_, connection_pool_); diff --git a/src/writers/zarrv3.writer.cpp b/src/writers/zarrv3.writer.cpp index 4980e182..e1c982fe 100644 --- a/src/writers/zarrv3.writer.cpp +++ b/src/writers/zarrv3.writer.cpp @@ -30,8 +30,7 @@ zarr::ZarrV3Writer::flush_impl_() { // create shard files if they don't exist const std::string data_root = - (fs::path(data_root_) / ("c" + std::to_string(append_chunk_index_))) - .string(); + data_root_ + "/c" + std::to_string(append_chunk_index_); { SinkCreator creator(thread_pool_, connection_pool_); diff --git a/src/zarr.cpp b/src/zarr.cpp index 92c2bf95..fa191e04 100644 --- a/src/zarr.cpp +++ b/src/zarr.cpp @@ -62,10 +62,11 @@ validate_props(const StorageProperties* props) props->external_metadata_json.nbytes); std::string uri{ props->uri.str, props->uri.nbytes - 1 }; - EXPECT(!uri.starts_with("s3://"), "S3 URIs are not yet supported."); - // check that the URI value points to a writable directory - { + if (common::is_web_uri(uri)) { + std::vector tokens = common::split_uri(uri); + CHECK(tokens.size() > 2); // http://endpoint/bucket + } else { const fs::path path = as_path(*props); fs::path parent_path = path.parent_path().string(); if (parent_path.empty()) @@ -357,12 +358,29 @@ zarr::Zarr::set(const StorageProperties* props) // checks the directory exists and is writable validate_props(props); - // TODO (aliddell): we will eventually support S3 URIs, - // dataset_root_ should be a string - dataset_root_ = as_path(*props).string(); + + std::string uri(props->uri.str, props->uri.nbytes - 1); + + if (common::is_web_uri(uri)) { + dataset_root_ = uri; + } else { + dataset_root_ = as_path(*props).string(); + } + + if (props->access_key_id.str) { + s3_access_key_id_ = std::string(props->access_key_id.str, + props->access_key_id.nbytes - 1); + } + + if (props->secret_access_key.str) { + s3_secret_access_key_ = std::string( + props->secret_access_key.str, props->secret_access_key.nbytes - 1); + } if (props->external_metadata_json.str) { - external_metadata_json_ = props->external_metadata_json.str; + external_metadata_json_ = + std::string(props->external_metadata_json.str, + props->external_metadata_json.nbytes - 1); } pixel_scale_um_ = props->pixel_scale_um; @@ -407,6 +425,29 @@ zarr::Zarr::get(StorageProperties* props) const pixel_scale_um_, acquisition_dimensions_.size())); + // set access key and secret + { + const char* access_key_id = + s3_access_key_id_.has_value() ? s3_access_key_id_->c_str() : nullptr; + const size_t bytes_of_access_key_id = + access_key_id ? s3_access_key_id_->size() + 1 : 0; + + const char* secret_access_key = s3_secret_access_key_.has_value() + ? s3_secret_access_key_->c_str() + : nullptr; + const size_t bytes_of_secret_access_key = + secret_access_key ? s3_secret_access_key_->size() + 1 : 0; + + if (access_key_id && secret_access_key) { + CHECK(storage_properties_set_access_key_and_secret( + props, + access_key_id, + bytes_of_access_key_id, + secret_access_key, + bytes_of_secret_access_key)); + } + } + for (auto i = 0; i < acquisition_dimensions_.size(); ++i) { const auto dim = acquisition_dimensions_.at(i); CHECK(storage_properties_set_dimension(props, @@ -430,6 +471,7 @@ zarr::Zarr::get_meta(StoragePropertyMetadata* meta) const memset(meta, 0, sizeof(*meta)); meta->chunking_is_supported = 1; + meta->s3_is_supported = 1; } void @@ -437,19 +479,30 @@ zarr::Zarr::start() { error_ = true; - if (fs::exists(dataset_root_)) { - std::error_code ec; - EXPECT(fs::remove_all(dataset_root_, ec), - R"(Failed to remove folder for "%s": %s)", - dataset_root_.c_str(), - ec.message().c_str()); - } - fs::create_directories(dataset_root_); - thread_pool_ = std::make_shared( std::thread::hardware_concurrency(), [this](const std::string& err) { this->set_error(err); }); + if (common::is_web_uri(dataset_root_)) { + std::vector tokens = common::split_uri(dataset_root_); + CHECK(tokens.size() > 1); + const std::string endpoint = tokens[0] + "//" + tokens[1]; + connection_pool_ = std::make_shared( + 8, endpoint, *s3_access_key_id_, *s3_secret_access_key_); + } else { + // remove the folder if it exists + if (fs::exists(dataset_root_)) { + std::error_code ec; + EXPECT(fs::remove_all(dataset_root_, ec), + R"(Failed to remove folder for "%s": %s)", + dataset_root_.c_str(), + ec.message().c_str()); + } + + // create the dataset folder + fs::create_directories(dataset_root_); + } + allocate_writers_(); make_metadata_sinks_(); @@ -482,6 +535,8 @@ zarr::Zarr::stop() noexcept thread_pool_->await_stop(); thread_pool_ = nullptr; + connection_pool_ = nullptr; + // don't clear before all working threads have shut down writers_.clear(); diff --git a/src/zarr.hh b/src/zarr.hh index 53745306..ce21aa9a 100644 --- a/src/zarr.hh +++ b/src/zarr.hh @@ -48,6 +48,8 @@ struct Zarr : public Storage /// changes on set std::string dataset_root_; + std::optional s3_access_key_id_; + std::optional s3_secret_access_key_; std::string external_metadata_json_; PixelScale pixel_scale_um_; bool enable_multiscale_; diff --git a/src/zarr.v2.cpp b/src/zarr.v2.cpp index 73b80444..96ec37d6 100644 --- a/src/zarr.v2.cpp +++ b/src/zarr.v2.cpp @@ -103,8 +103,8 @@ void zarr::ZarrV2::make_metadata_sinks_() { SinkCreator creator(thread_pool_, connection_pool_); - CHECK(creator.create_v2_metadata_sinks( - dataset_root_, writers_.size(), metadata_sinks_)); + CHECK(creator.make_metadata_sinks( + ZarrVersion::V2, dataset_root_, writers_.size(), metadata_sinks_)); } void diff --git a/src/zarr.v3.cpp b/src/zarr.v3.cpp index 612d81df..db46f7bf 100644 --- a/src/zarr.v3.cpp +++ b/src/zarr.v3.cpp @@ -99,9 +99,10 @@ void zarr::ZarrV3::make_metadata_sinks_() { SinkCreator creator(thread_pool_, connection_pool_); - CHECK(creator.create_v3_metadata_sinks( - dataset_root_, writers_.size(), metadata_sinks_)); + CHECK(creator.make_metadata_sinks( + ZarrVersion::V3, dataset_root_, writers_.size(), metadata_sinks_)); } + /// @brief Write the metadata for the dataset. void zarr::ZarrV3::write_base_metadata_() const diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 97b404f7..d73fb521 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -34,11 +34,13 @@ else () write-zarr-v2-raw-multiscale write-zarr-v2-raw-multiscale-with-trivial-tile-size write-zarr-v2-compressed-multiscale + write-zarr-v2-to-s3 multiscales-metadata write-zarr-v3-raw write-zarr-v3-raw-with-ragged-sharding write-zarr-v3-raw-chunk-exceeds-array write-zarr-v3-compressed + write-zarr-v3-to-s3 ) foreach (name ${tests}) diff --git a/tests/get-meta.cpp b/tests/get-meta.cpp index 324b93ff..161540bb 100644 --- a/tests/get-meta.cpp +++ b/tests/get-meta.cpp @@ -86,6 +86,7 @@ main() CHECK(Device_Ok == storage_get_meta(storage, &metadata)); CHECK(metadata.chunking_is_supported); + CHECK(metadata.s3_is_supported); CHECK((bool)metadata.sharding_is_supported == name.starts_with("ZarrV3")); CHECK((bool)metadata.multiscale_is_supported != diff --git a/tests/unit-tests.cpp b/tests/unit-tests.cpp index 0595080a..e02880b8 100644 --- a/tests/unit-tests.cpp +++ b/tests/unit-tests.cpp @@ -83,6 +83,7 @@ main() const std::vector tests{ #define CASE(e) { .name = #e, .test = (int (*)())lib_load(&lib, #e) } CASE(unit_test__trim), + CASE(unit_test__split_uri), CASE(unit_test__shard_index_for_chunk), CASE(unit_test__shard_internal_index), CASE(unit_test__average_frame), @@ -94,6 +95,7 @@ main() CASE(unit_test__chunk_lattice_index), CASE(unit_test__tile_group_offset), CASE(unit_test__chunk_internal_offset), + CASE(unit_test__writer__write_frame_to_chunks), CASE(unit_test__downsample_writer_config), CASE(unit_test__writer__write_frame_to_chunks), CASE(unit_test__zarrv2_writer__write_even), diff --git a/tests/write-zarr-v2-to-s3.cpp b/tests/write-zarr-v2-to-s3.cpp new file mode 100644 index 00000000..19045573 --- /dev/null +++ b/tests/write-zarr-v2-to-s3.cpp @@ -0,0 +1,363 @@ +/// @file write-zarr-v2-to-s3.cpp +/// @brief Test using Zarr V2 storage with S3 backend. + +#include "device/hal/device.manager.h" +#include "acquire.h" +#include "logger.h" + +#include + +#include +#include +#include + +void +reporter(int is_error, + const char* file, + int line, + const char* function, + const char* msg) +{ + fprintf(is_error ? stderr : stdout, + "%s%s(%d) - %s: %s\n", + is_error ? "ERROR " : "", + file, + line, + function, + msg); +} + +/// Helper for passing size static strings as function args. +/// For a function: `f(char*,size_t)` use `f(SIZED("hello"))`. +/// Expands to `f("hello",5)`. +#define SIZED(str) str, sizeof(str) - 1 + +#define L (aq_logger) +#define LOG(...) L(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define ERR(...) L(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define EXPECT(e, ...) \ + do { \ + if (!(e)) { \ + char buf[1 << 8] = { 0 }; \ + ERR(__VA_ARGS__); \ + snprintf(buf, sizeof(buf) - 1, __VA_ARGS__); \ + throw std::runtime_error(buf); \ + } \ + } while (0) +#define CHECK(e) EXPECT(e, "Expression evaluated as false: %s", #e) +#define DEVOK(e) CHECK(Device_Ok == (e)) +#define OK(e) CHECK(AcquireStatus_Ok == (e)) + +/// example: `ASSERT_EQ(int,"%d",42,meaning_of_life())` +#define ASSERT_EQ(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT(a_ == b_, "Expected %s==%s but " fmt "!=" fmt, #a, #b, a_, b_); \ + } while (0) + +/// Check that strings a == b +/// example: `ASSERT_STREQ("foo",container_of_foo)` +#define ASSERT_STREQ(a, b) \ + do { \ + std::string a_ = (a); \ + std::string b_ = (b); \ + EXPECT(a_ == b_, \ + "Expected %s==%s but '%s' != '%s'", \ + #a, \ + #b, \ + a_.c_str(), \ + b_.c_str()); \ + } while (0) + +/// Check that a>b +/// example: `ASSERT_GT(int,"%d",42,meaning_of_life())` +#define ASSERT_GT(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT( \ + a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \ + } while (0) + +namespace { +std::string s3_endpoint; +std::string s3_bucket_name; +std::string s3_access_key_id; +std::string s3_secret_access_key; + +bool +get_credentials() +{ + char* env = nullptr; + if (!(env = std::getenv("ZARR_S3_ENDPOINT"))) { + ERR("ZARR_S3_ENDPOINT not set."); + return false; + } + s3_endpoint = env; + + if (!(env = std::getenv("ZARR_S3_BUCKET_NAME"))) { + ERR("ZARR_S3_BUCKET_NAME not set."); + return false; + } + s3_bucket_name = env; + + if (!(env = std::getenv("ZARR_S3_ACCESS_KEY_ID"))) { + ERR("ZARR_S3_ACCESS_KEY_ID not set."); + return false; + } + s3_access_key_id = env; + + if (!(env = std::getenv("ZARR_S3_SECRET_ACCESS_KEY"))) { + ERR("ZARR_S3_SECRET_ACCESS_KEY not set."); + return false; + } + s3_secret_access_key = env; + + return true; +} + +bool +bucket_exists(minio::s3::Client& client) +{ + if (s3_bucket_name.empty()) { + return false; + } + + try { + minio::s3::BucketExistsArgs args; + args.bucket = s3_bucket_name; + + minio::s3::BucketExistsResponse response = client.BucketExists(args); + CHECK(response); + + return response.exist; + } catch (const std::exception& e) { + ERR("Failed to check existence of bucket: %s", e.what()); + } + + return false; +} + +bool +remove_items(minio::s3::Client& client, + const std::vector& item_keys) +{ + std::list objects; + for (const auto& key : item_keys) { + minio::s3::DeleteObject object; + object.name = key; + objects.push_back(object); + } + + try { + minio::s3::RemoveObjectsArgs args; + args.bucket = s3_bucket_name; + + auto it = objects.begin(); + + args.func = [&objects = objects, + &i = it](minio::s3::DeleteObject& obj) -> bool { + if (i == objects.end()) + return false; + obj = *i; + i++; + return true; + }; + + minio::s3::RemoveObjectsResult result = client.RemoveObjects(args); + for (; result; result++) { + minio::s3::DeleteError err = *result; + if (!err) { + ERR("Failed to delete object %s: %s", + err.object_name.c_str(), + err.message.c_str()); + return false; + } + } + + return true; + } catch (const std::exception& e) { + ERR("Failed to clear bucket %s: %s", s3_bucket_name.c_str(), e.what()); + } + + return false; +} + +bool +object_exists(minio::s3::Client& client, const std::string& object_name) +{ + if (s3_bucket_name.empty() || object_name.empty()) { + return false; + } + + try { + minio::s3::StatObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + + minio::s3::StatObjectResponse response = client.StatObject(args); + + return (bool)response; + } catch (const std::exception& e) { + ERR("Failed to check if object %s exists in bucket %s: %s", + object_name.c_str(), + s3_bucket_name.c_str(), + e.what()); + } + + return false; +} +} // namespace + +void +configure(AcquireRuntime* runtime) +{ + CHECK(runtime); + + const DeviceManager* dm = acquire_device_manager(runtime); + CHECK(dm); + + AcquireProperties props = {}; + OK(acquire_get_configuration(runtime, &props)); + + DEVOK(device_manager_select(dm, + DeviceKind_Camera, + SIZED("simulated.*empty.*"), + &props.video[0].camera.identifier)); + props.video[0].camera.settings.binning = 1; + props.video[0].camera.settings.pixel_type = SampleType_u16; + props.video[0].camera.settings.shape = { .x = 1920, .y = 1080 }; + // we may drop frames with lower exposure + props.video[0].camera.settings.exposure_time_us = 1e4; + + props.video[0].max_frame_count = 100; + + DEVOK(device_manager_select(dm, + DeviceKind_Storage, + SIZED("ZarrBlosc1Lz4ByteShuffle"), + &props.video[0].storage.identifier)); + + // check if the bucket already exists + { + minio::s3::BaseUrl url(s3_endpoint); + url.https = s3_endpoint.starts_with("https://"); + + minio::creds::StaticProvider provider(s3_access_key_id, + s3_secret_access_key); + + minio::s3::Client client(url, &provider); + + CHECK(bucket_exists(client)); + } + + std::string uri = s3_endpoint + ("/" + s3_bucket_name); + storage_properties_init(&props.video[0].storage.settings, + 0, + uri.c_str(), + uri.length() + 1, + nullptr, + 0, + {}, + 3); + CHECK(storage_properties_set_access_key_and_secret( + &props.video[0].storage.settings, + s3_access_key_id.c_str(), + s3_access_key_id.size() + 1, + s3_secret_access_key.c_str(), + s3_secret_access_key.size() + 1 + )); + + CHECK(storage_properties_set_dimension(&props.video[0].storage.settings, + 0, + SIZED("x") + 1, + DimensionType_Space, + 1920, + 1920, + 1)); + CHECK(storage_properties_set_dimension(&props.video[0].storage.settings, + 1, + SIZED("y") + 1, + DimensionType_Space, + 1080, + 540, + 2)); + CHECK(storage_properties_set_dimension(&props.video[0].storage.settings, + 2, + SIZED("t") + 1, + DimensionType_Time, + 0, + 5, + 1)); + + OK(acquire_configure(runtime, &props)); +} + +void +acquire(AcquireRuntime* runtime) +{ + acquire_start(runtime); + acquire_stop(runtime); +} + +void +validate_and_cleanup(AcquireRuntime* runtime) +{ + std::vector paths{ + ".metadata", + ".zattrs", + "0/.zarray", + "0/.zattrs", + }; + for (auto i = 0; i < 20; ++i) { + paths.push_back("0/" + std::to_string(i) + "/0/0"); + paths.push_back("0/" + std::to_string(i) + "/1/0"); + } + + minio::s3::BaseUrl url(s3_endpoint); + url.https = s3_endpoint.starts_with("https://"); + + minio::creds::StaticProvider provider(s3_access_key_id, + s3_secret_access_key); + + minio::s3::Client client(url, &provider); + CHECK(bucket_exists(client)); + + try { + for (const auto& path : paths) { + CHECK(object_exists(client, path)); + } + } catch (const std::exception& e) { + ERR("Exception: %s", e.what()); + } catch (...) { + ERR("Unknown exception"); + } + remove_items(client, paths); + + CHECK(runtime); + acquire_shutdown(runtime); +} + +int +main() +{ + if (!get_credentials()) { + return 0; + } + + int retval = 1; + AcquireRuntime* runtime = acquire_init(reporter); + + try { + configure(runtime); + acquire(runtime); + validate_and_cleanup(runtime); + retval = 0; + } catch (const std::exception& e) { + ERR("Exception: %s", e.what()); + } catch (...) { + ERR("Unknown exception"); + } + + return retval; +} diff --git a/tests/write-zarr-v3-to-s3.cpp b/tests/write-zarr-v3-to-s3.cpp new file mode 100644 index 00000000..3474c053 --- /dev/null +++ b/tests/write-zarr-v3-to-s3.cpp @@ -0,0 +1,361 @@ +/// @file write-zarr-v3-to-s3.cpp +/// @brief Test using Zarr V3 storage with S3 backend. + +#include "device/hal/device.manager.h" +#include "acquire.h" +#include "logger.h" + +#include + +#include +#include +#include + +void +reporter(int is_error, + const char* file, + int line, + const char* function, + const char* msg) +{ + fprintf(is_error ? stderr : stdout, + "%s%s(%d) - %s: %s\n", + is_error ? "ERROR " : "", + file, + line, + function, + msg); +} + +/// Helper for passing size static strings as function args. +/// For a function: `f(char*,size_t)` use `f(SIZED("hello"))`. +/// Expands to `f("hello",5)`. +#define SIZED(str) str, sizeof(str) - 1 + +#define L (aq_logger) +#define LOG(...) L(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define ERR(...) L(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define EXPECT(e, ...) \ + do { \ + if (!(e)) { \ + char buf[1 << 8] = { 0 }; \ + ERR(__VA_ARGS__); \ + snprintf(buf, sizeof(buf) - 1, __VA_ARGS__); \ + throw std::runtime_error(buf); \ + } \ + } while (0) +#define CHECK(e) EXPECT(e, "Expression evaluated as false: %s", #e) +#define DEVOK(e) CHECK(Device_Ok == (e)) +#define OK(e) CHECK(AcquireStatus_Ok == (e)) + +/// example: `ASSERT_EQ(int,"%d",42,meaning_of_life())` +#define ASSERT_EQ(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT(a_ == b_, "Expected %s==%s but " fmt "!=" fmt, #a, #b, a_, b_); \ + } while (0) + +/// Check that strings a == b +/// example: `ASSERT_STREQ("foo",container_of_foo)` +#define ASSERT_STREQ(a, b) \ + do { \ + std::string a_ = (a); \ + std::string b_ = (b); \ + EXPECT(a_ == b_, \ + "Expected %s==%s but '%s' != '%s'", \ + #a, \ + #b, \ + a_.c_str(), \ + b_.c_str()); \ + } while (0) + +/// Check that a>b +/// example: `ASSERT_GT(int,"%d",42,meaning_of_life())` +#define ASSERT_GT(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT( \ + a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \ + } while (0) + +namespace { +std::string s3_endpoint; +std::string s3_bucket_name; +std::string s3_access_key_id; +std::string s3_secret_access_key; + +bool +get_credentials() +{ + char* env = nullptr; + if (!(env = std::getenv("ZARR_S3_ENDPOINT"))) { + ERR("ZARR_S3_ENDPOINT not set."); + return false; + } + s3_endpoint = env; + + if (!(env = std::getenv("ZARR_S3_BUCKET_NAME"))) { + ERR("ZARR_S3_BUCKET_NAME not set."); + return false; + } + s3_bucket_name = env; + + if (!(env = std::getenv("ZARR_S3_ACCESS_KEY_ID"))) { + ERR("ZARR_S3_ACCESS_KEY_ID not set."); + return false; + } + s3_access_key_id = env; + + if (!(env = std::getenv("ZARR_S3_SECRET_ACCESS_KEY"))) { + ERR("ZARR_S3_SECRET_ACCESS_KEY not set."); + return false; + } + s3_secret_access_key = env; + + return true; +} + +bool +bucket_exists(minio::s3::Client& client) +{ + if (s3_bucket_name.empty()) { + return false; + } + + try { + minio::s3::BucketExistsArgs args; + args.bucket = s3_bucket_name; + + minio::s3::BucketExistsResponse response = client.BucketExists(args); + CHECK(response); + + return response.exist; + } catch (const std::exception& e) { + ERR("Failed to check existence of bucket: %s", e.what()); + } + + return false; +} + +bool +remove_items(minio::s3::Client& client, + const std::vector& item_keys) +{ + std::list objects; + for (const auto& key : item_keys) { + minio::s3::DeleteObject object; + object.name = key; + objects.push_back(object); + } + + try { + minio::s3::RemoveObjectsArgs args; + args.bucket = s3_bucket_name; + + auto it = objects.begin(); + + args.func = [&objects = objects, + &i = it](minio::s3::DeleteObject& obj) -> bool { + if (i == objects.end()) + return false; + obj = *i; + i++; + return true; + }; + + minio::s3::RemoveObjectsResult result = client.RemoveObjects(args); + for (; result; result++) { + minio::s3::DeleteError err = *result; + if (!err) { + ERR("Failed to delete object %s: %s", + err.object_name.c_str(), + err.message.c_str()); + return false; + } + } + + return true; + } catch (const std::exception& e) { + ERR("Failed to clear bucket %s: %s", s3_bucket_name.c_str(), e.what()); + } + + return false; +} + +bool +object_exists(minio::s3::Client& client, const std::string& object_name) +{ + if (s3_bucket_name.empty() || object_name.empty()) { + return false; + } + + try { + minio::s3::StatObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + + minio::s3::StatObjectResponse response = client.StatObject(args); + + return (bool)response; + } catch (const std::exception& e) { + ERR("Failed to check if object %s exists in bucket %s: %s", + object_name.c_str(), + s3_bucket_name.c_str(), + e.what()); + } + + return false; +} +} // namespace + +void +configure(AcquireRuntime* runtime) +{ + CHECK(runtime); + + const DeviceManager* dm = acquire_device_manager(runtime); + CHECK(dm); + + AcquireProperties props = {}; + OK(acquire_get_configuration(runtime, &props)); + + DEVOK(device_manager_select(dm, + DeviceKind_Camera, + SIZED("simulated.*empty.*"), + &props.video[0].camera.identifier)); + props.video[0].camera.settings.binning = 1; + props.video[0].camera.settings.pixel_type = SampleType_u16; + props.video[0].camera.settings.shape = { .x = 1920, .y = 1080 }; + // we may drop frames with lower exposure + props.video[0].camera.settings.exposure_time_us = 1e4; + + props.video[0].max_frame_count = 100; + + DEVOK(device_manager_select(dm, + DeviceKind_Storage, + SIZED("ZarrV3Blosc1ZstdByteShuffle"), + &props.video[0].storage.identifier)); + + // check if the bucket already exists + { + minio::s3::BaseUrl url(s3_endpoint); + url.https = s3_endpoint.starts_with("https://"); + + minio::creds::StaticProvider provider(s3_access_key_id, + s3_secret_access_key); + + minio::s3::Client client(url, &provider); + + CHECK(bucket_exists(client)); + } + + std::string uri = s3_endpoint + ("/" + s3_bucket_name); + storage_properties_init(&props.video[0].storage.settings, + 0, + uri.c_str(), + uri.length() + 1, + nullptr, + 0, + {}, + 3); + CHECK(storage_properties_set_access_key_and_secret( + &props.video[0].storage.settings, + s3_access_key_id.c_str(), + s3_access_key_id.size() + 1, + s3_secret_access_key.c_str(), + s3_secret_access_key.size() + 1 + )); + + CHECK(storage_properties_set_dimension(&props.video[0].storage.settings, + 0, + SIZED("x") + 1, + DimensionType_Space, + 1920, + 1920, + 1)); + CHECK(storage_properties_set_dimension(&props.video[0].storage.settings, + 1, + SIZED("y") + 1, + DimensionType_Space, + 1080, + 540, + 2)); + CHECK(storage_properties_set_dimension(&props.video[0].storage.settings, + 2, + SIZED("t") + 1, + DimensionType_Time, + 0, + 5, + 1)); + + OK(acquire_configure(runtime, &props)); +} + +void +acquire(AcquireRuntime* runtime) +{ + acquire_start(runtime); + acquire_stop(runtime); +} + +void +validate_and_cleanup(AcquireRuntime* runtime) +{ + CHECK(runtime); + + std::vector paths{ "zarr.json", + "meta/root.group.json", + "meta/root/0.array.json" }; + for (auto i = 0; i < 20; ++i) { + paths.push_back("data/root/0/c" + std::to_string(i) + "/0/0"); + } + + minio::s3::BaseUrl url(s3_endpoint); + url.https = s3_endpoint.starts_with("https://"); + + minio::creds::StaticProvider provider(s3_access_key_id, + s3_secret_access_key); + + minio::s3::Client client(url, &provider); + CHECK(bucket_exists(client)); + + try { + for (const auto& path : paths) { + CHECK(object_exists(client, path)); + } + } catch (const std::exception& e) { + ERR("Exception: %s", e.what()); + } catch (...) { + ERR("Unknown exception"); + } + remove_items(client, paths); + + CHECK(runtime); + acquire_shutdown(runtime); +} + +int +main() +{ + if (!get_credentials()) { + return 0; + } + + int retval = 1; + AcquireRuntime* runtime = acquire_init(reporter); + + try { + configure(runtime); + acquire(runtime); + validate_and_cleanup(runtime); + retval = 0; + } catch (const std::exception& e) { + ERR("Exception: %s", e.what()); + } catch (...) { + ERR("Unknown exception"); + } + + return retval; +}