From dc674fce082752c1233c2fa6623f9a7d9b887682 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Fri, 4 Oct 2024 13:24:22 -0700 Subject: [PATCH] Standalone 13/N: Integrate components into ZarrStream_s (#305) Depends on #304. --- src/streaming/acquire.zarr.cpp | 4 + src/streaming/array.writer.cpp | 31 +- src/streaming/array.writer.hh | 13 +- src/streaming/file.sink.cpp | 2 +- src/streaming/file.sink.hh | 2 +- src/streaming/s3.sink.cpp | 4 +- src/streaming/s3.sink.hh | 2 +- src/streaming/sink.cpp | 6 + src/streaming/sink.hh | 2 +- src/streaming/zarr.stream.cpp | 593 +++++++++++++++++- src/streaming/zarr.stream.hh | 28 +- src/streaming/zarrv2.array.writer.cpp | 8 +- src/streaming/zarrv2.array.writer.hh | 4 +- src/streaming/zarrv3.array.writer.cpp | 10 +- src/streaming/zarrv3.array.writer.hh | 4 +- tests/CMakeLists.txt | 1 + tests/integration/CMakeLists.txt | 34 + ...tream-zarr-v2-compressed-to-filesystem.cpp | 313 +++++++++ .../stream-zarr-v2-compressed-to-s3.cpp | 447 +++++++++++++ .../stream-zarr-v2-raw-to-filesystem.cpp | 288 +++++++++ .../integration/stream-zarr-v2-raw-to-s3.cpp | 429 +++++++++++++ ...tream-zarr-v3-compressed-to-filesystem.cpp | 365 +++++++++++ .../stream-zarr-v3-compressed-to-s3.cpp | 482 ++++++++++++++ .../stream-zarr-v3-raw-to-filesystem.cpp | 341 ++++++++++ .../integration/stream-zarr-v3-raw-to-s3.cpp | 469 ++++++++++++++ tests/integration/test.macros.hh | 58 ++ tests/unit-tests/create-stream.cpp | 9 +- tests/unit-tests/zarrv2-writer-write-even.cpp | 7 +- .../zarrv2-writer-write-ragged-append-dim.cpp | 7 +- ...arrv2-writer-write-ragged-internal-dim.cpp | 7 +- tests/unit-tests/zarrv3-writer-write-even.cpp | 15 +- .../zarrv3-writer-write-ragged-append-dim.cpp | 15 +- ...arrv3-writer-write-ragged-internal-dim.cpp | 17 +- 33 files changed, 3906 insertions(+), 111 deletions(-) create mode 100644 tests/integration/CMakeLists.txt create mode 100644 tests/integration/stream-zarr-v2-compressed-to-filesystem.cpp create mode 100644 tests/integration/stream-zarr-v2-compressed-to-s3.cpp create mode 100644 tests/integration/stream-zarr-v2-raw-to-filesystem.cpp create mode 100644 tests/integration/stream-zarr-v2-raw-to-s3.cpp create mode 100644 tests/integration/stream-zarr-v3-compressed-to-filesystem.cpp create mode 100644 tests/integration/stream-zarr-v3-compressed-to-s3.cpp create mode 100644 tests/integration/stream-zarr-v3-raw-to-filesystem.cpp create mode 100644 tests/integration/stream-zarr-v3-raw-to-s3.cpp create mode 100644 tests/integration/test.macros.hh diff --git a/src/streaming/acquire.zarr.cpp b/src/streaming/acquire.zarr.cpp index bbc57da..91f7f5f 100644 --- a/src/streaming/acquire.zarr.cpp +++ b/src/streaming/acquire.zarr.cpp @@ -145,6 +145,10 @@ extern "C" void ZarrStream_destroy(struct ZarrStream_s* stream) { + if (!finalize_stream(stream)) { + return; + } + delete stream; } diff --git a/src/streaming/array.writer.cpp b/src/streaming/array.writer.cpp index 496625d..f1a3b55 100644 --- a/src/streaming/array.writer.cpp +++ b/src/streaming/array.writer.cpp @@ -49,7 +49,7 @@ zarr::downsample(const ArrayWriterConfig& config, shard_size_chunks }; } } - downsampled_config.dimensions = std::make_unique( + downsampled_config.dimensions = std::make_shared( std::move(downsampled_dims), config.dtype); downsampled_config.level_of_detail = config.level_of_detail + 1; @@ -76,17 +76,17 @@ zarr::downsample(const ArrayWriterConfig& config, } /// Writer -zarr::ArrayWriter::ArrayWriter(ArrayWriterConfig&& config, +zarr::ArrayWriter::ArrayWriter(const ArrayWriterConfig& config, 
std::shared_ptr thread_pool) : ArrayWriter(std::move(config), thread_pool, nullptr) { } zarr::ArrayWriter::ArrayWriter( - ArrayWriterConfig&& config, + const ArrayWriterConfig& config, std::shared_ptr thread_pool, std::shared_ptr s3_connection_pool) - : config_{ std::move(config) } + : config_{ config } , thread_pool_{ thread_pool } , s3_connection_pool_{ s3_connection_pool } , bytes_to_flush_{ 0 } @@ -97,7 +97,7 @@ zarr::ArrayWriter::ArrayWriter( } size_t -zarr::ArrayWriter::write_frame(std::span data) +zarr::ArrayWriter::write_frame(std::span data) { const auto nbytes_data = data.size(); const auto nbytes_frame = @@ -239,12 +239,12 @@ zarr::ArrayWriter::make_buffers_() noexcept for (auto& buf : chunk_buffers_) { buf.resize(nbytes); - std::fill_n(buf.begin(), nbytes, std::byte(0)); + std::fill(buf.begin(), buf.end(), std::byte(0)); } } size_t -zarr::ArrayWriter::write_frame_to_chunks_(std::span data) +zarr::ArrayWriter::write_frame_to_chunks_(std::span data) { // break the frame into tiles and write them to the chunk buffers const auto bytes_per_px = bytes_of_type(config_.dtype); @@ -423,6 +423,11 @@ zarr::ArrayWriter::flush_() void zarr::ArrayWriter::close_sinks_() { + for (auto i = 0; i < data_sinks_.size(); ++i) { + EXPECT(finalize_sink(std::move(data_sinks_[i])), + "Failed to finalize sink ", + i); + } data_sinks_.clear(); } @@ -438,14 +443,24 @@ zarr::ArrayWriter::rollover_() bool zarr::finalize_array(std::unique_ptr&& writer) { + if (writer == nullptr) { + LOG_INFO("Array writer is null. Nothing to finalize."); + return true; + } + writer->is_finalizing_ = true; try { - writer->flush_(); + writer->flush_(); // data sinks finalized here } catch (const std::exception& exc) { LOG_ERROR("Failed to finalize array writer: ", exc.what()); return false; } + if (!finalize_sink(std::move(writer->metadata_sink_))) { + LOG_ERROR("Failed to finalize metadata sink"); + return false; + } + writer.reset(); return true; } diff --git a/src/streaming/array.writer.hh b/src/streaming/array.writer.hh index 8945dab..cf934f4 100644 --- a/src/streaming/array.writer.hh +++ b/src/streaming/array.writer.hh @@ -1,6 +1,6 @@ #pragma once -#include "zarr.stream.hh" +#include "zarr.dimension.hh" #include "thread.pool.hh" #include "s3.connection.hh" #include "blosc.compression.params.hh" @@ -12,10 +12,9 @@ namespace fs = std::filesystem; namespace zarr { - struct ArrayWriterConfig { - std::unique_ptr dimensions; + std::shared_ptr dimensions; ZarrDataType dtype; int level_of_detail; std::optional bucket_name; @@ -40,10 +39,10 @@ downsample(const ArrayWriterConfig& config, class ArrayWriter { public: - ArrayWriter(ArrayWriterConfig&& config, + ArrayWriter(const ArrayWriterConfig& config, std::shared_ptr thread_pool); - ArrayWriter(ArrayWriterConfig&& config, + ArrayWriter(const ArrayWriterConfig& config, std::shared_ptr thread_pool, std::shared_ptr s3_connection_pool); @@ -54,7 +53,7 @@ class ArrayWriter * @param data The frame data. * @return The number of bytes written. 
*/ - [[nodiscard]] size_t write_frame(std::span data); + [[nodiscard]] size_t write_frame(std::span data); protected: ArrayWriterConfig config_; @@ -89,7 +88,7 @@ class ArrayWriter bool should_flush_() const; virtual bool should_rollover_() const = 0; - size_t write_frame_to_chunks_(std::span data); + size_t write_frame_to_chunks_(std::span data); void compress_buffers_(); void flush_(); diff --git a/src/streaming/file.sink.cpp b/src/streaming/file.sink.cpp index 4db4f3a..312d90c 100644 --- a/src/streaming/file.sink.cpp +++ b/src/streaming/file.sink.cpp @@ -10,7 +10,7 @@ zarr::FileSink::FileSink(std::string_view filename) } bool -zarr::FileSink::write(size_t offset, std::span data) +zarr::FileSink::write(size_t offset, std::span data) { const auto bytes_of_buf = data.size(); if (data.data() == nullptr || bytes_of_buf == 0) { diff --git a/src/streaming/file.sink.hh b/src/streaming/file.sink.hh index 0bd2181..1849f79 100644 --- a/src/streaming/file.sink.hh +++ b/src/streaming/file.sink.hh @@ -11,7 +11,7 @@ class FileSink : public Sink public: explicit FileSink(std::string_view filename); - bool write(size_t offset, std::span data) override; + bool write(size_t offset, std::span data) override; protected: bool flush_() override; diff --git a/src/streaming/s3.sink.cpp b/src/streaming/s3.sink.cpp index 3ce664b..04ea3c7 100644 --- a/src/streaming/s3.sink.cpp +++ b/src/streaming/s3.sink.cpp @@ -50,7 +50,7 @@ zarr::S3Sink::flush_() } bool -zarr::S3Sink::write(size_t offset, std::span data) +zarr::S3Sink::write(size_t offset, std::span data) { if (data.data() == nullptr || data.empty()) { return true; @@ -66,7 +66,7 @@ zarr::S3Sink::write(size_t offset, std::span data) nbytes_buffered_ = offset - nbytes_flushed_; size_t bytes_of_data = data.size(); - std::byte* data_ptr = data.data(); + const std::byte* data_ptr = data.data(); while (bytes_of_data > 0) { const auto bytes_to_write = std::min(bytes_of_data, part_buffer_.size() - nbytes_buffered_); diff --git a/src/streaming/s3.sink.hh b/src/streaming/s3.sink.hh index 1e2c9d9..49d2219 100644 --- a/src/streaming/s3.sink.hh +++ b/src/streaming/s3.sink.hh @@ -17,7 +17,7 @@ class S3Sink : public Sink std::string_view object_key, std::shared_ptr connection_pool); - bool write(size_t offset, std::span data) override; + bool write(size_t offset, std::span data) override; protected: bool flush_() override; diff --git a/src/streaming/sink.cpp b/src/streaming/sink.cpp index 9f75566..e351a7b 100644 --- a/src/streaming/sink.cpp +++ b/src/streaming/sink.cpp @@ -1,8 +1,14 @@ #include "sink.hh" +#include "macros.hh" bool zarr::finalize_sink(std::unique_ptr&& sink) { + if (sink == nullptr) { + LOG_INFO("Sink is null. Nothing to finalize."); + return true; + } + if (!sink->flush_()) { return false; } diff --git a/src/streaming/sink.hh b/src/streaming/sink.hh index ba578c7..502679f 100644 --- a/src/streaming/sink.hh +++ b/src/streaming/sink.hh @@ -18,7 +18,7 @@ class Sink * @return True if the write was successful, false otherwise. 
*/ [[nodiscard]] virtual bool write(size_t offset, - std::span buf) = 0; + std::span buf) = 0; protected: [[nodiscard]] virtual bool flush_() = 0; diff --git a/src/streaming/zarr.stream.cpp b/src/streaming/zarr.stream.cpp index 1ff7d27..ccb3d01 100644 --- a/src/streaming/zarr.stream.cpp +++ b/src/streaming/zarr.stream.cpp @@ -1,11 +1,19 @@ #include "macros.hh" #include "zarr.stream.hh" +#include "acquire.zarr.h" #include "zarr.common.hh" +#include "zarrv2.array.writer.hh" +#include "zarrv3.array.writer.hh" +#include "sink.creator.hh" #include #include +#ifdef min +#undef min +#endif + namespace fs = std::filesystem; namespace { @@ -239,7 +247,8 @@ validate_settings(const struct ZarrStreamSettings_s* settings) // we must have at least 3 dimensions const size_t ndims = settings->dimension_count; if (ndims < 3) { - LOG_ERROR("Invalid number of dimensions: ", ndims, ". Must be at least 3"); + LOG_ERROR( + "Invalid number of dimensions: ", ndims, ". Must be at least 3"); return false; } @@ -264,12 +273,116 @@ validate_settings(const struct ZarrStreamSettings_s* settings) return true; } + +std::string +dimension_type_to_string(ZarrDimensionType type) +{ + switch (type) { + case ZarrDimensionType_Time: + return "time"; + case ZarrDimensionType_Channel: + return "channel"; + case ZarrDimensionType_Space: + return "space"; + case ZarrDimensionType_Other: + return "other"; + default: + return "(unknown)"; + } +} + +template +[[nodiscard]] +std::byte* +scale_image(const std::byte* const src, + size_t& bytes_of_src, + size_t& width, + size_t& height) +{ + CHECK(src); + + const size_t bytes_of_frame = width * height * sizeof(T); + EXPECT(bytes_of_src >= bytes_of_frame, + "Expecting at least %zu bytes, got %zu", + bytes_of_frame, + bytes_of_src); + + const int downscale = 2; + constexpr auto bytes_of_type = static_cast(sizeof(T)); + const double factor = 0.25; + + const auto w_pad = static_cast(width + (width % downscale)); + const auto h_pad = static_cast(height + (height % downscale)); + + const auto size_downscaled = + static_cast(w_pad * h_pad * factor * bytes_of_type); + + auto* dst = new T[size_downscaled]; + EXPECT(dst, + "Failed to allocate ", + size_downscaled, + " bytes for destination frame"); + + memset(dst, 0, size_downscaled); + + size_t dst_idx = 0; + for (auto row = 0; row < height; row += downscale) { + const bool pad_height = (row == height - 1 && height != h_pad); + + for (auto col = 0; col < width; col += downscale) { + size_t src_idx = row * width + col; + const bool pad_width = (col == width - 1 && width != w_pad); + + auto here = static_cast(src[src_idx]); + auto right = static_cast( + src[src_idx + (1 - static_cast(pad_width))]); + auto down = static_cast( + src[src_idx + width * (1 - static_cast(pad_height))]); + auto diag = static_cast( + src[src_idx + width * (1 - static_cast(pad_height)) + + (1 - static_cast(pad_width))]); + + dst[dst_idx++] = + static_cast(factor * (here + right + down + diag)); + } + } + + bytes_of_src = size_downscaled; + width = static_cast(w_pad) / 2; + height = static_cast(h_pad) / 2; + + return reinterpret_cast(dst); +} + +template +void +average_two_frames(void* dst_, + size_t bytes_of_dst, + const void* src_, + size_t bytes_of_src) +{ + CHECK(dst_); + CHECK(src_); + EXPECT(bytes_of_dst == bytes_of_src, + "Expecting %zu bytes in destination, got %zu", + bytes_of_src, + bytes_of_dst); + + T* dst = static_cast(dst_); + const T* src = static_cast(src_); + + const auto num_pixels = bytes_of_src / sizeof(T); + for (auto i = 0; i < num_pixels; ++i) { + 
dst[i] = static_cast(0.5 * (dst[i] + src[i])); + } +} } // namespace /* ZarrStream_s implementation */ ZarrStream::ZarrStream_s(struct ZarrStreamSettings_s* settings) : error_() + , frame_buffer_offset_(0) { if (!validate_settings(settings)) { throw std::runtime_error("Invalid Zarr stream settings"); @@ -277,6 +390,15 @@ ZarrStream::ZarrStream_s(struct ZarrStreamSettings_s* settings) commit_settings_(settings); + // spin up thread pool + thread_pool_ = std::make_shared( + std::thread::hardware_concurrency(), + [this](const std::string& err) { this->set_error_(err); }); + + // allocate a frame buffer + frame_buffer_.resize( + zarr::bytes_of_frame(*dimensions_, static_cast(dtype_))); + // create the data store EXPECT(create_store_(), error_); @@ -284,7 +406,9 @@ ZarrStream::ZarrStream_s(struct ZarrStreamSettings_s* settings) EXPECT(create_writers_(), error_); // allocate multiscale frame placeholders - create_scaled_frames_(); + if (multiscale_) { + create_scaled_frames_(); + } // allocate metadata sinks EXPECT(create_metadata_sinks_(), error_); @@ -299,21 +423,63 @@ ZarrStream::ZarrStream_s(struct ZarrStreamSettings_s* settings) EXPECT(write_external_metadata_(), error_); } -ZarrStream_s::~ZarrStream_s() +size_t +ZarrStream::append(const void* data_, size_t nbytes) { - try { - // must precede close of chunk file - write_group_metadata_(); - } catch (const std::exception& e) { - LOG_ERROR("Error finalizing Zarr stream: ", e.what()); + EXPECT(error_.empty(), "Cannot append data: ", error_.c_str()); + + if (0 == nbytes) { + return 0; } -} -size_t -ZarrStream::append(const void* data, size_t nbytes) -{ - // TODO (aliddell): implement this - return 0; + auto* data = static_cast(data_); + + const size_t bytes_of_frame = frame_buffer_.size(); + size_t bytes_written = 0; + + while (bytes_written < nbytes) { + const size_t bytes_remaining = nbytes - bytes_written; + if (frame_buffer_offset_ > 0) { // add to / finish a partial frame + const size_t bytes_to_copy = + std::min(bytes_of_frame - frame_buffer_offset_, bytes_remaining); + + memcpy(frame_buffer_.data() + frame_buffer_offset_, + data + bytes_written, + bytes_to_copy); + frame_buffer_offset_ += bytes_to_copy; + bytes_written += bytes_to_copy; + + // ready to flush the frame buffer + if (frame_buffer_offset_ == bytes_of_frame) { + const size_t bytes_written_this_frame = + writers_[0]->write_frame({ data, bytes_of_frame }); + if (bytes_written_this_frame == 0) { + break; + } + + bytes_written += bytes_to_copy; + data += bytes_to_copy; + frame_buffer_offset_ = 0; + } + } else if (bytes_remaining < bytes_of_frame) { // begin partial frame + memcpy(frame_buffer_.data(), data, bytes_remaining); + frame_buffer_offset_ = bytes_remaining; + bytes_written += bytes_remaining; + } else { // at least one full frame + const size_t bytes_written_this_frame = + writers_[0]->write_frame({ data, bytes_of_frame }); + if (bytes_written_this_frame == 0) { + break; + } + + write_multiscale_frames_(data, bytes_written_this_frame); + + bytes_written += bytes_written_this_frame; + data += bytes_written_this_frame; + } + } + + return bytes_written; } bool @@ -344,7 +510,8 @@ ZarrStream_s::commit_settings_(const struct ZarrStreamSettings_s* settings) .endpoint = zarr::trim(settings->s3_settings->endpoint), .bucket_name = zarr::trim(settings->s3_settings->bucket_name), .access_key_id = zarr::trim(settings->s3_settings->access_key_id), - .secret_access_key = zarr::trim(settings->s3_settings->secret_access_key), + .secret_access_key = + 
zarr::trim(settings->s3_settings->secret_access_key), }; } @@ -383,7 +550,26 @@ bool ZarrStream_s::create_store_() { if (is_s3_acquisition_()) { - // TODO (aliddell): implement this + // spin up S3 connection pool + try { + s3_connection_pool_ = std::make_shared( + std::thread::hardware_concurrency(), + s3_settings_->endpoint, + s3_settings_->access_key_id, + s3_settings_->secret_access_key); + } catch (const std::exception& e) { + set_error_("Error creating S3 connection pool: " + + std::string(e.what())); + return false; + } + + // test the S3 connection + auto conn = s3_connection_pool_->get_connection(); + if (!conn->is_connection_valid()) { + set_error_("Failed to connect to S3"); + return false; + } + s3_connection_pool_->return_connection(std::move(conn)); } else { if (fs::exists(store_path_)) { // remove everything inside the store path @@ -414,54 +600,409 @@ ZarrStream_s::create_store_() bool ZarrStream_s::create_writers_() { - // TODO (aliddell): implement this + writers_.clear(); + + // construct Blosc compression parameters + std::optional blosc_compression_params; + if (is_compressed_acquisition_()) { + blosc_compression_params = zarr::BloscCompressionParams( + zarr::blosc_codec_to_string(compression_settings_->codec), + compression_settings_->level, + compression_settings_->shuffle); + } + + std::optional s3_bucket_name; + if (is_s3_acquisition_()) { + s3_bucket_name = s3_settings_->bucket_name; + } + + zarr::ArrayWriterConfig config = { + .dimensions = dimensions_, + .dtype = static_cast(dtype_), + .level_of_detail = 0, + .bucket_name = s3_bucket_name, + .store_path = store_path_, + .compression_params = blosc_compression_params, + }; + + if (version_ == 2) { + writers_.push_back(std::make_unique( + config, thread_pool_, s3_connection_pool_)); + } else { + writers_.push_back(std::make_unique( + config, thread_pool_, s3_connection_pool_)); + } + + if (multiscale_) { + zarr::ArrayWriterConfig downsampled_config; + + bool do_downsample = true; + while (do_downsample) { + do_downsample = downsample(config, downsampled_config); + + if (version_ == 2) { + writers_.push_back(std::make_unique( + downsampled_config, thread_pool_, s3_connection_pool_)); + } else { + writers_.push_back(std::make_unique( + downsampled_config, thread_pool_, s3_connection_pool_)); + } + // scaled_frames_.emplace(level++, std::nullopt); + + config = std::move(downsampled_config); + downsampled_config = {}; + } + } + return true; } void ZarrStream_s::create_scaled_frames_() { - // TODO (aliddell): implement this + for (size_t level = 1; level < writers_.size(); ++level) { + scaled_frames_.emplace(level, std::nullopt); + } } bool ZarrStream_s::create_metadata_sinks_() { - // TODO (aliddell): implement this + zarr::SinkCreator creator(thread_pool_, s3_connection_pool_); + + try { + if (s3_connection_pool_) { + if (!creator.make_metadata_sinks(version_, + s3_settings_->bucket_name, + store_path_, + metadata_sinks_)) { + set_error_("Error creating metadata sinks"); + return false; + } + } else { + if (!creator.make_metadata_sinks( + version_, store_path_, metadata_sinks_)) { + set_error_("Error creating metadata sinks"); + return false; + } + } + } catch (const std::exception& e) { + set_error_("Error creating metadata sinks: " + std::string(e.what())); + return false; + } + return true; } bool ZarrStream_s::write_base_metadata_() { - // TODO (aliddell): implement this + nlohmann::json metadata; + std::string metadata_key; + + if (version_ == 2) { + metadata["multiscales"] = make_multiscale_metadata_(); + + 
metadata_key = ".zattrs"; + } else { + metadata["extensions"] = nlohmann::json::array(); + metadata["metadata_encoding"] = + "https://purl.org/zarr/spec/protocol/core/3.0"; + metadata["metadata_key_suffix"] = ".json"; + metadata["zarr_format"] = + "https://purl.org/zarr/spec/protocol/core/3.0"; + + metadata_key = "zarr.json"; + } + + const std::unique_ptr& sink = metadata_sinks_.at(metadata_key); + if (!sink) { + set_error_("Metadata sink '" + metadata_key + "'not found"); + return false; + } + + const std::string metadata_str = metadata.dump(4); + std::span data{ reinterpret_cast(metadata_str.data()), + metadata_str.size() }; + + if (!sink->write(0, data)) { + set_error_("Error writing base metadata"); + return false; + } + return true; } bool ZarrStream_s::write_group_metadata_() { - // TODO (aliddell): implement this + nlohmann::json metadata; + std::string metadata_key; + + if (version_ == 2) { + metadata = { { "zarr_format", 2 } }; + + metadata_key = ".zgroup"; + } else { + metadata["attributes"]["multiscales"] = make_multiscale_metadata_(); + + metadata_key = "meta/root.group.json"; + } + + const std::unique_ptr& sink = metadata_sinks_.at(metadata_key); + if (!sink) { + set_error_("Metadata sink '" + metadata_key + "'not found"); + return false; + } + + const std::string metadata_str = metadata.dump(4); + std::span data{ reinterpret_cast(metadata_str.data()), + metadata_str.size() }; + if (!sink->write(0, data)) { + set_error_("Error writing group metadata"); + return false; + } + return true; } bool ZarrStream_s::write_external_metadata_() { - // TODO (aliddell): implement this + if (custom_metadata_.empty()) { + return true; + } + + auto metadata = nlohmann::json::parse(custom_metadata_, + nullptr, // callback + false, // allow exceptions + true // ignore comments + ); + std::string metadata_key = "acquire.json"; + + if (version_ == 3) { + metadata_key = "meta/" + metadata_key; + } + + const std::unique_ptr& sink = metadata_sinks_.at(metadata_key); + if (!sink) { + set_error_("Metadata sink '" + metadata_key + "'not found"); + return false; + } + + const std::string metadata_str = metadata.dump(4); + std::span data{ reinterpret_cast(metadata_str.data()), + metadata_str.size() }; + if (!sink->write(0, data)) { + set_error_("Error writing external metadata"); + return false; + } + return true; } nlohmann::json ZarrStream_s::make_multiscale_metadata_() const { - // TODO (aliddell): implement this - return {}; + nlohmann::json multiscales; + multiscales[0]["version"] = "0.4"; + + auto& axes = multiscales[0]["axes"]; + for (auto i = 0; i < dimensions_->ndims(); ++i) { + const auto& dim = dimensions_->at(i); + std::string type = dimension_type_to_string(dim.type); + + if (i < dimensions_->ndims() - 2) { + axes.push_back({ { "name", dim.name.c_str() }, { "type", type } }); + } else { + axes.push_back({ { "name", dim.name.c_str() }, + { "type", type }, + { "unit", "micrometer" } }); + } + } + + // spatial multiscale metadata + std::vector scales(dimensions_->ndims(), 1.0); + multiscales[0]["datasets"] = { + { + { "path", "0" }, + { "coordinateTransformations", + { + { + { "type", "scale" }, + { "scale", scales }, + }, + } }, + }, + }; + + for (auto i = 1; i < writers_.size(); ++i) { + scales.clear(); + scales.push_back(std::pow(2, i)); // append + for (auto k = 0; k < dimensions_->ndims() - 3; ++k) { + scales.push_back(1.); + } + scales.push_back(std::pow(2, i)); // y + scales.push_back(std::pow(2, i)); // x + + multiscales[0]["datasets"].push_back({ + { "path", std::to_string(i) }, + { 
"coordinateTransformations", + { + { + { "type", "scale" }, + { "scale", scales }, + }, + } }, + }); + + // downsampling metadata + multiscales[0]["type"] = "local_mean"; + multiscales[0]["metadata"] = { + { "description", + "The fields in the metadata describe how to reproduce this " + "multiscaling in scikit-image. The method and its parameters " + "are " + "given here." }, + { "method", "skimage.transform.downscale_local_mean" }, + { "version", "0.21.0" }, + { "args", "[2]" }, + { "kwargs", { "cval", 0 } }, + }; + } + + return multiscales; } void -ZarrStream_s::write_multiscale_frames_(const uint8_t* data, +ZarrStream_s::write_multiscale_frames_(const std::byte* data, size_t bytes_of_data) { - // TODO (aliddell): implement this + if (!multiscale_) { + return; + } + + std::function scale; + std::function average2; + + switch (dtype_) { + case ZarrDataType_uint8: + scale = scale_image; + average2 = average_two_frames; + break; + case ZarrDataType_uint16: + scale = scale_image; + average2 = average_two_frames; + break; + case ZarrDataType_uint32: + scale = scale_image; + average2 = average_two_frames; + break; + case ZarrDataType_uint64: + scale = scale_image; + average2 = average_two_frames; + break; + case ZarrDataType_int8: + scale = scale_image; + average2 = average_two_frames; + break; + case ZarrDataType_int16: + scale = scale_image; + average2 = average_two_frames; + break; + case ZarrDataType_int32: + scale = scale_image; + average2 = average_two_frames; + break; + case ZarrDataType_int64: + scale = scale_image; + average2 = average_two_frames; + break; + case ZarrDataType_float32: + scale = scale_image; + average2 = average_two_frames; + break; + case ZarrDataType_float64: + scale = scale_image; + average2 = average_two_frames; + break; + default: + throw std::runtime_error("Invalid data type: " + + std::to_string(dtype_)); + } + + size_t frame_width = dimensions_->width_dim().array_size_px; + size_t frame_height = dimensions_->height_dim().array_size_px; + + std::byte* dst; + for (auto i = 1; i < writers_.size(); ++i) { + dst = scale(data, bytes_of_data, frame_width, frame_height); + + // bytes_of data is now downscaled + // frame_width and frame_height are now the new dimensions + + if (scaled_frames_[i]) { + average2(dst, bytes_of_data, *scaled_frames_[i], bytes_of_data); + std::span frame_data{ reinterpret_cast(dst), + bytes_of_data }; + EXPECT(writers_[i]->write_frame(frame_data), + "Failed to write frame to writer %zu", + i); + + // clean up this LOD + delete[] *scaled_frames_[i]; + scaled_frames_[i].reset(); + + // set up for next iteration + if (i + 1 < writers_.size()) { + data = dst; + } else { + delete[] dst; + } + } else { + scaled_frames_[i] = dst; + break; + } + } } + +bool +finalize_stream(struct ZarrStream_s* stream) +{ + if (stream == nullptr) { + LOG_INFO("Stream is null. Nothing to finalize."); + return true; + } + + if (!stream->write_group_metadata_()) { + LOG_ERROR("Error finalizing Zarr stream: ", stream->error_); + return false; + } + + for (auto& [sink_name, sink] : stream->metadata_sinks_) { + if (!finalize_sink(std::move(sink))) { + LOG_ERROR("Error finalizing Zarr stream. Failed to write ", + sink_name); + return false; + } + } + stream->metadata_sinks_.clear(); + + for (auto i = 0; i < stream->writers_.size(); ++i) { + if (!finalize_array(std::move(stream->writers_[i]))) { + LOG_ERROR("Error finalizing Zarr stream. 
Failed to write array ", + i); + return false; + } + } + stream->writers_.clear(); // flush before shutting down thread pool + stream->thread_pool_->await_stop(); + + for (auto& [_, frame] : stream->scaled_frames_) { + if (frame) { + delete[] *frame; + } + } + + return true; +} \ No newline at end of file diff --git a/src/streaming/zarr.stream.hh b/src/streaming/zarr.stream.hh index b1f0308..d359391 100644 --- a/src/streaming/zarr.stream.hh +++ b/src/streaming/zarr.stream.hh @@ -1,6 +1,10 @@ #pragma once #include "zarr.dimension.hh" +#include "thread.pool.hh" +#include "s3.connection.hh" +#include "sink.hh" +#include "array.writer.hh" #include @@ -12,7 +16,6 @@ struct ZarrStream_s { public: ZarrStream_s(struct ZarrStreamSettings_s* settings); - ~ZarrStream_s(); /** * @brief Append data to the stream. @@ -47,6 +50,18 @@ struct ZarrStream_s std::shared_ptr dimensions_; bool multiscale_; + std::vector frame_buffer_; + size_t frame_buffer_offset_; + + std::shared_ptr thread_pool_; + std::shared_ptr s3_connection_pool_; + + std::vector> writers_; + std::unordered_map> + metadata_sinks_; + + std::unordered_map> scaled_frames_; + bool is_s3_acquisition_() const; bool is_compressed_acquisition_() const; @@ -78,13 +93,18 @@ struct ZarrStream_s [[nodiscard]] bool write_base_metadata_(); /** @brief Write Zarr group metadata. */ - bool write_group_metadata_(); + [[nodiscard]] bool write_group_metadata_(); /** @brief Write external metadata. */ [[nodiscard]] bool write_external_metadata_(); /** @brief Construct OME metadata pertaining to the multiscale pyramid. */ - nlohmann::json make_multiscale_metadata_() const; + [[nodiscard]] nlohmann::json make_multiscale_metadata_() const; + + void write_multiscale_frames_(const std::byte* data, size_t bytes_of_data); - void write_multiscale_frames_(const uint8_t* data, size_t bytes_of_data); + friend bool finalize_stream(struct ZarrStream_s* stream); }; + +bool +finalize_stream(struct ZarrStream_s* stream); diff --git a/src/streaming/zarrv2.array.writer.cpp b/src/streaming/zarrv2.array.writer.cpp index b93b4aa..8b620ca 100644 --- a/src/streaming/zarrv2.array.writer.cpp +++ b/src/streaming/zarrv2.array.writer.cpp @@ -58,17 +58,17 @@ sample_type_to_dtype(ZarrDataType t, std::string& t_str) } // namespace zarr::ZarrV2ArrayWriter::ZarrV2ArrayWriter( - ArrayWriterConfig&& config, + const ArrayWriterConfig& config, std::shared_ptr thread_pool) - : ArrayWriter(std::move(config), thread_pool) + : ArrayWriter(config, thread_pool) { } zarr::ZarrV2ArrayWriter::ZarrV2ArrayWriter( - ArrayWriterConfig&& config, + const ArrayWriterConfig& config, std::shared_ptr thread_pool, std::shared_ptr s3_connection_pool) - : ArrayWriter(std::move(config), thread_pool, s3_connection_pool) + : ArrayWriter(config, thread_pool, s3_connection_pool) { } diff --git a/src/streaming/zarrv2.array.writer.hh b/src/streaming/zarrv2.array.writer.hh index b96b115..2a6039a 100644 --- a/src/streaming/zarrv2.array.writer.hh +++ b/src/streaming/zarrv2.array.writer.hh @@ -6,10 +6,10 @@ namespace zarr { class ZarrV2ArrayWriter final : public ArrayWriter { public: - ZarrV2ArrayWriter(ArrayWriterConfig&& config, + ZarrV2ArrayWriter(const ArrayWriterConfig& config, std::shared_ptr thread_pool); - ZarrV2ArrayWriter(ArrayWriterConfig&& config, + ZarrV2ArrayWriter(const ArrayWriterConfig& config, std::shared_ptr thread_pool, std::shared_ptr s3_connection_pool); diff --git a/src/streaming/zarrv3.array.writer.cpp b/src/streaming/zarrv3.array.writer.cpp index d58db69..d687076 100644 --- 
a/src/streaming/zarrv3.array.writer.cpp +++ b/src/streaming/zarrv3.array.writer.cpp @@ -46,17 +46,17 @@ sample_type_to_dtype(ZarrDataType t) } // namespace zarr::ZarrV3ArrayWriter::ZarrV3ArrayWriter( - ArrayWriterConfig&& config, + const ArrayWriterConfig& config, std::shared_ptr thread_pool) - : ZarrV3ArrayWriter(std::move(config), thread_pool, nullptr) + : ZarrV3ArrayWriter(config, thread_pool, nullptr) { } zarr::ZarrV3ArrayWriter::ZarrV3ArrayWriter( - ArrayWriterConfig&& config, + const ArrayWriterConfig& config, std::shared_ptr thread_pool, std::shared_ptr s3_connection_pool) - : ArrayWriter(std::move(config), thread_pool, s3_connection_pool) + : ArrayWriter(config, thread_pool, s3_connection_pool) { const auto number_of_shards = config_.dimensions->number_of_shards(); const auto chunks_per_shard = config_.dimensions->chunks_per_shard(); @@ -86,7 +86,7 @@ zarr::ZarrV3ArrayWriter::flush_impl_() std::vector> chunk_in_shards(n_shards); for (auto i = 0; i < chunk_buffers_.size(); ++i) { const auto index = config_.dimensions->shard_index_for_chunk(i); - chunk_in_shards.at(index).push_back(i); + chunk_in_shards[index].push_back(i); } // write out chunks to shards diff --git a/src/streaming/zarrv3.array.writer.hh b/src/streaming/zarrv3.array.writer.hh index 4681ea3..3ad1b05 100644 --- a/src/streaming/zarrv3.array.writer.hh +++ b/src/streaming/zarrv3.array.writer.hh @@ -6,10 +6,10 @@ namespace zarr { struct ZarrV3ArrayWriter : public ArrayWriter { public: - ZarrV3ArrayWriter(ArrayWriterConfig&& config, + ZarrV3ArrayWriter(const ArrayWriterConfig& config, std::shared_ptr thread_pool); ZarrV3ArrayWriter( - ArrayWriterConfig&& config, + const ArrayWriterConfig& config, std::shared_ptr thread_pool, std::shared_ptr s3_connection_pool); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 57f3c4c..f369bb8 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -2,6 +2,7 @@ if (${NOTEST}) message(STATUS "Skipping test targets") else () add_subdirectory(unit-tests) + add_subdirectory(integration) if (BUILD_ACQUIRE_DRIVER_ZARR) add_subdirectory(driver) diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt new file mode 100644 index 0000000..0df6408 --- /dev/null +++ b/tests/integration/CMakeLists.txt @@ -0,0 +1,34 @@ +set(project acquire-zarr) + +set(tests + stream-zarr-v2-raw-to-filesystem + stream-zarr-v2-compressed-to-filesystem + stream-zarr-v2-raw-to-s3 + stream-zarr-v2-compressed-to-s3 + stream-zarr-v3-raw-to-filesystem + stream-zarr-v3-compressed-to-filesystem + stream-zarr-v3-raw-to-s3 + stream-zarr-v3-compressed-to-s3 +) + +foreach (name ${tests}) + set(tgt "${project}-${name}") + add_executable(${tgt} ${name}.cpp test.macros.hh) + target_compile_definitions(${tgt} PUBLIC "TEST=\"${tgt}\"") + set_target_properties(${tgt} PROPERTIES + MSVC_RUNTIME_LIBRARY "MultiThreaded$<$:Debug>" + ) + target_include_directories(${tgt} PRIVATE + ${CMAKE_SOURCE_DIR}/include + ${CMAKE_SOURCE_DIR}/src/logger + ) + target_link_libraries(${tgt} PRIVATE + acquire-logger + acquire-zarr + nlohmann_json::nlohmann_json + miniocpp::miniocpp + ) + + add_test(NAME test-${tgt} COMMAND ${tgt}) + set_tests_properties(test-${tgt} PROPERTIES LABELS "anyplatform;acquire-zarr") +endforeach () \ No newline at end of file diff --git a/tests/integration/stream-zarr-v2-compressed-to-filesystem.cpp b/tests/integration/stream-zarr-v2-compressed-to-filesystem.cpp new file mode 100644 index 0000000..4e43421 --- /dev/null +++ b/tests/integration/stream-zarr-v2-compressed-to-filesystem.cpp @@ 
-0,0 +1,313 @@ +#include "acquire.zarr.h" +#include "test.macros.hh" + +#include + +#include +#include +#include + +namespace fs = std::filesystem; + +namespace { +const std::string test_path = + (fs::temp_directory_path() / (TEST ".zarr")).string(); + +const unsigned int array_width = 64, array_height = 48, array_planes = 6, + array_channels = 8, array_timepoints = 10; + +const unsigned int chunk_width = 16, chunk_height = 16, chunk_planes = 2, + chunk_channels = 4, chunk_timepoints = 5; + +const unsigned int chunks_in_x = + (array_width + chunk_width - 1) / chunk_width; // 4 chunks +const unsigned int chunks_in_y = + (array_height + chunk_height - 1) / chunk_height; // 3 chunks +const unsigned int chunks_in_z = + (array_planes + chunk_planes - 1) / chunk_planes; // 3 chunks +const unsigned int chunks_in_c = + (array_channels + chunk_channels - 1) / chunk_channels; // 2 chunks +const unsigned int chunks_in_t = + (array_timepoints + chunk_timepoints - 1) / chunk_timepoints; + +const size_t nbytes_px = sizeof(int32_t); +const uint32_t frames_to_acquire = + array_planes * array_channels * array_timepoints; +const size_t bytes_of_frame = array_width * array_height * nbytes_px; +} // namespace/s + +ZarrStream* +setup() +{ + ZarrStreamSettings settings = { + .store_path = test_path.c_str(), + .s3_settings = nullptr, + .data_type = ZarrDataType_int32, + .version = ZarrVersion_2, + }; + + ZarrCompressionSettings compression_settings = { + .compressor = ZarrCompressor_Blosc1, + .codec = ZarrCompressionCodec_BloscZstd, + .level = 1, + .shuffle = 1, + }; + settings.compression_settings = &compression_settings; + + CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5)); + + ZarrDimensionProperties* dim; + dim = settings.dimensions; + *dim = DIM("t", ZarrDimensionType_Time, array_timepoints, chunk_timepoints, 0); + + dim = settings.dimensions + 1; + *dim = DIM("c", ZarrDimensionType_Channel, array_channels, chunk_channels, 0); + + dim = settings.dimensions + 2; + *dim = DIM("z", ZarrDimensionType_Space, array_planes, chunk_planes, 0); + + dim = settings.dimensions + 3; + *dim = DIM("y", ZarrDimensionType_Space, array_height, chunk_height, 0); + + dim = settings.dimensions + 4; + *dim = DIM("x", ZarrDimensionType_Space, array_width, chunk_width, 0); + + auto* stream = ZarrStream_create(&settings); + + return stream; +} + +void +verify_base_metadata(const nlohmann::json& meta) +{ + const auto multiscales = meta["multiscales"][0]; + + const auto axes = multiscales["axes"]; + EXPECT_EQ(size_t, axes.size(), 5); + std::string name, type, unit; + + name = axes[0]["name"]; + type = axes[0]["type"]; + EXPECT(name == "t", "Expected name to be 't', but got '", name, "'"); + EXPECT(type == "time", "Expected type to be 'time', but got '", type, "'"); + + name = axes[1]["name"]; + type = axes[1]["type"]; + EXPECT(name == "c", "Expected name to be 'c', but got '", name, "'"); + EXPECT( + type == "channel", "Expected type to be 'channel', but got '", type, "'"); + + name = axes[2]["name"]; + type = axes[2]["type"]; + EXPECT(name == "z", "Expected name to be 'z', but got '", name, "'"); + EXPECT( + type == "space", "Expected type to be 'space', but got '", type, "'"); + + name = axes[3]["name"]; + type = axes[3]["type"]; + unit = axes[3]["unit"]; + EXPECT(name == "y", "Expected name to be 'y', but got '", name, "'"); + EXPECT( + type == "space", "Expected type to be 'space', but got '", type, "'"); + EXPECT(unit == "micrometer", + "Expected unit to be 'micrometer', but got '", + unit, + "'"); + + name = 
axes[4]["name"]; + type = axes[4]["type"]; + unit = axes[4]["unit"]; + EXPECT(name == "x", "Expected name to be 'x', but got '", name, "'"); + EXPECT( + type == "space", "Expected type to be 'space', but got '", type, "'"); + EXPECT(unit == "micrometer", + "Expected unit to be 'micrometer', but got '", + unit, + "'"); + + const auto datasets = multiscales["datasets"][0]; + const std::string path = datasets["path"].get(); + EXPECT(path == "0", "Expected path to be '0', but got '", path, "'"); + + const auto coordinate_transformations = + datasets["coordinateTransformations"][0]; + + type = coordinate_transformations["type"].get(); + EXPECT( + type == "scale", "Expected type to be 'scale', but got '", type, "'"); + + const auto scale = coordinate_transformations["scale"]; + EXPECT_EQ(size_t, scale.size(), 5); + EXPECT_EQ(double, scale[0].get(), 1.0); + EXPECT_EQ(double, scale[1].get(), 1.0); + EXPECT_EQ(double, scale[2].get(), 1.0); + EXPECT_EQ(double, scale[3].get(), 1.0); + EXPECT_EQ(double, scale[4].get(), 1.0); +} + +void +verify_group_metadata(const nlohmann::json& meta) +{ + const auto zarr_format = meta["zarr_format"].get(); + EXPECT_EQ(int, zarr_format, 2); +} + +void +verify_array_metadata(const nlohmann::json& meta) +{ + const auto& shape = meta["shape"]; + EXPECT_EQ(size_t, shape.size(), 5); + EXPECT_EQ(int, shape[0].get(), array_timepoints); + EXPECT_EQ(int, shape[1].get(), array_channels); + EXPECT_EQ(int, shape[2].get(), array_planes); + EXPECT_EQ(int, shape[3].get(), array_height); + EXPECT_EQ(int, shape[4].get(), array_width); + + const auto& chunks = meta["chunks"]; + EXPECT_EQ(size_t, chunks.size(), 5); + EXPECT_EQ(int, chunks[0].get(), chunk_timepoints); + EXPECT_EQ(int, chunks[1].get(), chunk_channels); + EXPECT_EQ(int, chunks[2].get(), chunk_planes); + EXPECT_EQ(int, chunks[3].get(), chunk_height); + EXPECT_EQ(int, chunks[4].get(), chunk_width); + + const auto dtype = meta["dtype"].get(); + EXPECT(dtype == "(); + EXPECT(compressor_id == "blosc", + "Expected compressor id to be blosc, but got ", + compressor_id); + + const auto cname = compressor["cname"].get(); + EXPECT( + cname == "zstd", "Expected compressor cname to be zstd, but got ", cname); + + const auto clevel = compressor["clevel"].get(); + EXPECT_EQ(int, clevel, 1); + + const auto shuffle = compressor["shuffle"].get(); + EXPECT_EQ(int, shuffle, 1); +} + +void +verify_file_data() +{ + const auto expected_file_size = chunk_width * chunk_height * chunk_planes * + chunk_channels * chunk_timepoints * + nbytes_px; + + fs::path data_root = fs::path(test_path) / "0"; + + CHECK(fs::is_directory(data_root)); + for (auto t = 0; t < chunks_in_t; ++t) { + const auto t_dir = data_root / std::to_string(t); + CHECK(fs::is_directory(t_dir)); + + for (auto c = 0; c < chunks_in_c; ++c) { + const auto c_dir = t_dir / std::to_string(c); + CHECK(fs::is_directory(c_dir)); + + for (auto z = 0; z < chunks_in_z; ++z) { + const auto z_dir = c_dir / std::to_string(z); + CHECK(fs::is_directory(z_dir)); + + for (auto y = 0; y < chunks_in_y; ++y) { + const auto y_dir = z_dir / std::to_string(y); + CHECK(fs::is_directory(y_dir)); + + for (auto x = 0; x < chunks_in_x; ++x) { + const auto x_file = y_dir / std::to_string(x); + CHECK(fs::is_regular_file(x_file)); + const auto file_size = fs::file_size(x_file); + EXPECT_LT(size_t, file_size, expected_file_size); + } + + CHECK(!fs::is_regular_file(y_dir / + std::to_string(chunks_in_x))); + } + + CHECK(!fs::is_directory(z_dir / std::to_string(chunks_in_y))); + } + + CHECK(!fs::is_directory(c_dir / 
std::to_string(chunks_in_z))); + } + + CHECK(!fs::is_directory(t_dir / std::to_string(chunks_in_c))); + } + + CHECK(!fs::is_directory(data_root / std::to_string(chunks_in_t))); +} + +void +verify() +{ + CHECK(std::filesystem::is_directory(test_path)); + + { + fs::path base_metadata_path = fs::path(test_path) / ".zattrs"; + std::ifstream f(base_metadata_path); + nlohmann::json base_metadata = nlohmann::json::parse(f); + + verify_base_metadata(base_metadata); + } + + { + fs::path group_metadata_path = fs::path(test_path) / ".zgroup"; + std::ifstream f = std::ifstream(group_metadata_path); + nlohmann::json group_metadata = nlohmann::json::parse(f); + + verify_group_metadata(group_metadata); + } + + { + fs::path array_metadata_path = fs::path(test_path) / "0" / ".zarray"; + std::ifstream f = std::ifstream(array_metadata_path); + nlohmann::json array_metadata = nlohmann::json::parse(f); + + verify_array_metadata(array_metadata); + } + + verify_file_data(); +} + +int +main() +{ + Zarr_set_log_level(ZarrLogLevel_Debug); + + auto* stream = setup(); + std::vector frame(array_width * array_height, 0); + + int retval = 1; + + try { + size_t bytes_out; + for (auto i = 0; i < frames_to_acquire; ++i) { + ZarrStatusCode status = ZarrStream_append( + stream, frame.data(), bytes_of_frame, &bytes_out); + EXPECT(status == ZarrStatusCode_Success, + "Failed to append frame ", + i, + ": ", + Zarr_get_status_message(status)); + EXPECT_EQ(size_t, bytes_out, bytes_of_frame); + } + + ZarrStream_destroy(stream); + + verify(); + + // Clean up + fs::remove_all(test_path); + + retval = 0; + } catch (const std::exception& e) { + LOG_ERROR("Caught exception: ", e.what()); + } + + return retval; +} diff --git a/tests/integration/stream-zarr-v2-compressed-to-s3.cpp b/tests/integration/stream-zarr-v2-compressed-to-s3.cpp new file mode 100644 index 0000000..8cabd4f --- /dev/null +++ b/tests/integration/stream-zarr-v2-compressed-to-s3.cpp @@ -0,0 +1,447 @@ +#include "acquire.zarr.h" +#include "test.macros.hh" + +#include +#include + +#include + +namespace { +std::string s3_endpoint, s3_bucket_name, s3_access_key_id, s3_secret_access_key; + +const unsigned int array_width = 64, array_height = 48, array_planes = 6, + array_channels = 8, array_timepoints = 10; + +const unsigned int chunk_width = 16, chunk_height = 16, chunk_planes = 2, + chunk_channels = 4, chunk_timepoints = 5; + +const unsigned int chunks_in_x = + (array_width + chunk_width - 1) / chunk_width; // 4 chunks +const unsigned int chunks_in_y = + (array_height + chunk_height - 1) / chunk_height; // 3 chunks +const unsigned int chunks_in_z = + (array_planes + chunk_planes - 1) / chunk_planes; // 3 chunks +const unsigned int chunks_in_c = + (array_channels + chunk_channels - 1) / chunk_channels; // 2 chunks +const unsigned int chunks_in_t = + (array_timepoints + chunk_timepoints - 1) / chunk_timepoints; + +const size_t nbytes_px = sizeof(int32_t); +const uint32_t frames_to_acquire = + array_planes * array_channels * array_timepoints; +const size_t bytes_of_frame = array_width * array_height * nbytes_px; + +bool +get_credentials() +{ + char* env = nullptr; + if (!(env = std::getenv("ZARR_S3_ENDPOINT"))) { + LOG_ERROR("ZARR_S3_ENDPOINT not set."); + return false; + } + s3_endpoint = env; + + if (!(env = std::getenv("ZARR_S3_BUCKET_NAME"))) { + LOG_ERROR("ZARR_S3_BUCKET_NAME not set."); + return false; + } + s3_bucket_name = env; + + if (!(env = std::getenv("ZARR_S3_ACCESS_KEY_ID"))) { + LOG_ERROR("ZARR_S3_ACCESS_KEY_ID not set."); + return false; + } + s3_access_key_id 
= env; + + if (!(env = std::getenv("ZARR_S3_SECRET_ACCESS_KEY"))) { + LOG_ERROR("ZARR_S3_SECRET_ACCESS_KEY not set."); + return false; + } + s3_secret_access_key = env; + + return true; +} + +bool +object_exists(minio::s3::Client& client, const std::string& object_name) +{ + minio::s3::StatObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + + minio::s3::StatObjectResponse response = client.StatObject(args); + + return (bool)response; +} + +size_t +get_object_size(minio::s3::Client& client, const std::string& object_name) +{ + minio::s3::StatObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + + minio::s3::StatObjectResponse response = client.StatObject(args); + + if (!response) { + LOG_ERROR("Failed to get object size: %s", object_name.c_str()); + return 0; + } + + return response.size; +} + +std::string +get_object_contents(minio::s3::Client& client, const std::string& object_name) +{ + std::stringstream ss; + + minio::s3::GetObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + args.datafunc = [&ss](minio::http::DataFunctionArgs args) -> bool { + ss << args.datachunk; + return true; + }; + + // Call get object. + minio::s3::GetObjectResponse resp = client.GetObject(args); + + return ss.str(); +} + +bool +remove_items(minio::s3::Client& client, + const std::vector& item_keys) +{ + std::list objects; + for (const auto& key : item_keys) { + minio::s3::DeleteObject object; + object.name = key; + objects.push_back(object); + } + + minio::s3::RemoveObjectsArgs args; + args.bucket = s3_bucket_name; + + auto it = objects.begin(); + + args.func = [&objects = objects, + &i = it](minio::s3::DeleteObject& obj) -> bool { + if (i == objects.end()) + return false; + obj = *i; + i++; + return true; + }; + + minio::s3::RemoveObjectsResult result = client.RemoveObjects(args); + for (; result; result++) { + minio::s3::DeleteError err = *result; + if (!err) { + LOG_ERROR("Failed to delete object %s: %s", + err.object_name.c_str(), + err.message.c_str()); + return false; + } + } + + return true; +} +} // namespace/s + +ZarrStream* +setup() +{ + ZarrStreamSettings settings = { + .store_path = TEST, + .data_type = ZarrDataType_int32, + .version = ZarrVersion_2, + }; + + ZarrS3Settings s3_settings{ + .endpoint = s3_endpoint.c_str(), + .bucket_name = s3_bucket_name.c_str(), + .access_key_id = s3_access_key_id.c_str(), + .secret_access_key = s3_secret_access_key.c_str(), + }; + settings.s3_settings = &s3_settings; + + ZarrCompressionSettings compression_settings = { + .compressor = ZarrCompressor_Blosc1, + .codec = ZarrCompressionCodec_BloscZstd, + .level = 1, + .shuffle = 1, + }; + settings.compression_settings = &compression_settings; + + CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5)); + + ZarrDimensionProperties* dim; + dim = settings.dimensions; + *dim = DIM("t", ZarrDimensionType_Time, array_timepoints, chunk_timepoints, 0); + + dim = settings.dimensions + 1; + *dim = DIM("c", ZarrDimensionType_Channel, array_channels, chunk_channels, 0); + + dim = settings.dimensions + 2; + *dim = DIM("z", ZarrDimensionType_Space, array_planes, chunk_planes, 0); + + dim = settings.dimensions + 3; + *dim = DIM("y", ZarrDimensionType_Space, array_height, chunk_height, 0); + + dim = settings.dimensions + 4; + *dim = DIM("x", ZarrDimensionType_Space, array_width, chunk_width, 0); + + auto* stream = ZarrStream_create(&settings); + + return stream; +} + +void +verify_base_metadata(const nlohmann::json& meta) +{ + const auto 
multiscales = meta["multiscales"][0]; + + const auto axes = multiscales["axes"]; + EXPECT_EQ(size_t, axes.size(), 5); + std::string name, type, unit; + + name = axes[0]["name"]; + type = axes[0]["type"]; + EXPECT(name == "t", "Expected name to be 't', but got '", name, "'"); + EXPECT(type == "time", "Expected type to be 'time', but got '", type, "'"); + + name = axes[1]["name"]; + type = axes[1]["type"]; + EXPECT(name == "c", "Expected name to be 'c', but got '", name, "'"); + EXPECT( + type == "channel", "Expected type to be 'channel', but got '", type, "'"); + + name = axes[2]["name"]; + type = axes[2]["type"]; + EXPECT(name == "z", "Expected name to be 'z', but got '", name, "'"); + EXPECT( + type == "space", "Expected type to be 'space', but got '", type, "'"); + + name = axes[3]["name"]; + type = axes[3]["type"]; + unit = axes[3]["unit"]; + EXPECT(name == "y", "Expected name to be 'y', but got '", name, "'"); + EXPECT( + type == "space", "Expected type to be 'space', but got '", type, "'"); + EXPECT(unit == "micrometer", + "Expected unit to be 'micrometer', but got '", + unit, + "'"); + + name = axes[4]["name"]; + type = axes[4]["type"]; + unit = axes[4]["unit"]; + EXPECT(name == "x", "Expected name to be 'x', but got '", name, "'"); + EXPECT( + type == "space", "Expected type to be 'space', but got '", type, "'"); + EXPECT(unit == "micrometer", + "Expected unit to be 'micrometer', but got '", + unit, + "'"); + + const auto datasets = multiscales["datasets"][0]; + const std::string path = datasets["path"].get(); + EXPECT(path == "0", "Expected path to be '0', but got '", path, "'"); + + const auto coordinate_transformations = + datasets["coordinateTransformations"][0]; + + type = coordinate_transformations["type"].get(); + EXPECT( + type == "scale", "Expected type to be 'scale', but got '", type, "'"); + + const auto scale = coordinate_transformations["scale"]; + EXPECT_EQ(size_t, scale.size(), 5); + EXPECT_EQ(int, scale[0].get(), 1.0); + EXPECT_EQ(int, scale[1].get(), 1.0); + EXPECT_EQ(int, scale[2].get(), 1.0); + EXPECT_EQ(int, scale[3].get(), 1.0); + EXPECT_EQ(int, scale[4].get(), 1.0); +} + +void +verify_group_metadata(const nlohmann::json& meta) +{ + const auto zarr_format = meta["zarr_format"].get(); + EXPECT_EQ(int, zarr_format, 2); +} + +void +verify_array_metadata(const nlohmann::json& meta) +{ + const auto& shape = meta["shape"]; + EXPECT_EQ(size_t, shape.size(), 5); + EXPECT_EQ(int, shape[0].get(), array_timepoints); + EXPECT_EQ(int, shape[1].get(), array_channels); + EXPECT_EQ(int, shape[2].get(), array_planes); + EXPECT_EQ(int, shape[3].get(), array_height); + EXPECT_EQ(int, shape[4].get(), array_width); + + const auto& chunks = meta["chunks"]; + EXPECT_EQ(size_t, chunks.size(), 5); + EXPECT_EQ(int, chunks[0].get(), chunk_timepoints); + EXPECT_EQ(int, chunks[1].get(), chunk_channels); + EXPECT_EQ(int, chunks[2].get(), chunk_planes); + EXPECT_EQ(int, chunks[3].get(), chunk_height); + EXPECT_EQ(int, chunks[4].get(), chunk_width); + + const auto dtype = meta["dtype"].get(); + EXPECT(dtype == "(); + EXPECT(compressor_id == "blosc", + "Expected compressor id to be 'blosc', but got '%s'", + compressor_id.c_str()); + + const auto cname = compressor["cname"].get(); + EXPECT(cname == "zstd", + "Expected compressor cname to be 'zstd', but got '%s'", + cname.c_str()); + + const auto clevel = compressor["clevel"].get(); + EXPECT_EQ(int, clevel, 1); + + const auto shuffle = compressor["shuffle"].get(); + EXPECT_EQ(int, shuffle, 1); +} + +void +verify_and_cleanup() +{ + + 
minio::s3::BaseUrl url(s3_endpoint); + url.https = s3_endpoint.starts_with("https://"); + + minio::creds::StaticProvider provider(s3_access_key_id, + s3_secret_access_key); + minio::s3::Client client(url, &provider); + + std::string base_metadata_path = TEST "/.zattrs"; + std::string group_metadata_path = TEST "/.zgroup"; + std::string array_metadata_path = TEST "/0/.zarray"; + + { + EXPECT(object_exists(client, base_metadata_path), + "Object does not exist: %s", + base_metadata_path.c_str()); + std::string contents = get_object_contents(client, base_metadata_path); + nlohmann::json base_metadata = nlohmann::json::parse(contents); + + verify_base_metadata(base_metadata); + } + + { + EXPECT(object_exists(client, group_metadata_path), + "Object does not exist: %s", + group_metadata_path.c_str()); + std::string contents = get_object_contents(client, group_metadata_path); + nlohmann::json group_metadata = nlohmann::json::parse(contents); + + verify_group_metadata(group_metadata); + } + + { + EXPECT(object_exists(client, array_metadata_path), + "Object does not exist: %s", + array_metadata_path.c_str()); + std::string contents = get_object_contents(client, array_metadata_path); + nlohmann::json array_metadata = nlohmann::json::parse(contents); + + verify_array_metadata(array_metadata); + } + + CHECK(remove_items( + client, + { base_metadata_path, group_metadata_path, array_metadata_path })); + + const auto expected_file_size = chunk_width * chunk_height * chunk_planes * + chunk_channels * chunk_timepoints * + nbytes_px; + + // verify and clean up data files + std::vector data_files; + std::string data_root = TEST "/0"; + + for (auto t = 0; t < chunks_in_t; ++t) { + const auto t_dir = data_root + "/" + std::to_string(t); + + for (auto c = 0; c < chunks_in_c; ++c) { + const auto c_dir = t_dir + "/" + std::to_string(c); + + for (auto z = 0; z < chunks_in_z; ++z) { + const auto z_dir = c_dir + "/" + std::to_string(z); + + for (auto y = 0; y < chunks_in_y; ++y) { + const auto y_dir = z_dir + "/" + std::to_string(y); + + for (auto x = 0; x < chunks_in_x; ++x) { + const auto x_file = y_dir + "/" + std::to_string(x); + EXPECT(object_exists(client, x_file), + "Object does not exist: %s", + x_file.c_str()); + const auto file_size = get_object_size(client, x_file); + EXPECT_LT(size_t, file_size, expected_file_size); + data_files.push_back(x_file); + } + + CHECK(!object_exists( + client, y_dir + "/" + std::to_string(chunks_in_x))); + } + } + } + } + + CHECK(remove_items(client, data_files)); +} + +int +main() +{ + if (!get_credentials()) { + LOG_WARNING("Failed to get credentials. 
Skipping test."); + return 0; + } + + Zarr_set_log_level(ZarrLogLevel_Debug); + + auto* stream = setup(); + std::vector frame(array_width * array_height, 0); + + int retval = 1; + + try { + size_t bytes_out; + for (auto i = 0; i < frames_to_acquire; ++i) { + ZarrStatusCode status = ZarrStream_append( + stream, frame.data(), bytes_of_frame, &bytes_out); + EXPECT(status == ZarrStatusCode_Success, + "Failed to append frame ", + i, + ": ", + Zarr_get_status_message(status)); + EXPECT_EQ(size_t, bytes_out, bytes_of_frame); + } + + ZarrStream_destroy(stream); + + verify_and_cleanup(); + + retval = 0; + } catch (const std::exception& e) { + LOG_ERROR("Caught exception: ", e.what()); + } + + return retval; +} diff --git a/tests/integration/stream-zarr-v2-raw-to-filesystem.cpp b/tests/integration/stream-zarr-v2-raw-to-filesystem.cpp new file mode 100644 index 0000000..838ca73 --- /dev/null +++ b/tests/integration/stream-zarr-v2-raw-to-filesystem.cpp @@ -0,0 +1,288 @@ +#include "acquire.zarr.h" +#include "test.macros.hh" + +#include + +#include +#include +#include + +namespace fs = std::filesystem; + +namespace { +const std::string test_path = + (fs::temp_directory_path() / (TEST ".zarr")).string(); + +const unsigned int array_width = 64, array_height = 48, array_planes = 6, + array_channels = 8, array_timepoints = 10; + +const unsigned int chunk_width = 16, chunk_height = 16, chunk_planes = 2, + chunk_channels = 4, chunk_timepoints = 5; + +const unsigned int chunks_in_x = + (array_width + chunk_width - 1) / chunk_width; // 4 chunks +const unsigned int chunks_in_y = + (array_height + chunk_height - 1) / chunk_height; // 3 chunks +const unsigned int chunks_in_z = + (array_planes + chunk_planes - 1) / chunk_planes; // 3 chunks +const unsigned int chunks_in_c = + (array_channels + chunk_channels - 1) / chunk_channels; // 2 chunks +const unsigned int chunks_in_t = + (array_timepoints + chunk_timepoints - 1) / chunk_timepoints; + +const size_t nbytes_px = sizeof(int32_t); +const uint32_t frames_to_acquire = + array_planes * array_channels * array_timepoints; +const size_t bytes_of_frame = array_width * array_height * nbytes_px; +} // namespace/s + +ZarrStream* +setup() +{ + ZarrStreamSettings settings = { + .store_path = test_path.c_str(), + .s3_settings = nullptr, + .compression_settings = nullptr, + .data_type = ZarrDataType_int32, + .version = ZarrVersion_2, + }; + + CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5)); + + ZarrDimensionProperties* dim; + dim = settings.dimensions; + *dim = DIM("t", ZarrDimensionType_Time, array_timepoints, chunk_timepoints, 0); + + dim = settings.dimensions + 1; + *dim = DIM("c", ZarrDimensionType_Channel, array_channels, chunk_channels, 0); + + dim = settings.dimensions + 2; + *dim = DIM("z", ZarrDimensionType_Space, array_planes, chunk_planes, 0); + + dim = settings.dimensions + 3; + *dim = DIM("y", ZarrDimensionType_Space, array_height, chunk_height, 0); + + dim = settings.dimensions + 4; + *dim = DIM("x", ZarrDimensionType_Space, array_width, chunk_width, 0); + + auto* stream = ZarrStream_create(&settings); + + return stream; +} + +void +verify_base_metadata(const nlohmann::json& meta) +{ + const auto multiscales = meta["multiscales"][0]; + + const auto axes = multiscales["axes"]; + EXPECT_EQ(size_t, axes.size(), 5); + std::string name, type, unit; + + name = axes[0]["name"]; + type = axes[0]["type"]; + EXPECT(name == "t", "Expected name to be 't', but got '", name, "'"); + EXPECT(type == "time", "Expected type to be 'time', but got '", type, "'"); + 
+
+    name = axes[1]["name"];
+    type = axes[1]["type"];
+    EXPECT(name == "c", "Expected name to be 'c', but got '", name, "'");
+    EXPECT(
+      type == "channel", "Expected type to be 'channel', but got '", type, "'");
+
+    name = axes[2]["name"];
+    type = axes[2]["type"];
+    EXPECT(name == "z", "Expected name to be 'z', but got '", name, "'");
+    EXPECT(
+      type == "space", "Expected type to be 'space', but got '", type, "'");
+
+    name = axes[3]["name"];
+    type = axes[3]["type"];
+    unit = axes[3]["unit"];
+    EXPECT(name == "y", "Expected name to be 'y', but got '", name, "'");
+    EXPECT(
+      type == "space", "Expected type to be 'space', but got '", type, "'");
+    EXPECT(unit == "micrometer",
+           "Expected unit to be 'micrometer', but got '",
+           unit,
+           "'");
+
+    name = axes[4]["name"];
+    type = axes[4]["type"];
+    unit = axes[4]["unit"];
+    EXPECT(name == "x", "Expected name to be 'x', but got '", name, "'");
+    EXPECT(
+      type == "space", "Expected type to be 'space', but got '", type, "'");
+    EXPECT(unit == "micrometer",
+           "Expected unit to be 'micrometer', but got '",
+           unit,
+           "'");
+
+    const auto datasets = multiscales["datasets"][0];
+    const std::string path = datasets["path"].get<std::string>();
+    EXPECT(path == "0", "Expected path to be '0', but got '", path, "'");
+
+    const auto coordinate_transformations =
+      datasets["coordinateTransformations"][0];
+
+    type = coordinate_transformations["type"].get<std::string>();
+    EXPECT(
+      type == "scale", "Expected type to be 'scale', but got '", type, "'");
+
+    const auto scale = coordinate_transformations["scale"];
+    EXPECT_EQ(size_t, scale.size(), 5);
+    EXPECT_EQ(int, scale[0].get<double>(), 1.0);
+    EXPECT_EQ(int, scale[1].get<double>(), 1.0);
+    EXPECT_EQ(int, scale[2].get<double>(), 1.0);
+    EXPECT_EQ(int, scale[3].get<double>(), 1.0);
+    EXPECT_EQ(int, scale[4].get<double>(), 1.0);
+}
+
+void
+verify_group_metadata(const nlohmann::json& meta)
+{
+    const auto zarr_format = meta["zarr_format"].get<int>();
+    EXPECT_EQ(int, zarr_format, 2);
+}
+
+void
+verify_array_metadata(const nlohmann::json& meta)
+{
+    const auto& shape = meta["shape"];
+    EXPECT_EQ(size_t, shape.size(), 5);
+    EXPECT_EQ(int, shape[0].get<int>(), array_timepoints);
+    EXPECT_EQ(int, shape[1].get<int>(), array_channels);
+    EXPECT_EQ(int, shape[2].get<int>(), array_planes);
+    EXPECT_EQ(int, shape[3].get<int>(), array_height);
+    EXPECT_EQ(int, shape[4].get<int>(), array_width);
+
+    const auto& chunks = meta["chunks"];
+    EXPECT_EQ(size_t, chunks.size(), 5);
+    EXPECT_EQ(int, chunks[0].get<int>(), chunk_timepoints);
+    EXPECT_EQ(int, chunks[1].get<int>(), chunk_channels);
+    EXPECT_EQ(int, chunks[2].get<int>(), chunk_planes);
+    EXPECT_EQ(int, chunks[3].get<int>(), chunk_height);
+    EXPECT_EQ(int, chunks[4].get<int>(), chunk_width);
+
+    const auto dtype = meta["dtype"].get<std::string>();
+    EXPECT(
+      dtype == "<i4", "Expected dtype to be '<i4', but got '", dtype, "'");
+}
+
+void
+verify_file_data()
+{
+    const auto expected_file_size = chunk_width * chunk_height *
+                                    chunk_planes * chunk_channels *
+                                    chunk_timepoints * nbytes_px;
+
+    fs::path data_root = fs::path(test_path) / "0";
+
+    CHECK(fs::is_directory(data_root));
+    for (auto t = 0; t < chunks_in_t; ++t) {
+        const auto t_dir = data_root / std::to_string(t);
+        CHECK(fs::is_directory(t_dir));
+
+        for (auto c = 0; c < chunks_in_c; ++c) {
+            const auto c_dir = t_dir / std::to_string(c);
+            CHECK(fs::is_directory(c_dir));
+
+            for (auto z = 0; z < chunks_in_z; ++z) {
+                const auto z_dir = c_dir / std::to_string(z);
+                CHECK(fs::is_directory(z_dir));
+
+                for (auto y = 0; y < chunks_in_y; ++y) {
+                    const auto y_dir = z_dir / std::to_string(y);
+                    CHECK(fs::is_directory(y_dir));
+
+                    for (auto x = 0; x < chunks_in_x; ++x) {
+                        const auto x_file = y_dir / std::to_string(x);
+                        CHECK(fs::is_regular_file(x_file));
+                        const auto file_size = fs::file_size(x_file);
+                        EXPECT_EQ(size_t, file_size, expected_file_size);
+                    }
+
+                    CHECK(!fs::is_regular_file(y_dir /
+                                               std::to_string(chunks_in_x)));
+                }
+
+                CHECK(!fs::is_directory(z_dir / std::to_string(chunks_in_y)));
+            }
+
+            CHECK(!fs::is_directory(c_dir / std::to_string(chunks_in_z)));
+        }
+
+        CHECK(!fs::is_directory(t_dir / std::to_string(chunks_in_c)));
+    }
+
+    CHECK(!fs::is_directory(data_root / std::to_string(chunks_in_t)));
+}
+
+void
+verify()
+{
+    CHECK(std::filesystem::is_directory(test_path));
+
+    {
+        fs::path base_metadata_path = fs::path(test_path) / ".zattrs";
+        std::ifstream f(base_metadata_path);
+        nlohmann::json base_metadata = nlohmann::json::parse(f);
+
+        verify_base_metadata(base_metadata);
+    }
+
+    {
+        fs::path group_metadata_path = fs::path(test_path) / ".zgroup";
+        std::ifstream f(group_metadata_path);
+        nlohmann::json group_metadata = nlohmann::json::parse(f);
+
+        verify_group_metadata(group_metadata);
+    }
+
+    {
+        fs::path array_metadata_path = fs::path(test_path) / "0" / ".zarray";
+        std::ifstream f(array_metadata_path);
+        nlohmann::json array_metadata = nlohmann::json::parse(f);
+
+        verify_array_metadata(array_metadata);
+    }
+
+    verify_file_data();
+}
+
+int
+main()
+{
+    Zarr_set_log_level(ZarrLogLevel_Debug);
+
+    auto* stream = setup();
+    std::vector<int32_t> frame(array_width * array_height, 0);
+
+    int retval = 1;
+
+    try {
+        size_t bytes_out;
+        for (auto i = 0; i < frames_to_acquire; ++i) {
+            ZarrStatusCode status = ZarrStream_append(
+              stream, frame.data(), bytes_of_frame, &bytes_out);
+            EXPECT(status == ZarrStatusCode_Success,
+                   "Failed to append frame ",
+                   i,
+                   ": ",
+                   Zarr_get_status_message(status));
+            EXPECT_EQ(size_t, bytes_out, bytes_of_frame);
+        }
+
+        ZarrStream_destroy(stream);
+
+        verify();
+
+        // Clean up
+        fs::remove_all(test_path);
+
+        retval = 0;
+    } catch (const std::exception& e) {
+        LOG_ERROR("Caught exception: ", e.what());
+    }
+
+    return retval;
+}
diff --git a/tests/integration/stream-zarr-v2-raw-to-s3.cpp b/tests/integration/stream-zarr-v2-raw-to-s3.cpp
new file mode 100644
index 0000000..f13f1a1
--- /dev/null
+++ b/tests/integration/stream-zarr-v2-raw-to-s3.cpp
@@ -0,0 +1,429 @@
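+// Integration test (header comment added in review, not in the original
+// patch): streams raw int32 frames to a Zarr v2 store on S3, then checks
+// the uploaded metadata and chunk objects with the MinIO client and
+// removes them. Skips (returns 0) when the ZARR_S3_* environment
+// variables are unset.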
+#include "acquire.zarr.h" +#include "test.macros.hh" + +#include +#include + +#include + +namespace { +std::string s3_endpoint, s3_bucket_name, s3_access_key_id, s3_secret_access_key; + +const unsigned int array_width = 64, array_height = 48, array_planes = 6, + array_channels = 8, array_timepoints = 10; + +const unsigned int chunk_width = 16, chunk_height = 16, chunk_planes = 2, + chunk_channels = 4, chunk_timepoints = 5; + +const unsigned int chunks_in_x = + (array_width + chunk_width - 1) / chunk_width; // 4 chunks +const unsigned int chunks_in_y = + (array_height + chunk_height - 1) / chunk_height; // 3 chunks +const unsigned int chunks_in_z = + (array_planes + chunk_planes - 1) / chunk_planes; // 3 chunks +const unsigned int chunks_in_c = + (array_channels + chunk_channels - 1) / chunk_channels; // 2 chunks +const unsigned int chunks_in_t = + (array_timepoints + chunk_timepoints - 1) / chunk_timepoints; + +const size_t nbytes_px = sizeof(int32_t); +const uint32_t frames_to_acquire = + array_planes * array_channels * array_timepoints; +const size_t bytes_of_frame = array_width * array_height * nbytes_px; + +bool +get_credentials() +{ + char* env = nullptr; + if (!(env = std::getenv("ZARR_S3_ENDPOINT"))) { + LOG_ERROR("ZARR_S3_ENDPOINT not set."); + return false; + } + s3_endpoint = env; + + if (!(env = std::getenv("ZARR_S3_BUCKET_NAME"))) { + LOG_ERROR("ZARR_S3_BUCKET_NAME not set."); + return false; + } + s3_bucket_name = env; + + if (!(env = std::getenv("ZARR_S3_ACCESS_KEY_ID"))) { + LOG_ERROR("ZARR_S3_ACCESS_KEY_ID not set."); + return false; + } + s3_access_key_id = env; + + if (!(env = std::getenv("ZARR_S3_SECRET_ACCESS_KEY"))) { + LOG_ERROR("ZARR_S3_SECRET_ACCESS_KEY not set."); + return false; + } + s3_secret_access_key = env; + + return true; +} + +bool +object_exists(minio::s3::Client& client, const std::string& object_name) +{ + minio::s3::StatObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + + minio::s3::StatObjectResponse response = client.StatObject(args); + + return (bool)response; +} + +size_t +get_object_size(minio::s3::Client& client, const std::string& object_name) +{ + minio::s3::StatObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + + minio::s3::StatObjectResponse response = client.StatObject(args); + + if (!response) { + LOG_ERROR("Failed to get object size: %s", object_name.c_str()); + return 0; + } + + return response.size; +} + +std::string +get_object_contents(minio::s3::Client& client, const std::string& object_name) +{ + std::stringstream ss; + + minio::s3::GetObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + args.datafunc = [&ss](minio::http::DataFunctionArgs args) -> bool { + ss << args.datachunk; + return true; + }; + + // Call get object. 
+ minio::s3::GetObjectResponse resp = client.GetObject(args); + + return ss.str(); +} + +bool +remove_items(minio::s3::Client& client, + const std::vector& item_keys) +{ + std::list objects; + for (const auto& key : item_keys) { + minio::s3::DeleteObject object; + object.name = key; + objects.push_back(object); + } + + minio::s3::RemoveObjectsArgs args; + args.bucket = s3_bucket_name; + + auto it = objects.begin(); + + args.func = [&objects = objects, + &i = it](minio::s3::DeleteObject& obj) -> bool { + if (i == objects.end()) + return false; + obj = *i; + i++; + return true; + }; + + minio::s3::RemoveObjectsResult result = client.RemoveObjects(args); + for (; result; result++) { + minio::s3::DeleteError err = *result; + if (!err) { + LOG_ERROR("Failed to delete object %s: %s", + err.object_name.c_str(), + err.message.c_str()); + return false; + } + } + + return true; +} +} // namespace + +ZarrStream* +setup() +{ + ZarrStreamSettings settings = { + .store_path = TEST, + .compression_settings = nullptr, + .data_type = ZarrDataType_int32, + .version = ZarrVersion_2, + }; + + ZarrS3Settings s3_settings{ + .endpoint = s3_endpoint.c_str(), + .bucket_name = s3_bucket_name.c_str(), + .access_key_id = s3_access_key_id.c_str(), + .secret_access_key = s3_secret_access_key.c_str(), + }; + + settings.s3_settings = &s3_settings; + + CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5)); + + ZarrDimensionProperties* dim; + dim = settings.dimensions; + *dim = DIM("t", ZarrDimensionType_Time, array_timepoints, chunk_timepoints, 0); + + dim = settings.dimensions + 1; + *dim = DIM("c", ZarrDimensionType_Channel, array_channels, chunk_channels, 0); + + dim = settings.dimensions + 2; + *dim = DIM("z", ZarrDimensionType_Space, array_planes, chunk_planes, 0); + + dim = settings.dimensions + 3; + *dim = DIM("y", ZarrDimensionType_Space, array_height, chunk_height, 0); + + dim = settings.dimensions + 4; + *dim = DIM("x", ZarrDimensionType_Space, array_width, chunk_width, 0); + + auto* stream = ZarrStream_create(&settings); + + return stream; +} + +void +verify_base_metadata(const nlohmann::json& meta) +{ + const auto multiscales = meta["multiscales"][0]; + + const auto axes = multiscales["axes"]; + EXPECT_EQ(size_t, axes.size(), 5); + std::string name, type, unit; + + name = axes[0]["name"]; + type = axes[0]["type"]; + EXPECT(name == "t", "Expected name to be 't', but got '%s'", name.c_str()); + EXPECT( + type == "time", "Expected type to be 'time', but got '%s'", type.c_str()); + + name = axes[1]["name"]; + type = axes[1]["type"]; + EXPECT(name == "c", "Expected name to be 'c', but got '%s'", name.c_str()); + EXPECT(type == "channel", + "Expected type to be 'channel', but got '%s'", + type.c_str()); + + name = axes[2]["name"]; + type = axes[2]["type"]; + EXPECT(name == "z", "Expected name to be 'z', but got '%s'", name.c_str()); + EXPECT(type == "space", + "Expected type to be 'space', but got '%s'", + type.c_str()); + + name = axes[3]["name"]; + type = axes[3]["type"]; + unit = axes[3]["unit"]; + EXPECT(name == "y", "Expected name to be 'y', but got '%s'", name.c_str()); + EXPECT(type == "space", + "Expected type to be 'space', but got '%s'", + type.c_str()); + EXPECT(unit == "micrometer", + "Expected unit to be 'micrometer', but got '%s'", + unit.c_str()); + + name = axes[4]["name"]; + type = axes[4]["type"]; + unit = axes[4]["unit"]; + EXPECT(name == "x", "Expected name to be 'x', but got '%s'", name.c_str()); + EXPECT(type == "space", + "Expected type to be 'space', but got '%s'", + 
type.c_str());
+    EXPECT(unit == "micrometer",
+           "Expected unit to be 'micrometer', but got '%s'",
+           unit.c_str());
+
+    const auto datasets = multiscales["datasets"][0];
+    const std::string path = datasets["path"].get<std::string>();
+    EXPECT(path == "0", "Expected path to be '0', but got '%s'", path.c_str());
+
+    const auto coordinate_transformations =
+      datasets["coordinateTransformations"][0];
+
+    type = coordinate_transformations["type"].get<std::string>();
+    EXPECT(type == "scale",
+           "Expected type to be 'scale', but got '%s'",
+           type.c_str());
+
+    const auto scale = coordinate_transformations["scale"];
+    EXPECT_EQ(size_t, scale.size(), 5);
+    EXPECT_EQ(int, scale[0].get<double>(), 1.0);
+    EXPECT_EQ(int, scale[1].get<double>(), 1.0);
+    EXPECT_EQ(int, scale[2].get<double>(), 1.0);
+    EXPECT_EQ(int, scale[3].get<double>(), 1.0);
+    EXPECT_EQ(int, scale[4].get<double>(), 1.0);
+}
+
+void
+verify_group_metadata(const nlohmann::json& meta)
+{
+    const auto zarr_format = meta["zarr_format"].get<int>();
+    EXPECT_EQ(int, zarr_format, 2);
+}
+
+void
+verify_array_metadata(const nlohmann::json& meta)
+{
+    const auto& shape = meta["shape"];
+    EXPECT_EQ(size_t, shape.size(), 5);
+    EXPECT_EQ(int, shape[0].get<int>(), array_timepoints);
+    EXPECT_EQ(int, shape[1].get<int>(), array_channels);
+    EXPECT_EQ(int, shape[2].get<int>(), array_planes);
+    EXPECT_EQ(int, shape[3].get<int>(), array_height);
+    EXPECT_EQ(int, shape[4].get<int>(), array_width);
+
+    const auto& chunks = meta["chunks"];
+    EXPECT_EQ(size_t, chunks.size(), 5);
+    EXPECT_EQ(int, chunks[0].get<int>(), chunk_timepoints);
+    EXPECT_EQ(int, chunks[1].get<int>(), chunk_channels);
+    EXPECT_EQ(int, chunks[2].get<int>(), chunk_planes);
+    EXPECT_EQ(int, chunks[3].get<int>(), chunk_height);
+    EXPECT_EQ(int, chunks[4].get<int>(), chunk_width);
+
+    const auto dtype = meta["dtype"].get<std::string>();
+    EXPECT(dtype == "<i4",
+           "Expected dtype to be '<i4', but got '%s'",
+           dtype.c_str());
+}
+
+void
+verify_and_cleanup()
+{
+    minio::s3::BaseUrl url(s3_endpoint);
+    url.https = s3_endpoint.starts_with("https://");
+
+    minio::creds::StaticProvider provider(s3_access_key_id,
+                                          s3_secret_access_key);
+    minio::s3::Client client(url, &provider);
+
+    std::string base_metadata_path = TEST "/.zattrs";
+    std::string group_metadata_path = TEST "/.zgroup";
+    std::string array_metadata_path = TEST "/0/.zarray";
+
+    {
+        EXPECT(object_exists(client, base_metadata_path),
+               "Object does not exist: %s",
+               base_metadata_path.c_str());
+        std::string contents = get_object_contents(client, base_metadata_path);
+        nlohmann::json base_metadata = nlohmann::json::parse(contents);
+
+        verify_base_metadata(base_metadata);
+    }
+
+    {
+        EXPECT(object_exists(client, group_metadata_path),
+               "Object does not exist: %s",
+               group_metadata_path.c_str());
+        std::string contents = get_object_contents(client, group_metadata_path);
+        nlohmann::json group_metadata = nlohmann::json::parse(contents);
+
+        verify_group_metadata(group_metadata);
+    }
+
+    {
+        EXPECT(object_exists(client, array_metadata_path),
+               "Object does not exist: %s",
+               array_metadata_path.c_str());
+        std::string contents = get_object_contents(client, array_metadata_path);
+        nlohmann::json array_metadata = nlohmann::json::parse(contents);
+
+        verify_array_metadata(array_metadata);
+    }
+
+    CHECK(remove_items(
+      client,
+      { base_metadata_path, group_metadata_path, array_metadata_path }));
+
+    const auto expected_file_size = chunk_width * chunk_height * chunk_planes *
+                                    chunk_channels * chunk_timepoints *
+                                    nbytes_px;
+
+    // verify and clean up data files
+    std::vector<std::string> data_files;
+    std::string data_root = TEST "/0";
+
+    for (auto t = 0; t < chunks_in_t; ++t) {
+        const auto t_dir = data_root + "/" + std::to_string(t);
+
+        for (auto c = 0; c < chunks_in_c; ++c) {
+            const auto c_dir = t_dir + "/" + std::to_string(c);
+
+            for (auto z = 0; z < chunks_in_z; ++z) {
+                const auto z_dir = c_dir + "/" + std::to_string(z);
+
+                for (auto y = 0; y < chunks_in_y; ++y) {
+                    const auto y_dir = z_dir + "/" + std::to_string(y);
+
+                    for (auto x = 0; x < chunks_in_x; ++x) {
+                        const auto x_file = y_dir + "/" + std::to_string(x);
+                        EXPECT(object_exists(client, x_file),
+                               "Object does not exist: %s",
+                               x_file.c_str());
+                        const auto file_size = get_object_size(client, x_file);
+                        EXPECT_EQ(size_t, file_size, expected_file_size);
+                        data_files.push_back(x_file);
+                    }
+
+                    CHECK(!object_exists(
+                      client, y_dir + "/" + std::to_string(chunks_in_x)));
+                }
+            }
+        }
+    }
+
+    CHECK(remove_items(client, data_files));
+}
+
+int
+main()
+{
+    if (!get_credentials()) {
+        LOG_WARNING("Failed to get credentials. 
Skipping test."); + return 0; + } + + Zarr_set_log_level(ZarrLogLevel_Debug); + + auto* stream = setup(); + std::vector frame(array_width * array_height, 0); + + int retval = 1; + + try { + size_t bytes_out; + for (auto i = 0; i < frames_to_acquire; ++i) { + ZarrStatusCode status = ZarrStream_append( + stream, frame.data(), bytes_of_frame, &bytes_out); + EXPECT(status == ZarrStatusCode_Success, + "Failed to append frame %d: %s", + i, + Zarr_get_status_message(status)); + EXPECT_EQ(size_t, bytes_out, bytes_of_frame); + } + + ZarrStream_destroy(stream); + + verify_and_cleanup(); + + retval = 0; + } catch (const std::exception& e) { + LOG_ERROR("Caught exception: ", e.what()); + } + + return retval; +} diff --git a/tests/integration/stream-zarr-v3-compressed-to-filesystem.cpp b/tests/integration/stream-zarr-v3-compressed-to-filesystem.cpp new file mode 100644 index 0000000..d778ec3 --- /dev/null +++ b/tests/integration/stream-zarr-v3-compressed-to-filesystem.cpp @@ -0,0 +1,365 @@ +#include "acquire.zarr.h" +#include "test.macros.hh" + +#include + +#include +#include +#include + +namespace fs = std::filesystem; + +namespace { +const std::string test_path = + (fs::temp_directory_path() / (TEST ".zarr")).string(); + +const unsigned int array_width = 64, array_height = 48, array_planes = 6, + array_channels = 8, array_timepoints = 10; + +const unsigned int chunk_width = 16, chunk_height = 16, chunk_planes = 2, + chunk_channels = 4, chunk_timepoints = 5; + +const unsigned int shard_width = 2, shard_height = 1, shard_planes = 1, + shard_channels = 2, shard_timepoints = 2; +const unsigned int chunks_per_shard = + shard_width * shard_height * shard_planes * shard_channels * shard_timepoints; + +const unsigned int chunks_in_x = + (array_width + chunk_width - 1) / chunk_width; // 4 chunks +const unsigned int chunks_in_y = + (array_height + chunk_height - 1) / chunk_height; // 3 chunks +const unsigned int chunks_in_z = + (array_planes + chunk_planes - 1) / chunk_planes; // 3 chunks +const unsigned int chunks_in_c = + (array_channels + chunk_channels - 1) / chunk_channels; // 2 chunks +const unsigned int chunks_in_t = + (array_timepoints + chunk_timepoints - 1) / chunk_timepoints; + +const unsigned int shards_in_x = + (chunks_in_x + shard_width - 1) / shard_width; // 2 shards +const unsigned int shards_in_y = + (chunks_in_y + shard_height - 1) / shard_height; // 3 shards +const unsigned int shards_in_z = + (chunks_in_z + shard_planes - 1) / shard_planes; // 3 shards +const unsigned int shards_in_c = + (chunks_in_c + shard_channels - 1) / shard_channels; // 1 shard +const unsigned int shards_in_t = + (chunks_in_t + shard_timepoints - 1) / shard_timepoints; // 1 shard + +const size_t nbytes_px = sizeof(uint16_t); +const uint32_t frames_to_acquire = + array_planes * array_channels * array_timepoints; +const size_t bytes_of_frame = array_width * array_height * nbytes_px; +} // namespace/s + +ZarrStream* +setup() +{ + ZarrStreamSettings settings = { + .store_path = test_path.c_str(), + .s3_settings = nullptr, + .data_type = ZarrDataType_uint16, + .version = ZarrVersion_3, + }; + + ZarrCompressionSettings compression_settings = { + .compressor = ZarrCompressor_Blosc1, + .codec = ZarrCompressionCodec_BloscLZ4, + .level = 2, + .shuffle = 2, + }; + settings.compression_settings = &compression_settings; + + CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5)); + + ZarrDimensionProperties* dim; + dim = settings.dimensions; + *dim = DIM("t", ZarrDimensionType_Time, array_timepoints, chunk_timepoints, 
shard_timepoints);
+
+    dim = settings.dimensions + 1;
+    *dim = DIM("c", ZarrDimensionType_Channel, array_channels, chunk_channels, shard_channels);
+
+    dim = settings.dimensions + 2;
+    *dim = DIM("z", ZarrDimensionType_Space, array_planes, chunk_planes, shard_planes);
+
+    dim = settings.dimensions + 3;
+    *dim = DIM("y", ZarrDimensionType_Space, array_height, chunk_height, shard_height);
+
+    dim = settings.dimensions + 4;
+    *dim = DIM("x", ZarrDimensionType_Space, array_width, chunk_width, shard_width);
+
+    auto* stream = ZarrStream_create(&settings);
+
+    return stream;
+}
+
+void
+verify_base_metadata(const nlohmann::json& meta)
+{
+    const auto extensions = meta["extensions"];
+    EXPECT_EQ(size_t, extensions.size(), 0);
+
+    const auto encoding = meta["metadata_encoding"].get<std::string>();
+    EXPECT(encoding == "https://purl.org/zarr/spec/protocol/core/3.0",
+           "Expected encoding to be "
+           "'https://purl.org/zarr/spec/protocol/core/3.0', but got '%s'",
+           encoding.c_str());
+
+    const auto suffix = meta["metadata_key_suffix"].get<std::string>();
+    EXPECT(suffix == ".json",
+           "Expected suffix to be '.json', but got '%s'",
+           suffix.c_str());
+
+    const auto zarr_format = meta["zarr_format"].get<std::string>();
+    EXPECT(zarr_format == "https://purl.org/zarr/spec/protocol/core/3.0",
+           "Expected zarr_format to be "
+           "'https://purl.org/zarr/spec/protocol/core/3.0', but got '%s'",
+           zarr_format.c_str());
+}
+
+void
+verify_group_metadata(const nlohmann::json& meta)
+{
+    const auto multiscales = meta["attributes"]["multiscales"][0];
+
+    const auto axes = multiscales["axes"];
+    EXPECT_EQ(size_t, axes.size(), 5);
+    std::string name, type, unit;
+
+    name = axes[0]["name"];
+    type = axes[0]["type"];
+    EXPECT(name == "t", "Expected name to be 't', but got '", name, "'");
+    EXPECT(type == "time", "Expected type to be 'time', but got '", type, "'");
+
+    name = axes[1]["name"];
+    type = axes[1]["type"];
+    EXPECT(name == "c", "Expected name to be 'c', but got '", name, "'");
+    EXPECT(
+      type == "channel", "Expected type to be 'channel', but got '", type, "'");
+
+    name = axes[2]["name"];
+    type = axes[2]["type"];
+    EXPECT(name == "z", "Expected name to be 'z', but got '", name, "'");
+    EXPECT(
+      type == "space", "Expected type to be 'space', but got '", type, "'");
+
+    name = axes[3]["name"];
+    type = axes[3]["type"];
+    unit = axes[3]["unit"];
+    EXPECT(name == "y", "Expected name to be 'y', but got '", name, "'");
+    EXPECT(
+      type == "space", "Expected type to be 'space', but got '", type, "'");
+    EXPECT(unit == "micrometer",
+           "Expected unit to be 'micrometer', but got '",
+           unit,
+           "'");
+
+    name = axes[4]["name"];
+    type = axes[4]["type"];
+    unit = axes[4]["unit"];
+    EXPECT(name == "x", "Expected name to be 'x', but got '", name, "'");
+    EXPECT(
+      type == "space", "Expected type to be 'space', but got '", type, "'");
+    EXPECT(unit == "micrometer",
+           "Expected unit to be 'micrometer', but got '",
+           unit,
+           "'");
+
+    const auto datasets = multiscales["datasets"][0];
+    const std::string path = datasets["path"].get<std::string>();
+    EXPECT(path == "0", "Expected path to be '0', but got '", path, "'");
+
+    const auto coordinate_transformations =
+      datasets["coordinateTransformations"][0];
+
+    type = coordinate_transformations["type"].get<std::string>();
+    EXPECT(
+      type == "scale", "Expected type to be 'scale', but got '", type, "'");
+
+    const auto scale = coordinate_transformations["scale"];
+    EXPECT_EQ(size_t, scale.size(), 5);
+    EXPECT_EQ(int, scale[0].get(), 1.0);
+    EXPECT_EQ(int, scale[1].get(), 1.0);
+    EXPECT_EQ(int, scale[2].get(), 1.0);
+    EXPECT_EQ(int, scale[3].get(), 1.0);
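+    // Reviewer note (not part of the original patch): shard geometry used
+    // by verify_file_data() below; every shard file carries its chunks'
+    // payload plus two uint64 index entries per chunk.
+    static_assert(chunks_per_shard == 8, "2*1*1*2*2 chunks per shard");
+    static_assert(chunks_per_shard * 2 * sizeof(uint64_t) == 128,
+                  "bytes in the shard index table");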
+ EXPECT_EQ(int, scale[4].get(), 1.0); +} + +void +verify_array_metadata(const nlohmann::json& meta) +{ + const auto& shape = meta["shape"]; + EXPECT_EQ(size_t, shape.size(), 5); + EXPECT_EQ(int, shape[0].get(), array_timepoints); + EXPECT_EQ(int, shape[1].get(), array_channels); + EXPECT_EQ(int, shape[2].get(), array_planes); + EXPECT_EQ(int, shape[3].get(), array_height); + EXPECT_EQ(int, shape[4].get(), array_width); + + const auto& chunks = meta["chunk_grid"]["chunk_shape"]; + EXPECT_EQ(size_t, chunks.size(), 5); + EXPECT_EQ(int, chunks[0].get(), chunk_timepoints); + EXPECT_EQ(int, chunks[1].get(), chunk_channels); + EXPECT_EQ(int, chunks[2].get(), chunk_planes); + EXPECT_EQ(int, chunks[3].get(), chunk_height); + EXPECT_EQ(int, chunks[4].get(), chunk_width); + + const auto& shards = + meta["storage_transformers"][0]["configuration"]["chunks_per_shard"]; + EXPECT_EQ(size_t, shards.size(), 5); + EXPECT_EQ(int, shards[0].get(), shard_timepoints); + EXPECT_EQ(int, shards[1].get(), shard_channels); + EXPECT_EQ(int, shards[2].get(), shard_planes); + EXPECT_EQ(int, shards[3].get(), shard_height); + EXPECT_EQ(int, shards[4].get(), shard_width); + + const auto dtype = meta["data_type"].get(); + EXPECT(dtype == "uint16", + "Expected dtype to be 'uint16', but got '", + dtype, + "'"); + + const auto& compressor = meta["compressor"]; + EXPECT(!compressor.is_null(), "Expected compressor to be non-null"); + + const auto codec = compressor["codec"].get(); + EXPECT(codec == "https://purl.org/zarr/spec/codec/blosc/1.0", + "Expected codec to be 'https://purl.org/zarr/spec/codec/blosc/1.0', " + "but got '%s'", + codec.c_str()); + + const auto& configuration = compressor["configuration"]; + EXPECT_EQ(int, configuration["blocksize"].get(), 0); + EXPECT_EQ(int, configuration["clevel"].get(), 2); + EXPECT_EQ(int, configuration["shuffle"].get(), 2); + + const auto cname = configuration["cname"].get(); + EXPECT(cname == "lz4", "Expected cname to be 'lz4', but got '", cname, "'"); +} + +void +verify_file_data() +{ + const auto chunk_size = chunk_width * chunk_height * chunk_planes * + chunk_channels * chunk_timepoints * nbytes_px; + const auto index_size = chunks_per_shard * + sizeof(uint64_t) * // indices are 64 bits + 2; // 2 indices per chunk + const auto expected_file_size = shard_width * shard_height * shard_planes * + shard_channels * shard_timepoints * + chunk_size + + index_size; + + fs::path data_root = fs::path(test_path) / "data" / "root" / "0"; + + CHECK(fs::is_directory(data_root)); + for (auto t = 0; t < shards_in_t; ++t) { + const auto t_dir = data_root / ("c" + std::to_string(t)); + CHECK(fs::is_directory(t_dir)); + + for (auto c = 0; c < shards_in_c; ++c) { + const auto c_dir = t_dir / std::to_string(c); + CHECK(fs::is_directory(c_dir)); + + for (auto z = 0; z < shards_in_z; ++z) { + const auto z_dir = c_dir / std::to_string(z); + CHECK(fs::is_directory(z_dir)); + + for (auto y = 0; y < shards_in_y; ++y) { + const auto y_dir = z_dir / std::to_string(y); + CHECK(fs::is_directory(y_dir)); + + for (auto x = 0; x < shards_in_x; ++x) { + const auto x_file = y_dir / std::to_string(x); + CHECK(fs::is_regular_file(x_file)); + const auto file_size = fs::file_size(x_file); + EXPECT_LT(size_t, file_size, expected_file_size); + } + + CHECK(!fs::is_regular_file(y_dir / + std::to_string(shards_in_x))); + } + + CHECK(!fs::is_directory(z_dir / std::to_string(shards_in_y))); + } + + CHECK(!fs::is_directory(c_dir / std::to_string(shards_in_z))); + } + + CHECK(!fs::is_directory(t_dir / 
std::to_string(shards_in_c))); + } + + CHECK(!fs::is_directory(data_root / ("c" + std::to_string(shards_in_t)))); +} + +void +verify() +{ + CHECK(std::filesystem::is_directory(test_path)); + + { + fs::path base_metadata_path = fs::path(test_path) / "zarr.json"; + std::ifstream f(base_metadata_path); + nlohmann::json base_metadata = nlohmann::json::parse(f); + + verify_base_metadata(base_metadata); + } + + { + fs::path group_metadata_path = + fs::path(test_path) / "meta" / "root.group.json"; + std::ifstream f = std::ifstream(group_metadata_path); + nlohmann::json group_metadata = nlohmann::json::parse(f); + + verify_group_metadata(group_metadata); + } + + { + fs::path array_metadata_path = + fs::path(test_path) / "meta" / "root" / "0.array.json"; + std::ifstream f = std::ifstream(array_metadata_path); + nlohmann::json array_metadata = nlohmann::json::parse(f); + + verify_array_metadata(array_metadata); + } + + verify_file_data(); +} + +int +main() +{ + Zarr_set_log_level(ZarrLogLevel_Debug); + + auto* stream = setup(); + std::vector frame(array_width * array_height, 0); + + int retval = 1; + + try { + size_t bytes_out; + for (auto i = 0; i < frames_to_acquire; ++i) { + ZarrStatusCode status = ZarrStream_append( + stream, frame.data(), bytes_of_frame, &bytes_out); + EXPECT(status == ZarrStatusCode_Success, + "Failed to append frame ", + i, + ": ", + Zarr_get_status_message(status)); + EXPECT_EQ(size_t, bytes_out, bytes_of_frame); + } + + ZarrStream_destroy(stream); + + verify(); + + // Clean up + fs::remove_all(test_path); + + retval = 0; + } catch (const std::exception& e) { + LOG_ERROR("Caught exception: %s", e.what()); + } + + return retval; +} diff --git a/tests/integration/stream-zarr-v3-compressed-to-s3.cpp b/tests/integration/stream-zarr-v3-compressed-to-s3.cpp new file mode 100644 index 0000000..75a6eb1 --- /dev/null +++ b/tests/integration/stream-zarr-v3-compressed-to-s3.cpp @@ -0,0 +1,482 @@ +#include "acquire.zarr.h" +#include "test.macros.hh" + +#include +#include + +#include + +namespace { +std::string s3_endpoint, s3_bucket_name, s3_access_key_id, s3_secret_access_key; + +const unsigned int array_width = 64, array_height = 48, array_planes = 6, + array_channels = 8, array_timepoints = 10; + +const unsigned int chunk_width = 16, chunk_height = 16, chunk_planes = 2, + chunk_channels = 4, chunk_timepoints = 5; + +const unsigned int shard_width = 2, shard_height = 1, shard_planes = 1, + shard_channels = 2, shard_timepoints = 2; +const unsigned int chunks_per_shard = + shard_width * shard_height * shard_planes * shard_channels * shard_timepoints; + +const unsigned int chunks_in_x = + (array_width + chunk_width - 1) / chunk_width; // 4 chunks +const unsigned int chunks_in_y = + (array_height + chunk_height - 1) / chunk_height; // 3 chunks +const unsigned int chunks_in_z = + (array_planes + chunk_planes - 1) / chunk_planes; // 3 chunks +const unsigned int chunks_in_c = + (array_channels + chunk_channels - 1) / chunk_channels; // 2 chunks +const unsigned int chunks_in_t = + (array_timepoints + chunk_timepoints - 1) / chunk_timepoints; + +const unsigned int shards_in_x = + (chunks_in_x + shard_width - 1) / shard_width; // 2 shards +const unsigned int shards_in_y = + (chunks_in_y + shard_height - 1) / shard_height; // 3 shards +const unsigned int shards_in_z = + (chunks_in_z + shard_planes - 1) / shard_planes; // 3 shards +const unsigned int shards_in_c = + (chunks_in_c + shard_channels - 1) / shard_channels; // 1 shard +const unsigned int shards_in_t = + (chunks_in_t + shard_timepoints 
- 1) / shard_timepoints; // 1 shard + +const size_t nbytes_px = sizeof(uint16_t); +const uint32_t frames_to_acquire = + array_planes * array_channels * array_timepoints; +const size_t bytes_of_frame = array_width * array_height * nbytes_px; + +bool +get_credentials() +{ + char* env = nullptr; + if (!(env = std::getenv("ZARR_S3_ENDPOINT"))) { + LOG_ERROR("ZARR_S3_ENDPOINT not set."); + return false; + } + s3_endpoint = env; + + if (!(env = std::getenv("ZARR_S3_BUCKET_NAME"))) { + LOG_ERROR("ZARR_S3_BUCKET_NAME not set."); + return false; + } + s3_bucket_name = env; + + if (!(env = std::getenv("ZARR_S3_ACCESS_KEY_ID"))) { + LOG_ERROR("ZARR_S3_ACCESS_KEY_ID not set."); + return false; + } + s3_access_key_id = env; + + if (!(env = std::getenv("ZARR_S3_SECRET_ACCESS_KEY"))) { + LOG_ERROR("ZARR_S3_SECRET_ACCESS_KEY not set."); + return false; + } + s3_secret_access_key = env; + + return true; +} + +bool +object_exists(minio::s3::Client& client, const std::string& object_name) +{ + minio::s3::StatObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + + minio::s3::StatObjectResponse response = client.StatObject(args); + + return (bool)response; +} + +size_t +get_object_size(minio::s3::Client& client, const std::string& object_name) +{ + minio::s3::StatObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + + minio::s3::StatObjectResponse response = client.StatObject(args); + + if (!response) { + LOG_ERROR("Failed to get object size: %s", object_name.c_str()); + return 0; + } + + return response.size; +} + +std::string +get_object_contents(minio::s3::Client& client, const std::string& object_name) +{ + std::stringstream ss; + + minio::s3::GetObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + args.datafunc = [&ss](minio::http::DataFunctionArgs args) -> bool { + ss << args.datachunk; + return true; + }; + + // Call get object. 
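+    // (Review note) GetObject streams the object body through the datafunc
+    // callback above in successive datachunk pieces; returning true
+    // requests the next piece, so once GetObject returns, ss holds the
+    // full object contents.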
+ minio::s3::GetObjectResponse resp = client.GetObject(args); + + return ss.str(); +} + +bool +remove_items(minio::s3::Client& client, + const std::vector& item_keys) +{ + std::list objects; + for (const auto& key : item_keys) { + minio::s3::DeleteObject object; + object.name = key; + objects.push_back(object); + } + + minio::s3::RemoveObjectsArgs args; + args.bucket = s3_bucket_name; + + auto it = objects.begin(); + + args.func = [&objects = objects, + &i = it](minio::s3::DeleteObject& obj) -> bool { + if (i == objects.end()) + return false; + obj = *i; + i++; + return true; + }; + + minio::s3::RemoveObjectsResult result = client.RemoveObjects(args); + for (; result; result++) { + minio::s3::DeleteError err = *result; + if (!err) { + LOG_ERROR("Failed to delete object %s: %s", + err.object_name.c_str(), + err.message.c_str()); + return false; + } + } + + return true; +} +} // namespace + +ZarrStream* +setup() +{ + ZarrStreamSettings settings = { + .store_path = TEST, + .data_type = ZarrDataType_uint16, + .version = ZarrVersion_3, + }; + + ZarrS3Settings s3_settings{ + .endpoint = s3_endpoint.c_str(), + .bucket_name = s3_bucket_name.c_str(), + .access_key_id = s3_access_key_id.c_str(), + .secret_access_key = s3_secret_access_key.c_str(), + }; + settings.s3_settings = &s3_settings; + + ZarrCompressionSettings compression_settings = { + .compressor = ZarrCompressor_Blosc1, + .codec = ZarrCompressionCodec_BloscLZ4, + .level = 3, + .shuffle = 1, + }; + settings.compression_settings = &compression_settings; + + CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5)); + + ZarrDimensionProperties* dim; + dim = settings.dimensions; + *dim = DIM("t", ZarrDimensionType_Time, array_timepoints, chunk_timepoints, shard_timepoints); + + dim = settings.dimensions + 1; + *dim = DIM("c", ZarrDimensionType_Channel, array_channels, chunk_channels, shard_channels); + + dim = settings.dimensions + 2; + *dim = DIM("z", ZarrDimensionType_Space, array_planes, chunk_planes, shard_planes); + + dim = settings.dimensions + 3; + *dim = DIM("y", ZarrDimensionType_Space, array_height, chunk_height, shard_height); + + dim = settings.dimensions + 4; + *dim = DIM("x", ZarrDimensionType_Space, array_width, chunk_width, shard_width); + + auto* stream = ZarrStream_create(&settings); + + return stream; +} + +void +verify_base_metadata(const nlohmann::json& meta) +{ + const auto extensions = meta["extensions"]; + EXPECT_EQ(size_t, extensions.size(), 0); + + const auto encoding = meta["metadata_encoding"].get(); + EXPECT(encoding == "https://purl.org/zarr/spec/protocol/core/3.0", + "Expected encoding to be " + "'https://purl.org/zarr/spec/protocol/core/3.0', but got '%s'", + encoding.c_str()); + + const auto suffix = meta["metadata_key_suffix"].get(); + EXPECT(suffix == ".json", + "Expected suffix to be '.json', but got '%s'", + suffix.c_str()); + + const auto zarr_format = meta["zarr_format"].get(); + EXPECT(encoding == "https://purl.org/zarr/spec/protocol/core/3.0", + "Expected encoding to be " + "'https://purl.org/zarr/spec/protocol/core/3.0', but got '%s'", + encoding.c_str()); +} + +void +verify_group_metadata(const nlohmann::json& meta) +{ + const auto multiscales = meta["attributes"]["multiscales"][0]; + + const auto axes = multiscales["axes"]; + EXPECT_EQ(size_t, axes.size(), 5); + std::string name, type, unit; + + name = axes[0]["name"]; + type = axes[0]["type"]; + EXPECT(name == "t", "Expected name to be 't', but got '", name, "'"); + EXPECT(type == "time", "Expected type to be 'time', but got '", type, "'"); 
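+    // Reviewer note (not part of the original patch): verify_and_cleanup()
+    // below checks each shard object against expected_file_size with
+    // EXPECT_LT, since blosc-compressed shards must come in under the raw
+    // payload plus the two-uint64-per-chunk index table. Spot check:
+    static_assert(chunks_per_shard *
+                      (chunk_width * chunk_height * chunk_planes *
+                       chunk_channels * chunk_timepoints) *
+                      sizeof(uint16_t) +
+                    chunks_per_shard * 2 * sizeof(uint64_t) ==
+                  163'968,
+                  "uncompressed shard payload + index, in bytes");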
+ + name = axes[1]["name"]; + type = axes[1]["type"]; + EXPECT(name == "c", "Expected name to be 'c', but got '", name, "'"); + EXPECT(type == "channel", "Expected type to be 'channel', but got '", type, "'"); + + name = axes[2]["name"]; + type = axes[2]["type"]; + EXPECT(name == "z", "Expected name to be 'z', but got '", name, "'"); + EXPECT(type == "space", "Expected type to be 'space', but got '", type, "'"); + + name = axes[3]["name"]; + type = axes[3]["type"]; + unit = axes[3]["unit"]; + EXPECT(name == "y", "Expected name to be 'y', but got '", name, "'"); + EXPECT(type == "space", "Expected type to be 'space', but got '", type, "'"); + EXPECT(unit == "micrometer", + "Expected unit to be 'micrometer', but got '", + unit, + "'"); + + name = axes[4]["name"]; + type = axes[4]["type"]; + unit = axes[4]["unit"]; + EXPECT(name == "x", "Expected name to be 'x', but got '", name, "'"); + EXPECT(type == "space", "Expected type to be 'space', but got '", type, "'"); + EXPECT(unit == "micrometer", + "Expected unit to be 'micrometer', but got '", + unit, + "'"); + + const auto datasets = multiscales["datasets"][0]; + const std::string path = datasets["path"].get(); + EXPECT(path == "0", "Expected path to be '0', but got '", path, "'"); + + const auto coordinate_transformations = + datasets["coordinateTransformations"][0]; + + type = coordinate_transformations["type"].get(); + EXPECT(type == "scale", "Expected type to be 'scale', but got '", type, "'"); + + const auto scale = coordinate_transformations["scale"]; + EXPECT_EQ(size_t, scale.size(), 5); + EXPECT_EQ(int, scale[0].get(), 1.0); + EXPECT_EQ(int, scale[1].get(), 1.0); + EXPECT_EQ(int, scale[2].get(), 1.0); + EXPECT_EQ(int, scale[3].get(), 1.0); + EXPECT_EQ(int, scale[4].get(), 1.0); +} + +void +verify_array_metadata(const nlohmann::json& meta) +{ + const auto& shape = meta["shape"]; + EXPECT_EQ(size_t, shape.size(), 5); + EXPECT_EQ(int, shape[0].get(), array_timepoints); + EXPECT_EQ(int, shape[1].get(), array_channels); + EXPECT_EQ(int, shape[2].get(), array_planes); + EXPECT_EQ(int, shape[3].get(), array_height); + EXPECT_EQ(int, shape[4].get(), array_width); + + const auto& chunks = meta["chunk_grid"]["chunk_shape"]; + EXPECT_EQ(size_t, chunks.size(), 5); + EXPECT_EQ(int, chunks[0].get(), chunk_timepoints); + EXPECT_EQ(int, chunks[1].get(), chunk_channels); + EXPECT_EQ(int, chunks[2].get(), chunk_planes); + EXPECT_EQ(int, chunks[3].get(), chunk_height); + EXPECT_EQ(int, chunks[4].get(), chunk_width); + + const auto& shards = + meta["storage_transformers"][0]["configuration"]["chunks_per_shard"]; + EXPECT_EQ(size_t, shards.size(), 5); + EXPECT_EQ(int, shards[0].get(), shard_timepoints); + EXPECT_EQ(int, shards[1].get(), shard_channels); + EXPECT_EQ(int, shards[2].get(), shard_planes); + EXPECT_EQ(int, shards[3].get(), shard_height); + EXPECT_EQ(int, shards[4].get(), shard_width); + + const auto dtype = meta["data_type"].get(); + EXPECT(dtype == "uint16", + "Expected dtype to be 'uint16', but got '", + dtype, + "'"); + + const auto& compressor = meta["compressor"]; + EXPECT(!compressor.is_null(), "Expected compressor to be non-null"); + + const auto codec = compressor["codec"].get(); + EXPECT(codec == "https://purl.org/zarr/spec/codec/blosc/1.0", + "Expected codec to be 'https://purl.org/zarr/spec/codec/blosc/1.0', " + "but got '%s'", + codec.c_str()); + + const auto& configuration = compressor["configuration"]; + EXPECT_EQ(int, configuration["blocksize"].get(), 0); + EXPECT_EQ(int, configuration["clevel"].get(), 3); + EXPECT_EQ(int, 
configuration["shuffle"].get(), 1); + + const auto cname = configuration["cname"].get(); + EXPECT(cname == "lz4", "Expected cname to be 'lz4', but got '", cname, "'"); +} + +void +verify_and_cleanup() +{ + minio::s3::BaseUrl url(s3_endpoint); + url.https = s3_endpoint.starts_with("https://"); + + minio::creds::StaticProvider provider(s3_access_key_id, + s3_secret_access_key); + minio::s3::Client client(url, &provider); + + std::string base_metadata_path = TEST "/zarr.json"; + std::string group_metadata_path = TEST "/meta/root.group.json"; + std::string array_metadata_path = TEST "/meta/root/0.array.json"; + + { + EXPECT(object_exists(client, base_metadata_path), + "Object does not exist: %s", + base_metadata_path.c_str()); + std::string contents = get_object_contents(client, base_metadata_path); + nlohmann::json base_metadata = nlohmann::json::parse(contents); + + verify_base_metadata(base_metadata); + } + + { + EXPECT(object_exists(client, group_metadata_path), + "Object does not exist: %s", + group_metadata_path.c_str()); + std::string contents = get_object_contents(client, group_metadata_path); + nlohmann::json group_metadata = nlohmann::json::parse(contents); + + verify_group_metadata(group_metadata); + } + + { + EXPECT(object_exists(client, array_metadata_path), + "Object does not exist: %s", + array_metadata_path.c_str()); + std::string contents = get_object_contents(client, array_metadata_path); + nlohmann::json array_metadata = nlohmann::json::parse(contents); + + verify_array_metadata(array_metadata); + } + + CHECK(remove_items( + client, + { base_metadata_path, group_metadata_path, array_metadata_path })); + + const auto chunk_size = chunk_width * chunk_height * chunk_planes * + chunk_channels * chunk_timepoints * nbytes_px; + const auto index_size = chunks_per_shard * + sizeof(uint64_t) * // indices are 64 bits + 2; // 2 indices per chunk + const auto expected_file_size = shard_width * shard_height * shard_planes * + shard_channels * shard_timepoints * + chunk_size + + index_size; + + // verify and clean up data files + std::vector data_files; + std::string data_root = TEST "/data/root/0"; + + for (auto t = 0; t < shards_in_t; ++t) { + const auto t_dir = data_root + "/" + ("c" + std::to_string(t)); + + for (auto c = 0; c < shards_in_c; ++c) { + const auto c_dir = t_dir + "/" + std::to_string(c); + + for (auto z = 0; z < shards_in_z; ++z) { + const auto z_dir = c_dir + "/" + std::to_string(z); + + for (auto y = 0; y < shards_in_y; ++y) { + const auto y_dir = z_dir + "/" + std::to_string(y); + + for (auto x = 0; x < shards_in_x; ++x) { + const auto x_file = y_dir + "/" + std::to_string(x); + EXPECT(object_exists(client, x_file), + "Object does not exist: %s", + x_file.c_str()); + const auto file_size = get_object_size(client, x_file); + EXPECT_LT(size_t, file_size, expected_file_size); + } + } + } + } + } +} + +int +main() +{ + if (!get_credentials()) { + LOG_WARNING("Failed to get credentials. 
Skipping test."); + return 0; + } + + Zarr_set_log_level(ZarrLogLevel_Debug); + + auto* stream = setup(); + std::vector frame(array_width * array_height, 0); + + int retval = 1; + + try { + size_t bytes_out; + for (auto i = 0; i < frames_to_acquire; ++i) { + ZarrStatusCode status = ZarrStream_append( + stream, frame.data(), bytes_of_frame, &bytes_out); + EXPECT(status == ZarrStatusCode_Success, + "Failed to append frame ", + i, + ": ", + Zarr_get_status_message(status)); + EXPECT_EQ(size_t, bytes_out, bytes_of_frame); + } + + ZarrStream_destroy(stream); + + verify_and_cleanup(); + + retval = 0; + } catch (const std::exception& e) { + LOG_ERROR("Caught exception: %s", e.what()); + } + + return retval; +} diff --git a/tests/integration/stream-zarr-v3-raw-to-filesystem.cpp b/tests/integration/stream-zarr-v3-raw-to-filesystem.cpp new file mode 100644 index 0000000..31225a6 --- /dev/null +++ b/tests/integration/stream-zarr-v3-raw-to-filesystem.cpp @@ -0,0 +1,341 @@ +#include "acquire.zarr.h" +#include "test.macros.hh" + +#include + +#include +#include +#include + +namespace fs = std::filesystem; + +namespace { +const std::string test_path = + (fs::temp_directory_path() / (TEST ".zarr")).string(); + +const unsigned int array_width = 64, array_height = 48, array_planes = 6, + array_channels = 8, array_timepoints = 10; + +const unsigned int chunk_width = 16, chunk_height = 16, chunk_planes = 2, + chunk_channels = 4, chunk_timepoints = 5; + +const unsigned int shard_width = 2, shard_height = 1, shard_planes = 1, + shard_channels = 2, shard_timepoints = 2; +const unsigned int chunks_per_shard = + shard_width * shard_height * shard_planes * shard_channels * shard_timepoints; + +const unsigned int chunks_in_x = + (array_width + chunk_width - 1) / chunk_width; // 4 chunks +const unsigned int chunks_in_y = + (array_height + chunk_height - 1) / chunk_height; // 3 chunks +const unsigned int chunks_in_z = + (array_planes + chunk_planes - 1) / chunk_planes; // 3 chunks +const unsigned int chunks_in_c = + (array_channels + chunk_channels - 1) / chunk_channels; // 2 chunks +const unsigned int chunks_in_t = + (array_timepoints + chunk_timepoints - 1) / chunk_timepoints; + +const unsigned int shards_in_x = + (chunks_in_x + shard_width - 1) / shard_width; // 2 shards +const unsigned int shards_in_y = + (chunks_in_y + shard_height - 1) / shard_height; // 3 shards +const unsigned int shards_in_z = + (chunks_in_z + shard_planes - 1) / shard_planes; // 3 shards +const unsigned int shards_in_c = + (chunks_in_c + shard_channels - 1) / shard_channels; // 1 shard +const unsigned int shards_in_t = + (chunks_in_t + shard_timepoints - 1) / shard_timepoints; // 1 shard + +const size_t nbytes_px = sizeof(uint16_t); +const uint32_t frames_to_acquire = + array_planes * array_channels * array_timepoints; +const size_t bytes_of_frame = array_width * array_height * nbytes_px; +} // namespace/s + +ZarrStream* +setup() +{ + ZarrStreamSettings settings = { + .store_path = test_path.c_str(), + .s3_settings = nullptr, + .compression_settings = nullptr, + .data_type = ZarrDataType_uint16, + .version = ZarrVersion_3, + }; + + CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5)); + + ZarrDimensionProperties* dim; + dim = settings.dimensions; + *dim = DIM("t", ZarrDimensionType_Time, array_timepoints, chunk_timepoints, shard_timepoints); + + dim = settings.dimensions + 1; + *dim = DIM("c", ZarrDimensionType_Channel, array_channels, chunk_channels, shard_channels); + + dim = settings.dimensions + 2; + *dim = DIM("z", 
ZarrDimensionType_Space, array_planes, chunk_planes, shard_planes); + + dim = settings.dimensions + 3; + *dim = DIM("y", ZarrDimensionType_Space, array_height, chunk_height, shard_height); + + dim = settings.dimensions + 4; + *dim = DIM("x", ZarrDimensionType_Space, array_width, chunk_width, shard_width); + + auto* stream = ZarrStream_create(&settings); + + return stream; +} + +void +verify_base_metadata(const nlohmann::json& meta) +{ + const auto extensions = meta["extensions"]; + EXPECT_EQ(size_t, extensions.size(), 0); + + const auto encoding = meta["metadata_encoding"].get(); + EXPECT(encoding == "https://purl.org/zarr/spec/protocol/core/3.0", + "Expected encoding to be " + "'https://purl.org/zarr/spec/protocol/core/3.0', but got '%s'", + encoding.c_str()); + + const auto suffix = meta["metadata_key_suffix"].get(); + EXPECT(suffix == ".json", + "Expected suffix to be '.json', but got '%s'", + suffix.c_str()); + + const auto zarr_format = meta["zarr_format"].get(); + EXPECT(encoding == "https://purl.org/zarr/spec/protocol/core/3.0", + "Expected encoding to be " + "'https://purl.org/zarr/spec/protocol/core/3.0', but got '%s'", + encoding.c_str()); +} + +void +verify_group_metadata(const nlohmann::json& meta) +{ + const auto multiscales = meta["attributes"]["multiscales"][0]; + + const auto axes = multiscales["axes"]; + EXPECT_EQ(size_t, axes.size(), 5); + std::string name, type, unit; + + name = axes[0]["name"]; + type = axes[0]["type"]; + EXPECT(name == "t", "Expected name to be 't', but got '", name, "'"); + EXPECT(type == "time", "Expected type to be 'time', but got '", type, "'"); + + name = axes[1]["name"]; + type = axes[1]["type"]; + EXPECT(name == "c", "Expected name to be 'c', but got '", name, "'"); + EXPECT(type == "channel", "Expected type to be 'channel', but got '", type, "'"); + + name = axes[2]["name"]; + type = axes[2]["type"]; + EXPECT(name == "z", "Expected name to be 'z', but got '", name, "'"); + EXPECT(type == "space", "Expected type to be 'space', but got '", type, "'"); + + name = axes[3]["name"]; + type = axes[3]["type"]; + unit = axes[3]["unit"]; + EXPECT(name == "y", "Expected name to be 'y', but got '", name, "'"); + EXPECT(type == "space", "Expected type to be 'space', but got '", type, "'"); + EXPECT(unit == "micrometer", + "Expected unit to be 'micrometer', but got '", + unit, + "'"); + + name = axes[4]["name"]; + type = axes[4]["type"]; + unit = axes[4]["unit"]; + EXPECT(name == "x", "Expected name to be 'x', but got '", name, "'"); + EXPECT(type == "space", "Expected type to be 'space', but got '", type, "'"); + EXPECT(unit == "micrometer", + "Expected unit to be 'micrometer', but got '", + unit, + "'"); + + const auto datasets = multiscales["datasets"][0]; + const std::string path = datasets["path"].get(); + EXPECT(path == "0", "Expected path to be '0', but got '", path, "'"); + + const auto coordinate_transformations = + datasets["coordinateTransformations"][0]; + + type = coordinate_transformations["type"].get(); + EXPECT(type == "scale", "Expected type to be 'scale', but got '", type, "'"); + + const auto scale = coordinate_transformations["scale"]; + EXPECT_EQ(size_t, scale.size(), 5); + EXPECT_EQ(int, scale[0].get(), 1.0); + EXPECT_EQ(int, scale[1].get(), 1.0); + EXPECT_EQ(int, scale[2].get(), 1.0); + EXPECT_EQ(int, scale[3].get(), 1.0); + EXPECT_EQ(int, scale[4].get(), 1.0); +} + +void +verify_array_metadata(const nlohmann::json& meta) +{ + const auto& shape = meta["shape"]; + EXPECT_EQ(size_t, shape.size(), 5); + EXPECT_EQ(int, shape[0].get(), 
array_timepoints); + EXPECT_EQ(int, shape[1].get(), array_channels); + EXPECT_EQ(int, shape[2].get(), array_planes); + EXPECT_EQ(int, shape[3].get(), array_height); + EXPECT_EQ(int, shape[4].get(), array_width); + + const auto& chunks = meta["chunk_grid"]["chunk_shape"]; + EXPECT_EQ(size_t, chunks.size(), 5); + EXPECT_EQ(int, chunks[0].get(), chunk_timepoints); + EXPECT_EQ(int, chunks[1].get(), chunk_channels); + EXPECT_EQ(int, chunks[2].get(), chunk_planes); + EXPECT_EQ(int, chunks[3].get(), chunk_height); + EXPECT_EQ(int, chunks[4].get(), chunk_width); + + const auto& shards = + meta["storage_transformers"][0]["configuration"]["chunks_per_shard"]; + EXPECT_EQ(size_t, shards.size(), 5); + EXPECT_EQ(int, shards[0].get(), shard_timepoints); + EXPECT_EQ(int, shards[1].get(), shard_channels); + EXPECT_EQ(int, shards[2].get(), shard_planes); + EXPECT_EQ(int, shards[3].get(), shard_height); + EXPECT_EQ(int, shards[4].get(), shard_width); + + const auto dtype = meta["data_type"].get(); + EXPECT(dtype == "uint16", + "Expected dtype to be 'uint16', but got '", + dtype, + "'"); + + const auto& compressor = meta["compressor"]; + EXPECT(compressor.is_null(), + "Expected compressor to be null, but got '%s'", + compressor.dump().c_str()); +} + +void +verify_file_data() +{ + const auto chunk_size = chunk_width * chunk_height * chunk_planes * + chunk_channels * chunk_timepoints * nbytes_px; + const auto index_size = chunks_per_shard * + sizeof(uint64_t) * // indices are 64 bits + 2; // 2 indices per chunk + const auto expected_file_size = shard_width * shard_height * shard_planes * + shard_channels * shard_timepoints * + chunk_size + + index_size; + + fs::path data_root = fs::path(test_path) / "data" / "root" / "0"; + + CHECK(fs::is_directory(data_root)); + for (auto t = 0; t < shards_in_t; ++t) { + const auto t_dir = data_root / ("c" + std::to_string(t)); + CHECK(fs::is_directory(t_dir)); + + for (auto c = 0; c < shards_in_c; ++c) { + const auto c_dir = t_dir / std::to_string(c); + CHECK(fs::is_directory(c_dir)); + + for (auto z = 0; z < shards_in_z; ++z) { + const auto z_dir = c_dir / std::to_string(z); + CHECK(fs::is_directory(z_dir)); + + for (auto y = 0; y < shards_in_y; ++y) { + const auto y_dir = z_dir / std::to_string(y); + CHECK(fs::is_directory(y_dir)); + + for (auto x = 0; x < shards_in_x; ++x) { + const auto x_file = y_dir / std::to_string(x); + CHECK(fs::is_regular_file(x_file)); + const auto file_size = fs::file_size(x_file); + EXPECT_EQ(size_t, file_size, expected_file_size); + } + + CHECK(!fs::is_regular_file(y_dir / + std::to_string(shards_in_x))); + } + + CHECK(!fs::is_directory(z_dir / std::to_string(shards_in_y))); + } + + CHECK(!fs::is_directory(c_dir / std::to_string(shards_in_z))); + } + + CHECK(!fs::is_directory(t_dir / std::to_string(shards_in_c))); + } + + CHECK(!fs::is_directory(data_root / ("c" + std::to_string(shards_in_t)))); +} + +void +verify() +{ + CHECK(std::filesystem::is_directory(test_path)); + + { + fs::path base_metadata_path = fs::path(test_path) / "zarr.json"; + std::ifstream f(base_metadata_path); + nlohmann::json base_metadata = nlohmann::json::parse(f); + + verify_base_metadata(base_metadata); + } + + { + fs::path group_metadata_path = + fs::path(test_path) / "meta" / "root.group.json"; + std::ifstream f = std::ifstream(group_metadata_path); + nlohmann::json group_metadata = nlohmann::json::parse(f); + + verify_group_metadata(group_metadata); + } + + { + fs::path array_metadata_path = + fs::path(test_path) / "meta" / "root" / "0.array.json"; + std::ifstream f = 
std::ifstream(array_metadata_path); + nlohmann::json array_metadata = nlohmann::json::parse(f); + + verify_array_metadata(array_metadata); + } + + verify_file_data(); +} + +int +main() +{ + Zarr_set_log_level(ZarrLogLevel_Debug); + + auto* stream = setup(); + std::vector frame(array_width * array_height, 0); + + int retval = 1; + + try { + size_t bytes_out; + for (auto i = 0; i < frames_to_acquire; ++i) { + ZarrStatusCode status = ZarrStream_append( + stream, frame.data(), bytes_of_frame, &bytes_out); + EXPECT(status == ZarrStatusCode_Success, + "Failed to append frame ", + i, + ": ", + Zarr_get_status_message(status)); + EXPECT_EQ(size_t, bytes_out, bytes_of_frame); + } + + ZarrStream_destroy(stream); + + verify(); + + // Clean up + fs::remove_all(test_path); + + retval = 0; + } catch (const std::exception& e) { + LOG_ERROR("Caught exception: %s", e.what()); + } + + return retval; +} diff --git a/tests/integration/stream-zarr-v3-raw-to-s3.cpp b/tests/integration/stream-zarr-v3-raw-to-s3.cpp new file mode 100644 index 0000000..f099a33 --- /dev/null +++ b/tests/integration/stream-zarr-v3-raw-to-s3.cpp @@ -0,0 +1,469 @@ +#include "acquire.zarr.h" +#include "test.macros.hh" + +#include +#include + +#include + +namespace { +std::string s3_endpoint, s3_bucket_name, s3_access_key_id, s3_secret_access_key; + +const unsigned int array_width = 64, array_height = 48, array_planes = 6, + array_channels = 8, array_timepoints = 10; + +const unsigned int chunk_width = 16, chunk_height = 16, chunk_planes = 2, + chunk_channels = 4, chunk_timepoints = 5; + +const unsigned int shard_width = 2, shard_height = 1, shard_planes = 1, + shard_channels = 2, shard_timepoints = 2; +const unsigned int chunks_per_shard = + shard_width * shard_height * shard_planes * shard_channels * shard_timepoints; + +const unsigned int chunks_in_x = + (array_width + chunk_width - 1) / chunk_width; // 4 chunks +const unsigned int chunks_in_y = + (array_height + chunk_height - 1) / chunk_height; // 3 chunks +const unsigned int chunks_in_z = + (array_planes + chunk_planes - 1) / chunk_planes; // 3 chunks +const unsigned int chunks_in_c = + (array_channels + chunk_channels - 1) / chunk_channels; // 2 chunks +const unsigned int chunks_in_t = + (array_timepoints + chunk_timepoints - 1) / chunk_timepoints; + +const unsigned int shards_in_x = + (chunks_in_x + shard_width - 1) / shard_width; // 2 shards +const unsigned int shards_in_y = + (chunks_in_y + shard_height - 1) / shard_height; // 3 shards +const unsigned int shards_in_z = + (chunks_in_z + shard_planes - 1) / shard_planes; // 3 shards +const unsigned int shards_in_c = + (chunks_in_c + shard_channels - 1) / shard_channels; // 1 shard +const unsigned int shards_in_t = + (chunks_in_t + shard_timepoints - 1) / shard_timepoints; // 1 shard + +const size_t nbytes_px = sizeof(uint16_t); +const uint32_t frames_to_acquire = + array_planes * array_channels * array_timepoints; +const size_t bytes_of_frame = array_width * array_height * nbytes_px; + +bool +get_credentials() +{ + char* env = nullptr; + if (!(env = std::getenv("ZARR_S3_ENDPOINT"))) { + LOG_ERROR("ZARR_S3_ENDPOINT not set."); + return false; + } + s3_endpoint = env; + + if (!(env = std::getenv("ZARR_S3_BUCKET_NAME"))) { + LOG_ERROR("ZARR_S3_BUCKET_NAME not set."); + return false; + } + s3_bucket_name = env; + + if (!(env = std::getenv("ZARR_S3_ACCESS_KEY_ID"))) { + LOG_ERROR("ZARR_S3_ACCESS_KEY_ID not set."); + return false; + } + s3_access_key_id = env; + + if (!(env = std::getenv("ZARR_S3_SECRET_ACCESS_KEY"))) { + 
LOG_ERROR("ZARR_S3_SECRET_ACCESS_KEY not set."); + return false; + } + s3_secret_access_key = env; + + return true; +} + +bool +object_exists(minio::s3::Client& client, const std::string& object_name) +{ + minio::s3::StatObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + + minio::s3::StatObjectResponse response = client.StatObject(args); + + return (bool)response; +} + +size_t +get_object_size(minio::s3::Client& client, const std::string& object_name) +{ + minio::s3::StatObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + + minio::s3::StatObjectResponse response = client.StatObject(args); + + if (!response) { + LOG_ERROR("Failed to get object size: %s", object_name.c_str()); + return 0; + } + + return response.size; +} + +std::string +get_object_contents(minio::s3::Client& client, const std::string& object_name) +{ + std::stringstream ss; + + minio::s3::GetObjectArgs args; + args.bucket = s3_bucket_name; + args.object = object_name; + args.datafunc = [&ss](minio::http::DataFunctionArgs args) -> bool { + ss << args.datachunk; + return true; + }; + + // Call get object. + minio::s3::GetObjectResponse resp = client.GetObject(args); + + return ss.str(); +} + +bool +remove_items(minio::s3::Client& client, + const std::vector& item_keys) +{ + std::list objects; + for (const auto& key : item_keys) { + minio::s3::DeleteObject object; + object.name = key; + objects.push_back(object); + } + + minio::s3::RemoveObjectsArgs args; + args.bucket = s3_bucket_name; + + auto it = objects.begin(); + + args.func = [&objects = objects, + &i = it](minio::s3::DeleteObject& obj) -> bool { + if (i == objects.end()) + return false; + obj = *i; + i++; + return true; + }; + + minio::s3::RemoveObjectsResult result = client.RemoveObjects(args); + for (; result; result++) { + minio::s3::DeleteError err = *result; + if (!err) { + LOG_ERROR("Failed to delete object %s: %s", + err.object_name.c_str(), + err.message.c_str()); + return false; + } + } + + return true; +} +} // namespace + +ZarrStream* +setup() +{ + ZarrStreamSettings settings = { + .store_path = TEST, + .compression_settings = nullptr, + .data_type = ZarrDataType_uint16, + .version = ZarrVersion_3, + }; + + ZarrS3Settings s3_settings{ + .endpoint = s3_endpoint.c_str(), + .bucket_name = s3_bucket_name.c_str(), + .access_key_id = s3_access_key_id.c_str(), + .secret_access_key = s3_secret_access_key.c_str(), + }; + + settings.s3_settings = &s3_settings; + + CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 5)); + + ZarrDimensionProperties* dim; + dim = settings.dimensions; + *dim = DIM("t", ZarrDimensionType_Time, array_timepoints, chunk_timepoints, shard_timepoints); + + dim = settings.dimensions + 1; + *dim = DIM("c", ZarrDimensionType_Channel, array_channels, chunk_channels, shard_channels); + + dim = settings.dimensions + 2; + *dim = DIM("z", ZarrDimensionType_Space, array_planes, chunk_planes, shard_planes); + + dim = settings.dimensions + 3; + *dim = DIM("y", ZarrDimensionType_Space, array_height, chunk_height, shard_height); + + dim = settings.dimensions + 4; + *dim = DIM("x", ZarrDimensionType_Space, array_width, chunk_width, shard_width); + + auto* stream = ZarrStream_create(&settings); + + return stream; +} + +void +verify_base_metadata(const nlohmann::json& meta) +{ + const auto extensions = meta["extensions"]; + EXPECT_EQ(size_t, extensions.size(), 0); + + const auto encoding = meta["metadata_encoding"].get(); + EXPECT(encoding == "https://purl.org/zarr/spec/protocol/core/3.0", + 
"Expected encoding to be " + "'https://purl.org/zarr/spec/protocol/core/3.0', but got '%s'", + encoding.c_str()); + + const auto suffix = meta["metadata_key_suffix"].get(); + EXPECT(suffix == ".json", + "Expected suffix to be '.json', but got '%s'", + suffix.c_str()); + + const auto zarr_format = meta["zarr_format"].get(); + EXPECT(encoding == "https://purl.org/zarr/spec/protocol/core/3.0", + "Expected encoding to be " + "'https://purl.org/zarr/spec/protocol/core/3.0', but got '%s'", + encoding.c_str()); +} + +void +verify_group_metadata(const nlohmann::json& meta) +{ + const auto multiscales = meta["attributes"]["multiscales"][0]; + + const auto axes = multiscales["axes"]; + EXPECT_EQ(size_t, axes.size(), 5); + std::string name, type, unit; + + name = axes[0]["name"]; + type = axes[0]["type"]; + EXPECT(name == "t", "Expected name to be 't', but got '", name, "'"); + EXPECT(type == "time", "Expected type to be 'time', but got '", type, "'"); + + name = axes[1]["name"]; + type = axes[1]["type"]; + EXPECT(name == "c", "Expected name to be 'c', but got '", name, "'"); + EXPECT( + type == "channel", "Expected type to be 'channel', but got '", type, "'"); + + name = axes[2]["name"]; + type = axes[2]["type"]; + EXPECT(name == "z", "Expected name to be 'z', but got '", name, "'"); + EXPECT( + type == "space", "Expected type to be 'space', but got '", type, "'"); + + name = axes[3]["name"]; + type = axes[3]["type"]; + unit = axes[3]["unit"]; + EXPECT(name == "y", "Expected name to be 'y', but got '", name, "'"); + EXPECT( + type == "space", "Expected type to be 'space', but got '", type, "'"); + EXPECT(unit == "micrometer", + "Expected unit to be 'micrometer', but got '", + unit, + "'"); + + name = axes[4]["name"]; + type = axes[4]["type"]; + unit = axes[4]["unit"]; + EXPECT(name == "x", "Expected name to be 'x', but got '", name, "'"); + EXPECT( + type == "space", "Expected type to be 'space', but got '", type, "'"); + EXPECT(unit == "micrometer", + "Expected unit to be 'micrometer', but got '", + unit, + "'"); + + const auto datasets = multiscales["datasets"][0]; + const std::string path = datasets["path"].get(); + EXPECT(path == "0", "Expected path to be '0', but got '", path, "'"); + + const auto coordinate_transformations = + datasets["coordinateTransformations"][0]; + + type = coordinate_transformations["type"].get(); + EXPECT( + type == "scale", "Expected type to be 'scale', but got '", type, "'"); + + const auto scale = coordinate_transformations["scale"]; + EXPECT_EQ(size_t, scale.size(), 5); + EXPECT_EQ(int, scale[0].get(), 1.0); + EXPECT_EQ(int, scale[1].get(), 1.0); + EXPECT_EQ(int, scale[2].get(), 1.0); + EXPECT_EQ(int, scale[3].get(), 1.0); + EXPECT_EQ(int, scale[4].get(), 1.0); +} + +void +verify_array_metadata(const nlohmann::json& meta) +{ + const auto& shape = meta["shape"]; + EXPECT_EQ(size_t, shape.size(), 5); + EXPECT_EQ(int, shape[0].get(), array_timepoints); + EXPECT_EQ(int, shape[1].get(), array_channels); + EXPECT_EQ(int, shape[2].get(), array_planes); + EXPECT_EQ(int, shape[3].get(), array_height); + EXPECT_EQ(int, shape[4].get(), array_width); + + const auto& chunks = meta["chunk_grid"]["chunk_shape"]; + EXPECT_EQ(size_t, chunks.size(), 5); + EXPECT_EQ(int, chunks[0].get(), chunk_timepoints); + EXPECT_EQ(int, chunks[1].get(), chunk_channels); + EXPECT_EQ(int, chunks[2].get(), chunk_planes); + EXPECT_EQ(int, chunks[3].get(), chunk_height); + EXPECT_EQ(int, chunks[4].get(), chunk_width); + + const auto& shards = + 
meta["storage_transformers"][0]["configuration"]["chunks_per_shard"]; + EXPECT_EQ(size_t, shards.size(), 5); + EXPECT_EQ(int, shards[0].get(), shard_timepoints); + EXPECT_EQ(int, shards[1].get(), shard_channels); + EXPECT_EQ(int, shards[2].get(), shard_planes); + EXPECT_EQ(int, shards[3].get(), shard_height); + EXPECT_EQ(int, shards[4].get(), shard_width); + + const auto dtype = meta["data_type"].get(); + EXPECT(dtype == "uint16", + "Expected dtype to be 'uint16', but got '", + dtype, + "'"); + + const auto& compressor = meta["compressor"]; + EXPECT(compressor.is_null(), + "Expected compressor to be null, but got '%s'", + compressor.dump().c_str()); +} + +void +verify_and_cleanup() +{ + minio::s3::BaseUrl url(s3_endpoint); + url.https = s3_endpoint.starts_with("https://"); + + minio::creds::StaticProvider provider(s3_access_key_id, + s3_secret_access_key); + minio::s3::Client client(url, &provider); + + std::string base_metadata_path = TEST "/zarr.json"; + std::string group_metadata_path = TEST "/meta/root.group.json"; + std::string array_metadata_path = TEST "/meta/root/0.array.json"; + + { + EXPECT(object_exists(client, base_metadata_path), + "Object does not exist: %s", + base_metadata_path.c_str()); + std::string contents = get_object_contents(client, base_metadata_path); + nlohmann::json base_metadata = nlohmann::json::parse(contents); + + verify_base_metadata(base_metadata); + } + + { + EXPECT(object_exists(client, group_metadata_path), + "Object does not exist: %s", + group_metadata_path.c_str()); + std::string contents = get_object_contents(client, group_metadata_path); + nlohmann::json group_metadata = nlohmann::json::parse(contents); + + verify_group_metadata(group_metadata); + } + + { + EXPECT(object_exists(client, array_metadata_path), + "Object does not exist: %s", + array_metadata_path.c_str()); + std::string contents = get_object_contents(client, array_metadata_path); + nlohmann::json array_metadata = nlohmann::json::parse(contents); + + verify_array_metadata(array_metadata); + } + + CHECK(remove_items( + client, + { base_metadata_path, group_metadata_path, array_metadata_path })); + + const auto chunk_size = chunk_width * chunk_height * chunk_planes * + chunk_channels * chunk_timepoints * nbytes_px; + const auto index_size = chunks_per_shard * + sizeof(uint64_t) * // indices are 64 bits + 2; // 2 indices per chunk + const auto expected_file_size = shard_width * shard_height * shard_planes * + shard_channels * shard_timepoints * + chunk_size + + index_size; + + // verify and clean up data files + std::vector data_files; + std::string data_root = TEST "/data/root/0"; + + for (auto t = 0; t < shards_in_t; ++t) { + const auto t_dir = data_root + "/" + ("c" + std::to_string(t)); + + for (auto c = 0; c < shards_in_c; ++c) { + const auto c_dir = t_dir + "/" + std::to_string(c); + + for (auto z = 0; z < shards_in_z; ++z) { + const auto z_dir = c_dir + "/" + std::to_string(z); + + for (auto y = 0; y < shards_in_y; ++y) { + const auto y_dir = z_dir + "/" + std::to_string(y); + + for (auto x = 0; x < shards_in_x; ++x) { + const auto x_file = y_dir + "/" + std::to_string(x); + EXPECT(object_exists(client, x_file), + "Object does not exist: %s", + x_file.c_str()); + const auto file_size = get_object_size(client, x_file); + EXPECT_EQ(size_t, file_size, expected_file_size); + } + } + } + } + } +} + +int +main() +{ + if (!get_credentials()) { + LOG_WARNING("Failed to get credentials. 
+        return 0;
+    }
+
+    Zarr_set_log_level(ZarrLogLevel_Debug);
+
+    auto* stream = setup();
+    std::vector<uint16_t> frame(array_width * array_height, 0);
+
+    int retval = 1;
+
+    try {
+        size_t bytes_out;
+        for (auto i = 0; i < frames_to_acquire; ++i) {
+            ZarrStatusCode status = ZarrStream_append(
+              stream, frame.data(), bytes_of_frame, &bytes_out);
+            EXPECT(status == ZarrStatusCode_Success,
+                   "Failed to append frame ",
+                   i,
+                   ": ",
+                   Zarr_get_status_message(status));
+            EXPECT_EQ(size_t, bytes_out, bytes_of_frame);
+        }
+
+        ZarrStream_destroy(stream);
+
+        verify_and_cleanup();
+
+        retval = 0;
+    } catch (const std::exception& e) {
+        LOG_ERROR("Caught exception: ", e.what());
+    }
+
+    return retval;
+}
diff --git a/tests/integration/test.macros.hh b/tests/integration/test.macros.hh
new file mode 100644
index 0000000..3c79024
--- /dev/null
+++ b/tests/integration/test.macros.hh
@@ -0,0 +1,54 @@
+#pragma once
+
+#include "logger.hh"
+
+#define EXPECT(e, ...)                                                         \
+    do {                                                                       \
+        if (!(e)) {                                                            \
+            const std::string __err = LOG_ERROR(__VA_ARGS__);                  \
+            throw std::runtime_error(__err);                                   \
+        }                                                                      \
+    } while (0)
+#define CHECK(e) EXPECT(e, "Expression evaluated as false:\n\t", #e)
+
+/// Check that a==b
+/// example: `EXPECT_EQ(int, 42, meaning_of_life())`
+#define EXPECT_EQ(T, a, b)                                                     \
+    do {                                                                       \
+        T a_ = (T)(a);                                                         \
+        T b_ = (T)(b);                                                         \
+        EXPECT(                                                                \
+          a_ == b_, "Expected ", #a, " == ", #b, " but ", a_, " != ", b_);     \
+    } while (0)
+
+#define CHECK_OK(e) CHECK((e) == ZarrStatusCode_Success)
+
+#define EXPECT_STR_EQ(a, b)                                                    \
+    do {                                                                       \
+        std::string a_ = (a) ? (a) : "";                                       \
+        std::string b_ = (b) ? (b) : "";                                       \
+        EXPECT(a_ == b_,                                                       \
+               "Expected ",                                                    \
+               #a,                                                             \
+               " == ",                                                         \
+               #b,                                                             \
+               " but ",                                                        \
+               a_,                                                             \
+               " != ",                                                         \
+               b_);                                                            \
+    } while (0)
+
+#define EXPECT_LT(T, a, b)                                                     \
+    do {                                                                       \
+        T a_ = (T)(a);                                                         \
+        T b_ = (T)(b);                                                         \
+        EXPECT(a_ < b_, "Expected ", #a, " < ", #b, " but ", a_, " >= ", b_);  \
+    } while (0)
+
+#define SIZED(str) str, sizeof(str)
+#define DIM(name_, type_, array_size, chunk_size, shard_size)                  \
+    {                                                                          \
+        .name = (name_), .type = (type_),                                      \
+        .array_size_px = (array_size), .chunk_size_px = (chunk_size),          \
+        .shard_size_chunks = (shard_size)                                      \
+    }
\ No newline at end of file
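A note for reviewers on test.macros.hh: EXPECT forwards its variadic arguments to LOG_ERROR, which concatenates them stream-style (there is no printf-style interpolation), and throws so the test's try/catch can report the failure. A minimal usage sketch follows; the names buf and expected are illustrative only, not part of this patch:

    // hypothetical caller of the macros above
    #include "test.macros.hh"
    #include <cstddef>
    #include <vector>

    void
    example()
    {
        std::vector<std::byte> buf(1024, std::byte(0));
        const size_t expected = 1024;

        CHECK(!buf.empty());                     // throws std::runtime_error on failure
        EXPECT_EQ(size_t, buf.size(), expected); // casts both operands to size_t
        EXPECT(buf.size() < 2048,
               "Expected fewer than 2048 bytes, but got ", buf.size());
    }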
diff --git a/tests/unit-tests/create-stream.cpp b/tests/unit-tests/create-stream.cpp
index 468e37e..2dc8e39 100644
--- a/tests/unit-tests/create-stream.cpp
+++ b/tests/unit-tests/create-stream.cpp
@@ -69,10 +69,13 @@ main()
     }
 
     // cleanup
-    if (fs::is_directory(settings.store_path)) {
-        fs::remove_all(settings.store_path);
-    }
     ZarrStreamSettings_destroy_dimension_array(&settings);
     ZarrStream_destroy(stream);
+
+    std::error_code ec;
+    fs::remove_all(settings.store_path, ec);
+    if (ec) {
+        LOG_ERROR("Failed to remove store path: ", ec.message().c_str());
+    }
     return retval;
 }
\ No newline at end of file
diff --git a/tests/unit-tests/zarrv2-writer-write-even.cpp b/tests/unit-tests/zarrv2-writer-write-even.cpp
index 0149bd8..6f94f19 100644
--- a/tests/unit-tests/zarrv2-writer-write-even.cpp
+++ b/tests/unit-tests/zarrv2-writer-write-even.cpp
@@ -94,11 +94,9 @@ main()
       "y", ZarrDimensionType_Space, array_height, chunk_height, 0);
     dims.emplace_back(
       "x", ZarrDimensionType_Space, array_width, chunk_width, 0);
-    auto dimensions =
-      std::make_unique<ArrayDimensions>(std::move(dims), dtype);
 
     zarr::ArrayWriterConfig config = {
-        .dimensions = std::move(dimensions),
+        .dimensions = std::make_shared<ArrayDimensions>(std::move(dims), dtype),
         .dtype = dtype,
         .level_of_detail = level_of_detail,
         .bucket_name = std::nullopt,
@@ -111,8 +109,7 @@ main()
       std::move(config), thread_pool);
 
     const size_t frame_size = array_width * array_height * nbytes_px;
-    std::vector<std::byte> data_(frame_size, std::byte(0));
-    std::span<const std::byte> data(data_);
+    std::vector<std::byte> data(frame_size, std::byte(0));
 
     for (auto i = 0; i < n_frames; ++i) { // 2 time points
         CHECK(writer->write_frame(data));
diff --git a/tests/unit-tests/zarrv2-writer-write-ragged-append-dim.cpp b/tests/unit-tests/zarrv2-writer-write-ragged-append-dim.cpp
index cfae968..245c35f 100644
--- a/tests/unit-tests/zarrv2-writer-write-ragged-append-dim.cpp
+++ b/tests/unit-tests/zarrv2-writer-write-ragged-append-dim.cpp
@@ -78,11 +78,9 @@ main()
       "y", ZarrDimensionType_Space, array_height, chunk_height, 0);
     dims.emplace_back(
      "x", ZarrDimensionType_Space, array_width, chunk_width, 0);
-    auto dimensions =
-      std::make_unique<ArrayDimensions>(std::move(dims), dtype);
 
     zarr::ArrayWriterConfig config = {
-        .dimensions = std::move(dimensions),
+        .dimensions = std::make_shared<ArrayDimensions>(std::move(dims), dtype),
         .dtype = dtype,
         .level_of_detail = level_of_detail,
         .bucket_name = std::nullopt,
@@ -95,8 +93,7 @@ main()
       std::move(config), thread_pool);
 
     const size_t frame_size = array_width * array_height * nbytes_px;
-    std::vector<std::byte> data_(frame_size, std::byte(0));
-    std::span<const std::byte> data(data_);
+    std::vector<std::byte> data(frame_size, std::byte(0));
 
     for (auto i = 0; i < n_frames; ++i) { // 2 time points
         CHECK(writer->write_frame(data));
diff --git a/tests/unit-tests/zarrv2-writer-write-ragged-internal-dim.cpp b/tests/unit-tests/zarrv2-writer-write-ragged-internal-dim.cpp
index cd42b69..0f39b61 100644
--- a/tests/unit-tests/zarrv2-writer-write-ragged-internal-dim.cpp
+++ b/tests/unit-tests/zarrv2-writer-write-ragged-internal-dim.cpp
@@ -85,11 +85,9 @@ main()
       "y", ZarrDimensionType_Space, array_height, chunk_height, 0);
     dims.emplace_back(
       "x", ZarrDimensionType_Space, array_width, chunk_width, 0);
-    auto dimensions =
-      std::make_unique<ArrayDimensions>(std::move(dims), dtype);
 
     zarr::ArrayWriterConfig config = {
-        .dimensions = std::move(dimensions),
+        .dimensions = std::make_shared<ArrayDimensions>(std::move(dims), dtype),
        .dtype = dtype,
         .level_of_detail = level_of_detail,
         .bucket_name = std::nullopt,
@@ -102,8 +100,7 @@ main()
       std::move(config), thread_pool);
 
     const size_t frame_size = array_width * array_height * nbytes_px;
-    std::vector<std::byte> data_(frame_size, std::byte(0));
-    std::span<const std::byte> data(data_);
+    std::vector<std::byte> data(frame_size, std::byte(0));
 
     for (auto i = 0; i < n_frames; ++i) { // 2 time points
         CHECK(writer->write_frame(data));
diff --git a/tests/unit-tests/zarrv3-writer-write-even.cpp b/tests/unit-tests/zarrv3-writer-write-even.cpp
index 0b9b326..ae06273 100644
--- a/tests/unit-tests/zarrv3-writer-write-even.cpp
+++ b/tests/unit-tests/zarrv3-writer-write-even.cpp
@@ -59,8 +59,8 @@ check_json()
     nlohmann::json meta = nlohmann::json::parse(f);
 
     EXPECT(meta["data_type"].get<std::string>() == "uint16",
-           "Expected dtype to be 'uint16', but got '%s'",
-           meta["data_type"].get<std::string>().c_str());
+           "Expected dtype to be uint16, but got ",
+           meta["data_type"].get<std::string>());
 
     const auto& array_shape = meta["shape"];
     const auto& chunk_shape = meta["chunk_grid"]["chunk_shape"];
@@ -102,7 +102,7 @@ main()
     try {
         auto thread_pool = std::make_shared<zarr::ThreadPool>(
           std::thread::hardware_concurrency(),
-          [](const std::string& err) { LOG_ERROR("Error: %s", err.c_str()); });
+          [](const std::string& err) { LOG_ERROR("Error: ", err.c_str()); });
 
         std::vector<ZarrDimension> dims;
         dims.emplace_back("t",
@@ -127,11 +127,9 @@ main()
                           shard_height);
         dims.emplace_back(
           "x", ZarrDimensionType_Space, array_width, chunk_width, shard_width);
-        auto dimensions =
-          std::make_unique<ArrayDimensions>(std::move(dims), dtype);
 
         zarr::ArrayWriterConfig config = {
-            .dimensions = std::move(dimensions),
+            .dimensions = std::make_shared<ArrayDimensions>(std::move(dims), dtype),
             .dtype = dtype,
             .level_of_detail = level_of_detail,
             .bucket_name = std::nullopt,
@@ -144,8 +142,7 @@ main()
           std::move(config), thread_pool);
 
         const size_t frame_size = array_width * array_height * nbytes_px;
-        std::vector<std::byte> data_(frame_size, std::byte(0));
-        std::span<const std::byte> data(data_);
+        std::vector<std::byte> data(frame_size, std::byte(0));
 
         for (auto i = 0; i < n_frames; ++i) { // 2 time points
             CHECK(writer->write_frame(data));
@@ -211,7 +208,7 @@ main()
 
         retval = 0;
     } catch (const std::exception& exc) {
-        LOG_ERROR("Exception: %s\n", exc.what());
+        LOG_ERROR("Exception: ", exc.what());
     }
 
     // cleanup
diff --git a/tests/unit-tests/zarrv3-writer-write-ragged-append-dim.cpp b/tests/unit-tests/zarrv3-writer-write-ragged-append-dim.cpp
index 1b4bc60..f31d7e7 100644
--- a/tests/unit-tests/zarrv3-writer-write-ragged-append-dim.cpp
+++ b/tests/unit-tests/zarrv3-writer-write-ragged-append-dim.cpp
@@ -47,8 +47,8 @@ check_json()
     nlohmann::json meta = nlohmann::json::parse(f);
 
     EXPECT(meta["data_type"].get<std::string>() == "int32",
-           "Expected dtype to be 'int32', but got '%s'",
-           meta["data_type"].get<std::string>().c_str());
+           "Expected dtype to be int32, but got ",
+           meta["data_type"].get<std::string>());
 
     const auto& array_shape = meta["shape"];
     const auto& chunk_shape = meta["chunk_grid"]["chunk_shape"];
@@ -84,7 +84,7 @@ main()
     try {
         auto thread_pool = std::make_shared<zarr::ThreadPool>(
           std::thread::hardware_concurrency(),
-          [](const std::string& err) { LOG_ERROR("Error: %s", err.c_str()); });
+          [](const std::string& err) { LOG_ERROR("Error: ", err.c_str()); });
 
         std::vector<ZarrDimension> dims;
         dims.emplace_back("z",
@@ -99,11 +99,9 @@ main()
                           shard_height);
         dims.emplace_back(
           "x", ZarrDimensionType_Space, array_width, chunk_width, shard_width);
-        auto dimensions =
-          std::make_unique<ArrayDimensions>(std::move(dims), dtype);
 
         zarr::ArrayWriterConfig config = {
-            .dimensions = std::move(dimensions),
+            .dimensions = std::make_shared<ArrayDimensions>(std::move(dims), dtype),
             .dtype = dtype,
             .level_of_detail = 4,
             .bucket_name = std::nullopt,
@@ -116,8 +114,7 @@ main()
           std::move(config), thread_pool);
 
         const size_t frame_size = array_width * array_height * nbytes_px;
-        std::vector<std::byte> data_(frame_size, std::byte(0));
-        std::span<const std::byte> data(data_);
+        std::vector<std::byte> data(frame_size, std::byte(0));
 
         for (auto i = 0; i < n_frames; ++i) { // 2 time points
             CHECK(writer->write_frame(data));
@@ -166,7 +163,7 @@ main()
 
         retval = 0;
     } catch (const std::exception& exc) {
-        LOG_ERROR("Exception: %s\n", exc.what());
+        LOG_ERROR("Exception: ", exc.what());
     }
 
     // cleanup
diff --git a/tests/unit-tests/zarrv3-writer-write-ragged-internal-dim.cpp b/tests/unit-tests/zarrv3-writer-write-ragged-internal-dim.cpp
index 24a0d7b..0affa4e 100644
--- a/tests/unit-tests/zarrv3-writer-write-ragged-internal-dim.cpp
+++ b/tests/unit-tests/zarrv3-writer-write-ragged-internal-dim.cpp
@@ -55,8 +55,8 @@ check_json()
     nlohmann::json meta = nlohmann::json::parse(f);
 
     EXPECT(meta["data_type"].get<std::string>() == "float64",
-           "Expected dtype to be 'uint16', but got '%s'",
-           meta["data_type"].get<std::string>().c_str());
+           "Expected dtype to be float64, but got ",
+           meta["data_type"].get<std::string>());
 
     const auto& array_shape = meta["shape"];
     const auto& chunk_shape = meta["chunk_grid"]["chunk_shape"];
@@ -95,7 +95,7 @@ main()
     try {
         auto thread_pool = std::make_shared<zarr::ThreadPool>(
           std::thread::hardware_concurrency(),
-          [](const std::string& err) { LOG_ERROR("Error: %s", err.c_str()); });
+          [](const std::string& err) { LOG_ERROR("Error: ", err.c_str()); });
 
         std::vector<ZarrDimension> dims;
         dims.emplace_back("t",
@@ -115,11 +115,9 @@ main()
                           shard_height);
         dims.emplace_back(
           "x", ZarrDimensionType_Space, array_width, chunk_width, shard_width);
-        auto dimensions =
-          std::make_unique<ArrayDimensions>(std::move(dims), dtype);
 
         zarr::ArrayWriterConfig config = {
-            .dimensions = std::move(dimensions),
+            .dimensions = std::make_shared<ArrayDimensions>(std::move(dims), dtype),
             .dtype = dtype,
             .level_of_detail = 5,
             .bucket_name = std::nullopt,
@@ -132,8 +130,7 @@ main()
           std::move(config), thread_pool);
 
         const size_t frame_size = array_width * array_height * nbytes_px;
-        std::vector<std::byte> data_(frame_size, std::byte(0));
-        std::span<const std::byte> data(data_);
+        std::vector<std::byte> data(frame_size, std::byte(0));
 
         for (auto i = 0; i < n_frames; ++i) { // 2 time points
             CHECK(writer->write_frame(data));
@@ -171,7 +168,7 @@ main()
                     const auto x_file = y_dir / std::to_string(x);
                     CHECK(fs::is_regular_file(x_file));
                     const auto file_size = fs::file_size(x_file);
-                    CHECK(file_size == expected_file_size);
+                    EXPECT_EQ(size_t, file_size, expected_file_size);
                 }
 
                 CHECK(!fs::is_regular_file(y_dir /
@@ -189,7 +186,7 @@ main()
 
         retval = 0;
     } catch (const std::exception& exc) {
-        LOG_ERROR("Exception: %s\n", exc.what());
+        LOG_ERROR("Exception: ", exc.what());
     }
 
     // cleanup
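For reviewers who want the shape of the streaming API these integration tests exercise, here is a minimal create/append/destroy sketch against the public C API. Assumptions: the public header is named acquire.zarr.h (inferred from src/streaming/acquire.zarr.cpp; adjust if the installed header differs), unset ZarrStreamSettings fields tolerate zero-initialization, and all paths and sizes are placeholders. DIM comes from tests/integration/test.macros.hh above and is a test-only convenience.

    #include "acquire.zarr.h" // assumed public header name
    #include "test.macros.hh" // for DIM, CHECK, CHECK_OK (test-only)

    #include <cstdint>
    #include <vector>

    int
    main()
    {
        ZarrStreamSettings settings = {}; // zero-init, then assign fields by name
        settings.store_path = "example.zarr"; // placeholder output path
        settings.data_type = ZarrDataType_uint16;
        settings.version = ZarrVersion_2;

        // 3 dimensions: t (append), y, x; sizes are placeholders
        CHECK_OK(ZarrStreamSettings_create_dimension_array(&settings, 3));
        settings.dimensions[0] = DIM("t", ZarrDimensionType_Time, 1, 1, 0);
        settings.dimensions[1] = DIM("y", ZarrDimensionType_Space, 480, 48, 0);
        settings.dimensions[2] = DIM("x", ZarrDimensionType_Space, 640, 64, 0);

        ZarrStream* stream = ZarrStream_create(&settings);
        CHECK(stream != nullptr);

        std::vector<uint16_t> frame(640 * 480, 0);
        size_t bytes_out = 0;
        ZarrStatusCode status = ZarrStream_append(
          stream, frame.data(), frame.size() * sizeof(uint16_t), &bytes_out);

        // destroying the stream finalizes metadata and flushes all sinks
        ZarrStream_destroy(stream);
        ZarrStreamSettings_destroy_dimension_array(&settings);

        return status == ZarrStatusCode_Success ? 0 : 1;
    }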