diff --git a/src/streaming/CMakeLists.txt b/src/streaming/CMakeLists.txt
index 3fed4a3..8ef79d2 100644
--- a/src/streaming/CMakeLists.txt
+++ b/src/streaming/CMakeLists.txt
@@ -29,8 +29,10 @@ add_library(${tgt}
         zarrv2.array.writer.cpp
         zarrv3.array.writer.hh
         zarrv3.array.writer.cpp
-        vectorized.file.writer.hh
-        vectorized.file.writer.cpp
+        shard.writer.hh
+        shard.writer.cpp
+        platform.hh
+        platform.cpp
         $
 )
diff --git a/src/streaming/array.writer.cpp b/src/streaming/array.writer.cpp
index 0c7fd21..4c06183 100644
--- a/src/streaming/array.writer.cpp
+++ b/src/streaming/array.writer.cpp
@@ -75,7 +75,6 @@ zarr::downsample(const ArrayWriterConfig& config,
     return true;
 }
 
-/// Writer
 zarr::ArrayWriter::ArrayWriter(const ArrayWriterConfig& config,
                                std::shared_ptr<ThreadPool> thread_pool)
   : ArrayWriter(std::move(config), thread_pool, nullptr)
@@ -138,54 +137,6 @@ zarr::ArrayWriter::is_s3_array_() const
     return config_.bucket_name.has_value();
 }
 
-bool
-zarr::ArrayWriter::make_data_sinks_()
-{
-    std::string data_root;
-    std::function<size_t(const ZarrDimension&)> parts_along_dimension;
-    switch (version_()) {
-        case ZarrVersion_2:
-            parts_along_dimension = chunks_along_dimension;
-            data_root = config_.store_path + "/" +
-                        std::to_string(config_.level_of_detail) + "/" +
-                        std::to_string(append_chunk_index_);
-            break;
-        case ZarrVersion_3:
-            parts_along_dimension = shards_along_dimension;
-            data_root = config_.store_path + "/data/root/" +
-                        std::to_string(config_.level_of_detail) + "/c" +
-                        std::to_string(append_chunk_index_);
-            break;
-        default:
-            LOG_ERROR("Unsupported Zarr version");
-            return false;
-    }
-
-    SinkCreator creator(thread_pool_, s3_connection_pool_);
-
-    if (is_s3_array_()) {
-        if (!creator.make_data_sinks(*config_.bucket_name,
-                                     data_root,
-                                     config_.dimensions.get(),
-                                     parts_along_dimension,
-                                     data_sinks_)) {
-            LOG_ERROR("Failed to create data sinks in ",
-                      data_root,
-                      " for bucket ",
-                      *config_.bucket_name);
-            return false;
-        }
-    } else if (!creator.make_data_sinks(data_root,
-                                        config_.dimensions.get(),
-                                        parts_along_dimension,
-                                        data_sinks_)) {
-        LOG_ERROR("Failed to create data sinks in ", data_root);
-        return false;
-    }
-
-    return true;
-}
-
 bool
 zarr::ArrayWriter::make_metadata_sink_()
 {
@@ -193,29 +144,14 @@ zarr::ArrayWriter::make_metadata_sink_()
         return true;
     }
 
-    std::string metadata_path;
-    switch (version_()) {
-        case ZarrVersion_2:
-            metadata_path = config_.store_path + "/" +
-                            std::to_string(config_.level_of_detail) +
-                            "/.zarray";
-            break;
-        case ZarrVersion_3:
-            metadata_path = config_.store_path + "/meta/root/" +
-                            std::to_string(config_.level_of_detail) +
-                            ".array.json";
-            break;
-        default:
-            LOG_ERROR("Unsupported Zarr version");
-            return false;
-    }
+    const auto metadata_path = metadata_path_();
 
     if (is_s3_array_()) {
         SinkCreator creator(thread_pool_, s3_connection_pool_);
         metadata_sink_ =
-          creator.make_sink(*config_.bucket_name, metadata_path);
+          creator.make_s3_sink(*config_.bucket_name, metadata_path);
     } else {
-        metadata_sink_ = zarr::SinkCreator::make_sink(metadata_path);
+        metadata_sink_ = SinkCreator::make_file_sink(metadata_path);
     }
 
     if (!metadata_sink_) {
@@ -336,60 +272,47 @@ zarr::ArrayWriter::should_flush_() const
     return frames_written_ % frames_before_flush == 0;
 }
 
-void
-zarr::ArrayWriter::compress_buffers_()
+bool
+zarr::ArrayWriter::compress_chunk_buffer_(size_t chunk_index)
 {
     if (!config_.compression_params.has_value()) {
-        return;
+        return true;
     }
 
-    LOG_DEBUG("Compressing");
+    EXPECT(chunk_index < chunk_buffers_.size(),
+           "Chunk index out of bounds: ",
+           chunk_index);
 
-    BloscCompressionParams params = config_.compression_params.value();
+    LOG_DEBUG("Compressing chunk ", chunk_index);
+
+    const auto params = *config_.compression_params;
     const auto bytes_per_px = bytes_of_type(config_.dtype);
 
-    std::scoped_lock lock(buffers_mutex_);
-    std::latch latch(chunk_buffers_.size());
-    for (auto& chunk : chunk_buffers_) {
-        EXPECT(thread_pool_->push_job(
-                 [&params, buf = &chunk, bytes_per_px, &latch](
-                   std::string& err) -> bool {
-                     bool success = false;
-                     const size_t bytes_of_chunk = buf->size();
-
-                     try {
-                         const auto tmp_size =
-                           bytes_of_chunk + BLOSC_MAX_OVERHEAD;
-                         ChunkBuffer tmp(tmp_size);
-                         const auto nb =
-                           blosc_compress_ctx(params.clevel,
-                                              params.shuffle,
-                                              bytes_per_px,
-                                              bytes_of_chunk,
-                                              buf->data(),
-                                              tmp.data(),
-                                              tmp_size,
-                                              params.codec_id.c_str(),
-                                              0 /* blocksize - 0:automatic */,
-                                              1);
-
-                         tmp.resize(nb);
-                         buf->swap(tmp);
-
-                         success = true;
-                     } catch (const std::exception& exc) {
-                         err = "Failed to compress chunk: " +
-                               std::string(exc.what());
-                     }
-                     latch.count_down();
-
-                     return success;
-                 }),
-               "Failed to push to job queue");
+    auto& chunk = chunk_buffers_[chunk_index];
+    const auto bytes_of_chunk = chunk.size();
+
+    const auto tmp_size = bytes_of_chunk + BLOSC_MAX_OVERHEAD;
+    ChunkBuffer tmp(tmp_size);
+    const auto nb = blosc_compress_ctx(params.clevel,
+                                       params.shuffle,
+                                       bytes_per_px,
+                                       bytes_of_chunk,
+                                       chunk.data(),
+                                       tmp.data(),
+                                       tmp_size,
+                                       params.codec_id.c_str(),
+                                       0 /* blocksize - 0:automatic */,
+                                       1);
+
+    if (nb <= 0) {
+        LOG_ERROR("Failed to compress chunk ", chunk_index);
+        return false;
     }
 
-    // wait for all threads to finish
-    latch.wait();
+    tmp.resize(nb);
+    chunk.swap(tmp);
+
+    return true;
 }
 
 void
@@ -400,15 +323,12 @@ zarr::ArrayWriter::flush_()
     }
 
     // compress buffers and write out
-    compress_buffers_();
-    CHECK(flush_impl_());
+    compress_and_flush_();
 
     const auto should_rollover = should_rollover_();
-    if (should_rollover) {
-        rollover_();
-    }
 
     if (should_rollover || is_finalizing_) {
+        rollover_();
         CHECK(write_array_metadata_());
     }
 
@@ -419,17 +339,6 @@ zarr::ArrayWriter::flush_()
     bytes_to_flush_ = 0;
 }
 
-void
-zarr::ArrayWriter::close_sinks_()
-{
-    for (auto i = 0; i < data_sinks_.size(); ++i) {
-        EXPECT(finalize_sink(std::move(data_sinks_[i])),
-               "Failed to finalize sink ",
-               i);
-    }
-    data_sinks_.clear();
-}
-
 void
 zarr::ArrayWriter::rollover_()
 {
diff --git a/src/streaming/array.writer.hh b/src/streaming/array.writer.hh
index 076b114..da66b16 100644
--- a/src/streaming/array.writer.hh
+++ b/src/streaming/array.writer.hh
@@ -78,11 +78,12 @@ class ArrayWriter
 
     std::shared_ptr<S3ConnectionPool> s3_connection_pool_;
 
-    virtual ZarrVersion version_() const = 0;
+    virtual std::string data_root_() const = 0;
+    virtual std::string metadata_path_() const = 0;
 
     bool is_s3_array_() const;
 
-    [[nodiscard]] bool make_data_sinks_();
+    [[nodiscard]] virtual bool make_data_sinks_() = 0;
     [[nodiscard]] bool make_metadata_sink_();
     void make_buffers_() noexcept;
 
@@ -90,15 +91,15 @@ class ArrayWriter
     virtual bool should_rollover_() const = 0;
 
     size_t write_frame_to_chunks_(std::span<const std::byte> data);
-    void compress_buffers_();
+    bool compress_chunk_buffer_(size_t chunk_index);
 
+    virtual void compress_and_flush_() = 0;
     void flush_();
-    [[nodiscard]] virtual bool flush_impl_() = 0;
     void rollover_();
 
     [[nodiscard]] virtual bool write_array_metadata_() = 0;
 
-    void close_sinks_();
+    virtual void close_sinks_() = 0;
 
     friend bool finalize_array(std::unique_ptr<ArrayWriter>&& writer);
 };
diff --git a/src/streaming/platform.cpp b/src/streaming/platform.cpp
new file mode 100644
index 0000000..b91270c
--- /dev/null
+++ b/src/streaming/platform.cpp
@@ -0,0 +1,264 @@
+#include "platform.hh"
+#include "macros.hh"
+
+#ifdef _WIN32
+#include <windows.h>
+#else
+#include <fcntl.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#include <cstring>
+#endif
+
+namespace {
+size_t
+align_to(size_t size, size_t alignment)
+{
+    if (alignment == 0) {
+        return size;
+    }
+
+    return (size + alignment - 1) & ~(alignment - 1);
+}
+} // namespace
+
+#ifdef _WIN32
+namespace {
+size_t _PAGE_SIZE = 0;
+size_t _SECTOR_SIZE = 0;
+
+[[nodiscard]]
+size_t
+get_sector_size()
+{
+    if (_SECTOR_SIZE == 0) {
+        DWORD sectors_per_cluster, bytes_per_sector;
+        GetDiskFreeSpace(
+          NULL, &sectors_per_cluster, &bytes_per_sector, NULL, NULL);
+        _SECTOR_SIZE = bytes_per_sector;
+    }
+    return _SECTOR_SIZE;
+}
+
+[[nodiscard]]
+size_t
+get_page_size()
+{
+    if (_PAGE_SIZE == 0) {
+        SYSTEM_INFO sys_info;
+        GetSystemInfo(&sys_info);
+        _PAGE_SIZE = sys_info.dwPageSize;
+    }
+    return _PAGE_SIZE;
+}
+
+std::string
+get_last_error_as_string()
+{
+    DWORD errorMessageID = ::GetLastError();
+    if (errorMessageID == 0) {
+        return std::string(); // No error message has been recorded
+    }
+
+    LPSTR messageBuffer = nullptr;
+
+    size_t size = FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
+                                   FORMAT_MESSAGE_FROM_SYSTEM |
+                                   FORMAT_MESSAGE_IGNORE_INSERTS,
+                                 NULL,
+                                 errorMessageID,
+                                 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+                                 (LPSTR)&messageBuffer,
+                                 0,
+                                 NULL);
+
+    std::string message(messageBuffer, size);
+
+    LocalFree(messageBuffer);
+
+    return message;
+}
+} // namespace
+
+size_t
+zarr::align_to_system_boundary(size_t size)
+{
+    size = align_to(size, get_page_size());
+    return align_to(size, get_sector_size());
+}
+
+zarr::VectorizedFile::VectorizedFile(std::string_view path)
+  : inner_(INVALID_HANDLE_VALUE)
+{
+    inner_ = (void*)CreateFileA(path.data(),
+                                GENERIC_WRITE,
+                                0, // no sharing
+                                nullptr,
+                                OPEN_ALWAYS,
+                                FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING |
+                                  FILE_FLAG_SEQUENTIAL_SCAN,
+                                nullptr);
+
+    EXPECT(inner_ != INVALID_HANDLE_VALUE, "Failed to open file: ", path);
+}
+
+zarr::VectorizedFile::~VectorizedFile()
+{
+    if (inner_ != INVALID_HANDLE_VALUE) {
+        CloseHandle((HANDLE)inner_);
+    }
+}
+
+bool
+zarr::file_write_vectorized(VectorizedFile& file,
+                            const std::vector<std::span<std::byte>>& buffers,
+                            size_t offset)
+{
+    auto fh = (HANDLE)file.inner_;
+    EXPECT(fh != INVALID_HANDLE_VALUE, "Invalid file handle");
+
+    const auto page_size = get_page_size();
+
+    bool success = true;
+
+    size_t total_bytes_to_write = 0;
+    for (const auto& buffer : buffers) {
+        total_bytes_to_write += buffer.size();
+    }
+
+    const size_t nbytes_aligned =
+      align_to_system_boundary(total_bytes_to_write);
+    CHECK(nbytes_aligned >= total_bytes_to_write);
+
+    auto* aligned =
+      static_cast<std::byte*>(_aligned_malloc(nbytes_aligned, page_size));
+    if (!aligned) {
+        return false;
+    }
+
+    auto* cur = aligned;
+    for (const auto& buffer : buffers) {
+        std::copy(buffer.begin(), buffer.end(), cur);
+        cur += buffer.size();
+    }
+
+    std::vector<FILE_SEGMENT_ELEMENT> segments(nbytes_aligned / page_size);
+
+    cur = aligned;
+    for (auto& segment : segments) {
+        memset(&segment, 0, sizeof(segment));
+        segment.Buffer = PtrToPtr64(cur);
+        cur += page_size;
+    }
+
+    OVERLAPPED overlapped = { 0 };
+    overlapped.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
+    overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
+    overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr);
+
+    if (!WriteFileGather(
+          fh, segments.data(), nbytes_aligned, nullptr, &overlapped)) {
+        if (GetLastError() != ERROR_IO_PENDING) {
+            LOG_ERROR("Failed to write file: ", get_last_error_as_string());
+            success = false;
+        }
+
+        // Wait for the operation to complete
+        DWORD bytes_written = 0;
+        if (success &&
+            !GetOverlappedResult(fh, &overlapped, &bytes_written, TRUE)) {
+            LOG_ERROR("Failed to get overlapped result: ",
+                      get_last_error_as_string());
+            success = false;
+        }
+
+        if (success) {
+            EXPECT(bytes_written == nbytes_aligned,
+                   "Expected to write ",
+                   nbytes_aligned,
+                   " bytes, wrote ",
+                   bytes_written);
+        }
+    }
+
+    CloseHandle(overlapped.hEvent);
+    _aligned_free(aligned);
+
+    return success;
+}
+#else // _WIN32
+namespace {
+size_t _PAGE_SIZE = 0;
+
+size_t
+get_page_size()
+{
+    if (_PAGE_SIZE == 0) {
+        _PAGE_SIZE = sysconf(_SC_PAGE_SIZE);
+    }
+    return _PAGE_SIZE;
+}
+
+std::string
+get_last_error_as_string()
+{
+    return strerror(errno);
+}
+} // namespace
+
+size_t
+zarr::align_to_system_boundary(size_t size)
+{
+    return align_to(size, get_page_size());
+}
+
+zarr::VectorizedFile::VectorizedFile(std::string_view path)
+  : inner_(nullptr)
+{
+    inner_ = new int(open(path.data(), O_WRONLY | O_CREAT, 0644));
+    EXPECT(inner_ != nullptr, "Failed to open file: ", path);
+    EXPECT(*(int*)inner_ != -1, "Failed to open file: ", path);
+}
+
+zarr::VectorizedFile::~VectorizedFile()
+{
+    if (inner_ != nullptr) {
+        if (*(int*)inner_ != -1) {
+            close(*(int*)inner_);
+        }
+        delete (int*)inner_;
+    }
+}
+
+bool
+zarr::file_write_vectorized(zarr::VectorizedFile& file,
+                            const std::vector<std::span<std::byte>>& buffers,
+                            size_t offset)
+{
+    auto fd = *(int*)file.inner_;
+    EXPECT(fd != -1, "Invalid file descriptor");
+
+    std::vector<struct iovec> iovecs(buffers.size());
+
+    for (auto i = 0; i < buffers.size(); ++i) {
+        auto* iov = &iovecs[i];
+        memset(iov, 0, sizeof(struct iovec));
+        iov->iov_base =
+          const_cast<void*>(static_cast<const void*>(buffers[i].data()));
+        iov->iov_len = buffers[i].size();
+    }
+
+    ssize_t total_bytes = 0;
+    for (const auto& buffer : buffers) {
+        total_bytes += static_cast<ssize_t>(buffer.size());
+    }
+
+    ssize_t bytes_written = pwritev(fd,
+                                    iovecs.data(),
+                                    static_cast<int>(iovecs.size()),
+                                    static_cast<off_t>(offset));
+
+    if (bytes_written != total_bytes) {
+        LOG_ERROR("Failed to write file: ", get_last_error_as_string());
+        return false;
+    }
+
+    return true;
+}
+#endif
\ No newline at end of file
diff --git a/src/streaming/platform.hh b/src/streaming/platform.hh
new file mode 100644
index 0000000..cd974a8
--- /dev/null
+++ b/src/streaming/platform.hh
@@ -0,0 +1,46 @@
+#pragma once
+
+#include <span>
+#include <string_view>
+#include <vector>
+
+namespace zarr {
+class VectorizedFile
+{
+  public:
+    explicit VectorizedFile(std::string_view path);
+    ~VectorizedFile();
+
+  private:
+    void* inner_;
+
+    friend bool file_write_vectorized(
+      VectorizedFile& file,
+      const std::vector<std::span<std::byte>>& buffers,
+      size_t offset);
+};
+
+/**
+ * @brief Align an offset or a size to the nearest system boundary.
+ * @note Aligns to sector size on Windows, page size on UNIX.
+ * @param size The offset or size to align.
+ * @return The aligned offset or size.
+ */
+size_t
+align_to_system_boundary(size_t size);
+
+/**
+ * @brief Write a vector of buffers to the file at the given path.
+ * @param[in] file The VectorizedFile to write to.
+ * @param[in] buffers The buffers to write.
+ * @param[in] offset The offset in the file to write to. This value must be
+ * aligned to the system boundary.
+ * @throws std::runtime_error if the file cannot be opened for writing, or if
+ * the offset is not aligned to the system boundary.
+ * @return True if the write was successful, false otherwise.
+ */
+bool
+file_write_vectorized(VectorizedFile& file,
+                      const std::vector<std::span<std::byte>>& buffers,
+                      size_t offset);
+} // namespace zarr
\ No newline at end of file
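For orientation, a minimal usage sketch of the new platform layer (not part of the diff; the file name and buffer sizes are illustrative, and any nonzero offset handed to file_write_vectorized must first be run through zarr::align_to_system_boundary):

    #include "platform.hh"

    #include <cstddef>
    #include <span>
    #include <vector>

    bool example()
    {
        std::vector<std::byte> a(4096, std::byte{ 1 });
        std::vector<std::byte> b(4096, std::byte{ 2 });
        const std::vector<std::span<std::byte>> buffers{ a, b };

        // opens (or creates) the file with unbuffered, overlapped flags on
        // Windows; a plain O_WRONLY | O_CREAT descriptor elsewhere
        zarr::VectorizedFile file("example.bin");

        // 0 is trivially aligned; both buffers land in one gathered write
        return zarr::file_write_vectorized(file, buffers, 0);
    }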
diff --git a/src/streaming/shard.writer.cpp b/src/streaming/shard.writer.cpp
new file mode 100644
index 0000000..525c32e
--- /dev/null
+++ b/src/streaming/shard.writer.cpp
@@ -0,0 +1,151 @@
+#include "macros.hh"
+#include "shard.writer.hh"
+#include "zarr.common.hh"
+
+#include <algorithm>
+#include <filesystem>
+#include <limits>
+
+namespace fs = std::filesystem;
+
+#ifdef max
+#undef max
+#endif
+
+zarr::ShardWriter::ShardWriter(const ShardWriterConfig& config)
+  : file_path_(config.file_path)
+  , file_{ std::make_unique<VectorizedFile>(config.file_path) }
+  , chunks_before_flush_{ config.chunks_before_flush }
+  , chunks_per_shard_{ config.chunks_per_shard }
+  , chunks_flushed_{ 0 }
+  , cumulative_size_{ 0 }
+  , file_offset_{ 0 }
+  , index_table_(2 * config.chunks_per_shard * sizeof(uint64_t))
+{
+    std::fill_n(reinterpret_cast<uint64_t*>(index_table_.data()),
+                index_table_.size() / sizeof(uint64_t),
+                std::numeric_limits<uint64_t>::max());
+
+    chunks_.reserve(chunks_before_flush_);
+}
+
+void
+zarr::ShardWriter::add_chunk(ChunkBufferPtr buffer, uint32_t index_in_shard)
+{
+    std::unique_lock lock(mutex_);
+    cv_.wait(lock, [this] { return chunks_.size() < chunks_before_flush_; });
+
+    // chunks have been flushed and the new offset is aligned to the sector
+    // size
+    if (chunks_.empty()) {
+        cumulative_size_ = file_offset_;
+    }
+
+    set_offset_extent_(index_in_shard, cumulative_size_, buffer->size());
+    cumulative_size_ += buffer->size();
+
+    chunks_.push_back(buffer);
+    if (chunks_.size() == chunks_before_flush_) {
+        CHECK(flush_());
+    }
+}
+
+bool
+zarr::ShardWriter::flush_()
+{
+    std::vector<std::span<std::byte>> buffers;
+    buffers.reserve(chunks_.size() + 1);
+    for (const auto& chunk : chunks_) {
+        buffers.emplace_back(*chunk);
+    }
+    buffers.emplace_back(index_table_);
+
+    try {
+        file_write_vectorized(*file_, buffers, file_offset_);
+    } catch (const std::exception& exc) {
+        LOG_ERROR("Failed to write chunk: ", std::string(exc.what()));
+        return false;
+    }
+
+    chunks_flushed_ += chunks_.size();
+    chunks_.clear();
+    chunks_.reserve(chunks_before_flush_);
+    file_offset_ = zarr::align_to_system_boundary(cumulative_size_);
+
+    cv_.notify_all();
+
+    return true;
+}
+
+void
+zarr::ShardWriter::set_offset_extent_(uint32_t shard_internal_index,
+                                      uint64_t offset,
+                                      uint64_t size)
+{
+    EXPECT(shard_internal_index < chunks_per_shard_,
+           "Shard internal index ",
+           shard_internal_index,
+           " out of bounds");
+
+    auto* index_table_u64 = reinterpret_cast<uint64_t*>(index_table_.data());
+    const auto index = 2 * shard_internal_index;
+    index_table_u64[index] = offset;
+    index_table_u64[index + 1] = size;
+}
+
+bool
+zarr::finalize_shard_writer(std::unique_ptr<ShardWriter>&& writer)
+{
+    if (writer == nullptr) {
+        LOG_INFO("Writer is null. Nothing to finalize.");
+        return true;
+    }
+
+    // flush remaining chunks and close file
+    if (!writer->flush_()) {
+        LOG_ERROR("Failed to flush remaining chunks.");
+        return false;
+    }
+    writer->file_.reset(); // release file handle
+
+    // resize file if necessary
+    const auto file_size = fs::file_size(writer->file_path_);
+    const auto expected_size =
+      writer->cumulative_size_ +
+      writer->chunks_per_shard_ * 2 * sizeof(uint64_t);
+    if (file_size > expected_size) {
+        fs::resize_file(writer->file_path_, expected_size);
+    }
+
+    writer.reset();
+    return true;
+}
+
+bool
+zarr::make_shard_writers(
+  std::string_view base_path,
+  uint32_t chunks_before_flush,
+  uint32_t chunks_per_shard,
+  const ArrayDimensions& dimensions,
+  std::shared_ptr<ThreadPool> thread_pool,
+  std::vector<std::unique_ptr<ShardWriter>>& shard_writers)
+{
+    auto paths =
+      construct_data_paths(base_path, dimensions, shards_along_dimension);
+
+    auto parent_paths = get_parent_paths(paths);
+    if (!make_dirs(parent_paths, thread_pool)) {
+        LOG_ERROR("Failed to create dataset paths.");
+        return false;
+    }
+
+    shard_writers.clear();
+    shard_writers.reserve(paths.size());
+
+    for (const auto& path : paths) {
+        ShardWriterConfig config{ .file_path = path,
+                                  .chunks_before_flush = chunks_before_flush,
+                                  .chunks_per_shard = chunks_per_shard };
+        shard_writers.emplace_back(std::make_unique<ShardWriter>(config));
+    }
+
+    return true;
+}
diff --git a/src/streaming/shard.writer.hh b/src/streaming/shard.writer.hh
new file mode 100644
index 0000000..4b43953
--- /dev/null
+++ b/src/streaming/shard.writer.hh
@@ -0,0 +1,66 @@
+#pragma once
+
+#include "platform.hh"
+#include "thread.pool.hh"
+#include "zarr.dimension.hh"
+
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+namespace zarr {
+struct ShardWriterConfig
+{
+    std::string file_path;
+    uint32_t chunks_before_flush;
+    uint32_t chunks_per_shard;
+};
+
+class ShardWriter
+{
+  public:
+    using ChunkBufferPtr = std::vector<std::byte>*;
+
+    explicit ShardWriter(const ShardWriterConfig& config);
+    ~ShardWriter() = default;
+
+    void add_chunk(ChunkBufferPtr buffer, uint32_t index_in_shard);
+
+  private:
+    std::string file_path_;
+    std::unique_ptr<VectorizedFile> file_;
+
+    uint32_t chunks_before_flush_;
+    uint32_t chunks_per_shard_;
+    uint32_t chunks_flushed_;
+
+    std::vector<std::byte> index_table_;
+
+    std::mutex mutex_;
+    std::condition_variable cv_;
+
+    std::vector<ChunkBufferPtr> chunks_;
+    uint64_t cumulative_size_;
+    uint64_t file_offset_;
+
+    void set_offset_extent_(uint32_t shard_internal_index,
+                            uint64_t offset,
+                            uint64_t size);
+    [[nodiscard]] bool flush_();
+
+    friend bool finalize_shard_writer(std::unique_ptr<ShardWriter>&& writer);
+};
+
+bool
+finalize_shard_writer(std::unique_ptr<ShardWriter>&& writer);
+
+bool
+make_shard_writers(std::string_view base_path,
+                   uint32_t chunks_before_flush,
+                   uint32_t chunks_per_shard,
+                   const ArrayDimensions& dimensions,
+                   std::shared_ptr<ThreadPool> thread_pool,
+                   std::vector<std::unique_ptr<ShardWriter>>& shard_writers);
+} // namespace zarr
\ No newline at end of file
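A sketch of driving the new ShardWriter directly, as the unit test further down does (file path, chunk size, and counts are illustrative; chunks may arrive out of order):

    #include "shard.writer.hh"

    bool example()
    {
        zarr::ShardWriterConfig config{ .file_path = "c0.bin",
                                        .chunks_before_flush = 2,
                                        .chunks_per_shard = 4 };
        auto writer = std::make_unique<zarr::ShardWriter>(config);

        std::vector<std::vector<std::byte>> chunks(
          4, std::vector<std::byte>(512, std::byte{ 0 }));

        // every chunks_before_flush-th chunk triggers one vectorized write of
        // the pending chunks plus the running index table
        writer->add_chunk(&chunks[2], 2);
        writer->add_chunk(&chunks[0], 0);
        writer->add_chunk(&chunks[1], 1);
        writer->add_chunk(&chunks[3], 3);

        // flushes the remainder, trims alignment padding, closes the file
        return zarr::finalize_shard_writer(std::move(writer));
    }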
diff --git a/src/streaming/sink.creator.cpp b/src/streaming/sink.creator.cpp
index d1343c2..ddc18f5 100644
--- a/src/streaming/sink.creator.cpp
+++ b/src/streaming/sink.creator.cpp
@@ -1,8 +1,9 @@
-#include "macros.hh"
-#include "sink.creator.hh"
+#include "acquire.zarr.h"
 #include "file.sink.hh"
+#include "macros.hh"
 #include "s3.sink.hh"
-#include "acquire.zarr.h"
+#include "sink.creator.hh"
+#include "zarr.common.hh"
 
 #include <filesystem>
 #include <latch>
@@ -20,7 +21,7 @@ zarr::SinkCreator::SinkCreator(
 }
 
 std::unique_ptr<zarr::Sink>
-zarr::SinkCreator::make_sink(std::string_view file_path)
+zarr::SinkCreator::make_file_sink(std::string_view file_path)
 {
     if (file_path.starts_with("file://")) {
         file_path = file_path.substr(7);
@@ -46,8 +47,8 @@ zarr::SinkCreator::make_sink(std::string_view file_path)
 }
 
 std::unique_ptr<zarr::Sink>
-zarr::SinkCreator::make_sink(std::string_view bucket_name,
-                             std::string_view object_key)
+zarr::SinkCreator::make_s3_sink(std::string_view bucket_name,
+                                std::string_view object_key)
 {
     EXPECT(!bucket_name.empty(), "Bucket name must not be empty.");
     EXPECT(!object_key.empty(), "Object key must not be empty.");
@@ -61,7 +62,7 @@ zarr::SinkCreator::make_sink(std::string_view bucket_name,
 }
 
 bool
-zarr::SinkCreator::make_data_sinks(
+zarr::SinkCreator::make_data_file_sinks(
   std::string_view base_path,
   const ArrayDimensions* dimensions,
  const std::function<size_t(const ZarrDimension&)>& parts_along_dimension,
@@ -73,20 +74,27 @@
 
     EXPECT(!base_path.empty(), "Base path must not be empty.");
 
-    std::queue<std::string> paths;
+    std::vector<std::string> paths;
     try {
-        paths = make_data_sink_paths_(
-          base_path, dimensions, parts_along_dimension, true);
+        paths =
+          construct_data_paths(base_path, *dimensions, parts_along_dimension);
+        auto parent_paths = get_parent_paths(paths);
+        EXPECT(make_dirs(parent_paths, thread_pool_),
+               "Failed to create dataset paths.");
     } catch (const std::exception& exc) {
         LOG_ERROR("Failed to create dataset paths: ", exc.what());
         return false;
     }
 
-    return make_files_(paths, part_sinks);
+    std::queue<std::string> paths_queue;
+    for (const auto& path : paths) {
+        paths_queue.push(path);
+    }
+    return make_files_(paths_queue, part_sinks);
 }
 
 bool
-zarr::SinkCreator::make_data_sinks(
+zarr::SinkCreator::make_data_s3_sinks(
   std::string_view bucket_name,
   std::string_view base_path,
   const ArrayDimensions* dimensions,
@@ -102,7 +110,7 @@
 }
 
 bool
-zarr::SinkCreator::make_metadata_sinks(
+zarr::SinkCreator::make_metadata_file_sinks(
   size_t version,
   std::string_view base_path,
   std::unordered_map<std::string, std::unique_ptr<Sink>>& metadata_sinks)
@@ -119,7 +127,7 @@
 }
 
 bool
-zarr::SinkCreator::make_metadata_sinks(
+zarr::SinkCreator::make_metadata_s3_sinks(
   size_t version,
  std::string_view bucket_name,
  std::string_view base_path,
@@ -255,57 +263,17 @@ zarr::SinkCreator::make_dirs_(std::queue<std::string>& dir_paths)
         return true;
     }
 
-    std::atomic<char> all_successful = 1;
-
-    const auto n_dirs = dir_paths.size();
-    std::latch latch(n_dirs);
-
-    for (auto i = 0; i < n_dirs; ++i) {
-        const auto dirname = dir_paths.front();
+    std::vector<std::string> paths;
+    while (!dir_paths.empty()) {
+        paths.emplace_back(dir_paths.front());
         dir_paths.pop();
-
-        EXPECT(thread_pool_->push_job(
-                 [dirname, &latch, &all_successful](std::string& err) -> bool {
-                     if (dirname.empty()) {
-                         err = "Directory name must not be empty.";
-                         latch.count_down();
-                         all_successful.fetch_and(0);
-                         return false;
-                     }
-
-                     if (fs::is_directory(dirname)) {
-                         latch.count_down();
-                         return true;
-                     } else if (fs::exists(dirname)) {
-                         err =
-                           "'" + dirname + "' exists but is not a directory";
-                         latch.count_down();
-                         all_successful.fetch_and(0);
-                         return false;
-                     }
-
-                     if (all_successful) {
-                         std::error_code ec;
-                         if (!fs::create_directories(dirname, ec)) {
-                             err = "Failed to create directory '" + dirname +
-                                   "': " + ec.message();
-                             latch.count_down();
-                             all_successful.fetch_and(0);
-                             return false;
-                         }
-                     }
-
-                     latch.count_down();
-                     return true;
-                 }),
-               "Failed to push job to thread pool.");
-
-        dir_paths.push(dirname);
     }
 
-    latch.wait();
+    for (const auto& path : paths) {
+        dir_paths.push(path);
+    }
 
-    return (bool)all_successful;
+    return make_dirs(paths, thread_pool_);
 }
 
 bool
@@ -345,7 +313,7 @@ zarr::SinkCreator::make_files_(std::queue<std::string>& file_paths,
             }
 
             latch.count_down();
-            all_successful.fetch_and((char)success);
+            all_successful.fetch_and(static_cast<char>(success));
 
             return success;
         }),
@@ -354,7 +322,7 @@ zarr::SinkCreator::make_files_(std::queue<std::string>& file_paths,
 
     latch.wait();
 
-    return (bool)all_successful;
+    return static_cast<bool>(all_successful);
 }
 
 bool
diff --git a/src/streaming/sink.creator.hh b/src/streaming/sink.creator.hh
index b7f6e68..704c412 100644
--- a/src/streaming/sink.creator.hh
+++ b/src/streaming/sink.creator.hh
@@ -24,7 +24,7 @@ class SinkCreator
      * opened.
      * @throws std::runtime_error if the file path is not valid.
      */
-    static std::unique_ptr<Sink> make_sink(std::string_view file_path);
+    static std::unique_ptr<Sink> make_file_sink(std::string_view file_path);
 
     /**
      * @brief Create a sink from an S3 bucket name and object key.
@@ -35,8 +35,8 @@ class SinkCreator
      * @throws std::runtime_error if the bucket name or object key is not
      * valid, or if there is no connection pool.
      */
-    std::unique_ptr<Sink> make_sink(std::string_view bucket_name,
-                                    std::string_view object_key);
+    std::unique_ptr<Sink> make_s3_sink(std::string_view bucket_name,
+                                       std::string_view object_key);
 
     /**
      * @brief Create a collection of file sinks for a Zarr dataset.
@@ -49,7 +49,7 @@ class SinkCreator
      * @throws std::runtime_error if @p base_path is not valid, or if the
      * number of parts along a dimension is zero.
      */
-    [[nodiscard]] bool make_data_sinks(
+    [[nodiscard]] bool make_data_file_sinks(
       std::string_view base_path,
      const ArrayDimensions* dimensions,
      const std::function<size_t(const ZarrDimension&)>& parts_along_dimension,
@@ -66,7 +66,7 @@ class SinkCreator
      * @param[out] part_sinks The sinks created.
      * @return True iff all file sinks were created successfully.
      */
-    [[nodiscard]] bool make_data_sinks(
+    [[nodiscard]] bool make_data_s3_sinks(
       std::string_view bucket_name,
      std::string_view base_path,
      const ArrayDimensions* dimensions,
@@ -82,7 +82,7 @@ class SinkCreator
      * @throws std::runtime_error if @p base_uri is not valid, or if, for S3
      * sinks, the bucket does not exist.
      */
-    [[nodiscard]] bool make_metadata_sinks(
+    [[nodiscard]] bool make_metadata_file_sinks(
       size_t version,
      std::string_view base_path,
      std::unordered_map<std::string, std::unique_ptr<Sink>>& metadata_sinks);
@@ -97,7 +97,7 @@ class SinkCreator
      * @throws std::runtime_error if @p version is invalid, if @p bucket_name
      * is empty or does not exist, or if @p base_path is empty.
      */
-    [[nodiscard]] bool make_metadata_sinks(
+    [[nodiscard]] bool make_metadata_s3_sinks(
       size_t version,
      std::string_view bucket_name,
      std::string_view base_path,
diff --git a/src/streaming/vectorized.file.writer.cpp b/src/streaming/vectorized.file.writer.cpp
deleted file mode 100644
index 18b57f8..0000000
--- a/src/streaming/vectorized.file.writer.cpp
+++ /dev/null
@@ -1,220 +0,0 @@
-#include "vectorized.file.writer.hh"
-#include "macros.hh"
-
-#include <cstring>
-
-namespace {
-#ifdef _WIN32
-std::string
-get_last_error_as_string()
-{
-    DWORD errorMessageID = ::GetLastError();
-    if (errorMessageID == 0) {
-        return std::string(); // No error message has been recorded
-    }
-
-    LPSTR messageBuffer = nullptr;
-
-    size_t size = FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
-                                   FORMAT_MESSAGE_FROM_SYSTEM |
-                                   FORMAT_MESSAGE_IGNORE_INSERTS,
-                                 NULL,
-                                 errorMessageID,
-                                 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
-                                 (LPSTR)&messageBuffer,
-                                 0,
-                                 NULL);
-
-    std::string message(messageBuffer, size);
-
-    LocalFree(messageBuffer);
-
-    return message;
-}
-
-size_t
-get_sector_size(const std::string& path)
-{
-    // Get volume root path
-    char volume_path[MAX_PATH];
-    if (!GetVolumePathNameA(path.c_str(), volume_path, MAX_PATH)) {
-        return 0;
-    }
-
-    DWORD sectors_per_cluster;
-    DWORD bytes_per_sector;
-    DWORD number_of_free_clusters;
-    DWORD total_number_of_clusters;
-
-    if (!GetDiskFreeSpaceA(volume_path,
-                           &sectors_per_cluster,
-                           &bytes_per_sector,
-                           &number_of_free_clusters,
-                           &total_number_of_clusters)) {
-        return 0;
-    }
-
-    return bytes_per_sector;
-}
-#else
-std::string
-get_last_error_as_string()
-{
-    return strerror(errno);
-}
-#endif
-} // namespace
-
-zarr::VectorizedFileWriter::VectorizedFileWriter(const std::string& path)
-{
-#ifdef _WIN32
-    SYSTEM_INFO si;
-    GetSystemInfo(&si);
-    page_size_ = si.dwPageSize;
-
-    sector_size_ = get_sector_size(path);
-    if (sector_size_ == 0) {
-        throw std::runtime_error("Failed to get sector size");
-    }
-
-    handle_ = CreateFileA(path.c_str(),
-                          GENERIC_WRITE,
-                          0, // No sharing
-                          nullptr,
-                          OPEN_ALWAYS,
-                          FILE_FLAG_OVERLAPPED | FILE_FLAG_NO_BUFFERING |
-                            FILE_FLAG_SEQUENTIAL_SCAN,
-                          nullptr);
-    if (handle_ == INVALID_HANDLE_VALUE) {
-        auto err = get_last_error_as_string();
-        throw std::runtime_error("Failed to open file '" + path + "': " + err);
-    }
-#else
-    page_size_ = sysconf(_SC_PAGESIZE);
-    fd_ = open(path.c_str(), O_WRONLY | O_CREAT, 0644);
-    if (fd_ < 0) {
-        throw std::runtime_error("Failed to open file: " + path);
-    }
-#endif
-}
-
-zarr::VectorizedFileWriter::~VectorizedFileWriter()
-{
-#ifdef _WIN32
-    if (handle_ != INVALID_HANDLE_VALUE) {
-        CloseHandle(handle_);
-    }
-#else
-    if (fd_ >= 0) {
-        close(fd_);
-    }
-#endif
-}
-
-bool
-zarr::VectorizedFileWriter::write_vectors(
-  const std::vector<std::span<std::byte>>& buffers,
-  size_t offset)
-{
-    std::lock_guard lock(mutex_);
-    bool retval{ true };
-
-#ifdef _WIN32
-    size_t total_bytes_to_write = 0;
-    for (const auto& buffer : buffers) {
-        total_bytes_to_write += buffer.size();
-    }
-
-    const size_t nbytes_aligned = align_size_(total_bytes_to_write);
-    CHECK(nbytes_aligned >= total_bytes_to_write);
-
-    auto* aligned_ptr =
-      static_cast<std::byte*>(_aligned_malloc(nbytes_aligned, page_size_));
-    if (!aligned_ptr) {
-        return false;
-    }
-
-    auto* cur = aligned_ptr;
-    for (const auto& buffer : buffers) {
-        std::copy(buffer.begin(), buffer.end(), cur);
-        cur += buffer.size();
-    }
-
-    std::vector<FILE_SEGMENT_ELEMENT> segments(nbytes_aligned / page_size_);
-
-    cur = aligned_ptr;
-    for (auto& segment : segments) {
-        memset(&segment, 0, sizeof(segment));
-        segment.Buffer = PtrToPtr64(cur);
-        cur += page_size_;
-    }
-
-    OVERLAPPED overlapped = { 0 };
-    overlapped.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
-    overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
-    overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr);
-
-    DWORD bytes_written;
-
-    if (!WriteFileGather(
-          handle_, segments.data(), nbytes_aligned, nullptr, &overlapped)) {
-        if (GetLastError() != ERROR_IO_PENDING) {
-            LOG_ERROR("Failed to write file: ", get_last_error_as_string());
-            retval = false;
-        }
-
-        // Wait for the operation to complete
-        if (!GetOverlappedResult(handle_, &overlapped, &bytes_written, TRUE)) {
-            LOG_ERROR("Failed to get overlapped result: ",
-                      get_last_error_as_string());
-            retval = false;
-        }
-    }
-
-    _aligned_free(aligned_ptr);
-#else
-    std::vector<struct iovec> iovecs(buffers.size());
-
-    for (auto i = 0; i < buffers.size(); ++i) {
-        auto* iov = &iovecs[i];
-        memset(iov, 0, sizeof(struct iovec));
-        iov->iov_base =
-          const_cast<void*>(static_cast<const void*>(buffers[i].data()));
-        iov->iov_len = buffers[i].size();
-    }
-
-    ssize_t total_bytes = 0;
-    for (const auto& buffer : buffers) {
-        total_bytes += static_cast<ssize_t>(buffer.size());
-    }
-
-    ssize_t bytes_written = pwritev(fd_,
-                                    iovecs.data(),
-                                    static_cast<int>(iovecs.size()),
-                                    static_cast<off_t>(offset));
-
-    if (bytes_written != total_bytes) {
-        auto error = get_last_error_as_string();
-        LOG_ERROR("Failed to write file: ", error);
-        retval = false;
-    }
-#endif
-    return retval;
-}
-
-size_t
-zarr::VectorizedFileWriter::align_size_(size_t size) const
-{
-    size = align_to_page_(size);
-#ifdef _WIN32
-    return (size + sector_size_ - 1) & ~(sector_size_ - 1);
-#else
-    return size;
-#endif
-}
-
-size_t
-zarr::VectorizedFileWriter::align_to_page_(size_t size) const
-{
-    return (size + page_size_ - 1) & ~(page_size_ - 1);
-}
\ No newline at end of file
diff --git a/src/streaming/vectorized.file.writer.hh b/src/streaming/vectorized.file.writer.hh
deleted file mode 100644
index 92350ea..0000000
--- a/src/streaming/vectorized.file.writer.hh
+++ /dev/null
@@ -1,41 +0,0 @@
-#pragma once
-
-#include <mutex>
-#include <span>
-#include <string>
-#include <vector>
-
-#ifdef _WIN32
-#include <windows.h>
-#else
-#include <fcntl.h>
-#include <sys/uio.h>
-#include <unistd.h>
-#endif
-
-namespace zarr {
-class VectorizedFileWriter
-{
-  public:
-    explicit VectorizedFileWriter(const std::string& path);
-    ~VectorizedFileWriter();
-
-    bool write_vectors(const std::vector<std::span<std::byte>>& buffers,
-                       size_t offset);
-
-    std::mutex& mutex() { return mutex_; }
-
-  private:
-    std::mutex mutex_;
-    size_t page_size_;
-#ifdef _WIN32
-    HANDLE handle_;
-    size_t sector_size_;
-#else
-    int fd_;
-#endif
-
-    size_t align_size_(size_t size) const;
-    size_t align_to_page_(size_t size) const;
-};
-} // namespace zarr
\ No newline at end of file
diff --git a/src/streaming/zarr.common.cpp b/src/streaming/zarr.common.cpp
index f70fde0..ff2b837 100644
--- a/src/streaming/zarr.common.cpp
+++ b/src/streaming/zarr.common.cpp
@@ -1,8 +1,14 @@
 #include "macros.hh"
 #include "zarr.common.hh"
 
-#include
 #include
+#include <atomic>
+#include <filesystem>
+#include <latch>
+#include <queue>
+#include <unordered_set>
+
+namespace fs = std::filesystem;
 
 std::string
 zarr::trim(std::string_view s)
@@ -91,3 +97,111 @@ zarr::shards_along_dimension(const ZarrDimension& dimension)
     const auto n_chunks = chunks_along_dimension(dimension);
     return (n_chunks + shard_size - 1) / shard_size;
 }
+
+std::vector<std::string>
+zarr::construct_data_paths(
+  std::string_view base_path,
+  const ArrayDimensions& dimensions,
+  const std::function<size_t(const ZarrDimension&)>& parts_along_dimension)
+{
+    std::queue<std::string> paths_queue;
+    paths_queue.emplace(base_path);
+
+    // create intermediate paths
+    for (auto i = 1;                 // skip the last dimension
+         i < dimensions.ndims() - 1; // skip the x dimension
+         ++i) {
+        const auto& dim = dimensions.at(i);
+        const auto n_parts = parts_along_dimension(dim);
+        CHECK(n_parts);
+
+        auto n_paths = paths_queue.size();
+        for (auto j = 0; j < n_paths; ++j) {
+            const auto path = paths_queue.front();
+            paths_queue.pop();
+
+            for (auto k = 0; k < n_parts; ++k) {
+                const auto kstr = std::to_string(k);
+                paths_queue.push(path + (path.empty() ? kstr : "/" + kstr));
+            }
+        }
+    }
+
+    // create final paths
+    std::vector<std::string> paths_out;
+    paths_out.reserve(paths_queue.size() *
+                      parts_along_dimension(dimensions.width_dim()));
+    {
+        const auto& dim = dimensions.width_dim();
+        const auto n_parts = parts_along_dimension(dim);
+        CHECK(n_parts);
+
+        auto n_paths = paths_queue.size();
+        for (auto i = 0; i < n_paths; ++i) {
+            const auto path = paths_queue.front();
+            paths_queue.pop();
+            for (auto j = 0; j < n_parts; ++j)
+                paths_out.push_back(path + "/" + std::to_string(j));
+        }
+    }
+
+    return paths_out;
+}
+
+std::vector<std::string>
+zarr::get_parent_paths(const std::vector<std::string>& file_paths)
+{
+    std::unordered_set<std::string> unique_paths;
+    for (const auto& file_path : file_paths) {
+        unique_paths.emplace(fs::path(file_path).parent_path().string());
+    }
+
+    return std::vector<std::string>(unique_paths.begin(), unique_paths.end());
+}
+
+bool
+zarr::make_dirs(const std::vector<std::string>& dir_paths,
+                std::shared_ptr<ThreadPool> thread_pool)
+{
+    if (dir_paths.empty()) {
+        return true;
+    }
+    EXPECT(thread_pool, "Thread pool not provided.");
+
+    std::atomic<char> all_successful = 1;
+
+    std::unordered_set<std::string> unique_paths(dir_paths.begin(),
+                                                 dir_paths.end());
+
+    std::latch latch(unique_paths.size());
+    for (const auto& path : unique_paths) {
+        auto job = [&path, &latch, &all_successful](std::string& err) {
+            bool success = true;
+            if (fs::is_directory(path)) {
+                latch.count_down();
+                return success;
+            }
+
+            std::error_code ec;
+            if (!fs::create_directories(path, ec)) {
+                err =
+                  "Failed to create directory '" + path + "': " + ec.message();
+                success = false;
+            }
+
+            latch.count_down();
+            all_successful.fetch_and(static_cast<char>(success));
+
+            return success;
+        };
+
+        if (!thread_pool->push_job(std::move(job))) {
+            LOG_ERROR("Failed to push job to thread pool.");
+            return false;
+        }
+    }
+
+    latch.wait();
+
+    return static_cast<bool>(all_successful);
+}
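The directory-creation logic removed from SinkCreator::make_dirs_ above now lives in these free functions; a usage sketch (paths are illustrative):

    #include "zarr.common.hh"

    bool example(std::shared_ptr<zarr::ThreadPool> pool)
    {
        const std::vector<std::string> files = { "store/0/c0/0/0",
                                                 "store/0/c0/0/1",
                                                 "store/0/c0/1/0" };

        // two unique parents: store/0/c0/0 and store/0/c0/1
        const auto parents = zarr::get_parent_paths(files);

        // created in parallel on the thread pool
        return zarr::make_dirs(parents, pool);
    }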
diff --git a/src/streaming/zarr.common.hh b/src/streaming/zarr.common.hh
index e45241d..43a2e00 100644
--- a/src/streaming/zarr.common.hh
+++ b/src/streaming/zarr.common.hh
@@ -1,7 +1,8 @@
 #pragma once
 
-#include "zarr.dimension.hh"
 #include "acquire.zarr.h"
+#include "thread.pool.hh"
+#include "zarr.dimension.hh"
 
 namespace zarr {
 /**
@@ -60,4 +61,35 @@ chunks_along_dimension(const ZarrDimension& dimension);
  */
 uint32_t
 shards_along_dimension(const ZarrDimension& dimension);
+
+/**
+ * @brief Construct paths for data sinks, given the dimensions and a function
+ * to determine the number of parts along a dimension.
+ * @param base_path The base path for the dataset.
+ * @param dimensions The dimensions of the dataset.
+ * @param parts_along_dimension Function to determine the number of parts.
+ */
+std::vector<std::string>
+construct_data_paths(
+  std::string_view base_path,
+  const ArrayDimensions& dimensions,
+  const std::function<size_t(const ZarrDimension&)>& parts_along_dimension);
+
+/**
+ * @brief Get unique paths to the parent directories of each file in
+ * @p file_paths.
+ * @param file_paths Collection of paths to files.
+ * @return Collection of unique parent directories.
+ */
+std::vector<std::string>
+get_parent_paths(const std::vector<std::string>& file_paths);
+
+/**
+ * @brief Create directories, in parallel, for a collection of paths.
+ * @param dir_paths The directories to create.
+ * @param thread_pool The thread pool to use for parallel creation.
+ * @return True iff all directories were created successfully.
+ */
+bool
+make_dirs(const std::vector<std::string>& dir_paths,
+          std::shared_ptr<ThreadPool> thread_pool);
 } // namespace zarr
\ No newline at end of file
diff --git a/src/streaming/zarr.dimension.hh b/src/streaming/zarr.dimension.hh
index 9c0174c..66ba92d 100644
--- a/src/streaming/zarr.dimension.hh
+++ b/src/streaming/zarr.dimension.hh
@@ -2,6 +2,7 @@
 
 #include "zarr.types.h"
 
+#include <functional>
 #include
 #include
 #include
@@ -111,4 +112,6 @@ class ArrayDimensions
   private:
     std::vector<ZarrDimension> dims_;
     ZarrDataType dtype_;
-};
\ No newline at end of file
+};
+
+using PartsAlongDimensionFun = std::function<size_t(const ZarrDimension&)>;
\ No newline at end of file
diff --git a/src/streaming/zarr.stream.cpp b/src/streaming/zarr.stream.cpp
index 1f1f8cb..de166d7 100644
--- a/src/streaming/zarr.stream.cpp
+++ b/src/streaming/zarr.stream.cpp
@@ -664,15 +664,15 @@ ZarrStream_s::create_metadata_sinks_()
 
     try {
         if (s3_connection_pool_) {
-            if (!creator.make_metadata_sinks(version_,
-                                             s3_settings_->bucket_name,
-                                             store_path_,
-                                             metadata_sinks_)) {
+            if (!creator.make_metadata_s3_sinks(version_,
+                                                s3_settings_->bucket_name,
+                                                store_path_,
+                                                metadata_sinks_)) {
                 set_error_("Error creating metadata sinks");
                 return false;
             }
         } else {
-            if (!creator.make_metadata_sinks(
+            if (!creator.make_metadata_file_sinks(
                   version_, store_path_, metadata_sinks_)) {
                 set_error_("Error creating metadata sinks");
                 return false;
diff --git a/src/streaming/zarrv2.array.writer.cpp b/src/streaming/zarrv2.array.writer.cpp
index 8b620ca..9d4d6e4 100644
--- a/src/streaming/zarrv2.array.writer.cpp
+++ b/src/streaming/zarrv2.array.writer.cpp
@@ -72,50 +72,104 @@ zarr::ZarrV2ArrayWriter::ZarrV2ArrayWriter(
 {
 }
 
+std::string
+zarr::ZarrV2ArrayWriter::data_root_() const
+{
+    return config_.store_path + "/" + std::to_string(config_.level_of_detail) +
+           "/" + std::to_string(append_chunk_index_);
+}
+
+std::string
+zarr::ZarrV2ArrayWriter::metadata_path_() const
+{
+    return config_.store_path + "/" + std::to_string(config_.level_of_detail) +
+           "/.zarray";
+}
+
 bool
-zarr::ZarrV2ArrayWriter::flush_impl_()
+zarr::ZarrV2ArrayWriter::make_data_sinks_()
 {
-    // create chunk files
-    CHECK(data_sinks_.empty());
-    if (!make_data_sinks_()) {
+    SinkCreator creator(thread_pool_, s3_connection_pool_);
+
+    const auto data_root = data_root_();
+    if (is_s3_array_()) {
+        if (!creator.make_data_s3_sinks(*config_.bucket_name,
+                                        data_root,
+                                        config_.dimensions.get(),
+                                        chunks_along_dimension,
+                                        data_sinks_)) {
+            LOG_ERROR("Failed to create data sinks in ",
+                      data_root,
+                      " for bucket ",
+                      *config_.bucket_name);
+            return false;
+        }
+    } else if (!creator.make_data_file_sinks(data_root,
+                                             config_.dimensions.get(),
+                                             chunks_along_dimension,
+                                             data_sinks_)) {
+        LOG_ERROR("Failed to create data sinks in ", data_root);
         return false;
     }
-    CHECK(data_sinks_.size() == chunk_buffers_.size());
-
-    std::latch latch(chunk_buffers_.size());
-    {
-        std::scoped_lock lock(buffers_mutex_);
-        for (auto i = 0; i < data_sinks_.size(); ++i) {
-            auto& chunk = chunk_buffers_.at(i);
-            EXPECT(thread_pool_->push_job(
-                     std::move([&sink = data_sinks_.at(i),
-                                data_ = chunk.data(),
-                                size = chunk.size(),
-                                &latch](std::string& err) -> bool {
-                         bool success = false;
-                         try {
-                             std::span data{
-                                 reinterpret_cast<std::byte*>(data_), size
-                             };
-                             CHECK(sink->write(0, data));
-                             success = true;
-                         } catch (const std::exception& exc) {
-                             err = "Failed to write chunk: " +
-                                   std::string(exc.what());
-                         }
-
-                         latch.count_down();
-                         return success;
-                     })),
-                   "Failed to push job to thread pool");
-        }
+    return true;
+}
+
+void
+zarr::ZarrV2ArrayWriter::compress_and_flush_()
+{
+    if (bytes_to_flush_ == 0) {
+        LOG_DEBUG("No data to flush");
+        return;
+    }
+
+    const auto n_chunks = chunk_buffers_.size();
+
+    CHECK(data_sinks_.empty());
+    CHECK(make_data_sinks_());
+    CHECK(data_sinks_.size() == n_chunks);
+
+    std::latch latch(n_chunks);
+    for (auto i = 0; i < n_chunks; ++i) {
+        auto job = [this, i, &latch](std::string& err) -> bool {
+            bool success = false;
+
+            if (compress_chunk_buffer_(i)) { // no-op if no compression
+                auto& sink = data_sinks_[i];
+                auto& buf = chunk_buffers_[i];
+
+                try {
+                    success = sink->write(0, buf);
+                } catch (const std::exception& exc) {
+                    err = "Failed to write chunk: " + std::string(exc.what());
+                }
+            } else {
+                err = "Failed to compress chunk buffer";
+            }
+
+            latch.count_down();
+            return success;
+        };
+
+        CHECK(thread_pool_->push_job(std::move(job)));
     }
 
     // wait for all threads to finish
     latch.wait();
+}
 
-    return true;
+void
+zarr::ZarrV2ArrayWriter::close_sinks_()
+{
+    for (auto i = 0; i < data_sinks_.size(); ++i) {
+        EXPECT(finalize_sink(std::move(data_sinks_[i])),
+               "Failed to finalize sink ",
+               i);
+    }
+    data_sinks_.clear();
 }
 
 bool
diff --git a/src/streaming/zarrv2.array.writer.hh b/src/streaming/zarrv2.array.writer.hh
index 2a6039a..4c3619a 100644
--- a/src/streaming/zarrv2.array.writer.hh
+++ b/src/streaming/zarrv2.array.writer.hh
@@ -14,9 +14,12 @@ class ZarrV2ArrayWriter final : public ArrayWriter
       std::shared_ptr<S3ConnectionPool> s3_connection_pool);
 
   private:
-    ZarrVersion version_() const override { return ZarrVersion_2; };
-    bool flush_impl_() override;
-    bool write_array_metadata_() override;
+    std::string data_root_() const override;
+    std::string metadata_path_() const override;
+    bool make_data_sinks_() override;
     bool should_rollover_() const override;
+    void compress_and_flush_() override;
+    void close_sinks_() override;
+    bool write_array_metadata_() override;
 };
 } // namespace zarr
diff --git a/src/streaming/zarrv3.array.writer.cpp b/src/streaming/zarrv3.array.writer.cpp
index d687076..acd1fb9 100644
--- a/src/streaming/zarrv3.array.writer.cpp
+++ b/src/streaming/zarrv3.array.writer.cpp
@@ -58,42 +58,155 @@ zarr::ZarrV3ArrayWriter::ZarrV3ArrayWriter(
   std::shared_ptr<S3ConnectionPool> s3_connection_pool)
   : ArrayWriter(config, thread_pool, s3_connection_pool)
 {
-    const auto number_of_shards = config_.dimensions->number_of_shards();
+    const auto n_shards = config_.dimensions->number_of_shards();
+    const auto chunks_in_memory =
+      config_.dimensions->number_of_chunks_in_memory();
     const auto chunks_per_shard = config_.dimensions->chunks_per_shard();
 
-    shard_file_offsets_.resize(number_of_shards, 0);
-    shard_tables_.resize(number_of_shards);
+    shard_file_offsets_.resize(n_shards, 0);
+    shard_tables_.resize(n_shards);
 
     for (auto& table : shard_tables_) {
         table.resize(2 * chunks_per_shard);
         std::fill(
           table.begin(), table.end(), std::numeric_limits<uint64_t>::max());
     }
+
+    // get shard indices for each chunk
+    chunk_in_shards_.resize(n_shards);
+    for (auto i = 0; i < chunks_in_memory; ++i) {
+        const auto index = config_.dimensions->shard_index_for_chunk(i);
+        chunk_in_shards_[index].push_back(i);
+    }
+}
+
+std::string
+zarr::ZarrV3ArrayWriter::data_root_() const
+{
+    return config_.store_path + "/data/root/" +
+           std::to_string(config_.level_of_detail) + "/c" +
+           std::to_string(append_chunk_index_);
+}
+
+std::string
+zarr::ZarrV3ArrayWriter::metadata_path_() const
+{
+    return config_.store_path + "/meta/root/" +
+           std::to_string(config_.level_of_detail) + ".array.json";
+}
 
 bool
-zarr::ZarrV3ArrayWriter::flush_impl_()
+zarr::ZarrV3ArrayWriter::make_data_sinks_()
 {
+    const auto data_root = data_root_();
+
+    if (is_s3_array_()) {
+        SinkCreator creator(thread_pool_, s3_connection_pool_);
+        if (!creator.make_data_s3_sinks(*config_.bucket_name,
+                                        data_root,
+                                        config_.dimensions.get(),
+                                        shards_along_dimension,
+                                        data_sinks_)) {
+            LOG_ERROR("Failed to create data sinks in ",
+                      data_root,
+                      " for bucket ",
+                      *config_.bucket_name);
+            return false;
+        }
+    } else {
+        const auto n_shards = config_.dimensions->number_of_shards();
+        const auto chunks_in_memory =
+          config_.dimensions->number_of_chunks_in_memory();
+        const auto chunks_before_flush = chunks_in_memory / n_shards;
+        const auto chunks_per_shard = config_.dimensions->chunks_per_shard();
+
+        if (!make_shard_writers(data_root,
+                                chunks_before_flush,
+                                chunks_per_shard,
+                                *config_.dimensions,
+                                thread_pool_,
+                                shard_writers_)) {
+            LOG_ERROR("Failed to create shard writers in ", data_root);
+            return false;
+        }
+    }
+
+    return true;
+}
+
+void
+zarr::ZarrV3ArrayWriter::compress_and_flush_()
+{
+    if (bytes_to_flush_ == 0) {
+        LOG_DEBUG("No data to flush");
+        return;
+    }
+
     // create shard files if they don't exist
-    if (data_sinks_.empty() && !make_data_sinks_()) {
-        return false;
+    const auto n_shards = chunk_in_shards_.size();
+
+    if (is_s3_array_()) {
+        if (data_sinks_.empty()) {
+            CHECK(make_data_sinks_());
+        }
+        CHECK(data_sinks_.size() == n_shards);
+        CHECK(compress_and_flush_to_s3_());
+    } else {
+        if (shard_writers_.empty()) {
+            CHECK(make_data_sinks_());
+        }
+        CHECK(shard_writers_.size() == n_shards);
+        CHECK(compress_and_flush_to_filesystem_());
     }
+}
 
-    const auto n_shards = config_.dimensions->number_of_shards();
-    CHECK(data_sinks_.size() == n_shards);
+bool
+zarr::ZarrV3ArrayWriter::compress_and_flush_to_filesystem_()
+{
+    const auto n_chunks = chunk_buffers_.size();
+    std::latch latch(n_chunks);
+
+    for (auto i = 0; i < n_chunks; ++i) {
+        const auto shard_index = config_.dimensions->shard_index_for_chunk(i);
+        auto* shard = shard_writers_[shard_index].get();
+
+        const auto internal_index = config_.dimensions->shard_internal_index(i);
+
+        auto job =
+          [this, i, internal_index, shard, &latch](std::string& err) -> bool {
+            bool success = true;
+            if (compress_chunk_buffer_(i)) { // no-op if compression is disabled
+                shard->add_chunk(&chunk_buffers_[i], internal_index);
+            } else {
+                err = "Failed to compress chunk " + std::to_string(i);
+                success = false;
+            }
+            latch.count_down();
 
-    // get shard indices for each chunk
-    std::vector<std::vector<size_t>> chunk_in_shards(n_shards);
-    for (auto i = 0; i < chunk_buffers_.size(); ++i) {
-        const auto index = config_.dimensions->shard_index_for_chunk(i);
-        chunk_in_shards[index].push_back(i);
+            return success;
+        };
+
+        EXPECT(thread_pool_->push_job(std::move(job)),
+               "Failed to push job to thread pool");
     }
 
+    latch.wait();
+
+    return true;
+}
+
+bool
+zarr::ZarrV3ArrayWriter::compress_and_flush_to_s3_()
+{
+    const auto n_shards = chunk_in_shards_.size();
+
+    const auto write_table = is_finalizing_ || should_rollover_();
+
     // write out chunks to shards
-    auto write_table = is_finalizing_ || should_rollover_();
     std::latch latch(n_shards);
+
     for (auto i = 0; i < n_shards; ++i) {
-        const auto& chunks = chunk_in_shards.at(i);
+        const auto& chunks = chunk_in_shards_.at(i);
         auto& chunk_table = shard_tables_.at(i);
         auto* file_offset = &shard_file_offsets_.at(i);
 
@@ -108,6 +221,9 @@ zarr::ZarrV3ArrayWriter::flush_impl_()
 
             try {
                 for (const auto& chunk_idx : chunks) {
+                    // no-op if compression is disabled
+                    compress_chunk_buffer_(chunk_idx);
+
                     auto& chunk = chunk_buffers_.at(chunk_idx);
                     std::span data{ reinterpret_cast<std::byte*>(chunk.data()),
                                     chunk.size() };
@@ -157,6 +273,24 @@ zarr::ZarrV3ArrayWriter::flush_impl_()
     return true;
 }
 
+void
+zarr::ZarrV3ArrayWriter::close_sinks_()
+{
+    for (auto i = 0; i < data_sinks_.size(); ++i) {
+        EXPECT(finalize_sink(std::move(data_sinks_[i])),
+               "Failed to finalize sink ",
+               i);
+    }
+    data_sinks_.clear();
+
+    for (auto i = 0; i < shard_writers_.size(); ++i) {
+        EXPECT(finalize_shard_writer(std::move(shard_writers_[i])),
+               "Failed to finalize shard writer ",
+               i);
+    }
+    shard_writers_.clear();
+}
+
 bool
 zarr::ZarrV3ArrayWriter::write_array_metadata_()
 {
diff --git a/src/streaming/zarrv3.array.writer.hh b/src/streaming/zarrv3.array.writer.hh
index 3ad1b05..e9f52a5 100644
--- a/src/streaming/zarrv3.array.writer.hh
+++ b/src/streaming/zarrv3.array.writer.hh
@@ -1,6 +1,7 @@
 #pragma once
 
 #include "array.writer.hh"
+#include "shard.writer.hh"
 
 namespace zarr {
 struct ZarrV3ArrayWriter : public ArrayWriter
@@ -14,12 +15,21 @@ struct ZarrV3ArrayWriter : public ArrayWriter
       std::shared_ptr<S3ConnectionPool> s3_connection_pool);
 
   private:
+    std::vector<std::unique_ptr<ShardWriter>> shard_writers_;
+
     std::vector<uint64_t> shard_file_offsets_;
     std::vector<std::vector<uint64_t>> shard_tables_;
+    std::vector<std::vector<uint32_t>> chunk_in_shards_;
 
-    ZarrVersion version_() const override { return ZarrVersion_3; }
-    bool flush_impl_() override;
-    bool write_array_metadata_() override;
+    std::string data_root_() const override;
+    std::string metadata_path_() const override;
+    bool make_data_sinks_() override;
     bool should_rollover_() const override;
+    void compress_and_flush_() override;
+    void close_sinks_() override;
+    bool write_array_metadata_() override;
+
+    bool compress_and_flush_to_filesystem_();
+    bool compress_and_flush_to_s3_();
 };
 } // namespace zarr
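For reference, the index table the ShardWriter appends on each flush follows the Zarr v3 sharding layout: 2 * chunks_per_shard uint64 values, an (offset, nbytes) pair per chunk slot, with UINT64_MAX marking an absent chunk. A reader-side sketch (read_index_table is a hypothetical helper):

    // locate chunk k inside a shard file
    std::vector<uint64_t> table = read_index_table(shard_path); // hypothetical
    const uint64_t offset = table[2 * k];
    const uint64_t nbytes = table[2 * k + 1];
    if (offset != std::numeric_limits<uint64_t>::max()) {
        // chunk is present: read nbytes bytes starting at offset
    }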
diff --git a/tests/unit-tests/CMakeLists.txt b/tests/unit-tests/CMakeLists.txt
index fef3ecf..c6a0f8f 100644
--- a/tests/unit-tests/CMakeLists.txt
+++ b/tests/unit-tests/CMakeLists.txt
@@ -8,6 +8,7 @@ set(tests
         array-dimensions-shard-index-for-chunk
         array-dimensions-shard-internal-index
         thread-pool-push-to-job-queue
+        make-dirs
         s3-connection-bucket-exists
         s3-connection-object-exists-check-false-positives
         s3-connection-put-object
@@ -22,10 +23,11 @@ set(tests
         zarrv2-writer-write-even
         zarrv2-writer-write-ragged-append-dim
         zarrv2-writer-write-ragged-internal-dim
+        vectorized-file-write
+        shard-writer-add-chunk-data-to-flush
         zarrv3-writer-write-even
         zarrv3-writer-write-ragged-append-dim
         zarrv3-writer-write-ragged-internal-dim
-        vectorized-file-write
 )
 
 foreach (name ${tests})
diff --git a/tests/unit-tests/array-writer-write-frame-to-chunks.cpp b/tests/unit-tests/array-writer-write-frame-to-chunks.cpp
index 9c42634..7fd82f8 100644
--- a/tests/unit-tests/array-writer-write-frame-to-chunks.cpp
+++ b/tests/unit-tests/array-writer-write-frame-to-chunks.cpp
@@ -13,9 +13,12 @@ class TestWriter : public zarr::ArrayWriter
     }
 
   private:
-    ZarrVersion version_() const override { return ZarrVersionCount; }
+    std::string data_root_() const override { return ""; }
+    std::string metadata_path_() const override { return ""; }
+    bool make_data_sinks_() override { return true; }
     bool should_rollover_() const override { return false; }
-    bool flush_impl_() override { return true; }
+    void compress_and_flush_() override {}
+    void close_sinks_() override {}
     bool write_array_metadata_() override { return true; }
 };
 } // namespace
diff --git a/tests/unit-tests/create-stream.cpp b/tests/unit-tests/create-stream.cpp
index 2dc8e39..a9f4df5 100644
--- a/tests/unit-tests/create-stream.cpp
+++ b/tests/unit-tests/create-stream.cpp
@@ -46,21 +46,24 @@ main()
     memset(&settings, 0, sizeof(settings));
     settings.version = ZarrVersion_2;
 
+    std::string store_path =
+      (fs::temp_directory_path() / (TEST ".zarr")).string();
+
     try {
         // try to create a stream with no store path
         stream = ZarrStream_create(&settings);
-        CHECK(nullptr == stream);
+        CHECK(!stream);
 
         // try to create a stream with no dimensions
-        settings.store_path = static_cast<const char*>(TEST ".zarr");
+        settings.store_path = store_path.c_str();
         stream = ZarrStream_create(&settings);
-        CHECK(nullptr == stream);
+        CHECK(!stream);
         CHECK(!fs::exists(settings.store_path));
 
         // allocate dimensions
         configure_stream_dimensions(&settings);
         stream = ZarrStream_create(&settings);
-        CHECK(nullptr != stream);
+        CHECK(stream);
         CHECK(fs::is_directory(settings.store_path));
 
         retval = 0;
diff --git a/tests/unit-tests/make-dirs.cpp b/tests/unit-tests/make-dirs.cpp
new file mode 100644
index 0000000..f11e119
--- /dev/null
+++ b/tests/unit-tests/make-dirs.cpp
@@ -0,0 +1,46 @@
+#include "unit.test.macros.hh"
+#include "zarr.common.hh"
+
+#include <filesystem>
+
+namespace fs = std::filesystem;
+
+int
+main()
+{
+    int retval = 1;
+    auto temp_dir = fs::temp_directory_path() / TEST;
+
+    auto thread_pool = std::make_shared<zarr::ThreadPool>(
+      std::thread::hardware_concurrency(),
+      [](const std::string& err) { LOG_ERROR("Error: ", err); });
+
+    std::vector<std::string> dir_paths = { (temp_dir / "a").string(),
+                                           (temp_dir / "b/c").string(),
+                                           (temp_dir / "d/e/f").string() };
+
+    try {
+        for (const auto& dir_path : dir_paths) {
+            EXPECT(
+              !fs::exists(dir_path), "Directory ", dir_path, " already exists");
+        }
+
+        EXPECT(zarr::make_dirs(dir_paths, thread_pool),
+               "Failed to create dirs.");
+        for (const auto& dir_path : dir_paths) {
+            EXPECT(fs::is_directory(temp_dir / dir_path),
+                   "Failed to create directory ",
+                   dir_path);
+        }
+        retval = 0;
+    } catch (const std::exception& exc) {
+        LOG_ERROR("Exception: ", exc.what());
+    }
+
+    // cleanup
+    if (fs::exists(temp_dir)) {
+        fs::remove_all(temp_dir);
+    }
+
+    return retval;
+}
\ No newline at end of file
diff --git a/tests/unit-tests/shard-writer-add-chunk-data-to-flush.cpp b/tests/unit-tests/shard-writer-add-chunk-data-to-flush.cpp
new file mode 100644
index 0000000..897740b
--- /dev/null
+++ b/tests/unit-tests/shard-writer-add-chunk-data-to-flush.cpp
@@ -0,0 +1,217 @@
+#include "shard.writer.hh"
+#include "thread.pool.hh"
+#include "unit.test.macros.hh"
+
+#include <cstddef>
+#include <filesystem>
+#include <fstream>
+#include <vector>
+
+namespace fs = std::filesystem;
+
+static constexpr size_t chunk_size = 967;
+static constexpr uint32_t chunks_before_flush = 2;
+static constexpr uint32_t chunks_per_shard = 8;
+
+void
+verify_file_data(const std::string& filename, size_t file_size)
+{
+    std::ifstream file(filename, std::ios::binary);
+    std::vector<std::byte> read_buffer(file_size);
+
+    file.read(reinterpret_cast<char*>(read_buffer.data()), file_size);
+    CHECK(file.good() && file.gcount() == file_size);
+
+    int values[] = { 0, 7, 3, 1, 5, 6, 2, 4 };
+
+    size_t buf_offset = 0;
+    for (auto i = 0; i < 8; ++i) {
+        if (i % chunks_before_flush == 0) {
+            buf_offset = zarr::align_to_system_boundary(buf_offset);
+        }
+
+        for (size_t j = buf_offset; j < buf_offset + chunk_size; ++j) {
+            auto byte = (int)read_buffer[j];
+            EXPECT(byte == values[i],
+                   "Data mismatch at offset ",
+                   j,
+                   ". Expected ",
+                   values[i],
+                   " got ",
+                   byte,
+                   ".");
+        }
+        buf_offset += chunk_size;
+    }
+
+    // check the index table
+    const auto index_table_size = chunks_per_shard * 2 * sizeof(uint64_t);
+    CHECK(buf_offset == file_size - index_table_size);
+    auto* index_table =
+      reinterpret_cast<uint64_t*>(read_buffer.data() + buf_offset);
+
+    size_t file_offset = 0;
+
+    // chunk 0
+    EXPECT(index_table[0] == file_offset,
+           "Expected ",
+           file_offset,
+           ", got ",
+           index_table[0]);
+    EXPECT(index_table[1] == chunk_size,
+           "Expected ",
+           chunk_size,
+           " got ",
+           index_table[1]);
+
+    // chunk 7
+    EXPECT(index_table[14] == file_offset + chunk_size,
+           "Expected ",
+           file_offset + chunk_size,
+           ", got ",
+           index_table[14]);
+    EXPECT(index_table[15] == chunk_size,
+           "Expected ",
+           chunk_size,
+           " got ",
+           index_table[15]);
+
+    file_offset =
+      zarr::align_to_system_boundary(chunks_before_flush * chunk_size);
+
+    // chunk 3
+    EXPECT(index_table[6] == file_offset,
+           "Expected ",
+           file_offset,
+           " got ",
+           index_table[6]);
+    EXPECT(index_table[7] == chunk_size,
+           "Expected ",
+           chunk_size,
+           ", got ",
+           index_table[7]);
+
+    // chunk 1
+    EXPECT(index_table[2] == file_offset + chunk_size,
+           "Expected ",
+           file_offset + chunk_size,
+           ", got ",
+           index_table[2]);
+    EXPECT(index_table[3] == chunk_size,
+           "Expected ",
+           chunk_size,
+           " got ",
+           index_table[3]);
+
+    file_offset +=
+      zarr::align_to_system_boundary(chunks_before_flush * chunk_size);
+
+    // chunk 5
+    EXPECT(index_table[10] == file_offset,
+           "Expected ",
+           file_offset,
+           ", got ",
+           index_table[10]);
+    EXPECT(index_table[11] == chunk_size,
+           "Expected ",
+           chunk_size,
+           " got ",
+           index_table[11]);
+
+    // chunk 6
+    EXPECT(index_table[12] == file_offset + chunk_size,
+           "Expected ",
+           file_offset + chunk_size,
+           ", got ",
+           index_table[12]);
+    EXPECT(index_table[13] == chunk_size,
+           "Expected ",
+           chunk_size,
+           " got ",
+           index_table[13]);
+
+    file_offset +=
+      zarr::align_to_system_boundary(chunks_before_flush * chunk_size);
+
+    // chunk 2
+    EXPECT(index_table[4] == file_offset,
+           "Expected ",
+           file_offset,
+           ", got ",
+           index_table[4]);
+    EXPECT(index_table[5] == chunk_size,
+           "Expected ",
+           chunk_size,
+           " got ",
+           index_table[5]);
+
+    // chunk 4
+    EXPECT(index_table[8] == file_offset + chunk_size,
+           "Expected ",
+           file_offset + chunk_size,
+           ", got ",
+           index_table[8]);
+    EXPECT(index_table[9] == chunk_size,
+           "Expected ",
+           chunk_size,
+           " got ",
+           index_table[9]);
+}
+
+int
+main()
+{
+    int retval = 1;
+
+    auto shard_file_path = fs::temp_directory_path() / "shard-data.bin";
+    zarr::ShardWriterConfig config = { .file_path = shard_file_path.string(),
+                                       .chunks_before_flush =
+                                         chunks_before_flush,
+                                       .chunks_per_shard = chunks_per_shard };
+    auto writer = std::make_unique<zarr::ShardWriter>(config);
+
+    const auto index_table_size = chunks_per_shard * 2 * sizeof(uint64_t);
+
+    std::vector<std::vector<std::byte>> chunk_data(chunks_per_shard);
+    for (auto i = 0; i < chunks_per_shard; ++i) {
+        chunk_data[i].resize(chunk_size);
+        std::fill(chunk_data[i].begin(), chunk_data[i].end(), std::byte(i));
+    }
+
+    try {
+        writer->add_chunk(&chunk_data[0], 0);
+        writer->add_chunk(&chunk_data[7], 7);
+        writer->add_chunk(&chunk_data[3], 3);
+        writer->add_chunk(&chunk_data[1], 1);
+        writer->add_chunk(&chunk_data[5], 5);
+        writer->add_chunk(&chunk_data[6], 6);
+        writer->add_chunk(&chunk_data[2], 2);
+        writer->add_chunk(&chunk_data[4], 4);
+        zarr::finalize_shard_writer(std::move(writer));
+
+        auto expected_file_size =
+        auto expected_file_size =
+          3 * zarr::align_to_system_boundary(
+                chunks_before_flush * chunk_size) + // 3 aligned sets of chunks
+          chunks_before_flush * chunk_size + // final, unaligned set of chunks
+          index_table_size;                  // index table
+        auto actual_file_size = fs::file_size(shard_file_path);
+        EXPECT(actual_file_size == expected_file_size,
+               "Expected a file size of ",
+               expected_file_size,
+               ", got ",
+               actual_file_size);
+
+        verify_file_data(shard_file_path.string(), actual_file_size);
+
+        retval = 0;
+    } catch (const std::exception& exc) {
+        LOG_ERROR("Exception: ", exc.what());
+    }
+
+    // cleanup
+    if (fs::exists(shard_file_path)) {
+        fs::remove(shard_file_path);
+    }
+
+    return retval;
+}
\ No newline at end of file
diff --git a/tests/unit-tests/sink-creator-make-data-sinks.cpp b/tests/unit-tests/sink-creator-make-data-sinks.cpp
index cac596a..4b13a1b 100644
--- a/tests/unit-tests/sink-creator-make-data-sinks.cpp
+++ b/tests/unit-tests/sink-creator-make-data-sinks.cpp
@@ -56,7 +56,7 @@ sink_creator_make_chunk_sinks(std::shared_ptr<zarr::ThreadPool> thread_pool,
     // create the sinks, then let them go out of scope to close the handles
     {
         std::vector<std::unique_ptr<zarr::Sink>> sinks;
-        CHECK(sink_creator.make_data_sinks(
+        CHECK(sink_creator.make_data_file_sinks(
           test_dir, dimensions, zarr::chunks_along_dimension, sinks));
     }
 
@@ -94,11 +94,11 @@ sink_creator_make_chunk_sinks(
     char data_[] = { 0, 0 };
     std::span data(reinterpret_cast<const std::byte*>(data_), sizeof(data_));
     std::vector<std::unique_ptr<zarr::Sink>> sinks;
-    CHECK(sink_creator.make_data_sinks(bucket_name,
-                                       test_dir,
-                                       dimensions,
-                                       zarr::chunks_along_dimension,
-                                       sinks));
+    CHECK(sink_creator.make_data_s3_sinks(bucket_name,
+                                          test_dir,
+                                          dimensions,
+                                          zarr::chunks_along_dimension,
+                                          sinks));
 
     for (auto& sink : sinks) {
         CHECK(sink);
@@ -142,7 +142,7 @@ sink_creator_make_shard_sinks(std::shared_ptr<zarr::ThreadPool> thread_pool,
     // create the sinks, then let them go out of scope to close the handles
     {
         std::vector<std::unique_ptr<zarr::Sink>> sinks;
-        CHECK(sink_creator.make_data_sinks(
+        CHECK(sink_creator.make_data_file_sinks(
           test_dir, dimensions, zarr::shards_along_dimension, sinks));
     }
 
@@ -180,11 +180,11 @@ sink_creator_make_shard_sinks(
     char data_[] = { 0, 0 };
     std::span data(reinterpret_cast<const std::byte*>(data_), sizeof(data_));
     std::vector<std::unique_ptr<zarr::Sink>> sinks;
-    CHECK(sink_creator.make_data_sinks(bucket_name,
-                                       test_dir,
-                                       dimensions,
-                                       zarr::shards_along_dimension,
-                                       sinks));
+    CHECK(sink_creator.make_data_s3_sinks(bucket_name,
+                                          test_dir,
+                                          dimensions,
+                                          zarr::shards_along_dimension,
+                                          sinks));
 
     for (auto& sink : sinks) {
         CHECK(sink);
diff --git a/tests/unit-tests/sink-creator-make-metadata-sinks.cpp b/tests/unit-tests/sink-creator-make-metadata-sinks.cpp
index c400079..00cdbff 100644
--- a/tests/unit-tests/sink-creator-make-metadata-sinks.cpp
+++ b/tests/unit-tests/sink-creator-make-metadata-sinks.cpp
@@ -52,7 +52,7 @@ sink_creator_make_v2_metadata_sinks(
     zarr::SinkCreator sink_creator(thread_pool, nullptr);
 
     std::unordered_map<std::string, std::unique_ptr<zarr::Sink>> metadata_sinks;
-    CHECK(sink_creator.make_metadata_sinks(2, test_dir, metadata_sinks));
+    CHECK(sink_creator.make_metadata_file_sinks(2, test_dir, metadata_sinks));
 
     CHECK(metadata_sinks.size() == 4);
     CHECK(metadata_sinks.contains(".zattrs"));
@@ -82,8 +82,8 @@ sink_creator_make_v2_metadata_sinks(
     zarr::SinkCreator sink_creator(thread_pool, connection_pool);
 
     std::unordered_map<std::string, std::unique_ptr<zarr::Sink>> metadata_sinks;
-    CHECK(
-      sink_creator.make_metadata_sinks(2, bucket_name, test_dir, metadata_sinks));
+    CHECK(sink_creator.make_metadata_s3_sinks(
+      2, bucket_name, test_dir, metadata_sinks));
 
     CHECK(metadata_sinks.size() == 4);
     CHECK(metadata_sinks.contains(".zattrs"));
@@ -118,7 +118,7 @@ sink_creator_make_v3_metadata_sinks(
     zarr::SinkCreator sink_creator(thread_pool, nullptr);
 
     std::unordered_map<std::string, std::unique_ptr<zarr::Sink>> metadata_sinks;
-    CHECK(sink_creator.make_metadata_sinks(3, test_dir, metadata_sinks));
+    CHECK(sink_creator.make_metadata_file_sinks(3, test_dir, metadata_sinks));
 
     CHECK(metadata_sinks.size() == 3);
     CHECK(metadata_sinks.contains("zarr.json"));
@@ -147,8 +147,8 @@ sink_creator_make_v3_metadata_sinks(
     zarr::SinkCreator sink_creator(thread_pool, connection_pool);
 
     std::unordered_map<std::string, std::unique_ptr<zarr::Sink>> metadata_sinks;
-    CHECK(
-      sink_creator.make_metadata_sinks(3, bucket_name, test_dir, metadata_sinks));
+    CHECK(sink_creator.make_metadata_s3_sinks(
+      3, bucket_name, test_dir, metadata_sinks));
 
     CHECK(metadata_sinks.size() == 3);
     CHECK(metadata_sinks.contains("zarr.json"));
diff --git a/tests/unit-tests/unit.test.macros.hh b/tests/unit-tests/unit.test.macros.hh
index 302a033..dcb61b7 100644
--- a/tests/unit-tests/unit.test.macros.hh
+++ b/tests/unit-tests/unit.test.macros.hh
@@ -21,6 +21,13 @@
         a_ == b_, "Expected ", #a, " == ", #b, " but ", a_, " != ", b_);      \
     } while (0)
 
+#define EXPECT_GTE(T, a, b)                                                    \
+    do {                                                                       \
+        T a_ = (T)(a);                                                         \
+        T b_ = (T)(b);                                                         \
+        EXPECT(a_ >= b_, "Expected ", #a, " >= ", #b, " but ", a_, " < ", b_); \
+    } while (0)
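+// e.g. EXPECT_GTE(int, file_size, min_file_size) fails iff file_size < min_file_size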
got ", diff --git a/tests/unit-tests/zarrv3-writer-write-ragged-internal-dim.cpp b/tests/unit-tests/zarrv3-writer-write-ragged-internal-dim.cpp index 0affa4e..a9690b2 100644 --- a/tests/unit-tests/zarrv3-writer-write-ragged-internal-dim.cpp +++ b/tests/unit-tests/zarrv3-writer-write-ragged-internal-dim.cpp @@ -117,7 +117,8 @@ main() "x", ZarrDimensionType_Space, array_width, chunk_width, shard_width); zarr::ArrayWriterConfig config = { - .dimensions = std::make_shared(std::move(dims), dtype), + .dimensions = + std::make_shared(std::move(dims), dtype), .dtype = dtype, .level_of_detail = 5, .bucket_name = std::nullopt, @@ -130,7 +131,8 @@ main() std::move(config), thread_pool); const size_t frame_size = array_width * array_height * nbytes_px; - std::vector data(frame_size, std::byte(0));; + std::vector data(frame_size, std::byte(0)); + ; for (auto i = 0; i < n_frames; ++i) { // 2 time points CHECK(writer->write_frame(data)); @@ -144,10 +146,9 @@ main() const auto index_size = chunks_per_shard * sizeof(uint64_t) * // indices are 64 bits 2; // 2 indices per chunk - const auto expected_file_size = shard_width * shard_height * - shard_planes * shard_timepoints * - chunk_size + - index_size; + const auto min_file_size = shard_width * shard_height * shard_planes * + shard_timepoints * chunk_size + + index_size; const fs::path data_root = base_dir / "data/root" / std::to_string(config.level_of_detail); @@ -168,7 +169,7 @@ main() const auto x_file = y_dir / std::to_string(x); CHECK(fs::is_regular_file(x_file)); const auto file_size = fs::file_size(x_file); - EXPECT_EQ(int, file_size, expected_file_size); + EXPECT_GTE(int, file_size, min_file_size); } CHECK(!fs::is_regular_file(y_dir /