diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ee0375e..d82eec48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Support for [Zarr v3](https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html). +- Support for + the [sharding storage transformer](https://web.archive.org/web/20230213221154/https://zarr-specs.readthedocs.io/en/latest/extensions/storage-transformers/sharding/v1.0.html) + in Zarr v3. - Ship debug libs for C-Blosc on Linux and Mac. ### Changed diff --git a/README.md b/README.md index 8fc700b6..45404a43 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,9 @@ This is an Acquire Driver that supports chunked streaming to [zarr][]. - **Zarr** - **ZarrBlosc1ZstdByteShuffle** - **ZarrBlosc1Lz4ByteShuffle** +- **ZarrV3** +- **ZarrV3Blosc1ZstdByteShuffle** +- **ZarrV3Blosc1Lz4ByteShuffle** ## Using the Zarr storage device @@ -24,6 +27,10 @@ Chunking is configured using `storage_properties_set_chunking_props()` when conf Multiscale storage can be enabled or disabled by calling `storage_properties_set_enable_multiscale()` when configuring the video stream. +For the [Zarr v3] version of each device, you can use the `ZarrV3*` devices. +**Note:** Zarr v3 is not [yet](https://github.com/ome/ngff/pull/206) supported by the Python OME-Zarr library, so you +will not be able to read multiscale metadata from the resulting dataset. + ### Configuring chunking You can configure chunking by calling `storage_properties_set_chunking_props()` on your `StorageProperties` object @@ -41,21 +48,21 @@ storage_properties_set_chunking_props(struct StorageProperties* out, ``` | ![frames](https://github.com/aliddell/acquire-driver-zarr/assets/844464/3510d468-4751-4fa0-b2bf-0e29a5f3ea1c) | -|:--:| -| A collection of frames. | +|:-------------------------------------------------------------------------------------------------------------:| +| A collection of frames. | A _tile_ is a contiguous section, or region of interest, of a _frame_. | ![tiles](https://github.com/aliddell/acquire-driver-zarr/assets/844464/f8d16139-e0ac-44db-855f-2f5ef305c98b) | -|:--:| -| A collection of frames, divided into tiles. | +|:------------------------------------------------------------------------------------------------------------:| +| A collection of frames, divided into tiles. | A _chunk_ is nothing more than some number of stacked tiles from subsequent frames, with each tile in a chunk having the same ROI in its respective frame. -| ![chunks](https://github.com/aliddell/acquire-driver-zarr/assets/844464/653e4d82-363e-4e04-9a42-927b052fb6e7) | -|:--:| -| A collection of frames, divided into tiles. A single chunk has been highlighted in red. | +| ![chunks](https://github.com/aliddell/acquire-driver-zarr/assets/844464/653e4d82-363e-4e04-9a42-927b052fb6e7) | +|:-------------------------------------------------------------------------------------------------------------:| +| A collection of frames, divided into tiles. A single chunk has been highlighted in red. | You can specify the width and height, in pixels, of each tile, and if your frame size has more than one plane, you can specify the number of planes you want per tile as well. @@ -120,3 +127,5 @@ Then the sequence of levels will have dimensions 1920 x 1080, 960 x 540, 480 x 2 [Blosc]: https://github.com/Blosc/c-blosc [Blosc docs]: https://www.blosc.org/ + +[Zarr v3]: https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5ac73553..071aa9c3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,6 +19,8 @@ add_library(${tgt} MODULE zarr.cpp zarr.v2.hh zarr.v2.cpp + zarr.v3.hh + zarr.v3.cpp zarr.driver.c ) target_enable_simd(${tgt}) diff --git a/src/zarr.driver.c b/src/zarr.driver.c index b8e78481..75ae89ff 100644 --- a/src/zarr.driver.c +++ b/src/zarr.driver.c @@ -39,6 +39,12 @@ struct Storage* compressed_zarr_v2_zstd_init(); struct Storage* compressed_zarr_v2_lz4_init(); +struct Storage* +zarr_v3_init(); +struct Storage* +compressed_zarr_v3_zstd_init(); +struct Storage* +compressed_zarr_v3_lz4_init(); // // GLOBALS @@ -49,6 +55,9 @@ enum StorageKind Storage_Zarr, Storage_ZarrBlosc1ZstdByteShuffle, Storage_ZarrBlosc1Lz4ByteShuffle, + Storage_ZarrV3, + Storage_ZarrV3Blosc1ZstdByteShuffle, + Storage_ZarrV3Blosc1Lz4ByteShuffle, Storage_Number_Of_Kinds }; @@ -71,6 +80,9 @@ storage_kind_to_string(const enum StorageKind kind) CASE(Storage_Zarr); CASE(Storage_ZarrBlosc1ZstdByteShuffle); CASE(Storage_ZarrBlosc1Lz4ByteShuffle); + CASE(Storage_ZarrV3); + CASE(Storage_ZarrV3Blosc1ZstdByteShuffle); + CASE(Storage_ZarrV3Blosc1Lz4ByteShuffle); #undef CASE default: return "(unknown)"; @@ -99,6 +111,9 @@ zarr_describe(const struct Driver* driver, XXX(Zarr), XXX(ZarrBlosc1ZstdByteShuffle), XXX(ZarrBlosc1Lz4ByteShuffle), + XXX(ZarrV3), + XXX(ZarrV3Blosc1ZstdByteShuffle), + XXX(ZarrV3Blosc1Lz4ByteShuffle), }; // clang-format on #undef XXX @@ -160,6 +175,10 @@ acquire_driver_init_v0(acquire_reporter_t reporter) [Storage_Zarr] = zarr_v2_init, [Storage_ZarrBlosc1ZstdByteShuffle] = compressed_zarr_v2_zstd_init, [Storage_ZarrBlosc1Lz4ByteShuffle] = compressed_zarr_v2_lz4_init, + [Storage_ZarrV3] = zarr_v3_init, + [Storage_ZarrV3Blosc1ZstdByteShuffle] = + compressed_zarr_v3_zstd_init, + [Storage_ZarrV3Blosc1Lz4ByteShuffle] = compressed_zarr_v3_lz4_init, }; memcpy( globals.constructors, impls, nbytes); // cppcheck-suppress uninitvar diff --git a/src/zarr.v3.cpp b/src/zarr.v3.cpp new file mode 100644 index 00000000..243f8262 --- /dev/null +++ b/src/zarr.v3.cpp @@ -0,0 +1,299 @@ +#include "zarr.v3.hh" +#include "writers/shard.writer.hh" + +#include "json.hpp" + +#include + +namespace zarr = acquire::sink::zarr; + +namespace { +template +struct Storage* +compressed_zarr_v3_init() +{ + try { + zarr::BloscCompressionParams params( + zarr::compression_codec_as_string(), 1, 1); + return new zarr::ZarrV3(std::move(params)); + } catch (const std::exception& exc) { + LOGE("Exception: %s\n", exc.what()); + } catch (...) { + LOGE("Exception: (unknown)"); + } + return nullptr; +} + +uint32_t +smallest_prime_factor(uint32_t n) +{ + if (n < 2) { + return 1; + } else if (n % 2 == 0) { + return 2; + } + + // collect additional primes + std::vector primes = { 3, 5, 7, 11, 13, 17, 19, 23 }; + for (auto i = 27; i * i <= n; i += 2) { + bool is_prime = true; + for (auto p : primes) { + if (i % p == 0) { + is_prime = false; + break; + } + } + if (is_prime) { + primes.push_back(i); + } + } + + for (auto p : primes) { + if (n % p == 0) { + return p; + } + } + + return n; +} + +zarr::ImageDims +make_shard_dims(const zarr::ImageDims& frame_dims, + const zarr::ImageDims& tile_dims) +{ + zarr::ImageDims shard_dims = { + .cols = frame_dims.cols, + .rows = frame_dims.rows, + }; + + const auto h_rat = (float)frame_dims.rows / (float)tile_dims.rows; + auto shard_rows = (uint32_t)std::ceil(h_rat * tile_dims.rows); + if (shard_rows > frame_dims.rows) { + auto n_shards_rows = smallest_prime_factor(shard_rows / tile_dims.rows); + shard_dims.rows = n_shards_rows * tile_dims.rows; + } + + const auto w_rat = (float)frame_dims.cols / (float)tile_dims.cols; + auto shard_cols = (uint32_t)std::ceil(w_rat * tile_dims.cols); + if (shard_cols > frame_dims.cols) { + auto n_shards_cols = smallest_prime_factor(shard_cols / tile_dims.cols); + shard_dims.cols = n_shards_cols * tile_dims.cols; + } + + return shard_dims; +} +} // end ::{anonymous} namespace + +zarr::ZarrV3::ZarrV3(BloscCompressionParams&& compression_params) + : Zarr(std::move(compression_params)) + , shard_dims_{} +{ +} + +void +zarr::ZarrV3::allocate_writers_() +{ + const ImageDims& frame_dims = image_tile_shapes_.at(0).first; + const ImageDims& tile_dims = image_tile_shapes_.at(0).second; + shard_dims_ = make_shard_dims(frame_dims, tile_dims); + + uint64_t bytes_per_tile = common::bytes_per_tile(tile_dims, pixel_type_); + + writers_.clear(); + if (blosc_compression_params_.has_value()) { + writers_.push_back(std::make_shared( + frame_dims, + shard_dims_, + tile_dims, + (uint32_t)(max_bytes_per_chunk_ / bytes_per_tile), + (get_data_directory_() / "0").string(), + this, + blosc_compression_params_.value())); + } else { + writers_.push_back(std::make_shared( + frame_dims, + shard_dims_, + tile_dims, + (uint32_t)(max_bytes_per_chunk_ / bytes_per_tile), + (get_data_directory_() / "0").string(), + this)); + } +} + +void +zarr::ZarrV3::get_meta(StoragePropertyMetadata* meta) const +{ + CHECK(meta); + *meta = { + .chunking = { + .supported = 1, + .max_bytes_per_chunk = { + .writable = 1, + .low = (float)(16 << 20), + .high = (float)(1 << 30), + .type = PropertyType_FixedPrecision }, + }, + .multiscale = { + .supported = 0, + } + }; +} + +void +zarr::ZarrV3::write_array_metadata_(size_t level) const +{ + namespace fs = std::filesystem; + using json = nlohmann::json; + + if (writers_.size() <= level) { + return; + } + + const ImageDims& image_dims = image_tile_shapes_.at(level).first; + const ImageDims& tile_dims = image_tile_shapes_.at(level).second; + + const uint64_t frame_count = writers_.at(level)->frames_written(); + const auto frames_per_chunk = + std::min(frame_count, + (uint64_t)common::frames_per_chunk( + tile_dims, pixel_type_, max_bytes_per_chunk_)); + + json metadata; + metadata["attributes"] = json::object(); + metadata["chunk_grid"] = json::object({ + { "chunk_shape", + json::array({ + frames_per_chunk, // t + // TODO (aliddell): c? + 1, // z + tile_dims.rows, // y + tile_dims.cols, // x + }) }, + { "separator", "/" }, + { "type", "regular" }, + }); + metadata["chunk_memory_layout"] = "C"; + metadata["data_type"] = common::sample_type_to_dtype(pixel_type_); + metadata["extensions"] = json::array(); + metadata["fill_value"] = 0; + metadata["shape"] = json::array({ + frame_count, // t + // TODO (aliddell): c? + 1, // z + image_dims.rows, // y + image_dims.cols, // x + }); + + if (blosc_compression_params_.has_value()) { + auto params = blosc_compression_params_.value(); + metadata["compressor"] = json::object({ + { "codec", "https://purl.org/zarr/spec/codec/blosc/1.0" }, + { "configuration", + json::object({ + { "blocksize", 0 }, + { "clevel", params.clevel }, + { "cname", params.codec_id }, + { "shuffle", params.shuffle }, + }) }, + }); + } + + // sharding storage transformer + // TODO (aliddell): + // https://github.com/zarr-developers/zarr-python/issues/877 + metadata["storage_transformers"] = json::array(); + metadata["storage_transformers"][0] = json::object({ + { "type", "indexed" }, + { "extension", + "https://purl.org/zarr/spec/storage_transformers/sharding/1.0" }, + { "configuration", + json::object({ + { "chunks_per_shard", + json::array({ + 1, // t + // TODO (aliddell): c? + 1, // z + shard_dims_.rows / tile_dims.rows, // y + shard_dims_.cols / tile_dims.cols, // x + }) }, + }) }, + }); + + auto path = (dataset_root_ / "meta" / "root" / + (std::to_string(level) + ".array.json")) + .string(); + common::write_string(path, metadata.dump(4)); +} + +/// @brief Write the external metadata. +/// @details This is a no-op for ZarrV3. Instead, external metadata is +/// stored in the group metadata. +void +zarr::ZarrV3::write_external_metadata_() const +{ + // no-op +} + +/// @brief Write the metadata for the dataset. +void +zarr::ZarrV3::write_base_metadata_() const +{ + namespace fs = std::filesystem; + using json = nlohmann::json; + + json metadata; + metadata["extensions"] = json::array(); + metadata["metadata_encoding"] = + "https://purl.org/zarr/spec/protocol/core/3.0"; + metadata["metadata_key_suffix"] = ".json"; + metadata["zarr_format"] = "https://purl.org/zarr/spec/protocol/core/3.0"; + + auto path = (dataset_root_ / "zarr.json").string(); + common::write_string(path, metadata.dump(4)); +} + +/// @brief Write the metadata for the group. +/// @details Zarr v3 stores group metadata in +/// /meta/{group_name}.group.json. We will call the group "root". +void +zarr::ZarrV3::write_group_metadata_() const +{ + namespace fs = std::filesystem; + using json = nlohmann::json; + + json metadata; + metadata["attributes"]["acquire"] = json::parse(external_metadata_json_); + + auto path = (dataset_root_ / "meta" / "root.group.json").string(); + common::write_string(path, metadata.dump(4)); +} + +fs::path +zarr::ZarrV3::get_data_directory_() const +{ + return dataset_root_ / "data" / "root"; +} + +extern "C" +{ + struct Storage* zarr_v3_init() + { + try { + return new zarr::ZarrV3(); + } catch (const std::exception& exc) { + LOGE("Exception: %s\n", exc.what()); + } catch (...) { + LOGE("Exception: (unknown)"); + } + return nullptr; + } + struct Storage* compressed_zarr_v3_zstd_init() + { + return compressed_zarr_v3_init(); + } + + struct Storage* compressed_zarr_v3_lz4_init() + { + return compressed_zarr_v3_init(); + } +} diff --git a/src/zarr.v3.hh b/src/zarr.v3.hh new file mode 100644 index 00000000..1a6ec919 --- /dev/null +++ b/src/zarr.v3.hh @@ -0,0 +1,33 @@ +#ifndef H_ACQUIRE_STORAGE_ZARR_V3_V0 +#define H_ACQUIRE_STORAGE_ZARR_V3_V0 + +#include "zarr.hh" + +namespace acquire::sink::zarr { +struct ZarrV3 final : public Zarr +{ + public: + ZarrV3() = default; + ZarrV3(BloscCompressionParams&& compression_params); + ~ZarrV3() override = default; + + /// StorageInterface + void get_meta(StoragePropertyMetadata* meta) const override; + + private: + ImageDims shard_dims_; + + /// Setup + void allocate_writers_() override; + + /// Metadata + void write_array_metadata_(size_t level) const override; + void write_external_metadata_() const override; + void write_base_metadata_() const override; + void write_group_metadata_() const override; + + /// Filesystem + fs::path get_data_directory_() const override; +}; +} // namespace acquire::sink::zarr +#endif // H_ACQUIRE_STORAGE_ZARR_V3_V0 diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d8b4d959..3d479530 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -27,6 +27,8 @@ else() write-zarr-raw-with-chunking write-zarr-raw-with-chunking-and-rollover write-zarr-raw-with-ragged-tiling + write-zarr-v3-compressed + write-zarr-v3-raw write-zarr-with-defaults write-zarr-with-lz4-compression write-zarr-with-zstd-compression diff --git a/tests/write-zarr-v3-compressed.cpp b/tests/write-zarr-v3-compressed.cpp new file mode 100644 index 00000000..136a3c40 --- /dev/null +++ b/tests/write-zarr-v3-compressed.cpp @@ -0,0 +1,299 @@ +/// @brief Test the basic Zarr v3 writer. +/// @details Ensure that chunking is working as expected and metadata is written +/// correctly. + +#include "device/hal/device.manager.h" +#include "acquire.h" +#include "platform.h" // clock +#include "logger.h" + +#include +#include +#include + +#include "json.hpp" + +namespace fs = std::filesystem; +using json = nlohmann::json; + +void +reporter(int is_error, + const char* file, + int line, + const char* function, + const char* msg) +{ + fprintf(is_error ? stderr : stdout, + "%s%s(%d) - %s: %s\n", + is_error ? "ERROR " : "", + file, + line, + function, + msg); +} + +/// Helper for passing size static strings as function args. +/// For a function: `f(char*,size_t)` use `f(SIZED("hello"))`. +/// Expands to `f("hello",5)`. +#define SIZED(str) str, sizeof(str) - 1 + +#define L (aq_logger) +#define LOG(...) L(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define ERR(...) L(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define EXPECT(e, ...) \ + do { \ + if (!(e)) { \ + char buf[1 << 8] = { 0 }; \ + ERR(__VA_ARGS__); \ + snprintf(buf, sizeof(buf) - 1, __VA_ARGS__); \ + throw std::runtime_error(buf); \ + } \ + } while (0) +#define CHECK(e) EXPECT(e, "Expression evaluated as false: %s", #e) +#define DEVOK(e) CHECK(Device_Ok == (e)) +#define OK(e) CHECK(AcquireStatus_Ok == (e)) + +/// example: `ASSERT_EQ(int,"%d",42,meaning_of_life())` +#define ASSERT_EQ(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT(a_ == b_, "Expected %s==%s but " fmt "!=" fmt, #a, #b, a_, b_); \ + } while (0) + +/// Check that a>b +/// example: `ASSERT_GT(int,"%d",43,meaning_of_life())` +#define ASSERT_GT(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT( \ + a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \ + } while (0) + +const static uint32_t frame_width = 1920; +const static uint32_t tile_width = frame_width / 4; +const static uint32_t frame_height = 1080; +const static uint32_t tile_height = frame_height / 3; +const static uint32_t expected_frames_per_chunk = 48; +const static uint32_t max_frame_count = 48; + +void +setup(AcquireRuntime* runtime) +{ + const char* filename = TEST ".zarr"; + auto dm = acquire_device_manager(runtime); + CHECK(runtime); + CHECK(dm); + + AcquireProperties props = {}; + OK(acquire_get_configuration(runtime, &props)); + + DEVOK(device_manager_select(dm, + DeviceKind_Camera, + SIZED("simulated.*empty.*"), + &props.video[0].camera.identifier)); + DEVOK(device_manager_select(dm, + DeviceKind_Storage, + SIZED("ZarrV3Blosc1ZstdByteShuffle"), + &props.video[0].storage.identifier)); + + const char external_metadata[] = R"({"hello":"world"})"; + const struct PixelScale sample_spacing_um = { 1, 1 }; + + storage_properties_init(&props.video[0].storage.settings, + 0, + (char*)filename, + strlen(filename) + 1, + (char*)external_metadata, + sizeof(external_metadata), + sample_spacing_um); + + storage_properties_set_chunking_props( + &props.video[0].storage.settings, tile_width, tile_height, 1, 16 << 20); + + props.video[0].camera.settings.binning = 1; + props.video[0].camera.settings.pixel_type = SampleType_u8; + props.video[0].camera.settings.shape = { .x = frame_width, + .y = frame_height }; + props.video[0].max_frame_count = max_frame_count; + + OK(acquire_configure(runtime, &props)); +} + +void +acquire(AcquireRuntime* runtime) +{ + const auto next = [](VideoFrame* cur) -> VideoFrame* { + return (VideoFrame*)(((uint8_t*)cur) + cur->bytes_of_frame); + }; + + const auto consumed_bytes = [](const VideoFrame* const cur, + const VideoFrame* const end) -> size_t { + return (uint8_t*)end - (uint8_t*)cur; + }; + + struct clock clock; + static double time_limit_ms = 20000.0; + clock_init(&clock); + clock_shift_ms(&clock, time_limit_ms); + OK(acquire_start(runtime)); + { + uint64_t nframes = 0; + VideoFrame *beg, *end, *cur; + do { + struct clock throttle; + clock_init(&throttle); +// EXPECT(clock_cmp_now(&clock) < 0, +// "Timeout at %f ms", +// clock_toc_ms(&clock) + time_limit_ms); + OK(acquire_map_read(runtime, 0, &beg, &end)); + for (cur = beg; cur < end; cur = next(cur)) { + LOG("stream %d counting frame w id %d", 0, cur->frame_id); + CHECK(cur->shape.dims.width == frame_width); + CHECK(cur->shape.dims.height == frame_height); + ++nframes; + } + { + uint32_t n = consumed_bytes(beg, end); + OK(acquire_unmap_read(runtime, 0, n)); + if (n) + LOG("stream %d consumed bytes %d", 0, n); + } + clock_sleep_ms(&throttle, 100.0f); + + LOG( + "stream %d nframes %d time %f", 0, nframes, clock_toc_ms(&clock)); + } while (DeviceState_Running == acquire_get_state(runtime) && + nframes < max_frame_count); + + OK(acquire_map_read(runtime, 0, &beg, &end)); + for (cur = beg; cur < end; cur = next(cur)) { + LOG("stream %d counting frame w id %d", 0, cur->frame_id); + CHECK(cur->shape.dims.width == frame_width); + CHECK(cur->shape.dims.height == frame_height); + ++nframes; + } + { + uint32_t n = consumed_bytes(beg, end); + OK(acquire_unmap_read(runtime, 0, n)); + if (n) + LOG("stream %d consumed bytes %d", 0, n); + } + + CHECK(nframes == max_frame_count); + } + + OK(acquire_stop(runtime)); +} + +void +validate(AcquireRuntime* runtime) +{ + const fs::path test_path(TEST ".zarr"); + CHECK(fs::is_directory(test_path)); + + // check the zarr.json metadata file + fs::path metadata_path = test_path / "zarr.json"; + CHECK(fs::is_regular_file(metadata_path)); + std::ifstream f(metadata_path); + json metadata = json::parse(f); + + CHECK(metadata["extensions"].empty()); + CHECK("https://purl.org/zarr/spec/protocol/core/3.0" == + metadata["metadata_encoding"]); + CHECK(".json" == metadata["metadata_key_suffix"]); + CHECK("https://purl.org/zarr/spec/protocol/core/3.0" == + metadata["zarr_format"]); + + // check the group metadata file + metadata_path = test_path / "meta" / "root.group.json"; + CHECK(fs::is_regular_file(metadata_path)); + + f = std::ifstream(metadata_path); + metadata = json::parse(f); + CHECK("world" == metadata["attributes"]["acquire"]["hello"]); + + // check the array metadata file + metadata_path = test_path / "meta" / "root" / "0.array.json"; + CHECK(fs::is_regular_file(metadata_path)); + + f = std::ifstream(metadata_path); + metadata = json::parse(f); + + const auto chunk_grid = metadata["chunk_grid"]; + CHECK("/" == chunk_grid["separator"]); + CHECK("regular" == chunk_grid["type"]); + + const auto chunk_shape = chunk_grid["chunk_shape"]; + ASSERT_EQ(int, "%d", expected_frames_per_chunk, chunk_shape[0]); + ASSERT_EQ(int, "%d", 1, chunk_shape[1]); + ASSERT_EQ(int, "%d", tile_height, chunk_shape[2]); + ASSERT_EQ(int, "%d", tile_width, chunk_shape[3]); + + CHECK("C" == metadata["chunk_memory_layout"]); + CHECK("u1" == metadata["data_type"]); + CHECK(metadata["extensions"].empty()); + + const auto array_shape = metadata["shape"]; + ASSERT_EQ(int, "%d", max_frame_count, array_shape[0]); + ASSERT_EQ(int, "%d", 1, array_shape[1]); + ASSERT_EQ(int, "%d", frame_height, array_shape[2]); + ASSERT_EQ(int, "%d", frame_width, array_shape[3]); + + const auto compressor = metadata["compressor"]; + CHECK("https://purl.org/zarr/spec/codec/blosc/1.0" == compressor["codec"]); + + auto configuration = compressor["configuration"]; + ASSERT_EQ(int, "%d", 0, configuration["blocksize"]); + ASSERT_EQ(int, "%d", 1, configuration["clevel"]); + ASSERT_EQ(int, "%d", 1, configuration["shuffle"]); + CHECK("zstd" == configuration["cname"]); + + // sharding + const auto storage_transformers = metadata["storage_transformers"]; + configuration = storage_transformers[0]["configuration"]; + const auto& cps = configuration["chunks_per_shard"]; + ASSERT_EQ(int, "%d", 1, cps[0]); + ASSERT_EQ(int, "%d", 1, cps[1]); + ASSERT_EQ(int, "%d", 3, cps[2]); + ASSERT_EQ(int, "%d", 4, cps[3]); + const size_t chunks_per_shard = cps[0].get() * + cps[1].get() * + cps[2].get() * cps[3].get(); + + // check that each chunked data file is the expected size + uint32_t bytes_per_chunk = + chunk_shape[0].get() * chunk_shape[1].get() * + chunk_shape[2].get() * chunk_shape[3].get(); + for (auto t = 0; t < std::ceil(max_frame_count / expected_frames_per_chunk); + ++t) { + fs::path path = test_path / "data" / "root" / "0" / + ("c" + std::to_string(t)) / "0" / "0" / "0"; + CHECK(fs::is_regular_file(path)); + + auto file_size = fs::file_size(path); + ASSERT_GT(int, "%d", file_size, 0); + ASSERT_GT(int, "%d", chunks_per_shard* bytes_per_chunk, file_size); + } +} + +void +teardown(AcquireRuntime* runtime) +{ + LOG("Done (OK)"); + acquire_shutdown(runtime); +} + +int +main() +{ + auto runtime = acquire_init(reporter); + + setup(runtime); + acquire(runtime); + validate(runtime); + teardown(runtime); + + return 0; +} diff --git a/tests/write-zarr-v3-raw.cpp b/tests/write-zarr-v3-raw.cpp new file mode 100644 index 00000000..404efa51 --- /dev/null +++ b/tests/write-zarr-v3-raw.cpp @@ -0,0 +1,294 @@ +/// @brief Test the basic Zarr v3 writer. +/// @details Ensure that chunking is working as expected and metadata is written +/// correctly. + +#include "device/hal/device.manager.h" +#include "acquire.h" +#include "platform.h" // clock +#include "logger.h" + +#include +#include +#include + +#include "json.hpp" + +namespace fs = std::filesystem; +using json = nlohmann::json; + +void +reporter(int is_error, + const char* file, + int line, + const char* function, + const char* msg) +{ + fprintf(is_error ? stderr : stdout, + "%s%s(%d) - %s: %s\n", + is_error ? "ERROR " : "", + file, + line, + function, + msg); +} + +/// Helper for passing size static strings as function args. +/// For a function: `f(char*,size_t)` use `f(SIZED("hello"))`. +/// Expands to `f("hello",5)`. +#define SIZED(str) str, sizeof(str) - 1 + +#define L (aq_logger) +#define LOG(...) L(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define ERR(...) L(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define EXPECT(e, ...) \ + do { \ + if (!(e)) { \ + char buf[1 << 8] = { 0 }; \ + ERR(__VA_ARGS__); \ + snprintf(buf, sizeof(buf) - 1, __VA_ARGS__); \ + throw std::runtime_error(buf); \ + } \ + } while (0) +#define CHECK(e) EXPECT(e, "Expression evaluated as false: %s", #e) +#define DEVOK(e) CHECK(Device_Ok == (e)) +#define OK(e) CHECK(AcquireStatus_Ok == (e)) + +/// example: `ASSERT_EQ(int,"%d",42,meaning_of_life())` +#define ASSERT_EQ(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT(a_ == b_, "Expected %s==%s but " fmt "!=" fmt, #a, #b, a_, b_); \ + } while (0) + +/// Check that a>b +/// example: `ASSERT_GT(int,"%d",43,meaning_of_life())` +#define ASSERT_GT(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT( \ + a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \ + } while (0) + +const static uint32_t frame_width = 1080; +const static uint32_t tile_width = frame_width / 4; +const static uint32_t frame_height = 960; +const static uint32_t tile_height = frame_height / 3; +const static uint32_t expected_frames_per_chunk = 48; +const static uint32_t max_frame_count = 48; + +void +setup(AcquireRuntime* runtime) +{ + const char* filename = TEST ".zarr"; + auto dm = acquire_device_manager(runtime); + CHECK(runtime); + CHECK(dm); + + AcquireProperties props = {}; + OK(acquire_get_configuration(runtime, &props)); + + DEVOK(device_manager_select(dm, + DeviceKind_Camera, + SIZED("simulated.*empty.*"), + &props.video[0].camera.identifier)); + DEVOK(device_manager_select(dm, + DeviceKind_Storage, + SIZED("ZarrV3"), + &props.video[0].storage.identifier)); + + const char external_metadata[] = R"({"hello":"world"})"; + const struct PixelScale sample_spacing_um = { 1, 1 }; + + storage_properties_init(&props.video[0].storage.settings, + 0, + (char*)filename, + strlen(filename) + 1, + (char*)external_metadata, + sizeof(external_metadata), + sample_spacing_um); + + storage_properties_set_chunking_props( + &props.video[0].storage.settings, tile_width, tile_height, 1, 16 << 20); + + props.video[0].camera.settings.binning = 1; + props.video[0].camera.settings.pixel_type = SampleType_u8; + props.video[0].camera.settings.shape = { .x = frame_width, + .y = frame_height }; + props.video[0].max_frame_count = max_frame_count; + + OK(acquire_configure(runtime, &props)); +} + +void +acquire(AcquireRuntime* runtime) +{ + const auto next = [](VideoFrame* cur) -> VideoFrame* { + return (VideoFrame*)(((uint8_t*)cur) + cur->bytes_of_frame); + }; + + const auto consumed_bytes = [](const VideoFrame* const cur, + const VideoFrame* const end) -> size_t { + return (uint8_t*)end - (uint8_t*)cur; + }; + + struct clock clock; + static double time_limit_ms = 20000.0; + clock_init(&clock); + clock_shift_ms(&clock, time_limit_ms); + OK(acquire_start(runtime)); + { + uint64_t nframes = 0; + VideoFrame *beg, *end, *cur; + do { + struct clock throttle; + clock_init(&throttle); + // EXPECT(clock_cmp_now(&clock) < 0, + // "Timeout at %f ms", + // clock_toc_ms(&clock) + time_limit_ms); + OK(acquire_map_read(runtime, 0, &beg, &end)); + for (cur = beg; cur < end; cur = next(cur)) { + LOG("stream %d counting frame w id %d", 0, cur->frame_id); + CHECK(cur->shape.dims.width == frame_width); + CHECK(cur->shape.dims.height == frame_height); + ++nframes; + } + { + uint32_t n = consumed_bytes(beg, end); + OK(acquire_unmap_read(runtime, 0, n)); + if (n) + LOG("stream %d consumed bytes %d", 0, n); + } + clock_sleep_ms(&throttle, 100.0f); + + LOG( + "stream %d nframes %d time %f", 0, nframes, clock_toc_ms(&clock)); + } while (DeviceState_Running == acquire_get_state(runtime) && + nframes < max_frame_count); + + OK(acquire_map_read(runtime, 0, &beg, &end)); + for (cur = beg; cur < end; cur = next(cur)) { + LOG("stream %d counting frame w id %d", 0, cur->frame_id); + CHECK(cur->shape.dims.width == frame_width); + CHECK(cur->shape.dims.height == frame_height); + ++nframes; + } + { + uint32_t n = consumed_bytes(beg, end); + OK(acquire_unmap_read(runtime, 0, n)); + if (n) + LOG("stream %d consumed bytes %d", 0, n); + } + + CHECK(nframes == max_frame_count); + } + + OK(acquire_stop(runtime)); +} + +void +validate(AcquireRuntime* runtime) +{ + const fs::path test_path(TEST ".zarr"); + CHECK(fs::is_directory(test_path)); + + // check the zarr.json metadata file + fs::path metadata_path = test_path / "zarr.json"; + CHECK(fs::is_regular_file(metadata_path)); + std::ifstream f(metadata_path); + json metadata = json::parse(f); + + CHECK(metadata["extensions"].empty()); + CHECK("https://purl.org/zarr/spec/protocol/core/3.0" == + metadata["metadata_encoding"]); + CHECK(".json" == metadata["metadata_key_suffix"]); + CHECK("https://purl.org/zarr/spec/protocol/core/3.0" == + metadata["zarr_format"]); + + // check the group metadata file + metadata_path = test_path / "meta" / "root.group.json"; + CHECK(fs::is_regular_file(metadata_path)); + + f = std::ifstream(metadata_path); + metadata = json::parse(f); + CHECK("world" == metadata["attributes"]["acquire"]["hello"]); + + // check the array metadata file + metadata_path = test_path / "meta" / "root" / "0.array.json"; + CHECK(fs::is_regular_file(metadata_path)); + + f = std::ifstream(metadata_path); + metadata = json::parse(f); + + const auto chunk_grid = metadata["chunk_grid"]; + CHECK("/" == chunk_grid["separator"]); + CHECK("regular" == chunk_grid["type"]); + + const auto chunk_shape = chunk_grid["chunk_shape"]; + ASSERT_EQ(int, "%d", expected_frames_per_chunk, chunk_shape[0]); + ASSERT_EQ(int, "%d", 1, chunk_shape[1]); + ASSERT_EQ(int, "%d", tile_height, chunk_shape[2]); + ASSERT_EQ(int, "%d", tile_width, chunk_shape[3]); + + CHECK("C" == metadata["chunk_memory_layout"]); + CHECK("u1" == metadata["data_type"]); + CHECK(metadata["extensions"].empty()); + + const auto array_shape = metadata["shape"]; + ASSERT_EQ(int, "%d", max_frame_count, array_shape[0]); + ASSERT_EQ(int, "%d", 1, array_shape[1]); + ASSERT_EQ(int, "%d", frame_height, array_shape[2]); + ASSERT_EQ(int, "%d", frame_width, array_shape[3]); + + // sharding + const auto storage_transformers = metadata["storage_transformers"]; + const auto configuration = storage_transformers[0]["configuration"]; + const auto& cps = configuration["chunks_per_shard"]; + ASSERT_EQ(int, "%d", 1, cps[0]); + ASSERT_EQ(int, "%d", 1, cps[1]); + ASSERT_EQ(int, "%d", 3, cps[2]); + ASSERT_EQ(int, "%d", 4, cps[3]); + const size_t chunks_per_shard = cps[0].get() * + cps[1].get() * + cps[2].get() * cps[3].get(); + + const auto index_size = 2 * chunks_per_shard * sizeof(uint64_t); + + // check that each chunked data file is the expected size + const uint32_t bytes_per_chunk = + chunk_shape[0].get() * chunk_shape[1].get() * + chunk_shape[2].get() * chunk_shape[3].get(); + for (auto t = 0; t < std::ceil(max_frame_count / expected_frames_per_chunk); + ++t) { + fs::path path = test_path / "data" / "root" / "0" / + ("c" + std::to_string(t)) / "0" / "0" / "0"; + + CHECK(fs::is_regular_file(path)); + + auto file_size = fs::file_size(path); + + ASSERT_EQ( + int, "%d", chunks_per_shard* bytes_per_chunk + index_size, file_size); + } +} + +void +teardown(AcquireRuntime* runtime) +{ + LOG("Done (OK)"); + acquire_shutdown(runtime); +} + +int +main() +{ + auto runtime = acquire_init(reporter); + + setup(runtime); + acquire(runtime); + validate(runtime); + teardown(runtime); + + return 0; +}