diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ee0375e..1d4697df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Support for [Zarr v3](https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html). +- Support for + the [sharding storage transformer](https://web.archive.org/web/20230213221154/https://zarr-specs.readthedocs.io/en/latest/extensions/storage-transformers/sharding/v1.0.html) + in Zarr v3. - Ship debug libs for C-Blosc on Linux and Mac. ### Changed @@ -34,7 +38,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- `ChunkWriter`s need to specify which multiscale layer they write to. +- `ZarrV2Writer`s need to specify which multiscale layer they write to. - The Zarr writer now validates that image and tile shapes are set and compatible with each other before the first append. diff --git a/README.md b/README.md index 8fc700b6..1612d285 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,9 @@ This is an Acquire Driver that supports chunked streaming to [zarr][]. - **Zarr** - **ZarrBlosc1ZstdByteShuffle** - **ZarrBlosc1Lz4ByteShuffle** +- **ZarrV3** +- **ZarrV3Blosc1ZstdByteShuffle** +- **ZarrV3Blosc1Lz4ByteShuffle** ## Using the Zarr storage device @@ -24,67 +27,108 @@ Chunking is configured using `storage_properties_set_chunking_props()` when conf Multiscale storage can be enabled or disabled by calling `storage_properties_set_enable_multiscale()` when configuring the video stream. +For the [Zarr v3] version of each device, you can use the `ZarrV3*` devices. +**Note:** Zarr v3 is not [yet](https://github.com/ome/ngff/pull/206) supported +by [ome-zarr-py](https://github.com/ome/ome-zarr-py), so you +will not be able to read multiscale metadata from the resulting dataset. 
+ +Zarr v3 *is* supported by [zarr-python](https://github.com/zarr-developers/zarr-python), but you will need to set two +environment variables to work with it: + +```bash +export ZARR_V3_EXPERIMENTAL_API=1 +export ZARR_V3_SHARDING=1 +``` + +You can also set these variables in your Python script: + +```python +import os + +# these MUST come before importing zarr +os.environ["ZARR_V3_EXPERIMENTAL_API"] = "1" +os.environ["ZARR_V3_SHARDING"] = "1" + +import zarr +``` + ### Configuring chunking You can configure chunking by calling `storage_properties_set_chunking_props()` on your `StorageProperties` object _after_ calling `storage_properties_init()`. -There are 4 parameters you can set to determine the chunk size, namely `tile_width`, `tile_height`, `tile_planes`, -and `bytes_per_chunk`: +There are 3 parameters you can set to determine the chunk size, namely `chunk_width`, `chunk_height`, +and `chunk_planes`: ```c int storage_properties_set_chunking_props(struct StorageProperties* out, - uint32_t tile_width, - uint32_t tile_height, - uint32_t tile_planes, - uint64_t max_bytes_per_chunk) + uint32_t chunk_width, + uint32_t chunk_height, + uint32_t chunk_planes) ``` | ![frames](https://github.com/aliddell/acquire-driver-zarr/assets/844464/3510d468-4751-4fa0-b2bf-0e29a5f3ea1c) | -|:--:| -| A collection of frames. | +|:-------------------------------------------------------------------------------------------------------------:| +| A collection of frames. | A _tile_ is a contiguous section, or region of interest, of a _frame_. | ![tiles](https://github.com/aliddell/acquire-driver-zarr/assets/844464/f8d16139-e0ac-44db-855f-2f5ef305c98b) | -|:--:| -| A collection of frames, divided into tiles. | +|:------------------------------------------------------------------------------------------------------------:| +| A collection of frames, divided into tiles. 
| A _chunk_ is nothing more than some number of stacked tiles from subsequent frames, with each tile in a chunk having the same ROI in its respective frame. -| ![chunks](https://github.com/aliddell/acquire-driver-zarr/assets/844464/653e4d82-363e-4e04-9a42-927b052fb6e7) | -|:--:| -| A collection of frames, divided into tiles. A single chunk has been highlighted in red. | +| ![chunks](https://github.com/aliddell/acquire-driver-zarr/assets/844464/653e4d82-363e-4e04-9a42-927b052fb6e7) | +|:-------------------------------------------------------------------------------------------------------------:| +| A collection of frames, divided into tiles. A single chunk has been highlighted in red. | -You can specify the width and height, in pixels, of each tile, and if your frame size has more than one plane, you can -specify the number of planes you want per tile as well. +You can specify the width and height, in pixels, of each tile. If any of these values are unset (equivalently, set to 0), or if they are set to a value larger than the frame size, the full value of the frame size along that dimension will be used instead. You should take care that the values you select won't result in tile sizes that are too small or too large for your application. - -The `max_bytes_per_chunk` parameter can be used to cap the size of a chunk. -A minimum of 16 MiB is enforced, but no maximum, so if you are compressing you must ensure that you have sufficient -memory for all your chunks to be stored in memory at once. +You can also set the number of tile *planes* to concatenate into a chunk. +If this value is unset (or set to 0), it will default to a prescribed minimum value of 32. #### Example -Suppose your frame size is 1920 x 1080 x 1, with a pixel type of unsigned 8-bit integer. -You can use a tile size of 640 x 360 x 1, which will divide your frame evenly into 9 tiles. -You want chunk sizes of at most 64 MiB. 
+Suppose your frame size is 1920 x 1080, with a pixel type of unsigned 8-bit integer.
+You can use a tile size of 640 x 360, which will divide your frame evenly into 9 tiles.
+You want chunk sizes of at most 32 MiB. This works out to 32 * 2^20 / (640 * 360) ≈ 145.64, so you select
+145 chunk planes.
 You would configure your storage properties as follows:
 
 ```c
 storage_properties_set_chunking_props(&storage_props,
                                       640,
                                       360,
-                                      1,
-                                      64 * 1024 * 1024);
+                                      145);
 ```
 
-Note that 64 * 1024 * 1024 / (640 * 360) = 291.2711111111111, so each chunk will contain 291 tiles, or about 63.94 MiB
-raw, before compression.
+### Configuring sharding
+
+Configuring sharding is similar to configuring chunking.
+You can configure sharding by calling `storage_properties_set_sharding_props()` on your `StorageProperties` object
+_after_ calling `storage_properties_init()`.
+There are 3 parameters you can set to determine the shard size, namely `shard_width`, `shard_height`,
+and `shard_planes`.
+**Note:** whereas the unit for the width, height, and plane values when chunking is *pixels*, when sharding, the unit is
+*chunks*.
+So in the previous example, if you wanted to combine all your chunks together into a single shard, you would set your
+shard properties like so:
+
+```c
+storage_properties_set_sharding_props(&storage_props,
+                                      3, // width: 1920 / 640
+                                      3, // height: 1080 / 360
+                                      1);
+```
+
+This would result in all 9 chunks being combined into a single shard.
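The chunk-plane and shard-dimension arithmetic above can be sketched in a few lines. This is an illustrative helper, not part of the driver API — the function name and the use of Python here are assumptions for the sketch:

```python
def chunk_and_shard_params(frame_w, frame_h, tile_w, tile_h,
                           max_chunk_bytes, bytes_per_px=1):
    """Pick chunk_planes to fit a byte budget, and shard dims (in chunks)
    that cover the whole frame with a single shard."""
    bytes_per_tile = tile_w * tile_h * bytes_per_px
    # truncate rather than round up, so a chunk never exceeds the budget
    chunk_planes = max_chunk_bytes // bytes_per_tile
    shard_w = -(-frame_w // tile_w)  # ceiling division: tiles across
    shard_h = -(-frame_h // tile_h)  # ceiling division: tiles down
    return chunk_planes, shard_w, shard_h

# 1920 x 1080 frame, 640 x 360 tiles, 32 MiB chunk budget, u8 pixels
print(chunk_and_shard_params(1920, 1080, 640, 360, 32 << 20))  # (145, 3, 3)
```

This reproduces the values used in the example: 145 chunk planes and a 3 x 3 shard.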
+
 
 ### Compression
 
@@ -120,3 +164,5 @@ Then the sequence of levels will have dimensions 1920 x 1080, 960 x 540, 480 x 2
 
 [Blosc]: https://github.com/Blosc/c-blosc
 [Blosc docs]: https://www.blosc.org/
+
+[Zarr v3]: https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html
\ No newline at end of file
diff --git a/examples/no-striping.cpp b/examples/no-striping.cpp
index 3d2da327..afc56296 100644
--- a/examples/no-striping.cpp
+++ b/examples/no-striping.cpp
@@ -1,8 +1,8 @@
 /// @file
 /// @brief Generate a Zarr dataset with a single chunk using the simulated
-/// radial sine pattern with a u16 sample type. This example was used to generate
-/// data for a visual EXAMPLE of a fix for a striping artifact observed when
-/// writing to a Zarr dataset with multibyte samples.
+/// radial sine pattern with a u16 sample type. This example was used to
+/// generate data for a visual EXAMPLE of a fix for a striping artifact observed
+/// when writing to a Zarr dataset with multibyte samples.
 
 #include "device/hal/device.manager.h"
 #include "acquire.h"
@@ -88,7 +88,7 @@ reporter(int is_error,
 
 const static uint32_t frame_width = 1280;
 const static uint32_t frame_height = 720;
-const static uint32_t expected_frames_per_chunk = 30;
+const static uint32_t frames_per_chunk = 30;
 
 void
 acquire(AcquireRuntime* runtime, const char* filename)
@@ -120,8 +120,10 @@ acquire(AcquireRuntime* runtime, const char* filename)
                                  sizeof(external_metadata),
                                  sample_spacing_um);
 
-    storage_properties_set_chunking_props(
-      &props.video[0].storage.settings, frame_width, frame_height, 1, 64 << 20);
+    storage_properties_set_chunking_props(&props.video[0].storage.settings,
+                                          frame_width,
+                                          frame_height,
+                                          frames_per_chunk);
 
     props.video[0].camera.settings.binning = 1;
    props.video[0].camera.settings.pixel_type = SampleType_u16;
@@ -129,7 +131,7 @@ acquire(AcquireRuntime* runtime, const char* filename)
                                                 .y = frame_height };
    // we may drop frames with lower exposure
props.video[0].camera.settings.exposure_time_us = 2e5; - props.video[0].max_frame_count = expected_frames_per_chunk; + props.video[0].max_frame_count = frames_per_chunk; OK(acquire_configure(runtime, &props)); OK(acquire_start(runtime)); @@ -164,13 +166,13 @@ main() ASSERT_STREQ("()); auto shape = zarray["shape"]; - ASSERT_EQ(int, "%d", expected_frames_per_chunk, shape[0]); + ASSERT_EQ(int, "%d", frames_per_chunk, shape[0]); ASSERT_EQ(int, "%d", 1, shape[1]); ASSERT_EQ(int, "%d", frame_height, shape[2]); ASSERT_EQ(int, "%d", frame_width, shape[3]); auto chunks = zarray["chunks"]; - ASSERT_EQ(int, "%d", expected_frames_per_chunk, chunks[0]); + ASSERT_EQ(int, "%d", frames_per_chunk, chunks[0]); ASSERT_EQ(int, "%d", 1, chunks[1]); ASSERT_EQ(int, "%d", frame_height, chunks[2]); ASSERT_EQ(int, "%d", frame_width, chunks[3]); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5ac73553..bcb6c7d7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -6,19 +6,22 @@ endif () set(tgt acquire-driver-zarr) add_library(${tgt} MODULE - prelude.h common.hh common.cpp writers/writer.hh writers/writer.cpp - writers/chunk.writer.hh - writers/chunk.writer.cpp + writers/zarrv2.writer.hh + writers/zarrv2.writer.cpp + writers/zarrv3.writer.hh + writers/zarrv3.writer.cpp writers/blosc.compressor.hh writers/blosc.compressor.cpp zarr.hh zarr.cpp zarr.v2.hh zarr.v2.cpp + zarr.v3.hh + zarr.v3.cpp zarr.driver.c ) target_enable_simd(${tgt}) diff --git a/src/README.md b/src/README.md index 7a9fba97..6e6321c3 100644 --- a/src/README.md +++ b/src/README.md @@ -2,40 +2,44 @@ ## Components -### The `StorageInterface` class. - -Defines the interface that all Acquire `Storage` devices must implement, namely - -- `set`: Set the storage properties. -- `get`: Get the storage properties. -- `get_meta`: Get metadata for the storage properties. -- `start`: Signal to the `Storage` device that it should start accepting frames. 
-- `stop`: Signal to the `Storage` device that it should stop accepting frames. -- `append`: Write incoming frames to the filesystem or other storage layer. -- `reserve_image_shape`: Set the image shape for allocating chunk writers. - ### The `Zarr` class -An abstract class that implements the `StorageInterface`. -Zarr is "[a file storage format for chunked, compressed, N-dimensional arrays based on an open-source specification.](https://zarr.readthedocs.io/en/stable/index.html)" +An abstract class that implements the `Storage` device interface. +Zarr +is "[a file storage format for chunked, compressed, N-dimensional arrays based on an open-source specification.](https://zarr.readthedocs.io/en/stable/index.html)" ### The `ZarrV2` class Subclass of the `Zarr` class. Implements abstract methods for writer allocation and metadata. -Specifically, `ZarrV2` allocates one writer of type `ChunkWriter` for each multiscale level-of-detail +Specifically, `ZarrV2` allocates one writer of type `ZarrV2Writer` for each multiscale level-of-detail and writes metadata in the format specified by the [Zarr V2 spec](https://zarr.readthedocs.io/en/stable/spec/v2.html). +### The `ZarrV3` class + +Subclass of the `Zarr` class. +Implements abstract methods for writer allocation and metadata. +Specifically, `ZarrV3` allocates one writer of type `ZarrV3Writer` for each multiscale level-of-detail +and writes metadata in the format specified by +the [Zarr V3 spec](https://zarr-specs.readthedocs.io/en/latest/specs.html). + ### The `Writer` class An abstract class that writes frames to the filesystem or other storage layer. In general, frames are chunked and potentially compressed. The `Writer` handles chunking, chunk compression, and writing. -### The `ChunkWriter` class +### The `ZarrV2Writer` class Subclass of the `Writer` class. Implements abstract methods relating to writing and flushing chunk buffers. +Chunk buffers, whether raw or compressed, are written to individual chunk files. 
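To make the last point concrete, here is a rough sketch of the on-disk layout a `ZarrV2Writer` produces: one file per chunk, nested by time-chunk index, channel, tile row, and tile column. The path scheme is inferred from `FileCreator`/`make_files_` elsewhere in this diff; the helper name and Python are assumptions for illustration only:

```python
def chunk_file_paths(data_root, time_chunk, n_c, n_y, n_x):
    """Enumerate per-chunk file paths:
    <data_root>/<time chunk>/<channel>/<tile row>/<tile col>."""
    return [
        f"{data_root}/{time_chunk}/{c}/{y}/{x}"
        for c in range(n_c)  # channels (currently always 1)
        for y in range(n_y)  # tile rows
        for x in range(n_x)  # tile columns
    ]

# first time-chunk of a 3 x 3 tiling, single channel
paths = chunk_file_paths("acquire.zarr/0", 0, 1, 3, 3)
print(len(paths), paths[0])  # 9 acquire.zarr/0/0/0/0/0
```

Each flush writes one such file per tile ROI; the next time-chunk rolls over to a new top-level directory.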
+ +### The `ZarrV3Writer` class + +Subclass of the `Writer` class. +Implements abstract methods relating to writing, sharding, and flushing chunk buffers. +Chunk buffers, whether raw or compressed, are concatenated into shards, which are written out to individual shard files. ### The `BloscCompressionParams` struct diff --git a/src/acquire-core-libs b/src/acquire-core-libs index 983719dd..8bea9c8a 160000 --- a/src/acquire-core-libs +++ b/src/acquire-core-libs @@ -1 +1 @@ -Subproject commit 983719dd2d184fd1516e49b9809fe4918b6c1dc0 +Subproject commit 8bea9c8ab8806d481ae943d3577c9adcede98672 diff --git a/src/common.cpp b/src/common.cpp index 1f3b69c7..e762455f 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -1,10 +1,89 @@ #include "common.hh" +#include "zarr.hh" #include "platform.h" #include +#include namespace common = acquire::sink::zarr::common; +common::ThreadPool::ThreadPool(size_t n_threads, + std::function err) + : error_handler_{ err } +{ + if (n_threads == 0) { + throw std::runtime_error("Cannot create thread pool with 0 threads."); + } + + for (auto i = 0; i < n_threads; ++i) { + threads_.emplace_back([this] { thread_worker_(); }); + } +} + +common::ThreadPool::~ThreadPool() noexcept +{ + await_stop(); +} + +void +common::ThreadPool::push_to_job_queue(JobT&& job) +{ + std::unique_lock lock(jobs_mutex_); + jobs_.push(std::move(job)); + lock.unlock(); + + cv_.notify_one(); +} + +void +common::ThreadPool::await_stop() noexcept +{ + should_stop_ = true; + cv_.notify_all(); + + // spin down threads + for (auto& thread : threads_) { + if (thread.joinable()) { + thread.join(); + } + } +} + +std::optional +common::ThreadPool::pop_from_job_queue_() noexcept +{ + if (jobs_.empty()) { + return std::nullopt; + } + + auto job = std::move(jobs_.front()); + jobs_.pop(); + return job; +} + +void +common::ThreadPool::thread_worker_() +{ + TRACE("Worker thread starting."); + + while (true) { + std::unique_lock lock(jobs_mutex_); + cv_.wait(lock, [&] { return should_stop_ 
|| !jobs_.empty(); }); + + if (should_stop_) { + break; + } + + if (auto job = pop_from_job_queue_(); job.has_value()) { + lock.unlock(); + if (std::string err_msg; !job.value()(err_msg)) { + error_handler_(err_msg); + } + } + } + + TRACE("Worker thread exiting."); +} size_t common::bytes_per_tile(const ImageDims& tile_shape, const SampleType& type) diff --git a/src/common.hh b/src/common.hh index 4ebf8a83..0ea45edf 100644 --- a/src/common.hh +++ b/src/common.hh @@ -1,11 +1,34 @@ -#ifndef ACQUIRE_DRIVER_ZARR_COMMON_H -#define ACQUIRE_DRIVER_ZARR_COMMON_H - -#include "prelude.h" +#ifndef H_ACQUIRE_STORAGE_ZARR_COMMON_V0 +#define H_ACQUIRE_STORAGE_ZARR_COMMON_V0 +#include "logger.h" #include "device/props/components.h" +#include #include +#include +#include +#include +#include +#include +#include + +#define LOG(...) aq_logger(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define LOGE(...) aq_logger(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define EXPECT(e, ...) \ + do { \ + if (!(e)) { \ + LOGE(__VA_ARGS__); \ + throw std::runtime_error("Expression was false: " #e); \ + } \ + } while (0) +#define CHECK(e) EXPECT(e, "Expression evaluated as false:\n\t%s", #e) + +// #define TRACE(...) LOG(__VA_ARGS__) +#define TRACE(...) + +#define containerof(ptr, T, V) ((T*)(((char*)(ptr)) - offsetof(T, V))) +#define countof(e) (sizeof(e) / sizeof(*(e))) namespace fs = std::filesystem; @@ -21,7 +44,40 @@ struct ImageDims } }; +struct Zarr; + namespace common { +struct ThreadPool final +{ + public: + using JobT = std::function; + + // The error handler `err` is called when a job returns false. This + // can happen when the job encounters an error, or otherwise fails. The + // std::string& argument to the error handler is a diagnostic message from + // the failing job and is logged to the error stream by the Zarr driver when + // the next call to `append()` is made. 
+    ThreadPool(size_t n_threads, std::function<void(const std::string&)> err);
+    ~ThreadPool() noexcept;
+
+    void push_to_job_queue(JobT&& job);
+
+    void await_stop() noexcept;
+
+  private:
+    std::function<void(const std::string&)> error_handler_;
+
+    std::vector<std::thread> threads_;
+    mutable std::mutex jobs_mutex_;
+    std::condition_variable cv_;
+    std::queue<JobT> jobs_;
+
+    std::atomic<bool> should_stop_;
+
+    /// Multithreading
+    std::optional<JobT> pop_from_job_queue_() noexcept;
+    void thread_worker_();
+};
 
 size_t
 bytes_per_tile(const ImageDims& tile_shape, const SampleType& type);
@@ -56,4 +112,4 @@ write_string(const std::string& path, const std::string& value);
 
 } // namespace acquire::sink::zarr::common
 } // namespace acquire::sink::zarr
 
-#endif // ACQUIRE_DRIVER_ZARR_COMMON_H
+#endif // H_ACQUIRE_STORAGE_ZARR_COMMON_V0
diff --git a/src/prelude.h b/src/prelude.h
deleted file mode 100644
index 603496cd..00000000
--- a/src/prelude.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/// Do not include in public headers.
-/// Contains common macros used for logging/error handling with this module
-
-#ifndef H_ACQUIRE_STORAGE_ZARR_PRELUDE_V0
-#define H_ACQUIRE_STORAGE_ZARR_PRELUDE_V0
-
-#include "logger.h"
-
-#include <stdexcept>
-
-#define LOG(...) aq_logger(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
-#define LOGE(...) aq_logger(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
-#define EXPECT(e, ...)                                                         \
-    do {                                                                       \
-        if (!(e)) {                                                            \
-            LOGE(__VA_ARGS__);                                                 \
-            throw std::runtime_error("Expression was false: " #e);             \
-        }                                                                      \
-    } while (0)
-#define CHECK(e) EXPECT(e, "Expression evaluated as false:\n\t%s", #e)
-
-//#define TRACE(...) LOG(__VA_ARGS__)
-#define TRACE(...)
- -#define containerof(ptr, T, V) ((T*)(((char*)(ptr)) - offsetof(T, V))) -#define countof(e) (sizeof(e) / sizeof(*(e))) - -#endif // H_ACQUIRE_STORAGE_ZARR_PRELUDE_V0 diff --git a/src/writers/chunk.writer.cpp b/src/writers/chunk.writer.cpp deleted file mode 100644 index cb650db1..00000000 --- a/src/writers/chunk.writer.cpp +++ /dev/null @@ -1,215 +0,0 @@ -#include "chunk.writer.hh" -#include "../zarr.hh" - -#include -#include - -namespace zarr = acquire::sink::zarr; - -zarr::ChunkWriter::ChunkWriter(const ImageDims& frame_dims, - const ImageDims& tile_dims, - uint32_t frames_per_chunk, - const std::string& data_root, - Zarr* zarr) - : Writer(frame_dims, tile_dims, frames_per_chunk, data_root, zarr) -{ -} - -zarr::ChunkWriter::ChunkWriter(const ImageDims& frame_dims, - const ImageDims& tile_dims, - uint32_t frames_per_chunk, - const std::string& data_root, - Zarr* zarr, - const BloscCompressionParams& compression_params) - : Writer(frame_dims, - tile_dims, - frames_per_chunk, - data_root, - zarr, - compression_params) -{ -} - -bool -zarr::ChunkWriter::write(const VideoFrame* frame) noexcept -{ - using namespace std::chrono_literals; - - if (!validate_frame_(frame)) { - // log is written in validate_frame - return false; - } - - try { - if (chunk_buffers_.empty()) { - make_buffers_(); - } - - // write out - bytes_to_flush_ += - write_bytes_(frame->data, frame->bytes_of_frame - sizeof(*frame)); - - ++frames_written_; - - // rollover if necessary - const auto frames_this_chunk = frames_written_ % frames_per_chunk_; - if (frames_written_ > 0 && frames_this_chunk == 0) { - flush_(); - rollover_(); - } - return true; - } catch (const std::exception& exc) { - char buf[128]; - snprintf(buf, sizeof(buf), "Failed to write frame: %s", exc.what()); - zarr_->set_error(buf); - } catch (...) 
{ - char buf[32]; - snprintf(buf, sizeof(buf), "Failed to write frame (unknown)"); - zarr_->set_error(buf); - } - - return false; -} - -void -zarr::ChunkWriter::make_buffers_() noexcept -{ - const auto nchunks = tiles_per_frame_(); - chunk_buffers_.resize(nchunks); - buffers_ready_ = new bool[nchunks]; - std::fill(buffers_ready_, buffers_ready_ + nchunks, true); - - const auto bytes_per_px = bytes_of_type(pixel_type_); - const auto bytes_per_tile = - tile_dims_.cols * tile_dims_.rows * bytes_per_px; - - for (auto i = 0; i < chunk_buffers_.size(); ++i) { - auto& buf = chunk_buffers_.at(i); - buf.resize(frames_per_chunk_ * bytes_per_tile); - std::fill(buf.begin(), buf.end(), 0); - } -} - -size_t -zarr::ChunkWriter::write_bytes_(const uint8_t* buf, size_t buf_size) noexcept -{ - const auto bytes_per_px = bytes_of_type(pixel_type_); - const auto bytes_per_tile = - tile_dims_.cols * tile_dims_.rows * bytes_per_px; - const auto frames_this_chunk = frames_written_ % frames_per_chunk_; - - size_t bytes_written = 0; - - for (auto i = 0; i < tiles_per_frame_y_; ++i) { - for (auto j = 0; j < tiles_per_frame_x_; ++j) { - size_t offset = bytes_per_tile * frames_this_chunk; - - uint8_t* bytes_out = - chunk_buffers_.at(i * tiles_per_frame_x_ + j).data(); - for (auto k = 0; k < tile_dims_.rows; ++k) { - const auto frame_row = i * tile_dims_.rows + k; - if (frame_row < frame_dims_.rows) { - const auto frame_col = j * tile_dims_.cols; - - const auto buf_offset = - bytes_per_px * - (frame_row * frame_dims_.cols + frame_col); - - const auto region_width = - std::min(frame_col + tile_dims_.cols, frame_dims_.cols) - - frame_col; - - const auto nbytes = region_width * bytes_per_px; - memcpy(bytes_out + offset, buf + buf_offset, nbytes); - } - offset += tile_dims_.cols * bytes_per_px; - } - bytes_written += bytes_per_tile; - } - } - - return bytes_written; -} - -void -zarr::ChunkWriter::flush_() noexcept -{ - if (bytes_to_flush_ == 0) { - return; - } - - using namespace 
std::chrono_literals; - const auto bytes_per_px = bytes_of_type(pixel_type_); - const auto bytes_per_tile = - tile_dims_.cols * tile_dims_.rows * bytes_per_px; - if (bytes_to_flush_ % bytes_per_tile != 0) { - LOGE("Expected bytes to flush to be a multiple of the " - "number of bytes per tile."); - } - - // create chunk files if necessary - if (files_.empty() && !make_files_()) { - zarr_->set_error("Failed to flush."); - return; - } - - // compress buffers and write out - auto buf_sizes = compress_buffers_(); - std::fill(buffers_ready_, buffers_ready_ + chunk_buffers_.size(), false); - { - std::scoped_lock lock(buffers_mutex_); - for (auto i = 0; i < files_.size(); ++i) { - auto& buf = chunk_buffers_.at(i); - zarr_->push_to_job_queue(std::move( - [fh = &files_.at(i), - data = buf.data(), - size = buf_sizes.at(i), - finished = buffers_ready_ + i](std::string& err) -> bool { - bool success = false; - try { - success = file_write(fh, 0, data, data + size); - } catch (const std::exception& exc) { - char buf[128]; - snprintf(buf, - sizeof(buf), - "Failed to write chunk: %s", - exc.what()); - err = buf; - } catch (...) 
{ - err = "Unknown error"; - } - *finished = true; - - return success; - })); - } - } - - // wait for all threads to finish - while (!std::all_of(buffers_ready_, - buffers_ready_ + chunk_buffers_.size(), - [](const auto& b) { return b; })) { - std::this_thread::sleep_for(500us); - } - - // reset buffers - const auto bytes_per_chunk = - tile_dims_.cols * tile_dims_.rows * bytes_per_px * frames_per_chunk_; - for (auto& buf : chunk_buffers_) { - // absurd edge case we need to account for - if (buf.size() > bytes_per_chunk) { - buf.resize(bytes_per_chunk); - } - - std::fill(buf.begin(), buf.end(), 0); - } - bytes_to_flush_ = 0; -} - -bool -zarr::ChunkWriter::make_files_() noexcept -{ - file_creator_.set_base_dir(data_root_ / std::to_string(current_chunk_)); - return file_creator_.create( - 1, tiles_per_frame_y_, tiles_per_frame_x_, files_); -} diff --git a/src/writers/writer.cpp b/src/writers/writer.cpp index f9bb9245..18c94403 100644 --- a/src/writers/writer.cpp +++ b/src/writers/writer.cpp @@ -4,140 +4,115 @@ #include #include +#include namespace zarr = acquire::sink::zarr; /// DirectoryCreator -zarr::FileCreator::FileCreator(Zarr* zarr) - : zarr_{ zarr } +zarr::FileCreator::FileCreator(std::shared_ptr thread_pool) + : thread_pool_{ thread_pool } { } -void -zarr::FileCreator::set_base_dir(const fs::path& base_dir) noexcept -{ - base_dir_ = base_dir; - fs::create_directories(base_dir_); -} - bool -zarr::FileCreator::create(int n_c, - int n_y, - int n_x, - std::vector& files) noexcept +zarr::FileCreator::create_files(const fs::path& base_dir, + int n_c, + int n_y, + int n_x, + std::vector& files) noexcept { - using namespace std::chrono_literals; + base_dir_ = base_dir; - std::vector> mutexes; - for (auto i = 0; i < n_c; ++i) { - mutexes.push_back(std::make_shared()); + std::error_code ec; + if (!fs::create_directories(base_dir_, ec)) { + LOGE("Failed to create directory %s: %s", + base_dir_.string().c_str(), + ec.message().c_str()); + return false; } - std::vector 
finished(n_c * n_y, 0); + if (!create_c_dirs_(n_c)) { + return false; + } - if (!create_channel_dirs_(n_c)) { + if (!create_y_dirs_(n_c, n_y)) { return false; } - files.resize(n_c * n_y * n_x); + const auto n_files = n_c * n_y * n_x; + + files.resize(n_files); + std::latch latch(n_files); + std::atomic failure{ false }; // until we support more than one channel, n_c will always be 1 for (auto c = 0; c < n_c; ++c) { for (auto y = 0; y < n_y; ++y) { - zarr_->push_to_job_queue( - [base = base_dir_, - files = files.data() + c * n_y * n_x + y * n_x, - mtx = mutexes.at(c), - c, - y, - n_x, - done = finished.data() + c * n_y + y](std::string& err) -> bool { - bool success = false; - try { - auto path = base / std::to_string(c); - { - std::unique_lock lock(*mtx); - while (!fs::exists(path)) { - lock.unlock(); - std::this_thread::sleep_for(1ms); - lock.lock(); - } - } - - path /= std::to_string(y); - - if (fs::exists(path)) { - EXPECT(fs::is_directory(path), - "%s must be a directory.", - path.c_str()); - } else { - EXPECT(fs::create_directories(path), - "Failed to create directory: %s", - path.c_str()); - } - - for (auto x = 0; x < n_x; ++x) { - auto& file = files[x]; - auto file_path = path / std::to_string(x); + for (auto x = 0; x < n_x; ++x) { + thread_pool_->push_to_job_queue( + [base = base_dir_, + &file = files.at(c * n_y * n_x + y * n_x + x), + c, + y, + x, + &latch, + &failure](std::string& err) -> bool { + bool success = false; + try { + auto path = base / std::to_string(c) / + std::to_string(y) / std::to_string(x); EXPECT(file_create(&file, - file_path.string().c_str(), - file_path.string().size()), + path.string().c_str(), + path.string().size()), "Failed to open file: '%s'", - file_path.c_str()); - } + path.c_str()); - success = true; - } catch (const std::exception& exc) { - char buf[128]; - snprintf(buf, - sizeof(buf), - "Failed to create directory: %s", - exc.what()); - err = buf; - } catch (...) 
{ - err = "Failed to create directory (unknown)"; - } + success = true; + } catch (const std::exception& exc) { + char buf[128]; + snprintf(buf, + sizeof(buf), + "Failed to create directory: %s", + exc.what()); + err = buf; + failure = true; + } catch (...) { + err = "Failed to create directory (unknown)"; + failure = true; + } - *done = success ? 1 : -1; - return success; - }); + latch.count_down(); + return success; // set to !failure here + }); + } } } - while (!std::all_of( - finished.begin(), finished.end(), [](const auto& f) { return f != 0; })) { - std::this_thread::sleep_for(500us); - } + latch.wait(); - return std::all_of( - finished.begin(), finished.end(), [](const auto& f) { return f == 1; }); + return !failure; } bool -zarr::FileCreator::create_channel_dirs_(int n_c) noexcept +zarr::FileCreator::create_c_dirs_(int n_c) noexcept { - using namespace std::chrono_literals; - - std::vector finished(n_c, 0); + std::latch latch(n_c); + std::atomic failure{ false }; for (auto c = 0; c < n_c; ++c) { - // create the channel directory - zarr_->push_to_job_queue( - [base = base_dir_, c, done = finished.data() + c]( - std::string& err) -> bool { - bool success = false; + thread_pool_->push_to_job_queue( + std::move([base = base_dir_, c, &latch, &failure](std::string& err) { try { const auto path = base / std::to_string(c); if (fs::exists(path)) { EXPECT(fs::is_directory(path), "%s must be a directory.", path.c_str()); - } else { + } else if (!failure) { EXPECT(fs::create_directories(path), "Failed to create directory: %s", path.c_str()); } - success = true; } catch (const std::exception& exc) { char buf[128]; snprintf(buf, @@ -145,21 +120,63 @@ zarr::FileCreator::create_channel_dirs_(int n_c) noexcept "Failed to create directory: %s", exc.what()); err = buf; + failure = true; } catch (...) { err = "Failed to create directory (unknown)"; + failure = true; } - *done = success ? 
1 : -1; - return success; - }); + + latch.count_down(); + return !failure; + })); } - while (!std::all_of( - finished.begin(), finished.end(), [](const auto& f) { return f != 0; })) { - std::this_thread::sleep_for(500us); + latch.wait(); + return !failure; +} + +bool +zarr::FileCreator::create_y_dirs_(int n_c, int n_y) noexcept +{ + std::latch latch(n_c * n_y); + std::atomic failure{ false }; + for (auto c = 0; c < n_c; ++c) { + for (auto y = 0; y < n_y; ++y) { + thread_pool_->push_to_job_queue(std::move( + [base = base_dir_, c, y, &latch, &failure](std::string& err) { + try { + const auto path = + base / std::to_string(c) / std::to_string(y); + if (fs::exists(path)) { + EXPECT(fs::is_directory(path), + "%s must be a directory.", + path.c_str()); + } else if (!failure) { + EXPECT(fs::create_directories(path), + "Failed to create directory: %s", + path.c_str()); + } + } catch (const std::exception& exc) { + char buf[128]; + snprintf(buf, + sizeof(buf), + "Failed to create directory: %s", + exc.what()); + err = buf; + failure = true; + } catch (...) 
{
+                err = "Failed to create directory (unknown)";
+                failure = true;
+            }
+
+            latch.count_down();
+            return !failure;
+        }));
+    }
     }
 
-    return std::all_of(
-      finished.begin(), finished.end(), [](const auto& f) { return f == 1; });
+    latch.wait();
+    return !failure;
 }
 
 /// Writer
@@ -167,7 +184,7 @@ zarr::Writer::Writer(const ImageDims& frame_dims,
                      const ImageDims& tile_dims,
                      uint32_t frames_per_chunk,
                      const std::string& data_root,
-                     Zarr* zarr)
+                     std::shared_ptr<common::ThreadPool> thread_pool)
   : frame_dims_{ frame_dims }
   , tile_dims_{ tile_dims }
   , data_root_{ data_root }
@@ -176,9 +193,8 @@ zarr::Writer::Writer(const ImageDims& frame_dims,
   , bytes_to_flush_{ 0 }
   , current_chunk_{ 0 }
   , pixel_type_{ SampleTypeCount }
-  , buffers_ready_{ nullptr }
-  , zarr_{ zarr }
-  , file_creator_{ zarr }
+  , thread_pool_{ thread_pool }
+  , file_creator_{ thread_pool }
 {
     CHECK(tile_dims_.cols > 0);
     CHECK(tile_dims_.rows > 0);
@@ -206,22 +222,40 @@ zarr::Writer::Writer(const ImageDims& frame_dims,
                      const ImageDims& tile_dims,
                      uint32_t frames_per_chunk,
                      const std::string& data_root,
-                     Zarr* zarr,
+                     std::shared_ptr<common::ThreadPool> thread_pool,
                      const BloscCompressionParams& compression_params)
-  : Writer(frame_dims, tile_dims, frames_per_chunk, data_root, zarr)
+  : Writer(frame_dims, tile_dims, frames_per_chunk, data_root, thread_pool)
 {
     blosc_compression_params_ = compression_params;
 }
 
-zarr::Writer::~Writer()
+bool
+zarr::Writer::write(const VideoFrame* frame)
 {
-    delete[] buffers_ready_;
+    validate_frame_(frame);
+
+    if (chunk_buffers_.empty()) {
+        make_buffers_();
+    }
+
+    // write out
+    bytes_to_flush_ += write_frame_to_chunks_(
+      frame->data, frame->bytes_of_frame - sizeof(*frame));
+
+    ++frames_written_;
+
+    // rollover if necessary
+    const auto frames_this_chunk = frames_written_ % frames_per_chunk_;
+    if (frames_written_ > 0 && frames_this_chunk == 0) {
+        flush_();
+        rollover_();
+    }
+    return true;
 }
 
 void
 zarr::Writer::finalize() noexcept
 {
-    using namespace std::chrono_literals;
     finalize_chunks_();
     if (bytes_to_flush_ > 0) {
         flush_();
@@ -236,49 +270,53 @@ zarr::Writer::frames_written() const noexcept
     return frames_written_;
 }
 
-bool
-zarr::Writer::validate_frame_(const VideoFrame* frame) noexcept
+void
+zarr::Writer::validate_frame_(const VideoFrame* frame)
 {
-    try {
-        CHECK(frame);
-
-        if (pixel_type_ == SampleTypeCount) {
-            pixel_type_ = frame->shape.type;
-        } else {
-            EXPECT(pixel_type_ == frame->shape.type,
-                   "Expected frame to have pixel type %s. Got %s.",
-                   common::sample_type_to_string(pixel_type_),
-                   common::sample_type_to_string(frame->shape.type));
-        }
+    CHECK(frame);
+
+    if (pixel_type_ == SampleTypeCount) {
+        pixel_type_ = frame->shape.type;
+    } else {
+        EXPECT(pixel_type_ == frame->shape.type,
+               "Expected frame to have pixel type %s. Got %s.",
+               common::sample_type_to_string(pixel_type_),
+               common::sample_type_to_string(frame->shape.type));
+    }
+
+    // validate the incoming frame shape against the stored frame dims
+    EXPECT(frame_dims_.cols == frame->shape.dims.width,
+           "Expected frame to have %d columns. Got %d.",
+           frame_dims_.cols,
+           frame->shape.dims.width);
+    EXPECT(frame_dims_.rows == frame->shape.dims.height,
+           "Expected frame to have %d rows. Got %d.",
+           frame_dims_.rows,
+           frame->shape.dims.height);
+}
+
+void
+zarr::Writer::make_buffers_() noexcept
+{
+    const auto n_chunks = tiles_per_frame_();
+
+    const auto bytes_per_px = bytes_of_type(pixel_type_);
+    const auto bytes_per_tile =
+      tile_dims_.cols * tile_dims_.rows * bytes_per_px;
+
+    const auto bytes_to_reserve =
+      bytes_per_tile * frames_per_chunk_ +
+      (blosc_compression_params_.has_value() ? BLOSC_MAX_OVERHEAD : 0);
 
-        // validate the incoming frame shape against the stored frame dims
-        EXPECT(frame_dims_.cols == frame->shape.dims.width,
-               "Expected frame to have %d columns. Got %d.",
-               frame_dims_.cols,
-               frame->shape.dims.width);
-        EXPECT(frame_dims_.rows == frame->shape.dims.height,
-               "Expected frame to have %d rows. Got %d.",
-               frame_dims_.rows,
-               frame->shape.dims.height);
-
-        return true;
-    } catch (const std::exception& exc) {
-        char buf[128];
-        snprintf(buf, sizeof(buf), "Invalid frame: %s", exc.what());
-        zarr_->set_error(buf);
-    } catch (...) {
-        char buf[32];
-        snprintf(buf, sizeof(buf), "Invalid frame (unknown)");
-        zarr_->set_error(buf);
+    for (auto i = 0; i < n_chunks; ++i) {
+        chunk_buffers_.emplace_back();
+        chunk_buffers_.back().reserve(bytes_to_reserve);
     }
-    return false;
 }
 
 void
 zarr::Writer::finalize_chunks_() noexcept
 {
-    using namespace std::chrono_literals;
-
     const auto frames_this_chunk = frames_written_ % frames_per_chunk_;
 
     // don't write zeros if we have written less than one full chunk or if
@@ -290,42 +328,40 @@ zarr::Writer::finalize_chunks_() noexcept
       frame_dims_.rows * frame_dims_.cols * bytes_of_type(pixel_type_);
     const auto frames_to_write = frames_per_chunk_ - frames_this_chunk;
 
+    const auto bytes_to_fill =
+      frames_to_write * common::bytes_per_tile(tile_dims_, pixel_type_);
+    for (auto& chunk : chunk_buffers_) {
+        std::fill_n(std::back_inserter(chunk), bytes_to_fill, 0);
+    }
+
     bytes_to_flush_ += frames_to_write * bytes_per_frame;
 }
 
-std::vector<size_t>
+void
 zarr::Writer::compress_buffers_() noexcept
 {
-    const auto nchunks = tiles_per_frame_();
+    const auto n_chunks = tiles_per_frame_();
 
-    const size_t bytes_per_chunk = bytes_to_flush_ / nchunks;
-    std::vector<size_t> buf_sizes;
+    const size_t bytes_per_chunk = bytes_to_flush_ / n_chunks;
     if (!blosc_compression_params_.has_value()) {
-        for (auto& buf : chunk_buffers_) {
-            buf_sizes.push_back(std::min(bytes_per_chunk, buf.size()));
-        }
-        return buf_sizes;
+        return;
     }
 
-    using namespace std::chrono_literals;
-
-    buf_sizes.resize(nchunks);
-    std::fill(buffers_ready_, buffers_ready_ + nchunks, false);
     TRACE("Compressing");
 
     const auto bytes_per_px = bytes_of_type(pixel_type_);
 
     std::scoped_lock lock(buffers_mutex_);
+    std::latch latch(chunk_buffers_.size());
    for (auto i = 0; i < chunk_buffers_.size(); ++i) {
-        auto& buf = chunk_buffers_.at(i);
-
-        zarr_->push_to_job_queue([params = blosc_compression_params_.value(),
-                                  buf = &buf,
-                                  bytes_per_px,
-                                  bytes_per_chunk,
-                                  finished = buffers_ready_ + i,
-                                  buf_size = buf_sizes.data() +
-                                             i](std::string& err) -> bool {
+        auto& chunk = chunk_buffers_.at(i);
+
+        thread_pool_->push_to_job_queue([params =
+                                           blosc_compression_params_.value(),
+                                         buf = &chunk,
+                                         bytes_per_px,
+                                         bytes_per_chunk,
+                                         &latch](std::string& err) -> bool {
            bool success = false;
            try {
                const auto tmp_size = bytes_per_chunk + BLOSC_MAX_OVERHEAD;
@@ -341,11 +377,9 @@ zarr::Writer::compress_buffers_() noexcept
                  params.codec_id.c_str(),
                  0 /* blocksize - 0:automatic */,
                  1);
-                if (nb > buf->size()) {
-                    buf->resize(nb);
-                }
-                memcpy(buf->data(), tmp.data(), nb);
-                *buf_size = nb;
+
+                tmp.resize(nb);
+                buf->swap(tmp);
 
                success = true;
            } catch (const std::exception& exc) {
@@ -356,20 +390,70 @@ zarr::Writer::compress_buffers_() noexcept
            } catch (...) {
                err = "Failed to compress chunk (unknown)";
            }
-            *finished = true;
+            latch.count_down();
 
            return success;
        });
    }
 
    // wait for all threads to finish
-    while (!std::all_of(buffers_ready_,
-                        buffers_ready_ + nchunks,
-                        [](const auto& b) { return b; })) {
-        std::this_thread::sleep_for(500us);
+    latch.wait();
+}
+
+size_t
+zarr::Writer::write_frame_to_chunks_(const uint8_t* buf,
+                                     size_t buf_size) noexcept
+{
+    const auto bytes_per_px = bytes_of_type(pixel_type_);
+    const auto bytes_per_row = tile_dims_.cols * bytes_per_px;
+    const auto bytes_per_tile = tile_dims_.rows * bytes_per_row;
+
+    const auto frames_this_chunk = frames_written_ % frames_per_chunk_;
+
+    size_t bytes_written = 0;
+
+    for (auto i = 0; i < tiles_per_frame_y_; ++i) {
+        // TODO (aliddell): we can optimize this when tiles_per_frame_x_ is 1
+        for (auto j = 0; j < tiles_per_frame_x_; ++j) {
+            size_t offset = bytes_per_tile * frames_this_chunk;
+
+            const auto c = i * tiles_per_frame_x_ + j;
+            auto& chunk = chunk_buffers_.at(c);
+
+            for (auto k = 0; k < tile_dims_.rows; ++k) {
+                const auto frame_row = i * tile_dims_.rows + k;
+                if (frame_row < frame_dims_.rows) {
+                    const auto frame_col = j * tile_dims_.cols;
+
+                    const auto region_width =
+                      std::min(frame_col + tile_dims_.cols, frame_dims_.cols) -
+                      frame_col;
+
+                    const auto region_start =
+                      bytes_per_px * (frame_row * frame_dims_.cols + frame_col);
+                    const auto nbytes = region_width * bytes_per_px;
+                    const auto region_stop = region_start + nbytes;
+
+                    // copy region
+                    std::copy(buf + region_start,
+                              buf + region_stop,
+                              std::back_inserter(chunk));
+
+                    // fill remainder with zeros
+                    std::fill_n(
+                      std::back_inserter(chunk), bytes_per_row - nbytes, 0);
+
+                    bytes_written += bytes_per_row;
+                } else {
+                    std::fill_n(std::back_inserter(chunk), bytes_per_row, 0);
+                    bytes_written += bytes_per_row;
+                }
+                offset += tile_dims_.cols * bytes_per_px;
+            }
+        }
     }
 
-    return buf_sizes;
+    return bytes_written;
 }
 
 uint32_t
@@ -381,11 +465,6 @@ zarr::Writer::tiles_per_frame_() const
 
 void
 zarr::Writer::close_files_()
 {
-    using namespace std::chrono_literals;
-    while (0 < zarr_->jobs_on_queue()) {
-        std::this_thread::sleep_for(2ms);
-    }
-
     for (auto& file : files_) {
         file_close(&file);
     }
diff --git a/src/writers/writer.hh b/src/writers/writer.hh
index 2653f952..ed8684c1 100644
--- a/src/writers/writer.hh
+++ b/src/writers/writer.hh
@@ -13,10 +13,6 @@
 #include
 #include
-#include
-#include
-#include
-#include
 
 namespace fs = std::filesystem;
@@ -26,20 +22,21 @@ struct Zarr;
 
 struct FileCreator
 {
     FileCreator() = delete;
-    explicit FileCreator(Zarr* zarr);
+    explicit FileCreator(std::shared_ptr<common::ThreadPool> thread_pool);
     ~FileCreator() noexcept = default;
 
-    void set_base_dir(const fs::path& base_dir) noexcept;
-    [[nodiscard]] bool create(int n_c,
-                              int n_y,
-                              int n_x,
-                              std::vector<file>& files) noexcept;
+    [[nodiscard]] bool create_files(const fs::path& base_dir,
+                                    int n_c,
+                                    int n_y,
+                                    int n_x,
+                                    std::vector<file>& files) noexcept;
 
   private:
     fs::path base_dir_;
-    Zarr* zarr_;
+    std::shared_ptr<common::ThreadPool> thread_pool_;
 
-    bool create_channel_dirs_(int n_c) noexcept;
+    bool create_c_dirs_(int n_c) noexcept;
+    bool create_y_dirs_(int n_c, int n_y) noexcept;
 };
 
 struct Writer
@@ -50,18 +47,18 @@ struct Writer
            const ImageDims& tile_dims,
            uint32_t frames_per_chunk,
            const std::string& data_root,
-           Zarr* zarr);
+           std::shared_ptr<common::ThreadPool> thread_pool);
 
     /// Constructor with Blosc compression params
     Writer(const ImageDims& frame_dims,
            const ImageDims& tile_dims,
            uint32_t frames_per_chunk,
            const std::string& data_root,
-           Zarr* zarr,
+           std::shared_ptr<common::ThreadPool> thread_pool,
            const BloscCompressionParams& compression_params);
 
-    virtual ~Writer();
+    virtual ~Writer() noexcept = default;
 
-    [[nodiscard]] virtual bool write(const VideoFrame* frame) noexcept = 0;
+    [[nodiscard]] bool write(const VideoFrame* frame);
     void finalize() noexcept;
 
     uint32_t frames_written() const noexcept;
@@ -74,6 +71,7 @@ struct Writer
     uint16_t tiles_per_frame_y_;
     SampleType pixel_type_;
     uint32_t frames_per_chunk_;
+    std::vector<std::vector<uint8_t>> chunk_buffers_;
 
     /// Compression
     std::optional<BloscCompressionParams> blosc_compression_params_;
@@ -84,30 +82,26 @@ struct Writer
     std::vector<file> files_;
 
     /// Multithreading
-    std::vector<std::vector<uint8_t>> chunk_buffers_;
-    bool* buffers_ready_;
     std::mutex buffers_mutex_;
 
     /// Bookkeeping
     uint64_t bytes_to_flush_;
     uint32_t frames_written_;
     uint32_t current_chunk_;
-    Zarr* zarr_;
+    std::shared_ptr<common::ThreadPool> thread_pool_;
 
-    [[nodiscard]] bool validate_frame_(const VideoFrame* frame) noexcept;
+    void validate_frame_(const VideoFrame* frame);
 
-    virtual void make_buffers_() noexcept = 0;
+    void make_buffers_() noexcept;
     void finalize_chunks_() noexcept;
-    std::vector<size_t> compress_buffers_() noexcept;
-    virtual size_t write_bytes_(const uint8_t* buf,
-                                size_t buf_size) noexcept = 0;
+    void compress_buffers_() noexcept;
+    size_t write_frame_to_chunks_(const uint8_t* buf, size_t buf_size) noexcept;
 
     virtual void flush_() noexcept = 0;
     uint32_t tiles_per_frame_() const;
 
     /// Files
-    [[nodiscard]] virtual bool make_files_() noexcept = 0;
     void close_files_();
     void rollover_();
 };
diff --git a/src/writers/zarrv2.writer.cpp b/src/writers/zarrv2.writer.cpp
new file mode 100644
index 00000000..2cd1b5cf
--- /dev/null
+++ b/src/writers/zarrv2.writer.cpp
@@ -0,0 +1,108 @@
+#include "zarrv2.writer.hh"
+#include "../zarr.hh"
+
+#include <cstdio>
+#include <latch>
+#include <stdexcept>
+
+namespace zarr = acquire::sink::zarr;
+
+zarr::ZarrV2Writer::ZarrV2Writer(
+  const ImageDims& frame_dims,
+  const ImageDims& tile_dims,
+  uint32_t frames_per_chunk,
+  const std::string& data_root,
+  std::shared_ptr<common::ThreadPool> thread_pool)
+  : Writer(frame_dims, tile_dims, frames_per_chunk, data_root, thread_pool)
+{
+}
+
+zarr::ZarrV2Writer::ZarrV2Writer(
+  const ImageDims& frame_dims,
+  const ImageDims& tile_dims,
+  uint32_t frames_per_chunk,
+  const std::string& data_root,
+  std::shared_ptr<common::ThreadPool> thread_pool,
+  const BloscCompressionParams& compression_params)
+  : Writer(frame_dims,
+           tile_dims,
+           frames_per_chunk,
+           data_root,
+           thread_pool,
+           compression_params)
+{
+}
+
+void
+zarr::ZarrV2Writer::flush_() noexcept
+{
+    if (bytes_to_flush_ == 0) {
+        return;
+    }
+
+    const auto bytes_per_px = bytes_of_type(pixel_type_);
+    const auto bytes_per_tile =
+      tile_dims_.cols * tile_dims_.rows * bytes_per_px;
+
+    if (bytes_to_flush_ % bytes_per_tile != 0) {
+        LOGE("Expected bytes to flush to be a multiple of the "
+             "number of bytes per tile.");
+    }
+
+    // create chunk files if necessary
+    if (files_.empty() &&
+        !file_creator_.create_files(data_root_ / std::to_string(current_chunk_),
+                                    1,
+                                    tiles_per_frame_y_,
+                                    tiles_per_frame_x_,
+                                    files_)) {
+        return;
+    }
+
+    // compress buffers and write out
+    compress_buffers_();
+    std::latch latch(chunk_buffers_.size());
+    {
+        std::scoped_lock lock(buffers_mutex_);
+        for (auto i = 0; i < files_.size(); ++i) {
+            auto& chunk = chunk_buffers_.at(i);
+            thread_pool_->push_to_job_queue(
+              std::move([fh = &files_.at(i),
+                         data = chunk.data(),
+                         size = chunk.size(),
+                         &latch](std::string& err) -> bool {
+                  bool success = false;
+                  try {
+                      CHECK(file_write(fh, 0, data, data + size));
+                      success = true;
+                  } catch (const std::exception& exc) {
+                      char buf[128];
+                      snprintf(buf,
+                               sizeof(buf),
+                               "Failed to write chunk: %s",
+                               exc.what());
+                      err = buf;
+                  } catch (...) {
+                      err = "Unknown error";
+                  }
+
+                  latch.count_down();
+                  return success;
+              }));
+        }
+    }
+
+    // wait for all threads to finish
+    latch.wait();
+
+    // reset buffers
+    const auto bytes_to_reserve =
+      bytes_per_tile * frames_per_chunk_ +
+      (blosc_compression_params_.has_value() ? BLOSC_MAX_OVERHEAD : 0);
+
+    for (auto& buf : chunk_buffers_) {
+        buf.clear();
+        buf.reserve(bytes_to_reserve);
+    }
+    bytes_to_flush_ = 0;
+}
diff --git a/src/writers/chunk.writer.hh b/src/writers/zarrv2.writer.hh
similarity index 56%
rename from src/writers/chunk.writer.hh
rename to src/writers/zarrv2.writer.hh
index fae0b92c..6b275844 100644
--- a/src/writers/chunk.writer.hh
+++ b/src/writers/zarrv2.writer.hh
@@ -1,5 +1,5 @@
-#ifndef H_ACQUIRE_ZARR_CHUNK_WRITER_V0
-#define H_ACQUIRE_ZARR_CHUNK_WRITER_V0
+#ifndef H_ACQUIRE_ZARR_V2_WRITER_V0
+#define H_ACQUIRE_ZARR_V2_WRITER_V0
 
 #ifndef __cplusplus
 #error "This header requires C++20"
@@ -20,33 +20,28 @@ namespace fs = std::filesystem;
 
 namespace acquire::sink::zarr {
-struct ChunkWriter final : public Writer
+struct ZarrV2Writer final : public Writer
 {
   public:
-    ChunkWriter() = delete;
-    ChunkWriter(const ImageDims& frame_dims,
+    ZarrV2Writer() = delete;
+    ZarrV2Writer(const ImageDims& frame_dims,
                 const ImageDims& tile_dims,
                 uint32_t frames_per_chunk,
                 const std::string& data_root,
-                Zarr* zarr);
+                std::shared_ptr<common::ThreadPool> thread_pool);
 
     /// Constructor with Blosc compression params
-    ChunkWriter(const ImageDims& frame_dims,
+    ZarrV2Writer(const ImageDims& frame_dims,
                 const ImageDims& tile_dims,
                 uint32_t frames_per_chunk,
                 const std::string& data_root,
-                Zarr* zarr,
+                std::shared_ptr<common::ThreadPool> thread_pool,
                 const BloscCompressionParams& compression_params);
-    ~ChunkWriter() override = default;
-
-    [[nodiscard]] bool write(const VideoFrame* frame) noexcept override;
+    ~ZarrV2Writer() override = default;
 
   private:
-    void make_buffers_() noexcept override;
-    size_t write_bytes_(const uint8_t* buf, size_t buf_size) noexcept override;
     void flush_() noexcept override;
-    [[nodiscard]] bool make_files_() noexcept override;
 };
 } // namespace acquire::sink::zarr
 
-#endif // H_ACQUIRE_ZARR_CHUNK_WRITER_V0
+#endif // H_ACQUIRE_ZARR_V2_WRITER_V0
diff --git a/src/writers/zarrv3.writer.cpp b/src/writers/zarrv3.writer.cpp
new file mode 100644
index 00000000..9539e667
--- /dev/null
+++ b/src/writers/zarrv3.writer.cpp
@@ -0,0 +1,154 @@
+#include "zarrv3.writer.hh"
+#include "../zarr.hh"
+
+#include <cmath>
+#include <latch>
+
+namespace zarr = acquire::sink::zarr;
+
+zarr::ZarrV3Writer::ZarrV3Writer(
+  const ImageDims& frame_dims,
+  const ImageDims& shard_dims,
+  const ImageDims& tile_dims,
+  uint32_t frames_per_chunk,
+  const std::string& data_root,
+  std::shared_ptr<common::ThreadPool> thread_pool)
+  : Writer(frame_dims, tile_dims, frames_per_chunk, data_root, thread_pool)
+  , shard_dims_{ shard_dims }
+{
+    shards_per_frame_x_ =
+      std::ceil((float)frame_dims.cols / (float)shard_dims.cols);
+    shards_per_frame_y_ =
+      std::ceil((float)frame_dims.rows / (float)shard_dims.rows);
+}
+
+zarr::ZarrV3Writer::ZarrV3Writer(
+  const ImageDims& frame_dims,
+  const ImageDims& shard_dims,
+  const ImageDims& tile_dims,
+  uint32_t frames_per_chunk,
+  const std::string& data_root,
+  std::shared_ptr<common::ThreadPool> thread_pool,
+  const BloscCompressionParams& compression_params)
+  : Writer(frame_dims,
+           tile_dims,
+           frames_per_chunk,
+           data_root,
+           thread_pool,
+           compression_params)
+  , shard_dims_{ shard_dims }
+{
+    shards_per_frame_x_ =
+      std::ceil((float)frame_dims.cols / (float)shard_dims.cols);
+    shards_per_frame_y_ =
+      std::ceil((float)frame_dims.rows / (float)shard_dims.rows);
+}
+
+uint16_t
+// FIXME (aliddell): this is generalizable and doesn't need to be a method
+zarr::ZarrV3Writer::chunks_per_shard_() const
+{
+    const uint16_t chunks_per_shard_x = shard_dims_.cols / tile_dims_.cols;
+    const uint16_t chunks_per_shard_y = shard_dims_.rows / tile_dims_.rows;
+    return chunks_per_shard_x * chunks_per_shard_y;
+}
+
+uint16_t
+zarr::ZarrV3Writer::shards_per_frame_() const
+{
+    return shards_per_frame_x_ * shards_per_frame_y_;
+}
+
+void
+zarr::ZarrV3Writer::flush_() noexcept
+{
+    if (bytes_to_flush_ == 0) {
+        return;
+    }
+
+    const auto bytes_per_px = bytes_of_type(pixel_type_);
+    const auto bytes_per_tile =
+      tile_dims_.cols * tile_dims_.rows * bytes_per_px;
+
+    if (bytes_to_flush_ % bytes_per_tile != 0) {
+        LOGE("Expected bytes to flush to be a multiple of the "
+             "number of bytes per tile.");
+    }
+
+    // create shard files if necessary
+    if (files_.empty() && !file_creator_.create_files(
+                            data_root_ / ("c" + std::to_string(current_chunk_)),
+                            1,
+                            shards_per_frame_y_,
+                            shards_per_frame_x_,
+                            files_)) {
+        return;
+    }
+
+    const auto chunks_per_shard = chunks_per_shard_();
+
+    // compress buffers
+    compress_buffers_();
+    const size_t bytes_of_index = 2 * chunks_per_shard * sizeof(uint64_t);
+
+    const auto max_bytes_per_chunk =
+      bytes_per_tile * frames_per_chunk_ +
+      (blosc_compression_params_.has_value() ? BLOSC_MAX_OVERHEAD : 0);
+
+    // concatenate chunks into shards
+    const auto n_shards = shards_per_frame_();
+    std::latch latch(n_shards);
+    for (auto i = 0; i < n_shards; ++i) {
+        thread_pool_->push_to_job_queue(
+          std::move([fh = &files_.at(i), chunks_per_shard, i, &latch, this](
+                      std::string& err) {
+              size_t chunk_index = 0;
+              std::vector<uint64_t> chunk_indices;
+              size_t offset = 0;
+              bool success = false;
+              try {
+                  for (auto j = 0; j < chunks_per_shard; ++j) {
+                      chunk_indices.push_back(chunk_index); // chunk offset
+                      const auto k = i * chunks_per_shard + j;
+
+                      auto& chunk = chunk_buffers_.at(k);
+                      chunk_index += chunk.size();
+                      chunk_indices.push_back(chunk.size()); // chunk extent
+
+                      file_write(
+                        fh, offset, chunk.data(), chunk.data() + chunk.size());
+                      offset += chunk.size();
+                  }
+
+                  // write the indices out at the end of the shard
+                  const auto* indices =
+                    reinterpret_cast<const uint8_t*>(chunk_indices.data());
+                  success = (bool)file_write(fh,
+                                             offset,
+                                             indices,
+                                             indices + chunk_indices.size() *
+                                                         sizeof(uint64_t));
+              } catch (const std::exception& exc) {
+                  char buf[128];
+                  snprintf(
+                    buf, sizeof(buf), "Failed to write chunk: %s", exc.what());
+                  err = buf;
+              } catch (...) {
+                  err = "Unknown error";
+              }
+
+              latch.count_down();
+              return success;
+          }));
+    }
+
+    // wait for all threads to finish
+    latch.wait();
+
+    // reset buffers
+    for (auto& buf : chunk_buffers_) {
+        buf.clear();
+        buf.reserve(max_bytes_per_chunk);
+    }
+    bytes_to_flush_ = 0;
+}
diff --git a/src/writers/zarrv3.writer.hh b/src/writers/zarrv3.writer.hh
new file mode 100644
index 00000000..63694d37
--- /dev/null
+++ b/src/writers/zarrv3.writer.hh
@@ -0,0 +1,56 @@
+#ifndef H_ACQUIRE_ZARR_V3_WRITER_V0
+#define H_ACQUIRE_ZARR_V3_WRITER_V0
+
+#ifndef __cplusplus
+#error "This header requires C++20"
+#endif
+
+#include "writer.hh"
+
+#include "platform.h"
+#include "device/props/components.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace fs = std::filesystem;
+
+namespace acquire::sink::zarr {
+struct ZarrV3Writer final : public Writer
+{
+  public:
+    ZarrV3Writer() = delete;
+    ZarrV3Writer(const ImageDims& frame_dims,
+                 const ImageDims& shard_dims,
+                 const ImageDims& tile_dims,
+                 uint32_t frames_per_chunk,
+                 const std::string& data_root,
+                 std::shared_ptr<common::ThreadPool> thread_pool);
+
+    /// Constructor with Blosc compression params
+    ZarrV3Writer(const ImageDims& frame_dims,
+                 const ImageDims& shard_dims,
+                 const ImageDims& tile_dims,
+                 uint32_t frames_per_chunk,
+                 const std::string& data_root,
+                 std::shared_ptr<common::ThreadPool> thread_pool,
+                 const BloscCompressionParams& compression_params);
+    ~ZarrV3Writer() override = default;
+
+  private:
+    ImageDims shard_dims_;
+    uint16_t shards_per_frame_x_;
+    uint16_t shards_per_frame_y_;
+
+    uint16_t chunks_per_shard_() const;
+    uint16_t shards_per_frame_() const;
+
+    void flush_() noexcept override;
+};
+} // namespace acquire::sink::zarr
+
+#endif // H_ACQUIRE_ZARR_V3_WRITER_V0
diff --git a/src/zarr.cpp b/src/zarr.cpp
index 1f91c5a2..7162e862 100644
--- a/src/zarr.cpp
+++ b/src/zarr.cpp
@@ -1,6 +1,6 @@
 #include "zarr.hh"
 
-#include "writers/chunk.writer.hh"
+#include "writers/zarrv2.writer.hh"
 
 #include "json.hpp"
 
 namespace zarr = acquire::sink::zarr;
acquire::sink::zarr; @@ -84,7 +84,7 @@ zarr_set(Storage* self_, const StorageProperties* props) noexcept { try { CHECK(self_); - auto* self = (zarr::StorageInterface*)self_; + auto* self = (zarr::Zarr*)self_; self->set(props); } catch (const std::exception& exc) { LOGE("Exception: %s\n", exc.what()); @@ -102,7 +102,7 @@ zarr_get(const Storage* self_, StorageProperties* props) noexcept { try { CHECK(self_); - auto* self = (zarr::StorageInterface*)self_; + auto* self = (zarr::Zarr*)self_; self->get(props); } catch (const std::exception& exc) { LOGE("Exception: %s\n", exc.what()); @@ -116,7 +116,7 @@ zarr_get_meta(const Storage* self_, StoragePropertyMetadata* meta) noexcept { try { CHECK(self_); - auto* self = (zarr::StorageInterface*)self_; + auto* self = (zarr::Zarr*)self_; self->get_meta(meta); } catch (const std::exception& exc) { LOGE("Exception: %s\n", exc.what()); @@ -132,7 +132,7 @@ zarr_start(Storage* self_) noexcept try { CHECK(self_); - auto* self = (zarr::StorageInterface*)self_; + auto* self = (zarr::Zarr*)self_; self->start(); state = DeviceState_Running; } catch (const std::exception& exc) { @@ -150,7 +150,7 @@ zarr_append(Storage* self_, const VideoFrame* frames, size_t* nbytes) noexcept DeviceState state; try { CHECK(self_); - auto* self = (zarr::StorageInterface*)self_; + auto* self = (zarr::Zarr*)self_; *nbytes = self->append(frames, *nbytes); state = DeviceState_Running; } catch (const std::exception& exc) { @@ -173,7 +173,7 @@ zarr_stop(Storage* self_) noexcept try { CHECK(self_); - auto* self = (zarr::StorageInterface*)self_; + auto* self = (zarr::Zarr*)self_; CHECK(self->stop()); state = DeviceState_Armed; } catch (const std::exception& exc) { @@ -190,7 +190,7 @@ zarr_destroy(Storage* self_) noexcept { try { CHECK(self_); - auto* self = (zarr::StorageInterface*)self_; + auto* self = (zarr::Zarr*)self_; if (self_->stop) self_->stop(self_); @@ -207,7 +207,7 @@ zarr_reserve_image_shape(Storage* self_, const ImageShape* shape) noexcept { try { 
CHECK(self_); - auto* self = (zarr::StorageInterface*)self_; + auto* self = (zarr::Zarr*)self_; self->reserve_image_shape(shape); } catch (const std::exception& exc) { LOGE("Exception: %s\n", exc.what()); @@ -319,22 +319,6 @@ average_two_frames(VideoFrame* dst, const VideoFrame* src) } } // end ::{anonymous} namespace -/// StorageInterface -zarr::StorageInterface::StorageInterface() - : Storage{ - .state = DeviceState_AwaitingConfiguration, - .set = ::zarr_set, - .get = ::zarr_get, - .get_meta = ::zarr_get_meta, - .start = ::zarr_start, - .append = ::zarr_append, - .stop = ::zarr_stop, - .destroy = ::zarr_destroy, - .reserve_image_shape = ::zarr_reserve_image_shape, - } -{ -} - void zarr::Zarr::set(const StorageProperties* props) { @@ -347,8 +331,9 @@ zarr::Zarr::set(const StorageProperties* props) validate_props(props); dataset_root_ = as_path(*props); - if (props->external_metadata_json.str) + if (props->external_metadata_json.str) { external_metadata_json_ = props->external_metadata_json.str; + } pixel_scale_um_ = props->pixel_scale_um; @@ -356,14 +341,15 @@ zarr::Zarr::set(const StorageProperties* props) image_tile_shapes_.clear(); image_tile_shapes_.emplace_back(); - set_chunking(props->chunking, meta.chunking); + set_chunking(props->chunk_dims_px, meta.chunk_dims_px); - if (props->enable_multiscale && !meta.multiscale.supported) { + if (props->enable_multiscale && !meta.multiscale.is_supported) { // TODO (aliddell): https://github.com/ome/ngff/pull/206 LOGE("OME-Zarr multiscale not yet supported in Zarr v3. 
" "Multiscale arrays will not be written."); } - enable_multiscale_ = meta.multiscale.supported && props->enable_multiscale; + enable_multiscale_ = + meta.multiscale.is_supported && props->enable_multiscale; } void @@ -376,14 +362,44 @@ zarr::Zarr::get(StorageProperties* props) const props->pixel_scale_um = pixel_scale_um_; if (!image_tile_shapes_.empty()) { - props->chunking.tile.width = image_tile_shapes_.at(0).second.cols; - props->chunking.tile.height = image_tile_shapes_.at(0).second.rows; + props->chunk_dims_px.width = image_tile_shapes_.at(0).second.cols; + props->chunk_dims_px.height = image_tile_shapes_.at(0).second.rows; } - props->chunking.tile.planes = 1; + props->chunk_dims_px.planes = planes_per_chunk_; props->enable_multiscale = enable_multiscale_; } +void +zarr::Zarr::get_meta(StoragePropertyMetadata* meta) const +{ + CHECK(meta); + + *meta = { + .chunk_dims_px = { + .is_supported = 1, + .width = { + .writable = 1, + .low = 32.f, + .high = (float)std::numeric_limits::max(), + .type = PropertyType_FixedPrecision + }, + .height = { + .writable = 1, + .low = 32.f, + .high = (float)std::numeric_limits::max(), + .type = PropertyType_FixedPrecision + }, + .planes = { + .writable = 1, + .low = 32.f, + .high = (float)std::numeric_limits::max(), + .type = PropertyType_FixedPrecision + }, + }, + }; +} + void zarr::Zarr::start() { @@ -422,6 +438,9 @@ zarr::Zarr::stop() noexcept writer->finalize(); } writers_.clear(); + + thread_pool_->await_stop(); + is_ok = 1; } catch (const std::exception& exc) { LOGE("Exception: %s\n", exc.what()); @@ -479,7 +498,7 @@ zarr::Zarr::reserve_image_shape(const ImageShape* shape) { StorageProperties props = { 0 }; get(&props); - uint32_t tile_width = props.chunking.tile.width; + uint32_t tile_width = props.chunk_dims_px.width; if (image_shape.cols > 0 && (tile_width == 0 || tile_width > image_shape.cols)) { LOGE("%s. 
Setting width to %u.", @@ -490,7 +509,7 @@ zarr::Zarr::reserve_image_shape(const ImageShape* shape) } tile_shape.cols = tile_width; - uint32_t tile_height = props.chunking.tile.height; + uint32_t tile_height = props.chunk_dims_px.height; if (image_shape.rows > 0 && (tile_height == 0 || tile_height > image_shape.rows)) { LOGE("%s. Setting height to %u.", @@ -504,17 +523,6 @@ zarr::Zarr::reserve_image_shape(const ImageShape* shape) storage_properties_destroy(&props); } - // ensure that the chunk size can accommodate at least one tile - uint64_t bytes_per_tile = common::bytes_per_tile(tile_shape, pixel_type_); - CHECK(bytes_per_tile > 0); - - if (max_bytes_per_chunk_ < bytes_per_tile) { - LOGE("Specified chunk size %llu is too small. Setting to %llu bytes.", - max_bytes_per_chunk_, - bytes_per_tile); - max_bytes_per_chunk_ = bytes_per_tile; - } - if (enable_multiscale_) { make_scales(image_tile_shapes_); } @@ -530,14 +538,26 @@ zarr::Zarr::reserve_image_shape(const ImageShape* shape) /// Zarr zarr::Zarr::Zarr() - : threads_(std::thread::hardware_concurrency()) + : Storage { + .state = DeviceState_AwaitingConfiguration, + .set = ::zarr_set, + .get = ::zarr_get, + .get_meta = ::zarr_get_meta, + .start = ::zarr_start, + .append = ::zarr_append, + .stop = ::zarr_stop, + .destroy = ::zarr_destroy, + .reserve_image_shape = ::zarr_reserve_image_shape, + } + , thread_pool_{ std::make_shared( + std::thread::hardware_concurrency(), + [this](const std::string& err) { this->set_error(err); }) } + , pixel_scale_um_{ 1, 1 } + , planes_per_chunk_{ 0 } + , enable_multiscale_{ false } + , pixel_type_{ SampleType_u8 } + , error_{ false } { - // spin up threads - for (auto& ctx_ : threads_) { - ctx_.ready = true; - ctx_.should_stop = false; - ctx_.thread = std::thread([this, ctx = &ctx_] { worker_thread_(ctx); }); - } } zarr::Zarr::Zarr(BloscCompressionParams&& compression_params) @@ -546,28 +566,21 @@ zarr::Zarr::Zarr(BloscCompressionParams&& compression_params) 
blosc_compression_params_ = std::move(compression_params); } -zarr::Zarr::~Zarr() noexcept -{ - // spin down threads - for (auto& ctx : threads_) { - ctx.should_stop = true; - ctx.cv.notify_one(); - ctx.thread.join(); - } -} - void zarr::Zarr::set_chunking(const ChunkingProps& props, const ChunkingMeta& meta) { - max_bytes_per_chunk_ = std::clamp(props.max_bytes_per_chunk, - (uint64_t)meta.max_bytes_per_chunk.low, - (uint64_t)meta.max_bytes_per_chunk.high); - // image shape is set *after* this is set so we verify it later image_tile_shapes_.at(0).second = { - .cols = props.tile.width, - .rows = props.tile.height, + .cols = std::clamp( + props.width, (uint32_t)meta.width.low, (uint32_t)meta.width.high), + .rows = std::clamp( + props.height, (uint32_t)meta.height.low, (uint32_t)meta.height.high), }; + + planes_per_chunk_ = std::clamp( + props.planes, (uint32_t)meta.planes.low, (uint32_t)meta.planes.high); + + CHECK(planes_per_chunk_ > 0); } void @@ -582,20 +595,6 @@ zarr::Zarr::set_error(const std::string& msg) noexcept } } -void -zarr::Zarr::push_to_job_queue(JobT&& job) -{ - std::scoped_lock lock(mutex_); - jobs_.push(std::move(job)); -} - -size_t -zarr::Zarr::jobs_on_queue() const -{ - std::scoped_lock lock(mutex_); - return jobs_.size(); -} - void zarr::Zarr::write_all_array_metadata_() const { @@ -672,56 +671,6 @@ zarr::Zarr::write_multiscale_frames_(const VideoFrame* frame) } } -std::optional -zarr::Zarr::pop_from_job_queue_() noexcept -{ - std::scoped_lock lock(mutex_); - if (jobs_.empty()) { - return std::nullopt; - } - - auto job = jobs_.front(); - jobs_.pop(); - return job; -} - -void -zarr::Zarr::worker_thread_(ThreadContext* ctx) -{ - using namespace std::chrono_literals; - - TRACE("Worker thread starting."); - if (nullptr == ctx) { - LOGE("Null context passed to worker thread."); - return; - } - - while (true) { - std::unique_lock lock(ctx->mutex); - ctx->cv.wait_for(lock, 1ms, [&] { return ctx->should_stop; }); - - if (ctx->should_stop) { - break; - } - 
- if (auto job = pop_from_job_queue_(); job.has_value()) { - ctx->ready = false; - std::string err_msg; - if (!job.value()(err_msg)) { - set_error(err_msg); - } - ctx->ready = true; - lock.unlock(); - ctx->cv.notify_one(); - } else { - lock.unlock(); - std::this_thread::sleep_for(1ms); - } - } - - TRACE("Worker thread exiting."); -} - #ifndef NO_UNIT_TESTS #ifdef _WIN32 #define acquire_export __declspec(dllexport) diff --git a/src/zarr.driver.c b/src/zarr.driver.c index b8e78481..75ae89ff 100644 --- a/src/zarr.driver.c +++ b/src/zarr.driver.c @@ -39,6 +39,12 @@ struct Storage* compressed_zarr_v2_zstd_init(); struct Storage* compressed_zarr_v2_lz4_init(); +struct Storage* +zarr_v3_init(); +struct Storage* +compressed_zarr_v3_zstd_init(); +struct Storage* +compressed_zarr_v3_lz4_init(); // // GLOBALS @@ -49,6 +55,9 @@ enum StorageKind Storage_Zarr, Storage_ZarrBlosc1ZstdByteShuffle, Storage_ZarrBlosc1Lz4ByteShuffle, + Storage_ZarrV3, + Storage_ZarrV3Blosc1ZstdByteShuffle, + Storage_ZarrV3Blosc1Lz4ByteShuffle, Storage_Number_Of_Kinds }; @@ -71,6 +80,9 @@ storage_kind_to_string(const enum StorageKind kind) CASE(Storage_Zarr); CASE(Storage_ZarrBlosc1ZstdByteShuffle); CASE(Storage_ZarrBlosc1Lz4ByteShuffle); + CASE(Storage_ZarrV3); + CASE(Storage_ZarrV3Blosc1ZstdByteShuffle); + CASE(Storage_ZarrV3Blosc1Lz4ByteShuffle); #undef CASE default: return "(unknown)"; @@ -99,6 +111,9 @@ zarr_describe(const struct Driver* driver, XXX(Zarr), XXX(ZarrBlosc1ZstdByteShuffle), XXX(ZarrBlosc1Lz4ByteShuffle), + XXX(ZarrV3), + XXX(ZarrV3Blosc1ZstdByteShuffle), + XXX(ZarrV3Blosc1Lz4ByteShuffle), }; // clang-format on #undef XXX @@ -160,6 +175,10 @@ acquire_driver_init_v0(acquire_reporter_t reporter) [Storage_Zarr] = zarr_v2_init, [Storage_ZarrBlosc1ZstdByteShuffle] = compressed_zarr_v2_zstd_init, [Storage_ZarrBlosc1Lz4ByteShuffle] = compressed_zarr_v2_lz4_init, + [Storage_ZarrV3] = zarr_v3_init, + [Storage_ZarrV3Blosc1ZstdByteShuffle] = + compressed_zarr_v3_zstd_init, + 
[Storage_ZarrV3Blosc1Lz4ByteShuffle] = compressed_zarr_v3_lz4_init, }; memcpy( globals.constructors, impls, nbytes); // cppcheck-suppress uninitvar diff --git a/src/zarr.hh b/src/zarr.hh index f9748a31..d6597a65 100644 --- a/src/zarr.hh +++ b/src/zarr.hh @@ -7,7 +7,6 @@ #include "device/kit/storage.h" -#include "prelude.h" #include "common.hh" #include "writers/writer.hh" #include "writers/blosc.compressor.hh" @@ -22,57 +21,26 @@ namespace fs = std::filesystem; namespace acquire::sink::zarr { -// StorageInterface -struct StorageInterface : public Storage -{ - StorageInterface(); - virtual ~StorageInterface() = default; - virtual void set(const StorageProperties* props) = 0; - virtual void get(StorageProperties* props) const = 0; - virtual void get_meta(StoragePropertyMetadata* meta) const = 0; - virtual void start() = 0; - virtual int stop() noexcept = 0; - - /// @return number of consumed bytes - virtual size_t append(const VideoFrame* frames, size_t nbytes) = 0; - - /// @brief Set the image shape for allocating chunk writers. 
-    virtual void reserve_image_shape(const ImageShape* shape) = 0;
-};
-
-struct Zarr : StorageInterface
+struct Zarr : public Storage
 {
   public:
-    using JobT = std::function<bool(std::string&)>;
-    struct ThreadContext
-    {
-        std::thread thread;
-        std::mutex mutex;
-        std::condition_variable cv;
-        bool should_stop;
-        bool ready;
-    };
-
     Zarr();
-    Zarr(BloscCompressionParams&& compression_params);
-    ~Zarr() noexcept override;
-
-    /// StorageInterface
-    void set(const StorageProperties* props) override;
-    void get(StorageProperties* props) const override;
-    void start() override;
-    int stop() noexcept override;
-    size_t append(const VideoFrame* frames, size_t nbytes) override;
-    void reserve_image_shape(const ImageShape* shape) override;
+    explicit Zarr(BloscCompressionParams&& compression_params);
+    virtual ~Zarr() noexcept = default;
+
+    /// Storage interface
+    virtual void set(const StorageProperties* props);
+    void get(StorageProperties* props) const;
+    virtual void get_meta(StoragePropertyMetadata* meta) const;
+    void start();
+    int stop() noexcept;
+    size_t append(const VideoFrame* frames, size_t nbytes);
+    virtual void reserve_image_shape(const ImageShape* shape);
 
     /// Error state
     void set_error(const std::string& msg) noexcept;
 
-    /// Multithreading
-    void push_to_job_queue(JobT&& job);
-    size_t jobs_on_queue() const;
-
   protected:
     using ChunkingProps = StorageProperties::storage_properties_chunking_s;
     using ChunkingMeta =
@@ -85,7 +53,7 @@ struct Zarr : StorageInterface
     fs::path dataset_root_;
     std::string external_metadata_json_;
     PixelScale pixel_scale_um_;
-    uint64_t max_bytes_per_chunk_;
+    uint32_t planes_per_chunk_;
     bool enable_multiscale_;
 
     /// changes on reserve_image_shape
@@ -98,9 +66,8 @@ struct Zarr : StorageInterface
     std::unordered_map<int, std::optional<VideoFrame*>> scaled_frames_;
 
     /// Multithreading
-    std::vector<ThreadContext> threads_;
-    mutable std::mutex mutex_; // for jobs_ and error_ / error_msg_
-    std::queue<JobT> jobs_;
+    std::shared_ptr<common::ThreadPool> thread_pool_;
+    mutable std::mutex mutex_; // for error_ / error_msg_
 
     /// Error state
     bool error_;
@@ -122,10 +89,6 @@ struct Zarr : StorageInterface
 
     /// Multiscale
     void write_multiscale_frames_(const VideoFrame* frame);
-
-    /// Multithreading
-    std::optional<JobT> pop_from_job_queue_() noexcept;
-    void worker_thread_(ThreadContext* ctx);
 };
 } // namespace acquire::sink::zarr
diff --git a/src/zarr.v2.cpp b/src/zarr.v2.cpp
index 6d9c2aef..e57a49fe 100644
--- a/src/zarr.v2.cpp
+++ b/src/zarr.v2.cpp
@@ -1,5 +1,5 @@
 #include "zarr.v2.hh"
-#include "writers/chunk.writer.hh"
+#include "writers/zarrv2.writer.hh"
 
 #include "json.hpp"
 
@@ -32,19 +32,11 @@ zarr::ZarrV2::ZarrV2(BloscCompressionParams&& compression_params)
 void
 zarr::ZarrV2::get_meta(StoragePropertyMetadata* meta) const
 {
-    CHECK(meta);
-    *meta = {
-        .chunking = {
-          .supported = 1,
-          .max_bytes_per_chunk = {
-            .writable = 1,
-            .low = (float)(16 << 20),
-            .high = (float)(1 << 30),
-            .type = PropertyType_FixedPrecision },
-        },
-        .multiscale = {
-          .supported = 1,
-        }
+    Zarr::get_meta(meta);
+
+    meta->shard_dims_chunks = { 0 };
+    meta->multiscale = {
+        .is_supported = 1,
     };
 }
 
@@ -55,23 +47,25 @@ zarr::ZarrV2::allocate_writers_()
     for (auto i = 0; i < image_tile_shapes_.size(); ++i) {
         const auto& image_shape = image_tile_shapes_.at(i).first;
         const auto& tile_shape = image_tile_shapes_.at(i).second;
-        uint64_t bytes_per_tile =
+
+        const uint64_t bytes_per_tile =
           common::bytes_per_tile(tile_shape, pixel_type_);
+
         if (blosc_compression_params_.has_value()) {
-            writers_.push_back(std::make_shared<ChunkWriter>(
+            writers_.push_back(std::make_shared<ZarrV2Writer>(
               image_shape,
               tile_shape,
-              (uint32_t)(max_bytes_per_chunk_ / bytes_per_tile),
+              planes_per_chunk_,
               (get_data_directory_() / std::to_string(i)).string(),
-              this,
+              thread_pool_,
               blosc_compression_params_.value()));
         } else {
-            writers_.push_back(std::make_shared<ChunkWriter>(
+            writers_.push_back(std::make_shared<ZarrV2Writer>(
               image_shape,
               tile_shape,
-              (uint32_t)(max_bytes_per_chunk_ / bytes_per_tile),
+              planes_per_chunk_,
               (get_data_directory_() / std::to_string(i)).string(),
-              this));
+              thread_pool_));
         }
     }
 }
@@ -89,11 +83,8 @@ zarr::ZarrV2::write_array_metadata_(size_t level) const
     const ImageDims& image_dims = image_tile_shapes_.at(level).first;
     const ImageDims& tile_dims = image_tile_shapes_.at(level).second;
 
-    const auto frame_count = (uint64_t)writers_.at(level)->frames_written();
-    const auto frames_per_chunk =
-      std::min(frame_count,
-               (uint64_t)common::frames_per_chunk(
-                 tile_dims, pixel_type_, max_bytes_per_chunk_));
+    const auto frame_count = writers_.at(level)->frames_written();
+    const auto frames_per_chunk = std::min(frame_count, planes_per_chunk_);
 
     json zarray_attrs = {
         { "zarr_format", 2 },
@@ -189,7 +180,13 @@ zarr::ZarrV2::write_group_metadata_() const
               {
                 {
                   { "type", "scale" },
-                  { "scale", { 1, 1, pixel_scale_um_.y, pixel_scale_um_.x } },
+                  { "scale",
+                    {
+                      1,                 // t
+                      1,                 // c
+                      pixel_scale_um_.y, // y
+                      pixel_scale_um_.x  // x
+                    } },
                 },
               } },
           },
@@ -205,8 +202,8 @@ zarr::ZarrV2::write_group_metadata_() const
                     { "scale",
                       {
-                        std::pow(2, i), // t
-                        1,              // c
+                        std::pow(2, i),                     // t
+                        1,                                  // c
                         std::pow(2, i) * pixel_scale_um_.y, // y
                         std::pow(2, i) * pixel_scale_um_.x  // x
                       },
diff --git a/src/zarr.v3.cpp b/src/zarr.v3.cpp
new file mode 100644
index 00000000..d3c060e5
--- /dev/null
+++ b/src/zarr.v3.cpp
@@ -0,0 +1,339 @@
+#include "zarr.v3.hh"
+#include "writers/zarrv3.writer.hh"
+
+#include "json.hpp"
+
+#include <limits>
+
+namespace zarr = acquire::sink::zarr;
+
+namespace {
+template<zarr::BloscCodecId CodecId>
+struct Storage*
+compressed_zarr_v3_init()
+{
+    try {
+        zarr::BloscCompressionParams params(
+          zarr::compression_codec_as_string<CodecId>(), 1, 1);
+        return new zarr::ZarrV3(std::move(params));
+    } catch (const std::exception& exc) {
+        LOGE("Exception: %s\n", exc.what());
+    } catch (...) {
+        LOGE("Exception: (unknown)");
+    }
+    return nullptr;
+}
+} // end ::{anonymous} namespace
+
+zarr::ZarrV3::ZarrV3(BloscCompressionParams&& compression_params)
+  : Zarr(std::move(compression_params))
+{
+}
+
+void
+zarr::ZarrV3::set_sharding(const ShardingProps& props, const ShardingMeta& meta)
+{
+    // we can't validate and convert until we know the image shape and corrected
+    // tile shape (see reserve_image_shape) so let's just store the raw values
+    // for now
+    ImageDims shard_dims = {
+        .cols = props.width,
+        .rows = props.height,
+    };
+    shard_dims_.push_back(shard_dims);
+}
+
+void
+zarr::ZarrV3::allocate_writers_()
+{
+    writers_.clear();
+
+    for (auto i = 0; i < image_tile_shapes_.size(); ++i) {
+        const auto& frame_dims = image_tile_shapes_.at(i).first;
+        const auto& tile_dims = image_tile_shapes_.at(i).second;
+
+        if (blosc_compression_params_.has_value()) {
+            writers_.push_back(std::make_shared<ZarrV3Writer>(
+              frame_dims,
+              shard_dims_.at(i),
+              tile_dims,
+              planes_per_chunk_,
+              (get_data_directory_() / std::to_string(i)).string(),
+              thread_pool_,
+              blosc_compression_params_.value()));
+        } else {
+            writers_.push_back(std::make_shared<ZarrV3Writer>(
+              frame_dims,
+              shard_dims_.at(i),
+              tile_dims,
+              planes_per_chunk_,
+              (get_data_directory_() / std::to_string(i)).string(),
+              thread_pool_));
+        }
+    }
+}
+
+void
+zarr::ZarrV3::set(const StorageProperties* props)
+{
+    Zarr::set(props);
+
+    StoragePropertyMetadata meta{};
+    get_meta(&meta);
+
+    const auto sharding_props = props->shard_dims_chunks;
+    const auto sharding_meta = meta.shard_dims_chunks;
+
+    set_sharding(sharding_props, sharding_meta);
+}
+
+void
+zarr::ZarrV3::get_meta(StoragePropertyMetadata* meta) const
+{
+    Zarr::get_meta(meta);
+
+    meta->shard_dims_chunks = {
+        .is_supported = 1,
+        .width = { .writable = 1,
+                   .low = 1.f,
+                   .high = (float)std::numeric_limits<uint32_t>::max(),
+                   .type = PropertyType_FixedPrecision },
+        .height = { .writable = 1,
+                    .low = 1.f,
+                    .high = (float)std::numeric_limits<uint32_t>::max(),
+                    .type = PropertyType_FixedPrecision },
+        .planes = { .writable = 1,
+                    .low = 1.f,
+                    .high = 1.f,
+                    .type = PropertyType_FixedPrecision },
+    };
+    meta->multiscale = {
+        .is_supported = 0,
+    };
+}
+
+void
+zarr::ZarrV3::reserve_image_shape(const ImageShape* shape)
+{
+    // `shape` should be verified nonnull in storage_reserve_image_shape, but
+    // let's check anyway
+    CHECK(shape);
+    image_tile_shapes_.at(0).first = {
+        .cols = shape->dims.width,
+        .rows = shape->dims.height,
+    };
+    pixel_type_ = shape->type;
+
+    ImageDims& image_shape = image_tile_shapes_.at(0).first;
+    ImageDims& tile_shape = image_tile_shapes_.at(0).second;
+
+    // ensure that tile dimensions are compatible with the image shape
+    {
+        StorageProperties props = { 0 };
+        get(&props);
+        uint32_t tile_width = props.chunk_dims_px.width;
+        if (image_shape.cols > 0 &&
+            (tile_width == 0 || tile_width > image_shape.cols)) {
+            LOGE("%s. Setting width to %u.",
+                 tile_width == 0 ? "Tile width not specified"
+                                 : "Specified tile width is too large",
+                 image_shape.cols);
+            tile_width = image_shape.cols;
+        }
+        tile_shape.cols = tile_width;
+
+        uint32_t tile_height = props.chunk_dims_px.height;
+        if (image_shape.rows > 0 &&
+            (tile_height == 0 || tile_height > image_shape.rows)) {
+            LOGE("%s. Setting height to %u.",
+                 tile_height == 0 ? "Tile height not specified"
+                                  : "Specified tile height is too large",
+                 image_shape.rows);
+            tile_height = image_shape.rows;
+        }
+        tile_shape.rows = tile_height;
+
+        storage_properties_destroy(&props);
+    }
+
+    auto& shard_dims = shard_dims_.at(0);
+
+    StoragePropertyMetadata meta = { 0 };
+    get_meta(&meta);
+
+    shard_dims = {
+        .cols = std::clamp(shard_dims.cols,
+                           (uint32_t)meta.shard_dims_chunks.width.low,
+                           (uint32_t)meta.shard_dims_chunks.width.high) *
+                tile_shape.cols,
+        .rows = std::clamp(shard_dims.rows,
+                           (uint32_t)meta.shard_dims_chunks.height.low,
+                           (uint32_t)meta.shard_dims_chunks.height.high) *
+                tile_shape.rows,
+    };
+
+    EXPECT(shard_dims.cols <= image_shape.cols,
+           "Shard width %d exceeds frame width %d",
+           shard_dims.cols,
+           image_shape.cols);
+    EXPECT(shard_dims.rows <= image_shape.rows,
+           "Shard height %d exceeds frame height %d",
+           shard_dims.rows,
+           image_shape.rows);
+
+    allocate_writers_();
+}
+
+void
+zarr::ZarrV3::write_array_metadata_(size_t level) const
+{
+    namespace fs = std::filesystem;
+    using json = nlohmann::json;
+
+    if (writers_.size() <= level) {
+        return;
+    }
+
+    const ImageDims& image_dims = image_tile_shapes_.at(level).first;
+    const ImageDims& tile_dims = image_tile_shapes_.at(level).second;
+    const ImageDims& shard_dims = shard_dims_.at(level);
+
+    const auto frame_count = writers_.at(level)->frames_written();
+    const auto frames_per_chunk = std::min(frame_count, planes_per_chunk_);
+
+    json metadata;
+    metadata["attributes"] = json::object();
+    metadata["chunk_grid"] = json::object({
+      { "chunk_shape",
+        json::array({
+          frames_per_chunk, // t
+          1,                // c
+          tile_dims.rows,   // y
+          tile_dims.cols,   // x
+        }) },
+      { "separator", "/" },
+      { "type", "regular" },
+    });
+    metadata["chunk_memory_layout"] = "C";
+    metadata["data_type"] = common::sample_type_to_dtype(pixel_type_);
+    metadata["extensions"] = json::array();
+    metadata["fill_value"] = 0;
+    metadata["shape"] = json::array({
+      frame_count,     // t
+      1,               // c
+      image_dims.rows, // y
+      image_dims.cols, // x
+    });
+
+    if (blosc_compression_params_.has_value()) {
+        auto params = blosc_compression_params_.value();
+        metadata["compressor"] = json::object({
+          { "codec", "https://purl.org/zarr/spec/codec/blosc/1.0" },
+          { "configuration",
+            json::object({
+              { "blocksize", 0 },
+              { "clevel", params.clevel },
+              { "cname", params.codec_id },
+              { "shuffle", params.shuffle },
+            }) },
+        });
+    }
+
+    // sharding storage transformer
+    // TODO (aliddell):
+    // https://github.com/zarr-developers/zarr-python/issues/877
+    metadata["storage_transformers"] = json::array();
+    metadata["storage_transformers"][0] = json::object({
+      { "type", "indexed" },
+      { "extension",
+        "https://purl.org/zarr/spec/storage_transformers/sharding/1.0" },
+      { "configuration",
+        json::object({
+          { "chunks_per_shard",
+            json::array({
+              1,                                // t
+              1,                                // c
+              shard_dims.rows / tile_dims.rows, // y
+              shard_dims.cols / tile_dims.cols, // x
+            }) },
+        }) },
+    });
+
+    auto path = (dataset_root_ / "meta" / "root" /
+                 (std::to_string(level) + ".array.json"))
+                  .string();
+    common::write_string(path, metadata.dump(4));
+}
+
+/// @brief Write the external metadata.
+/// @details This is a no-op for ZarrV3. Instead, external metadata is
+/// stored in the group metadata.
+void
+zarr::ZarrV3::write_external_metadata_() const
+{
+    // no-op
+}
+
+/// @brief Write the metadata for the dataset.
+void
+zarr::ZarrV3::write_base_metadata_() const
+{
+    namespace fs = std::filesystem;
+    using json = nlohmann::json;
+
+    json metadata;
+    metadata["extensions"] = json::array();
+    metadata["metadata_encoding"] =
+      "https://purl.org/zarr/spec/protocol/core/3.0";
+    metadata["metadata_key_suffix"] = ".json";
+    metadata["zarr_format"] = "https://purl.org/zarr/spec/protocol/core/3.0";
+
+    auto path = (dataset_root_ / "zarr.json").string();
+    common::write_string(path, metadata.dump(4));
+}
+
+/// @brief Write the metadata for the group.
+/// @details Zarr v3 stores group metadata in
+/// /meta/{group_name}.group.json. We will call the group "root".
+void
+zarr::ZarrV3::write_group_metadata_() const
+{
+    namespace fs = std::filesystem;
+    using json = nlohmann::json;
+
+    json metadata;
+    metadata["attributes"]["acquire"] = json::parse(external_metadata_json_);
+
+    auto path = (dataset_root_ / "meta" / "root.group.json").string();
+    common::write_string(path, metadata.dump(4));
+}
+
+fs::path
+zarr::ZarrV3::get_data_directory_() const
+{
+    return dataset_root_ / "data" / "root";
+}
+
+extern "C"
+{
+    struct Storage*
+    zarr_v3_init()
+    {
+        try {
+            return new zarr::ZarrV3();
+        } catch (const std::exception& exc) {
+            LOGE("Exception: %s\n", exc.what());
+        } catch (...) {
+            LOGE("Exception: (unknown)");
+        }
+        return nullptr;
+    }
+
+    struct Storage*
+    compressed_zarr_v3_zstd_init()
+    {
+        return compressed_zarr_v3_init<zarr::BloscCodecId::Zstd>();
+    }
+
+    struct Storage*
+    compressed_zarr_v3_lz4_init()
+    {
+        return compressed_zarr_v3_init<zarr::BloscCodecId::Lz4>();
+    }
+}
diff --git a/src/zarr.v3.hh b/src/zarr.v3.hh
new file mode 100644
index 00000000..6a8d0cea
--- /dev/null
+++ b/src/zarr.v3.hh
@@ -0,0 +1,40 @@
+#ifndef H_ACQUIRE_STORAGE_ZARR_V3_V0
+#define H_ACQUIRE_STORAGE_ZARR_V3_V0
+
+#include "zarr.hh"
+
+namespace acquire::sink::zarr {
+struct ZarrV3 final : public Zarr
+{
+  public:
+    ZarrV3() = default;
+    explicit ZarrV3(BloscCompressionParams&& compression_params);
+    ~ZarrV3() override = default;
+
+    /// Storage interface
+    void set(const StorageProperties* props) override;
+    void get_meta(StoragePropertyMetadata* meta) const override;
+    void reserve_image_shape(const ImageShape* shape) override;
+
+  private:
+    using ShardingProps = StorageProperties::storage_properties_sharding_s;
+    using ShardingMeta =
+      StoragePropertyMetadata::storage_property_metadata_sharding_s;
+
+    std::vector<ImageDims> shard_dims_;
+
+    /// Setup
+    void set_sharding(const ShardingProps& props, const ShardingMeta& meta);
+    void allocate_writers_() override;
+
+    /// Metadata
+    void write_array_metadata_(size_t level) const override;
+    void write_external_metadata_() const override;
+    void write_base_metadata_() const override;
+    void write_group_metadata_() const override;
+
+    /// Filesystem
+    fs::path get_data_directory_() const override;
+};
+} // namespace acquire::sink::zarr
+#endif // H_ACQUIRE_STORAGE_ZARR_V3_V0
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index d8b4d959..939600b4 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -1,6 +1,6 @@
-if(${NOTEST})
+if (${NOTEST})
     message(STATUS "Skipping test targets")
-else()
+else ()
     set(NOTEST "TRUE")
     aq_require(acquire-driver-common)
     aq_require(acquire-video-runtime)
@@ -15,60 +15,62 @@ else()
     #
     # Tests
     #
     set(tests
-        list-devices
-        get-meta
-        unit-tests
-        multiscale-with-trivial-tile-size
-        no-set-chunking
-        write-zarr-compressed-multiscale
-        write-zarr-compressed-with-chunking
-        write-zarr-compressed-with-chunking-and-rollover
-        write-zarr-raw-multiscale
-        write-zarr-raw-with-chunking
-        write-zarr-raw-with-chunking-and-rollover
-        write-zarr-raw-with-ragged-tiling
-        write-zarr-with-defaults
-        write-zarr-with-lz4-compression
-        write-zarr-with-zstd-compression
+            list-devices
+            get-meta
+            unit-tests
+            multiscale-with-trivial-tile-size
+            no-set-chunking
+            write-zarr-compressed-multiscale
+            write-zarr-compressed-with-chunking
+            write-zarr-compressed-with-chunking-and-rollover
+            write-zarr-raw-multiscale
+            write-zarr-raw-with-chunking
+            write-zarr-raw-with-chunking-and-rollover
+            write-zarr-raw-with-ragged-tiling
+            write-zarr-v3-compressed
+            write-zarr-v3-raw
+            write-zarr-with-defaults
+            write-zarr-with-lz4-compression
+            write-zarr-with-zstd-compression
     )
 
-    foreach(name ${tests})
+    foreach (name ${tests})
         set(tgt "${project}-${name}")
         add_executable(${tgt} ${name}.cpp)
         target_compile_definitions(${tgt} PUBLIC "TEST=\"${tgt}\"")
         set_target_properties(${tgt} PROPERTIES
-            MSVC_RUNTIME_LIBRARY "MultiThreaded$<$<CONFIG:Debug>:Debug>"
+                MSVC_RUNTIME_LIBRARY "MultiThreaded$<$<CONFIG:Debug>:Debug>"
         )
         target_include_directories(${tgt} PRIVATE "${CMAKE_CURRENT_LIST_DIR}/../")
         target_link_libraries(${tgt}
-            acquire-core-logger
-            acquire-core-platform
-            acquire-video-runtime
+                acquire-core-logger
+                acquire-core-platform
+                acquire-video-runtime
         )
         add_test(NAME test-${tgt} COMMAND ${tgt})
         set_tests_properties(test-${tgt} PROPERTIES LABELS "anyplatform;acquire-driver-zarr")
-    endforeach()
+    endforeach ()
 
     #
     # Copy driver to tests
     #
     list(POP_FRONT tests onename)
 
-    foreach(driver
-        acquire-driver-common
-        acquire-driver-zarr
+    foreach (driver
+            acquire-driver-common
+            acquire-driver-zarr
     )
         add_custom_target(${project}-copy-${driver}-for-tests
-            COMMAND ${CMAKE_COMMAND} -E copy
-            $<TARGET_FILE:${driver}>
-            $<TARGET_FILE_DIR:${project}-${onename}>
-            DEPENDS ${driver}
-            COMMENT "Copying ${driver} to $<TARGET_FILE_DIR:${project}-${onename}>"
+                COMMAND ${CMAKE_COMMAND} -E copy
+                $<TARGET_FILE:${driver}>
+                $<TARGET_FILE_DIR:${project}-${onename}>
+                DEPENDS ${driver}
+                COMMENT "Copying ${driver} to $<TARGET_FILE_DIR:${project}-${onename}>"
         )
-        foreach(name ${tests})
+        foreach (name ${tests})
             add_dependencies(${tgt} ${project}-copy-${driver}-for-tests)
-        endforeach()
-    endforeach()
-endif()
+        endforeach ()
+    endforeach ()
+endif ()
diff --git a/tests/get-meta.cpp b/tests/get-meta.cpp
index 9ed0d09e..50b74f3e 100644
--- a/tests/get-meta.cpp
+++ b/tests/get-meta.cpp
@@ -85,21 +85,11 @@ main()
 
         CHECK(Device_Ok == storage_get_meta(storage, &metadata));
 
-        CHECK(metadata.chunking.supported);
-        ASSERT_EQ(int, "%d", PropertyType_FixedPrecision,
-                  (int)metadata.chunking.max_bytes_per_chunk.type);
-        // minimum cap on chunk size is 16 MiB
-        ASSERT_EQ(int,
-                  "%d",
-                  16 << 20,
-                  (int)metadata.chunking.max_bytes_per_chunk.low);
-        // maximum cap on chunk size is 1 GiB
-        ASSERT_EQ(int,
-                  "%d",
-                  1 << 30,
-                  (int)metadata.chunking.max_bytes_per_chunk.high);
-
-        CHECK(metadata.multiscale.supported);
+        CHECK(metadata.chunk_dims_px.is_supported);
+        CHECK((bool)metadata.shard_dims_chunks.is_supported ==
+              name.starts_with("ZarrV3"));
+        CHECK((bool)metadata.multiscale.is_supported !=
+              name.starts_with("ZarrV3"));
 
         CHECK(Device_Ok == driver_close_device(device));
     }
diff --git a/tests/multiscale-with-trivial-tile-size.cpp b/tests/multiscale-with-trivial-tile-size.cpp
index 107f7b09..db56a991 100644
--- a/tests/multiscale-with-trivial-tile-size.cpp
+++ b/tests/multiscale-with-trivial-tile-size.cpp
@@ -1,4 +1,5 @@
-/// @brief Test that enabling multiscale without specifying a tile size doesn't crash.
+/// @brief Test that enabling multiscale without specifying a tile size doesn't
+/// crash.
 #include "device/hal/device.manager.h"
 #include "acquire.h"
 
@@ -71,6 +72,7 @@ reporter(int is_error,
 
 const static uint32_t frame_width = 640;
 const static uint32_t frame_height = 480;
+const static uint32_t chunk_planes = 128;
 const static uint32_t max_frame_count = 100;
 
 void
@@ -103,8 +105,10 @@ setup(AcquireRuntime* runtime)
       0,
       sample_spacing_um);
 
-    storage_properties_set_chunking_props(
-      &props.video[0].storage.settings, frame_width, frame_height, 1, 64 << 20);
+    storage_properties_set_chunking_props(&props.video[0].storage.settings,
+                                          frame_width,
+                                          frame_height,
+                                          chunk_planes);
 
     storage_properties_set_enable_multiscale(&props.video[0].storage.settings,
                                              1);
diff --git a/tests/no-set-chunking.cpp b/tests/no-set-chunking.cpp
index e0717421..91d90426 100644
--- a/tests/no-set-chunking.cpp
+++ b/tests/no-set-chunking.cpp
@@ -74,11 +74,8 @@ setup(AcquireRuntime* runtime)
     OK(acquire_get_configuration_metadata(runtime, &metadata));
 
     props.video[0].camera.settings.binning = 1;
-    props.video[0].camera.settings.pixel_type = SampleType_u16;
-    props.video[0].camera.settings.shape = {
-        .x = (uint32_t)metadata.video[0].camera.shape.x.high,
-        .y = (uint32_t)metadata.video[0].camera.shape.y.high,
-    };
+    props.video[0].camera.settings.pixel_type = SampleType_u8;
+    props.video[0].camera.settings.shape = { .x = 64, .y = 48 };
     props.video[0].camera.settings.exposure_time_us = 1e4;
     props.video[0].max_frame_count = 10;
 
diff --git a/tests/write-zarr-compressed-multiscale.cpp b/tests/write-zarr-compressed-multiscale.cpp
index 38cfbccf..4aa1491e 100644
--- a/tests/write-zarr-compressed-multiscale.cpp
+++ b/tests/write-zarr-compressed-multiscale.cpp
@@ -89,10 +89,10 @@ reporter(int is_error,
 
 const static uint32_t frame_width = 1920;
 const static uint32_t frame_height = 1080;
-const static uint32_t tile_width = frame_width / 3;
-const static uint32_t tile_height = frame_height / 3;
+const static uint32_t chunk_width = frame_width / 3;
+const static uint32_t chunk_height = frame_height / 3;
+const static uint32_t chunk_planes = 72;
 
-const static uint32_t max_bytes_per_chunk = 16 << 20;
 const static auto max_frames = 74;
 
 void
@@ -127,10 +127,9 @@ acquire(AcquireRuntime* runtime, const char* filename)
 
     CHECK(
       storage_properties_set_chunking_props(&props.video[0].storage.settings,
-                                            tile_width,
-                                            tile_height,
-                                            1,
-                                            max_bytes_per_chunk));
+                                            chunk_width,
+                                            chunk_height,
+                                            chunk_planes));
 
     CHECK(storage_properties_set_enable_multiscale(
       &props.video[0].storage.settings, 1));
diff --git a/tests/write-zarr-compressed-with-chunking-and-rollover.cpp b/tests/write-zarr-compressed-with-chunking-and-rollover.cpp
index 65ae1dac..dd36d2cf 100644
--- a/tests/write-zarr-compressed-with-chunking-and-rollover.cpp
+++ b/tests/write-zarr-compressed-with-chunking-and-rollover.cpp
@@ -72,12 +72,9 @@ reporter(int is_error,
 
 const static uint32_t frame_width = 1920;
 const static uint32_t frame_height = 1080;
-const static uint32_t tile_width = frame_width / 2;
-const static uint32_t tile_height = frame_height / 2;
-
-const static uint32_t max_bytes_per_chunk = 32 << 20;
-const static auto expected_frames_per_chunk =
-  (uint32_t)std::floor(max_bytes_per_chunk / (tile_width * tile_height));
+const static uint32_t chunk_width = frame_width / 2;
+const static uint32_t chunk_height = frame_height / 2;
+const static uint32_t chunk_planes = 128;
 
 void
 acquire(AcquireRuntime* runtime, const char* filename)
@@ -111,10 +108,9 @@ acquire(AcquireRuntime* runtime, const char* filename)
 
     CHECK(
      storage_properties_set_chunking_props(&props.video[0].storage.settings,
-                                            tile_width,
-                                            tile_height,
-                                            1,
-                                            max_bytes_per_chunk));
+                                            chunk_width,
+                                            chunk_height,
+                                            chunk_planes));
 
     props.video[0].camera.settings.binning = 1;
     props.video[0].camera.settings.pixel_type = SampleType_u8;
@@ -123,7 +119,7 @@ acquire(AcquireRuntime* runtime, const char* filename)
     // we may drop frames with lower exposure
     props.video[0].camera.settings.exposure_time_us = 1e4;
     // should trigger rollover
-    props.video[0].max_frame_count = expected_frames_per_chunk + 1;
+    props.video[0].max_frame_count = chunk_planes + 1;
 
     OK(acquire_configure(runtime, &props));
     OK(acquire_start(runtime));
@@ -156,16 +152,16 @@ main()
         json zarray = json::parse(f);
 
         auto shape = zarray["shape"];
-        ASSERT_EQ(int, "%d", expected_frames_per_chunk + 1, shape[0]);
+        ASSERT_EQ(int, "%d", chunk_planes + 1, shape[0]);
         ASSERT_EQ(int, "%d", 1, shape[1]);
         ASSERT_EQ(int, "%d", frame_height, shape[2]);
         ASSERT_EQ(int, "%d", frame_width, shape[3]);
 
         auto chunks = zarray["chunks"];
-        ASSERT_EQ(int, "%d", expected_frames_per_chunk, chunks[0]);
+        ASSERT_EQ(int, "%d", chunk_planes, chunks[0]);
         ASSERT_EQ(int, "%d", 1, chunks[1]);
-        ASSERT_EQ(int, "%d", tile_height, chunks[2]);
-        ASSERT_EQ(int, "%d", tile_width, chunks[3]);
+        ASSERT_EQ(int, "%d", chunk_height, chunks[2]);
+        ASSERT_EQ(int, "%d", chunk_width, chunks[3]);
 
         // check chunked data
         auto chunk_size = chunks[0].get<int>() * chunks[1].get<int>() *
diff --git a/tests/write-zarr-compressed-with-chunking.cpp b/tests/write-zarr-compressed-with-chunking.cpp
index 097fe75f..e2943d27 100644
--- a/tests/write-zarr-compressed-with-chunking.cpp
+++ b/tests/write-zarr-compressed-with-chunking.cpp
@@ -72,12 +72,9 @@ reporter(int is_error,
 
 const static uint32_t frame_width = 1920;
 const static uint32_t frame_height = 1080;
-const static uint32_t tile_width = frame_width / 2;
-const static uint32_t tile_height = frame_height / 2;
-
-const static uint32_t max_bytes_per_chunk = 32 << 20;
-const static auto expected_frames_per_chunk =
-  (uint32_t)std::floor(max_bytes_per_chunk / (tile_width * tile_height));
+const static uint32_t chunk_width = frame_width / 2;
+const static uint32_t chunk_height = frame_height / 2;
+const static uint32_t chunk_planes = 64;
 
 void
 acquire(AcquireRuntime* runtime, const char* filename)
@@ -111,10 +108,9 @@ acquire(AcquireRuntime* runtime, const char* filename)
 
     CHECK(
      storage_properties_set_chunking_props(&props.video[0].storage.settings,
-                                            tile_width,
-                                            tile_height,
-                                            1,
-                                            max_bytes_per_chunk));
+                                            chunk_width,
+                                            chunk_height,
+                                            chunk_planes));
 
     props.video[0].camera.settings.binning = 1;
     props.video[0].camera.settings.pixel_type = SampleType_u8;
@@ -122,7 +118,7 @@ acquire(AcquireRuntime* runtime, const char* filename)
                                              .y = frame_height };
     // we may drop frames with lower exposure
     props.video[0].camera.settings.exposure_time_us = 1e4;
-    props.video[0].max_frame_count = expected_frames_per_chunk;
+    props.video[0].max_frame_count = chunk_planes;
 
     OK(acquire_configure(runtime, &props));
     OK(acquire_start(runtime));
@@ -155,16 +151,16 @@ main()
         json zarray = json::parse(f);
 
         auto shape = zarray["shape"];
-        ASSERT_EQ(int, "%d", expected_frames_per_chunk, shape[0]);
+        ASSERT_EQ(int, "%d", chunk_planes, shape[0]);
         ASSERT_EQ(int, "%d", 1, shape[1]);
         ASSERT_EQ(int, "%d", frame_height, shape[2]);
         ASSERT_EQ(int, "%d", frame_width, shape[3]);
 
         auto chunks = zarray["chunks"];
-        ASSERT_EQ(int, "%d", expected_frames_per_chunk, chunks[0]);
+        ASSERT_EQ(int, "%d", chunk_planes, chunks[0]);
         ASSERT_EQ(int, "%d", 1, chunks[1]);
-        ASSERT_EQ(int, "%d", tile_height, chunks[2]);
-        ASSERT_EQ(int, "%d", tile_width, chunks[3]);
+        ASSERT_EQ(int, "%d", chunk_height, chunks[2]);
+        ASSERT_EQ(int, "%d", chunk_width, chunks[3]);
 
         // check chunked data
         auto chunk_size = chunks[0].get<int>() * chunks[1].get<int>() *
diff --git a/tests/write-zarr-raw-multiscale.cpp b/tests/write-zarr-raw-multiscale.cpp
index e7e891f0..33fa99c7 100644
--- a/tests/write-zarr-raw-multiscale.cpp
+++ b/tests/write-zarr-raw-multiscale.cpp
@@ -79,10 +79,10 @@ reporter(int is_error,
 
 const static uint32_t frame_width = 240;
 const static uint32_t frame_height = 135;
-const static uint32_t tile_width = frame_width / 3;
-const static uint32_t tile_height = frame_height / 3;
+const static uint32_t chunk_width = frame_width / 3;
+const static uint32_t chunk_height = frame_height / 3;
+const static uint32_t chunk_planes = 128;
 
-const static uint32_t max_bytes_per_chunk = 1 << 16;
 const static auto max_frames = 100;
 
 void
@@ -117,10 +117,9 @@ acquire(AcquireRuntime* runtime, const char* filename)
 
     CHECK(
      storage_properties_set_chunking_props(&props.video[0].storage.settings,
-                                            tile_width,
-                                            tile_height,
-                                            1,
-                                            max_bytes_per_chunk));
+                                            chunk_width,
+                                            chunk_height,
+                                            chunk_planes));
 
     CHECK(storage_properties_set_enable_multiscale(
       &props.video[0].storage.settings, 1));
@@ -129,7 +128,6 @@ acquire(AcquireRuntime* runtime, const char* filename)
     props.video[0].camera.settings.pixel_type = SampleType_u8;
     props.video[0].camera.settings.shape = { .x = frame_width,
                                              .y = frame_height };
-// props.video[0].camera.settings.exposure_time_us = 1e5;
     props.video[0].max_frame_count = max_frames;
 
     OK(acquire_configure(runtime, &props));
@@ -195,7 +193,8 @@ verify_layer(const LayerTestCase& test_case)
                                    std::to_string(layer) / "0" / "0" /
                                    std::to_string(i) / std::to_string(j);
             CHECK(fs::is_regular_file(chunk_file_path));
-            ASSERT_EQ(int, "%d", chunk_size, fs::file_size(chunk_file_path));
+            const auto file_size = fs::file_size(chunk_file_path);
+            ASSERT_EQ(int, "%d", chunk_size, file_size);
         }
     }
 
diff --git a/tests/write-zarr-raw-with-chunking-and-rollover.cpp b/tests/write-zarr-raw-with-chunking-and-rollover.cpp
index 23dfaa3d..f282a2f2 100644
--- a/tests/write-zarr-raw-with-chunking-and-rollover.cpp
+++ b/tests/write-zarr-raw-with-chunking-and-rollover.cpp
@@ -62,12 +62,9 @@ reporter(int is_error,
 
 const static uint32_t frame_width = 1920;
 const static uint32_t frame_height = 1080;
-const static uint32_t tile_width = frame_width / 2;
-const static uint32_t tile_height = frame_height / 2;
-
-const static uint32_t max_bytes_per_chunk = 32 << 20;
-const static auto expected_frames_per_chunk =
-  (uint32_t)std::floor(max_bytes_per_chunk / (tile_width * tile_height));
+const static uint32_t chunk_width = frame_width / 2;
+const static uint32_t chunk_height = frame_height / 2;
+const static uint32_t chunk_planes = 512;
 
 void
 acquire(AcquireRuntime* runtime, const char* filename)
@@ -101,10 +98,9 @@ acquire(AcquireRuntime* runtime, const char* filename)
 
     CHECK(
      storage_properties_set_chunking_props(&props.video[0].storage.settings,
-                                            tile_width,
-                                            tile_height,
-                                            1,
-                                            max_bytes_per_chunk));
+                                            chunk_width,
+                                            chunk_height,
+                                            chunk_planes));
 
     props.video[0].camera.settings.binning = 1;
     props.video[0].camera.settings.pixel_type = SampleType_u8;
@@ -113,7 +109,7 @@ acquire(AcquireRuntime* runtime, const char* filename)
     // we may drop frames with lower exposure
     props.video[0].camera.settings.exposure_time_us = 1e4;
     // should trigger rollover
-    props.video[0].max_frame_count = expected_frames_per_chunk + 1;
+    props.video[0].max_frame_count = chunk_planes + 1;
 
     OK(acquire_configure(runtime, &props));
     OK(acquire_start(runtime));
@@ -146,16 +142,16 @@ main()
         json zarray = json::parse(f);
 
         auto shape = zarray["shape"];
-        ASSERT_EQ(int, "%d", expected_frames_per_chunk + 1, shape[0]);
+        ASSERT_EQ(int, "%d", chunk_planes + 1, shape[0]);
         ASSERT_EQ(int, "%d", 1, shape[1]);
         ASSERT_EQ(int, "%d", frame_height, shape[2]);
         ASSERT_EQ(int, "%d", frame_width, shape[3]);
 
         auto chunks = zarray["chunks"];
-        ASSERT_EQ(int, "%d", expected_frames_per_chunk, chunks[0]);
+        ASSERT_EQ(int, "%d", chunk_planes, chunks[0]);
         ASSERT_EQ(int, "%d", 1, chunks[1]);
-        ASSERT_EQ(int, "%d", tile_height, chunks[2]);
-        ASSERT_EQ(int, "%d", tile_width, chunks[3]);
+        ASSERT_EQ(int, "%d", chunk_height, chunks[2]);
+        ASSERT_EQ(int, "%d", chunk_width, chunks[3]);
 
         // check chunked data
         auto chunk_size = chunks[0].get<int>() * chunks[1].get<int>() *
diff --git a/tests/write-zarr-raw-with-chunking.cpp b/tests/write-zarr-raw-with-chunking.cpp
index 17c72d81..d9a5fd13 100644
--- a/tests/write-zarr-raw-with-chunking.cpp
+++ b/tests/write-zarr-raw-with-chunking.cpp
@@ -60,12 +60,9 @@ reporter(int is_error,
 
 const static uint32_t frame_width = 1920;
 const static uint32_t frame_height = 1080;
-const static uint32_t tile_width = frame_width / 2;
-const static uint32_t tile_height = frame_height / 2;
-
-const static uint32_t max_bytes_per_chunk = 32 << 20;
-const static auto expected_frames_per_chunk =
-  (uint32_t)std::floor(max_bytes_per_chunk / (tile_width * tile_height));
+const static uint32_t chunk_width = frame_width / 2;
+const static uint32_t chunk_height = frame_height / 2;
+const static uint32_t chunk_planes = 128;
 
 void
 acquire(AcquireRuntime* runtime, const char* filename)
@@ -99,10 +96,9 @@ acquire(AcquireRuntime* runtime, const char* filename)
 
     CHECK(
      storage_properties_set_chunking_props(&props.video[0].storage.settings,
-                                            tile_width,
-                                            tile_height,
-                                            1,
-                                            max_bytes_per_chunk));
+                                            chunk_width,
+                                            chunk_height,
+                                            chunk_planes));
 
     props.video[0].camera.settings.binning = 1;
     props.video[0].camera.settings.pixel_type = SampleType_u8;
@@ -110,7 +106,7 @@ acquire(AcquireRuntime* runtime, const char* filename)
                                              .y = frame_height };
     // we may drop frames with lower exposure
     props.video[0].camera.settings.exposure_time_us = 1e4;
-    props.video[0].max_frame_count = expected_frames_per_chunk;
+    props.video[0].max_frame_count = chunk_planes;
 
     OK(acquire_configure(runtime, &props));
     OK(acquire_start(runtime));
@@ -143,16 +139,16 @@ main()
         json zarray = json::parse(f);
 
         auto shape = zarray["shape"];
-        ASSERT_EQ(int, "%d", expected_frames_per_chunk, shape[0]);
+        ASSERT_EQ(int, "%d", chunk_planes, shape[0]);
         ASSERT_EQ(int, "%d", 1, shape[1]);
         ASSERT_EQ(int, "%d", frame_height, shape[2]);
         ASSERT_EQ(int, "%d", frame_width, shape[3]);
 
         auto chunks = zarray["chunks"];
-        ASSERT_EQ(int, "%d", expected_frames_per_chunk, chunks[0]);
+        ASSERT_EQ(int, "%d", chunk_planes, chunks[0]);
         ASSERT_EQ(int, "%d", 1, chunks[1]);
-        ASSERT_EQ(int, "%d", tile_height, chunks[2]);
- ASSERT_EQ(int, "%d", tile_width, chunks[3]); + ASSERT_EQ(int, "%d", chunk_height, chunks[2]); + ASSERT_EQ(int, "%d", chunk_width, chunks[3]); // check chunked data auto chunk_size = chunks[0].get<int>() * chunks[1].get<int>() * diff --git a/tests/write-zarr-raw-with-ragged-tiling.cpp b/tests/write-zarr-raw-with-ragged-tiling.cpp index c6f87a21..d2d9ff8d 100644 --- a/tests/write-zarr-raw-with-ragged-tiling.cpp +++ b/tests/write-zarr-raw-with-ragged-tiling.cpp @@ -67,13 +67,15 @@ reporter(int is_error, a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \ } while (0) -const static uint32_t frame_width = 128; -const static uint32_t frame_height = 96; +const static uint32_t frame_width = 256; +const static uint32_t frame_height = 192; -const static uint32_t tile_width = frame_width / 3; // 128 is not divisible by 3 -const static uint32_t tile_height = frame_height / 5; // 96 is not divisible by 5 +const static uint32_t chunk_width = + frame_width / 3; // 256 is not divisible by 3 +const static uint32_t chunk_height = + frame_height / 5; // 192 is not divisible by 5 +const static uint32_t chunk_planes = 64; -const static uint32_t max_bytes_per_chunk = 32 << 20; const static uint32_t max_frame_count = 70; void @@ -108,10 +110,9 @@ acquire(AcquireRuntime* runtime, const char* filename) CHECK( storage_properties_set_chunking_props(&props.video[0].storage.settings, - tile_width, - tile_height, - 1, - max_bytes_per_chunk)); + chunk_width, + chunk_height, + chunk_planes)); props.video[0].camera.settings.binning = 1; props.video[0].camera.settings.pixel_type = SampleType_u8; @@ -220,10 +221,10 @@ main() ASSERT_EQ(int, "%d", frame_width, shape[3]); auto chunks = zarray["chunks"]; - ASSERT_EQ(int, "%d", max_frame_count, chunks[0]); + ASSERT_EQ(int, "%d", chunk_planes, chunks[0]); ASSERT_EQ(int, "%d", 1, chunks[1]); - ASSERT_EQ(int, "%d", tile_height, chunks[2]); - ASSERT_EQ(int, "%d", tile_width, chunks[3]); + ASSERT_EQ(int, "%d", chunk_height, chunks[2]); +
ASSERT_EQ(int, "%d", chunk_width, chunks[3]); // check chunked data auto chunk_size = chunks[0].get<int>() * chunks[1].get<int>() * diff --git a/tests/write-zarr-v3-compressed.cpp b/tests/write-zarr-v3-compressed.cpp new file mode 100644 index 00000000..dd2d8185 --- /dev/null +++ b/tests/write-zarr-v3-compressed.cpp @@ -0,0 +1,304 @@ +/// @brief Test the compressed Zarr v3 writer. +/// @details Ensure that chunking is working as expected and metadata is written +/// correctly. + +#include "device/hal/device.manager.h" +#include "acquire.h" +#include "platform.h" // clock +#include "logger.h" + +#include <filesystem> +#include <fstream> +#include <stdexcept> + +#include "json.hpp" + +namespace fs = std::filesystem; +using json = nlohmann::json; + +void +reporter(int is_error, + const char* file, + int line, + const char* function, + const char* msg) +{ + fprintf(is_error ? stderr : stdout, + "%s%s(%d) - %s: %s\n", + is_error ? "ERROR " : "", + file, + line, + function, + msg); +} + +/// Helper for passing size static strings as function args. +/// For a function: `f(char*,size_t)` use `f(SIZED("hello"))`. +/// Expands to `f("hello",5)`. +#define SIZED(str) str, sizeof(str) - 1 + +#define L (aq_logger) +#define LOG(...) L(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define ERR(...) L(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define EXPECT(e, ...)
\ + do { \ + if (!(e)) { \ + char buf[1 << 8] = { 0 }; \ + ERR(__VA_ARGS__); \ + snprintf(buf, sizeof(buf) - 1, __VA_ARGS__); \ + throw std::runtime_error(buf); \ + } \ + } while (0) +#define CHECK(e) EXPECT(e, "Expression evaluated as false: %s", #e) +#define DEVOK(e) CHECK(Device_Ok == (e)) +#define OK(e) CHECK(AcquireStatus_Ok == (e)) + +/// example: `ASSERT_EQ(int,"%d",42,meaning_of_life())` +#define ASSERT_EQ(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT(a_ == b_, "Expected %s==%s but " fmt "!=" fmt, #a, #b, a_, b_); \ + } while (0) + +/// Check that a>b +/// example: `ASSERT_GT(int,"%d",43,meaning_of_life())` +#define ASSERT_GT(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT( \ + a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \ + } while (0) + +const static uint32_t frame_width = 1920; +const static uint32_t chunk_width = frame_width / 4; +const static uint32_t frame_height = 1080; +const static uint32_t chunk_height = frame_height / 3; +const static uint32_t frames_per_chunk = 48; +const static uint32_t max_frame_count = 48; + +void +setup(AcquireRuntime* runtime) +{ + const char* filename = TEST ".zarr"; + auto dm = acquire_device_manager(runtime); + CHECK(runtime); + CHECK(dm); + + AcquireProperties props = {}; + OK(acquire_get_configuration(runtime, &props)); + + DEVOK(device_manager_select(dm, + DeviceKind_Camera, + SIZED("simulated.*empty.*"), + &props.video[0].camera.identifier)); + DEVOK(device_manager_select(dm, + DeviceKind_Storage, + SIZED("ZarrV3Blosc1ZstdByteShuffle"), + &props.video[0].storage.identifier)); + + const char external_metadata[] = R"({"hello":"world"})"; + const struct PixelScale sample_spacing_um = { 1, 1 }; + + storage_properties_init(&props.video[0].storage.settings, + 0, + (char*)filename, + strlen(filename) + 1, + (char*)external_metadata, + sizeof(external_metadata), + sample_spacing_um); + + 
storage_properties_set_chunking_props(&props.video[0].storage.settings, + chunk_width, + chunk_height, + frames_per_chunk); + + storage_properties_set_sharding_props( + &props.video[0].storage.settings, 4, 3, 1); + + props.video[0].camera.settings.binning = 1; + props.video[0].camera.settings.pixel_type = SampleType_u8; + props.video[0].camera.settings.shape = { .x = frame_width, + .y = frame_height }; + props.video[0].max_frame_count = max_frame_count; + props.video[0].camera.settings.exposure_time_us = 1e4; + + OK(acquire_configure(runtime, &props)); +} + +void +acquire(AcquireRuntime* runtime) +{ + const auto next = [](VideoFrame* cur) -> VideoFrame* { + return (VideoFrame*)(((uint8_t*)cur) + cur->bytes_of_frame); + }; + + const auto consumed_bytes = [](const VideoFrame* const cur, + const VideoFrame* const end) -> size_t { + return (uint8_t*)end - (uint8_t*)cur; + }; + + struct clock clock; + static double time_limit_ms = 20000.0; + clock_init(&clock); + clock_shift_ms(&clock, time_limit_ms); + OK(acquire_start(runtime)); + { + uint64_t nframes = 0; + VideoFrame *beg, *end, *cur; + do { + struct clock throttle; + clock_init(&throttle); + // EXPECT(clock_cmp_now(&clock) < 0, + // "Timeout at %f ms", + // clock_toc_ms(&clock) + time_limit_ms); + OK(acquire_map_read(runtime, 0, &beg, &end)); + for (cur = beg; cur < end; cur = next(cur)) { + LOG("stream %d counting frame w id %d", 0, cur->frame_id); + CHECK(cur->shape.dims.width == frame_width); + CHECK(cur->shape.dims.height == frame_height); + ++nframes; + } + { + uint32_t n = consumed_bytes(beg, end); + OK(acquire_unmap_read(runtime, 0, n)); + if (n) + LOG("stream %d consumed bytes %d", 0, n); + } + clock_sleep_ms(&throttle, 100.0f); + + LOG( + "stream %d nframes %d time %f", 0, nframes, clock_toc_ms(&clock)); + } while (DeviceState_Running == acquire_get_state(runtime) && + nframes < max_frame_count); + + OK(acquire_map_read(runtime, 0, &beg, &end)); + for (cur = beg; cur < end; cur = next(cur)) { + LOG("stream 
%d counting frame w id %d", 0, cur->frame_id); + CHECK(cur->shape.dims.width == frame_width); + CHECK(cur->shape.dims.height == frame_height); + ++nframes; + } + { + uint32_t n = consumed_bytes(beg, end); + OK(acquire_unmap_read(runtime, 0, n)); + if (n) + LOG("stream %d consumed bytes %d", 0, n); + } + + CHECK(nframes == max_frame_count); + } + + OK(acquire_stop(runtime)); +} + +void +validate(AcquireRuntime* runtime) +{ + const fs::path test_path(TEST ".zarr"); + CHECK(fs::is_directory(test_path)); + + // check the zarr.json metadata file + fs::path metadata_path = test_path / "zarr.json"; + CHECK(fs::is_regular_file(metadata_path)); + std::ifstream f(metadata_path); + json metadata = json::parse(f); + + CHECK(metadata["extensions"].empty()); + CHECK("https://purl.org/zarr/spec/protocol/core/3.0" == + metadata["metadata_encoding"]); + CHECK(".json" == metadata["metadata_key_suffix"]); + CHECK("https://purl.org/zarr/spec/protocol/core/3.0" == + metadata["zarr_format"]); + + // check the group metadata file + metadata_path = test_path / "meta" / "root.group.json"; + CHECK(fs::is_regular_file(metadata_path)); + + f = std::ifstream(metadata_path); + metadata = json::parse(f); + CHECK("world" == metadata["attributes"]["acquire"]["hello"]); + + // check the array metadata file + metadata_path = test_path / "meta" / "root" / "0.array.json"; + CHECK(fs::is_regular_file(metadata_path)); + + f = std::ifstream(metadata_path); + metadata = json::parse(f); + + const auto chunk_grid = metadata["chunk_grid"]; + CHECK("/" == chunk_grid["separator"]); + CHECK("regular" == chunk_grid["type"]); + + const auto chunk_shape = chunk_grid["chunk_shape"]; + ASSERT_EQ(int, "%d", frames_per_chunk, chunk_shape[0]); + ASSERT_EQ(int, "%d", 1, chunk_shape[1]); + ASSERT_EQ(int, "%d", chunk_height, chunk_shape[2]); + ASSERT_EQ(int, "%d", chunk_width, chunk_shape[3]); + + CHECK("C" == metadata["chunk_memory_layout"]); + CHECK("u1" == metadata["data_type"]); + 
CHECK(metadata["extensions"].empty()); + + const auto array_shape = metadata["shape"]; + ASSERT_EQ(int, "%d", max_frame_count, array_shape[0]); + ASSERT_EQ(int, "%d", 1, array_shape[1]); + ASSERT_EQ(int, "%d", frame_height, array_shape[2]); + ASSERT_EQ(int, "%d", frame_width, array_shape[3]); + + const auto compressor = metadata["compressor"]; + CHECK("https://purl.org/zarr/spec/codec/blosc/1.0" == compressor["codec"]); + + auto configuration = compressor["configuration"]; + ASSERT_EQ(int, "%d", 0, configuration["blocksize"]); + ASSERT_EQ(int, "%d", 1, configuration["clevel"]); + ASSERT_EQ(int, "%d", 1, configuration["shuffle"]); + CHECK("zstd" == configuration["cname"]); + + // sharding + const auto storage_transformers = metadata["storage_transformers"]; + configuration = storage_transformers[0]["configuration"]; + const auto& cps = configuration["chunks_per_shard"]; + ASSERT_EQ(int, "%d", 1, cps[0]); + ASSERT_EQ(int, "%d", 1, cps[1]); + ASSERT_EQ(int, "%d", 3, cps[2]); + ASSERT_EQ(int, "%d", 4, cps[3]); + const size_t chunks_per_shard = cps[0].get<size_t>() * + cps[1].get<size_t>() * + cps[2].get<size_t>() * cps[3].get<size_t>(); + + // check that each chunked data file is the expected size + uint32_t bytes_per_chunk = + chunk_shape[0].get<int>() * chunk_shape[1].get<int>() * + chunk_shape[2].get<int>() * chunk_shape[3].get<int>(); + for (auto t = 0; t < std::ceil(max_frame_count / frames_per_chunk); ++t) { + fs::path path = test_path / "data" / "root" / "0" / + ("c" + std::to_string(t)) / "0" / "0" / "0"; + CHECK(fs::is_regular_file(path)); + + auto file_size = fs::file_size(path); + ASSERT_GT(int, "%d", file_size, 0); + ASSERT_GT(int, "%d", chunks_per_shard* bytes_per_chunk, file_size); + } +} + +void +teardown(AcquireRuntime* runtime) +{ + LOG("Done (OK)"); + acquire_shutdown(runtime); +} + +int +main() +{ + auto runtime = acquire_init(reporter); + + setup(runtime); + acquire(runtime); + validate(runtime); + teardown(runtime); + + return 0; +} diff --git a/tests/write-zarr-v3-raw.cpp
b/tests/write-zarr-v3-raw.cpp new file mode 100644 index 00000000..b70637a7 --- /dev/null +++ b/tests/write-zarr-v3-raw.cpp @@ -0,0 +1,298 @@ +/// @brief Test the basic Zarr v3 writer. +/// @details Ensure that chunking is working as expected and metadata is written +/// correctly. + +#include "device/hal/device.manager.h" +#include "acquire.h" +#include "platform.h" // clock +#include "logger.h" + +#include <filesystem> +#include <fstream> +#include <stdexcept> + +#include "json.hpp" + +namespace fs = std::filesystem; +using json = nlohmann::json; + +void +reporter(int is_error, + const char* file, + int line, + const char* function, + const char* msg) +{ + fprintf(is_error ? stderr : stdout, + "%s%s(%d) - %s: %s\n", + is_error ? "ERROR " : "", + file, + line, + function, + msg); +} + +/// Helper for passing size static strings as function args. +/// For a function: `f(char*,size_t)` use `f(SIZED("hello"))`. +/// Expands to `f("hello",5)`. +#define SIZED(str) str, sizeof(str) - 1 + +#define L (aq_logger) +#define LOG(...) L(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define ERR(...) L(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define EXPECT(e, ...)
\ + do { \ + if (!(e)) { \ + char buf[1 << 8] = { 0 }; \ + ERR(__VA_ARGS__); \ + snprintf(buf, sizeof(buf) - 1, __VA_ARGS__); \ + throw std::runtime_error(buf); \ + } \ + } while (0) +#define CHECK(e) EXPECT(e, "Expression evaluated as false: %s", #e) +#define DEVOK(e) CHECK(Device_Ok == (e)) +#define OK(e) CHECK(AcquireStatus_Ok == (e)) + +/// example: `ASSERT_EQ(int,"%d",42,meaning_of_life())` +#define ASSERT_EQ(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT(a_ == b_, "Expected %s==%s but " fmt "!=" fmt, #a, #b, a_, b_); \ + } while (0) + +/// Check that a>b +/// example: `ASSERT_GT(int,"%d",43,meaning_of_life())` +#define ASSERT_GT(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT( \ + a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \ + } while (0) + +const static uint32_t frame_width = 1080; +const static uint32_t chunk_width = frame_width / 4; +const static uint32_t frame_height = 960; +const static uint32_t chunk_height = frame_height / 3; +const static uint32_t frames_per_chunk = 48; +const static uint32_t max_frame_count = 48; + +void +setup(AcquireRuntime* runtime) +{ + const char* filename = TEST ".zarr"; + auto dm = acquire_device_manager(runtime); + CHECK(runtime); + CHECK(dm); + + AcquireProperties props = {}; + OK(acquire_get_configuration(runtime, &props)); + + DEVOK(device_manager_select(dm, + DeviceKind_Camera, + SIZED("simulated.*empty.*"), + &props.video[0].camera.identifier)); + DEVOK(device_manager_select(dm, + DeviceKind_Storage, + SIZED("ZarrV3"), + &props.video[0].storage.identifier)); + + const char external_metadata[] = R"({"hello":"world"})"; + const struct PixelScale sample_spacing_um = { 1, 1 }; + + storage_properties_init(&props.video[0].storage.settings, + 0, + (char*)filename, + strlen(filename) + 1, + (char*)external_metadata, + sizeof(external_metadata), + sample_spacing_um); + + storage_properties_set_chunking_props(&props.video[0].storage.settings, + chunk_width, 
+ chunk_height, + frames_per_chunk); + + storage_properties_set_sharding_props( + &props.video[0].storage.settings, 4, 3, 1); + + props.video[0].camera.settings.binning = 1; + props.video[0].camera.settings.pixel_type = SampleType_u8; + props.video[0].camera.settings.shape = { .x = frame_width, + .y = frame_height }; + props.video[0].max_frame_count = max_frame_count; + + OK(acquire_configure(runtime, &props)); +} + +void +acquire(AcquireRuntime* runtime) +{ + const auto next = [](VideoFrame* cur) -> VideoFrame* { + return (VideoFrame*)(((uint8_t*)cur) + cur->bytes_of_frame); + }; + + const auto consumed_bytes = [](const VideoFrame* const cur, + const VideoFrame* const end) -> size_t { + return (uint8_t*)end - (uint8_t*)cur; + }; + + struct clock clock; + static double time_limit_ms = 20000.0; + clock_init(&clock); + clock_shift_ms(&clock, time_limit_ms); + OK(acquire_start(runtime)); + { + uint64_t nframes = 0; + VideoFrame *beg, *end, *cur; + do { + struct clock throttle; + clock_init(&throttle); + EXPECT(clock_cmp_now(&clock) < 0, + "Timeout at %f ms", + clock_toc_ms(&clock) + time_limit_ms); + OK(acquire_map_read(runtime, 0, &beg, &end)); + for (cur = beg; cur < end; cur = next(cur)) { + LOG("stream %d counting frame w id %d", 0, cur->frame_id); + CHECK(cur->shape.dims.width == frame_width); + CHECK(cur->shape.dims.height == frame_height); + ++nframes; + } + { + uint32_t n = consumed_bytes(beg, end); + OK(acquire_unmap_read(runtime, 0, n)); + if (n) + LOG("stream %d consumed bytes %d", 0, n); + } + clock_sleep_ms(&throttle, 100.0f); + + LOG( + "stream %d nframes %d time %f", 0, nframes, clock_toc_ms(&clock)); + } while (DeviceState_Running == acquire_get_state(runtime) && + nframes < max_frame_count); + + OK(acquire_map_read(runtime, 0, &beg, &end)); + for (cur = beg; cur < end; cur = next(cur)) { + LOG("stream %d counting frame w id %d", 0, cur->frame_id); + CHECK(cur->shape.dims.width == frame_width); + CHECK(cur->shape.dims.height == frame_height); + 
++nframes; + } + { + uint32_t n = consumed_bytes(beg, end); + OK(acquire_unmap_read(runtime, 0, n)); + if (n) + LOG("stream %d consumed bytes %d", 0, n); + } + + CHECK(nframes == max_frame_count); + } + + OK(acquire_stop(runtime)); +} + +void +validate(AcquireRuntime* runtime) +{ + const fs::path test_path(TEST ".zarr"); + CHECK(fs::is_directory(test_path)); + + // check the zarr.json metadata file + fs::path metadata_path = test_path / "zarr.json"; + CHECK(fs::is_regular_file(metadata_path)); + std::ifstream f(metadata_path); + json metadata = json::parse(f); + + CHECK(metadata["extensions"].empty()); + CHECK("https://purl.org/zarr/spec/protocol/core/3.0" == + metadata["metadata_encoding"]); + CHECK(".json" == metadata["metadata_key_suffix"]); + CHECK("https://purl.org/zarr/spec/protocol/core/3.0" == + metadata["zarr_format"]); + + // check the group metadata file + metadata_path = test_path / "meta" / "root.group.json"; + CHECK(fs::is_regular_file(metadata_path)); + + f = std::ifstream(metadata_path); + metadata = json::parse(f); + CHECK("world" == metadata["attributes"]["acquire"]["hello"]); + + // check the array metadata file + metadata_path = test_path / "meta" / "root" / "0.array.json"; + CHECK(fs::is_regular_file(metadata_path)); + + f = std::ifstream(metadata_path); + metadata = json::parse(f); + + const auto chunk_grid = metadata["chunk_grid"]; + CHECK("/" == chunk_grid["separator"]); + CHECK("regular" == chunk_grid["type"]); + + const auto chunk_shape = chunk_grid["chunk_shape"]; + ASSERT_EQ(int, "%d", frames_per_chunk, chunk_shape[0]); + ASSERT_EQ(int, "%d", 1, chunk_shape[1]); + ASSERT_EQ(int, "%d", chunk_height, chunk_shape[2]); + ASSERT_EQ(int, "%d", chunk_width, chunk_shape[3]); + + CHECK("C" == metadata["chunk_memory_layout"]); + CHECK("u1" == metadata["data_type"]); + CHECK(metadata["extensions"].empty()); + + const auto array_shape = metadata["shape"]; + ASSERT_EQ(int, "%d", max_frame_count, array_shape[0]); + ASSERT_EQ(int, "%d", 1, 
array_shape[1]); + ASSERT_EQ(int, "%d", frame_height, array_shape[2]); + ASSERT_EQ(int, "%d", frame_width, array_shape[3]); + + // sharding + const auto storage_transformers = metadata["storage_transformers"]; + const auto configuration = storage_transformers[0]["configuration"]; + const auto& cps = configuration["chunks_per_shard"]; + ASSERT_EQ(int, "%d", 1, cps[0]); + ASSERT_EQ(int, "%d", 1, cps[1]); + ASSERT_EQ(int, "%d", 3, cps[2]); + ASSERT_EQ(int, "%d", 4, cps[3]); + const size_t chunks_per_shard = cps[0].get<size_t>() * + cps[1].get<size_t>() * + cps[2].get<size_t>() * cps[3].get<size_t>(); + + const auto index_size = 2 * chunks_per_shard * sizeof(uint64_t); + + // check that each chunked data file is the expected size + const uint32_t bytes_per_chunk = + chunk_shape[0].get<int>() * chunk_shape[1].get<int>() * + chunk_shape[2].get<int>() * chunk_shape[3].get<int>(); + for (auto t = 0; t < std::ceil(max_frame_count / frames_per_chunk); ++t) { + fs::path path = test_path / "data" / "root" / "0" / + ("c" + std::to_string(t)) / "0" / "0" / "0"; + + CHECK(fs::is_regular_file(path)); + + auto file_size = fs::file_size(path); + + ASSERT_EQ( + int, "%d", chunks_per_shard* bytes_per_chunk + index_size, file_size); + } +} + +void +teardown(AcquireRuntime* runtime) +{ + LOG("Done (OK)"); + acquire_shutdown(runtime); +} + +int +main() +{ + auto runtime = acquire_init(reporter); + + setup(runtime); + acquire(runtime); + validate(runtime); + teardown(runtime); + + return 0; +} diff --git a/tests/write-zarr-with-defaults.cpp b/tests/write-zarr-with-defaults.cpp index 159e086c..66f8c882 100644 --- a/tests/write-zarr-with-defaults.cpp +++ b/tests/write-zarr-with-defaults.cpp @@ -69,7 +69,7 @@ reporter(int is_error, const static uint32_t frame_width = 64; const static uint32_t frame_height = 48; -const static uint32_t expected_frames_per_chunk = 70; +const static uint32_t frames_per_chunk = 70; void acquire(AcquireRuntime* runtime, const char* filename) @@ -101,8 +101,10 @@ acquire(AcquireRuntime* runtime, const char*
filename) sizeof(external_metadata), sample_spacing_um); - storage_properties_set_chunking_props( - &props.video[0].storage.settings, frame_width, frame_height, 1, 64 << 20); + storage_properties_set_chunking_props(&props.video[0].storage.settings, + frame_width, + frame_height, + frames_per_chunk); props.video[0].camera.settings.binning = 1; props.video[0].camera.settings.pixel_type = SampleType_u8; @@ -110,7 +112,7 @@ acquire(AcquireRuntime* runtime, const char* filename) .y = frame_height }; // we may drop frames with lower exposure props.video[0].camera.settings.exposure_time_us = 1e4; - props.video[0].max_frame_count = expected_frames_per_chunk; + props.video[0].max_frame_count = frames_per_chunk; OK(acquire_configure(runtime, &props)); @@ -209,13 +211,13 @@ main() json zarray = json::parse(f); auto shape = zarray["shape"]; - ASSERT_EQ(int, "%d", expected_frames_per_chunk, shape[0]); + ASSERT_EQ(int, "%d", frames_per_chunk, shape[0]); ASSERT_EQ(int, "%d", 1, shape[1]); ASSERT_EQ(int, "%d", frame_height, shape[2]); ASSERT_EQ(int, "%d", frame_width, shape[3]); auto chunks = zarray["chunks"]; - ASSERT_EQ(int, "%d", expected_frames_per_chunk, chunks[0]); + ASSERT_EQ(int, "%d", frames_per_chunk, chunks[0]); ASSERT_EQ(int, "%d", 1, chunks[1]); ASSERT_EQ(int, "%d", frame_height, chunks[2]); ASSERT_EQ(int, "%d", frame_width, chunks[3]); diff --git a/tests/write-zarr-with-lz4-compression.cpp b/tests/write-zarr-with-lz4-compression.cpp index 9dd0d713..5c1a3d20 100644 --- a/tests/write-zarr-with-lz4-compression.cpp +++ b/tests/write-zarr-with-lz4-compression.cpp @@ -68,9 +68,9 @@ reporter(int is_error, a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \ } while (0) -static const uint32_t expected_frames_per_chunk = 70; static const uint32_t frame_width = 64; static const uint32_t frame_height = 48; +static const uint32_t frames_per_chunk = 70; void acquire(AcquireRuntime* runtime, const char* filename) @@ -102,8 +102,10 @@ acquire(AcquireRuntime* 
runtime, const char* filename) sizeof(external_metadata), sample_spacing_um)); - storage_properties_set_chunking_props( - &props.video[0].storage.settings, frame_width, frame_height, 1, 64 << 20); + storage_properties_set_chunking_props(&props.video[0].storage.settings, + frame_width, + frame_height, + frames_per_chunk); props.video[0].camera.settings.binning = 1; props.video[0].camera.settings.pixel_type = SampleType_u8; @@ -111,7 +113,7 @@ acquire(AcquireRuntime* runtime, const char* filename) .y = frame_height }; // we may drop frames with lower exposure props.video[0].camera.settings.exposure_time_us = 1e4; - props.video[0].max_frame_count = expected_frames_per_chunk; + props.video[0].max_frame_count = frames_per_chunk; OK(acquire_configure(runtime, &props)); OK(acquire_start(runtime)); @@ -144,13 +146,13 @@ main() json zarray = json::parse(f); auto shape = zarray["shape"]; - ASSERT_EQ(int, "%d", expected_frames_per_chunk, shape[0]); + ASSERT_EQ(int, "%d", frames_per_chunk, shape[0]); ASSERT_EQ(int, "%d", 1, shape[1]); ASSERT_EQ(int, "%d", frame_height, shape[2]); ASSERT_EQ(int, "%d", frame_width, shape[3]); auto chunks = zarray["chunks"]; - ASSERT_EQ(int, "%d", expected_frames_per_chunk, chunks[0]); + ASSERT_EQ(int, "%d", frames_per_chunk, chunks[0]); ASSERT_EQ(int, "%d", 1, chunks[1]); ASSERT_EQ(int, "%d", frame_height, chunks[2]); ASSERT_EQ(int, "%d", frame_width, chunks[3]); diff --git a/tests/write-zarr-with-zstd-compression.cpp b/tests/write-zarr-with-zstd-compression.cpp index 427fec9c..da960bd7 100644 --- a/tests/write-zarr-with-zstd-compression.cpp +++ b/tests/write-zarr-with-zstd-compression.cpp @@ -68,9 +68,9 @@ reporter(int is_error, a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \ } while (0) -static const uint32_t expected_frames_per_chunk = 71; static const uint32_t frame_width = 64; static const uint32_t frame_height = 48; +static const uint32_t frames_per_chunk = 64; void acquire(AcquireRuntime* runtime, const char* 
filename) @@ -102,8 +102,10 @@ acquire(AcquireRuntime* runtime, const char* filename) sizeof(external_metadata), sample_spacing_um)); - storage_properties_set_chunking_props( - &props.video[0].storage.settings, frame_width, frame_height, 1, 64 << 20); + storage_properties_set_chunking_props(&props.video[0].storage.settings, + frame_width, + frame_height, + frames_per_chunk); props.video[0].camera.settings.binning = 1; props.video[0].camera.settings.pixel_type = SampleType_u8; @@ -111,7 +113,7 @@ acquire(AcquireRuntime* runtime, const char* filename) .y = frame_height }; // we may drop frames with lower exposure props.video[0].camera.settings.exposure_time_us = 1e4; - props.video[0].max_frame_count = expected_frames_per_chunk; + props.video[0].max_frame_count = frames_per_chunk; OK(acquire_configure(runtime, &props)); OK(acquire_start(runtime)); @@ -144,13 +146,13 @@ main() json zarray = json::parse(f); auto shape = zarray["shape"]; - ASSERT_EQ(int, "%d", expected_frames_per_chunk, shape[0]); + ASSERT_EQ(int, "%d", frames_per_chunk, shape[0]); ASSERT_EQ(int, "%d", 1, shape[1]); ASSERT_EQ(int, "%d", frame_height, shape[2]); ASSERT_EQ(int, "%d", frame_width, shape[3]); auto chunks = zarray["chunks"]; - ASSERT_EQ(int, "%d", expected_frames_per_chunk, chunks[0]); + ASSERT_EQ(int, "%d", frames_per_chunk, chunks[0]); ASSERT_EQ(int, "%d", 1, chunks[1]); ASSERT_EQ(int, "%d", frame_height, chunks[2]); ASSERT_EQ(int, "%d", frame_width, chunks[3]);