diff --git a/include/ThreadPool.hpp b/include/ThreadPool.hpp index 90f1375..d4a3982 100644 --- a/include/ThreadPool.hpp +++ b/include/ThreadPool.hpp @@ -4,15 +4,14 @@ // Standard library //-------------------------------------------------------------- #include -#include #include +#include #include #include #include #include #include #include -#include #include #include #include @@ -471,9 +470,9 @@ namespace ThreadPool { //-------------------------- auto _threads_number = std::clamp(number_threads, m_lower_threshold, m_upper_threshold); //-------------------------- - if constexpr (adoptive_tick){ + if constexpr (adoptive_tick) { m_idle_threads.emplace(); - m_idle_threads->reserve(_threads_number); + m_idle_threads->reserve(_threads_number*4UL); }//end if constexpr (adoptive_tick) //-------------------------- create_task(_threads_number); @@ -719,22 +718,19 @@ namespace ThreadPool { //-------------------------- for (size_t i = 0; i < number_threads; ++i) { //-------------------------- - if constexpr (!adoptive_tick){ + if constexpr (!adoptive_tick) { //-------------------------- m_workers.emplace_back([this](std::stop_token stoken){this->worker_function(stoken);}); //-------------------------- }// end if constexpr (!adoptive_tick) //-------------------------- - if constexpr(adoptive_tick){ + if constexpr(adoptive_tick) { //-------------------------- // Create a jthread with the worker function - std::jthread _thread([this](std::stop_token stoken){this->worker_function(stoken);}); - //-------------------------- - // Obtain the thread ID after the thread has started - const std::thread::id thread_id = _thread.get_id(); + std::jthread thread_([this](std::stop_token stoken){this->worker_function(stoken);}); //-------------------------- // Insert the thread into the worker map using its thread ID - m_workers.emplace(thread_id, std::move(_thread)); + m_workers.emplace(thread_.get_id(), std::move(thread_)); //-------------------------- }// end if constexpr(adoptive_tick) //-------------------------- @@ -744,9 +740,9 @@ namespace ThreadPool { //-------------------------- void worker_function(const std::stop_token& stoken){ //-------------------------- - std::optional id; + std::optional id{std::nullopt}; //-------------------------- - if constexpr (adoptive_tick){ + if constexpr (adoptive_tick) { //-------------------------- id = std::this_thread::get_id(); //-------------------------- @@ -761,13 +757,13 @@ namespace ThreadPool { //-------------------------- std::unique_lock lock(m_mutex); //-------------------------- - if constexpr (adoptive_tick){ + if constexpr (adoptive_tick) { m_idle_threads->insert(id.value()); }// end if constexpr (adoptive_tick) //-------------------------- m_task_available_condition.wait(lock, [this, &stoken] {return stoken.stop_requested() or !m_tasks.empty();}); //-------------------------- - if constexpr (adoptive_tick){ + if constexpr (adoptive_tick) { m_idle_threads->erase(id.value()); }// end if constexpr (adoptive_tick) //-------------------------- @@ -779,7 +775,7 @@ namespace ThreadPool { //-------------------------- }// end if (stoken.stop_requested() and m_tasks.empty()) //-------------------------- - if constexpr (static_cast(use_priority_queue)){ + if constexpr (static_cast(use_priority_queue)) { task = std::move(m_tasks.pop_top().value()); } else{ task = std::move(m_tasks.front()); @@ -790,7 +786,7 @@ namespace ThreadPool { //-------------------------- try { //-------------------------- - if constexpr (static_cast(use_priority_queue)){ + if constexpr (static_cast(use_priority_queue)) { static_cast(task.try_execute()); } else{ task(); @@ -799,7 +795,7 @@ namespace ThreadPool { } // end try catch (const std::exception& e) { //-------------------------- - if constexpr (static_cast(use_priority_queue)){ + if constexpr (static_cast(use_priority_queue)) { handle_error(std::move(task), e.what()); } else{ handle_error(e.what()); @@ -808,7 +804,7 @@ namespace ThreadPool { } // end catch (const std::exception& e) catch (...) { //-------------------------- - if constexpr (static_cast(use_priority_queue)){ + if constexpr (static_cast(use_priority_queue)) { handle_error(std::move(task), "Unknown error"); } else{ handle_error("Unknown error"); @@ -821,11 +817,11 @@ namespace ThreadPool { }// end void worker_function(void) //-------------------------- template - std::enable_if_t adjust_workers(void){ + std::enable_if_t<(U != 0UL), void> adjust_workers(void) { //-------------------------- - static const size_t threshold_ = static_cast(std::ceil(m_upper_threshold*0.2)); + static const size_t threshold_ = static_cast(std::ceil(m_upper_threshold*0.2)); //-------------------------- - const size_t task_count = active_tasks_size(), worker_count = thread_Workers_size(); + const size_t task_count = active_tasks_size(), worker_count = thread_Workers_size(); //-------------------------- { //-------------------------- @@ -864,7 +860,7 @@ namespace ThreadPool { }// end void adjust_workers(void) //-------------------------- template - std::enable_if_t adjustment_thread_function(const std::stop_token& stoken){ + std::enable_if_t<(U != 0UL), void> adjustment_thread_function(const std::stop_token& stoken){ //-------------------------- while (!stoken.stop_requested()) { //-------------------------- @@ -877,12 +873,12 @@ namespace ThreadPool { }// end void adjustment_thread_function(const std::stop_token& stoken) //-------------------------- template - std::enable_if_t adjust_workers(void) = delete; + std::enable_if_t<(U == 0UL), void> adjust_workers(void) = delete; //-------------------------- template - std::enable_if_t adjustment_thread_function(const std::stop_token& stoken) = delete; + std::enable_if_t<(U == 0UL), void> adjustment_thread_function(const std::stop_token& stoken) = delete; //-------------------------- - void stop(void){ + void stop(void) { //-------------------------- { //-------------------------- @@ -921,7 +917,7 @@ namespace ThreadPool { }// end void push_task(ThreadTask&& task) //-------------------------- template(use_priority_queue), typename = std::enable_if_t> - void emplace_task(Args&&... args){ + void emplace_task(Args&&... args) { //-------------------------- m_tasks.emplace(std::forward(args)...); //-------------------------- @@ -931,11 +927,10 @@ namespace ThreadPool { //-------------------------- // Method for the case when priority queue is used template (use_priority_queue), typename std::enable_if_t = 0> - void handle_error(ThreadTask&& task, const char* error){ + void handle_error(ThreadTask&& task, const char* error) { + //-------------------------- if (task.get_retries() > 0) { //-------------------------- - std::scoped_lock lock(m_mutex); - //--------------------------; task.decrease_retries(); m_tasks.push(std::move(task)); //-------------------------- @@ -949,7 +944,7 @@ namespace ThreadPool { //-------------------------- // Method for the case when priority queue is NOT used template (use_priority_queue), typename std::enable_if_t = 0> - void handle_error(const char* error) { + void handle_error(const char* error) const { //-------------------------- std::cerr << "Error in task: " << error << std::endl; //-------------------------- @@ -975,7 +970,7 @@ namespace ThreadPool { //-------------------------- std::optional assign_adoptive_thread(void) { //-------------------------- - if constexpr (adoptive_tick){ + if constexpr (adoptive_tick) { return std::optional([this](const std::stop_token& stoken){this->adjustment_thread_function(stoken);}); }// end if constexpr (adoptive_tick) //-------------------------- @@ -987,7 +982,7 @@ namespace ThreadPool { //-------------------------------------------------------------- const size_t m_upper_threshold; //-------------------------- - using WorkersType = std::conditional_t, std::vector>; + using WorkersType = std::conditional_t<(adoptive_tick != 0UL), std::unordered_map, std::vector>; WorkersType m_workers; //-------------------------- std::optional> m_idle_threads;