From 175f45b1ae06cc4690baceca709115cbc4b7ea47 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Tue, 7 May 2024 10:14:48 -0400 Subject: [PATCH] Migrate filesystem writers to `Sink` concept (#240) ### Changed - Calling `acquire_get_configuration()` with a Zarr storage device now returns a URI of the storage device, with file:// scheme indicator and absolute path, assuming localhost. --- CHANGELOG.md | 7 + acquire-common | 2 +- src/CMakeLists.txt | 3 + src/writers/file.sink.cpp | 419 ++++++++++++++++++++++++++++++++++ src/writers/file.sink.hh | 63 +++++ src/writers/sink.hh | 54 +++++ src/writers/writer.cpp | 345 +--------------------------- src/writers/writer.hh | 49 +--- src/writers/zarrv2.writer.cpp | 25 +- src/writers/zarrv3.writer.cpp | 34 +-- src/zarr.cpp | 71 ++++-- src/zarr.hh | 28 ++- src/zarr.v2.cpp | 155 +++++++------ src/zarr.v2.hh | 12 +- src/zarr.v3.cpp | 132 ++++++----- src/zarr.v3.hh | 12 +- tests/get.cpp | 9 +- tests/repeat-start.cpp | 4 +- tests/unit-tests.cpp | 4 +- 19 files changed, 844 insertions(+), 584 deletions(-) create mode 100644 src/writers/file.sink.cpp create mode 100644 src/writers/file.sink.hh create mode 100644 src/writers/sink.hh diff --git a/CHANGELOG.md b/CHANGELOG.md index 89315ce4..5af8d99c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +### Changed + +- Calling `acquire_get_configuration` with a Zarr storage device now returns a URI of the storage device, with file:// + scheme indicator and absolute path, assuming localhost. + ## [0.1.11](https://github.com/acquire-project/acquire-driver-zarr/compare/v0.1.10..v0.1.11) - 2024-04-22 ### Fixed diff --git a/acquire-common b/acquire-common index 4444f5c9..85a5d90a 160000 --- a/acquire-common +++ b/acquire-common @@ -1 +1 @@ -Subproject commit 4444f5c95f699e22b47e9a6e321359016faa03e6 +Subproject commit 85a5d90a857ce5b272d77dc09553e653cb709a5b diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4f2faf83..98a55e53 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -8,6 +8,9 @@ set(tgt acquire-driver-zarr) add_library(${tgt} MODULE common.hh common.cpp + writers/sink.hh + writers/file.sink.hh + writers/file.sink.cpp writers/writer.hh writers/writer.cpp writers/zarrv2.writer.hh diff --git a/src/writers/file.sink.cpp b/src/writers/file.sink.cpp new file mode 100644 index 00000000..b222c197 --- /dev/null +++ b/src/writers/file.sink.cpp @@ -0,0 +1,419 @@ +#include "file.sink.hh" + +#include + +namespace zarr = acquire::sink::zarr; + +template<> +zarr::Sink* +zarr::sink_open(const std::string& uri) +{ + return (Sink*)new FileSink(uri); +} + +template<> +void +zarr::sink_close(Sink* sink_) +{ + if (!sink_) { + return; + } + + auto* sink = static_cast(sink_); + file_close(sink->file_); + sink->file_ = nullptr; + delete sink; +} + +zarr::FileSink::FileSink(const std::string& uri) + : file_{ new struct file } +{ + CHECK(file_create(file_, uri.c_str(), uri.size() + 1)); +} + +zarr::FileSink::~FileSink() +{ + if (file_) { + file_close(file_); + file_ = nullptr; + } +} + +bool +zarr::FileSink::write(size_t offset, const uint8_t* buf, size_t bytes_of_buf) +{ + if (!file_) { + return false; + } + + return file_write(file_, offset, buf, buf + bytes_of_buf); +} + +zarr::FileCreator::FileCreator(std::shared_ptr thread_pool) + : thread_pool_(thread_pool) +{ +} + +bool +zarr::FileCreator::create_chunk_sinks(const std::string& base_uri, + const std::vector& dimensions, + std::vector& chunk_sinks) +{ + const std::string base_dir = + base_uri.starts_with("file://") ? base_uri.substr(7) : base_uri; + + std::queue paths; + paths.push(base_dir); + + if (!make_dirs_(paths)) { + return false; + } + + // create directories + for (auto i = dimensions.size() - 2; i >= 1; --i) { + const auto& dim = dimensions.at(i); + const auto n_chunks = common::chunks_along_dimension(dim); + + auto n_paths = paths.size(); + for (auto j = 0; j < n_paths; ++j) { + const auto path = paths.front(); + paths.pop(); + + for (auto k = 0; k < n_chunks; ++k) { + paths.push(path / std::to_string(k)); + } + } + + if (!make_dirs_(paths)) { + return false; + } + } + + // create files + { + const auto& dim = dimensions.front(); + const auto n_chunks = common::chunks_along_dimension(dim); + + auto n_paths = paths.size(); + for (auto i = 0; i < n_paths; ++i) { + const auto path = paths.front(); + paths.pop(); + for (auto j = 0; j < n_chunks; ++j) { + paths.push(path / std::to_string(j)); + } + } + } + + return make_files_(paths, chunk_sinks); +} + +bool +zarr::FileCreator::create_shard_sinks(const std::string& base_uri, + const std::vector& dimensions, + std::vector& shard_sinks) +{ + const std::string base_dir = + base_uri.starts_with("file://") ? base_uri.substr(7) : base_uri; + + std::queue paths; + paths.push(base_dir); + + if (!make_dirs_(paths)) { + return false; + } + + // create directories + for (auto i = dimensions.size() - 2; i >= 1; --i) { + const auto& dim = dimensions.at(i); + const auto n_shards = common::shards_along_dimension(dim); + CHECK(n_shards); + + auto n_paths = paths.size(); + for (auto j = 0; j < n_paths; ++j) { + const auto path = paths.front(); + paths.pop(); + + for (auto k = 0; k < n_shards; ++k) { + paths.push(path / std::to_string(k)); + } + } + + if (!make_dirs_(paths)) { + return false; + } + } + + // create files + { + const auto& dim = dimensions.front(); + const auto n_shards = common::shards_along_dimension(dim); + CHECK(n_shards); + + auto n_paths = paths.size(); + for (auto i = 0; i < n_paths; ++i) { + const auto path = paths.front(); + paths.pop(); + for (auto j = 0; j < n_shards; ++j) { + paths.push(path / std::to_string(j)); + } + } + } + + return make_files_(paths, shard_sinks); +} + +bool +zarr::FileCreator::create_metadata_sinks(const std::vector& paths, + std::vector& metadata_sinks) +{ + if (paths.empty()) { + return true; + } + + std::queue file_paths; + for (const auto& path : paths) { + fs::path p = path; + fs::create_directories(p.parent_path()); + file_paths.push(p); + } + + return make_files_(file_paths, metadata_sinks); +} + +bool +zarr::FileCreator::make_dirs_(std::queue& dir_paths) +{ + if (dir_paths.empty()) { + return true; + } + + std::atomic all_successful = true; + + const auto n_dirs = dir_paths.size(); + std::latch latch(n_dirs); + + for (auto i = 0; i < n_dirs; ++i) { + const auto dirname = dir_paths.front(); + dir_paths.pop(); + + thread_pool_->push_to_job_queue( + [dirname, &latch, &all_successful](std::string& err) -> bool { + bool success = false; + + try { + if (fs::exists(dirname)) { + EXPECT(fs::is_directory(dirname), + "'%s' exists but is not a directory", + dirname.c_str()); + } else if (all_successful) { + std::error_code ec; + EXPECT(fs::create_directories(dirname, ec), + "%s", + ec.message().c_str()); + } + success = true; + } catch (const std::exception& exc) { + char buf[128]; + snprintf(buf, + sizeof(buf), + "Failed to create directory '%s': %s.", + dirname.string().c_str(), + exc.what()); + err = buf; + } catch (...) { + char buf[128]; + snprintf(buf, + sizeof(buf), + "Failed to create directory '%s': (unknown).", + dirname.string().c_str()); + err = buf; + } + + latch.count_down(); + all_successful = all_successful && success; + + return success; + }); + + dir_paths.push(dirname); + } + + latch.wait(); + + return all_successful; +} + +bool +zarr::FileCreator::make_files_(std::queue& file_paths, + std::vector& files) +{ + if (file_paths.empty()) { + return true; + } + + std::atomic all_successful = true; + + const auto n_files = file_paths.size(); + files.resize(n_files); + std::fill(files.begin(), files.end(), nullptr); + std::latch latch(n_files); + + for (auto i = 0; i < n_files; ++i) { + const auto filename = file_paths.front(); + file_paths.pop(); + + Sink** psink = files.data() + i; + + thread_pool_->push_to_job_queue( + [filename, psink, &latch, &all_successful](std::string& err) -> bool { + bool success = false; + + try { + if (all_successful) { + *psink = + (FileSink*)sink_open(filename.string()); + } + success = true; + } catch (const std::exception& exc) { + char buf[128]; + snprintf(buf, + sizeof(buf), + "Failed to create file '%s': %s.", + filename.string().c_str(), + exc.what()); + err = buf; + } catch (...) { + char buf[128]; + snprintf(buf, + sizeof(buf), + "Failed to create file '%s': (unknown).", + filename.string().c_str()); + err = buf; + } + + latch.count_down(); + all_successful = all_successful && success; + + return success; + }); + } + + latch.wait(); + + return all_successful; +} + +#ifndef NO_UNIT_TESTS +#ifdef _WIN32 +#define acquire_export __declspec(dllexport) +#else +#define acquire_export +#endif + +namespace common = zarr::common; + +extern "C" +{ + acquire_export int unit_test__file_creator__create_chunk_sinks() + { + const fs::path base_dir = fs::temp_directory_path() / "acquire"; + int retval = 0; + + try { + auto thread_pool = std::make_shared( + std::thread::hardware_concurrency(), + [](const std::string& err) { LOGE("Error: %s\n", err.c_str()); }); + zarr::FileCreator file_creator{ thread_pool }; + + std::vector dims; + dims.emplace_back("x", DimensionType_Space, 10, 2, 0); // 5 chunks + dims.emplace_back("y", DimensionType_Space, 4, 2, 0); // 2 chunks + dims.emplace_back( + "z", DimensionType_Space, 0, 3, 0); // 3 timepoints per chunk + + std::vector files; + CHECK( + file_creator.create_chunk_sinks(base_dir.string(), dims, files)); + + CHECK(files.size() == 5 * 2); + std::for_each(files.begin(), files.end(), [](zarr::Sink* f) { + sink_close(f); + }); + + CHECK(fs::is_directory(base_dir)); + for (auto y = 0; y < 2; ++y) { + CHECK(fs::is_directory(base_dir / std::to_string(y))); + for (auto x = 0; x < 5; ++x) { + CHECK(fs::is_regular_file(base_dir / std::to_string(y) / + std::to_string(x))); + } + } + retval = 1; + } catch (const std::exception& exc) { + LOGE("Exception: %s\n", exc.what()); + } catch (...) { + LOGE("Exception: (unknown)"); + } + + // cleanup + if (fs::exists(base_dir)) { + fs::remove_all(base_dir); + } + return retval; + } + + acquire_export int unit_test__file_creator__create_shard_sinks() + { + const fs::path base_dir = fs::temp_directory_path() / "acquire"; + int retval = 0; + + try { + auto thread_pool = std::make_shared( + std::thread::hardware_concurrency(), + [](const std::string& err) { LOGE("Error: %s", err.c_str()); }); + zarr::FileCreator file_creator{ thread_pool }; + + std::vector dims; + dims.emplace_back( + "x", DimensionType_Space, 10, 2, 5); // 5 chunks, 1 shard + dims.emplace_back( + "y", DimensionType_Space, 4, 2, 1); // 2 chunks, 2 shards + dims.emplace_back( + "z", DimensionType_Space, 8, 2, 2); // 4 chunks, 2 shards + + std::vector files; + CHECK( + file_creator.create_shard_sinks(base_dir.string(), dims, files)); + + CHECK(files.size() == 2); + std::for_each(files.begin(), files.end(), [](zarr::Sink* f) { + sink_close(f); + }); + + CHECK(fs::is_directory(base_dir)); + for (auto y = 0; y < 2; ++y) { + CHECK(fs::is_directory(base_dir / std::to_string(y))); + for (auto x = 0; x < 1; ++x) { + CHECK(fs::is_regular_file(base_dir / std::to_string(y) / + std::to_string(x))); + } + } + + // cleanup + fs::remove_all(base_dir); + + retval = 1; + } catch (const std::exception& exc) { + LOGE("Exception: %s\n", exc.what()); + } catch (...) { + LOGE("Exception: (unknown)"); + } + + // cleanup + if (fs::exists(base_dir)) { + fs::remove_all(base_dir); + } + return retval; + } +} // extern "C" +#endif // NO_UNIT_TESTS \ No newline at end of file diff --git a/src/writers/file.sink.hh b/src/writers/file.sink.hh new file mode 100644 index 00000000..4be362f6 --- /dev/null +++ b/src/writers/file.sink.hh @@ -0,0 +1,63 @@ +#ifndef H_ACQUIRE_STORAGE_ZARR_FILESYSTEM_SINK_V0 +#define H_ACQUIRE_STORAGE_ZARR_FILESYSTEM_SINK_V0 + +#include "sink.hh" +#include "platform.h" + +#include + +namespace fs = std::filesystem; + +namespace acquire::sink::zarr { +struct FileSink : public Sink +{ + explicit FileSink(const std::string& uri); + ~FileSink() override; + + [[nodiscard]] bool write(size_t offset, + const uint8_t* buf, + size_t bytes_of_buf) override; + + struct file* file_; +}; + +struct FileCreator +{ + public: + FileCreator() = delete; + explicit FileCreator(std::shared_ptr thread_pool); + ~FileCreator() noexcept = default; + + [[nodiscard]] bool create_chunk_sinks( + const std::string& base_uri, + const std::vector& dimensions, + std::vector& chunk_sinks); + + [[nodiscard]] bool create_shard_sinks( + const std::string& base_uri, + const std::vector& dimensions, + std::vector& shard_sinks); + + [[nodiscard]] bool create_metadata_sinks( + const std::vector& paths, + std::vector& metadata_sinks); + + private: + std::shared_ptr thread_pool_; + + /// @brief Parallel create a collection of directories. + /// @param[in] dir_paths The directories to create. + /// @return True iff all directories were created successfully. + [[nodiscard]] bool make_dirs_(std::queue& dir_paths); + + /// @brief Parallel create a collection of files. + /// @param[in,out] file_paths The files to create. Unlike `make_dirs_`, + /// this function drains the queue. + /// @param[out] files The files created. + /// @return True iff all files were created successfully. + [[nodiscard]] bool make_files_(std::queue& file_paths, + std::vector& files); +}; +} // namespace acquire::sink::zarr + +#endif // H_ACQUIRE_STORAGE_ZARR_FILESYSTEM_SINK_V0 diff --git a/src/writers/sink.hh b/src/writers/sink.hh new file mode 100644 index 00000000..f48b8524 --- /dev/null +++ b/src/writers/sink.hh @@ -0,0 +1,54 @@ +#ifndef H_ACQUIRE_STORAGE_ZARR_SINK_V0 +#define H_ACQUIRE_STORAGE_ZARR_SINK_V0 + +#include +#include +#include +#include + +#include "../common.hh" + +namespace acquire::sink::zarr { +struct Sink +{ + virtual ~Sink() noexcept = default; + + virtual bool write(size_t offset, + const uint8_t* buf, + size_t bytes_of_buf) = 0; +}; + +template +concept SinkType = + requires(SinkT sink, size_t offset, const uint8_t* buf, size_t bytes_of_buf) { + { sink.write(offset, buf, bytes_of_buf) } -> std::convertible_to; + }; + +template +concept SinkCreator = requires(SinkCreatorT sink_creator, + const std::string& base_uri, + std::vector dimensions, + const std::vector& paths, + std::vector& sinks) { + { + sink_creator.create_chunk_sinks(base_uri, dimensions, sinks) + } -> std::convertible_to; + { + sink_creator.create_shard_sinks(base_uri, dimensions, sinks) + } -> std::convertible_to; + { + sink_creator.create_metadata_sinks(paths, sinks) + } -> std::convertible_to; +}; + +template +Sink* +sink_open(const std::string& uri); + +template +void +sink_close(Sink* sink); + +} // namespace acquire::sink::zarr + +#endif // H_ACQUIRE_STORAGE_ZARR_SINK_V0 diff --git a/src/writers/writer.cpp b/src/writers/writer.cpp index f8f06827..c9f137d9 100644 --- a/src/writers/writer.cpp +++ b/src/writers/writer.cpp @@ -103,240 +103,6 @@ chunk_internal_offset(size_t frame_id, } } // end ::{anonymous} namespace -/// FileCreator -zarr::FileCreator::FileCreator(std::shared_ptr thread_pool) - : thread_pool_{ thread_pool } -{ - EXPECT(thread_pool_, "Thread pool must not be null."); -} - -bool -zarr::FileCreator::create_chunk_files(const fs::path& base_dir, - const std::vector& dimensions, - std::vector& files) -{ - std::queue paths; - paths.push(base_dir); - - if (!make_dirs_(paths)) { - return false; - } - - // create directories - for (auto i = dimensions.size() - 2; i >= 1; --i) { - const auto& dim = dimensions.at(i); - const auto n_chunks = common::chunks_along_dimension(dim); - - auto n_paths = paths.size(); - for (auto j = 0; j < n_paths; ++j) { - const auto path = paths.front(); - paths.pop(); - - for (auto k = 0; k < n_chunks; ++k) { - paths.push(path / std::to_string(k)); - } - } - - if (!make_dirs_(paths)) { - return false; - } - } - - // create files - { - const auto& dim = dimensions.front(); - const auto n_chunks = common::chunks_along_dimension(dim); - - auto n_paths = paths.size(); - for (auto i = 0; i < n_paths; ++i) { - const auto path = paths.front(); - paths.pop(); - for (auto j = 0; j < n_chunks; ++j) { - paths.push(path / std::to_string(j)); - } - } - } - - return make_files_(paths, files); -} - -bool -zarr::FileCreator::create_shard_files(const fs::path& base_dir, - const std::vector& dimensions, - std::vector& files) -{ - std::queue paths; - paths.push(base_dir); - - if (!make_dirs_(paths)) { - return false; - } - - // create directories - for (auto i = dimensions.size() - 2; i >= 1; --i) { - const auto& dim = dimensions.at(i); - const auto n_shards = common::shards_along_dimension(dim); - CHECK(n_shards); - - auto n_paths = paths.size(); - for (auto j = 0; j < n_paths; ++j) { - const auto path = paths.front(); - paths.pop(); - - for (auto k = 0; k < n_shards; ++k) { - paths.push(path / std::to_string(k)); - } - } - - if (!make_dirs_(paths)) { - return false; - } - } - - // create files - { - const auto& dim = dimensions.front(); - const auto n_shards = common::shards_along_dimension(dim); - CHECK(n_shards); - - auto n_paths = paths.size(); - for (auto i = 0; i < n_paths; ++i) { - const auto path = paths.front(); - paths.pop(); - for (auto j = 0; j < n_shards; ++j) { - paths.push(path / std::to_string(j)); - } - } - } - - return make_files_(paths, files); -} - -bool -zarr::FileCreator::make_dirs_(std::queue& dir_paths) -{ - if (dir_paths.empty()) { - return true; - } - - std::atomic all_successful = true; - - const auto n_dirs = dir_paths.size(); - std::latch latch(n_dirs); - - for (auto i = 0; i < n_dirs; ++i) { - const auto dirname = dir_paths.front(); - dir_paths.pop(); - - thread_pool_->push_to_job_queue( - [dirname, &latch, &all_successful](std::string& err) -> bool { - bool success = false; - - try { - if (fs::exists(dirname)) { - EXPECT(fs::is_directory(dirname), - "'%s' exists but is not a directory", - dirname.c_str()); - } else if (all_successful) { - std::error_code ec; - EXPECT(fs::create_directories(dirname, ec), - "%s", - ec.message().c_str()); - } - success = true; - } catch (const std::exception& exc) { - char buf[128]; - snprintf(buf, - sizeof(buf), - "Failed to create directory '%s': %s.", - dirname.string().c_str(), - exc.what()); - err = buf; - } catch (...) { - char buf[128]; - snprintf(buf, - sizeof(buf), - "Failed to create directory '%s': (unknown).", - dirname.string().c_str()); - err = buf; - } - - latch.count_down(); - all_successful = all_successful && success; - - return success; - }); - - dir_paths.push(dirname); - } - - latch.wait(); - - return all_successful; -} - -bool -zarr::FileCreator::make_files_(std::queue& file_paths, - std::vector& files) -{ - if (file_paths.empty()) { - return true; - } - - std::atomic all_successful = true; - - const auto n_files = file_paths.size(); - files.resize(n_files); - std::latch latch(n_files); - - for (auto i = 0; i < n_files; ++i) { - const auto filename = file_paths.front(); - file_paths.pop(); - - struct file* pfile = files.data() + i; - - thread_pool_->push_to_job_queue( - [filename, pfile, &latch, &all_successful](std::string& err) -> bool { - bool success = false; - - try { - if (all_successful) { - EXPECT(file_create(pfile, - filename.string().c_str(), - filename.string().length()), - "Failed to open file: '%s'", - filename.string().c_str()); - } - success = true; - } catch (const std::exception& exc) { - char buf[128]; - snprintf(buf, - sizeof(buf), - "Failed to create file '%s': %s.", - filename.string().c_str(), - exc.what()); - err = buf; - } catch (...) { - char buf[128]; - snprintf(buf, - sizeof(buf), - "Failed to create file '%s': (unknown).", - filename.string().c_str()); - err = buf; - } - - latch.count_down(); - all_successful = all_successful && success; - - return success; - }); - } - - latch.wait(); - - return all_successful; -} - bool zarr::downsample(const ArrayConfig& config, ArrayConfig& downsampled_config) { @@ -414,13 +180,12 @@ zarr::Writer::Writer(const ArrayConfig& config, std::shared_ptr thread_pool) : config_{ config } , thread_pool_{ thread_pool } - , file_creator_{ thread_pool } , bytes_to_flush_{ 0 } , frames_written_{ 0 } , append_chunk_index_{ 0 } , is_finalizing_{ false } { - data_root_ = fs::path(config_.data_root); + data_root_ = config_.data_root; } bool @@ -672,10 +437,12 @@ zarr::Writer::flush_() void zarr::Writer::close_files_() { - for (auto& file : files_) { - file_close(&file); + for (Sink* sink_ : sinks_) { + if (auto* sink = dynamic_cast(sink_)) { + sink_close(sink_); + } } - files_.clear(); + sinks_.clear(); } void @@ -985,106 +752,6 @@ extern "C" return retval; } - acquire_export int unit_test__file_creator__create_chunk_files() - { - const auto base_dir = fs::temp_directory_path() / "acquire"; - int retval = 0; - - try { - auto thread_pool = std::make_shared( - std::thread::hardware_concurrency(), - [](const std::string& err) { LOGE("Error: %s\n", err.c_str()); }); - zarr::FileCreator file_creator{ thread_pool }; - - std::vector dims; - dims.emplace_back("x", DimensionType_Space, 10, 2, 0); // 5 chunks - dims.emplace_back("y", DimensionType_Space, 4, 2, 0); // 2 chunks - dims.emplace_back( - "z", DimensionType_Space, 0, 3, 0); // 3 timepoints per chunk - - std::vector files; - CHECK(file_creator.create_chunk_files(base_dir, dims, files)); - - CHECK(files.size() == 5 * 2); - std::for_each(files.begin(), files.end(), [](const struct file& f) { - file_close(const_cast(&f)); - }); - - CHECK(fs::is_directory(base_dir)); - for (auto y = 0; y < 2; ++y) { - CHECK(fs::is_directory(base_dir / std::to_string(y))); - for (auto x = 0; x < 5; ++x) { - CHECK(fs::is_regular_file(base_dir / std::to_string(y) / - std::to_string(x))); - } - } - retval = 1; - } catch (const std::exception& exc) { - LOGE("Exception: %s\n", exc.what()); - } catch (...) { - LOGE("Exception: (unknown)"); - } - - // cleanup - if (fs::exists(base_dir)) { - fs::remove_all(base_dir); - } - return retval; - } - - acquire_export int unit_test__file_creator__create_shard_files() - { - const auto base_dir = fs::temp_directory_path() / "acquire"; - int retval = 0; - - try { - auto thread_pool = std::make_shared( - std::thread::hardware_concurrency(), - [](const std::string& err) { LOGE("Error: %s", err.c_str()); }); - zarr::FileCreator file_creator{ thread_pool }; - - std::vector dims; - dims.emplace_back( - "x", DimensionType_Space, 10, 2, 5); // 5 chunks, 1 shard - dims.emplace_back( - "y", DimensionType_Space, 4, 2, 1); // 2 chunks, 2 shards - dims.emplace_back( - "z", DimensionType_Space, 8, 2, 2); // 4 chunks, 2 shards - - std::vector files; - CHECK(file_creator.create_shard_files(base_dir, dims, files)); - - CHECK(files.size() == 2); - std::for_each(files.begin(), files.end(), [](const struct file& f) { - file_close(const_cast(&f)); - }); - - CHECK(fs::is_directory(base_dir)); - for (auto y = 0; y < 2; ++y) { - CHECK(fs::is_directory(base_dir / std::to_string(y))); - for (auto x = 0; x < 1; ++x) { - CHECK(fs::is_regular_file(base_dir / std::to_string(y) / - std::to_string(x))); - } - } - - // cleanup - fs::remove_all(base_dir); - - retval = 1; - } catch (const std::exception& exc) { - LOGE("Exception: %s\n", exc.what()); - } catch (...) { - LOGE("Exception: (unknown)"); - } - - // cleanup - if (fs::exists(base_dir)) { - fs::remove_all(base_dir); - } - return retval; - } - acquire_export int unit_test__writer__write_frame_to_chunks() { const auto base_dir = fs::temp_directory_path() / "acquire"; diff --git a/src/writers/writer.hh b/src/writers/writer.hh index 030645ed..d71a8e3e 100644 --- a/src/writers/writer.hh +++ b/src/writers/writer.hh @@ -10,6 +10,7 @@ #include "../common.hh" #include "blosc.compressor.hh" +#include "file.sink.hh" #include #include @@ -19,49 +20,6 @@ namespace fs = std::filesystem; namespace acquire::sink::zarr { struct Zarr; -struct FileCreator -{ - FileCreator() = delete; - explicit FileCreator(std::shared_ptr thread_pool); - ~FileCreator() noexcept = default; - - /// @brief Create the directory structure for a Zarr v2 dataset. - /// @param[in] base_dir The root directory for the dataset. - /// @param[in] dimensions The dimensions of the dataset. - /// @param[out] files The chunk files created. - /// @return True iff the directory structure was created successfully. - [[nodiscard]] bool create_chunk_files( - const fs::path& base_dir, - const std::vector& dimensions, - std::vector& files); - - /// @brief Create the directory structure for a Zarr v3 dataset. - /// @param[in] base_dir The root directory for the dataset. - /// @param[in] dimensions The dimensions of the dataset. - /// @param[out] files The shard files created. - /// @return True iff the directory structure was created successfully. - [[nodiscard]] bool create_shard_files( - const fs::path& base_dir, - const std::vector& dimensions, - std::vector& files); - - private: - std::shared_ptr thread_pool_; - - /// @brief Parallel create a collection of directories. - /// @param[in] dir_paths The directories to create. - /// @return True iff all directories were created successfully. - [[nodiscard]] bool make_dirs_(std::queue& dir_paths); - - /// @brief Parallel create a collection of files. - /// @param[in,out] file_paths The files to create. Unlike `make_dirs_`, - /// this function drains the queue. - /// @param[out] files The files created. - /// @return True iff all files were created successfully. - [[nodiscard]] bool make_files_(std::queue& file_paths, - std::vector& files); -}; - struct ArrayConfig { ImageShape image_shape; @@ -103,9 +61,8 @@ struct Writer std::vector> chunk_buffers_; /// Filesystem - FileCreator file_creator_; - fs::path data_root_; - std::vector files_; + std::string data_root_; + std::vector sinks_; /// Multithreading std::shared_ptr thread_pool_; diff --git a/src/writers/zarrv2.writer.cpp b/src/writers/zarrv2.writer.cpp index 7dc7f8ef..f40d15e1 100644 --- a/src/writers/zarrv2.writer.cpp +++ b/src/writers/zarrv2.writer.cpp @@ -18,28 +18,33 @@ bool zarr::ZarrV2Writer::flush_impl_() { // create chunk files - CHECK(files_.empty()); - if (!file_creator_.create_chunk_files(data_root_ / - std::to_string(append_chunk_index_), - config_.dimensions, - files_)) { - return false; + CHECK(sinks_.empty()); + const std::string data_root = + (fs::path(data_root_) / std::to_string(append_chunk_index_)).string(); + + { + FileCreator file_creator(thread_pool_); + if (!file_creator.create_chunk_sinks( + data_root, config_.dimensions, sinks_)) { + return false; + } } - CHECK(files_.size() == chunk_buffers_.size()); + + CHECK(sinks_.size() == chunk_buffers_.size()); std::latch latch(chunk_buffers_.size()); { std::scoped_lock lock(buffers_mutex_); - for (auto i = 0; i < files_.size(); ++i) { + for (auto i = 0; i < sinks_.size(); ++i) { auto& chunk = chunk_buffers_.at(i); thread_pool_->push_to_job_queue( - std::move([fh = &files_.at(i), + std::move([sink = sinks_.at(i), data = chunk.data(), size = chunk.size(), &latch](std::string& err) -> bool { bool success = false; try { - CHECK(file_write(fh, 0, data, data + size)); + CHECK(sink->write(0, data, size)); success = true; } catch (const std::exception& exc) { char buf[128]; diff --git a/src/writers/zarrv3.writer.cpp b/src/writers/zarrv3.writer.cpp index 6ca4664a..44fc3f90 100644 --- a/src/writers/zarrv3.writer.cpp +++ b/src/writers/zarrv3.writer.cpp @@ -119,15 +119,20 @@ bool zarr::ZarrV3Writer::flush_impl_() { // create shard files if they don't exist - if (files_.empty() && - !file_creator_.create_shard_files( - data_root_ / ("c" + std::to_string(append_chunk_index_)), - config_.dimensions, - files_)) { - return false; + const std::string data_root = + (fs::path(data_root_) / ("c" + std::to_string(append_chunk_index_))) + .string(); + + { + FileCreator file_creator(thread_pool_); + if (sinks_.empty() && !file_creator.create_shard_sinks( + data_root, config_.dimensions, sinks_)) { + return false; + } } + const auto n_shards = common::number_of_shards(config_.dimensions); - CHECK(files_.size() == n_shards); + CHECK(sinks_.size() == n_shards); // get shard indices for each chunk std::vector> chunk_in_shards(n_shards); @@ -144,7 +149,7 @@ zarr::ZarrV3Writer::flush_impl_() auto& chunk_table = shard_tables_.at(i); size_t* file_offset = &shard_file_offsets_.at(i); - thread_pool_->push_to_job_queue([fh = &files_.at(i), + thread_pool_->push_to_job_queue([sink = sinks_.at(i), &chunks, &chunk_table, file_offset, @@ -157,10 +162,8 @@ zarr::ZarrV3Writer::flush_impl_() for (const auto& chunk_idx : chunks) { auto& chunk = chunk_buffers_.at(chunk_idx); - success = file_write(fh, - *file_offset, - chunk.data(), - chunk.data() + chunk.size()); + success = + sink->write(*file_offset, chunk.data(), chunk.size()); if (!success) { break; } @@ -177,10 +180,9 @@ zarr::ZarrV3Writer::flush_impl_() const auto* table = reinterpret_cast(chunk_table.data()); success = - file_write(fh, - *file_offset, - table, - table + chunk_table.size() * sizeof(uint64_t)); + sink->write(*file_offset, + table, + chunk_table.size() * sizeof(uint64_t)); } } catch (const std::exception& exc) { char buf[128]; diff --git a/src/zarr.cpp b/src/zarr.cpp index 57cb7d86..4853fecc 100644 --- a/src/zarr.cpp +++ b/src/zarr.cpp @@ -16,8 +16,15 @@ namespace { fs::path as_path(const StorageProperties& props) { - return { props.filename.str, - props.filename.str + props.filename.nbytes - 1 }; + if (!props.uri.str) { + return {}; + } + + const size_t offset = + strlen(props.uri.str) > 7 && strcmp(props.uri.str, "file://") == 0 ? 7 + : 0; + return { props.uri.str + offset, + props.uri.str + offset + props.uri.nbytes - (offset + 1) }; } /// \brief Check that the JSON string is valid. (Valid can mean empty.) @@ -47,18 +54,20 @@ validate_json(const char* str, size_t nbytes) void validate_props(const StorageProperties* props) { - EXPECT(props->filename.str, "Filename string is NULL."); - EXPECT(props->filename.nbytes, "Filename string is zero size."); + EXPECT(props->uri.str, "URI string is NULL."); + EXPECT(props->uri.nbytes, "URI string is zero size."); // check that JSON is correct (throw std::exception if not) validate_json(props->external_metadata_json.str, props->external_metadata_json.nbytes); - // check that the filename value points to a writable directory - { + std::string uri{ props->uri.str, props->uri.nbytes - 1 }; + EXPECT(!uri.starts_with("s3://"), "S3 URIs are not yet supported."); - auto path = as_path(*props); - auto parent_path = path.parent_path().string(); + // check that the URI value points to a writable directory + { + const fs::path path = as_path(*props); + fs::path parent_path = path.parent_path().string(); if (parent_path.empty()) parent_path = "."; @@ -335,6 +344,8 @@ zarr::Zarr::set(const StorageProperties* props) // checks the directory exists and is writable validate_props(props); + // TODO (aliddell): we will eventually support S3 URIs, + // dataset_root_ should be a string dataset_root_ = as_path(*props); if (props->external_metadata_json.str) { @@ -362,9 +373,13 @@ zarr::Zarr::get(StorageProperties* props) const storage_properties_destroy(props); const std::string dataset_root = dataset_root_.string(); - const char* filename = - dataset_root.empty() ? nullptr : dataset_root.c_str(); - const size_t bytes_of_filename = filename ? dataset_root.size() + 1 : 0; + + std::string uri; + if (!dataset_root_.empty()) { + fs::path dataset_root_abs = fs::absolute(dataset_root_); + uri = "file://" + dataset_root_abs.string(); + } + const size_t bytes_of_filename = uri.empty() ? 0 : uri.size() + 1; const char* metadata = external_metadata_json_.empty() ? nullptr @@ -374,7 +389,7 @@ zarr::Zarr::get(StorageProperties* props) const CHECK(storage_properties_init(props, 0, - filename, + uri.c_str(), bytes_of_filename, metadata, bytes_of_metadata, @@ -420,17 +435,18 @@ zarr::Zarr::start() } fs::create_directories(dataset_root_); - write_base_metadata_(); - write_group_metadata_(); - write_all_array_metadata_(); - write_external_metadata_(); - thread_pool_ = std::make_shared( std::thread::hardware_concurrency(), [this](const std::string& err) { this->set_error(err); }); allocate_writers_(); + if (!dataset_root_.string().starts_with("s3://")) { + make_metadata_sinks_(); + } + + write_fixed_metadata_(); + state = DeviceState_Running; error_ = false; } @@ -445,8 +461,14 @@ zarr::Zarr::stop() noexcept is_ok = 0; try { - write_all_array_metadata_(); // must precede close of chunk file - write_group_metadata_(); + // must precede close of chunk file + write_mutable_metadata_(); + for (Sink* sink_ : metadata_sinks_) { + if (auto* sink = dynamic_cast(sink_)) { + sink_close(sink); + } + } + metadata_sinks_.clear(); for (auto& writer : writers_) { writer->finalize(); @@ -545,7 +567,6 @@ zarr::Zarr::Zarr() } , thread_pool_{ nullptr } , pixel_scale_um_{ 1, 1 } - , planes_per_chunk_{ 0 } , enable_multiscale_{ false } , error_{ false } { @@ -587,8 +608,16 @@ zarr::Zarr::set_error(const std::string& msg) noexcept } void -zarr::Zarr::write_all_array_metadata_() const +zarr::Zarr::write_fixed_metadata_() const { + write_base_metadata_(); + write_external_metadata_(); +} + +void +zarr::Zarr::write_mutable_metadata_() const +{ + write_group_metadata_(); for (auto i = 0; i < writers_.size(); ++i) { write_array_metadata_(i); } diff --git a/src/zarr.hh b/src/zarr.hh index 1d35c3bb..0ba50556 100644 --- a/src/zarr.hh +++ b/src/zarr.hh @@ -49,7 +49,6 @@ struct Zarr : public Storage fs::path dataset_root_; std::string external_metadata_json_; PixelScale pixel_scale_um_; - uint32_t planes_per_chunk_; bool enable_multiscale_; /// changes on reserve_image_shape @@ -61,6 +60,9 @@ struct Zarr : public Storage // scaled frames, keyed by level-of-detail std::unordered_map> scaled_frames_; + // changes on flush + std::vector metadata_sinks_; + /// Multithreading std::shared_ptr thread_pool_; mutable std::mutex mutex_; // for error_ / error_msg_ @@ -74,14 +76,26 @@ struct Zarr : public Storage virtual void allocate_writers_() = 0; /// Metadata - void write_all_array_metadata_() const; - virtual void write_array_metadata_(size_t level) const = 0; - virtual void write_external_metadata_() const = 0; + virtual std::vector make_metadata_sink_paths_() = 0; + + template + void make_metadata_sinks_() + { + const auto metadata_sink_paths = make_metadata_sink_paths_(); + SinkCreatorT creator(thread_pool_); + CHECK( + creator.create_metadata_sinks(metadata_sink_paths, metadata_sinks_)); + } + + // fixed metadata + void write_fixed_metadata_() const; virtual void write_base_metadata_() const = 0; - virtual void write_group_metadata_() const = 0; + virtual void write_external_metadata_() const = 0; - /// Filesystem - virtual fs::path get_data_directory_() const = 0; + // mutable metadata, changes on flush + void write_mutable_metadata_() const; + virtual void write_group_metadata_() const = 0; + virtual void write_array_metadata_(size_t level) const = 0; /// Multiscale void write_multiscale_frames_(const VideoFrame* frame); diff --git a/src/zarr.v2.cpp b/src/zarr.v2.cpp index baf61545..3e46c6fe 100644 --- a/src/zarr.v2.cpp +++ b/src/zarr.v2.cpp @@ -67,79 +67,54 @@ zarr::ZarrV2::allocate_writers_() } } -void -zarr::ZarrV2::write_array_metadata_(size_t level) const +std::vector +zarr::ZarrV2::make_metadata_sink_paths_() { - namespace fs = std::filesystem; - using json = nlohmann::json; - - CHECK(level < writers_.size()); - const auto& writer = writers_.at(level); - - const ArrayConfig& config = writer->config(); - const auto& image_shape = config.image_shape; - - std::vector array_shape; - array_shape.push_back(writer->frames_written()); - for (auto dim = config.dimensions.rbegin() + 1; - dim != config.dimensions.rend(); - ++dim) { - array_shape.push_back(dim->array_size_px); - } - - std::vector chunk_shape; - for (auto dim = config.dimensions.rbegin(); dim != config.dimensions.rend(); - ++dim) { - chunk_shape.push_back(dim->chunk_size_px); - } - - json metadata; - metadata["zarr_format"] = 2; - metadata["shape"] = array_shape; - metadata["chunks"] = chunk_shape; - metadata["dtype"] = common::sample_type_to_dtype(image_shape.type); - metadata["fill_value"] = 0; - metadata["order"] = "C"; - metadata["filters"] = nullptr; - metadata["dimension_separator"] = "/"; + std::vector metadata_sink_paths = { + (dataset_root_ / ".metadata").string(), // base metadata + (dataset_root_ / "0" / ".zattrs").string(), // external metadata + (dataset_root_ / ".zattrs").string(), // group metadata + }; - if (config.compression_params.has_value()) { - metadata["compressor"] = config.compression_params.value(); - } else { - metadata["compressor"] = nullptr; + for (auto i = 0; i < writers_.size(); ++i) { + metadata_sink_paths.push_back( + (dataset_root_ / std::to_string(i) / ".zarray").string()); } - std::string zarray_path = (fs::path(config.data_root) / ".zarray").string(); - common::write_string(zarray_path, metadata.dump()); + return metadata_sink_paths; } void -zarr::ZarrV2::write_external_metadata_() const +zarr::ZarrV2::write_base_metadata_() const { namespace fs = std::filesystem; using json = nlohmann::json; - std::string zattrs_path = (dataset_root_ / "0" / ".zattrs").string(); - std::string external_metadata = external_metadata_json_.empty() - ? "{}" - : json::parse(external_metadata_json_, - nullptr, // callback - true, // allow exceptions - true // ignore comments - ) - .dump(); - common::write_string(zattrs_path, external_metadata); + const json metadata = { { "zarr_format", 2 } }; + const std::string metadata_str = metadata.dump(4); + const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); + Sink* sink = metadata_sinks_.at(0); + CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } void -zarr::ZarrV2::write_base_metadata_() const +zarr::ZarrV2::write_external_metadata_() const { namespace fs = std::filesystem; using json = nlohmann::json; - const json zgroup = { { "zarr_format", 2 } }; - std::string zgroup_path = (dataset_root_ / ".zgroup").string(); - common::write_string(zgroup_path, zgroup.dump()); + std::string zattrs_path = (dataset_root_ / "0" / ".zattrs").string(); + std::string metadata_str = external_metadata_json_.empty() + ? "{}" + : json::parse(external_metadata_json_, + nullptr, // callback + true, // allow exceptions + true // ignore comments + ) + .dump(4); + const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); + Sink* sink = metadata_sinks_.at(1); + CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } void @@ -148,11 +123,11 @@ zarr::ZarrV2::write_group_metadata_() const namespace fs = std::filesystem; using json = nlohmann::json; - json zgroup_attrs; - zgroup_attrs["multiscales"] = json::array({ json::object() }); - zgroup_attrs["multiscales"][0]["version"] = "0.4"; + json metadata; + metadata["multiscales"] = json::array({ json::object() }); + metadata["multiscales"][0]["version"] = "0.4"; - auto& axes = zgroup_attrs["multiscales"][0]["axes"]; + auto& axes = metadata["multiscales"][0]["axes"]; for (auto dim = acquisition_dimensions_.rbegin(); dim != acquisition_dimensions_.rend(); ++dim) { @@ -192,7 +167,7 @@ zarr::ZarrV2::write_group_metadata_() const scales.push_back(pixel_scale_um_.y); scales.push_back(pixel_scale_um_.x); - zgroup_attrs["multiscales"][0]["datasets"] = { + metadata["multiscales"][0]["datasets"] = { { { "path", "0" }, { "coordinateTransformations", @@ -214,7 +189,7 @@ zarr::ZarrV2::write_group_metadata_() const scales.push_back(std::pow(2, i) * pixel_scale_um_.y); // y scales.push_back(std::pow(2, i) * pixel_scale_um_.x); // x - zgroup_attrs["multiscales"][0]["datasets"].push_back({ + metadata["multiscales"][0]["datasets"].push_back({ { "path", std::to_string(i) }, { "coordinateTransformations", { @@ -227,8 +202,8 @@ zarr::ZarrV2::write_group_metadata_() const } // downsampling metadata - zgroup_attrs["multiscales"][0]["type"] = "local_mean"; - zgroup_attrs["multiscales"][0]["metadata"] = { + metadata["multiscales"][0]["type"] = "local_mean"; + metadata["multiscales"][0]["metadata"] = { { "description", "The fields in the metadata describe how to reproduce this " "multiscaling in scikit-image. The method and its parameters are " @@ -240,14 +215,58 @@ zarr::ZarrV2::write_group_metadata_() const }; } - std::string zattrs_path = (dataset_root_ / ".zattrs").string(); - common::write_string(zattrs_path, zgroup_attrs.dump(4)); + const std::string metadata_str = metadata.dump(4); + const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); + Sink* sink = metadata_sinks_.at(2); + CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } -fs::path -zarr::ZarrV2::get_data_directory_() const +void +zarr::ZarrV2::write_array_metadata_(size_t level) const { - return dataset_root_; + namespace fs = std::filesystem; + using json = nlohmann::json; + + CHECK(level < writers_.size()); + const auto& writer = writers_.at(level); + + const ArrayConfig& config = writer->config(); + const auto& image_shape = config.image_shape; + + std::vector array_shape; + array_shape.push_back(writer->frames_written()); + for (auto dim = config.dimensions.rbegin() + 1; + dim != config.dimensions.rend(); + ++dim) { + array_shape.push_back(dim->array_size_px); + } + + std::vector chunk_shape; + for (auto dim = config.dimensions.rbegin(); dim != config.dimensions.rend(); + ++dim) { + chunk_shape.push_back(dim->chunk_size_px); + } + + json metadata; + metadata["zarr_format"] = 2; + metadata["shape"] = array_shape; + metadata["chunks"] = chunk_shape; + metadata["dtype"] = common::sample_type_to_dtype(image_shape.type); + metadata["fill_value"] = 0; + metadata["order"] = "C"; + metadata["filters"] = nullptr; + metadata["dimension_separator"] = "/"; + + if (config.compression_params.has_value()) { + metadata["compressor"] = config.compression_params.value(); + } else { + metadata["compressor"] = nullptr; + } + + const std::string metadata_str = metadata.dump(4); + const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); + Sink* sink = metadata_sinks_.at(3 + level); + CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } extern "C" diff --git a/src/zarr.v2.hh b/src/zarr.v2.hh index d281b207..b07acfc7 100644 --- a/src/zarr.v2.hh +++ b/src/zarr.v2.hh @@ -19,13 +19,15 @@ struct ZarrV2 final : public Zarr void allocate_writers_() override; /// Metadata - void write_array_metadata_(size_t level) const override; - void write_external_metadata_() const override; + std::vector make_metadata_sink_paths_() override; + + // fixed metadata void write_base_metadata_() const override; - void write_group_metadata_() const override; + void write_external_metadata_() const override; - /// Filesystem - fs::path get_data_directory_() const override; + // mutable metadata, changes on flush + void write_group_metadata_() const override; + void write_array_metadata_(size_t level) const override; }; } // namespace acquire::sink::zarr diff --git a/src/zarr.v3.cpp b/src/zarr.v3.cpp index 98815f25..f0322a0b 100644 --- a/src/zarr.v3.cpp +++ b/src/zarr.v3.cpp @@ -68,6 +68,75 @@ zarr::ZarrV3::get_meta(StoragePropertyMetadata* meta) const meta->multiscale_is_supported = 0; } +std::vector +zarr::ZarrV3::make_metadata_sink_paths_() +{ + std::vector metadata_sink_paths; + metadata_sink_paths.push_back((dataset_root_ / "zarr.json").string()); + metadata_sink_paths.push_back( + (dataset_root_ / "meta" / "root.group.json").string()); + for (auto i = 0; i < writers_.size(); ++i) { + metadata_sink_paths.push_back((dataset_root_ / "meta" / "root" / + (std::to_string(i) + ".array.json")) + .string()); + } + + return metadata_sink_paths; +} + +/// @brief Write the metadata for the dataset. +void +zarr::ZarrV3::write_base_metadata_() const +{ + namespace fs = std::filesystem; + using json = nlohmann::json; + + json metadata; + metadata["extensions"] = json::array(); + metadata["metadata_encoding"] = + "https://purl.org/zarr/spec/protocol/core/3.0"; + metadata["metadata_key_suffix"] = ".json"; + metadata["zarr_format"] = "https://purl.org/zarr/spec/protocol/core/3.0"; + + const std::string metadata_str = metadata.dump(4); + const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); + Sink* sink = metadata_sinks_.at(0); + CHECK(sink->write(0, metadata_bytes, metadata_str.size())); +} + +/// @brief Write the external metadata. +/// @details This is a no-op for ZarrV3. Instead, external metadata is +/// stored in the group metadata. +void +zarr::ZarrV3::write_external_metadata_() const +{ + // no-op +} + +/// @brief Write the metadata for the group. +/// @details Zarr v3 stores group metadata in +/// /meta/{group_name}.group.json. We will call the group "root". +void +zarr::ZarrV3::write_group_metadata_() const +{ + namespace fs = std::filesystem; + using json = nlohmann::json; + + json metadata; + metadata["attributes"]["acquire"] = + external_metadata_json_.empty() ? "" + : json::parse(external_metadata_json_, + nullptr, // callback + true, // allow exceptions + true // ignore comments + ); + + const std::string metadata_str = metadata.dump(4); + const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); + Sink* sink = metadata_sinks_.at(1); + CHECK(sink->write(0, metadata_bytes, metadata_str.size())); +} + void zarr::ZarrV3::write_array_metadata_(size_t level) const { @@ -143,65 +212,10 @@ zarr::ZarrV3::write_array_metadata_(size_t level) const }) }, }); - auto path = (dataset_root_ / "meta" / "root" / - (std::to_string(level) + ".array.json")) - .string(); - common::write_string(path, metadata.dump(4)); -} - -/// @brief Write the external metadata. -/// @details This is a no-op for ZarrV3. Instead, external metadata is -/// stored in the group metadata. -void -zarr::ZarrV3::write_external_metadata_() const -{ - // no-op -} - -/// @brief Write the metadata for the dataset. -void -zarr::ZarrV3::write_base_metadata_() const -{ - namespace fs = std::filesystem; - using json = nlohmann::json; - - json metadata; - metadata["extensions"] = json::array(); - metadata["metadata_encoding"] = - "https://purl.org/zarr/spec/protocol/core/3.0"; - metadata["metadata_key_suffix"] = ".json"; - metadata["zarr_format"] = "https://purl.org/zarr/spec/protocol/core/3.0"; - - auto path = (dataset_root_ / "zarr.json").string(); - common::write_string(path, metadata.dump(4)); -} - -/// @brief Write the metadata for the group. -/// @details Zarr v3 stores group metadata in -/// /meta/{group_name}.group.json. We will call the group "root". -void -zarr::ZarrV3::write_group_metadata_() const -{ - namespace fs = std::filesystem; - using json = nlohmann::json; - - json metadata; - metadata["attributes"]["acquire"] = - external_metadata_json_.empty() ? "" - : json::parse(external_metadata_json_, - nullptr, // callback - true, // allow exceptions - true // ignore comments - ); - - auto path = (dataset_root_ / "meta" / "root.group.json").string(); - common::write_string(path, metadata.dump(4)); -} - -fs::path -zarr::ZarrV3::get_data_directory_() const -{ - return dataset_root_ / "data" / "root"; + const std::string metadata_str = metadata.dump(4); + const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); + Sink* sink = metadata_sinks_.at(2 + level); + CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } extern "C" diff --git a/src/zarr.v3.hh b/src/zarr.v3.hh index d546cdf5..319dee01 100644 --- a/src/zarr.v3.hh +++ b/src/zarr.v3.hh @@ -19,13 +19,15 @@ struct ZarrV3 final : public Zarr void allocate_writers_() override; /// Metadata - void write_array_metadata_(size_t level) const override; - void write_external_metadata_() const override; + std::vector make_metadata_sink_paths_() override; + + // fixed metadata void write_base_metadata_() const override; - void write_group_metadata_() const override; + void write_external_metadata_() const override; - /// Filesystem - fs::path get_data_directory_() const override; + // mutable metadata, changes on flush + void write_group_metadata_() const override; + void write_array_metadata_(size_t level) const override; }; } // namespace acquire::sink::zarr #endif // H_ACQUIRE_STORAGE_ZARR_V3_V0 diff --git a/tests/get.cpp b/tests/get.cpp index cb387fc6..d8325e29 100644 --- a/tests/get.cpp +++ b/tests/get.cpp @@ -92,9 +92,9 @@ main() // unconfigured behavior CHECK(storage_get(storage, &props) == Device_Ok); - CHECK(props.filename.str); - CHECK(strcmp(props.filename.str, "") == 0); - CHECK(props.filename.nbytes == 1); + CHECK(props.uri.str); + CHECK(strcmp(props.uri.str, "") == 0); + CHECK(props.uri.nbytes == 1); CHECK(props.external_metadata_json.str); CHECK(strcmp(props.external_metadata_json.str, "") == 0); @@ -132,7 +132,8 @@ main() CHECK(Device_Ok == storage_set(storage, &props)); CHECK(Device_Ok == storage_get(storage, &props)); - CHECK(strcmp(props.filename.str, TEST ".zarr") == 0); + std::string uri{ props.uri.str }; + CHECK(uri.ends_with(TEST ".zarr")); CHECK(strcmp(props.external_metadata_json.str, R"({"foo":"bar"})") == 0); diff --git a/tests/repeat-start.cpp b/tests/repeat-start.cpp index 9f1b05ea..c9a50b13 100644 --- a/tests/repeat-start.cpp +++ b/tests/repeat-start.cpp @@ -141,7 +141,9 @@ validate(AcquireRuntime* runtime) AcquireProperties props = { 0 }; OK(acquire_get_configuration(runtime, &props)); - const fs::path test_path(props.video[0].storage.settings.filename.str); + std::string uri{ props.video[0].storage.settings.uri.str }; + uri.replace(uri.find("file://"), 7, ""); + const fs::path test_path(uri); EXPECT(fs::is_directory(test_path), "Expected %s to be a directory", test_path.string().c_str()); diff --git a/tests/unit-tests.cpp b/tests/unit-tests.cpp index 130abf92..32b13be8 100644 --- a/tests/unit-tests.cpp +++ b/tests/unit-tests.cpp @@ -83,8 +83,8 @@ main() const std::vector tests{ #define CASE(e) { .name = #e, .test = (int (*)())lib_load(&lib, #e) } CASE(unit_test__average_frame), - CASE(unit_test__file_creator__create_chunk_files), - CASE(unit_test__file_creator__create_shard_files), + CASE(unit_test__file_creator__create_chunk_sinks), + CASE(unit_test__file_creator__create_shard_sinks), CASE(unit_test__chunk_lattice_index), CASE(unit_test__tile_group_offset), CASE(unit_test__chunk_internal_offset),