diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5af8d99c..a3c0155a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## Unreleased

+### Added
+
+- Support for writing to S3 buckets.
+
 ### Changed

 - Calling `acquire_get_configuration` with a Zarr storage device now returns a URI of the storage device, with file://
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index dadac099..236ebadd 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -13,6 +13,8 @@ add_library(${tgt}-obj OBJECT
         common/dimension.cpp
         common/thread.pool.hh
         common/thread.pool.cpp
+        common/s3.connection.hh
+        common/s3.connection.cpp
         common/utilities.hh
         common/utilities.cpp
         writers/sink.hh
@@ -20,6 +22,8 @@ add_library(${tgt}-obj OBJECT
         writers/sink.creator.cpp
         writers/file.sink.hh
         writers/file.sink.cpp
+        writers/s3.sink.hh
+        writers/s3.sink.cpp
         writers/writer.hh
         writers/writer.cpp
         writers/zarrv2.writer.hh
diff --git a/src/common/s3.connection.cpp b/src/common/s3.connection.cpp
new file mode 100644
index 00000000..bb848457
--- /dev/null
+++ b/src/common/s3.connection.cpp
@@ -0,0 +1,456 @@
+#include "s3.connection.hh"
+#include "utilities.hh"
+
+#include <miniocpp/client.h>
+
+#include <list>
+#include <sstream>
+#include <string>
+
+namespace zarr = acquire::sink::zarr;
+namespace common = zarr::common;
+
+common::S3Connection::S3Connection(const std::string& endpoint,
+                                   const std::string& access_key_id,
+                                   const std::string& secret_access_key)
+{
+    minio::s3::BaseUrl url(endpoint);
+    url.https = endpoint.starts_with("https");
+
+    provider_ = std::make_unique<minio::creds::StaticProvider>(
+      access_key_id, secret_access_key);
+    client_ = std::make_unique<minio::s3::Client>(url, provider_.get());
+    CHECK(client_);
+}
+
+bool
+common::S3Connection::check_connection()
+{
+    return static_cast<bool>(client_->ListBuckets());
+}
+
+bool
+common::S3Connection::bucket_exists(std::string_view bucket_name)
+{
+    EXPECT(!bucket_name.empty(), "Bucket name must not be empty.");
+
+    minio::s3::BucketExistsArgs args;
+    args.bucket = bucket_name;
+
+    auto response = client_->BucketExists(args);
+    return response.exist;
+}
+
+bool
+common::S3Connection::object_exists(std::string_view bucket_name,
+                                    std::string_view object_name)
+{
+    EXPECT(!bucket_name.empty(), "Bucket name must not be empty.");
+    EXPECT(!object_name.empty(), "Object name must not be empty.");
+
+    minio::s3::StatObjectArgs args;
+    args.bucket = bucket_name;
+    args.object = object_name;
+
+    auto response = client_->StatObject(args);
+    // casts to true if response code in 200 range and error message is empty
+    return static_cast<bool>(response);
+}
+
+std::string
+common::S3Connection::put_object(std::string_view bucket_name,
+                                 std::string_view object_name,
+                                 std::span<uint8_t> data)
+{
+    EXPECT(!bucket_name.empty(), "Bucket name must not be empty.");
+    EXPECT(!object_name.empty(), "Object name must not be empty.");
+    EXPECT(!data.empty(), "Data must not be empty.");
+
+    minio::utils::CharBuffer buffer(reinterpret_cast<char*>(data.data()),
+                                    data.size());
+    std::basic_istream<char> stream(&buffer);
+
+    TRACE(
+      "Putting object %s in bucket %s", object_name.data(), bucket_name.data());
+    minio::s3::PutObjectArgs args(stream, (long)data.size(), 0);
+    args.bucket = bucket_name;
+    args.object = object_name;
+
+    auto response = client_->PutObject(args);
+    if (!response) {
+        LOGE("Failed to put object %s in bucket %s: %s",
+             object_name.data(),
+             bucket_name.data(),
+             response.Error().String().c_str());
+        return {};
+    }
+
+    return response.etag;
+}
+
+bool
+common::S3Connection::delete_object(std::string_view bucket_name,
+                                    std::string_view object_name)
+{
+    EXPECT(!bucket_name.empty(), "Bucket name must not be empty.");
+    EXPECT(!object_name.empty(), "Object name must not be empty.");
+
+    TRACE("Deleting object %s from bucket %s",
+          object_name.data(),
+          bucket_name.data());
+    minio::s3::RemoveObjectArgs args;
+    args.bucket = bucket_name;
+    args.object = object_name;
+
+    auto response = client_->RemoveObject(args);
+    if (!response) {
+        LOGE("Failed to delete object %s from bucket %s: %s",
+             object_name.data(),
+             bucket_name.data(),
+             response.Error().String().c_str());
+        return false;
+    }
+
+    return true;
+}
+
+std::string
+common::S3Connection::create_multipart_object(std::string_view bucket_name,
+                                              std::string_view object_name)
+{
+    EXPECT(!bucket_name.empty(), "Bucket name must not be empty.");
+    EXPECT(!object_name.empty(), "Object name must not be empty.");
+
+    TRACE("Creating multipart object %s in bucket %s",
+          object_name.data(),
+          bucket_name.data());
+    minio::s3::CreateMultipartUploadArgs args;
+    args.bucket = bucket_name;
+    args.object = object_name;
+
+    auto response = client_->CreateMultipartUpload(args);
+    CHECK(!response.upload_id.empty());
+
+    return response.upload_id;
+}
+
+std::string
+common::S3Connection::upload_multipart_object_part(
+  std::string_view bucket_name,
+  std::string_view object_name,
+  std::string_view upload_id,
+  std::span<uint8_t> data,
+  unsigned int part_number)
+{
+    EXPECT(!bucket_name.empty(), "Bucket name must not be empty.");
+    EXPECT(!object_name.empty(), "Object name must not be empty.");
+    EXPECT(!data.empty(), "Data must not be empty.");
+    EXPECT(part_number, "Part number must be positive.");
+
+    TRACE("Uploading multipart object part %u for object %s in bucket %s",
+          part_number,
+          object_name.data(),
+          bucket_name.data());
+
+    std::string_view sv(reinterpret_cast<const char*>(data.data()),
+                        data.size());
+
+    minio::s3::UploadPartArgs args;
+    args.bucket = bucket_name;
+    args.object = object_name;
+    args.part_number = part_number;
+    args.upload_id = upload_id;
+    args.data = sv;
+
+    auto response = client_->UploadPart(args);
+    if (!response) {
+        LOGE("Failed to upload part %u for object %s in bucket %s: %s",
+             part_number,
+             object_name.data(),
+             bucket_name.data(),
+             response.Error().String().c_str());
+        return {};
+    }
+
+    return response.etag;
+}
+
+bool
+common::S3Connection::complete_multipart_object(
+  std::string_view bucket_name,
+  std::string_view object_name,
+  std::string_view upload_id,
+  const std::list<minio::s3::Part>& parts)
+{
+    EXPECT(!bucket_name.empty(), "Bucket name must not be empty.");
+    EXPECT(!object_name.empty(), "Object name must not be empty.");
+    EXPECT(!upload_id.empty(), "Upload id must not be empty.");
+    EXPECT(!parts.empty(), "Parts list must not be empty.");
+
+    TRACE("Completing multipart object %s in bucket %s",
+          object_name.data(),
+          bucket_name.data());
+    minio::s3::CompleteMultipartUploadArgs args;
+    args.bucket = bucket_name;
+    args.object = object_name;
+    args.upload_id = upload_id;
+    args.parts = parts;
+
+    auto response = client_->CompleteMultipartUpload(args);
+    if (!response) {
+        LOGE("Failed to complete multipart object %s in bucket %s: %s",
+             object_name.data(),
+             bucket_name.data(),
+             response.Error().String().c_str());
+        return false;
+    }
+
+    return true;
+}
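Note: put_object and upload_multipart_object_part signal failure by returning an empty etag after logging the error, so callers must check the result. A minimal sketch of the expected call-site pattern, with a hypothetical bucket and key and some std::span<uint8_t> data in scope:

    std::string etag = conn.put_object("my-bucket", "my-key", data);
    if (etag.empty()) {
        // the failure has already been logged; retry or propagate as needed
    }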
+
+common::S3ConnectionPool::S3ConnectionPool(size_t n_connections,
+                                           const std::string& endpoint,
+                                           const std::string& access_key_id,
+                                           const std::string& secret_access_key)
+  : is_accepting_connections_{ true }
+{
+    for (size_t i = 0; i < n_connections; ++i) {
+        auto connection = std::make_unique<S3Connection>(
+          endpoint, access_key_id, secret_access_key);
+
+        // keep only connections that pass the check, moving each one into
+        // the pool instead of constructing a second, unchecked connection
+        if (connection->check_connection()) {
+            connections_.push_back(std::move(connection));
+        }
+    }
+
+    CHECK(!connections_.empty());
+}
+
+common::S3ConnectionPool::~S3ConnectionPool() noexcept
+{
+    is_accepting_connections_ = false;
+    cv_.notify_all();
+}
+
+std::unique_ptr<common::S3Connection>
+common::S3ConnectionPool::get_connection()
+{
+    std::unique_lock lock(connections_mutex_);
+    // also wake on shutdown so that waiters don't hang when the pool is
+    // destroyed while the connection list is empty
+    cv_.wait(lock, [this] {
+        return !is_accepting_connections_ || !connections_.empty();
+    });
+
+    if (!is_accepting_connections_ || connections_.empty()) {
+        return nullptr;
+    }
+
+    auto conn = std::move(connections_.back());
+    connections_.pop_back();
+    return conn;
+}
+
+void
+common::S3ConnectionPool::return_connection(
+  std::unique_ptr<S3Connection>&& conn)
+{
+    std::scoped_lock lock(connections_mutex_);
+    connections_.push_back(std::move(conn));
+    cv_.notify_one();
+}
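The pool hands connections out by move and expects them back when the caller is done. A minimal sketch of the intended checkout/return cycle, with a hypothetical endpoint, credentials, bucket, key, and data buffer:

    auto pool = std::make_shared<common::S3ConnectionPool>(
      8, "https://s3.example.com", access_key_id, secret_access_key);
    if (auto conn = pool->get_connection()) { // blocks until one is free
        std::string etag = conn->put_object("my-bucket", "my-key", data);
        pool->return_connection(std::move(conn));
    }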
+
+#ifndef NO_UNIT_TESTS
+#ifdef _WIN32
+#define acquire_export __declspec(dllexport)
+#else
+#define acquire_export
+#endif
+
+#include <cstdlib>
+
+namespace {
+bool
+get_credentials(std::string& endpoint,
+                std::string& bucket_name,
+                std::string& access_key_id,
+                std::string& secret_access_key)
+{
+    char* env = nullptr;
+    if (!(env = std::getenv("ZARR_S3_ENDPOINT"))) {
+        LOGE("ZARR_S3_ENDPOINT not set.");
+        return false;
+    }
+    endpoint = env;
+
+    if (!(env = std::getenv("ZARR_S3_BUCKET_NAME"))) {
+        LOGE("ZARR_S3_BUCKET_NAME not set.");
+        return false;
+    }
+    bucket_name = env;
+
+    if (!(env = std::getenv("ZARR_S3_ACCESS_KEY_ID"))) {
+        LOGE("ZARR_S3_ACCESS_KEY_ID not set.");
+        return false;
+    }
+    access_key_id = env;
+
+    if (!(env = std::getenv("ZARR_S3_SECRET_ACCESS_KEY"))) {
+        LOGE("ZARR_S3_SECRET_ACCESS_KEY not set.");
+        return false;
+    }
+    secret_access_key = env;
+
+    return true;
+}
+} // namespace
+
+extern "C"
+{
+    acquire_export int unit_test__s3_connection__put_object()
+    {
+        std::string s3_endpoint, bucket_name, s3_access_key_id,
+          s3_secret_access_key;
+        if (!get_credentials(s3_endpoint,
+                             bucket_name,
+                             s3_access_key_id,
+                             s3_secret_access_key)) {
+            return 1;
+        }
+
+        const std::string object_name = "test-object";
+
+        int retval = 0;
+
+        try {
+            common::S3Connection conn(
+              s3_endpoint, s3_access_key_id, s3_secret_access_key);
+
+            CHECK(conn.bucket_exists(bucket_name));
+
+            CHECK(conn.delete_object(bucket_name, object_name));
+            CHECK(!conn.object_exists(bucket_name, object_name));
+
+            std::vector<uint8_t> data(1024, 0);
+
+            std::string etag =
+              conn.put_object(bucket_name,
+                              object_name,
+                              std::span<uint8_t>(data.data(), data.size()));
+            CHECK(!etag.empty());
+
+            CHECK(conn.object_exists(bucket_name, object_name));
+
+            // cleanup
+            CHECK(conn.delete_object(bucket_name, object_name));
+
+            retval = 1;
+        } catch (const std::exception& e) {
+            LOGE("Failed to create S3 connection: %s", e.what());
+        } catch (...) {
+            LOGE("Failed to create S3 connection: unknown error");
+        }
+
+        return retval;
+    }
+
+    acquire_export int unit_test__s3_connection__upload_multipart_object()
+    {
+        std::string s3_endpoint, bucket_name, s3_access_key_id,
+          s3_secret_access_key;
+        if (!get_credentials(s3_endpoint,
+                             bucket_name,
+                             s3_access_key_id,
+                             s3_secret_access_key)) {
+            return 1;
+        }
+
+        const std::string object_name = "test-object";
+
+        int retval = 0;
+
+        try {
+            common::S3Connection conn(
+              s3_endpoint, s3_access_key_id, s3_secret_access_key);
+
+            CHECK(conn.bucket_exists(bucket_name));
+
+            if (conn.object_exists(bucket_name, object_name)) {
+                CHECK(conn.delete_object(bucket_name, object_name));
+                CHECK(!conn.object_exists(bucket_name, object_name));
+            }
+
+            std::string upload_id =
+              conn.create_multipart_object(bucket_name, object_name);
+            CHECK(!upload_id.empty());
+
+            std::list<minio::s3::Part> parts;
+
+            // parts need to be at least 5MiB, except the last part
+            std::vector<uint8_t> data(5 << 20, 0);
+            for (auto i = 0; i < 4; ++i) {
+                std::string etag = conn.upload_multipart_object_part(
+                  bucket_name,
+                  object_name,
+                  upload_id,
+                  std::span<uint8_t>(data.data(), data.size()),
+                  i + 1);
+                CHECK(!etag.empty());
+
+                minio::s3::Part part;
+                part.number = i + 1;
+                part.etag = etag;
+                part.size = data.size();
+
+                parts.push_back(part);
+            }
+
+            // last part is 1MiB, so upload only the first part_size bytes
+            {
+                const unsigned int part_number =
+                  static_cast<unsigned int>(parts.size() + 1);
+                const size_t part_size = 1 << 20; // 1MiB
+                std::string etag = conn.upload_multipart_object_part(
+                  bucket_name,
+                  object_name,
+                  upload_id,
+                  std::span<uint8_t>(data.data(), part_size),
+                  part_number);
+                CHECK(!etag.empty());
+
+                minio::s3::Part part;
+                part.number = part_number;
+                part.etag = etag;
+                part.size = part_size;
+
+                parts.push_back(part);
+            }
+
+            CHECK(conn.complete_multipart_object(
+              bucket_name, object_name, upload_id, parts));
+
+            CHECK(conn.object_exists(bucket_name, object_name));
+
+            // cleanup
+            CHECK(conn.delete_object(bucket_name, object_name));
+
+            retval = 1;
+        } catch (const std::exception& e) {
+            LOGE("Failed to create S3 connection: %s", e.what());
+        } catch (...) {
+            LOGE("Failed to create S3 connection: unknown error");
+        }
+
+        return retval;
+    }
+}
+#endif
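S3 requires every part of a multipart upload except the last to be at least 5 MiB, which is why the test above sends four full-size parts before the smaller tail. With a fixed 5 MiB part size, the part count for an n-byte object is a ceiling division; a small illustrative helper, not part of this diff:

    constexpr size_t kPartSize = 5 << 20; // 5 MiB
    size_t parts_for(size_t n_bytes)
    {
        return (n_bytes + kPartSize - 1) / kPartSize; // e.g. 21 MiB -> 5 parts
    }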
diff --git a/src/common/s3.connection.hh b/src/common/s3.connection.hh
new file mode 100644
index 00000000..b966e1f4
--- /dev/null
+++ b/src/common/s3.connection.hh
@@ -0,0 +1,130 @@
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <span>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include <miniocpp/client.h>
+
+namespace acquire::sink::zarr::common {
+struct S3Connection final
+{
+    S3Connection() = delete;
+    explicit S3Connection(const std::string& endpoint,
+                          const std::string& access_key_id,
+                          const std::string& secret_access_key);
+
+    ~S3Connection() noexcept = default;
+
+    /// @brief Test a connection by listing all buckets at this connection's
+    /// endpoint.
+    /// @returns True if the connection is valid, otherwise false.
+    bool check_connection();
+
+    // bucket operations
+    /// @brief Check whether a bucket exists.
+    /// @param bucket_name The name of the bucket.
+    /// @returns True if the bucket exists, otherwise false.
+    /// @throws std::runtime_error if the bucket name is empty.
+    bool bucket_exists(std::string_view bucket_name);
+
+    // object operations
+    /// @brief Check whether an object exists.
+    /// @param bucket_name The name of the bucket containing the object.
+    /// @param object_name The name of the object.
+    /// @returns True if the object exists, otherwise false.
+    /// @throws std::runtime_error if the bucket name is empty or the object
+    /// name is empty.
+    bool object_exists(std::string_view bucket_name,
+                       std::string_view object_name);
+
+    /// @brief Put an object.
+    /// @param bucket_name The name of the bucket to put the object in.
+    /// @param object_name The name of the object.
+    /// @param data The data to put in the object.
+    /// @returns The etag of the object. Empty if the operation fails.
+    /// @throws std::runtime_error if the bucket name is empty, the object name
+    /// is empty, or @p data is empty.
+    [[nodiscard]] std::string put_object(std::string_view bucket_name,
+                                         std::string_view object_name,
+                                         std::span<uint8_t> data);
+
+    /// @brief Delete an object.
+    /// @param bucket_name The name of the bucket containing the object.
+    /// @param object_name The name of the object.
+    /// @returns True if the object was successfully deleted, otherwise false.
+    /// @throws std::runtime_error if the bucket name is empty or the object
+    /// name is empty.
+    [[nodiscard]] bool delete_object(std::string_view bucket_name,
+                                     std::string_view object_name);
+
+    // multipart object operations
+    /// @brief Create a multipart object.
+    /// @param bucket_name The name of the bucket containing the object.
+    /// @param object_name The name of the object.
+    /// @returns The upload id of the multipart object. Nonempty if and only if
+    /// the operation succeeds.
+    /// @throws std::runtime_error if the bucket name is empty or the object
+    /// name is empty.
+    [[nodiscard]] std::string create_multipart_object(
+      std::string_view bucket_name,
+      std::string_view object_name);
+
+    /// @brief Upload a part of a multipart object.
+    /// @param bucket_name The name of the bucket containing the object.
+    /// @param object_name The name of the object.
+    /// @param upload_id The upload id of the multipart object.
+    /// @param data The data to upload.
+    /// @param part_number The part number of the object.
+    /// @returns The etag of the uploaded part. Nonempty if and only if the
+    /// operation is successful.
+    /// @throws std::runtime_error if the bucket name is empty, the object name
+    /// is empty, @p data is empty, or @p part_number is 0.
+    [[nodiscard]] std::string upload_multipart_object_part(
+      std::string_view bucket_name,
+      std::string_view object_name,
+      std::string_view upload_id,
+      std::span<uint8_t> data,
+      unsigned int part_number);
+
+    /// @brief Complete a multipart object.
+    /// @param bucket_name The name of the bucket containing the object.
+    /// @param object_name The name of the object.
+    /// @param upload_id The upload id of the multipart object.
+    /// @param parts List of the parts making up the object.
+    /// @returns True if the object was successfully completed, otherwise
+    /// false.
+    [[nodiscard]] bool complete_multipart_object(
+      std::string_view bucket_name,
+      std::string_view object_name,
+      std::string_view upload_id,
+      const std::list<minio::s3::Part>& parts);
+
+  private:
+    std::unique_ptr<minio::s3::Client> client_;
+    std::unique_ptr<minio::creds::StaticProvider> provider_;
+};
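Taken together, the multipart members above imply a create, upload-parts, complete sequence. A minimal sketch of a caller driving it, with hypothetical names and part spans, error handling elided:

    std::string upload_id = conn.create_multipart_object("my-bucket", "my-key");
    std::list<minio::s3::Part> parts;
    for (unsigned int i = 0; i < n_parts; ++i) {
        minio::s3::Part part;
        part.number = i + 1; // part numbers are 1-based
        part.etag = conn.upload_multipart_object_part(
          "my-bucket", "my-key", upload_id, part_spans[i], part.number);
        parts.push_back(part);
    }
    conn.complete_multipart_object("my-bucket", "my-key", upload_id, parts);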
+
+struct S3ConnectionPool final
+{
+  public:
+    S3ConnectionPool(size_t n_connections,
+                     const std::string& endpoint,
+                     const std::string& access_key_id,
+                     const std::string& secret_access_key);
+    ~S3ConnectionPool() noexcept;
+
+    std::unique_ptr<S3Connection> get_connection();
+    void return_connection(std::unique_ptr<S3Connection>&& conn);
+
+  private:
+    std::vector<std::unique_ptr<S3Connection>> connections_;
+    mutable std::mutex connections_mutex_;
+    std::condition_variable cv_;
+
+    std::atomic<bool> is_accepting_connections_;
+};
+} // namespace acquire::sink::zarr::common
diff --git a/src/common/utilities.cpp b/src/common/utilities.cpp
index 63b98f94..769468e6 100644
--- a/src/common/utilities.cpp
+++ b/src/common/utilities.cpp
@@ -197,23 +197,47 @@ common::split_uri(const std::string& uri)
 {
     const char delim = '/';

-    size_t begin = 0;
-    auto end = uri.find_first_of(delim);
-
     std::vector<std::string> out;
+    size_t begin = 0, end = uri.find_first_of(delim);
+
     while (end != std::string::npos) {
-        if (end > begin) {
-            out.emplace_back(uri.substr(begin, end - begin));
-        }
+        std::string part = uri.substr(begin, end - begin);
+        if (!part.empty())
+            out.push_back(part);

         begin = end + 1;
         end = uri.find_first_of(delim, begin);
     }
+
+    // Add the last segment of the URI (if any) after the last '/'
+    std::string last_part = uri.substr(begin);
+    if (!last_part.empty()) {
+        out.push_back(last_part);
+    }
+
     return out;
 }

+void
+common::parse_path_from_uri(std::string_view uri,
+                            std::string& bucket_name,
+                            std::string& path)
+{
+    // copy to a std::string: split_uri needs a null-terminated argument,
+    // which a string_view does not guarantee
+    auto parts = split_uri(std::string(uri));
+    EXPECT(parts.size() > 2, "Invalid URI: %s", uri.data());
+
+    bucket_name = parts[2];
+    path = "";
+    for (size_t i = 3; i < parts.size(); ++i) {
+        path += parts[i];
+        if (i < parts.size() - 1) {
+            path += "/";
+        }
+    }
+}
+
 bool
-common::is_s3_uri(const std::string& uri)
+common::is_web_uri(std::string_view uri)
 {
     return uri.starts_with("s3://") || uri.starts_with("http://") ||
            uri.starts_with("https://");
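Because split_uri keeps the scheme as an "s3:" token and drops empty segments, parse_path_from_uri reads the bucket from index 2 and joins the remainder into the object path. A worked example with hypothetical values:

    std::string bucket, path;
    common::parse_path_from_uri("s3://endpoint/my-bucket/a/b", bucket, path);
    // bucket == "my-bucket", path == "a/b"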
@@ -480,6 +504,76 @@ extern "C"

         return retval;
     }
-}
-#endif
\ No newline at end of file
+
+    acquire_export int unit_test__split_uri()
+    {
+        try {
+            auto parts = common::split_uri("s3://bucket/key");
+            CHECK(parts.size() == 3);
+            CHECK(parts[0] == "s3:");
+            CHECK(parts[1] == "bucket");
+            CHECK(parts[2] == "key");
+
+            parts = common::split_uri("s3://bucket/key/");
+            CHECK(parts.size() == 3);
+            CHECK(parts[0] == "s3:");
+            CHECK(parts[1] == "bucket");
+            CHECK(parts[2] == "key");
+
+            parts = common::split_uri("s3://bucket/key/with/slashes");
+            CHECK(parts.size() == 5);
+            CHECK(parts[0] == "s3:");
+            CHECK(parts[1] == "bucket");
+            CHECK(parts[2] == "key");
+            CHECK(parts[3] == "with");
+            CHECK(parts[4] == "slashes");
+
+            parts = common::split_uri("s3://bucket");
+            CHECK(parts.size() == 2);
+            CHECK(parts[0] == "s3:");
+            CHECK(parts[1] == "bucket");
+
+            parts = common::split_uri("s3://");
+            CHECK(parts.size() == 1);
+            CHECK(parts[0] == "s3:");
+
+            parts = common::split_uri("s3:///");
+            CHECK(parts.size() == 1);
+            CHECK(parts[0] == "s3:");
+
+            parts = common::split_uri("s3://bucket/");
+            CHECK(parts.size() == 2);
+            CHECK(parts[0] == "s3:");
+            CHECK(parts[1] == "bucket");
+
+            parts = common::split_uri("s3://bucket/key/with/slashes/");
+            CHECK(parts.size() == 5);
+            CHECK(parts[0] == "s3:");
+            CHECK(parts[1] == "bucket");
+            CHECK(parts[2] == "key");
+            CHECK(parts[3] == "with");
+            CHECK(parts[4] == "slashes");
+
+            parts = common::split_uri("s3://bucket/key/with/slashes//");
+            CHECK(parts.size() == 5);
+            CHECK(parts[0] == "s3:");
+            CHECK(parts[1] == "bucket");
+            CHECK(parts[2] == "key");
+            CHECK(parts[3] == "with");
+            CHECK(parts[4] == "slashes");
+            return 1;
+        } catch (const std::exception& exc) {
+            LOGE("Exception: %s\n", exc.what());
+        } catch (...) {
+            LOGE("Exception: (unknown)");
+        }
+
+        return 0;
+    }
+}
+#endif
diff --git a/src/common/utilities.hh b/src/common/utilities.hh
index 1290c53e..e2b4c8a5 100644
--- a/src/common/utilities.hh
+++ b/src/common/utilities.hh
@@ -22,8 +22,13 @@ namespace acquire::sink::zarr {
 struct Zarr;

-namespace common {
+enum class ZarrVersion
+{
+    V2 = 2,
+    V3
+};

+namespace common {
 /// @brief Get the number of chunks along a dimension.
 /// @param dimension A dimension.
 /// @return The number of, possibly ragged, chunks along the dimension, given
@@ -99,11 +104,20 @@ align_up(size_t n, size_t align);
 std::vector<std::string>
 split_uri(const std::string& uri);

+/// @brief Get the bucket name and object path from an S3 URI.
+/// @param[in] uri String to parse.
+/// @param[out] bucket_name The bucket name of the URI.
+/// @param[out] path The object path within the bucket.
+void
+parse_path_from_uri(std::string_view uri,
+                    std::string& bucket_name,
+                    std::string& path);
+
+/// @brief Check whether a URI is a web URI (s3://, http://, or https://).
 /// @param uri String to check.
-/// @return True if the URI is an S3 URI, false otherwise.
+/// @return True if the URI is a web URI, false otherwise.
 bool
-is_s3_uri(const std::string& uri);
+is_web_uri(std::string_view uri);

 } // namespace acquire::sink::zarr::common
 } // namespace acquire::sink::zarr
diff --git a/src/writers/file.sink.cpp b/src/writers/file.sink.cpp
index d5106dcb..375bde9a 100644
--- a/src/writers/file.sink.cpp
+++ b/src/writers/file.sink.cpp
@@ -16,9 +16,8 @@ zarr::FileSink::FileSink(const std::string& uri)
 bool
 zarr::FileSink::write(size_t offset, const uint8_t* buf, size_t bytes_of_buf)
 {
-    if (!file_) {
-        return false;
-    }
+    CHECK(buf);
+    CHECK(bytes_of_buf);

     return file_write(file_.get(), offset, buf, buf + bytes_of_buf);
 }
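The S3 sink that follows buffers sequential writes into a fixed 5 MiB part buffer: an object that never fills the buffer is written with a single PUT when the sink is destroyed, while larger objects roll over into a multipart upload, one part per full buffer. A sketch of the intended usage, assuming an existing connection pool and hypothetical bucket and key names:

    auto sink = std::make_unique<zarr::S3Sink>("my-bucket", "0/0/0/0", pool);
    sink->write(0, frame_data, frame_size); // offset is ignored; writes append
    sink.reset(); // dtor flushes buffered bytes and completes any multipart upload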
diff --git a/src/writers/s3.sink.cpp b/src/writers/s3.sink.cpp
new file mode 100644
index 00000000..b746f70c
--- /dev/null
+++ b/src/writers/s3.sink.cpp
@@ -0,0 +1,170 @@
+#include "s3.sink.hh"
+
+#include "common/utilities.hh"
+#include "logger.h"
+
+#include <miniocpp/client.h>
+
+namespace zarr = acquire::sink::zarr;
+
+zarr::S3Sink::S3Sink(std::string_view bucket_name,
+                     std::string_view object_key,
+                     std::shared_ptr<common::S3ConnectionPool> connection_pool)
+  : bucket_name_{ bucket_name }
+  , object_key_{ object_key }
+  , connection_pool_{ connection_pool }
+{
+    CHECK(!bucket_name_.empty());
+    CHECK(!object_key_.empty());
+    CHECK(connection_pool_);
+}
+
+zarr::S3Sink::~S3Sink()
+{
+    if (!is_multipart_upload_() && n_bytes_buffered_ > 0) {
+        if (!put_object_()) {
+            LOGE("Failed to upload object: %s", object_key_.c_str());
+        }
+    } else if (is_multipart_upload_()) {
+        if (n_bytes_buffered_ > 0 && !flush_part_()) {
+            LOGE("Failed to upload part %zu of object %s",
+                 parts_.size() + 1,
+                 object_key_.c_str());
+        }
+        if (!finalize_multipart_upload_()) {
+            LOGE("Failed to finalize multipart upload of object %s",
+                 object_key_.c_str());
+        }
+    }
+}
+
+bool
+zarr::S3Sink::write(size_t _, const uint8_t* data, size_t bytes_of_data)
+{
+    CHECK(data);
+    CHECK(bytes_of_data);
+
+    while (bytes_of_data > 0) {
+        const auto bytes_to_write =
+          std::min(bytes_of_data, part_buffer_.size() - n_bytes_buffered_);
+
+        if (bytes_to_write) {
+            std::copy_n(
+              data, bytes_to_write, part_buffer_.begin() + n_bytes_buffered_);
+            n_bytes_buffered_ += bytes_to_write;
+            data += bytes_to_write;
+            bytes_of_data -= bytes_to_write;
+        }
+
+        if (n_bytes_buffered_ == part_buffer_.size() && !flush_part_()) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+bool
+zarr::S3Sink::put_object_()
+{
+    if (n_bytes_buffered_ == 0) {
+        return false;
+    }
+
+    auto connection = connection_pool_->get_connection();
+
+    bool retval = false;
+    try {
+        std::string etag =
+          connection->put_object(bucket_name_,
+                                 object_key_,
+                                 { part_buffer_.data(), n_bytes_buffered_ });
+        EXPECT(
+          !etag.empty(), "Failed to upload object: %s", object_key_.c_str());
+
+        retval = true;
+    } catch (const std::exception& exc) {
+        LOGE("Error: %s", exc.what());
+    }
+
+    // cleanup
+    connection_pool_->return_connection(std::move(connection));
+    n_bytes_buffered_ = 0;
+
+    return retval;
+}
+
+bool
+zarr::S3Sink::is_multipart_upload_() const
+{
+    return !upload_id_.empty() && !parts_.empty();
+}
+
+std::string
+zarr::S3Sink::get_multipart_upload_id_()
+{
+    if (upload_id_.empty()) {
+        auto connection = connection_pool_->get_connection();
+        upload_id_ =
+          connection->create_multipart_object(bucket_name_, object_key_);
+        // hand the connection back; dropping it would shrink the pool by one
+        // on every new multipart upload
+        connection_pool_->return_connection(std::move(connection));
+    }
+
+    return upload_id_;
+}
+
+bool
+zarr::S3Sink::flush_part_()
+{
+    if (n_bytes_buffered_ == 0) {
+        return false;
+    }
+
+    // resolve the upload id before checking out a connection so that we
+    // never hold two pool connections at once
+    const std::string upload_id = get_multipart_upload_id_();
+
+    auto connection = connection_pool_->get_connection();
+
+    bool retval = false;
+    try {
+        minio::s3::Part part;
+        part.number = static_cast<unsigned int>(parts_.size()) + 1;
+
+        std::span<uint8_t> data(part_buffer_.data(), n_bytes_buffered_);
+        part.etag =
+          connection->upload_multipart_object_part(bucket_name_,
+                                                   object_key_,
+                                                   upload_id,
+                                                   data,
+                                                   part.number);
+        EXPECT(!part.etag.empty(),
+               "Failed to upload part %u of object %s",
+               part.number,
+               object_key_.c_str());
+
+        parts_.push_back(part);
+
+        retval = true;
+    } catch (const std::exception& exc) {
+        LOGE("Error: %s", exc.what());
+    }
+
+    // cleanup
+    connection_pool_->return_connection(std::move(connection));
+    n_bytes_buffered_ = 0;
+
+    return retval;
+}
+
+bool
+zarr::S3Sink::finalize_multipart_upload_()
+{
+    if (!is_multipart_upload_()) {
+        return false;
+    }
+
+    auto connection = connection_pool_->get_connection();
+
+    bool retval = connection->complete_multipart_object(
+      bucket_name_, object_key_, upload_id_, parts_);
+
+    connection_pool_->return_connection(std::move(connection));
+
+    return retval;
+}
diff --git a/src/writers/s3.sink.hh b/src/writers/s3.sink.hh
new file mode 100644
index 00000000..5b12bce5
--- /dev/null
+++ b/src/writers/s3.sink.hh
@@ -0,0 +1,60 @@
+#pragma once
+
+#include "sink.hh"
+#include "platform.h"
+#include "common/s3.connection.hh"
+
+#include <miniocpp/client.h>
+
+#include <array>
+#include <string>
+
+namespace acquire::sink::zarr {
+class S3Sink final : public Sink
+{
+  public:
+    S3Sink() = delete;
+    S3Sink(std::string_view bucket_name,
+           std::string_view object_key,
+           std::shared_ptr<common::S3ConnectionPool> connection_pool);
+    ~S3Sink() override;
+
+    bool write(size_t offset,
+               const uint8_t* data,
+               size_t bytes_of_data) override;
+
+  private:
+    std::string bucket_name_;
+    std::string object_key_;
+
+    std::shared_ptr<common::S3ConnectionPool> connection_pool_;
+
+    // multipart upload
+    std::array<uint8_t, 5 << 20> part_buffer_; /// temporary 5MiB buffer for
+                                               /// multipart upload
+    size_t n_bytes_buffered_ = 0;
+
+    std::string upload_id_;
+    std::list<minio::s3::Part> parts_;
+
+    // single-part upload
+    /// @brief Upload the object to S3.
+    /// @returns True if the object was successfully uploaded, otherwise false.
+ [[nodiscard]] bool put_object_(); + + // multipart upload + bool is_multipart_upload_() const; + + /// @brief Get the multipart upload ID, if it exists. Otherwise, create a new + /// multipart upload. + /// @returns The multipart upload ID. + std::string get_multipart_upload_id_(); + + /// @brief Flush the current part to S3. + /// @returns True if the part was successfully flushed, otherwise false. + [[nodiscard]] bool flush_part_(); + /// @brief Finalize the multipart upload. + /// @returns True if a multipart upload was successfully finalized, + /// otherwise false. + [[nodiscard]] bool finalize_multipart_upload_(); +}; +} // namespace acquire::sink::zarr diff --git a/src/writers/sink.creator.cpp b/src/writers/sink.creator.cpp index b72d0c3a..83c7cce5 100644 --- a/src/writers/sink.creator.cpp +++ b/src/writers/sink.creator.cpp @@ -1,5 +1,6 @@ #include "sink.creator.hh" #include "file.sink.hh" +#include "s3.sink.hh" #include "common/utilities.hh" #include @@ -8,8 +9,11 @@ namespace zarr = acquire::sink::zarr; namespace common = zarr::common; -zarr::SinkCreator::SinkCreator(std::shared_ptr thread_pool_) +zarr::SinkCreator::SinkCreator( + std::shared_ptr thread_pool_, + std::shared_ptr connection_pool) : thread_pool_{ thread_pool_ } + , connection_pool_{ connection_pool } { } @@ -22,14 +26,24 @@ zarr::SinkCreator::make_data_sinks( { std::queue paths; - std::string base_dir = base_uri; - if (base_uri.starts_with("file://")) { - base_dir = base_uri.substr(7); - } - paths.emplace(base_dir); + bool is_s3 = common::is_web_uri(base_uri); + std::string bucket_name; + if (is_s3) { + std::string base_dir; + common::parse_path_from_uri(base_uri, bucket_name, base_dir); - if (!make_dirs_(paths)) { - return false; + paths.push(base_dir); + } else { + std::string base_dir = base_uri; + + if (base_uri.starts_with("file://")) { + base_dir = base_uri.substr(7); + } + paths.emplace(base_dir); + + if (!make_dirs_(paths)) { + return false; + } } // create directories @@ -49,7 +63,7 @@ zarr::SinkCreator::make_data_sinks( } } - if (!make_dirs_(paths)) { + if (!is_s3 && !make_dirs_(paths)) { return false; } } @@ -70,56 +84,47 @@ zarr::SinkCreator::make_data_sinks( } } - return make_files_(paths, part_sinks); -} - -bool -zarr::SinkCreator::create_v2_metadata_sinks( - const std::string& base_uri, - size_t n_arrays, - std::vector>& metadata_sinks) -{ - if (base_uri.empty()) { - LOGE("Base URI is empty."); - return false; - } - - std::queue dir_paths, file_paths; - - file_paths.emplace(".metadata"); // base metadata - file_paths.emplace("0/.zattrs"); // external metadata - file_paths.emplace(".zattrs"); // group metadata - - for (auto i = 0; i < n_arrays; ++i) { - const auto idx_string = std::to_string(i); - dir_paths.push(idx_string); - file_paths.push(idx_string + "/.zarray"); // array metadata - } - - return make_metadata_sinks_( - base_uri, dir_paths, file_paths, metadata_sinks); + return is_s3 ? 
make_s3_objects_(bucket_name, paths, part_sinks) + : make_files_(paths, part_sinks); } bool -zarr::SinkCreator::create_v3_metadata_sinks( +zarr::SinkCreator::make_metadata_sinks( + acquire::sink::zarr::ZarrVersion version, const std::string& base_uri, size_t n_arrays, std::vector>& metadata_sinks) { - if (base_uri.empty()) { - LOGE("Base URI is empty."); - return false; - } + EXPECT(!base_uri.empty(), "URI must not be empty."); std::queue dir_paths, file_paths; - dir_paths.emplace("meta"); - dir_paths.emplace("meta/root"); + switch (version) { + case ZarrVersion::V2: + file_paths.emplace(".metadata"); // base metadata + file_paths.emplace("0/.zattrs"); // external metadata + file_paths.emplace(".zattrs"); // group metadata - file_paths.emplace("zarr.json"); - file_paths.emplace("meta/root.group.json"); - for (auto i = 0; i < n_arrays; ++i) { - file_paths.push("meta/root/" + std::to_string(i) + ".array.json"); + for (auto i = 0; i < n_arrays; ++i) { + const auto idx_string = std::to_string(i); + dir_paths.push(idx_string); + file_paths.push(idx_string + "/.zarray"); // array metadata + } + break; + case ZarrVersion::V3: + dir_paths.emplace("meta"); + dir_paths.emplace("meta/root"); + + file_paths.emplace("zarr.json"); + file_paths.emplace("meta/root.group.json"); + for (auto i = 0; i < n_arrays; ++i) { + file_paths.push("meta/root/" + std::to_string(i) + + ".array.json"); + } + break; + default: + throw std::runtime_error("Invalid Zarr version " + + std::to_string(static_cast(version))); } return make_metadata_sinks_( @@ -236,39 +241,92 @@ zarr::SinkCreator::make_metadata_sinks_( std::queue& file_paths, std::vector>& metadata_sinks) { - std::string base_dir = base_uri; - if (base_uri.starts_with("file://")) { - base_dir = base_uri.substr(7); - } + bool is_s3 = common::is_web_uri(base_uri); + std::string bucket_name; + std::string base_dir; - // remove trailing slashes - if (base_uri.ends_with("/") || base_uri.ends_with("\\")) { - base_dir = base_dir.substr(0, base_dir.size() - 1); - } + if (is_s3) { + common::parse_path_from_uri(base_uri, bucket_name, base_dir); - // create the base directories if they don't already exist - // we create them in serial because - // 1. there are only a few of them; and - // 2. they may be nested - while (!dir_paths.empty()) { - const auto dir = base_dir + "/" + dir_paths.front(); - dir_paths.pop(); - if (!fs::is_directory(dir) && !fs::create_directories(dir)) { + // create the bucket if it doesn't already exist + if (!bucket_exists_(bucket_name)) { return false; } + } else { + base_dir = base_uri; + if (base_uri.starts_with("file://")) { + base_dir = base_uri.substr(7); + } + + // remove trailing slashes + if (base_uri.ends_with("/") || base_uri.ends_with("\\")) { + base_dir = base_dir.substr(0, base_dir.size() - 1); + } + + // create the base directories if they don't already exist + // we create them in serial because + // 1. there are only a few of them; and + // 2. they may be nested + while (!dir_paths.empty()) { + const auto dir = base_dir + "/" + dir_paths.front(); + dir_paths.pop(); + if (!fs::is_directory(dir) && !fs::create_directories(dir)) { + return false; + } + } } // make files size_t n_paths = file_paths.size(); + const std::string prefix = base_dir.empty() ? "" : base_dir + "/"; for (auto i = 0; i < n_paths; ++i) { - const auto path = base_dir + "/" + file_paths.front(); + const auto path = prefix + file_paths.front(); file_paths.pop(); file_paths.push(path); } - if (!make_files_(file_paths, metadata_sinks)) { + return is_s3 ? 
make_s3_objects_(bucket_name, file_paths, metadata_sinks) + : make_files_(file_paths, metadata_sinks); +} + +bool +zarr::SinkCreator::bucket_exists_(std::string_view bucket_name) +{ + CHECK(!bucket_name.empty()); + EXPECT(connection_pool_, "S3 connection pool not provided."); + + auto conn = connection_pool_->get_connection(); + bool bucket_exists = conn->bucket_exists(bucket_name); + + connection_pool_->return_connection(std::move(conn)); + + return bucket_exists; +} + +bool +zarr::SinkCreator::make_s3_objects_(std::string_view bucket_name, + std::queue& object_keys, + std::vector>& sinks) +{ + if (object_keys.empty()) { + return true; + } + if (bucket_name.empty()) { + LOGE("Bucket name not provided."); return false; } + if (!connection_pool_) { + LOGE("S3 connection pool not provided."); + return false; + } + + const auto n_objects = object_keys.size(); + sinks.resize(n_objects); + for (auto i = 0; i < n_objects; ++i) { + sinks[i] = std::make_unique( + bucket_name, object_keys.front(), connection_pool_); + object_keys.pop(); + } return true; } @@ -291,7 +349,7 @@ extern "C" auto thread_pool = std::make_shared( std::thread::hardware_concurrency(), [](const std::string& err) { LOGE("Error: %s\n", err.c_str()); }); - zarr::SinkCreator creator{ thread_pool }; + zarr::SinkCreator creator{ thread_pool, nullptr }; std::vector dims; dims.emplace_back("x", DimensionType_Space, 10, 2, 0); // 5 chunks @@ -337,7 +395,7 @@ extern "C" auto thread_pool = std::make_shared( std::thread::hardware_concurrency(), [](const std::string& err) { LOGE("Error: %s", err.c_str()); }); - zarr::SinkCreator creator{ thread_pool }; + zarr::SinkCreator creator{ thread_pool, nullptr }; std::vector dims; dims.emplace_back( diff --git a/src/writers/sink.creator.hh b/src/writers/sink.creator.hh index 29797db1..01c04d78 100644 --- a/src/writers/sink.creator.hh +++ b/src/writers/sink.creator.hh @@ -1,9 +1,10 @@ -#ifndef H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_CREATOR_V0 -#define H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_CREATOR_V0 +#pragma once #include "sink.hh" +#include "common/utilities.hh" #include "common/dimension.hh" #include "common/thread.pool.hh" +#include "common/s3.connection.hh" #include #include @@ -13,7 +14,8 @@ struct SinkCreator final { public: SinkCreator() = delete; - explicit SinkCreator(std::shared_ptr thread_pool); + SinkCreator(std::shared_ptr thread_pool_, + std::shared_ptr connection_pool); ~SinkCreator() noexcept = default; /// @brief Create a collection of data sinks, either chunk or shard. @@ -22,32 +24,31 @@ struct SinkCreator final /// @param[in] parts_along_dimension Function for computing the number of /// parts (either chunk or shard) along each dimension. /// @param[out] part_sinks The sinks created. + /// @throws std::runtime_error if @p base_uri is not valid, if the number of + /// parts along a dimension cannot be computed, or if, for S3 sinks, + /// the bucket does not exist. [[nodiscard]] bool make_data_sinks( const std::string& base_uri, const std::vector& dimensions, const std::function& parts_along_dimension, std::vector>& part_sinks); - /// @brief Create a collection of metadata sinks for a Zarr V2 dataset. + /// @brief Create a collection of metadata sinks for a Zarr dataset. + /// @param[in] version The Zarr version. /// @param[in] base_uri The base URI for the dataset. /// @param[in] n_arrays The number of data arrays. /// @param[out] metadata_sinks The sinks created. 
- [[nodiscard]] bool create_v2_metadata_sinks( - const std::string& base_uri, - size_t n_arrays, - std::vector>& metadata_sinks); - - /// @brief Create a collection of metadata sinks for a Zarr V3 dataset. - /// @param[in] base_uri The base URI for the dataset. - /// @param[in] n_arrays The number of data arrays. - /// @param[out] metadata_sinks The sinks created. - [[nodiscard]] bool create_v3_metadata_sinks( + /// @throws std::runtime_error if @p base_uri is not valid, or if, for S3 + /// sinks, the bucket does not exist. + [[nodiscard]] bool make_metadata_sinks( + ZarrVersion version, const std::string& base_uri, size_t n_arrays, std::vector>& metadata_sinks); private: std::shared_ptr thread_pool_; + std::shared_ptr connection_pool_; // could be null /// @brief Parallel create a collection of directories. /// @param[in] dir_paths The directories to create. @@ -72,7 +73,19 @@ struct SinkCreator final std::queue& dir_paths, std::queue& file_paths, std::vector>& metadata_sinks); + + /// @brief Check whether an S3 bucket exists. + /// @param[in] bucket_name The name of the bucket to check. + /// @return True iff the bucket exists. + bool bucket_exists_(std::string_view bucket_name); + + /// @brief Create a collection of S3 objects. + /// @param[in] bucket_name The name of the bucket. + /// @param[in,out] object_keys The keys of the objects to create. + /// @param[out] sinks The sinks created. + [[nodiscard]] bool make_s3_objects_( + std::string_view bucket_name, + std::queue& object_keys, + std::vector>& sinks); }; } // namespace acquire::sink::zarr - -#endif // H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_CREATOR_V0 \ No newline at end of file diff --git a/src/writers/sink.hh b/src/writers/sink.hh index 4f64db1d..96b61d37 100644 --- a/src/writers/sink.hh +++ b/src/writers/sink.hh @@ -14,6 +14,8 @@ struct Sink /// @param buf The buffer to write to the sink. /// @param bytes_of_buf The number of bytes to write from @p buf. /// @return True if the write was successful, false otherwise. + /// @throws std::runtime_error if @p buf is nullptr, @p bytes_of_buf is 0, + /// or the write fails. 
[[nodiscard]] virtual bool write(size_t offset, const uint8_t* buf, size_t bytes_of_buf) = 0; diff --git a/src/writers/writer.cpp b/src/writers/writer.cpp index 2c90b422..3bfb9dfe 100644 --- a/src/writers/writer.cpp +++ b/src/writers/writer.cpp @@ -1,10 +1,10 @@ -#include #include "writer.hh" #include "common/utilities.hh" #include #include #include +#include namespace zarr = acquire::sink::zarr; @@ -177,9 +177,11 @@ zarr::downsample(const WriterConfig& config, WriterConfig& downsampled_config) /// Writer zarr::Writer::Writer(const WriterConfig& config, - std::shared_ptr thread_pool) + std::shared_ptr thread_pool, + std::shared_ptr connection_pool) : config_{ config } , thread_pool_{ thread_pool } + , connection_pool_{ connection_pool } , bytes_to_flush_{ 0 } , frames_written_{ 0 } , append_chunk_index_{ 0 } @@ -450,7 +452,7 @@ class TestWriter : public zarr::Writer public: TestWriter(const zarr::WriterConfig& array_spec, std::shared_ptr thread_pool) - : zarr::Writer(array_spec, thread_pool) + : zarr::Writer(array_spec, thread_pool, nullptr) { } diff --git a/src/writers/writer.hh b/src/writers/writer.hh index 93f50199..cfdf6939 100644 --- a/src/writers/writer.hh +++ b/src/writers/writer.hh @@ -6,6 +6,7 @@ #include "common/dimension.hh" #include "common/thread.pool.hh" +#include "common/s3.connection.hh" #include "blosc.compressor.hh" #include "file.sink.hh" @@ -40,7 +41,8 @@ struct Writer public: Writer() = delete; Writer(const WriterConfig& config, - std::shared_ptr thread_pool); + std::shared_ptr thread_pool, + std::shared_ptr connection_pool); virtual ~Writer() noexcept = default; @@ -71,6 +73,8 @@ struct Writer uint32_t append_chunk_index_; bool is_finalizing_; + std::shared_ptr connection_pool_; + void make_buffers_() noexcept; size_t write_frame_to_chunks_(const uint8_t* buf, size_t buf_size); bool should_flush_() const; diff --git a/src/writers/zarrv2.writer.cpp b/src/writers/zarrv2.writer.cpp index 9ef50305..1d6e1b2d 100644 --- a/src/writers/zarrv2.writer.cpp +++ b/src/writers/zarrv2.writer.cpp @@ -10,8 +10,9 @@ namespace zarr = acquire::sink::zarr; zarr::ZarrV2Writer::ZarrV2Writer( const WriterConfig& config, - std::shared_ptr thread_pool) - : Writer(config, thread_pool) + std::shared_ptr thread_pool, + std::shared_ptr connection_pool) + : Writer(config, thread_pool, connection_pool) { } @@ -21,10 +22,10 @@ zarr::ZarrV2Writer::flush_impl_() // create chunk files CHECK(sinks_.empty()); const std::string data_root = - (fs::path(data_root_) / std::to_string(append_chunk_index_)).string(); + data_root_ + "/" + std::to_string(append_chunk_index_); { - SinkCreator creator(thread_pool_); + SinkCreator creator(thread_pool_, connection_pool_); if (!creator.make_data_sinks(data_root, config_.dimensions, common::chunks_along_dimension, @@ -151,7 +152,8 @@ extern "C" .compression_params = std::nullopt, }; - zarr::ZarrV2Writer writer(config, thread_pool); + zarr::ZarrV2Writer writer( + config, thread_pool, std::shared_ptr()); const size_t frame_size = array_width * array_height * nbytes_px; std::vector data(frame_size, 0); @@ -276,7 +278,8 @@ extern "C" .compression_params = std::nullopt, }; - zarr::ZarrV2Writer writer(config, thread_pool); + zarr::ZarrV2Writer writer( + config, thread_pool, std::shared_ptr()); const size_t frame_size = array_width * array_height * nbytes_px; std::vector data(frame_size, 0); @@ -388,7 +391,8 @@ extern "C" .compression_params = std::nullopt, }; - zarr::ZarrV2Writer writer(config, thread_pool); + zarr::ZarrV2Writer writer( + config, thread_pool, 
std::shared_ptr()); const size_t frame_size = array_width * array_height * nbytes_px; std::vector data(frame_size, 0); diff --git a/src/writers/zarrv2.writer.hh b/src/writers/zarrv2.writer.hh index c11baa2c..7a20451e 100644 --- a/src/writers/zarrv2.writer.hh +++ b/src/writers/zarrv2.writer.hh @@ -21,7 +21,8 @@ struct ZarrV2Writer final : public Writer public: ZarrV2Writer() = delete; ZarrV2Writer(const WriterConfig& config, - std::shared_ptr thread_pool); + std::shared_ptr thread_pool, + std::shared_ptr connection_pool); ~ZarrV2Writer() override = default; diff --git a/src/writers/zarrv3.writer.cpp b/src/writers/zarrv3.writer.cpp index 57a97008..e1c982fe 100644 --- a/src/writers/zarrv3.writer.cpp +++ b/src/writers/zarrv3.writer.cpp @@ -9,8 +9,9 @@ namespace zarr = acquire::sink::zarr; zarr::ZarrV3Writer::ZarrV3Writer( const WriterConfig& array_spec, - std::shared_ptr thread_pool) - : Writer(array_spec, thread_pool) + std::shared_ptr thread_pool, + std::shared_ptr connection_pool) + : Writer(array_spec, thread_pool, connection_pool) , shard_file_offsets_(common::number_of_shards(array_spec.dimensions), 0) , shard_tables_{ common::number_of_shards(array_spec.dimensions) } { @@ -29,11 +30,10 @@ zarr::ZarrV3Writer::flush_impl_() { // create shard files if they don't exist const std::string data_root = - (fs::path(data_root_) / ("c" + std::to_string(append_chunk_index_))) - .string(); + data_root_ + "/c" + std::to_string(append_chunk_index_); { - SinkCreator creator(thread_pool_); + SinkCreator creator(thread_pool_, connection_pool_); if (sinks_.empty() && !creator.make_data_sinks(data_root, config_.dimensions, @@ -243,7 +243,8 @@ extern "C" .compression_params = std::nullopt, }; - zarr::ZarrV3Writer writer(config, thread_pool); + zarr::ZarrV3Writer writer( + config, thread_pool, std::shared_ptr()); const size_t frame_size = array_width * array_height * nbytes_px; std::vector data(frame_size, 0); @@ -391,7 +392,8 @@ extern "C" .compression_params = std::nullopt, }; - zarr::ZarrV3Writer writer(config, thread_pool); + zarr::ZarrV3Writer writer( + config, thread_pool, std::shared_ptr()); const size_t frame_size = array_width * array_height * nbytes_px; std::vector data(frame_size, 0); @@ -531,7 +533,8 @@ extern "C" .compression_params = std::nullopt, }; - zarr::ZarrV3Writer writer(config, thread_pool); + zarr::ZarrV3Writer writer( + config, thread_pool, std::shared_ptr()); const size_t frame_size = array_width * array_height * nbytes_px; std::vector data(frame_size, 0); diff --git a/src/writers/zarrv3.writer.hh b/src/writers/zarrv3.writer.hh index 4243b79e..8f9a5d6a 100644 --- a/src/writers/zarrv3.writer.hh +++ b/src/writers/zarrv3.writer.hh @@ -21,7 +21,8 @@ struct ZarrV3Writer final : public Writer public: ZarrV3Writer() = delete; ZarrV3Writer(const WriterConfig& array_spec, - std::shared_ptr thread_pool); + std::shared_ptr thread_pool, + std::shared_ptr connection_pool); ~ZarrV3Writer() override = default; diff --git a/src/zarr.cpp b/src/zarr.cpp index 92c2bf95..fa191e04 100644 --- a/src/zarr.cpp +++ b/src/zarr.cpp @@ -62,10 +62,11 @@ validate_props(const StorageProperties* props) props->external_metadata_json.nbytes); std::string uri{ props->uri.str, props->uri.nbytes - 1 }; - EXPECT(!uri.starts_with("s3://"), "S3 URIs are not yet supported."); - // check that the URI value points to a writable directory - { + if (common::is_web_uri(uri)) { + std::vector tokens = common::split_uri(uri); + CHECK(tokens.size() > 2); // http://endpoint/bucket + } else { const fs::path path = as_path(*props); 
fs::path parent_path = path.parent_path().string(); if (parent_path.empty()) @@ -357,12 +358,29 @@ zarr::Zarr::set(const StorageProperties* props) // checks the directory exists and is writable validate_props(props); - // TODO (aliddell): we will eventually support S3 URIs, - // dataset_root_ should be a string - dataset_root_ = as_path(*props).string(); + + std::string uri(props->uri.str, props->uri.nbytes - 1); + + if (common::is_web_uri(uri)) { + dataset_root_ = uri; + } else { + dataset_root_ = as_path(*props).string(); + } + + if (props->access_key_id.str) { + s3_access_key_id_ = std::string(props->access_key_id.str, + props->access_key_id.nbytes - 1); + } + + if (props->secret_access_key.str) { + s3_secret_access_key_ = std::string( + props->secret_access_key.str, props->secret_access_key.nbytes - 1); + } if (props->external_metadata_json.str) { - external_metadata_json_ = props->external_metadata_json.str; + external_metadata_json_ = + std::string(props->external_metadata_json.str, + props->external_metadata_json.nbytes - 1); } pixel_scale_um_ = props->pixel_scale_um; @@ -407,6 +425,29 @@ zarr::Zarr::get(StorageProperties* props) const pixel_scale_um_, acquisition_dimensions_.size())); + // set access key and secret + { + const char* access_key_id = + s3_access_key_id_.has_value() ? s3_access_key_id_->c_str() : nullptr; + const size_t bytes_of_access_key_id = + access_key_id ? s3_access_key_id_->size() + 1 : 0; + + const char* secret_access_key = s3_secret_access_key_.has_value() + ? s3_secret_access_key_->c_str() + : nullptr; + const size_t bytes_of_secret_access_key = + secret_access_key ? s3_secret_access_key_->size() + 1 : 0; + + if (access_key_id && secret_access_key) { + CHECK(storage_properties_set_access_key_and_secret( + props, + access_key_id, + bytes_of_access_key_id, + secret_access_key, + bytes_of_secret_access_key)); + } + } + for (auto i = 0; i < acquisition_dimensions_.size(); ++i) { const auto dim = acquisition_dimensions_.at(i); CHECK(storage_properties_set_dimension(props, @@ -430,6 +471,7 @@ zarr::Zarr::get_meta(StoragePropertyMetadata* meta) const memset(meta, 0, sizeof(*meta)); meta->chunking_is_supported = 1; + meta->s3_is_supported = 1; } void @@ -437,19 +479,30 @@ zarr::Zarr::start() { error_ = true; - if (fs::exists(dataset_root_)) { - std::error_code ec; - EXPECT(fs::remove_all(dataset_root_, ec), - R"(Failed to remove folder for "%s": %s)", - dataset_root_.c_str(), - ec.message().c_str()); - } - fs::create_directories(dataset_root_); - thread_pool_ = std::make_shared( std::thread::hardware_concurrency(), [this](const std::string& err) { this->set_error(err); }); + if (common::is_web_uri(dataset_root_)) { + std::vector tokens = common::split_uri(dataset_root_); + CHECK(tokens.size() > 1); + const std::string endpoint = tokens[0] + "//" + tokens[1]; + connection_pool_ = std::make_shared( + 8, endpoint, *s3_access_key_id_, *s3_secret_access_key_); + } else { + // remove the folder if it exists + if (fs::exists(dataset_root_)) { + std::error_code ec; + EXPECT(fs::remove_all(dataset_root_, ec), + R"(Failed to remove folder for "%s": %s)", + dataset_root_.c_str(), + ec.message().c_str()); + } + + // create the dataset folder + fs::create_directories(dataset_root_); + } + allocate_writers_(); make_metadata_sinks_(); @@ -482,6 +535,8 @@ zarr::Zarr::stop() noexcept thread_pool_->await_stop(); thread_pool_ = nullptr; + connection_pool_ = nullptr; + // don't clear before all working threads have shut down writers_.clear(); diff --git a/src/zarr.hh b/src/zarr.hh 
index a85eb663..ce21aa9a 100644 --- a/src/zarr.hh +++ b/src/zarr.hh @@ -5,6 +5,7 @@ #include "common/utilities.hh" #include "common/thread.pool.hh" +#include "common/s3.connection.hh" #include "writers/writer.hh" #include "writers/blosc.compressor.hh" @@ -47,6 +48,8 @@ struct Zarr : public Storage /// changes on set std::string dataset_root_; + std::optional s3_access_key_id_; + std::optional s3_secret_access_key_; std::string external_metadata_json_; PixelScale pixel_scale_um_; bool enable_multiscale_; @@ -60,11 +63,14 @@ struct Zarr : public Storage // scaled frames, keyed by level-of-detail std::unordered_map> scaled_frames_; - // changes on flush + /// changes on start + std::shared_ptr thread_pool_; + std::shared_ptr connection_pool_; + + /// changes on flush std::vector> metadata_sinks_; /// Multithreading - std::shared_ptr thread_pool_; mutable std::mutex mutex_; // for error_ / error_msg_ /// Error state diff --git a/src/zarr.v2.cpp b/src/zarr.v2.cpp index 175ab9be..96ec37d6 100644 --- a/src/zarr.v2.cpp +++ b/src/zarr.v2.cpp @@ -79,7 +79,8 @@ zarr::ZarrV2::allocate_writers_() .compression_params = blosc_compression_params_, }; - writers_.push_back(std::make_shared(config, thread_pool_)); + writers_.push_back( + std::make_shared(config, thread_pool_, connection_pool_)); if (enable_multiscale_) { WriterConfig downsampled_config; @@ -88,8 +89,8 @@ zarr::ZarrV2::allocate_writers_() int level = 1; while (do_downsample) { do_downsample = downsample(config, downsampled_config); - writers_.push_back( - std::make_shared(downsampled_config, thread_pool_)); + writers_.push_back(std::make_shared( + downsampled_config, thread_pool_, connection_pool_)); scaled_frames_.emplace(level++, std::nullopt); config = std::move(downsampled_config); @@ -101,9 +102,9 @@ zarr::ZarrV2::allocate_writers_() void zarr::ZarrV2::make_metadata_sinks_() { - SinkCreator creator(thread_pool_); - CHECK(creator.create_v2_metadata_sinks( - dataset_root_, writers_.size(), metadata_sinks_)); + SinkCreator creator(thread_pool_, connection_pool_); + CHECK(creator.make_metadata_sinks( + ZarrVersion::V2, dataset_root_, writers_.size(), metadata_sinks_)); } void diff --git a/src/zarr.v3.cpp b/src/zarr.v3.cpp index 840a2c35..db46f7bf 100644 --- a/src/zarr.v3.cpp +++ b/src/zarr.v3.cpp @@ -67,7 +67,8 @@ zarr::ZarrV3::allocate_writers_() .compression_params = blosc_compression_params_, }; - writers_.push_back(std::make_shared(config, thread_pool_)); + writers_.push_back( + std::make_shared(config, thread_pool_, connection_pool_)); if (enable_multiscale_) { WriterConfig downsampled_config; @@ -76,8 +77,8 @@ zarr::ZarrV3::allocate_writers_() int level = 1; while (do_downsample) { do_downsample = downsample(config, downsampled_config); - writers_.push_back( - std::make_shared(downsampled_config, thread_pool_)); + writers_.push_back(std::make_shared( + downsampled_config, thread_pool_, connection_pool_)); scaled_frames_.emplace(level++, std::nullopt); config = std::move(downsampled_config); @@ -97,10 +98,11 @@ zarr::ZarrV3::get_meta(StoragePropertyMetadata* meta) const void zarr::ZarrV3::make_metadata_sinks_() { - SinkCreator creator(thread_pool_); - CHECK(creator.create_v3_metadata_sinks( - dataset_root_, writers_.size(), metadata_sinks_)); + SinkCreator creator(thread_pool_, connection_pool_); + CHECK(creator.make_metadata_sinks( + ZarrVersion::V3, dataset_root_, writers_.size(), metadata_sinks_)); } + /// @brief Write the metadata for the dataset. 
void zarr::ZarrV3::write_base_metadata_() const diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4b68de2c..d73fb521 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -34,11 +34,13 @@ else () write-zarr-v2-raw-multiscale write-zarr-v2-raw-multiscale-with-trivial-tile-size write-zarr-v2-compressed-multiscale + write-zarr-v2-to-s3 multiscales-metadata write-zarr-v3-raw write-zarr-v3-raw-with-ragged-sharding write-zarr-v3-raw-chunk-exceeds-array write-zarr-v3-compressed + write-zarr-v3-to-s3 ) foreach (name ${tests}) @@ -54,6 +56,7 @@ else () acquire-core-platform acquire-video-runtime nlohmann_json::nlohmann_json + miniocpp::miniocpp ) add_test(NAME test-${tgt} COMMAND ${tgt}) diff --git a/tests/README.md b/tests/README.md index 75529f33..8335e9b0 100644 --- a/tests/README.md +++ b/tests/README.md @@ -8,3 +8,15 @@ ctest -L acquire-driver-zarr --output-on-failure You can disable unit tests by setting `-DNO_UNIT_TESTS=ON` when configuring the project. You can disable testing altogether by setting `-DNOTEST=ON`. + +## Testing S3 components + +To test the S3 components, you need to set the following environment variables +to point to your S3 bucket: + +- ZARR_S3_ENDPOINT +- ZARR_S3_BUCKET_NAME +- ZARR_S3_ACCESS_KEY_ID +- ZARR_S3_SECRET_ACCESS_KEY + +with the appropriate values. diff --git a/tests/get-meta.cpp b/tests/get-meta.cpp index 324b93ff..161540bb 100644 --- a/tests/get-meta.cpp +++ b/tests/get-meta.cpp @@ -86,6 +86,7 @@ main() CHECK(Device_Ok == storage_get_meta(storage, &metadata)); CHECK(metadata.chunking_is_supported); + CHECK(metadata.s3_is_supported); CHECK((bool)metadata.sharding_is_supported == name.starts_with("ZarrV3")); CHECK((bool)metadata.multiscale_is_supported != diff --git a/tests/unit-tests.cpp b/tests/unit-tests.cpp index 08b7d49a..e02880b8 100644 --- a/tests/unit-tests.cpp +++ b/tests/unit-tests.cpp @@ -83,15 +83,19 @@ main() const std::vector tests{ #define CASE(e) { .name = #e, .test = (int (*)())lib_load(&lib, #e) } CASE(unit_test__trim), + CASE(unit_test__split_uri), CASE(unit_test__shard_index_for_chunk), CASE(unit_test__shard_internal_index), CASE(unit_test__average_frame), CASE(unit_test__thread_pool__push_to_job_queue), + CASE(unit_test__s3_connection__put_object), + CASE(unit_test__s3_connection__upload_multipart_object), CASE(unit_test__sink_creator__create_chunk_file_sinks), CASE(unit_test__sink_creator__create_shard_file_sinks), CASE(unit_test__chunk_lattice_index), CASE(unit_test__tile_group_offset), CASE(unit_test__chunk_internal_offset), + CASE(unit_test__writer__write_frame_to_chunks), CASE(unit_test__downsample_writer_config), CASE(unit_test__writer__write_frame_to_chunks), CASE(unit_test__zarrv2_writer__write_even), diff --git a/tests/write-zarr-v2-to-s3.cpp b/tests/write-zarr-v2-to-s3.cpp new file mode 100644 index 00000000..19045573 --- /dev/null +++ b/tests/write-zarr-v2-to-s3.cpp @@ -0,0 +1,363 @@ +/// @file write-zarr-v2-to-s3.cpp +/// @brief Test using Zarr V2 storage with S3 backend. + +#include "device/hal/device.manager.h" +#include "acquire.h" +#include "logger.h" + +#include + +#include +#include +#include + +void +reporter(int is_error, + const char* file, + int line, + const char* function, + const char* msg) +{ + fprintf(is_error ? stderr : stdout, + "%s%s(%d) - %s: %s\n", + is_error ? "ERROR " : "", + file, + line, + function, + msg); +} + +/// Helper for passing size static strings as function args. +/// For a function: `f(char*,size_t)` use `f(SIZED("hello"))`. +/// Expands to `f("hello",5)`. 
+#define SIZED(str) str, sizeof(str) - 1 + +#define L (aq_logger) +#define LOG(...) L(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define ERR(...) L(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define EXPECT(e, ...) \ + do { \ + if (!(e)) { \ + char buf[1 << 8] = { 0 }; \ + ERR(__VA_ARGS__); \ + snprintf(buf, sizeof(buf) - 1, __VA_ARGS__); \ + throw std::runtime_error(buf); \ + } \ + } while (0) +#define CHECK(e) EXPECT(e, "Expression evaluated as false: %s", #e) +#define DEVOK(e) CHECK(Device_Ok == (e)) +#define OK(e) CHECK(AcquireStatus_Ok == (e)) + +/// example: `ASSERT_EQ(int,"%d",42,meaning_of_life())` +#define ASSERT_EQ(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT(a_ == b_, "Expected %s==%s but " fmt "!=" fmt, #a, #b, a_, b_); \ + } while (0) + +/// Check that strings a == b +/// example: `ASSERT_STREQ("foo",container_of_foo)` +#define ASSERT_STREQ(a, b) \ + do { \ + std::string a_ = (a); \ + std::string b_ = (b); \ + EXPECT(a_ == b_, \ + "Expected %s==%s but '%s' != '%s'", \ + #a, \ + #b, \ + a_.c_str(), \ + b_.c_str()); \ + } while (0) + +/// Check that a>b +/// example: `ASSERT_GT(int,"%d",42,meaning_of_life())` +#define ASSERT_GT(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT( \ + a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \ + } while (0) + +namespace { +std::string s3_endpoint; +std::string s3_bucket_name; +std::string s3_access_key_id; +std::string s3_secret_access_key; + +bool +get_credentials() +{ + char* env = nullptr; + if (!(env = std::getenv("ZARR_S3_ENDPOINT"))) { + ERR("ZARR_S3_ENDPOINT not set."); + return false; + } + s3_endpoint = env; + + if (!(env = std::getenv("ZARR_S3_BUCKET_NAME"))) { + ERR("ZARR_S3_BUCKET_NAME not set."); + return false; + } + s3_bucket_name = env; + + if (!(env = std::getenv("ZARR_S3_ACCESS_KEY_ID"))) { + ERR("ZARR_S3_ACCESS_KEY_ID not set."); + return false; + } + s3_access_key_id = env; + + if (!(env = std::getenv("ZARR_S3_SECRET_ACCESS_KEY"))) { + ERR("ZARR_S3_SECRET_ACCESS_KEY not set."); + return false; + } + s3_secret_access_key = env; + + return true; +} + +bool +bucket_exists(minio::s3::Client& client) +{ + if (s3_bucket_name.empty()) { + return false; + } + + try { + minio::s3::BucketExistsArgs args; + args.bucket = s3_bucket_name; + + minio::s3::BucketExistsResponse response = client.BucketExists(args); + CHECK(response); + + return response.exist; + } catch (const std::exception& e) { + ERR("Failed to check existence of bucket: %s", e.what()); + } + + return false; +} + +bool +remove_items(minio::s3::Client& client, + const std::vector& item_keys) +{ + std::list objects; + for (const auto& key : item_keys) { + minio::s3::DeleteObject object; + object.name = key; + objects.push_back(object); + } + + try { + minio::s3::RemoveObjectsArgs args; + args.bucket = s3_bucket_name; + + auto it = objects.begin(); + + args.func = [&objects = objects, + &i = it](minio::s3::DeleteObject& obj) -> bool { + if (i == objects.end()) + return false; + obj = *i; + i++; + return true; + }; + + minio::s3::RemoveObjectsResult result = client.RemoveObjects(args); + for (; result; result++) { + minio::s3::DeleteError err = *result; + if (!err) { + ERR("Failed to delete object %s: %s", + err.object_name.c_str(), + err.message.c_str()); + return false; + } + } + + return true; + } catch (const std::exception& e) { + ERR("Failed to clear bucket %s: %s", s3_bucket_name.c_str(), e.what()); + } + + return false; +} + +bool +object_exists(minio::s3::Client& client, const 
+
+        minio::s3::RemoveObjectsResult result = client.RemoveObjects(args);
+        for (; result; result++) {
+            minio::s3::DeleteError err = *result;
+            if (!err) {
+                ERR("Failed to delete object %s: %s",
+                    err.object_name.c_str(),
+                    err.message.c_str());
+                return false;
+            }
+        }
+
+        return true;
+    } catch (const std::exception& e) {
+        ERR("Failed to clear bucket %s: %s", s3_bucket_name.c_str(), e.what());
+    }
+
+    return false;
+}
+
+bool
+object_exists(minio::s3::Client& client, const std::string& object_name)
+{
+    if (s3_bucket_name.empty() || object_name.empty()) {
+        return false;
+    }
+
+    try {
+        minio::s3::StatObjectArgs args;
+        args.bucket = s3_bucket_name;
+        args.object = object_name;
+
+        minio::s3::StatObjectResponse response = client.StatObject(args);
+
+        return (bool)response;
+    } catch (const std::exception& e) {
+        ERR("Failed to check if object %s exists in bucket %s: %s",
+            object_name.c_str(),
+            s3_bucket_name.c_str(),
+            e.what());
+    }
+
+    return false;
+}
+} // namespace
+
+void
+configure(AcquireRuntime* runtime)
+{
+    CHECK(runtime);
+
+    const DeviceManager* dm = acquire_device_manager(runtime);
+    CHECK(dm);
+
+    AcquireProperties props = {};
+    OK(acquire_get_configuration(runtime, &props));
+
+    DEVOK(device_manager_select(dm,
+                                DeviceKind_Camera,
+                                SIZED("simulated.*empty.*"),
+                                &props.video[0].camera.identifier));
+    props.video[0].camera.settings.binning = 1;
+    props.video[0].camera.settings.pixel_type = SampleType_u16;
+    props.video[0].camera.settings.shape = { .x = 1920, .y = 1080 };
+    // we may drop frames with lower exposure
+    props.video[0].camera.settings.exposure_time_us = 1e4;
+
+    props.video[0].max_frame_count = 100;
+
+    DEVOK(device_manager_select(dm,
+                                DeviceKind_Storage,
+                                SIZED("ZarrBlosc1Lz4ByteShuffle"),
+                                &props.video[0].storage.identifier));
+
+    // check if the bucket already exists
+    {
+        minio::s3::BaseUrl url(s3_endpoint);
+        url.https = s3_endpoint.starts_with("https://");
+
+        minio::creds::StaticProvider provider(s3_access_key_id,
+                                              s3_secret_access_key);
+
+        minio::s3::Client client(url, &provider);
+
+        CHECK(bucket_exists(client));
+    }
+
+    std::string uri = s3_endpoint + ("/" + s3_bucket_name);
+    storage_properties_init(&props.video[0].storage.settings,
+                            0,
+                            uri.c_str(),
+                            uri.length() + 1,
+                            nullptr,
+                            0,
+                            {},
+                            3);
+    CHECK(storage_properties_set_access_key_and_secret(
+      &props.video[0].storage.settings,
+      s3_access_key_id.c_str(),
+      s3_access_key_id.size() + 1,
+      s3_secret_access_key.c_str(),
+      s3_secret_access_key.size() + 1));
+
+    CHECK(storage_properties_set_dimension(&props.video[0].storage.settings,
+                                           0,
+                                           SIZED("x") + 1,
+                                           DimensionType_Space,
+                                           1920,
+                                           1920,
+                                           1));
+    CHECK(storage_properties_set_dimension(&props.video[0].storage.settings,
+                                           1,
+                                           SIZED("y") + 1,
+                                           DimensionType_Space,
+                                           1080,
+                                           540,
+                                           2));
+    CHECK(storage_properties_set_dimension(&props.video[0].storage.settings,
+                                           2,
+                                           SIZED("t") + 1,
+                                           DimensionType_Time,
+                                           0,
+                                           5,
+                                           1));
+
+    OK(acquire_configure(runtime, &props));
+}
+
+void
+acquire(AcquireRuntime* runtime)
+{
+    acquire_start(runtime);
+    acquire_stop(runtime);
+}
+
+void
+validate_and_cleanup(AcquireRuntime* runtime)
+{
+    std::vector<std::string> paths{
+        ".metadata",
+        ".zattrs",
+        "0/.zarray",
+        "0/.zattrs",
+    };
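+    // Chunk keys under array "0" follow <t>/<y>/<x>: 20 time chunks
+    // (100 frames / 5 frames per chunk), 2 y chunks (1080 px / 540 px), and
+    // a single x chunk (1920 px / 1920 px).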
+    for (auto i = 0; i < 20; ++i) {
+        paths.push_back("0/" + std::to_string(i) + "/0/0");
+        paths.push_back("0/" + std::to_string(i) + "/1/0");
+    }
+
+    minio::s3::BaseUrl url(s3_endpoint);
+    url.https = s3_endpoint.starts_with("https://");
+
+    minio::creds::StaticProvider provider(s3_access_key_id,
+                                          s3_secret_access_key);
+
+    minio::s3::Client client(url, &provider);
+    CHECK(bucket_exists(client));
+
+    try {
+        for (const auto& path : paths) {
+            CHECK(object_exists(client, path));
+        }
+    } catch (const std::exception& e) {
+        ERR("Exception: %s", e.what());
+    } catch (...) {
+        ERR("Unknown exception");
+    }
+    remove_items(client, paths);
+
+    CHECK(runtime);
+    acquire_shutdown(runtime);
+}
+
+int
+main()
+{
+    if (!get_credentials()) {
+        return 0;
+    }
+
+    int retval = 1;
+    AcquireRuntime* runtime = acquire_init(reporter);
+
+    try {
+        configure(runtime);
+        acquire(runtime);
+        validate_and_cleanup(runtime);
+        retval = 0;
+    } catch (const std::exception& e) {
+        ERR("Exception: %s", e.what());
+    } catch (...) {
+        ERR("Unknown exception");
+    }
+
+    return retval;
+}
diff --git a/tests/write-zarr-v3-to-s3.cpp b/tests/write-zarr-v3-to-s3.cpp
new file mode 100644
index 00000000..3474c053
--- /dev/null
+++ b/tests/write-zarr-v3-to-s3.cpp
@@ -0,0 +1,361 @@
+/// @file write-zarr-v3-to-s3.cpp
+/// @brief Test using Zarr V3 storage with S3 backend.
+
+#include "device/hal/device.manager.h"
+#include "acquire.h"
+#include "logger.h"
+
+#include <miniocpp/client.h>
+
+#include <cstdlib>
+#include <list>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+void
+reporter(int is_error,
+         const char* file,
+         int line,
+         const char* function,
+         const char* msg)
+{
+    fprintf(is_error ? stderr : stdout,
+            "%s%s(%d) - %s: %s\n",
+            is_error ? "ERROR " : "",
+            file,
+            line,
+            function,
+            msg);
+}
+
+/// Helper for passing sized static strings as function args.
+/// For a function: `f(char*,size_t)` use `f(SIZED("hello"))`.
+/// Expands to `f("hello",5)`.
+#define SIZED(str) str, sizeof(str) - 1
+
+#define L (aq_logger)
+#define LOG(...) L(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
+#define ERR(...) L(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
+#define EXPECT(e, ...) \
+    do { \
+        if (!(e)) { \
+            char buf[1 << 8] = { 0 }; \
+            ERR(__VA_ARGS__); \
+            snprintf(buf, sizeof(buf) - 1, __VA_ARGS__); \
+            throw std::runtime_error(buf); \
+        } \
+    } while (0)
+#define CHECK(e) EXPECT(e, "Expression evaluated as false: %s", #e)
+#define DEVOK(e) CHECK(Device_Ok == (e))
+#define OK(e) CHECK(AcquireStatus_Ok == (e))
+
+/// example: `ASSERT_EQ(int,"%d",42,meaning_of_life())`
+#define ASSERT_EQ(T, fmt, a, b) \
+    do { \
+        T a_ = (T)(a); \
+        T b_ = (T)(b); \
+        EXPECT(a_ == b_, "Expected %s==%s but " fmt "!=" fmt, #a, #b, a_, b_); \
+    } while (0)
+
+/// Check that strings a == b
+/// example: `ASSERT_STREQ("foo",container_of_foo)`
+#define ASSERT_STREQ(a, b) \
+    do { \
+        std::string a_ = (a); \
+        std::string b_ = (b); \
+        EXPECT(a_ == b_, \
+               "Expected %s==%s but '%s' != '%s'", \
+               #a, \
+               #b, \
+               a_.c_str(), \
+               b_.c_str()); \
+    } while (0)
+
+/// Check that a>b
+/// example: `ASSERT_GT(int,"%d",42,meaning_of_life())`
+#define ASSERT_GT(T, fmt, a, b) \
+    do { \
+        T a_ = (T)(a); \
+        T b_ = (T)(b); \
+        EXPECT( \
+          a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \
+    } while (0)
+
+namespace {
+std::string s3_endpoint;
+std::string s3_bucket_name;
+std::string s3_access_key_id;
+std::string s3_secret_access_key;
+
+bool
+get_credentials()
+{
+    char* env = nullptr;
+    if (!(env = std::getenv("ZARR_S3_ENDPOINT"))) {
+        ERR("ZARR_S3_ENDPOINT not set.");
+        return false;
+    }
+    s3_endpoint = env;
+
+    if (!(env = std::getenv("ZARR_S3_BUCKET_NAME"))) {
+        ERR("ZARR_S3_BUCKET_NAME not set.");
+        return false;
+    }
+    s3_bucket_name = env;
+
+    if (!(env = std::getenv("ZARR_S3_ACCESS_KEY_ID"))) {
+        ERR("ZARR_S3_ACCESS_KEY_ID not set.");
+        return false;
+    }
+    s3_access_key_id = env;
+
+    if (!(env = std::getenv("ZARR_S3_SECRET_ACCESS_KEY"))) {
+        ERR("ZARR_S3_SECRET_ACCESS_KEY not set.");
+        return false;
+    }
+    s3_secret_access_key = env;
+
+    return true;
+}
+
+bool
+bucket_exists(minio::s3::Client& client)
+{
+    if (s3_bucket_name.empty()) {
+        return false;
+    }
+
+    try {
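+        // A failed request converts to false, and CHECK throws, so a
+        // transport error is not mistaken for a missing bucket.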
+        minio::s3::BucketExistsArgs args;
+        args.bucket = s3_bucket_name;
+
+        minio::s3::BucketExistsResponse response = client.BucketExists(args);
+        CHECK(response);
+
+        return response.exist;
+    } catch (const std::exception& e) {
+        ERR("Failed to check existence of bucket: %s", e.what());
+    }
+
+    return false;
+}
+
+bool
+remove_items(minio::s3::Client& client,
+             const std::vector<std::string>& item_keys)
+{
+    std::list<minio::s3::DeleteObject> objects;
+    for (const auto& key : item_keys) {
+        minio::s3::DeleteObject object;
+        object.name = key;
+        objects.push_back(object);
+    }
+
+    try {
+        minio::s3::RemoveObjectsArgs args;
+        args.bucket = s3_bucket_name;
+
+        auto it = objects.begin();
+
+        args.func = [&objects = objects,
+                     &i = it](minio::s3::DeleteObject& obj) -> bool {
+            if (i == objects.end())
+                return false;
+            obj = *i;
+            i++;
+            return true;
+        };
+
+        minio::s3::RemoveObjectsResult result = client.RemoveObjects(args);
+        for (; result; result++) {
+            minio::s3::DeleteError err = *result;
+            if (!err) {
+                ERR("Failed to delete object %s: %s",
+                    err.object_name.c_str(),
+                    err.message.c_str());
+                return false;
+            }
+        }
+
+        return true;
+    } catch (const std::exception& e) {
+        ERR("Failed to clear bucket %s: %s", s3_bucket_name.c_str(), e.what());
+    }
+
+    return false;
+}
+
+bool
+object_exists(minio::s3::Client& client, const std::string& object_name)
+{
+    if (s3_bucket_name.empty() || object_name.empty()) {
+        return false;
+    }
+
+    try {
+        minio::s3::StatObjectArgs args;
+        args.bucket = s3_bucket_name;
+        args.object = object_name;
+
+        minio::s3::StatObjectResponse response = client.StatObject(args);
+
+        return (bool)response;
+    } catch (const std::exception& e) {
+        ERR("Failed to check if object %s exists in bucket %s: %s",
+            object_name.c_str(),
+            s3_bucket_name.c_str(),
+            e.what());
+    }
+
+    return false;
+}
+} // namespace
+
+void
+configure(AcquireRuntime* runtime)
+{
+    CHECK(runtime);
+
+    const DeviceManager* dm = acquire_device_manager(runtime);
+    CHECK(dm);
+
+    AcquireProperties props = {};
+    OK(acquire_get_configuration(runtime, &props));
+
+    DEVOK(device_manager_select(dm,
+                                DeviceKind_Camera,
+                                SIZED("simulated.*empty.*"),
+                                &props.video[0].camera.identifier));
+    props.video[0].camera.settings.binning = 1;
+    props.video[0].camera.settings.pixel_type = SampleType_u16;
+    props.video[0].camera.settings.shape = { .x = 1920, .y = 1080 };
+    // we may drop frames with lower exposure
+    props.video[0].camera.settings.exposure_time_us = 1e4;
+
+    props.video[0].max_frame_count = 100;
+
+    DEVOK(device_manager_select(dm,
+                                DeviceKind_Storage,
+                                SIZED("ZarrV3Blosc1ZstdByteShuffle"),
+                                &props.video[0].storage.identifier));
+
+    // check if the bucket already exists
+    {
+        minio::s3::BaseUrl url(s3_endpoint);
+        url.https = s3_endpoint.starts_with("https://");
+
+        minio::creds::StaticProvider provider(s3_access_key_id,
+                                              s3_secret_access_key);
+
+        minio::s3::Client client(url, &provider);
+
+        CHECK(bucket_exists(client));
+    }
+
+    std::string uri = s3_endpoint + ("/" + s3_bucket_name);
+    storage_properties_init(&props.video[0].storage.settings,
+                            0,
+                            uri.c_str(),
+                            uri.length() + 1,
+                            nullptr,
+                            0,
+                            {},
+                            3);
+    CHECK(storage_properties_set_access_key_and_secret(
+      &props.video[0].storage.settings,
+      s3_access_key_id.c_str(),
+      s3_access_key_id.size() + 1,
+      s3_secret_access_key.c_str(),
+      s3_secret_access_key.size() + 1));
+
+    CHECK(storage_properties_set_dimension(&props.video[0].storage.settings,
+                                           0,
+                                           SIZED("x") + 1,
+                                           DimensionType_Space,
+                                           1920,
+                                           1920,
+                                           1));
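+    // SIZED(...) + 1 passes the dimension name with its terminating NUL
+    // counted; the remaining arguments are array size (px), chunk size (px),
+    // and shard size (in chunks). Sharding takes effect only for Zarr V3.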
+    CHECK(storage_properties_set_dimension(&props.video[0].storage.settings,
+                                           1,
+                                           SIZED("y") + 1,
+                                           DimensionType_Space,
+                                           1080,
+                                           540,
+                                           2));
+    CHECK(storage_properties_set_dimension(&props.video[0].storage.settings,
+                                           2,
+                                           SIZED("t") + 1,
+                                           DimensionType_Time,
+                                           0,
+                                           5,
+                                           1));
+
+    OK(acquire_configure(runtime, &props));
+}
+
+void
+acquire(AcquireRuntime* runtime)
+{
+    acquire_start(runtime);
+    acquire_stop(runtime);
+}
+
+void
+validate_and_cleanup(AcquireRuntime* runtime)
+{
+    CHECK(runtime);
+
+    std::vector<std::string> paths{ "zarr.json",
+                                    "meta/root.group.json",
+                                    "meta/root/0.array.json" };
+    // Shard keys under data/root/0 are c<t>/<y>/<x>: 20 time shards
+    // (100 frames / 5 frames per chunk, one chunk per shard); the two y
+    // chunks fit in a single shard.
+    for (auto i = 0; i < 20; ++i) {
+        paths.push_back("data/root/0/c" + std::to_string(i) + "/0/0");
+    }
+
+    minio::s3::BaseUrl url(s3_endpoint);
+    url.https = s3_endpoint.starts_with("https://");
+
+    minio::creds::StaticProvider provider(s3_access_key_id,
+                                          s3_secret_access_key);
+
+    minio::s3::Client client(url, &provider);
+    CHECK(bucket_exists(client));
+
+    try {
+        for (const auto& path : paths) {
+            CHECK(object_exists(client, path));
+        }
+    } catch (const std::exception& e) {
+        ERR("Exception: %s", e.what());
+    } catch (...) {
+        ERR("Unknown exception");
+    }
+    remove_items(client, paths);
+
+    CHECK(runtime);
+    acquire_shutdown(runtime);
+}
+
+int
+main()
+{
+    if (!get_credentials()) {
+        return 0;
+    }
+
+    int retval = 1;
+    AcquireRuntime* runtime = acquire_init(reporter);
+
+    try {
+        configure(runtime);
+        acquire(runtime);
+        validate_and_cleanup(runtime);
+        retval = 0;
+    } catch (const std::exception& e) {
+        ERR("Exception: %s", e.what());
+    } catch (...) {
+        ERR("Unknown exception");
+    }
+
+    return retval;
+}