Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix test timeout redux #161

Merged
merged 15 commits into from
Nov 20, 2023
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
## [0.1.5](https://github.com/acquire-project/acquire-driver-zarr/compare/v0.1.3...v0.1.4) - 2023-11-20

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you're planning to tag the commit associated with merging this PR with v0.1.5? If so, looks good.


### Added

Expand Down
47 changes: 26 additions & 21 deletions src/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,36 @@ namespace common = acquire::sink::zarr::common;
common::ThreadPool::ThreadPool(size_t n_threads,
std::function<void(const std::string&)> err)
: error_handler_{ err }
, started_{ false }
, should_stop_{ false }
, is_accepting_jobs_{ true }
{
n_threads_ = std::clamp(
n_threads = std::clamp(
n_threads,
(size_t)1,
(size_t)std::max(std::thread::hardware_concurrency(), (unsigned)1));
}

common::ThreadPool::~ThreadPool() noexcept
{
await_stop();
for (auto i = 0; i < n_threads; ++i) {
threads_.emplace_back([this] { thread_worker_(); });
}
}

void
common::ThreadPool::start()
common::ThreadPool::~ThreadPool() noexcept
{
EXPECT(!started_, "Thread pool already started.");

for (auto i = 0; i < n_threads_; ++i) {
threads_.emplace_back([this] { thread_worker_(); });
{
std::scoped_lock lock(jobs_mutex_);
while (!jobs_.empty()) {
jobs_.pop();
}
}
started_ = true;

await_stop();
}

void
common::ThreadPool::push_to_job_queue(JobT&& job)
{
EXPECT(started_, "Cannot push to job queue before starting.");

std::unique_lock lock(jobs_mutex_);
CHECK(is_accepting_jobs_);

jobs_.push(std::move(job));
lock.unlock();

Expand All @@ -50,11 +49,11 @@ common::ThreadPool::push_to_job_queue(JobT&& job)
void
common::ThreadPool::await_stop() noexcept
{
if (!started_) {
return;
{
std::scoped_lock lock(jobs_mutex_);
is_accepting_jobs_ = false;
}

should_stop_ = true;
cv_.notify_all();

// spin down threads
Expand All @@ -77,16 +76,22 @@ common::ThreadPool::pop_from_job_queue_() noexcept
return job;
}

bool
common::ThreadPool::should_stop_() const noexcept
{
return !is_accepting_jobs_ && jobs_.empty();
}

void
common::ThreadPool::thread_worker_()
{
TRACE("Worker thread starting.");

while (true) {
std::unique_lock lock(jobs_mutex_);
cv_.wait(lock, [&] { return should_stop_ || !jobs_.empty(); });
cv_.wait(lock, [&] { return should_stop_() || !jobs_.empty(); });

if (should_stop_) {
if (should_stop_()) {
break;
}

Expand Down
13 changes: 8 additions & 5 deletions src/common.hh
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,27 @@ struct ThreadPool final
ThreadPool(size_t n_threads, std::function<void(const std::string&)> err);
~ThreadPool() noexcept;

void start();
void push_to_job_queue(JobT&& job);

/**
* @brief Block until all jobs on the queue have processed, then spin down
* the threads.
* @note After calling this function, the job queue no longer accepts jobs.
*/
void await_stop() noexcept;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional

Add a comment here to explain that calling this will wait/block until pending jobs are complete. Also, after this is complete, no new jobs can be submitted.


private:
std::function<void(const std::string&)> error_handler_;

size_t n_threads_;
std::vector<std::thread> threads_;
mutable std::mutex jobs_mutex_;
std::condition_variable cv_;
std::queue<JobT> jobs_;

bool started_;
std::atomic<bool> should_stop_;
std::atomic<bool> is_accepting_jobs_;

/// Multithreading
std::optional<common::ThreadPool::JobT> pop_from_job_queue_() noexcept;
[[nodiscard]] bool should_stop_() const noexcept;
void thread_worker_();
};
size_t
Expand Down
1 change: 0 additions & 1 deletion src/zarr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,6 @@ void
zarr::Zarr::start()
{
error_ = true;
thread_pool_->start();

if (fs::exists(dataset_root_)) {
std::error_code ec;
Expand Down
4 changes: 3 additions & 1 deletion src/zarr.v3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@ zarr::ZarrV3::write_group_metadata_() const
using json = nlohmann::json;

json metadata;
metadata["attributes"]["acquire"] = json::parse(external_metadata_json_);
metadata["attributes"]["acquire"] =
external_metadata_json_.empty() ? ""
: json::parse(external_metadata_json_);

auto path = (dataset_root_ / "meta" / "root.group.json").string();
common::write_string(path, metadata.dump(4));
Expand Down
7 changes: 3 additions & 4 deletions tests/write-zarr-v3-raw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,14 @@ setup(AcquireRuntime* runtime)
SIZED("ZarrV3"),
&props.video[0].storage.identifier));

const char external_metadata[] = R"({"hello":"world"})";
const struct PixelScale sample_spacing_um = { 1, 1 };

storage_properties_init(&props.video[0].storage.settings,
0,
(char*)filename,
strlen(filename) + 1,
(char*)external_metadata,
sizeof(external_metadata),
nullptr,
0,
sample_spacing_um);

storage_properties_set_chunking_props(&props.video[0].storage.settings,
Expand Down Expand Up @@ -217,7 +216,7 @@ validate(AcquireRuntime* runtime)

f = std::ifstream(metadata_path);
metadata = json::parse(f);
CHECK("world" == metadata["attributes"]["acquire"]["hello"]);
CHECK("" == metadata["attributes"]["acquire"]);

// check the array metadata file
metadata_path = test_path / "meta" / "root" / "0.array.json";
Expand Down
Loading