From 0b6065bc97f84719dad0a110055e78711461a9e5 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Tue, 2 Jul 2024 13:00:54 -0400 Subject: [PATCH] 238: Implement S3 Sinks, part 1 (no S3 sinks yet implemented) (#252) - Updates acquire-common to latest `main` - Accounts for redefinition of `VideoFrame::bytes_of_frame` for 8-aligned `VideoFrame` pointers - Removes `SinkCreator` and `SinkType` concepts - Replaces `FileCreator` with `SinkCreator` object - Replaces bare pointers to `Sink` with shared pointers thereto - Factors code common to creating both chunk and shard `Sink`s - Creates metadata `Sink`s directly in the Zarr object - Adds test/README.md --- acquire-common | 2 +- src/CMakeLists.txt | 2 + src/common.cpp | 21 +- src/common.hh | 11 +- src/writers/file.sink.cpp | 405 +--------------------------------- src/writers/file.sink.hh | 56 +---- src/writers/sink.creator.cpp | 382 ++++++++++++++++++++++++++++++++ src/writers/sink.creator.hh | 77 +++++++ src/writers/sink.hh | 50 +---- src/writers/writer.cpp | 47 ++-- src/writers/writer.hh | 14 +- src/writers/zarrv2.writer.cpp | 67 ++++-- src/writers/zarrv2.writer.hh | 2 +- src/writers/zarrv3.writer.cpp | 66 ++++-- src/writers/zarrv3.writer.hh | 2 +- src/zarr.cpp | 27 +-- src/zarr.hh | 15 +- src/zarr.v2.cpp | 50 ++--- src/zarr.v2.hh | 2 +- src/zarr.v3.cpp | 43 ++-- src/zarr.v3.hh | 2 +- tests/README.md | 10 + tests/unit-tests.cpp | 4 +- 23 files changed, 689 insertions(+), 668 deletions(-) create mode 100644 src/writers/sink.creator.cpp create mode 100644 src/writers/sink.creator.hh create mode 100644 tests/README.md diff --git a/acquire-common b/acquire-common index 85a5d90a..7afe044c 160000 --- a/acquire-common +++ b/acquire-common @@ -1 +1 @@ -Subproject commit 85a5d90a857ce5b272d77dc09553e653cb709a5b +Subproject commit 7afe044c0bb91c24bf8f397205b42f47ccade05a diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e436f9f3..5a74ce85 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,6 +7,8 @@ add_library(${tgt} MODULE common.hh common.cpp writers/sink.hh + writers/sink.creator.hh + writers/sink.creator.cpp writers/file.sink.hh writers/file.sink.cpp writers/writer.hh diff --git a/src/common.cpp b/src/common.cpp index 5869e4e4..d79ebd30 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -224,20 +224,9 @@ common::sample_type_to_string(SampleType t) noexcept } } -void -common::write_string(const std::string& path, const std::string& str) -{ - if (auto p = fs::path(path); !fs::exists(p.parent_path())) - fs::create_directories(p.parent_path()); - - struct file f = { 0 }; - auto is_ok = file_create(&f, path.c_str(), path.size()); - is_ok &= file_write(&f, // file - 0, // offset - (uint8_t*)str.c_str(), // cur - (uint8_t*)(str.c_str() + str.size()) // end - ); - EXPECT(is_ok, "Write to \"%s\" failed.", path.c_str()); - TRACE("Wrote %d bytes to \"%s\".", str.size(), path.c_str()); - file_close(&f); +size_t +common::align_up(size_t n, size_t align) +{ + EXPECT(align > 0, "Alignment must be greater than zero."); + return align * ((n + align - 1) / align); } diff --git a/src/common.hh b/src/common.hh index 98bff0fe..2e10efe2 100644 --- a/src/common.hh +++ b/src/common.hh @@ -144,11 +144,12 @@ sample_type_to_dtype(SampleType t); const char* sample_type_to_string(SampleType t) noexcept; -/// @brief Write a string to a file. -/// @param path The path of the file to write. -/// @param str The string to write. -void -write_string(const std::string& path, const std::string& value); +/// @brief Align a size to a given alignment. +/// @param n Size to align. +/// @param align Alignment. +/// @return Aligned size. +size_t +align_up(size_t n, size_t align); } // namespace acquire::sink::zarr::common } // namespace acquire::sink::zarr diff --git a/src/writers/file.sink.cpp b/src/writers/file.sink.cpp index b222c197..d580cf5c 100644 --- a/src/writers/file.sink.cpp +++ b/src/writers/file.sink.cpp @@ -4,39 +4,11 @@ namespace zarr = acquire::sink::zarr; -template<> -zarr::Sink* -zarr::sink_open(const std::string& uri) -{ - return (Sink*)new FileSink(uri); -} - -template<> -void -zarr::sink_close(Sink* sink_) -{ - if (!sink_) { - return; - } - - auto* sink = static_cast(sink_); - file_close(sink->file_); - sink->file_ = nullptr; - delete sink; -} - zarr::FileSink::FileSink(const std::string& uri) - : file_{ new struct file } + : file_(new struct file, &file_close) { - CHECK(file_create(file_, uri.c_str(), uri.size() + 1)); -} - -zarr::FileSink::~FileSink() -{ - if (file_) { - file_close(file_); - file_ = nullptr; - } + CHECK(file_); + CHECK(file_create(file_.get(), uri.c_str(), uri.size() + 1)); } bool @@ -46,374 +18,5 @@ zarr::FileSink::write(size_t offset, const uint8_t* buf, size_t bytes_of_buf) return false; } - return file_write(file_, offset, buf, buf + bytes_of_buf); -} - -zarr::FileCreator::FileCreator(std::shared_ptr thread_pool) - : thread_pool_(thread_pool) -{ -} - -bool -zarr::FileCreator::create_chunk_sinks(const std::string& base_uri, - const std::vector& dimensions, - std::vector& chunk_sinks) -{ - const std::string base_dir = - base_uri.starts_with("file://") ? base_uri.substr(7) : base_uri; - - std::queue paths; - paths.push(base_dir); - - if (!make_dirs_(paths)) { - return false; - } - - // create directories - for (auto i = dimensions.size() - 2; i >= 1; --i) { - const auto& dim = dimensions.at(i); - const auto n_chunks = common::chunks_along_dimension(dim); - - auto n_paths = paths.size(); - for (auto j = 0; j < n_paths; ++j) { - const auto path = paths.front(); - paths.pop(); - - for (auto k = 0; k < n_chunks; ++k) { - paths.push(path / std::to_string(k)); - } - } - - if (!make_dirs_(paths)) { - return false; - } - } - - // create files - { - const auto& dim = dimensions.front(); - const auto n_chunks = common::chunks_along_dimension(dim); - - auto n_paths = paths.size(); - for (auto i = 0; i < n_paths; ++i) { - const auto path = paths.front(); - paths.pop(); - for (auto j = 0; j < n_chunks; ++j) { - paths.push(path / std::to_string(j)); - } - } - } - - return make_files_(paths, chunk_sinks); -} - -bool -zarr::FileCreator::create_shard_sinks(const std::string& base_uri, - const std::vector& dimensions, - std::vector& shard_sinks) -{ - const std::string base_dir = - base_uri.starts_with("file://") ? base_uri.substr(7) : base_uri; - - std::queue paths; - paths.push(base_dir); - - if (!make_dirs_(paths)) { - return false; - } - - // create directories - for (auto i = dimensions.size() - 2; i >= 1; --i) { - const auto& dim = dimensions.at(i); - const auto n_shards = common::shards_along_dimension(dim); - CHECK(n_shards); - - auto n_paths = paths.size(); - for (auto j = 0; j < n_paths; ++j) { - const auto path = paths.front(); - paths.pop(); - - for (auto k = 0; k < n_shards; ++k) { - paths.push(path / std::to_string(k)); - } - } - - if (!make_dirs_(paths)) { - return false; - } - } - - // create files - { - const auto& dim = dimensions.front(); - const auto n_shards = common::shards_along_dimension(dim); - CHECK(n_shards); - - auto n_paths = paths.size(); - for (auto i = 0; i < n_paths; ++i) { - const auto path = paths.front(); - paths.pop(); - for (auto j = 0; j < n_shards; ++j) { - paths.push(path / std::to_string(j)); - } - } - } - - return make_files_(paths, shard_sinks); -} - -bool -zarr::FileCreator::create_metadata_sinks(const std::vector& paths, - std::vector& metadata_sinks) -{ - if (paths.empty()) { - return true; - } - - std::queue file_paths; - for (const auto& path : paths) { - fs::path p = path; - fs::create_directories(p.parent_path()); - file_paths.push(p); - } - - return make_files_(file_paths, metadata_sinks); -} - -bool -zarr::FileCreator::make_dirs_(std::queue& dir_paths) -{ - if (dir_paths.empty()) { - return true; - } - - std::atomic all_successful = true; - - const auto n_dirs = dir_paths.size(); - std::latch latch(n_dirs); - - for (auto i = 0; i < n_dirs; ++i) { - const auto dirname = dir_paths.front(); - dir_paths.pop(); - - thread_pool_->push_to_job_queue( - [dirname, &latch, &all_successful](std::string& err) -> bool { - bool success = false; - - try { - if (fs::exists(dirname)) { - EXPECT(fs::is_directory(dirname), - "'%s' exists but is not a directory", - dirname.c_str()); - } else if (all_successful) { - std::error_code ec; - EXPECT(fs::create_directories(dirname, ec), - "%s", - ec.message().c_str()); - } - success = true; - } catch (const std::exception& exc) { - char buf[128]; - snprintf(buf, - sizeof(buf), - "Failed to create directory '%s': %s.", - dirname.string().c_str(), - exc.what()); - err = buf; - } catch (...) { - char buf[128]; - snprintf(buf, - sizeof(buf), - "Failed to create directory '%s': (unknown).", - dirname.string().c_str()); - err = buf; - } - - latch.count_down(); - all_successful = all_successful && success; - - return success; - }); - - dir_paths.push(dirname); - } - - latch.wait(); - - return all_successful; + return file_write(file_.get(), offset, buf, buf + bytes_of_buf); } - -bool -zarr::FileCreator::make_files_(std::queue& file_paths, - std::vector& files) -{ - if (file_paths.empty()) { - return true; - } - - std::atomic all_successful = true; - - const auto n_files = file_paths.size(); - files.resize(n_files); - std::fill(files.begin(), files.end(), nullptr); - std::latch latch(n_files); - - for (auto i = 0; i < n_files; ++i) { - const auto filename = file_paths.front(); - file_paths.pop(); - - Sink** psink = files.data() + i; - - thread_pool_->push_to_job_queue( - [filename, psink, &latch, &all_successful](std::string& err) -> bool { - bool success = false; - - try { - if (all_successful) { - *psink = - (FileSink*)sink_open(filename.string()); - } - success = true; - } catch (const std::exception& exc) { - char buf[128]; - snprintf(buf, - sizeof(buf), - "Failed to create file '%s': %s.", - filename.string().c_str(), - exc.what()); - err = buf; - } catch (...) { - char buf[128]; - snprintf(buf, - sizeof(buf), - "Failed to create file '%s': (unknown).", - filename.string().c_str()); - err = buf; - } - - latch.count_down(); - all_successful = all_successful && success; - - return success; - }); - } - - latch.wait(); - - return all_successful; -} - -#ifndef NO_UNIT_TESTS -#ifdef _WIN32 -#define acquire_export __declspec(dllexport) -#else -#define acquire_export -#endif - -namespace common = zarr::common; - -extern "C" -{ - acquire_export int unit_test__file_creator__create_chunk_sinks() - { - const fs::path base_dir = fs::temp_directory_path() / "acquire"; - int retval = 0; - - try { - auto thread_pool = std::make_shared( - std::thread::hardware_concurrency(), - [](const std::string& err) { LOGE("Error: %s\n", err.c_str()); }); - zarr::FileCreator file_creator{ thread_pool }; - - std::vector dims; - dims.emplace_back("x", DimensionType_Space, 10, 2, 0); // 5 chunks - dims.emplace_back("y", DimensionType_Space, 4, 2, 0); // 2 chunks - dims.emplace_back( - "z", DimensionType_Space, 0, 3, 0); // 3 timepoints per chunk - - std::vector files; - CHECK( - file_creator.create_chunk_sinks(base_dir.string(), dims, files)); - - CHECK(files.size() == 5 * 2); - std::for_each(files.begin(), files.end(), [](zarr::Sink* f) { - sink_close(f); - }); - - CHECK(fs::is_directory(base_dir)); - for (auto y = 0; y < 2; ++y) { - CHECK(fs::is_directory(base_dir / std::to_string(y))); - for (auto x = 0; x < 5; ++x) { - CHECK(fs::is_regular_file(base_dir / std::to_string(y) / - std::to_string(x))); - } - } - retval = 1; - } catch (const std::exception& exc) { - LOGE("Exception: %s\n", exc.what()); - } catch (...) { - LOGE("Exception: (unknown)"); - } - - // cleanup - if (fs::exists(base_dir)) { - fs::remove_all(base_dir); - } - return retval; - } - - acquire_export int unit_test__file_creator__create_shard_sinks() - { - const fs::path base_dir = fs::temp_directory_path() / "acquire"; - int retval = 0; - - try { - auto thread_pool = std::make_shared( - std::thread::hardware_concurrency(), - [](const std::string& err) { LOGE("Error: %s", err.c_str()); }); - zarr::FileCreator file_creator{ thread_pool }; - - std::vector dims; - dims.emplace_back( - "x", DimensionType_Space, 10, 2, 5); // 5 chunks, 1 shard - dims.emplace_back( - "y", DimensionType_Space, 4, 2, 1); // 2 chunks, 2 shards - dims.emplace_back( - "z", DimensionType_Space, 8, 2, 2); // 4 chunks, 2 shards - - std::vector files; - CHECK( - file_creator.create_shard_sinks(base_dir.string(), dims, files)); - - CHECK(files.size() == 2); - std::for_each(files.begin(), files.end(), [](zarr::Sink* f) { - sink_close(f); - }); - - CHECK(fs::is_directory(base_dir)); - for (auto y = 0; y < 2; ++y) { - CHECK(fs::is_directory(base_dir / std::to_string(y))); - for (auto x = 0; x < 1; ++x) { - CHECK(fs::is_regular_file(base_dir / std::to_string(y) / - std::to_string(x))); - } - } - - // cleanup - fs::remove_all(base_dir); - - retval = 1; - } catch (const std::exception& exc) { - LOGE("Exception: %s\n", exc.what()); - } catch (...) { - LOGE("Exception: (unknown)"); - } - - // cleanup - if (fs::exists(base_dir)) { - fs::remove_all(base_dir); - } - return retval; - } -} // extern "C" -#endif // NO_UNIT_TESTS \ No newline at end of file diff --git a/src/writers/file.sink.hh b/src/writers/file.sink.hh index 4be362f6..87d98031 100644 --- a/src/writers/file.sink.hh +++ b/src/writers/file.sink.hh @@ -1,63 +1,23 @@ -#ifndef H_ACQUIRE_STORAGE_ZARR_FILESYSTEM_SINK_V0 -#define H_ACQUIRE_STORAGE_ZARR_FILESYSTEM_SINK_V0 +#ifndef H_ACQUIRE_STORAGE_ZARR_WRITERS_FILE_SINK_V0 +#define H_ACQUIRE_STORAGE_ZARR_WRITERS_FILE_SINK_V0 #include "sink.hh" #include "platform.h" -#include - -namespace fs = std::filesystem; +#include namespace acquire::sink::zarr { struct FileSink : public Sink -{ - explicit FileSink(const std::string& uri); - ~FileSink() override; - - [[nodiscard]] bool write(size_t offset, - const uint8_t* buf, - size_t bytes_of_buf) override; - - struct file* file_; -}; - -struct FileCreator { public: - FileCreator() = delete; - explicit FileCreator(std::shared_ptr thread_pool); - ~FileCreator() noexcept = default; - - [[nodiscard]] bool create_chunk_sinks( - const std::string& base_uri, - const std::vector& dimensions, - std::vector& chunk_sinks); - - [[nodiscard]] bool create_shard_sinks( - const std::string& base_uri, - const std::vector& dimensions, - std::vector& shard_sinks); + FileSink() = delete; + explicit FileSink(const std::string& uri); - [[nodiscard]] bool create_metadata_sinks( - const std::vector& paths, - std::vector& metadata_sinks); + bool write(size_t offset, const uint8_t* buf, size_t bytes_of_buf) override; private: - std::shared_ptr thread_pool_; - - /// @brief Parallel create a collection of directories. - /// @param[in] dir_paths The directories to create. - /// @return True iff all directories were created successfully. - [[nodiscard]] bool make_dirs_(std::queue& dir_paths); - - /// @brief Parallel create a collection of files. - /// @param[in,out] file_paths The files to create. Unlike `make_dirs_`, - /// this function drains the queue. - /// @param[out] files The files created. - /// @return True iff all files were created successfully. - [[nodiscard]] bool make_files_(std::queue& file_paths, - std::vector& files); + std::unique_ptr file_; }; } // namespace acquire::sink::zarr -#endif // H_ACQUIRE_STORAGE_ZARR_FILESYSTEM_SINK_V0 +#endif // H_ACQUIRE_STORAGE_ZARR_WRITERS_FILE_SINK_V0 diff --git a/src/writers/sink.creator.cpp b/src/writers/sink.creator.cpp new file mode 100644 index 00000000..bcde5ae9 --- /dev/null +++ b/src/writers/sink.creator.cpp @@ -0,0 +1,382 @@ +#include "sink.creator.hh" +#include "file.sink.hh" + +#include +#include + +namespace zarr = acquire::sink::zarr; +namespace common = zarr::common; + +zarr::SinkCreator::SinkCreator(std::shared_ptr thread_pool_) + : thread_pool_{ thread_pool_ } +{ +} + +bool +zarr::SinkCreator::make_data_sinks( + const std::string& base_uri, + const std::vector& dimensions, + const std::function& parts_along_dimension, + std::vector>& part_sinks) +{ + std::queue paths; + + std::string base_dir = base_uri; + if (base_uri.starts_with("file://")) { + base_dir = base_uri.substr(7); + } + paths.emplace(base_dir); + + if (!make_dirs_(paths)) { + return false; + } + + // create directories + for (auto i = dimensions.size() - 2; i >= 1; --i) { + const auto& dim = dimensions.at(i); + const auto n_parts = parts_along_dimension(dim); + CHECK(n_parts); + + auto n_paths = paths.size(); + for (auto j = 0; j < n_paths; ++j) { + const auto path = paths.front(); + paths.pop(); + + for (auto k = 0; k < n_parts; ++k) { + const auto kstr = std::to_string(k); + paths.push(path + (path.empty() ? kstr : "/" + kstr)); + } + } + + if (!make_dirs_(paths)) { + return false; + } + } + + // create files + { + const auto& dim = dimensions.front(); + const auto n_parts = parts_along_dimension(dim); + CHECK(n_parts); + + auto n_paths = paths.size(); + for (auto i = 0; i < n_paths; ++i) { + const auto path = paths.front(); + paths.pop(); + for (auto j = 0; j < n_parts; ++j) { + paths.push(path + "/" + std::to_string(j)); + } + } + } + + return make_files_(paths, part_sinks); +} + +bool +zarr::SinkCreator::create_v2_metadata_sinks( + const std::string& base_uri, + size_t n_arrays, + std::vector>& metadata_sinks) +{ + if (base_uri.empty()) { + LOGE("Base URI is empty."); + return false; + } + + std::queue dir_paths, file_paths; + + file_paths.emplace(".metadata"); // base metadata + file_paths.emplace("0/.zattrs"); // external metadata + file_paths.emplace(".zattrs"); // group metadata + + for (auto i = 0; i < n_arrays; ++i) { + const auto idx_string = std::to_string(i); + dir_paths.push(idx_string); + file_paths.push(idx_string + "/.zarray"); // array metadata + } + + return make_metadata_sinks_( + base_uri, dir_paths, file_paths, metadata_sinks); +} + +bool +zarr::SinkCreator::create_v3_metadata_sinks( + const std::string& base_uri, + size_t n_arrays, + std::vector>& metadata_sinks) +{ + if (base_uri.empty()) { + LOGE("Base URI is empty."); + return false; + } + + std::queue dir_paths, file_paths; + + dir_paths.emplace("meta"); + dir_paths.emplace("meta/root"); + + file_paths.emplace("zarr.json"); + file_paths.emplace("meta/root.group.json"); + for (auto i = 0; i < n_arrays; ++i) { + file_paths.push("meta/root/" + std::to_string(i) + ".array.json"); + } + + return make_metadata_sinks_( + base_uri, dir_paths, file_paths, metadata_sinks); +} + +bool +zarr::SinkCreator::make_dirs_(std::queue& dir_paths) +{ + if (dir_paths.empty()) { + return true; + } + + std::atomic all_successful = 1; + + const auto n_dirs = dir_paths.size(); + std::latch latch(n_dirs); + + for (auto i = 0; i < n_dirs; ++i) { + const auto dirname = dir_paths.front(); + dir_paths.pop(); + + thread_pool_->push_to_job_queue( + [dirname, &latch, &all_successful](std::string& err) -> bool { + bool success = false; + + try { + if (fs::exists(dirname)) { + EXPECT(fs::is_directory(dirname), + "'%s' exists but is not a directory", + dirname.c_str()); + } else if (all_successful) { + std::error_code ec; + EXPECT(fs::create_directories(dirname, ec), + "%s", + ec.message().c_str()); + } + success = true; + } catch (const std::exception& exc) { + err = "Failed to create directory '" + dirname + + "': " + exc.what(); + } catch (...) { + err = + "Failed to create directory '" + dirname + "': (unknown)."; + } + + latch.count_down(); + all_successful.fetch_and((char)success); + + return success; + }); + + dir_paths.push(dirname); + } + + latch.wait(); + + return (bool)all_successful; +} + +bool +zarr::SinkCreator::make_files_(std::queue& file_paths, + std::vector>& sinks) +{ + if (file_paths.empty()) { + return true; + } + + std::atomic all_successful = 1; + + const auto n_files = file_paths.size(); + sinks.resize(n_files); + std::fill(sinks.begin(), sinks.end(), nullptr); + std::latch latch(n_files); + + for (auto i = 0; i < n_files; ++i) { + const auto filename = file_paths.front(); + file_paths.pop(); + + std::unique_ptr* psink = sinks.data() + i; + + thread_pool_->push_to_job_queue( + [filename, psink, &latch, &all_successful](std::string& err) -> bool { + bool success = false; + + try { + if (all_successful) { + *psink = std::make_unique(filename); + } + success = true; + } catch (const std::exception& exc) { + err = + "Failed to create file '" + filename + "': " + exc.what(); + } catch (...) { + err = "Failed to create file '" + filename + "': (unknown)."; + } + + latch.count_down(); + all_successful.fetch_and((char)success); + + return success; + }); + } + + latch.wait(); + + return (bool)all_successful; +} + +bool +zarr::SinkCreator::make_metadata_sinks_( + const std::string& base_uri, + std::queue& dir_paths, + std::queue& file_paths, + std::vector>& metadata_sinks) +{ + std::string base_dir = base_uri; + if (base_uri.starts_with("file://")) { + base_dir = base_uri.substr(7); + } + + // remove trailing slashes + if (base_uri.ends_with("/") || base_uri.ends_with("\\")) { + base_dir = base_dir.substr(0, base_dir.size() - 1); + } + + // create the base directories if they don't already exist + // we create them in serial because + // 1. there are only a few of them; and + // 2. they may be nested + while (!dir_paths.empty()) { + const auto dir = base_dir + "/" + dir_paths.front(); + dir_paths.pop(); + if (!fs::is_directory(dir) && !fs::create_directories(dir)) { + return false; + } + } + + // make files + size_t n_paths = file_paths.size(); + for (auto i = 0; i < n_paths; ++i) { + const auto path = base_dir + "/" + file_paths.front(); + file_paths.pop(); + file_paths.push(path); + } + + if (!make_files_(file_paths, metadata_sinks)) { + return false; + } + + return true; +} + +#ifndef NO_UNIT_TESTS +#ifdef _WIN32 +#define acquire_export __declspec(dllexport) +#else +#define acquire_export +#endif + +extern "C" +{ + acquire_export int unit_test__sink_creator__create_chunk_file_sinks() + { + const fs::path base_dir = fs::temp_directory_path() / "acquire"; + int retval = 0; + + try { + auto thread_pool = std::make_shared( + std::thread::hardware_concurrency(), + [](const std::string& err) { LOGE("Error: %s\n", err.c_str()); }); + zarr::SinkCreator creator{ thread_pool }; + + std::vector dims; + dims.emplace_back("x", DimensionType_Space, 10, 2, 0); // 5 chunks + dims.emplace_back("y", DimensionType_Space, 4, 2, 0); // 2 chunks + dims.emplace_back( + "z", DimensionType_Space, 0, 3, 0); // 3 timepoints per chunk + + std::vector> files; + CHECK(creator.make_data_sinks( + base_dir.string(), dims, common::chunks_along_dimension, files)); + + CHECK(files.size() == 5 * 2); + files.clear(); // closes files + + CHECK(fs::is_directory(base_dir)); + for (auto y = 0; y < 2; ++y) { + CHECK(fs::is_directory(base_dir / std::to_string(y))); + for (auto x = 0; x < 5; ++x) { + CHECK(fs::is_regular_file(base_dir / std::to_string(y) / + std::to_string(x))); + } + } + retval = 1; + } catch (const std::exception& exc) { + LOGE("Exception: %s\n", exc.what()); + } catch (...) { + LOGE("Exception: (unknown)"); + } + + // cleanup + if (fs::exists(base_dir)) { + fs::remove_all(base_dir); + } + return retval; + } + + acquire_export int unit_test__sink_creator__create_shard_file_sinks() + { + const fs::path base_dir = fs::temp_directory_path() / "acquire"; + int retval = 0; + + try { + auto thread_pool = std::make_shared( + std::thread::hardware_concurrency(), + [](const std::string& err) { LOGE("Error: %s", err.c_str()); }); + zarr::SinkCreator creator{ thread_pool }; + + std::vector dims; + dims.emplace_back( + "x", DimensionType_Space, 10, 2, 5); // 5 chunks, 1 shard + dims.emplace_back( + "y", DimensionType_Space, 4, 2, 1); // 2 chunks, 2 shards + dims.emplace_back( + "z", DimensionType_Space, 8, 2, 2); // 4 chunks, 2 shards + + std::vector> files; + CHECK(creator.make_data_sinks( + base_dir.string(), dims, common::shards_along_dimension, files)); + + CHECK(files.size() == 2); + files.clear(); // closes files + + CHECK(fs::is_directory(base_dir)); + for (auto y = 0; y < 2; ++y) { + CHECK(fs::is_directory(base_dir / std::to_string(y))); + for (auto x = 0; x < 1; ++x) { + CHECK(fs::is_regular_file(base_dir / std::to_string(y) / + std::to_string(x))); + } + } + + // cleanup + fs::remove_all(base_dir); + + retval = 1; + } catch (const std::exception& exc) { + LOGE("Exception: %s\n", exc.what()); + } catch (...) { + LOGE("Exception: (unknown)"); + } + + // cleanup + if (fs::exists(base_dir)) { + fs::remove_all(base_dir); + } + return retval; + } +} // extern "C" +#endif // NO_UNIT_TESTS diff --git a/src/writers/sink.creator.hh b/src/writers/sink.creator.hh new file mode 100644 index 00000000..07d60632 --- /dev/null +++ b/src/writers/sink.creator.hh @@ -0,0 +1,77 @@ +#ifndef H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_CREATOR_V0 +#define H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_CREATOR_V0 + +#include "sink.hh" +#include "../common.hh" + +#include +#include + +namespace acquire::sink::zarr { +struct SinkCreator final +{ + public: + SinkCreator() = delete; + explicit SinkCreator(std::shared_ptr thread_pool); + ~SinkCreator() noexcept = default; + + /// @brief Create a collection of data sinks, either chunk or shard. + /// @param[in] base_uri The base URI for the sinks. + /// @param[in] dimensions The dimensions of the data. + /// @param[in] parts_along_dimension Function for computing the number of + /// parts (either chunk or shard) along each dimension. + /// @param[out] part_sinks The sinks created. + [[nodiscard]] bool make_data_sinks( + const std::string& base_uri, + const std::vector& dimensions, + const std::function& parts_along_dimension, + std::vector>& part_sinks); + + /// @brief Create a collection of metadata sinks for a Zarr V2 dataset. + /// @param[in] base_uri The base URI for the dataset. + /// @param[in] n_arrays The number of data arrays. + /// @param[out] metadata_sinks The sinks created. + [[nodiscard]] bool create_v2_metadata_sinks( + const std::string& base_uri, + size_t n_arrays, + std::vector>& metadata_sinks); + + /// @brief Create a collection of metadata sinks for a Zarr V3 dataset. + /// @param[in] base_uri The base URI for the dataset. + /// @param[in] n_arrays The number of data arrays. + /// @param[out] metadata_sinks The sinks created. + [[nodiscard]] bool create_v3_metadata_sinks( + const std::string& base_uri, + size_t n_arrays, + std::vector>& metadata_sinks); + + private: + std::shared_ptr thread_pool_; + + /// @brief Parallel create a collection of directories. + /// @param[in] dir_paths The directories to create. + /// @return True iff all directories were created successfully. + [[nodiscard]] bool make_dirs_(std::queue& dir_paths); + + /// @brief Parallel create a collection of files. + /// @param[in,out] file_paths The files to create. Unlike `make_dirs_`, + /// this function drains the queue. + /// @param[out] files The files created. + /// @return True iff all files were created successfully. + [[nodiscard]] bool make_files_(std::queue& file_paths, + std::vector>& sinks); + + /// @brief Create a collection of metadata sinks. + /// @param[in] base_uri The base URI for the sinks. + /// @param[in] dir_paths The directories to create. + /// @param[in] file_paths The files to create. + /// @param[out] metadata_sinks The sinks created. + [[nodiscard]] bool make_metadata_sinks_( + const std::string& base_uri, + std::queue& dir_paths, + std::queue& file_paths, + std::vector>& metadata_sinks); +}; +} // namespace acquire::sink::zarr + +#endif // H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_CREATOR_V0 \ No newline at end of file diff --git a/src/writers/sink.hh b/src/writers/sink.hh index f48b8524..e90f4601 100644 --- a/src/writers/sink.hh +++ b/src/writers/sink.hh @@ -1,5 +1,5 @@ -#ifndef H_ACQUIRE_STORAGE_ZARR_SINK_V0 -#define H_ACQUIRE_STORAGE_ZARR_SINK_V0 +#ifndef H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_V0 +#define H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_V0 #include #include @@ -13,42 +13,14 @@ struct Sink { virtual ~Sink() noexcept = default; - virtual bool write(size_t offset, - const uint8_t* buf, - size_t bytes_of_buf) = 0; + /// @brief Write data to the sink. + /// @param offset The offset in the sink to write to. + /// @param buf The buffer to write to the sink. + /// @param bytes_of_buf The number of bytes to write from @p buf. + /// @return True if the write was successful, false otherwise. + [[nodiscard]] virtual bool write(size_t offset, + const uint8_t* buf, + size_t bytes_of_buf) = 0; }; - -template -concept SinkType = - requires(SinkT sink, size_t offset, const uint8_t* buf, size_t bytes_of_buf) { - { sink.write(offset, buf, bytes_of_buf) } -> std::convertible_to; - }; - -template -concept SinkCreator = requires(SinkCreatorT sink_creator, - const std::string& base_uri, - std::vector dimensions, - const std::vector& paths, - std::vector& sinks) { - { - sink_creator.create_chunk_sinks(base_uri, dimensions, sinks) - } -> std::convertible_to; - { - sink_creator.create_shard_sinks(base_uri, dimensions, sinks) - } -> std::convertible_to; - { - sink_creator.create_metadata_sinks(paths, sinks) - } -> std::convertible_to; -}; - -template -Sink* -sink_open(const std::string& uri); - -template -void -sink_close(Sink* sink); - } // namespace acquire::sink::zarr - -#endif // H_ACQUIRE_STORAGE_ZARR_SINK_V0 +#endif // H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_V0 diff --git a/src/writers/writer.cpp b/src/writers/writer.cpp index c9f137d9..e919d19a 100644 --- a/src/writers/writer.cpp +++ b/src/writers/writer.cpp @@ -104,7 +104,7 @@ chunk_internal_offset(size_t frame_id, } // end ::{anonymous} namespace bool -zarr::downsample(const ArrayConfig& config, ArrayConfig& downsampled_config) +zarr::downsample(const WriterConfig& config, WriterConfig& downsampled_config) { // downsample dimensions downsampled_config.dimensions.clear(); @@ -176,7 +176,7 @@ zarr::downsample(const ArrayConfig& config, ArrayConfig& downsampled_config) } /// Writer -zarr::Writer::Writer(const ArrayConfig& config, +zarr::Writer::Writer(const WriterConfig& config, std::shared_ptr thread_pool) : config_{ config } , thread_pool_{ thread_pool } @@ -199,9 +199,9 @@ zarr::Writer::write(const VideoFrame* frame) // split the incoming frame into tiles and write them to the chunk buffers const auto& dimensions = config_.dimensions; - const auto bytes_written = write_frame_to_chunks_( - frame->data, frame->bytes_of_frame - sizeof(*frame)); - const auto bytes_of_frame = frame->bytes_of_frame - sizeof(*frame); + const auto bytes_written = + write_frame_to_chunks_(frame->data, bytes_of_image(&frame->shape)); + const auto bytes_of_frame = bytes_of_image(&frame->shape); CHECK(bytes_written == bytes_of_frame); bytes_to_flush_ += bytes_written; ++frames_written_; @@ -218,11 +218,11 @@ zarr::Writer::finalize() { is_finalizing_ = true; flush_(); - close_files_(); + close_sinks_(); is_finalizing_ = false; } -const zarr::ArrayConfig& +const zarr::WriterConfig& zarr::Writer::config() const noexcept { return config_; @@ -435,13 +435,8 @@ zarr::Writer::flush_() } void -zarr::Writer::close_files_() +zarr::Writer::close_sinks_() { - for (Sink* sink_ : sinks_) { - if (auto* sink = dynamic_cast(sink_)) { - sink_close(sink_); - } - } sinks_.clear(); } @@ -450,7 +445,7 @@ zarr::Writer::rollover_() { TRACE("Rolling over"); - close_files_(); + close_sinks_(); ++append_chunk_index_; } @@ -466,7 +461,7 @@ namespace common = zarr::common; class TestWriter : public zarr::Writer { public: - TestWriter(const zarr::ArrayConfig& array_spec, + TestWriter(const zarr::WriterConfig& array_spec, std::shared_ptr thread_pool) : zarr::Writer(array_spec, thread_pool) { @@ -776,22 +771,30 @@ extern "C" .width = 64, .height = 48, }, + .strides = { + .width = 1, + .height = 64, + .planes = 64 * 48 + }, .type = SampleType_u16, }; - zarr::ArrayConfig array_spec = { + zarr::WriterConfig config = { .image_shape = shape, .dimensions = dims, .data_root = base_dir.string(), .compression_params = std::nullopt, }; - TestWriter writer(array_spec, thread_pool); + TestWriter writer(config, thread_pool); + + const size_t frame_size = 64 * 48 * 2; - frame = (VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48 * 2); - frame->bytes_of_frame = sizeof(VideoFrame) + 64 * 48 * 2; + frame = (VideoFrame*)malloc(sizeof(VideoFrame) + frame_size); + frame->bytes_of_frame = + common::align_up(sizeof(VideoFrame) + frame_size, 8); frame->shape = shape; - memset(frame->data, 0, 64 * 48 * 2); + memset(frame->data, 0, frame_size); for (auto i = 0; i < 2 * 1 * 2; ++i) { CHECK(writer.write(frame)); @@ -820,7 +823,7 @@ extern "C" try { const fs::path base_dir = "acquire"; - zarr::ArrayConfig config { + zarr::WriterConfig config { .image_shape = { .dims = { .channels = 1, @@ -855,7 +858,7 @@ extern "C" 5, 1); // 5 timepoints / chunk, 1 shard - zarr::ArrayConfig downsampled_config; + zarr::WriterConfig downsampled_config; CHECK(zarr::downsample(config, downsampled_config)); // check dimensions diff --git a/src/writers/writer.hh b/src/writers/writer.hh index 5239a6b2..83ea63fb 100644 --- a/src/writers/writer.hh +++ b/src/writers/writer.hh @@ -16,7 +16,7 @@ namespace fs = std::filesystem; namespace acquire::sink::zarr { struct Zarr; -struct ArrayConfig +struct WriterConfig final { ImageShape image_shape; std::vector dimensions; @@ -32,13 +32,13 @@ struct ArrayConfig /// false if and only if downsampling brings one or more dimensions lower than /// the chunk size along that dimension. [[nodiscard]] bool -downsample(const ArrayConfig& config, ArrayConfig& downsampled_config); +downsample(const WriterConfig& config, WriterConfig& downsampled_config); struct Writer { public: Writer() = delete; - Writer(const ArrayConfig& config, + Writer(const WriterConfig& config, std::shared_ptr thread_pool); virtual ~Writer() noexcept = default; @@ -46,19 +46,19 @@ struct Writer [[nodiscard]] bool write(const VideoFrame* frame); void finalize(); - const ArrayConfig& config() const noexcept; + const WriterConfig& config() const noexcept; uint32_t frames_written() const noexcept; protected: - ArrayConfig config_; + WriterConfig config_; /// Chunking std::vector> chunk_buffers_; /// Filesystem std::string data_root_; - std::vector sinks_; + std::vector> sinks_; /// Multithreading std::shared_ptr thread_pool_; @@ -78,7 +78,7 @@ struct Writer void flush_(); [[nodiscard]] virtual bool flush_impl_() = 0; virtual bool should_rollover_() const = 0; - void close_files_(); + void close_sinks_(); void rollover_(); }; } // namespace acquire::sink::zarr diff --git a/src/writers/zarrv2.writer.cpp b/src/writers/zarrv2.writer.cpp index f40d15e1..af985640 100644 --- a/src/writers/zarrv2.writer.cpp +++ b/src/writers/zarrv2.writer.cpp @@ -1,4 +1,5 @@ #include "zarrv2.writer.hh" +#include "sink.creator.hh" #include "../zarr.hh" #include @@ -8,7 +9,7 @@ namespace zarr = acquire::sink::zarr; zarr::ZarrV2Writer::ZarrV2Writer( - const ArrayConfig& config, + const WriterConfig& config, std::shared_ptr thread_pool) : Writer(config, thread_pool) { @@ -23,9 +24,11 @@ zarr::ZarrV2Writer::flush_impl_() (fs::path(data_root_) / std::to_string(append_chunk_index_)).string(); { - FileCreator file_creator(thread_pool_); - if (!file_creator.create_chunk_sinks( - data_root, config_.dimensions, sinks_)) { + SinkCreator creator(thread_pool_); + if (!creator.make_data_sinks(data_root, + config_.dimensions, + common::chunks_along_dimension, + sinks_)) { return false; } } @@ -38,7 +41,7 @@ zarr::ZarrV2Writer::flush_impl_() for (auto i = 0; i < sinks_.size(); ++i) { auto& chunk = chunk_buffers_.at(i); thread_pool_->push_to_job_queue( - std::move([sink = sinks_.at(i), + std::move([&sink = sinks_.at(i), data = chunk.data(), size = chunk.size(), &latch](std::string& err) -> bool { @@ -47,14 +50,9 @@ zarr::ZarrV2Writer::flush_impl_() CHECK(sink->write(0, data, size)); success = true; } catch (const std::exception& exc) { - char buf[128]; - snprintf(buf, - sizeof(buf), - "Failed to write chunk: %s", - exc.what()); - err = buf; + err = "Failed to write chunk: " + std::string(exc.what()); } catch (...) { - err = "Unknown error"; + err = "Failed to write chunk: (unknown)"; } latch.count_down(); @@ -110,22 +108,31 @@ extern "C" .width = 64, .height = 48, }, + .strides = { + .channels = 1, + .width = 1, + .height = 64, + .planes = 64 * 48 + }, .type = SampleType_u16, }; - zarr::ArrayConfig array_spec = { + zarr::WriterConfig config = { .image_shape = shape, .dimensions = dims, .data_root = base_dir.string(), .compression_params = std::nullopt, }; - zarr::ZarrV2Writer writer(array_spec, thread_pool); + zarr::ZarrV2Writer writer(config, thread_pool); + + const size_t frame_size = 64 * 48 * 2; - frame = (VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48 * 2); - frame->bytes_of_frame = sizeof(VideoFrame) + 64 * 48 * 2; + frame = (VideoFrame*)malloc(sizeof(VideoFrame) + frame_size); + frame->bytes_of_frame = + common::align_up(sizeof(VideoFrame) + frame_size, 8); frame->shape = shape; - memset(frame->data, 0, 64 * 48 * 2); + memset(frame->data, 0, frame_size); for (auto i = 0; i < 6 * 8 * 5 * 2; ++i) { // 2 time points frame->frame_id = i; @@ -211,6 +218,12 @@ extern "C" .width = 64, .height = 48, }, + .strides = { + .channels = 1, + .width = 1, + .height = 64, + .planes = 64 * 48 + }, .type = SampleType_u8, }; @@ -220,17 +233,18 @@ extern "C" dims.emplace_back( "z", DimensionType_Space, 5, 2, 0); // 3 chunks, ragged - zarr::ArrayConfig array_spec = { + zarr::WriterConfig config = { .image_shape = shape, .dimensions = dims, .data_root = base_dir.string(), .compression_params = std::nullopt, }; - zarr::ZarrV2Writer writer(array_spec, thread_pool); + zarr::ZarrV2Writer writer(config, thread_pool); frame = (VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48); - frame->bytes_of_frame = sizeof(VideoFrame) + 64 * 48; + frame->bytes_of_frame = + common::align_up(sizeof(VideoFrame) + 64 * 48, 8); frame->shape = shape; memset(frame->data, 0, 64 * 48); @@ -301,6 +315,12 @@ extern "C" .width = 64, .height = 48, }, + .strides = { + .channels = 1, + .width = 1, + .height = 64, + .planes = 64 * 48 + }, .type = SampleType_u8, }; @@ -312,17 +332,18 @@ extern "C" dims.emplace_back( "t", DimensionType_Time, 0, 5, 0); // 5 timepoints / chunk - zarr::ArrayConfig array_spec = { + zarr::WriterConfig config = { .image_shape = shape, .dimensions = dims, .data_root = base_dir.string(), .compression_params = std::nullopt, }; - zarr::ZarrV2Writer writer(array_spec, thread_pool); + zarr::ZarrV2Writer writer(config, thread_pool); frame = (VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48); - frame->bytes_of_frame = sizeof(VideoFrame) + 64 * 48; + frame->bytes_of_frame = + common::align_up(sizeof(VideoFrame) + 64 * 48, 8); frame->shape = shape; memset(frame->data, 0, 64 * 48); diff --git a/src/writers/zarrv2.writer.hh b/src/writers/zarrv2.writer.hh index 7179fc51..c11baa2c 100644 --- a/src/writers/zarrv2.writer.hh +++ b/src/writers/zarrv2.writer.hh @@ -20,7 +20,7 @@ struct ZarrV2Writer final : public Writer { public: ZarrV2Writer() = delete; - ZarrV2Writer(const ArrayConfig& config, + ZarrV2Writer(const WriterConfig& config, std::shared_ptr thread_pool); ~ZarrV2Writer() override = default; diff --git a/src/writers/zarrv3.writer.cpp b/src/writers/zarrv3.writer.cpp index 44fc3f90..d21d1bd1 100644 --- a/src/writers/zarrv3.writer.cpp +++ b/src/writers/zarrv3.writer.cpp @@ -1,4 +1,5 @@ #include "zarrv3.writer.hh" +#include "sink.creator.hh" #include "../zarr.hh" #include @@ -99,7 +100,7 @@ shard_internal_index(size_t chunk_idx, } // namespace zarr::ZarrV3Writer::ZarrV3Writer( - const ArrayConfig& array_spec, + const WriterConfig& array_spec, std::shared_ptr thread_pool) : Writer(array_spec, thread_pool) , shard_file_offsets_(common::number_of_shards(array_spec.dimensions), 0) @@ -124,9 +125,12 @@ zarr::ZarrV3Writer::flush_impl_() .string(); { - FileCreator file_creator(thread_pool_); - if (sinks_.empty() && !file_creator.create_shard_sinks( - data_root, config_.dimensions, sinks_)) { + SinkCreator creator(thread_pool_); + if (sinks_.empty() && + !creator.make_data_sinks(data_root, + config_.dimensions, + common::shards_along_dimension, + sinks_)) { return false; } } @@ -149,7 +153,7 @@ zarr::ZarrV3Writer::flush_impl_() auto& chunk_table = shard_tables_.at(i); size_t* file_offset = &shard_file_offsets_.at(i); - thread_pool_->push_to_job_queue([sink = sinks_.at(i), + thread_pool_->push_to_job_queue([&sink = sinks_.at(i), &chunks, &chunk_table, file_offset, @@ -185,12 +189,9 @@ zarr::ZarrV3Writer::flush_impl_() chunk_table.size() * sizeof(uint64_t)); } } catch (const std::exception& exc) { - char buf[128]; - snprintf( - buf, sizeof(buf), "Failed to write chunk: %s", exc.what()); - err = buf; + err = "Failed to write chunk: " + std::string(exc.what()); } catch (...) { - err = "Unknown error"; + err = "Failed to write chunk: (unknown)"; } latch.count_down(); @@ -536,22 +537,31 @@ extern "C" .width = 64, .height = 48, }, + .strides = { + .channels = 1, + .width = 1, + .height = 64, + .planes = 64 * 48 + }, .type = SampleType_u16, }; - zarr::ArrayConfig array_spec = { + zarr::WriterConfig config = { .image_shape = shape, .dimensions = dims, .data_root = base_dir.string(), .compression_params = std::nullopt, }; - zarr::ZarrV3Writer writer(array_spec, thread_pool); + zarr::ZarrV3Writer writer(config, thread_pool); + + const size_t frame_size = 64 * 48 * 2; - frame = (VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48 * 2); - frame->bytes_of_frame = sizeof(VideoFrame) + 64 * 48 * 2; + frame = (VideoFrame*)malloc(sizeof(VideoFrame) + frame_size); + frame->bytes_of_frame = + common::align_up(sizeof(VideoFrame) + frame_size, 8); frame->shape = shape; - memset(frame->data, 0, 64 * 48 * 2); + memset(frame->data, 0, frame_size); for (auto i = 0; i < 6 * 8 * 5 * 2; ++i) { frame->frame_id = i; @@ -664,20 +674,27 @@ extern "C" .width = 64, .height = 48, }, + .strides = { + .channels = 1, + .width = 1, + .height = 64, + .planes = 64 * 48 + }, .type = SampleType_u8, }; - zarr::ArrayConfig array_spec = { + zarr::WriterConfig config = { .image_shape = shape, .dimensions = dims, .data_root = base_dir.string(), .compression_params = std::nullopt, }; - zarr::ZarrV3Writer writer(array_spec, thread_pool); + zarr::ZarrV3Writer writer(config, thread_pool); frame = (VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48); - frame->bytes_of_frame = sizeof(VideoFrame) + 64 * 48; + frame->bytes_of_frame = + common::align_up(sizeof(VideoFrame) + 64 * 48, 8); frame->shape = shape; memset(frame->data, 0, 64 * 48); @@ -756,6 +773,12 @@ extern "C" .width = 64, .height = 48, }, + .strides = { + .channels = 1, + .width = 1, + .height = 64, + .planes = 64 * 48 + }, .type = SampleType_u8, }; @@ -781,17 +804,18 @@ extern "C" 5, // 5 timepoints / chunk 2); // 2 chunks / shard - zarr::ArrayConfig array_spec = { + zarr::WriterConfig config = { .image_shape = shape, .dimensions = dims, .data_root = base_dir.string(), .compression_params = std::nullopt, }; - zarr::ZarrV3Writer writer(array_spec, thread_pool); + zarr::ZarrV3Writer writer(config, thread_pool); frame = (VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48); - frame->bytes_of_frame = sizeof(VideoFrame) + 64 * 48; + frame->bytes_of_frame = + common::align_up(sizeof(VideoFrame) + 64 * 48, 8); frame->shape = shape; memset(frame->data, 0, 64 * 48); diff --git a/src/writers/zarrv3.writer.hh b/src/writers/zarrv3.writer.hh index 37961d20..4243b79e 100644 --- a/src/writers/zarrv3.writer.hh +++ b/src/writers/zarrv3.writer.hh @@ -20,7 +20,7 @@ struct ZarrV3Writer final : public Writer { public: ZarrV3Writer() = delete; - ZarrV3Writer(const ArrayConfig& array_spec, + ZarrV3Writer(const WriterConfig& array_spec, std::shared_ptr thread_pool); ~ZarrV3Writer() override = default; diff --git a/src/zarr.cpp b/src/zarr.cpp index 388c8106..6cc0264e 100644 --- a/src/zarr.cpp +++ b/src/zarr.cpp @@ -153,8 +153,7 @@ scale_image(const VideoFrame* src) dst->shape.strides.height * dst->shape.dims.height; dst->bytes_of_frame = - dst->shape.dims.planes * dst->shape.strides.planes * sizeof(T) + - sizeof(*dst); + common::align_up(bytes_of_image(&dst->shape) + sizeof(*dst), 8); const auto* src_img = (T*)src->data; auto* dst_img = (T*)dst->data; @@ -190,8 +189,8 @@ average_two_frames(VideoFrame* dst, const VideoFrame* src) CHECK(src); CHECK(dst->bytes_of_frame == src->bytes_of_frame); - const auto bytes_of_image = dst->bytes_of_frame - sizeof(*dst); - const auto num_pixels = bytes_of_image / sizeof(T); + const auto nbytes_image = bytes_of_image(&dst->shape); + const auto num_pixels = nbytes_image / sizeof(T); for (auto i = 0; i < num_pixels; ++i) { dst->data[i] = (T)(0.5f * ((float)dst->data[i] + (float)src->data[i])); } @@ -346,7 +345,7 @@ zarr::Zarr::set(const StorageProperties* props) validate_props(props); // TODO (aliddell): we will eventually support S3 URIs, // dataset_root_ should be a string - dataset_root_ = as_path(*props); + dataset_root_ = as_path(*props).string(); if (props->external_metadata_json.str) { external_metadata_json_ = props->external_metadata_json.str; @@ -372,8 +371,6 @@ zarr::Zarr::get(StorageProperties* props) const CHECK(props); storage_properties_destroy(props); - const std::string dataset_root = dataset_root_.string(); - std::string uri; if (!dataset_root_.empty()) { fs::path dataset_root_abs = fs::absolute(dataset_root_); @@ -441,10 +438,7 @@ zarr::Zarr::start() allocate_writers_(); - if (!dataset_root_.string().starts_with("s3://")) { - make_metadata_sinks_(); - } - + make_metadata_sinks_(); write_fixed_metadata_(); state = DeviceState_Running; @@ -463,11 +457,6 @@ zarr::Zarr::stop() noexcept try { // must precede close of chunk file write_mutable_metadata_(); - for (Sink* sink_ : metadata_sinks_) { - if (auto* sink = dynamic_cast(sink_)) { - sink_close(sink); - } - } metadata_sinks_.clear(); for (auto& writer : writers_) { @@ -484,8 +473,8 @@ zarr::Zarr::stop() noexcept // should be empty, but just in case for (auto& [_, frame] : scaled_frames_) { - if (frame.has_value() && frame.value()) { - free(frame.value()); + if (frame && *frame) { + free(*frame); } } scaled_frames_.clear(); @@ -702,7 +691,7 @@ void test_average_frame_inner(const SampleType& stype) { auto* src = (VideoFrame*)malloc(sizeof(VideoFrame) + 9 * sizeof(T)); - src->bytes_of_frame = sizeof(*src) + 9 * sizeof(T); + src->bytes_of_frame = common::align_up(sizeof(*src) + 9 * sizeof(T), 8); src->shape = { .dims = { .channels = 1, diff --git a/src/zarr.hh b/src/zarr.hh index e104854f..70c859ee 100644 --- a/src/zarr.hh +++ b/src/zarr.hh @@ -42,7 +42,7 @@ struct Zarr : public Storage std::optional blosc_compression_params_; /// changes on set - fs::path dataset_root_; + std::string dataset_root_; std::string external_metadata_json_; PixelScale pixel_scale_um_; bool enable_multiscale_; @@ -57,7 +57,7 @@ struct Zarr : public Storage std::unordered_map> scaled_frames_; // changes on flush - std::vector metadata_sinks_; + std::vector> metadata_sinks_; /// Multithreading std::shared_ptr thread_pool_; @@ -72,16 +72,7 @@ struct Zarr : public Storage virtual void allocate_writers_() = 0; /// Metadata - virtual std::vector make_metadata_sink_paths_() = 0; - - template - void make_metadata_sinks_() - { - const auto metadata_sink_paths = make_metadata_sink_paths_(); - SinkCreatorT creator(thread_pool_); - CHECK( - creator.create_metadata_sinks(metadata_sink_paths, metadata_sinks_)); - } + virtual void make_metadata_sinks_() = 0; // fixed metadata void write_fixed_metadata_() const; diff --git a/src/zarr.v2.cpp b/src/zarr.v2.cpp index 7d3bd6f6..f877c628 100644 --- a/src/zarr.v2.cpp +++ b/src/zarr.v2.cpp @@ -1,5 +1,6 @@ #include "zarr.v2.hh" #include "writers/zarrv2.writer.hh" +#include "writers/sink.creator.hh" #include "nlohmann/json.hpp" @@ -42,16 +43,17 @@ zarr::ZarrV2::allocate_writers_() { writers_.clear(); - ArrayConfig config = { + WriterConfig config = { .image_shape = image_shape_, .dimensions = acquisition_dimensions_, - .data_root = (dataset_root_ / "0").string(), + .data_root = dataset_root_ + "/0", .compression_params = blosc_compression_params_, }; + writers_.push_back(std::make_shared(config, thread_pool_)); if (enable_multiscale_) { - ArrayConfig downsampled_config; + WriterConfig downsampled_config; bool do_downsample = true; int level = 1; @@ -67,21 +69,12 @@ zarr::ZarrV2::allocate_writers_() } } -std::vector -zarr::ZarrV2::make_metadata_sink_paths_() +void +zarr::ZarrV2::make_metadata_sinks_() { - std::vector metadata_sink_paths = { - (dataset_root_ / ".metadata").string(), // base metadata - (dataset_root_ / "0" / ".zattrs").string(), // external metadata - (dataset_root_ / ".zattrs").string(), // group metadata - }; - - for (auto i = 0; i < writers_.size(); ++i) { - metadata_sink_paths.push_back( - (dataset_root_ / std::to_string(i) / ".zarray").string()); - } - - return metadata_sink_paths; + SinkCreator creator(thread_pool_); + CHECK(creator.create_v2_metadata_sinks( + dataset_root_, writers_.size(), metadata_sinks_)); } void @@ -93,7 +86,9 @@ zarr::ZarrV2::write_base_metadata_() const const json metadata = { { "zarr_format", 2 } }; const std::string metadata_str = metadata.dump(4); const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); - Sink* sink = metadata_sinks_.at(0); + CHECK(!metadata_sinks_.empty()); + const std::unique_ptr& sink = metadata_sinks_.at(0); + CHECK(sink); CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } @@ -103,7 +98,6 @@ zarr::ZarrV2::write_external_metadata_() const namespace fs = std::filesystem; using json = nlohmann::json; - std::string zattrs_path = (dataset_root_ / "0" / ".zattrs").string(); std::string metadata_str = external_metadata_json_.empty() ? "{}" : json::parse(external_metadata_json_, @@ -113,7 +107,9 @@ zarr::ZarrV2::write_external_metadata_() const ) .dump(4); const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); - Sink* sink = metadata_sinks_.at(1); + CHECK(metadata_sinks_.size() > 1); + const std::unique_ptr& sink = metadata_sinks_.at(1); + CHECK(sink); CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } @@ -217,7 +213,9 @@ zarr::ZarrV2::write_group_metadata_() const const std::string metadata_str = metadata.dump(4); const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); - Sink* sink = metadata_sinks_.at(2); + CHECK(metadata_sinks_.size() > 2); + const std::unique_ptr& sink = metadata_sinks_.at(2); + CHECK(sink); CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } @@ -230,7 +228,7 @@ zarr::ZarrV2::write_array_metadata_(size_t level) const CHECK(level < writers_.size()); const auto& writer = writers_.at(level); - const ArrayConfig& config = writer->config(); + const WriterConfig& config = writer->config(); const auto& image_shape = config.image_shape; std::vector array_shape; @@ -257,15 +255,17 @@ zarr::ZarrV2::write_array_metadata_(size_t level) const metadata["filters"] = nullptr; metadata["dimension_separator"] = "/"; - if (config.compression_params.has_value()) { - metadata["compressor"] = config.compression_params.value(); + if (config.compression_params) { + metadata["compressor"] = *config.compression_params; } else { metadata["compressor"] = nullptr; } const std::string metadata_str = metadata.dump(4); const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); - Sink* sink = metadata_sinks_.at(3 + level); + CHECK(metadata_sinks_.size() > 3 + level); + const std::unique_ptr& sink = metadata_sinks_.at(3 + level); + CHECK(sink); CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } diff --git a/src/zarr.v2.hh b/src/zarr.v2.hh index b07acfc7..dde6b257 100644 --- a/src/zarr.v2.hh +++ b/src/zarr.v2.hh @@ -19,7 +19,7 @@ struct ZarrV2 final : public Zarr void allocate_writers_() override; /// Metadata - std::vector make_metadata_sink_paths_() override; + void make_metadata_sinks_() override; // fixed metadata void write_base_metadata_() const override; diff --git a/src/zarr.v3.cpp b/src/zarr.v3.cpp index 34045054..311753e5 100644 --- a/src/zarr.v3.cpp +++ b/src/zarr.v3.cpp @@ -1,5 +1,6 @@ #include "zarr.v3.hh" #include "writers/zarrv3.writer.hh" +#include "writers/sink.creator.hh" #include "nlohmann/json.hpp" @@ -35,16 +36,17 @@ zarr::ZarrV3::allocate_writers_() { writers_.clear(); - ArrayConfig config = { + WriterConfig config = { .image_shape = image_shape_, .dimensions = acquisition_dimensions_, - .data_root = (dataset_root_ / "data" / "root" / "0").string(), + .data_root = dataset_root_ + "/data/root/0", .compression_params = blosc_compression_params_, }; + writers_.push_back(std::make_shared(config, thread_pool_)); if (enable_multiscale_) { - ArrayConfig downsampled_config; + WriterConfig downsampled_config; bool do_downsample = true; int level = 1; @@ -68,22 +70,13 @@ zarr::ZarrV3::get_meta(StoragePropertyMetadata* meta) const meta->multiscale_is_supported = 0; } -std::vector -zarr::ZarrV3::make_metadata_sink_paths_() +void +zarr::ZarrV3::make_metadata_sinks_() { - std::vector metadata_sink_paths; - metadata_sink_paths.push_back((dataset_root_ / "zarr.json").string()); - metadata_sink_paths.push_back( - (dataset_root_ / "meta" / "root.group.json").string()); - for (auto i = 0; i < writers_.size(); ++i) { - metadata_sink_paths.push_back((dataset_root_ / "meta" / "root" / - (std::to_string(i) + ".array.json")) - .string()); - } - - return metadata_sink_paths; + SinkCreator creator(thread_pool_); + CHECK(creator.create_v3_metadata_sinks( + dataset_root_, writers_.size(), metadata_sinks_)); } - /// @brief Write the metadata for the dataset. void zarr::ZarrV3::write_base_metadata_() const @@ -100,7 +93,9 @@ zarr::ZarrV3::write_base_metadata_() const const std::string metadata_str = metadata.dump(4); const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); - Sink* sink = metadata_sinks_.at(0); + CHECK(!metadata_sinks_.empty()); + const std::unique_ptr& sink = metadata_sinks_.at(0); + CHECK(sink); CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } @@ -133,7 +128,8 @@ zarr::ZarrV3::write_group_metadata_() const const std::string metadata_str = metadata.dump(4); const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); - Sink* sink = metadata_sinks_.at(1); + CHECK(metadata_sinks_.size() > 1); + const std::unique_ptr& sink = metadata_sinks_.at(1); CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } @@ -146,7 +142,7 @@ zarr::ZarrV3::write_array_metadata_(size_t level) const CHECK(level < writers_.size()); const auto& writer = writers_.at(level); - const ArrayConfig& config = writer->config(); + const WriterConfig& config = writer->config(); const auto& image_shape = config.image_shape; json metadata; @@ -184,8 +180,8 @@ zarr::ZarrV3::write_array_metadata_(size_t level) const metadata["fill_value"] = 0; metadata["shape"] = array_shape; - if (config.compression_params.has_value()) { - const auto params = config.compression_params.value(); + if (config.compression_params) { + const auto params = *config.compression_params; metadata["compressor"] = json::object({ { "codec", "https://purl.org/zarr/spec/codec/blosc/1.0" }, { "configuration", @@ -214,7 +210,8 @@ zarr::ZarrV3::write_array_metadata_(size_t level) const const std::string metadata_str = metadata.dump(4); const auto* metadata_bytes = (const uint8_t*)metadata_str.c_str(); - Sink* sink = metadata_sinks_.at(2 + level); + CHECK(metadata_sinks_.size() > 2 + level); + const std::unique_ptr& sink = metadata_sinks_.at(2 + level); CHECK(sink->write(0, metadata_bytes, metadata_str.size())); } diff --git a/src/zarr.v3.hh b/src/zarr.v3.hh index 319dee01..6107fbd5 100644 --- a/src/zarr.v3.hh +++ b/src/zarr.v3.hh @@ -19,7 +19,7 @@ struct ZarrV3 final : public Zarr void allocate_writers_() override; /// Metadata - std::vector make_metadata_sink_paths_() override; + void make_metadata_sinks_() override; // fixed metadata void write_base_metadata_() const override; diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 00000000..75529f33 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,10 @@ +# Testing the Zarr driver + +To run the tests, you can run: + +```bash +ctest -L acquire-driver-zarr --output-on-failure +``` + +You can disable unit tests by setting `-DNO_UNIT_TESTS=ON` when configuring the project. +You can disable testing altogether by setting `-DNOTEST=ON`. diff --git a/tests/unit-tests.cpp b/tests/unit-tests.cpp index 32b13be8..2fd615fe 100644 --- a/tests/unit-tests.cpp +++ b/tests/unit-tests.cpp @@ -83,8 +83,8 @@ main() const std::vector tests{ #define CASE(e) { .name = #e, .test = (int (*)())lib_load(&lib, #e) } CASE(unit_test__average_frame), - CASE(unit_test__file_creator__create_chunk_sinks), - CASE(unit_test__file_creator__create_shard_sinks), + CASE(unit_test__sink_creator__create_chunk_file_sinks), + CASE(unit_test__sink_creator__create_shard_file_sinks), CASE(unit_test__chunk_lattice_index), CASE(unit_test__tile_group_offset), CASE(unit_test__chunk_internal_offset),