Skip to content

Commit

Permalink
Fix test timeout redux (#161)
Browse files Browse the repository at this point in the history
- Fixes a bug in the Zarr V3 writer where external metadata was tacitly
assumed to be nonempty.
- Removes explicit thread start from `ThreadPool` in favor of start on
construction.
- Aborts thread jobs in the destructor only, allowing any job that's
made it on to the queue to finish before shutting down the thread pool
in the usual case.
aliddell authored Nov 20, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 0432249 commit 7d814dc
Showing 6 changed files with 41 additions and 33 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
## [0.1.5](https://github.com/acquire-project/acquire-driver-zarr/compare/v0.1.3...v0.1.4) - 2023-11-20

### Added

47 changes: 26 additions & 21 deletions src/common.cpp
Original file line number Diff line number Diff line change
@@ -10,37 +10,36 @@ namespace common = acquire::sink::zarr::common;
common::ThreadPool::ThreadPool(size_t n_threads,
std::function<void(const std::string&)> err)
: error_handler_{ err }
, started_{ false }
, should_stop_{ false }
, is_accepting_jobs_{ true }
{
n_threads_ = std::clamp(
n_threads = std::clamp(
n_threads,
(size_t)1,
(size_t)std::max(std::thread::hardware_concurrency(), (unsigned)1));
}

common::ThreadPool::~ThreadPool() noexcept
{
await_stop();
for (auto i = 0; i < n_threads; ++i) {
threads_.emplace_back([this] { thread_worker_(); });
}
}

void
common::ThreadPool::start()
common::ThreadPool::~ThreadPool() noexcept
{
EXPECT(!started_, "Thread pool already started.");

for (auto i = 0; i < n_threads_; ++i) {
threads_.emplace_back([this] { thread_worker_(); });
{
std::scoped_lock lock(jobs_mutex_);
while (!jobs_.empty()) {
jobs_.pop();
}
}
started_ = true;

await_stop();
}

void
common::ThreadPool::push_to_job_queue(JobT&& job)
{
EXPECT(started_, "Cannot push to job queue before starting.");

std::unique_lock lock(jobs_mutex_);
CHECK(is_accepting_jobs_);

jobs_.push(std::move(job));
lock.unlock();

@@ -50,11 +49,11 @@ common::ThreadPool::push_to_job_queue(JobT&& job)
void
common::ThreadPool::await_stop() noexcept
{
if (!started_) {
return;
{
std::scoped_lock lock(jobs_mutex_);
is_accepting_jobs_ = false;
}

should_stop_ = true;
cv_.notify_all();

// spin down threads
@@ -77,16 +76,22 @@ common::ThreadPool::pop_from_job_queue_() noexcept
return job;
}

bool
common::ThreadPool::should_stop_() const noexcept
{
return !is_accepting_jobs_ && jobs_.empty();
}

void
common::ThreadPool::thread_worker_()
{
TRACE("Worker thread starting.");

while (true) {
std::unique_lock lock(jobs_mutex_);
cv_.wait(lock, [&] { return should_stop_ || !jobs_.empty(); });
cv_.wait(lock, [&] { return should_stop_() || !jobs_.empty(); });

if (should_stop_) {
if (should_stop_()) {
break;
}

13 changes: 8 additions & 5 deletions src/common.hh
Original file line number Diff line number Diff line change
@@ -60,24 +60,27 @@ struct ThreadPool final
ThreadPool(size_t n_threads, std::function<void(const std::string&)> err);
~ThreadPool() noexcept;

void start();
void push_to_job_queue(JobT&& job);

/**
* @brief Block until all jobs on the queue have processed, then spin down
* the threads.
* @note After calling this function, the job queue no longer accepts jobs.
*/
void await_stop() noexcept;

private:
std::function<void(const std::string&)> error_handler_;

size_t n_threads_;
std::vector<std::thread> threads_;
mutable std::mutex jobs_mutex_;
std::condition_variable cv_;
std::queue<JobT> jobs_;

bool started_;
std::atomic<bool> should_stop_;
std::atomic<bool> is_accepting_jobs_;

/// Multithreading
std::optional<common::ThreadPool::JobT> pop_from_job_queue_() noexcept;
[[nodiscard]] bool should_stop_() const noexcept;
void thread_worker_();
};
size_t
1 change: 0 additions & 1 deletion src/zarr.cpp
Original file line number Diff line number Diff line change
@@ -404,7 +404,6 @@ void
zarr::Zarr::start()
{
error_ = true;
thread_pool_->start();

if (fs::exists(dataset_root_)) {
std::error_code ec;
4 changes: 3 additions & 1 deletion src/zarr.v3.cpp
Original file line number Diff line number Diff line change
@@ -302,7 +302,9 @@ zarr::ZarrV3::write_group_metadata_() const
using json = nlohmann::json;

json metadata;
metadata["attributes"]["acquire"] = json::parse(external_metadata_json_);
metadata["attributes"]["acquire"] =
external_metadata_json_.empty() ? ""
: json::parse(external_metadata_json_);

auto path = (dataset_root_ / "meta" / "root.group.json").string();
common::write_string(path, metadata.dump(4));
7 changes: 3 additions & 4 deletions tests/write-zarr-v3-raw.cpp
Original file line number Diff line number Diff line change
@@ -98,15 +98,14 @@ setup(AcquireRuntime* runtime)
SIZED("ZarrV3"),
&props.video[0].storage.identifier));

const char external_metadata[] = R"({"hello":"world"})";
const struct PixelScale sample_spacing_um = { 1, 1 };

storage_properties_init(&props.video[0].storage.settings,
0,
(char*)filename,
strlen(filename) + 1,
(char*)external_metadata,
sizeof(external_metadata),
nullptr,
0,
sample_spacing_um);

storage_properties_set_chunking_props(&props.video[0].storage.settings,
@@ -217,7 +216,7 @@ validate(AcquireRuntime* runtime)

f = std::ifstream(metadata_path);
metadata = json::parse(f);
CHECK("world" == metadata["attributes"]["acquire"]["hello"]);
CHECK("" == metadata["attributes"]["acquire"]);

// check the array metadata file
metadata_path = test_path / "meta" / "root" / "0.array.json";

0 comments on commit 7d814dc

Please sign in to comment.