Skip to content

Commit

Permalink
fix test timeout (#158)
Browse files Browse the repository at this point in the history
A glance at the
[actions](https://github.com/acquire-project/acquire-driver-zarr/actions)
lately shows that all testing is timing out on Windows only, and on
`test-get-meta` only. I can't reproduce this on my machine, but I
suspect this is because threads are being shut down before they have a
chance to properly start, since we're not actually doing anything with
them in that test, so they're getting left in a bad state where they're
joinable but we can't join them. So instead of spinning up threads on
the thread pool's construction, I do it in `Zarr::start`. That seems to
fix the problem.
  • Loading branch information
aliddell authored Nov 17, 2023
1 parent 5ed03f5 commit 2e81b40
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
30 changes: 23 additions & 7 deletions src/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,36 @@ namespace common = acquire::sink::zarr::common;
common::ThreadPool::ThreadPool(size_t n_threads,
std::function<void(const std::string&)> err)
: error_handler_{ err }
, started_{ false }
, should_stop_{ false }
{
if (n_threads == 0) {
throw std::runtime_error("Cannot create thread pool with 0 threads.");
}

for (auto i = 0; i < n_threads; ++i) {
threads_.emplace_back([this] { thread_worker_(); });
}
n_threads_ = std::clamp(
n_threads,
(size_t)1,
(size_t)std::max(std::thread::hardware_concurrency(), (unsigned)1));
}

common::ThreadPool::~ThreadPool() noexcept
{
await_stop();
}

void
common::ThreadPool::start()
{
EXPECT(!started_, "Thread pool already started.");

for (auto i = 0; i < n_threads_; ++i) {
threads_.emplace_back([this] { thread_worker_(); });
}
started_ = true;
}

void
common::ThreadPool::push_to_job_queue(JobT&& job)
{
EXPECT(started_, "Cannot push to job queue before starting.");

std::unique_lock lock(jobs_mutex_);
jobs_.push(std::move(job));
lock.unlock();
Expand All @@ -38,6 +50,10 @@ common::ThreadPool::push_to_job_queue(JobT&& job)
void
common::ThreadPool::await_stop() noexcept
{
if (!started_) {
return;
}

should_stop_ = true;
cv_.notify_all();

Expand Down
4 changes: 3 additions & 1 deletion src/common.hh
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,20 @@ struct ThreadPool final
ThreadPool(size_t n_threads, std::function<void(const std::string&)> err);
~ThreadPool() noexcept;

void start();
void push_to_job_queue(JobT&& job);

void await_stop() noexcept;

private:
std::function<void(const std::string&)> error_handler_;

size_t n_threads_;
std::vector<std::thread> threads_;
mutable std::mutex jobs_mutex_;
std::condition_variable cv_;
std::queue<JobT> jobs_;

bool started_;
std::atomic<bool> should_stop_;

/// Multithreading
Expand Down
2 changes: 2 additions & 0 deletions src/zarr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ void
zarr::Zarr::start()
{
error_ = true;
thread_pool_->start();

if (fs::exists(dataset_root_)) {
std::error_code ec;
EXPECT(fs::remove_all(dataset_root_, ec),
Expand Down

0 comments on commit 2e81b40

Please sign in to comment.