From 45429fb18b65e102c8099b5e01520fd8db626c35 Mon Sep 17 00:00:00 2001 From: Alan Liddell Date: Thu, 21 Sep 2023 18:05:25 -0400 Subject: [PATCH] Account for early-morning delirium. --- CHANGELOG.md | 10 ++++++++++ src/prelude.h | 4 ++-- src/writers/chunk.writer.cpp | 18 +++++++----------- src/writers/shard.writer.cpp | 35 ++++++++++++++++------------------- src/writers/writer.cpp | 3 +++ src/writers/writer.hh | 1 + tests/write-zarr-v3-raw.cpp | 12 ++++++------ 7 files changed, 45 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc4bbfc0..c67fa205 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Support for [Zarr v3](https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html). +- Support for + the [sharding storage transformer](https://web.archive.org/web/20230213221154/https://zarr-specs.readthedocs.io/en/latest/extensions/storage-transformers/sharding/v1.0.html) + in Zarr v3. +- Ship debug libs for C-Blosc on Linux and Mac. + +### Changed + +- Upgrades C-Blosc from v1.21.4 to v1.21.5. ### Fixed - A bug where enabling multiscale without specifying the tile size would cause an error. +- Exceptions thrown off the main thread are now caught and logged, and Zarr throws an error in `append`. +- Job queue is now cleared after every operation. ## [0.1.4](https://github.com/acquire-project/acquire-driver-zarr/compare/v0.1.3...v0.1.4) - 2023-08-11 diff --git a/src/prelude.h b/src/prelude.h index 824b842d..603496cd 100644 --- a/src/prelude.h +++ b/src/prelude.h @@ -19,8 +19,8 @@ } while (0) #define CHECK(e) EXPECT(e, "Expression evaluated as false:\n\t%s", #e) -#define TRACE(...) LOG(__VA_ARGS__) -//#define TRACE(...) +//#define TRACE(...) LOG(__VA_ARGS__) +#define TRACE(...) #define containerof(ptr, T, V) ((T*)(((char*)(ptr)) - offsetof(T, V))) #define countof(e) (sizeof(e) / sizeof(*(e))) diff --git a/src/writers/chunk.writer.cpp b/src/writers/chunk.writer.cpp index c64c8b41..c15f335f 100644 --- a/src/writers/chunk.writer.cpp +++ b/src/writers/chunk.writer.cpp @@ -12,13 +12,9 @@ zarr::ChunkWriter::ChunkWriter(const ImageDims& frame_dims, const Zarr* zarr) : Writer(frame_dims, tile_dims, frames_per_chunk, data_root, zarr) { - // pare down the number of threads if we have too many - while (threads_.size() > tiles_per_frame_()) { - threads_.pop_back(); - } - // spin up threads for (auto& ctx : threads_) { + ctx.ready = true; ctx.should_stop = false; ctx.thread = std::thread([this, capture0 = &ctx] { worker_thread_(capture0); }); @@ -38,13 +34,9 @@ zarr::ChunkWriter::ChunkWriter(const ImageDims& frame_dims, zarr, compression_params) { - // pare down the number of threads if we have too many - while (threads_.size() > tiles_per_frame_()) { - threads_.pop_back(); - } - // spin up threads for (auto& ctx : threads_) { + ctx.ready = true; ctx.should_stop = false; ctx.thread = std::thread([this, capture0 = &ctx] { worker_thread_(capture0); }); @@ -182,7 +174,11 @@ zarr::ChunkWriter::flush_() noexcept // wait for all writers to finish while (!jobs_.empty()) { - std::this_thread::sleep_for(2ms); + std::this_thread::sleep_for(500us); + } + for (auto& ctx : threads_) { + std::unique_lock lock(ctx.mutex); + ctx.cv.wait(lock, [&ctx] { return ctx.ready; }); } // reset buffers diff --git a/src/writers/shard.writer.cpp b/src/writers/shard.writer.cpp index 609f8d4f..29be0767 100644 --- a/src/writers/shard.writer.cpp +++ b/src/writers/shard.writer.cpp @@ -2,6 +2,7 @@ #include "writer.hh" #include +#include namespace zarr = acquire::sink::zarr; @@ -19,13 +20,9 @@ zarr::ShardWriter::ShardWriter(const ImageDims& frame_dims, shards_per_frame_y_ = std::ceil((float)frame_dims.rows / (float)shard_dims.rows); - // pare down the number of threads if we have too many - while (threads_.size() > shards_per_frame_()) { - threads_.pop_back(); - } - // spin up threads for (auto& ctx : threads_) { + ctx.ready = true; ctx.should_stop = false; ctx.thread = std::thread([this, capture0 = &ctx] { worker_thread_(capture0); }); @@ -52,13 +49,9 @@ zarr::ShardWriter::ShardWriter(const ImageDims& frame_dims, shards_per_frame_y_ = std::ceil((float)frame_dims.rows / (float)shard_dims.rows); - // pare down the number of threads if we have too many - while (threads_.size() > shards_per_frame_()) { - threads_.pop_back(); - } - // spin up threads for (auto& ctx : threads_) { + ctx.ready = true; ctx.should_stop = false; ctx.thread = std::thread([this, capture0 = &ctx] { worker_thread_(capture0); }); @@ -125,6 +118,7 @@ zarr::ShardWriter::make_buffers_() noexcept const auto bytes_of_type = common::bytes_of_type(pixel_type_); const auto bytes_per_tile = tile_dims_.cols * tile_dims_.rows * bytes_of_type; + const auto bytes_per_chunk = bytes_per_tile * frames_per_chunk_; for (auto& buf : chunk_buffers_) { buf.resize(frames_per_chunk_ * bytes_per_tile); @@ -133,7 +127,7 @@ zarr::ShardWriter::make_buffers_() noexcept shard_buffers_.resize(shards_per_frame_()); for (auto& buf : shard_buffers_) { - buf.resize(chunks_per_shard_() * bytes_per_tile // data + buf.resize(chunks_per_shard_() * bytes_per_chunk // data + 2 * chunks_per_shard_() * sizeof(uint64_t) // indices ); } @@ -205,6 +199,7 @@ zarr::ShardWriter::flush_() noexcept // compress buffers auto chunk_sizes = compress_buffers_(); + const size_t index_size = 2 * chunks_per_shard * sizeof(uint64_t); // concatenate chunks into shards std::vector shard_sizes; @@ -213,8 +208,6 @@ zarr::ShardWriter::flush_() noexcept size_t shard_size = 0; std::vector chunk_indices; - const size_t index_size = 2 * chunks_per_shard * sizeof(uint64_t); - for (auto j = 0; j < chunks_per_shard; ++j) { chunk_indices.push_back(shard_size); // chunk index const auto k = i * chunks_per_shard + j; @@ -245,18 +238,22 @@ zarr::ShardWriter::flush_() noexcept { std::scoped_lock lock(mutex_); for (auto i = 0; i < files_.size(); ++i) { - auto& buf = shard_buffers_.at(i); + const auto& shard = shard_buffers_.at(i); jobs_.push([fh = &files_.at(i), - data = buf.data(), + shard = shard.data(), size = shard_sizes.at(i)]() -> bool { - return (bool)file_write(fh, 0, data, data + size); + return (bool)file_write(fh, 0, shard, shard + size); }); } } // wait for all writers to finish while (!jobs_.empty()) { - std::this_thread::sleep_for(2ms); + std::this_thread::sleep_for(500us); + } + for (auto& ctx : threads_) { + std::unique_lock lock(ctx.mutex); + ctx.cv.wait(lock, [&ctx] { return ctx.ready; }); } // reset buffers @@ -273,8 +270,8 @@ zarr::ShardWriter::flush_() noexcept const auto bytes_per_shard = bytes_per_chunk * chunks_per_shard; for (auto& buf : shard_buffers_) { // absurd edge case we need to account for - if (buf.size() > bytes_per_shard) { - buf.resize(bytes_per_shard); + if (buf.size() > bytes_per_shard + index_size) { + buf.resize(bytes_per_shard + index_size); } std::fill(buf.begin(), buf.end(), 0); diff --git a/src/writers/writer.cpp b/src/writers/writer.cpp index 772a36e2..efa54184 100644 --- a/src/writers/writer.cpp +++ b/src/writers/writer.cpp @@ -239,11 +239,14 @@ zarr::Writer::worker_thread_(ThreadContext* ctx) noexcept } if (auto job = pop_from_job_queue(); job.has_value()) { + ctx->ready = false; if (!job.value()()) { zarr_->error_ = true; zarr_->error_msg_ = "Job failed"; } + ctx->ready = true; lock.unlock(); + ctx->cv.notify_one(); } else { lock.unlock(); std::this_thread::sleep_for(1ms); diff --git a/src/writers/writer.hh b/src/writers/writer.hh index 13c2490e..44069c13 100644 --- a/src/writers/writer.hh +++ b/src/writers/writer.hh @@ -32,6 +32,7 @@ struct Writer std::mutex mutex; std::condition_variable cv; bool should_stop; + bool ready; }; Writer() = delete; diff --git a/tests/write-zarr-v3-raw.cpp b/tests/write-zarr-v3-raw.cpp index d9dfcff8..7463988e 100644 --- a/tests/write-zarr-v3-raw.cpp +++ b/tests/write-zarr-v3-raw.cpp @@ -71,12 +71,12 @@ reporter(int is_error, a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \ } while (0) -const static uint32_t frame_width = 1920; +const static uint32_t frame_width = 1080; const static uint32_t tile_width = frame_width / 4; -const static uint32_t frame_height = 1080; +const static uint32_t frame_height = 960; const static uint32_t tile_height = frame_height / 3; -const static uint32_t expected_frames_per_chunk = 97; -const static uint32_t max_frame_count = 200; +const static uint32_t expected_frames_per_chunk = 48; +const static uint32_t max_frame_count = 48; void setup(AcquireRuntime* runtime) @@ -91,7 +91,7 @@ setup(AcquireRuntime* runtime) DEVOK(device_manager_select(dm, DeviceKind_Camera, - SIZED("simulated.*empty.*"), + SIZED("simulated.*radial.*"), &props.video[0].camera.identifier)); DEVOK(device_manager_select(dm, DeviceKind_Storage, @@ -117,7 +117,7 @@ setup(AcquireRuntime* runtime) props.video[0].camera.settings.shape = { .x = frame_width, .y = frame_height }; // we may drop frames with lower exposure -// props.video[0].camera.settings.exposure_time_us = 1e5; + props.video[0].camera.settings.exposure_time_us = 1e5; props.video[0].max_frame_count = max_frame_count; OK(acquire_configure(runtime, &props));