Skip to content

Commit

Permalink
Account for early-morning delirium.
Browse files Browse the repository at this point in the history
  • Loading branch information
aliddell committed Sep 21, 2023
1 parent 2400daf commit 45429fb
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 38 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Support for [Zarr v3](https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html).
- Support for
the [sharding storage transformer](https://web.archive.org/web/20230213221154/https://zarr-specs.readthedocs.io/en/latest/extensions/storage-transformers/sharding/v1.0.html)
in Zarr v3.
- Ship debug libs for C-Blosc on Linux and Mac.

### Changed

- Upgrades C-Blosc from v1.21.4 to v1.21.5.

### Fixed

- A bug where enabling multiscale without specifying the tile size would cause an error.
- Exceptions thrown off the main thread are now caught and logged, and Zarr throws an error in `append`.
- Job queue is now cleared after every operation.

## [0.1.4](https://github.com/acquire-project/acquire-driver-zarr/compare/v0.1.3...v0.1.4) - 2023-08-11

Expand Down
4 changes: 2 additions & 2 deletions src/prelude.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
} while (0)
#define CHECK(e) EXPECT(e, "Expression evaluated as false:\n\t%s", #e)

#define TRACE(...) LOG(__VA_ARGS__)
//#define TRACE(...)
//#define TRACE(...) LOG(__VA_ARGS__)
#define TRACE(...)

