Skip to content

Commit

Permalink
Address PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
aliddell committed Nov 17, 2023
1 parent 334d16f commit ebfba1f
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 23 deletions.
24 changes: 5 additions & 19 deletions src/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,26 @@ namespace common = acquire::sink::zarr::common;
common::ThreadPool::ThreadPool(size_t n_threads,
std::function<void(const std::string&)> err)
: error_handler_{ err }
, started_{ false }
, should_stop_{ false }
{
n_threads_ = std::clamp(
n_threads = std::clamp(
n_threads,
(size_t)1,
(size_t)std::max(std::thread::hardware_concurrency(), (unsigned)1));

for (auto i = 0; i < n_threads; ++i) {
threads_.emplace_back([this] { thread_worker_(); });
}
}

common::ThreadPool::~ThreadPool() noexcept
{
await_stop();
}

void
common::ThreadPool::start()
{
EXPECT(!started_, "Thread pool already started.");

for (auto i = 0; i < n_threads_; ++i) {
threads_.emplace_back([this] { thread_worker_(); });
}
started_ = true;
}

void
common::ThreadPool::push_to_job_queue(JobT&& job)
{
EXPECT(started_, "Cannot push to job queue before starting.");

std::unique_lock lock(jobs_mutex_);
jobs_.push(std::move(job));
lock.unlock();
Expand All @@ -50,10 +40,6 @@ common::ThreadPool::push_to_job_queue(JobT&& job)
void
common::ThreadPool::await_stop() noexcept
{
if (!started_) {
return;
}

should_stop_ = true;
cv_.notify_all();

Expand Down
3 changes: 0 additions & 3 deletions src/common.hh
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,17 @@ struct ThreadPool final
ThreadPool(size_t n_threads, std::function<void(const std::string&)> err);
~ThreadPool() noexcept;

void start();
void push_to_job_queue(JobT&& job);
void await_stop() noexcept;

private:
std::function<void(const std::string&)> error_handler_;

size_t n_threads_;
std::vector<std::thread> threads_;
mutable std::mutex jobs_mutex_;
std::condition_variable cv_;
std::queue<JobT> jobs_;

bool started_;
std::atomic<bool> should_stop_;

/// Multithreading
Expand Down
1 change: 0 additions & 1 deletion src/zarr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,6 @@ void
zarr::Zarr::start()
{
error_ = true;
thread_pool_->start();

if (fs::exists(dataset_root_)) {
std::error_code ec;
Expand Down

0 comments on commit ebfba1f

Please sign in to comment.