diff --git a/src/net/include/net_define.h b/src/net/include/net_define.h index 4ec16cc4e3..3974f07517 100644 --- a/src/net/include/net_define.h +++ b/src/net/include/net_define.h @@ -6,9 +6,7 @@ #ifndef NET_INCLUDE_NET_DEFINE_H_ #define NET_INCLUDE_NET_DEFINE_H_ -#include #include -#include namespace net { diff --git a/src/net/include/random.h b/src/net/include/random.h index 54d067f548..595f0e9e3d 100644 --- a/src/net/include/random.h +++ b/src/net/include/random.h @@ -6,11 +6,8 @@ #ifndef RANDOM_H #define RANDOM_H -#include -#include #include #include -#include #include "net/include/likely.h" @@ -88,4 +85,4 @@ class Random { } // namespace net -#endif // RANDOM_H \ No newline at end of file +#endif // RANDOM_H diff --git a/src/net/include/thread_pool.h b/src/net/include/thread_pool.h index 6ca84006d7..bd2e6d8609 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -18,13 +18,6 @@ namespace net { using TaskFunc = void (*)(void*); -// struct Task { -// Task() = default; -// TaskFunc func = nullptr; -// void* arg = nullptr; -// Task(TaskFunc _func, void* _arg) : func(_func), arg(_arg) {} -// }; - struct TimeTask { uint64_t exec_time; TaskFunc func; @@ -103,7 +96,7 @@ class ThreadPool : public pstd::noncopyable { // it's okay for other platforms to be no-ops } - Node* CreateMissingNewerLinks(Node* head, int* cnt); + Node* CreateMissingNewerLinks(Node* head); bool LinkOne(Node* node, std::atomic* newest_node); std::atomic newest_node_; @@ -111,7 +104,7 @@ class ThreadPool : public pstd::noncopyable { std::atomic time_newest_node_; std::atomic time_node_cnt_; // for time task - const int queue_slow_size_; // default value: max(worker_num_ * 100, max_queue_size_) + const int queue_slow_size_; // default value: min(worker_num_ * 10, max_queue_size_) size_t max_queue_size_; const uint64_t max_yield_usec_; @@ -121,15 +114,12 @@ class ThreadPool : public pstd::noncopyable { size_t worker_num_; std::string thread_pool_name_; - // std::queue queue_; - // std::priority_queue time_queue_; std::vector workers_; std::atomic running_; std::atomic should_stop_; pstd::Mutex mu_; pstd::CondVar rsignal_; - // pstd::CondVar wsignal_; }; } // namespace net diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index 6b4b43e134..0a55ff9e0a 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -46,12 +46,14 @@ int ThreadPool::Worker::stop() { ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name) : newest_node_(nullptr), node_cnt_(0), - queue_slow_size_(std::max(worker_num_ * 100, max_queue_size_)), + time_newest_node_(nullptr), + time_node_cnt_(0), + queue_slow_size_(std::min(worker_num * 10, max_queue_size)), + max_queue_size_(max_queue_size), max_yield_usec_(100), slow_yield_usec_(3), adp_ctx(), worker_num_(worker_num), - max_queue_size_(max_queue_size), thread_pool_name_(std::move(thread_pool_name)), running_(false), should_stop_(false) {} @@ -78,7 +80,6 @@ int ThreadPool::stop_thread_pool() { if (running_.load()) { should_stop_.store(true); rsignal_.notify_all(); - // wsignal_.notify_all(); for (const auto worker : workers_) { res = worker->stop(); if (res != 0) { @@ -106,6 +107,7 @@ void ThreadPool::Schedule(TaskFunc func, void* arg) { if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) { std::this_thread::yield(); } + // std::unique_lock lock(mu_); if (LIKELY(!should_stop())) { auto node = new Node(func, arg); LinkOne(node, &newest_node_); @@ -122,6 +124,7 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { uint64_t unow = std::chrono::duration_cast(now.time_since_epoch()).count(); uint64_t exec_time = unow + timeout * 1000; + // std::unique_lock lock(mu_); if (LIKELY(!should_stop())) { auto node = new Node(exec_time, func, arg); LinkOne(node, &time_newest_node_); @@ -132,6 +135,8 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { size_t ThreadPool::max_queue_size() { return max_queue_size_; } +size_t ThreadPool::worker_size() { return worker_num_; } + void ThreadPool::cur_queue_size(size_t* qsize) { *qsize = node_cnt_.load(std::memory_order_relaxed); } void ThreadPool::cur_time_queue_size(size_t* qsize) { *qsize = time_node_cnt_.load(std::memory_order_relaxed); } @@ -174,7 +179,7 @@ void ThreadPool::runInThread() { } AsmVolatilePause(); } - + // 2. loop for a little short time again const size_t kMaxSlowYieldsWhileSpinning = 3; auto& yield_credit = adp_ctx.value; @@ -238,24 +243,20 @@ void ThreadPool::runInThread() { exec: // do all normal tasks older than this task pointed last if (LIKELY(last != nullptr)) { - int cnt = 0; - auto first = CreateMissingNewerLinks(last, &cnt); + auto first = CreateMissingNewerLinks(last); assert(!first->is_time_task); - node_cnt_ -= cnt; do { first->Exec(); - // node_cnt_--; tmp = first; first = first->Next(); + node_cnt_--; delete tmp; } while (first != nullptr); } // do all time tasks older than this task pointed time_last if (UNLIKELY(time_last != nullptr)) { - int time_cnt = 0; - auto time_first = CreateMissingNewerLinks(time_last, &time_cnt); - // time_node_cnt_ -= time_cnt; + auto time_first = CreateMissingNewerLinks(time_last); do { // time task may block normal task auto now = std::chrono::system_clock::now(); @@ -265,15 +266,15 @@ void ThreadPool::runInThread() { assert(time_first->is_time_task); if (unow >= exec_time) { time_first->Exec(); - time_node_cnt_--; } else { lock.lock(); rsignal_.wait_for(lock, std::chrono::microseconds(exec_time - unow)); + lock.unlock(); time_first->Exec(); - time_node_cnt_--; } tmp = time_first; time_first = time_first->Next(); + time_node_cnt_--; delete tmp; } while (time_first != nullptr); } @@ -281,16 +282,14 @@ void ThreadPool::runInThread() { } } -ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head, int* cnt) { +ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head) { assert(head != nullptr); Node* next = nullptr; - *cnt = 1; while (true) { next = head->link_older; if (next == nullptr) { return head; } - ++(*cnt); next->link_newer = head; head = next; }