Skip to content

Commit

Permalink
Fix: static analysis errors and Change: optimize adding a thread to t…
Browse files Browse the repository at this point in the history
…he map
  • Loading branch information
mjshakir committed Aug 26, 2024
1 parent e92531a commit 792b6ab
Showing 1 changed file with 28 additions and 33 deletions.
61 changes: 28 additions & 33 deletions include/ThreadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
// Standard library
//--------------------------------------------------------------
#include <iostream>
#include <condition_variable>
#include <thread>
#include <condition_variable>
#include <future>
#include <chrono>
#include <concepts>
#include <unordered_set>
#include <unordered_map>
#include <cmath>
#include <limits>
#include <functional>
#include <optional>
#include <mutex>
Expand Down Expand Up @@ -471,9 +470,9 @@ namespace ThreadPool {
//--------------------------
auto _threads_number = std::clamp(number_threads, m_lower_threshold, m_upper_threshold);
//--------------------------
if constexpr (adoptive_tick){
if constexpr (adoptive_tick) {
m_idle_threads.emplace();
m_idle_threads->reserve(_threads_number);
m_idle_threads->reserve(_threads_number*4UL);
}//end if constexpr (adoptive_tick)
//--------------------------
create_task(_threads_number);
Expand Down Expand Up @@ -719,22 +718,19 @@ namespace ThreadPool {
//--------------------------
for (size_t i = 0; i < number_threads; ++i) {
//--------------------------
if constexpr (!adoptive_tick){
if constexpr (!adoptive_tick) {
//--------------------------
m_workers.emplace_back([this](std::stop_token stoken){this->worker_function(stoken);});
//--------------------------
}// end if constexpr (!adoptive_tick)
//--------------------------
if constexpr(adoptive_tick){
if constexpr(adoptive_tick) {
//--------------------------
// Create a jthread with the worker function
std::jthread _thread([this](std::stop_token stoken){this->worker_function(stoken);});
//--------------------------
// Obtain the thread ID after the thread has started
const std::thread::id thread_id = _thread.get_id();
std::jthread thread_([this](std::stop_token stoken){this->worker_function(stoken);});
//--------------------------
// Insert the thread into the worker map using its thread ID
m_workers.emplace(thread_id, std::move(_thread));
m_workers.emplace(thread_.get_id(), std::move(thread_));
//--------------------------
}// end if constexpr(adoptive_tick)
//--------------------------
Expand All @@ -744,9 +740,9 @@ namespace ThreadPool {
//--------------------------
void worker_function(const std::stop_token& stoken){
//--------------------------
std::optional<std::thread::id> id;
std::optional<std::thread::id> id{std::nullopt};
//--------------------------
if constexpr (adoptive_tick){
if constexpr (adoptive_tick) {
//--------------------------
id = std::this_thread::get_id();
//--------------------------
Expand All @@ -761,13 +757,13 @@ namespace ThreadPool {
//--------------------------
std::unique_lock lock(m_mutex);
//--------------------------
if constexpr (adoptive_tick){
if constexpr (adoptive_tick) {
m_idle_threads->insert(id.value());
}// end if constexpr (adoptive_tick)
//--------------------------
m_task_available_condition.wait(lock, [this, &stoken] {return stoken.stop_requested() or !m_tasks.empty();});
//--------------------------
if constexpr (adoptive_tick){
if constexpr (adoptive_tick) {
m_idle_threads->erase(id.value());
}// end if constexpr (adoptive_tick)
//--------------------------
Expand All @@ -779,7 +775,7 @@ namespace ThreadPool {
//--------------------------
}// end if (stoken.stop_requested() and m_tasks.empty())
//--------------------------
if constexpr (static_cast<bool>(use_priority_queue)){
if constexpr (static_cast<bool>(use_priority_queue)) {
task = std::move(m_tasks.pop_top().value());
} else{
task = std::move(m_tasks.front());
Expand All @@ -790,7 +786,7 @@ namespace ThreadPool {
//--------------------------
try {
//--------------------------
if constexpr (static_cast<bool>(use_priority_queue)){
if constexpr (static_cast<bool>(use_priority_queue)) {
static_cast<void>(task.try_execute());
} else{
task();
Expand All @@ -799,7 +795,7 @@ namespace ThreadPool {
} // end try
catch (const std::exception& e) {
//--------------------------
if constexpr (static_cast<bool>(use_priority_queue)){
if constexpr (static_cast<bool>(use_priority_queue)) {
handle_error(std::move(task), e.what());
} else{
handle_error(e.what());
Expand All @@ -808,7 +804,7 @@ namespace ThreadPool {
} // end catch (const std::exception& e)
catch (...) {
//--------------------------
if constexpr (static_cast<bool>(use_priority_queue)){
if constexpr (static_cast<bool>(use_priority_queue)) {
handle_error(std::move(task), "Unknown error");
} else{
handle_error("Unknown error");
Expand All @@ -821,11 +817,11 @@ namespace ThreadPool {
}// end void worker_function(void)
//--------------------------
template <size_t U = adoptive_tick>
std::enable_if_t<U, void> adjust_workers(void){
std::enable_if_t<(U != 0UL), void> adjust_workers(void) {
//--------------------------
static const size_t threshold_ = static_cast<size_t>(std::ceil(m_upper_threshold*0.2));
static const size_t threshold_ = static_cast<size_t>(std::ceil(m_upper_threshold*0.2));
//--------------------------
const size_t task_count = active_tasks_size(), worker_count = thread_Workers_size();
const size_t task_count = active_tasks_size(), worker_count = thread_Workers_size();
//--------------------------
{
//--------------------------
Expand Down Expand Up @@ -864,7 +860,7 @@ namespace ThreadPool {
}// end void adjust_workers(void)
//--------------------------
template <size_t U = adoptive_tick>
std::enable_if_t<U, void> adjustment_thread_function(const std::stop_token& stoken){
std::enable_if_t<(U != 0UL), void> adjustment_thread_function(const std::stop_token& stoken){
//--------------------------
while (!stoken.stop_requested()) {
//--------------------------
Expand All @@ -877,12 +873,12 @@ namespace ThreadPool {
}// end void adjustment_thread_function(const std::stop_token& stoken)
//--------------------------
template <size_t U = adoptive_tick>
std::enable_if_t<!U, void> adjust_workers(void) = delete;
std::enable_if_t<(U == 0UL), void> adjust_workers(void) = delete;
//--------------------------
template <size_t U = adoptive_tick>
std::enable_if_t<!U, void> adjustment_thread_function(const std::stop_token& stoken) = delete;
std::enable_if_t<(U == 0UL), void> adjustment_thread_function(const std::stop_token& stoken) = delete;
//--------------------------
void stop(void){
void stop(void) {
//--------------------------
{
//--------------------------
Expand Down Expand Up @@ -921,7 +917,7 @@ namespace ThreadPool {
}// end void push_task(ThreadTask&& task)
//--------------------------
template<typename... Args, bool U = static_cast<bool>(use_priority_queue), typename = std::enable_if_t<U>>
void emplace_task(Args&&... args){
void emplace_task(Args&&... args) {
//--------------------------
m_tasks.emplace(std::forward<Args>(args)...);
//--------------------------
Expand All @@ -931,11 +927,10 @@ namespace ThreadPool {
//--------------------------
// Method for the case when priority queue is used
template <bool U = static_cast<bool>(use_priority_queue), typename std::enable_if_t<U, int> = 0>
void handle_error(ThreadTask&& task, const char* error){
void handle_error(ThreadTask&& task, const char* error) {
//--------------------------
if (task.get_retries() > 0) {
//--------------------------
std::scoped_lock lock(m_mutex);
//--------------------------;
task.decrease_retries();
m_tasks.push(std::move(task));
//--------------------------
Expand All @@ -949,7 +944,7 @@ namespace ThreadPool {
//--------------------------
// Method for the case when priority queue is NOT used
template <bool U = static_cast<bool>(use_priority_queue), typename std::enable_if_t<!U, int> = 0>
void handle_error(const char* error) {
void handle_error(const char* error) const {
//--------------------------
std::cerr << "Error in task: " << error << std::endl;
//--------------------------
Expand All @@ -975,7 +970,7 @@ namespace ThreadPool {
//--------------------------
std::optional<std::jthread> assign_adoptive_thread(void) {
//--------------------------
if constexpr (adoptive_tick){
if constexpr (adoptive_tick) {
return std::optional<std::jthread>([this](const std::stop_token& stoken){this->adjustment_thread_function(stoken);});
}// end if constexpr (adoptive_tick)
//--------------------------
Expand All @@ -987,7 +982,7 @@ namespace ThreadPool {
//--------------------------------------------------------------
const size_t m_upper_threshold;
//--------------------------
using WorkersType = std::conditional_t<adoptive_tick, std::unordered_map<std::thread::id, std::jthread>, std::vector<std::jthread>>;
using WorkersType = std::conditional_t<(adoptive_tick != 0UL), std::unordered_map<std::thread::id, std::jthread>, std::vector<std::jthread>>;
WorkersType m_workers;
//--------------------------
std::optional<std::unordered_set<std::thread::id>> m_idle_threads;
Expand Down

0 comments on commit 792b6ab

Please sign in to comment.