Skip to content

Commit

Permalink
mutil threads per link and mutil links
Browse files Browse the repository at this point in the history
  • Loading branch information
QlQlqiqi committed Jun 19, 2024
1 parent 807a1c4 commit c21fd6e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
6 changes: 4 additions & 2 deletions src/net/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,10 @@ class ThreadPool : public pstd::noncopyable {
Node* CreateMissingNewerLinks(Node* head, int* cnt);
bool LinkOne(Node* node, std::atomic<Node*>* newest_node);

int task_idx_;
std::vector<std::atomic<void*>> asd;
uint16_t task_idx_;

const uint8_t nworkers_per_link_ = 2; // numer of workers per link
const uint8_t nlinks_; // number of links (upper around)
std::vector<std::atomic<Node*>> newest_node_;
std::atomic<int> node_cnt_; // for task
std::vector<std::atomic<Node*>> time_newest_node_;
Expand Down
35 changes: 19 additions & 16 deletions src/net/src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ int ThreadPool::Worker::stop() {
}

ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name)
: newest_node_(worker_num),
: nlinks_((worker_num + nworkers_per_link_ - 1) / nworkers_per_link_),
// : nlinks_(worker_num),
newest_node_(nlinks_),
node_cnt_(0),
time_newest_node_(worker_num),
time_newest_node_(nlinks_),
time_node_cnt_(0),
queue_slow_size_(std::min(worker_num * 10, max_queue_size)),
max_queue_size_(max_queue_size),
Expand All @@ -58,9 +60,9 @@ ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thr
thread_pool_name_(std::move(thread_pool_name)),
running_(false),
should_stop_(false),
mu_(worker_num),
rsignal_(worker_num) {
for (size_t i = 0; i < worker_num_; ++i) {
mu_(nlinks_),
rsignal_(nlinks_) {
for (size_t i = 0; i < nlinks_; ++i) {
newest_node_[i] = nullptr;
time_newest_node_[i] = nullptr;
}
Expand All @@ -71,7 +73,7 @@ ThreadPool::~ThreadPool() { stop_thread_pool(); }
int ThreadPool::start_thread_pool() {
if (!running_.load()) {
should_stop_.store(false);
for (size_t i = 0; i < worker_num_; ++i) {
for (size_t i = 0; i < nlinks_; ++i) {
workers_.push_back(new Worker(this, i));
int res = workers_[i]->start();
if (res != 0) {
Expand Down Expand Up @@ -120,10 +122,10 @@ void ThreadPool::Schedule(TaskFunc func, void* arg) {

if (LIKELY(!should_stop())) {
auto node = new Node(func, arg);
auto idx = ++task_idx_ % worker_num_;
LinkOne(node, &newest_node_[idx]);
auto idx = ++task_idx_;
LinkOne(node, &newest_node_[idx % nlinks_]);
node_cnt_++;
rsignal_[idx].notify_one();
rsignal_[idx % nlinks_].notify_one();
}
}

Expand All @@ -136,11 +138,11 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) {
uint64_t exec_time = unow + timeout * 1000;

if (LIKELY(!should_stop())) {
auto idx = ++task_idx_ % worker_num_;
auto idx = ++task_idx_;
auto node = new Node(exec_time, func, arg);
LinkOne(node, &newest_node_[idx]);
LinkOne(node, &newest_node_[idx % nlinks_]);
time_node_cnt_++;
rsignal_[idx].notify_all();
rsignal_[idx % nlinks_].notify_all();
}
}

Expand All @@ -158,10 +160,11 @@ void ThreadPool::runInThread(const int idx) {
Node* tmp = nullptr;
Node* last = nullptr;
Node* time_last = nullptr;
auto& newest_node = newest_node_[idx];
auto& time_newest_node = time_newest_node_[idx];
auto& mu = mu_[idx];
auto& rsignal = rsignal_[idx];

auto& newest_node = newest_node_[idx % nlinks_];
auto& time_newest_node = time_newest_node_[idx % nlinks_];
auto& mu = mu_[idx % nlinks_];
auto& rsignal = rsignal_[idx % nlinks_];

while (LIKELY(!should_stop())) {
std::unique_lock lock(mu);
Expand Down

0 comments on commit c21fd6e

Please sign in to comment.