From f0f767dc25c39dd1cfa1836f081e6d231bb8632f Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Wed, 10 Jul 2024 14:36:53 -0400 Subject: [PATCH] 238, part 3: Separate common classes and utilities. Fix includes. Move tests. (#264) - Moves classes and functions from common.hh/common.cpp to separate files in src/common - Updates header includes accordingly - Move shard index tests to common/utilities.cpp - Adds a thread pool unit test. Depends on #262. --- src/CMakeLists.txt | 8 +- src/common.cpp | 216 --------------- src/common.hh | 149 ----------- src/common/dimension.cpp | 79 ++++++ src/common/dimension.hh | 26 ++ src/common/macros.hh | 20 ++ src/common/thread.pool.cpp | 157 +++++++++++ src/common/thread.pool.hh | 50 ++++ src/common/utilities.cpp | 485 ++++++++++++++++++++++++++++++++++ src/common/utilities.hh | 110 ++++++++ src/writers/file.sink.cpp | 2 + src/writers/file.sink.hh | 1 + src/writers/sink.creator.cpp | 1 + src/writers/sink.creator.hh | 3 +- src/writers/sink.hh | 8 +- src/writers/writer.cpp | 2 +- src/writers/writer.hh | 3 +- src/writers/zarrv2.writer.cpp | 2 +- src/writers/zarrv3.writer.cpp | 353 +------------------------ src/zarr.hh | 3 +- tests/unit-tests.cpp | 4 +- 21 files changed, 954 insertions(+), 728 deletions(-) delete mode 100644 src/common.cpp delete mode 100644 src/common.hh create mode 100644 src/common/dimension.cpp create mode 100644 src/common/dimension.hh create mode 100644 src/common/macros.hh create mode 100644 src/common/thread.pool.cpp create mode 100644 src/common/thread.pool.hh create mode 100644 src/common/utilities.cpp create mode 100644 src/common/utilities.hh diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5a74ce85..0ed837ea 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,8 +4,12 @@ endif () set(tgt acquire-driver-zarr) add_library(${tgt} MODULE - common.hh - common.cpp + common/dimension.hh + common/dimension.cpp + common/thread.pool.hh + common/thread.pool.cpp + common/utilities.hh + common/utilities.cpp writers/sink.hh writers/sink.creator.hh writers/sink.creator.cpp diff --git a/src/common.cpp b/src/common.cpp deleted file mode 100644 index 597f8764..00000000 --- a/src/common.cpp +++ /dev/null @@ -1,216 +0,0 @@ -#include "common.hh" - -#include -#include - -namespace zarr = acquire::sink::zarr; -namespace common = zarr::common; - -zarr::Dimension::Dimension(const std::string& name, - DimensionType kind, - uint32_t array_size_px, - uint32_t chunk_size_px, - uint32_t shard_size_chunks) - : name{ name } - , kind{ kind } - , array_size_px{ array_size_px } - , chunk_size_px{ chunk_size_px } - , shard_size_chunks{ shard_size_chunks } -{ - EXPECT(kind < DimensionTypeCount, "Invalid dimension type."); - EXPECT(!name.empty(), "Dimension name cannot be empty."); -} - -zarr::Dimension::Dimension(const StorageDimension& dim) - : Dimension(dim.name.str, - dim.kind, - dim.array_size_px, - dim.chunk_size_px, - dim.shard_size_chunks) -{ -} - -common::ThreadPool::ThreadPool(size_t n_threads, - std::function err) - : error_handler_{ err } - , is_accepting_jobs_{ true } -{ - n_threads = std::clamp( - n_threads, - (size_t)1, - (size_t)std::max(std::thread::hardware_concurrency(), (unsigned)1)); - - for (auto i = 0; i < n_threads; ++i) { - threads_.emplace_back([this] { thread_worker_(); }); - } -} - -common::ThreadPool::~ThreadPool() noexcept -{ - { - std::scoped_lock lock(jobs_mutex_); - while (!jobs_.empty()) { - jobs_.pop(); - } - } - - await_stop(); -} - -void -common::ThreadPool::push_to_job_queue(JobT&& job) -{ - std::unique_lock lock(jobs_mutex_); - CHECK(is_accepting_jobs_); - - jobs_.push(std::move(job)); - lock.unlock(); - - cv_.notify_one(); -} - -void -common::ThreadPool::await_stop() noexcept -{ - { - std::scoped_lock lock(jobs_mutex_); - is_accepting_jobs_ = false; - } - - cv_.notify_all(); - - // spin down threads - for (auto& thread : threads_) { - if (thread.joinable()) { - thread.join(); - } - } -} - -std::optional -common::ThreadPool::pop_from_job_queue_() noexcept -{ - if (jobs_.empty()) { - return std::nullopt; - } - - auto job = std::move(jobs_.front()); - jobs_.pop(); - return job; -} - -bool -common::ThreadPool::should_stop_() const noexcept -{ - return !is_accepting_jobs_ && jobs_.empty(); -} - -void -common::ThreadPool::thread_worker_() -{ - TRACE("Worker thread starting."); - - while (true) { - std::unique_lock lock(jobs_mutex_); - cv_.wait(lock, [&] { return should_stop_() || !jobs_.empty(); }); - - if (should_stop_()) { - break; - } - - if (auto job = pop_from_job_queue_(); job.has_value()) { - lock.unlock(); - if (std::string err_msg; !job.value()(err_msg)) { - error_handler_(err_msg); - } - } - } - - TRACE("Worker thread exiting."); -} - -size_t -common::chunks_along_dimension(const Dimension& dimension) -{ - EXPECT(dimension.chunk_size_px > 0, "Invalid chunk_size size."); - - return (dimension.array_size_px + dimension.chunk_size_px - 1) / - dimension.chunk_size_px; -} - -size_t -common::shards_along_dimension(const Dimension& dimension) -{ - const size_t shard_size = dimension.shard_size_chunks; - if (shard_size == 0) { - return 0; - } - - const size_t n_chunks = chunks_along_dimension(dimension); - return (n_chunks + shard_size - 1) / shard_size; -} - -size_t -common::number_of_chunks_in_memory(const std::vector& dimensions) -{ - size_t n_chunks = 1; - for (auto i = 0; i < dimensions.size() - 1; ++i) { - n_chunks *= chunks_along_dimension(dimensions[i]); - } - - return n_chunks; -} - -size_t -common::number_of_shards(const std::vector& dimensions) -{ - size_t n_shards = 1; - for (auto i = 0; i < dimensions.size() - 1; ++i) { - const auto& dim = dimensions.at(i); - n_shards *= shards_along_dimension(dim); - } - - return n_shards; -} - -size_t -common::chunks_per_shard(const std::vector& dimensions) -{ - size_t n_chunks = 1; - for (const auto& dim : dimensions) { - n_chunks *= dim.shard_size_chunks; - } - - return n_chunks; -} - -size_t -common::bytes_per_chunk(const std::vector& dimensions, - const SampleType& type) -{ - auto n_bytes = bytes_of_type(type); - for (const auto& d : dimensions) { - n_bytes *= d.chunk_size_px; - } - - return n_bytes; -} - -const char* -common::sample_type_to_string(SampleType t) noexcept -{ - static const char* table[] = { "u8", "u16", "i8", "i16", - "f32", "u16", "u16", "u16" }; - if (t < countof(table)) { - return table[t]; - } else { - return "unrecognized pixel type"; - } -} - -size_t -common::align_up(size_t n, size_t align) -{ - EXPECT(align > 0, "Alignment must be greater than zero."); - return align * ((n + align - 1) / align); -} diff --git a/src/common.hh b/src/common.hh deleted file mode 100644 index b1afe285..00000000 --- a/src/common.hh +++ /dev/null @@ -1,149 +0,0 @@ -#ifndef H_ACQUIRE_STORAGE_ZARR_COMMON_V0 -#define H_ACQUIRE_STORAGE_ZARR_COMMON_V0 - -#include "logger.h" -#include "device/props/components.h" -#include "device/props/storage.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#define LOG(...) aq_logger(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) -#define LOGE(...) aq_logger(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) -#define EXPECT(e, ...) \ - do { \ - if (!(e)) { \ - LOGE(__VA_ARGS__); \ - throw std::runtime_error("Expression was false: " #e); \ - } \ - } while (0) -#define CHECK(e) EXPECT(e, "Expression evaluated as false:\n\t%s", #e) - -// #define TRACE(...) LOG(__VA_ARGS__) -#define TRACE(...) - -#define containerof(ptr, T, V) ((T*)(((char*)(ptr)) - offsetof(T, V))) -#define countof(e) (sizeof(e) / sizeof(*(e))) - -namespace fs = std::filesystem; - -namespace acquire::sink::zarr { -struct Dimension -{ - public: - explicit Dimension(const std::string& name, - DimensionType kind, - uint32_t array_size_px, - uint32_t chunk_size_px, - uint32_t shard_size_chunks); - explicit Dimension(const StorageDimension& dim); - - const std::string name; - const DimensionType kind; - const uint32_t array_size_px; - const uint32_t chunk_size_px; - const uint32_t shard_size_chunks; -}; - -struct Zarr; - -namespace common { -struct ThreadPool final -{ - public: - using JobT = std::function; - - // The error handler `err` is called when a job returns false. This - // can happen when the job encounters an error, or otherwise fails. The - // std::string& argument to the error handler is a diagnostic message from - // the failing job and is logged to the error stream by the Zarr driver when - // the next call to `append()` is made. - ThreadPool(size_t n_threads, std::function err); - ~ThreadPool() noexcept; - - void push_to_job_queue(JobT&& job); - - /** - * @brief Block until all jobs on the queue have processed, then spin down - * the threads. - * @note After calling this function, the job queue no longer accepts jobs. - */ - void await_stop() noexcept; - - private: - std::function error_handler_; - - std::vector threads_; - mutable std::mutex jobs_mutex_; - std::condition_variable cv_; - std::queue jobs_; - - std::atomic is_accepting_jobs_; - - std::optional pop_from_job_queue_() noexcept; - [[nodiscard]] bool should_stop_() const noexcept; - void thread_worker_(); -}; - -/// @brief Get the number of chunks along a dimension. -/// @param dimension A dimension. -/// @return The number of, possibly ragged, chunks along the dimension, given -/// the dimension's array and chunk sizes. -size_t -chunks_along_dimension(const Dimension& dimension); - -/// @brief Get the number of chunks to hold in memory. -/// @param dimensions The dimensions of the array. -/// @return The number of chunks to buffer before writing out. -size_t -number_of_chunks_in_memory(const std::vector& dimensions); - -/// @brief Get the number of shards along a dimension. -/// @param dimension A dimension. -/// @return The number of shards along the dimension, given the dimension's -/// array, chunk, and shard sizes. -size_t -shards_along_dimension(const Dimension& dimension); - -/// @brief Get the number of shards to write at one time. -/// @param dimensions The dimensions of the array. -/// @return The number of shards to buffer and write out. -size_t -number_of_shards(const std::vector& dimensions); - -/// @brief Get the number of chunks in a single shard. -/// @param dimensions The dimensions of the array. -/// @return The number of chunks in a shard. -size_t -chunks_per_shard(const std::vector& dimensions); - -/// @brief Get the size, in bytes, of a single chunk. -/// @param dimensions The dimensions of the array. -/// @param dtype The pixel type of the array. -/// @return The number of bytes to allocate for a chunk. -size_t -bytes_per_chunk(const std::vector& dimensions, - const SampleType& dtype); - -/// @brief Get a string representation of the SampleType enum. -/// @param t An enumerated sample type. -/// @return A human-readable representation of the SampleType @par t. -const char* -sample_type_to_string(SampleType t) noexcept; - -/// @brief Align a size to a given alignment. -/// @param n Size to align. -/// @param align Alignment. -/// @return Aligned size. -size_t -align_up(size_t n, size_t align); -} // namespace acquire::sink::zarr::common -} // namespace acquire::sink::zarr - -#endif // H_ACQUIRE_STORAGE_ZARR_COMMON_V0 diff --git a/src/common/dimension.cpp b/src/common/dimension.cpp new file mode 100644 index 00000000..e724acdd --- /dev/null +++ b/src/common/dimension.cpp @@ -0,0 +1,79 @@ +#include "dimension.hh" + +#include + +namespace zarr = acquire::sink::zarr; + +namespace { +inline std::string +trim(const std::string& s) +{ + // trim left + std::string trimmed = s; + trimmed.erase(trimmed.begin(), + std::find_if(trimmed.begin(), trimmed.end(), [](char c) { + return !std::isspace(c); + })); + + // trim right + trimmed.erase(std::find_if(trimmed.rbegin(), + trimmed.rend(), + [](char c) { return !std::isspace(c); }) + .base(), + trimmed.end()); + + return trimmed; +} +} // namespace + +zarr::Dimension::Dimension(const std::string& name, + DimensionType kind, + uint32_t array_size_px, + uint32_t chunk_size_px, + uint32_t shard_size_chunks) + : name{ trim(name) } + , kind{ kind } + , array_size_px{ array_size_px } + , chunk_size_px{ chunk_size_px } + , shard_size_chunks{ shard_size_chunks } +{ + EXPECT(kind < DimensionTypeCount, "Invalid dimension type."); + EXPECT(!name.empty(), "Dimension name cannot be empty."); +} + +zarr::Dimension::Dimension(const StorageDimension& dim) + : Dimension(dim.name.str, + dim.kind, + dim.array_size_px, + dim.chunk_size_px, + dim.shard_size_chunks) +{ +} + +#ifndef NO_UNIT_TESTS +#ifdef _WIN32 +#define acquire_export __declspec(dllexport) +#else +#define acquire_export +#endif // _WIN32 + +extern "C" acquire_export int +unit_test__trim() +{ + try { + EXPECT(trim(" foo") == "foo", "Failed to trim left."); + EXPECT(trim("foo ") == "foo", "Failed to trim right."); + EXPECT(trim(" foo ") == "foo", "Failed to trim both."); + EXPECT(trim("foo") == "foo", "Failed to trim either."); + EXPECT(trim("").empty(), "Failed to trim empty."); + return 1; + } catch (const std::exception& e) { + LOGE("Exception: %s", e.what()); + } catch (...) { + LOGE("Unknown exception"); + } + + return 0; +} + +#endif // NO_UNIT_TESTS diff --git a/src/common/dimension.hh b/src/common/dimension.hh new file mode 100644 index 00000000..7cf14188 --- /dev/null +++ b/src/common/dimension.hh @@ -0,0 +1,26 @@ +#pragma once + +#include "macros.hh" +#include "device/props/storage.h" + +#include + +namespace acquire::sink::zarr { +struct Dimension +{ + public: + explicit Dimension(const std::string& name, + DimensionType kind, + unsigned int array_size_px, + unsigned int chunk_size_px, + unsigned int shard_size_chunks); + + explicit Dimension(const StorageDimension& dim); + + const std::string name; + const DimensionType kind; + const unsigned int array_size_px; + const unsigned int chunk_size_px; + const unsigned int shard_size_chunks; +}; +} // namespace acquire::sink::zarr diff --git a/src/common/macros.hh b/src/common/macros.hh new file mode 100644 index 00000000..b4a5c263 --- /dev/null +++ b/src/common/macros.hh @@ -0,0 +1,20 @@ +#pragma once + +#include "logger.h" +#include + +#define LOG(...) aq_logger(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define LOGE(...) aq_logger(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__) +#define EXPECT(e, ...) \ + do { \ + if (!(e)) { \ + LOGE(__VA_ARGS__); \ + throw std::runtime_error("Expression was false: " #e); \ + } \ + } while (0) +#define CHECK(e) EXPECT(e, "Expression evaluated as false:\n\t%s", #e) + +#define TRACE(...) + +#define containerof(ptr, T, V) ((T*)(((char*)(ptr)) - offsetof(T, V))) +#define countof(e) (sizeof(e) / sizeof(*(e))) \ No newline at end of file diff --git a/src/common/thread.pool.cpp b/src/common/thread.pool.cpp new file mode 100644 index 00000000..0071fbff --- /dev/null +++ b/src/common/thread.pool.cpp @@ -0,0 +1,157 @@ +#include "thread.pool.hh" +#include "utilities.hh" + +namespace zarr = acquire::sink::zarr; +namespace common = zarr::common; + +common::ThreadPool::ThreadPool(unsigned int n_threads, + std::function err) + : error_handler_{ err } + , is_accepting_jobs_{ true } +{ + n_threads = std::clamp( + n_threads, 1u, std::max(std::thread::hardware_concurrency(), 1u)); + + for (auto i = 0; i < n_threads; ++i) { + threads_.emplace_back([this] { thread_worker_(); }); + } +} + +common::ThreadPool::~ThreadPool() noexcept +{ + { + std::scoped_lock lock(jobs_mutex_); + while (!jobs_.empty()) { + jobs_.pop(); + } + } + + await_stop(); +} + +void +common::ThreadPool::push_to_job_queue(JobT&& job) +{ + std::unique_lock lock(jobs_mutex_); + CHECK(is_accepting_jobs_); + jobs_.push(std::move(job)); + + cv_.notify_one(); +} + +void +common::ThreadPool::await_stop() noexcept +{ + { + std::scoped_lock lock(jobs_mutex_); + is_accepting_jobs_ = false; + + cv_.notify_all(); + } + + // spin down threads + for (auto& thread : threads_) { + if (thread.joinable()) { + thread.join(); + } + } +} + +std::optional +common::ThreadPool::pop_from_job_queue_() noexcept +{ + if (jobs_.empty()) { + return std::nullopt; + } + + auto job = std::move(jobs_.front()); + jobs_.pop(); + return job; +} + +bool +common::ThreadPool::should_stop_() const noexcept +{ + return !is_accepting_jobs_ && jobs_.empty(); +} + +void +common::ThreadPool::thread_worker_() +{ + TRACE("Worker thread starting."); + + while (true) { + std::unique_lock lock(jobs_mutex_); + cv_.wait(lock, [&] { return should_stop_() || !jobs_.empty(); }); + + if (should_stop_()) { + break; + } + + if (auto job = pop_from_job_queue_(); job.has_value()) { + lock.unlock(); + if (std::string err_msg; !job.value()(err_msg)) { + error_handler_(err_msg); + } + } + } + + TRACE("Worker thread exiting."); +} + +#ifndef NO_UNIT_TESTS +#ifdef _WIN32 +#define acquire_export __declspec(dllexport) +#else +#define acquire_export +#endif // _WIN32 + +#include +#include +#include + +namespace fs = std::filesystem; + +extern "C" +{ + acquire_export int unit_test__thread_pool__push_to_job_queue() + { + int retval = 0; + + fs::path tmp_path = fs::temp_directory_path() / "tmp_file"; + + try { + CHECK(!fs::exists(tmp_path)); + + common::ThreadPool pool{ 1, [](const std::string&) {} }; + pool.push_to_job_queue([&tmp_path](std::string&) { + std::ofstream ofs(tmp_path); + ofs << "Hello, Acquire!"; + ofs.close(); + return true; + }); + pool.await_stop(); + + CHECK(fs::exists(tmp_path)); + + retval = 1; + } catch (const std::exception& exc) { + LOGE("Caught exception: %s", exc.what()); + } catch (...) { + LOGE("Caught unknown exception"); + } + + try { + // cleanup + if (fs::exists(tmp_path)) { + fs::remove(tmp_path); + } + } catch (const std::exception& exc) { + LOGE("Caught exception: %s", exc.what()); + retval = 0; + } + + return retval; + } +} +#endif // NO_UNIT_TESTS \ No newline at end of file diff --git a/src/common/thread.pool.hh b/src/common/thread.pool.hh new file mode 100644 index 00000000..a5e24c01 --- /dev/null +++ b/src/common/thread.pool.hh @@ -0,0 +1,50 @@ +#ifndef H_ACQUIRE_STORAGE_ZARR_THREAD_POOL_V0 +#define H_ACQUIRE_STORAGE_ZARR_THREAD_POOL_V0 + +#include +#include +#include +#include +#include +#include +#include + +namespace acquire::sink::zarr::common { +struct ThreadPool final +{ + public: + using JobT = std::function; + + // The error handler `err` is called when a job returns false. This + // can happen when the job encounters an error, or otherwise fails. The + // std::string& argument to the error handler is a diagnostic message from + // the failing job and is logged to the error stream by the Zarr driver when + // the next call to `append()` is made. + ThreadPool(unsigned int n_threads, std::function err); + ~ThreadPool() noexcept; + + void push_to_job_queue(JobT&& job); + + /** + * @brief Block until all jobs on the queue have processed, then spin down + * the threads. + * @note After calling this function, the job queue no longer accepts jobs. + */ + void await_stop() noexcept; + + private: + std::function error_handler_; + + std::vector threads_; + mutable std::mutex jobs_mutex_; + std::condition_variable cv_; + std::queue jobs_; + + std::atomic is_accepting_jobs_; + + std::optional pop_from_job_queue_() noexcept; + [[nodiscard]] bool should_stop_() const noexcept; + void thread_worker_(); +}; +} +#endif // H_ACQUIRE_STORAGE_ZARR_THREAD_POOL_V0 diff --git a/src/common/utilities.cpp b/src/common/utilities.cpp new file mode 100644 index 00000000..63b98f94 --- /dev/null +++ b/src/common/utilities.cpp @@ -0,0 +1,485 @@ +#include "utilities.hh" +#include "zarr.hh" + +#include "platform.h" +#include "thread.pool.hh" + +#include +#include + +namespace zarr = acquire::sink::zarr; +namespace common = zarr::common; + +size_t +common::chunks_along_dimension(const Dimension& dimension) +{ + EXPECT(dimension.chunk_size_px > 0, "Invalid chunk_size size."); + + return (dimension.array_size_px + dimension.chunk_size_px - 1) / + dimension.chunk_size_px; +} + +size_t +common::shards_along_dimension(const Dimension& dimension) +{ + const size_t shard_size = dimension.shard_size_chunks; + if (shard_size == 0) { + return 0; + } + + const size_t n_chunks = chunks_along_dimension(dimension); + return (n_chunks + shard_size - 1) / shard_size; +} + +size_t +common::number_of_chunks_in_memory(const std::vector& dimensions) +{ + size_t n_chunks = 1; + for (auto i = 0; i < dimensions.size() - 1; ++i) { + n_chunks *= chunks_along_dimension(dimensions[i]); + } + + return n_chunks; +} + +size_t +common::number_of_shards(const std::vector& dimensions) +{ + size_t n_shards = 1; + for (auto i = 0; i < dimensions.size() - 1; ++i) { + const auto& dim = dimensions.at(i); + n_shards *= shards_along_dimension(dim); + } + + return n_shards; +} + +size_t +common::chunks_per_shard(const std::vector& dimensions) +{ + size_t n_chunks = 1; + for (const auto& dim : dimensions) { + n_chunks *= dim.shard_size_chunks; + } + + return n_chunks; +} + +size_t +common::shard_index_for_chunk(size_t chunk_index, + const std::vector& dimensions) +{ + // make chunk strides + std::vector chunk_strides(1, 1); + for (auto i = 0; i < dimensions.size() - 1; ++i) { + const auto& dim = dimensions.at(i); + chunk_strides.push_back(chunk_strides.back() * + zarr::common::chunks_along_dimension(dim)); + CHECK(chunk_strides.back()); + } + + // get chunk indices + std::vector chunk_lattice_indices; + for (auto i = 0; i < dimensions.size() - 1; ++i) { + chunk_lattice_indices.push_back(chunk_index % chunk_strides.at(i + 1) / + chunk_strides.at(i)); + } + chunk_lattice_indices.push_back(chunk_index / chunk_strides.back()); + + // make shard strides + std::vector shard_strides(1, 1); + for (auto i = 0; i < dimensions.size() - 1; ++i) { + const auto& dim = dimensions.at(i); + shard_strides.push_back(shard_strides.back() * + zarr::common::shards_along_dimension(dim)); + } + + std::vector shard_lattice_indices; + for (auto i = 0; i < dimensions.size(); ++i) { + shard_lattice_indices.push_back(chunk_lattice_indices.at(i) / + dimensions.at(i).shard_size_chunks); + } + + size_t index = 0; + for (auto i = 0; i < dimensions.size(); ++i) { + index += shard_lattice_indices.at(i) * shard_strides.at(i); + } + + return index; +} + +size_t +common::shard_internal_index(size_t chunk_idx, + const std::vector& dimensions) +{ + // make chunk strides + std::vector chunk_strides(1, 1); + for (auto i = 0; i < dimensions.size() - 1; ++i) { + const auto& dim = dimensions.at(i); + chunk_strides.push_back(chunk_strides.back() * + zarr::common::chunks_along_dimension(dim)); + CHECK(chunk_strides.back()); + } + + // get chunk indices + std::vector chunk_lattice_indices; + for (auto i = 0; i < dimensions.size() - 1; ++i) { + chunk_lattice_indices.push_back(chunk_idx % chunk_strides.at(i + 1) / + chunk_strides.at(i)); + } + chunk_lattice_indices.push_back(chunk_idx / chunk_strides.back()); + + // make shard lattice indices + std::vector shard_lattice_indices; + for (auto i = 0; i < dimensions.size(); ++i) { + shard_lattice_indices.push_back(chunk_lattice_indices.at(i) / + dimensions.at(i).shard_size_chunks); + } + + std::vector chunk_internal_strides(1, 1); + for (auto i = 0; i < dimensions.size() - 1; ++i) { + const auto& dim = dimensions.at(i); + chunk_internal_strides.push_back(chunk_internal_strides.back() * + dim.shard_size_chunks); + } + + size_t index = 0; + + for (auto i = 0; i < dimensions.size(); ++i) { + index += + (chunk_lattice_indices.at(i) % dimensions.at(i).shard_size_chunks) * + chunk_internal_strides.at(i); + } + + return index; +} + +size_t +common::bytes_per_chunk(const std::vector& dimensions, + const SampleType& type) +{ + auto n_bytes = bytes_of_type(type); + for (const auto& d : dimensions) { + n_bytes *= d.chunk_size_px; + } + + return n_bytes; +} + +const char* +common::sample_type_to_string(SampleType t) noexcept +{ + switch (t) { + case SampleType_u8: + return "u8"; + case SampleType_u16: + return "u16"; + case SampleType_i8: + return "i8"; + case SampleType_i16: + return "i16"; + case SampleType_f32: + return "f32"; + default: + return "unrecognized pixel type"; + } +} + +size_t +common::align_up(size_t n, size_t align) +{ + EXPECT(align > 0, "Alignment must be greater than zero."); + return align * ((n + align - 1) / align); +} + +std::vector +common::split_uri(const std::string& uri) +{ + const char delim = '/'; + + size_t begin = 0; + auto end = uri.find_first_of(delim); + + std::vector out; + while (end != std::string::npos) { + if (end > begin) { + out.emplace_back(uri.substr(begin, end - begin)); + } + + begin = end + 1; + end = uri.find_first_of(delim, begin); + } + return out; +} + +bool +common::is_s3_uri(const std::string& uri) +{ + return uri.starts_with("s3://") || uri.starts_with("http://") || + uri.starts_with("https://"); +} + +#ifndef NO_UNIT_TESTS +#ifdef _WIN32 +#define acquire_export __declspec(dllexport) +#else +#define acquire_export +#endif + +extern "C" +{ + acquire_export int unit_test__shard_index_for_chunk() + { + int retval = 0; + try { + std::vector dims; + dims.emplace_back("x", + DimensionType_Space, + 64, + 16, // 64 / 16 = 4 chunks + 2); // 4 / 2 = 2 shards + dims.emplace_back("y", + DimensionType_Space, + 48, + 16, // 48 / 16 = 3 chunks + 1); // 3 / 1 = 3 shards + dims.emplace_back("z", + DimensionType_Space, + 6, + 2, // 6 / 2 = 3 chunks + 1); // 3 / 1 = 3 shards + dims.emplace_back("c", + DimensionType_Channel, + 8, + 4, // 8 / 4 = 2 chunks + 2); // 4 / 2 = 2 shards + dims.emplace_back("t", + DimensionType_Time, + 0, + 5, // 5 timepoints / chunk + 2); // 2 chunks / shard + + CHECK(common::shard_index_for_chunk(0, dims) == 0); + CHECK(common::shard_index_for_chunk(1, dims) == 0); + CHECK(common::shard_index_for_chunk(2, dims) == 1); + CHECK(common::shard_index_for_chunk(3, dims) == 1); + CHECK(common::shard_index_for_chunk(4, dims) == 2); + CHECK(common::shard_index_for_chunk(5, dims) == 2); + CHECK(common::shard_index_for_chunk(6, dims) == 3); + CHECK(common::shard_index_for_chunk(7, dims) == 3); + CHECK(common::shard_index_for_chunk(8, dims) == 4); + CHECK(common::shard_index_for_chunk(9, dims) == 4); + CHECK(common::shard_index_for_chunk(10, dims) == 5); + CHECK(common::shard_index_for_chunk(11, dims) == 5); + CHECK(common::shard_index_for_chunk(12, dims) == 6); + CHECK(common::shard_index_for_chunk(13, dims) == 6); + CHECK(common::shard_index_for_chunk(14, dims) == 7); + CHECK(common::shard_index_for_chunk(15, dims) == 7); + CHECK(common::shard_index_for_chunk(16, dims) == 8); + CHECK(common::shard_index_for_chunk(17, dims) == 8); + CHECK(common::shard_index_for_chunk(18, dims) == 9); + CHECK(common::shard_index_for_chunk(19, dims) == 9); + CHECK(common::shard_index_for_chunk(20, dims) == 10); + CHECK(common::shard_index_for_chunk(21, dims) == 10); + CHECK(common::shard_index_for_chunk(22, dims) == 11); + CHECK(common::shard_index_for_chunk(23, dims) == 11); + CHECK(common::shard_index_for_chunk(24, dims) == 12); + CHECK(common::shard_index_for_chunk(25, dims) == 12); + CHECK(common::shard_index_for_chunk(26, dims) == 13); + CHECK(common::shard_index_for_chunk(27, dims) == 13); + CHECK(common::shard_index_for_chunk(28, dims) == 14); + CHECK(common::shard_index_for_chunk(29, dims) == 14); + CHECK(common::shard_index_for_chunk(30, dims) == 15); + CHECK(common::shard_index_for_chunk(31, dims) == 15); + CHECK(common::shard_index_for_chunk(32, dims) == 16); + CHECK(common::shard_index_for_chunk(33, dims) == 16); + CHECK(common::shard_index_for_chunk(34, dims) == 17); + CHECK(common::shard_index_for_chunk(35, dims) == 17); + CHECK(common::shard_index_for_chunk(36, dims) == 0); + CHECK(common::shard_index_for_chunk(37, dims) == 0); + CHECK(common::shard_index_for_chunk(38, dims) == 1); + CHECK(common::shard_index_for_chunk(39, dims) == 1); + CHECK(common::shard_index_for_chunk(40, dims) == 2); + CHECK(common::shard_index_for_chunk(41, dims) == 2); + CHECK(common::shard_index_for_chunk(42, dims) == 3); + CHECK(common::shard_index_for_chunk(43, dims) == 3); + CHECK(common::shard_index_for_chunk(44, dims) == 4); + CHECK(common::shard_index_for_chunk(45, dims) == 4); + CHECK(common::shard_index_for_chunk(46, dims) == 5); + CHECK(common::shard_index_for_chunk(47, dims) == 5); + CHECK(common::shard_index_for_chunk(48, dims) == 6); + CHECK(common::shard_index_for_chunk(49, dims) == 6); + CHECK(common::shard_index_for_chunk(50, dims) == 7); + CHECK(common::shard_index_for_chunk(51, dims) == 7); + CHECK(common::shard_index_for_chunk(52, dims) == 8); + CHECK(common::shard_index_for_chunk(53, dims) == 8); + CHECK(common::shard_index_for_chunk(54, dims) == 9); + CHECK(common::shard_index_for_chunk(55, dims) == 9); + CHECK(common::shard_index_for_chunk(56, dims) == 10); + CHECK(common::shard_index_for_chunk(57, dims) == 10); + CHECK(common::shard_index_for_chunk(58, dims) == 11); + CHECK(common::shard_index_for_chunk(59, dims) == 11); + CHECK(common::shard_index_for_chunk(60, dims) == 12); + CHECK(common::shard_index_for_chunk(61, dims) == 12); + CHECK(common::shard_index_for_chunk(62, dims) == 13); + CHECK(common::shard_index_for_chunk(63, dims) == 13); + CHECK(common::shard_index_for_chunk(64, dims) == 14); + CHECK(common::shard_index_for_chunk(65, dims) == 14); + CHECK(common::shard_index_for_chunk(66, dims) == 15); + CHECK(common::shard_index_for_chunk(67, dims) == 15); + CHECK(common::shard_index_for_chunk(68, dims) == 16); + CHECK(common::shard_index_for_chunk(69, dims) == 16); + CHECK(common::shard_index_for_chunk(70, dims) == 17); + CHECK(common::shard_index_for_chunk(71, dims) == 17); + CHECK(common::shard_index_for_chunk(72, dims) == 0); + CHECK(common::shard_index_for_chunk(73, dims) == 0); + CHECK(common::shard_index_for_chunk(74, dims) == 1); + CHECK(common::shard_index_for_chunk(75, dims) == 1); + CHECK(common::shard_index_for_chunk(76, dims) == 2); + CHECK(common::shard_index_for_chunk(77, dims) == 2); + CHECK(common::shard_index_for_chunk(78, dims) == 3); + CHECK(common::shard_index_for_chunk(79, dims) == 3); + CHECK(common::shard_index_for_chunk(80, dims) == 4); + CHECK(common::shard_index_for_chunk(81, dims) == 4); + CHECK(common::shard_index_for_chunk(82, dims) == 5); + CHECK(common::shard_index_for_chunk(83, dims) == 5); + CHECK(common::shard_index_for_chunk(84, dims) == 6); + CHECK(common::shard_index_for_chunk(85, dims) == 6); + CHECK(common::shard_index_for_chunk(86, dims) == 7); + CHECK(common::shard_index_for_chunk(87, dims) == 7); + CHECK(common::shard_index_for_chunk(88, dims) == 8); + CHECK(common::shard_index_for_chunk(89, dims) == 8); + CHECK(common::shard_index_for_chunk(90, dims) == 9); + CHECK(common::shard_index_for_chunk(91, dims) == 9); + CHECK(common::shard_index_for_chunk(92, dims) == 10); + CHECK(common::shard_index_for_chunk(93, dims) == 10); + CHECK(common::shard_index_for_chunk(94, dims) == 11); + CHECK(common::shard_index_for_chunk(95, dims) == 11); + CHECK(common::shard_index_for_chunk(96, dims) == 12); + CHECK(common::shard_index_for_chunk(97, dims) == 12); + CHECK(common::shard_index_for_chunk(98, dims) == 13); + CHECK(common::shard_index_for_chunk(99, dims) == 13); + CHECK(common::shard_index_for_chunk(100, dims) == 14); + CHECK(common::shard_index_for_chunk(101, dims) == 14); + CHECK(common::shard_index_for_chunk(102, dims) == 15); + CHECK(common::shard_index_for_chunk(103, dims) == 15); + CHECK(common::shard_index_for_chunk(104, dims) == 16); + CHECK(common::shard_index_for_chunk(105, dims) == 16); + CHECK(common::shard_index_for_chunk(106, dims) == 17); + CHECK(common::shard_index_for_chunk(107, dims) == 17); + CHECK(common::shard_index_for_chunk(108, dims) == 0); + CHECK(common::shard_index_for_chunk(109, dims) == 0); + CHECK(common::shard_index_for_chunk(110, dims) == 1); + CHECK(common::shard_index_for_chunk(111, dims) == 1); + CHECK(common::shard_index_for_chunk(112, dims) == 2); + CHECK(common::shard_index_for_chunk(113, dims) == 2); + CHECK(common::shard_index_for_chunk(114, dims) == 3); + CHECK(common::shard_index_for_chunk(115, dims) == 3); + CHECK(common::shard_index_for_chunk(116, dims) == 4); + CHECK(common::shard_index_for_chunk(117, dims) == 4); + CHECK(common::shard_index_for_chunk(118, dims) == 5); + CHECK(common::shard_index_for_chunk(119, dims) == 5); + CHECK(common::shard_index_for_chunk(120, dims) == 6); + CHECK(common::shard_index_for_chunk(121, dims) == 6); + CHECK(common::shard_index_for_chunk(122, dims) == 7); + CHECK(common::shard_index_for_chunk(123, dims) == 7); + CHECK(common::shard_index_for_chunk(124, dims) == 8); + CHECK(common::shard_index_for_chunk(125, dims) == 8); + CHECK(common::shard_index_for_chunk(126, dims) == 9); + CHECK(common::shard_index_for_chunk(127, dims) == 9); + CHECK(common::shard_index_for_chunk(128, dims) == 10); + CHECK(common::shard_index_for_chunk(129, dims) == 10); + CHECK(common::shard_index_for_chunk(130, dims) == 11); + CHECK(common::shard_index_for_chunk(131, dims) == 11); + CHECK(common::shard_index_for_chunk(132, dims) == 12); + CHECK(common::shard_index_for_chunk(133, dims) == 12); + CHECK(common::shard_index_for_chunk(134, dims) == 13); + CHECK(common::shard_index_for_chunk(135, dims) == 13); + CHECK(common::shard_index_for_chunk(136, dims) == 14); + CHECK(common::shard_index_for_chunk(137, dims) == 14); + CHECK(common::shard_index_for_chunk(138, dims) == 15); + CHECK(common::shard_index_for_chunk(139, dims) == 15); + CHECK(common::shard_index_for_chunk(140, dims) == 16); + CHECK(common::shard_index_for_chunk(141, dims) == 16); + CHECK(common::shard_index_for_chunk(142, dims) == 17); + CHECK(common::shard_index_for_chunk(143, dims) == 17); + + retval = 1; + } catch (const std::exception& exc) { + LOGE("Exception: %s\n", exc.what()); + } catch (...) { + LOGE("Exception: (unknown)"); + } + + return retval; + } + + acquire_export int unit_test__shard_internal_index() + { + int retval = 0; + + std::vector dims; + dims.emplace_back("x", + DimensionType_Space, + 1080, + 270, // 4 chunks + 3); // 2 ragged shards + dims.emplace_back("y", + DimensionType_Space, + 960, + 320, // 3 chunks + 2); // 2 ragged shards + dims.emplace_back("t", + DimensionType_Time, + 0, + 32, // 32 timepoints / chunk + 1); // 1 shard + + try { + CHECK(common::shard_index_for_chunk(0, dims) == 0); + CHECK(common::shard_internal_index(0, dims) == 0); + + CHECK(common::shard_index_for_chunk(1, dims) == 0); + CHECK(common::shard_internal_index(1, dims) == 1); + + CHECK(common::shard_index_for_chunk(2, dims) == 0); + CHECK(common::shard_internal_index(2, dims) == 2); + + CHECK(common::shard_index_for_chunk(3, dims) == 1); + CHECK(common::shard_internal_index(3, dims) == 0); + + CHECK(common::shard_index_for_chunk(4, dims) == 0); + CHECK(common::shard_internal_index(4, dims) == 3); + + CHECK(common::shard_index_for_chunk(5, dims) == 0); + CHECK(common::shard_internal_index(5, dims) == 4); + + CHECK(common::shard_index_for_chunk(6, dims) == 0); + CHECK(common::shard_internal_index(6, dims) == 5); + + CHECK(common::shard_index_for_chunk(7, dims) == 1); + CHECK(common::shard_internal_index(7, dims) == 3); + + CHECK(common::shard_index_for_chunk(8, dims) == 2); + CHECK(common::shard_internal_index(8, dims) == 0); + + CHECK(common::shard_index_for_chunk(9, dims) == 2); + CHECK(common::shard_internal_index(9, dims) == 1); + + CHECK(common::shard_index_for_chunk(10, dims) == 2); + CHECK(common::shard_internal_index(10, dims) == 2); + + CHECK(common::shard_index_for_chunk(11, dims) == 3); + CHECK(common::shard_internal_index(11, dims) == 0); + retval = 1; + } catch (const std::exception& exc) { + LOGE("Exception: %s\n", exc.what()); + } catch (...) { + LOGE("Exception: (unknown)"); + } + + return retval; + } +} + +#endif \ No newline at end of file diff --git a/src/common/utilities.hh b/src/common/utilities.hh new file mode 100644 index 00000000..1290c53e --- /dev/null +++ b/src/common/utilities.hh @@ -0,0 +1,110 @@ +#ifndef H_ACQUIRE_STORAGE_ZARR_COMMON_UTILITIES_V0 +#define H_ACQUIRE_STORAGE_ZARR_COMMON_UTILITIES_V0 + +#include "logger.h" +#include "device/props/components.h" +#include "device/props/storage.h" +#include "macros.hh" +#include "dimension.hh" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fs = std::filesystem; + +namespace acquire::sink::zarr { + +struct Zarr; + +namespace common { + +/// @brief Get the number of chunks along a dimension. +/// @param dimension A dimension. +/// @return The number of, possibly ragged, chunks along the dimension, given +/// the dimension's array and chunk sizes. +size_t +chunks_along_dimension(const Dimension& dimension); + +/// @brief Get the number of chunks to hold in memory. +/// @param dimensions The dimensions of the array. +/// @return The number of chunks to buffer before writing out. +size_t +number_of_chunks_in_memory(const std::vector& dimensions); + +/// @brief Get the number of shards along a dimension. +/// @param dimension A dimension. +/// @return The number of shards along the dimension, given the dimension's +/// array, chunk, and shard sizes. +size_t +shards_along_dimension(const Dimension& dimension); + +/// @brief Get the number of shards to write at one time. +/// @param dimensions The dimensions of the array. +/// @return The number of shards to buffer and write out. +size_t +number_of_shards(const std::vector& dimensions); + +/// @brief Get the number of chunks in a single shard. +/// @param dimensions The dimensions of the array. +/// @return The number of chunks in a shard. +size_t +chunks_per_shard(const std::vector& dimensions); + +/// @brief Get the shard index for a given chunk index, given array dimensions. +/// @param chunk_index The index of the chunk. +/// @param dimensions The dimensions of the array. +/// @return The index of the shard containing the chunk. +size_t +shard_index_for_chunk(size_t chunk_index, + const std::vector& dimensions); + +/// @brief Get the internal index of a chunk within a shard. +/// @param chunk_index The index of the chunk. +/// @param dimensions The dimensions of the array. +/// @return The index of the chunk within the shard. +size_t +shard_internal_index(size_t chunk_index, + const std::vector& dimensions); + +/// @brief Get the size, in bytes, of a single chunk. +/// @param dimensions The dimensions of the array. +/// @param dtype The pixel type of the array. +/// @return The number of bytes to allocate for a chunk. +size_t +bytes_per_chunk(const std::vector& dimensions, + const SampleType& dtype); + +/// @brief Get a string representation of the SampleType enum. +/// @param t An enumerated sample type. +/// @return A human-readable representation of the SampleType @par t. +const char* +sample_type_to_string(SampleType t) noexcept; + +/// @brief Align a size to a given alignment. +/// @param n Size to align. +/// @param align Alignment. +/// @return Aligned size. +size_t +align_up(size_t n, size_t align); + +/// @brief Split a URI by the '/' delimiter. +/// @param uri String to split. +/// @return Vector of strings. +std::vector +split_uri(const std::string& uri); + +/// @brief Check if a URI is an S3 URI. +/// @param uri String to check. +/// @return True if the URI is an S3 URI, false otherwise. +bool +is_s3_uri(const std::string& uri); +} // namespace acquire::sink::zarr::common +} // namespace acquire::sink::zarr + +#endif // H_ACQUIRE_STORAGE_ZARR_COMMON_UTILITIES_V0 diff --git a/src/writers/file.sink.cpp b/src/writers/file.sink.cpp index d580cf5c..d5106dcb 100644 --- a/src/writers/file.sink.cpp +++ b/src/writers/file.sink.cpp @@ -1,4 +1,6 @@ #include "file.sink.hh" +#include "platform.h" +#include "common/utilities.hh" #include diff --git a/src/writers/file.sink.hh b/src/writers/file.sink.hh index 87d98031..871df72c 100644 --- a/src/writers/file.sink.hh +++ b/src/writers/file.sink.hh @@ -4,6 +4,7 @@ #include "sink.hh" #include "platform.h" +#include #include namespace acquire::sink::zarr { diff --git a/src/writers/sink.creator.cpp b/src/writers/sink.creator.cpp index bcde5ae9..b72d0c3a 100644 --- a/src/writers/sink.creator.cpp +++ b/src/writers/sink.creator.cpp @@ -1,5 +1,6 @@ #include "sink.creator.hh" #include "file.sink.hh" +#include "common/utilities.hh" #include #include diff --git a/src/writers/sink.creator.hh b/src/writers/sink.creator.hh index 07d60632..29797db1 100644 --- a/src/writers/sink.creator.hh +++ b/src/writers/sink.creator.hh @@ -2,7 +2,8 @@ #define H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_CREATOR_V0 #include "sink.hh" -#include "../common.hh" +#include "common/dimension.hh" +#include "common/thread.pool.hh" #include #include diff --git a/src/writers/sink.hh b/src/writers/sink.hh index e90f4601..4f64db1d 100644 --- a/src/writers/sink.hh +++ b/src/writers/sink.hh @@ -1,12 +1,8 @@ #ifndef H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_V0 #define H_ACQUIRE_STORAGE_ZARR_WRITERS_SINK_V0 -#include -#include -#include -#include - -#include "../common.hh" +#include // uint8_t +#include // size_t namespace acquire::sink::zarr { struct Sink diff --git a/src/writers/writer.cpp b/src/writers/writer.cpp index e919d19a..2afa5730 100644 --- a/src/writers/writer.cpp +++ b/src/writers/writer.cpp @@ -1,6 +1,6 @@ #include #include "writer.hh" -#include "../zarr.hh" +#include "common/utilities.hh" #include #include diff --git a/src/writers/writer.hh b/src/writers/writer.hh index 83ea63fb..5b720d01 100644 --- a/src/writers/writer.hh +++ b/src/writers/writer.hh @@ -4,7 +4,8 @@ #include "platform.h" #include "device/props/components.h" -#include "../common.hh" +#include "common/dimension.hh" +#include "common/thread.pool.hh" #include "blosc.compressor.hh" #include "file.sink.hh" diff --git a/src/writers/zarrv2.writer.cpp b/src/writers/zarrv2.writer.cpp index af985640..14dacef5 100644 --- a/src/writers/zarrv2.writer.cpp +++ b/src/writers/zarrv2.writer.cpp @@ -1,6 +1,6 @@ #include "zarrv2.writer.hh" #include "sink.creator.hh" -#include "../zarr.hh" +#include "zarr.hh" #include #include diff --git a/src/writers/zarrv3.writer.cpp b/src/writers/zarrv3.writer.cpp index d21d1bd1..527c6527 100644 --- a/src/writers/zarrv3.writer.cpp +++ b/src/writers/zarrv3.writer.cpp @@ -1,104 +1,12 @@ #include "zarrv3.writer.hh" #include "sink.creator.hh" -#include "../zarr.hh" +#include "zarr.hh" #include #include namespace zarr = acquire::sink::zarr; -namespace { -/// @brief Get the shard index for a given chunk index. -size_t -shard_index(size_t chunk_idx, const std::vector& dimensions) -{ - // make chunk strides - std::vector chunk_strides(1, 1); - for (auto i = 0; i < dimensions.size() - 1; ++i) { - const auto& dim = dimensions.at(i); - chunk_strides.push_back(chunk_strides.back() * - zarr::common::chunks_along_dimension(dim)); - CHECK(chunk_strides.back()); - } - - // get chunk indices - std::vector chunk_lattice_indices; - for (auto i = 0; i < dimensions.size() - 1; ++i) { - chunk_lattice_indices.push_back(chunk_idx % chunk_strides.at(i + 1) / - chunk_strides.at(i)); - } - chunk_lattice_indices.push_back(chunk_idx / chunk_strides.back()); - - // make shard strides - std::vector shard_strides(1, 1); - for (auto i = 0; i < dimensions.size() - 1; ++i) { - const auto& dim = dimensions.at(i); - shard_strides.push_back(shard_strides.back() * - zarr::common::shards_along_dimension(dim)); - } - - std::vector shard_lattice_indices; - for (auto i = 0; i < dimensions.size(); ++i) { - shard_lattice_indices.push_back(chunk_lattice_indices.at(i) / - dimensions.at(i).shard_size_chunks); - } - - size_t index = 0; - for (auto i = 0; i < dimensions.size(); ++i) { - index += shard_lattice_indices.at(i) * shard_strides.at(i); - } - - return index; -} - -/// @brief Get the index for a chunk within a shard. -size_t -shard_internal_index(size_t chunk_idx, - const std::vector& dimensions) -{ - // make chunk strides - std::vector chunk_strides(1, 1); - for (auto i = 0; i < dimensions.size() - 1; ++i) { - const auto& dim = dimensions.at(i); - chunk_strides.push_back(chunk_strides.back() * - zarr::common::chunks_along_dimension(dim)); - CHECK(chunk_strides.back()); - } - - // get chunk indices - std::vector chunk_lattice_indices; - for (auto i = 0; i < dimensions.size() - 1; ++i) { - chunk_lattice_indices.push_back(chunk_idx % chunk_strides.at(i + 1) / - chunk_strides.at(i)); - } - chunk_lattice_indices.push_back(chunk_idx / chunk_strides.back()); - - // make shard lattice indices - std::vector shard_lattice_indices; - for (auto i = 0; i < dimensions.size(); ++i) { - shard_lattice_indices.push_back(chunk_lattice_indices.at(i) / - dimensions.at(i).shard_size_chunks); - } - - std::vector chunk_internal_strides(1, 1); - for (auto i = 0; i < dimensions.size() - 1; ++i) { - const auto& dim = dimensions.at(i); - chunk_internal_strides.push_back(chunk_internal_strides.back() * - dim.shard_size_chunks); - } - - size_t index = 0; - - for (auto i = 0; i < dimensions.size(); ++i) { - index += - (chunk_lattice_indices.at(i) % dimensions.at(i).shard_size_chunks) * - chunk_internal_strides.at(i); - } - - return index; -} -} // namespace - zarr::ZarrV3Writer::ZarrV3Writer( const WriterConfig& array_spec, std::shared_ptr thread_pool) @@ -141,7 +49,7 @@ zarr::ZarrV3Writer::flush_impl_() // get shard indices for each chunk std::vector> chunk_in_shards(n_shards); for (auto i = 0; i < chunk_buffers_.size(); ++i) { - const auto index = shard_index(i, config_.dimensions); + const auto index = common::shard_index_for_chunk(i, config_.dimensions); chunk_in_shards.at(index).push_back(i); } @@ -172,8 +80,8 @@ zarr::ZarrV3Writer::flush_impl_() break; } - const auto internal_idx = - shard_internal_index(chunk_idx, config_.dimensions); + const auto internal_idx = common::shard_internal_index( + chunk_idx, config_.dimensions); chunk_table.at(2 * internal_idx) = *file_offset; chunk_table.at(2 * internal_idx + 1) = chunk.size(); @@ -241,259 +149,6 @@ namespace common = zarr::common; extern "C" { - acquire_export int unit_test__shard_index() - { - int retval = 0; - try { - std::vector dims; - dims.emplace_back("x", - DimensionType_Space, - 64, - 16, // 64 / 16 = 4 chunks - 2); // 4 / 2 = 2 shards - dims.emplace_back("y", - DimensionType_Space, - 48, - 16, // 48 / 16 = 3 chunks - 1); // 3 / 1 = 3 shards - dims.emplace_back("z", - DimensionType_Space, - 6, - 2, // 6 / 2 = 3 chunks - 1); // 3 / 1 = 3 shards - dims.emplace_back("c", - DimensionType_Channel, - 8, - 4, // 8 / 4 = 2 chunks - 2); // 4 / 2 = 2 shards - dims.emplace_back("t", - DimensionType_Time, - 0, - 5, // 5 timepoints / chunk - 2); // 2 chunks / shard - - CHECK(shard_index(0, dims) == 0); - CHECK(shard_index(1, dims) == 0); - CHECK(shard_index(2, dims) == 1); - CHECK(shard_index(3, dims) == 1); - CHECK(shard_index(4, dims) == 2); - CHECK(shard_index(5, dims) == 2); - CHECK(shard_index(6, dims) == 3); - CHECK(shard_index(7, dims) == 3); - CHECK(shard_index(8, dims) == 4); - CHECK(shard_index(9, dims) == 4); - CHECK(shard_index(10, dims) == 5); - CHECK(shard_index(11, dims) == 5); - CHECK(shard_index(12, dims) == 6); - CHECK(shard_index(13, dims) == 6); - CHECK(shard_index(14, dims) == 7); - CHECK(shard_index(15, dims) == 7); - CHECK(shard_index(16, dims) == 8); - CHECK(shard_index(17, dims) == 8); - CHECK(shard_index(18, dims) == 9); - CHECK(shard_index(19, dims) == 9); - CHECK(shard_index(20, dims) == 10); - CHECK(shard_index(21, dims) == 10); - CHECK(shard_index(22, dims) == 11); - CHECK(shard_index(23, dims) == 11); - CHECK(shard_index(24, dims) == 12); - CHECK(shard_index(25, dims) == 12); - CHECK(shard_index(26, dims) == 13); - CHECK(shard_index(27, dims) == 13); - CHECK(shard_index(28, dims) == 14); - CHECK(shard_index(29, dims) == 14); - CHECK(shard_index(30, dims) == 15); - CHECK(shard_index(31, dims) == 15); - CHECK(shard_index(32, dims) == 16); - CHECK(shard_index(33, dims) == 16); - CHECK(shard_index(34, dims) == 17); - CHECK(shard_index(35, dims) == 17); - CHECK(shard_index(36, dims) == 0); - CHECK(shard_index(37, dims) == 0); - CHECK(shard_index(38, dims) == 1); - CHECK(shard_index(39, dims) == 1); - CHECK(shard_index(40, dims) == 2); - CHECK(shard_index(41, dims) == 2); - CHECK(shard_index(42, dims) == 3); - CHECK(shard_index(43, dims) == 3); - CHECK(shard_index(44, dims) == 4); - CHECK(shard_index(45, dims) == 4); - CHECK(shard_index(46, dims) == 5); - CHECK(shard_index(47, dims) == 5); - CHECK(shard_index(48, dims) == 6); - CHECK(shard_index(49, dims) == 6); - CHECK(shard_index(50, dims) == 7); - CHECK(shard_index(51, dims) == 7); - CHECK(shard_index(52, dims) == 8); - CHECK(shard_index(53, dims) == 8); - CHECK(shard_index(54, dims) == 9); - CHECK(shard_index(55, dims) == 9); - CHECK(shard_index(56, dims) == 10); - CHECK(shard_index(57, dims) == 10); - CHECK(shard_index(58, dims) == 11); - CHECK(shard_index(59, dims) == 11); - CHECK(shard_index(60, dims) == 12); - CHECK(shard_index(61, dims) == 12); - CHECK(shard_index(62, dims) == 13); - CHECK(shard_index(63, dims) == 13); - CHECK(shard_index(64, dims) == 14); - CHECK(shard_index(65, dims) == 14); - CHECK(shard_index(66, dims) == 15); - CHECK(shard_index(67, dims) == 15); - CHECK(shard_index(68, dims) == 16); - CHECK(shard_index(69, dims) == 16); - CHECK(shard_index(70, dims) == 17); - CHECK(shard_index(71, dims) == 17); - CHECK(shard_index(72, dims) == 0); - CHECK(shard_index(73, dims) == 0); - CHECK(shard_index(74, dims) == 1); - CHECK(shard_index(75, dims) == 1); - CHECK(shard_index(76, dims) == 2); - CHECK(shard_index(77, dims) == 2); - CHECK(shard_index(78, dims) == 3); - CHECK(shard_index(79, dims) == 3); - CHECK(shard_index(80, dims) == 4); - CHECK(shard_index(81, dims) == 4); - CHECK(shard_index(82, dims) == 5); - CHECK(shard_index(83, dims) == 5); - CHECK(shard_index(84, dims) == 6); - CHECK(shard_index(85, dims) == 6); - CHECK(shard_index(86, dims) == 7); - CHECK(shard_index(87, dims) == 7); - CHECK(shard_index(88, dims) == 8); - CHECK(shard_index(89, dims) == 8); - CHECK(shard_index(90, dims) == 9); - CHECK(shard_index(91, dims) == 9); - CHECK(shard_index(92, dims) == 10); - CHECK(shard_index(93, dims) == 10); - CHECK(shard_index(94, dims) == 11); - CHECK(shard_index(95, dims) == 11); - CHECK(shard_index(96, dims) == 12); - CHECK(shard_index(97, dims) == 12); - CHECK(shard_index(98, dims) == 13); - CHECK(shard_index(99, dims) == 13); - CHECK(shard_index(100, dims) == 14); - CHECK(shard_index(101, dims) == 14); - CHECK(shard_index(102, dims) == 15); - CHECK(shard_index(103, dims) == 15); - CHECK(shard_index(104, dims) == 16); - CHECK(shard_index(105, dims) == 16); - CHECK(shard_index(106, dims) == 17); - CHECK(shard_index(107, dims) == 17); - CHECK(shard_index(108, dims) == 0); - CHECK(shard_index(109, dims) == 0); - CHECK(shard_index(110, dims) == 1); - CHECK(shard_index(111, dims) == 1); - CHECK(shard_index(112, dims) == 2); - CHECK(shard_index(113, dims) == 2); - CHECK(shard_index(114, dims) == 3); - CHECK(shard_index(115, dims) == 3); - CHECK(shard_index(116, dims) == 4); - CHECK(shard_index(117, dims) == 4); - CHECK(shard_index(118, dims) == 5); - CHECK(shard_index(119, dims) == 5); - CHECK(shard_index(120, dims) == 6); - CHECK(shard_index(121, dims) == 6); - CHECK(shard_index(122, dims) == 7); - CHECK(shard_index(123, dims) == 7); - CHECK(shard_index(124, dims) == 8); - CHECK(shard_index(125, dims) == 8); - CHECK(shard_index(126, dims) == 9); - CHECK(shard_index(127, dims) == 9); - CHECK(shard_index(128, dims) == 10); - CHECK(shard_index(129, dims) == 10); - CHECK(shard_index(130, dims) == 11); - CHECK(shard_index(131, dims) == 11); - CHECK(shard_index(132, dims) == 12); - CHECK(shard_index(133, dims) == 12); - CHECK(shard_index(134, dims) == 13); - CHECK(shard_index(135, dims) == 13); - CHECK(shard_index(136, dims) == 14); - CHECK(shard_index(137, dims) == 14); - CHECK(shard_index(138, dims) == 15); - CHECK(shard_index(139, dims) == 15); - CHECK(shard_index(140, dims) == 16); - CHECK(shard_index(141, dims) == 16); - CHECK(shard_index(142, dims) == 17); - CHECK(shard_index(143, dims) == 17); - - retval = 1; - } catch (const std::exception& exc) { - LOGE("Exception: %s\n", exc.what()); - } catch (...) { - LOGE("Exception: (unknown)"); - } - - return retval; - } - - acquire_export int unit_test__shard_internal_index() - { - int retval = 0; - - std::vector dims; - dims.emplace_back("x", - DimensionType_Space, - 1080, - 270, // 4 chunks - 3); // 2 ragged shards - dims.emplace_back("y", - DimensionType_Space, - 960, - 320, // 3 chunks - 2); // 2 ragged shards - dims.emplace_back("t", - DimensionType_Time, - 0, - 32, // 32 timepoints / chunk - 1); // 1 shard - - try { - CHECK(shard_index(0, dims) == 0); - CHECK(shard_internal_index(0, dims) == 0); - - CHECK(shard_index(1, dims) == 0); - CHECK(shard_internal_index(1, dims) == 1); - - CHECK(shard_index(2, dims) == 0); - CHECK(shard_internal_index(2, dims) == 2); - - CHECK(shard_index(3, dims) == 1); - CHECK(shard_internal_index(3, dims) == 0); - - CHECK(shard_index(4, dims) == 0); - CHECK(shard_internal_index(4, dims) == 3); - - CHECK(shard_index(5, dims) == 0); - CHECK(shard_internal_index(5, dims) == 4); - - CHECK(shard_index(6, dims) == 0); - CHECK(shard_internal_index(6, dims) == 5); - - CHECK(shard_index(7, dims) == 1); - CHECK(shard_internal_index(7, dims) == 3); - - CHECK(shard_index(8, dims) == 2); - CHECK(shard_internal_index(8, dims) == 0); - - CHECK(shard_index(9, dims) == 2); - CHECK(shard_internal_index(9, dims) == 1); - - CHECK(shard_index(10, dims) == 2); - CHECK(shard_internal_index(10, dims) == 2); - - CHECK(shard_index(11, dims) == 3); - CHECK(shard_internal_index(11, dims) == 0); - retval = 1; - } catch (const std::exception& exc) { - LOGE("Exception: %s\n", exc.what()); - } catch (...) { - LOGE("Exception: (unknown)"); - } - - return retval; - } - acquire_export int unit_test__zarrv3_writer__write_even() { int retval = 0; diff --git a/src/zarr.hh b/src/zarr.hh index 70c859ee..0dbc0570 100644 --- a/src/zarr.hh +++ b/src/zarr.hh @@ -3,7 +3,8 @@ #include "device/kit/storage.h" -#include "common.hh" +#include "common/utilities.hh" +#include "common/thread.pool.hh" #include "writers/writer.hh" #include "writers/blosc.compressor.hh" diff --git a/tests/unit-tests.cpp b/tests/unit-tests.cpp index 2fd615fe..84a30d15 100644 --- a/tests/unit-tests.cpp +++ b/tests/unit-tests.cpp @@ -82,7 +82,9 @@ main() }; const std::vector tests{ #define CASE(e) { .name = #e, .test = (int (*)())lib_load(&lib, #e) } + CASE(unit_test__trim), CASE(unit_test__average_frame), + CASE(unit_test__thread_pool__push_to_job_queue), CASE(unit_test__sink_creator__create_chunk_file_sinks), CASE(unit_test__sink_creator__create_shard_file_sinks), CASE(unit_test__chunk_lattice_index), @@ -92,7 +94,7 @@ main() CASE(unit_test__downsample_writer_config), CASE(unit_test__zarrv2_writer__write_even), CASE(unit_test__zarrv2_writer__write_ragged_append_dim), - CASE(unit_test__shard_index), + CASE(unit_test__shard_index_for_chunk), CASE(unit_test__zarrv2_writer__write_ragged_internal_dim), CASE(unit_test__shard_internal_index), CASE(unit_test__zarrv3_writer__write_even),