#define containerof(ptr, T, V) ((T*)(((char*)(ptr)) - offsetof(T, V)))
#define countof(e) (sizeof(e) / sizeof(*(e)))
Expand Down
18 changes: 7 additions & 11 deletions src/writers/chunk.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,9 @@ zarr::ChunkWriter::ChunkWriter(const ImageDims& frame_dims,
const Zarr* zarr)
: Writer(frame_dims, tile_dims, frames_per_chunk, data_root, zarr)
{
// pare down the number of threads if we have too many
while (threads_.size() > tiles_per_frame_()) {
threads_.pop_back();
}

// spin up threads
for (auto& ctx : threads_) {
ctx.ready = true;
ctx.should_stop = false;
ctx.thread =
std::thread([this, capture0 = &ctx] { worker_thread_(capture0); });
Expand All @@ -38,13 +34,9 @@ zarr::ChunkWriter::ChunkWriter(const ImageDims& frame_dims,
zarr,
compression_params)
{
// pare down the number of threads if we have too many
while (threads_.size() > tiles_per_frame_()) {
threads_.pop_back();
}

// spin up threads
for (auto& ctx : threads_) {
ctx.ready = true;
ctx.should_stop = false;
ctx.thread =
std::thread([this, capture0 = &ctx] { worker_thread_(capture0); });
Expand Down Expand Up @@ -182,7 +174,11 @@ zarr::ChunkWriter::flush_() noexcept

// wait for all writers to finish
while (!jobs_.empty()) {
std::this_thread::sleep_for(2ms);
std::this_thread::sleep_for(500us);
}
for (auto& ctx : threads_) {
std::unique_lock lock(ctx.mutex);
ctx.cv.wait(lock, [&ctx] { return ctx.ready; });
}

// reset buffers
Expand Down
35 changes: 16 additions & 19 deletions src/writers/shard.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "writer.hh"

#include <stdexcept>
#include <iostream>

namespace zarr = acquire::sink::zarr;

Expand All @@ -19,13 +20,9 @@ zarr::ShardWriter::ShardWriter(const ImageDims& frame_dims,
shards_per_frame_y_ =
std::ceil((float)frame_dims.rows / (float)shard_dims.rows);

// pare down the number of threads if we have too many
while (threads_.size() > shards_per_frame_()) {
threads_.pop_back();
}

// spin up threads
for (auto& ctx : threads_) {
ctx.ready = true;
ctx.should_stop = false;
ctx.thread =
std::thread([this, capture0 = &ctx] { worker_thread_(capture0); });
Expand All @@ -52,13 +49,9 @@ zarr::ShardWriter::ShardWriter(const ImageDims& frame_dims,
shards_per_frame_y_ =
std::ceil((float)frame_dims.rows / (float)shard_dims.rows);

// pare down the number of threads if we have too many
while (threads_.size() > shards_per_frame_()) {
threads_.pop_back();
}

// spin up threads
for (auto& ctx : threads_) {
ctx.ready = true;
ctx.should_stop = false;
ctx.thread =
std::thread([this, capture0 = &ctx] { worker_thread_(capture0); });
Expand Down Expand Up @@ -125,6 +118,7 @@ zarr::ShardWriter::make_buffers_() noexcept
const auto bytes_of_type = common::bytes_of_type(pixel_type_);
const auto bytes_per_tile =
tile_dims_.cols * tile_dims_.rows * bytes_of_type;
const auto bytes_per_chunk = bytes_per_tile * frames_per_chunk_;

for (auto& buf : chunk_buffers_) {
buf.resize(frames_per_chunk_ * bytes_per_tile);
Expand All @@ -133,7 +127,7 @@ zarr::ShardWriter::make_buffers_() noexcept

shard_buffers_.resize(shards_per_frame_());
for (auto& buf : shard_buffers_) {
buf.resize(chunks_per_shard_() * bytes_per_tile // data
buf.resize(chunks_per_shard_() * bytes_per_chunk // data
+ 2 * chunks_per_shard_() * sizeof(uint64_t) // indices
);
}
Expand Down Expand Up @@ -205,6 +199,7 @@ zarr::ShardWriter::flush_() noexcept

// compress buffers
auto chunk_sizes = compress_buffers_();
const size_t index_size = 2 * chunks_per_shard * sizeof(uint64_t);

// concatenate chunks into shards
std::vector<size_t> shard_sizes;
Expand All @@ -213,8 +208,6 @@ zarr::ShardWriter::flush_() noexcept
size_t shard_size = 0;
std::vector<uint64_t> chunk_indices;

const size_t index_size = 2 * chunks_per_shard * sizeof(uint64_t);

for (auto j = 0; j < chunks_per_shard; ++j) {
chunk_indices.push_back(shard_size); // chunk index
const auto k = i * chunks_per_shard + j;
Expand Down Expand Up @@ -245,18 +238,22 @@ zarr::ShardWriter::flush_() noexcept
{
std::scoped_lock lock(mutex_);
for (auto i = 0; i < files_.size(); ++i) {
auto& buf = shard_buffers_.at(i);
const auto& shard = shard_buffers_.at(i);
jobs_.push([fh = &files_.at(i),
data = buf.data(),
shard = shard.data(),
size = shard_sizes.at(i)]() -> bool {
return (bool)file_write(fh, 0, data, data + size);
return (bool)file_write(fh, 0, shard, shard + size);
});
}
}

// wait for all writers to finish
while (!jobs_.empty()) {
std::this_thread::sleep_for(2ms);
std::this_thread::sleep_for(500us);
}
for (auto& ctx : threads_) {
std::unique_lock lock(ctx.mutex);
ctx.cv.wait(lock, [&ctx] { return ctx.ready; });
}

// reset buffers
Expand All @@ -273,8 +270,8 @@ zarr::ShardWriter::flush_() noexcept
const auto bytes_per_shard = bytes_per_chunk * chunks_per_shard;
for (auto& buf : shard_buffers_) {
// absurd edge case we need to account for
if (buf.size() > bytes_per_shard) {
buf.resize(bytes_per_shard);
if (buf.size() > bytes_per_shard + index_size) {
buf.resize(bytes_per_shard + index_size);
}

std::fill(buf.begin(), buf.end(), 0);
Expand Down
3 changes: 3 additions & 0 deletions src/writers/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,14 @@ zarr::Writer::worker_thread_(ThreadContext* ctx) noexcept
}

if (auto job = pop_from_job_queue(); job.has_value()) {
ctx->ready = false;
if (!job.value()()) {
zarr_->error_ = true;
zarr_->error_msg_ = "Job failed";
}
ctx->ready = true;
lock.unlock();
ctx->cv.notify_one();
} else {
lock.unlock();
std::this_thread::sleep_for(1ms);
Expand Down
1 change: 1 addition & 0 deletions src/writers/writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct Writer
std::mutex mutex;
std::condition_variable cv;
bool should_stop;
bool ready;
};

Writer() = delete;
Expand Down
12 changes: 6 additions & 6 deletions tests/write-zarr-v3-raw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ reporter(int is_error,
a_ > b_, "Expected (%s) > (%s) but " fmt "<=" fmt, #a, #b, a_, b_); \
} while (0)

const static uint32_t frame_width = 1920;
const static uint32_t frame_width = 1080;
const static uint32_t tile_width = frame_width / 4;
const static uint32_t frame_height = 1080;
const static uint32_t frame_height = 960;
const static uint32_t tile_height = frame_height / 3;
const static uint32_t expected_frames_per_chunk = 97;
const static uint32_t max_frame_count = 200;
const static uint32_t expected_frames_per_chunk = 48;
const static uint32_t max_frame_count = 48;

void
setup(AcquireRuntime* runtime)
Expand All @@ -91,7 +91,7 @@ setup(AcquireRuntime* runtime)

DEVOK(device_manager_select(dm,
DeviceKind_Camera,
SIZED("simulated.*empty.*"),
SIZED("simulated.*radial.*"),
&props.video[0].camera.identifier));
DEVOK(device_manager_select(dm,
DeviceKind_Storage,
Expand All @@ -117,7 +117,7 @@ setup(AcquireRuntime* runtime)
props.video[0].camera.settings.shape = { .x = frame_width,
.y = frame_height };
// we may drop frames with lower exposure
// props.video[0].camera.settings.exposure_time_us = 1e5;
props.video[0].camera.settings.exposure_time_us = 1e5;
props.video[0].max_frame_count = max_frame_count;

OK(acquire_configure(runtime, &props));
Expand Down

0 comments on commit 45429fb

Please sign in to comment.