diff --git a/src/net/include/likely.h b/src/net/include/likely.h new file mode 100644 index 0000000000..6e07121f0b --- /dev/null +++ b/src/net/include/likely.h @@ -0,0 +1,17 @@ +// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef LIKELY_H +#define LIKELY_H + +#if defined(__GNUC__) && __GNUC__ >= 4 +# define LIKELY(x) (__builtin_expect((x), 1)) +# define UNLIKELY(x) (__builtin_expect((x), 0)) +#else +# define LIKELY(x) (x) +# define UNLIKELY(x) (x) +#endif + +#endif // LIKELY_H diff --git a/src/net/include/net_define.h b/src/net/include/net_define.h index 4ec16cc4e3..3974f07517 100644 --- a/src/net/include/net_define.h +++ b/src/net/include/net_define.h @@ -6,9 +6,7 @@ #ifndef NET_INCLUDE_NET_DEFINE_H_ #define NET_INCLUDE_NET_DEFINE_H_ -#include #include -#include namespace net { diff --git a/src/net/include/random.h b/src/net/include/random.h new file mode 100644 index 0000000000..595f0e9e3d --- /dev/null +++ b/src/net/include/random.h @@ -0,0 +1,88 @@ +// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef RANDOM_H +#define RANDOM_H + +#include +#include + +#include "net/include/likely.h" + +namespace net { + +class Random { + private: + enum : uint32_t { + M = 2147483647L // 2^31-1 + }; + enum : uint64_t { + A = 16807 // bits 14, 8, 7, 5, 2, 1, 0 + }; + + uint32_t seed_; + + static uint32_t GoodSeed(uint32_t s) { return (s & M) != 0 ? (s & M) : 1; } + + public: + // This is the largest value that can be returned from Next() + enum : uint32_t { kMaxNext = M }; + + explicit Random(uint32_t s) : seed_(GoodSeed(s)) {} + + void Reset(uint32_t s) { seed_ = GoodSeed(s); } + + uint32_t Next() { + // We are computing + // seed_ = (seed_ * A) % M, where M = 2^31-1 + // + // seed_ must not be zero or M, or else all subsequent computed values + // will be zero or M respectively. For all other values, seed_ will end + // up cycling through every number in [1,M-1] + uint64_t product = seed_ * A; + + // Compute (product % M) using the fact that ((x << 31) % M) == x. + seed_ = static_cast((product >> 31) + (product & M)); + // The first reduction may overflow by 1 bit, so we may need to + // repeat. mod == M is not possible; using > allows the faster + // sign-bit-based test. + if (seed_ > M) { + seed_ -= M; + } + return seed_; + } + + // Returns a uniformly distributed value in the range [0..n-1] + // REQUIRES: n > 0 + uint32_t Uniform(int n) { return Next() % n; } + + // Randomly returns true ~"1/n" of the time, and false otherwise. + // REQUIRES: n > 0 + bool OneIn(int n) { return (Next() % n) == 0; } + + // Skewed: pick "base" uniformly from range [0,max_log] and then + // return "base" random bits. The effect is to pick a number in the + // range [0,2^max_log-1] with exponential bias towards smaller numbers. + uint32_t Skewed(int max_log) { return Uniform(1 << Uniform(max_log + 1)); } + + // Returns a Random instance for use by the current thread without + // additional locking + static Random* GetTLSInstance() { + static __thread Random* tls_instance; + static __thread std::aligned_storage::type tls_instance_bytes; + + auto rv = tls_instance; + if (UNLIKELY(rv == nullptr)) { + size_t seed = std::hash()(std::this_thread::get_id()); + rv = new (&tls_instance_bytes) Random((uint32_t)seed); + tls_instance = rv; + } + return rv; + } +}; + +} // namespace net + +#endif // RANDOM_H diff --git a/src/net/include/thread_pool.h b/src/net/include/thread_pool.h index 0ec3d1bcb1..8d20957194 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -8,22 +8,16 @@ #include #include -#include #include +#include #include "net/include/net_define.h" +#include "net/include/random.h" #include "pstd/include/pstd_mutex.h" namespace net { -using TaskFunc = void (*)(void *); - -struct Task { - Task() = default; - TaskFunc func = nullptr; - void* arg = nullptr; - Task(TaskFunc _func, void* _arg) : func(_func), arg(_arg) {} -}; +using TaskFunc = void (*)(void*); struct TimeTask { uint64_t exec_time; @@ -37,7 +31,13 @@ class ThreadPool : public pstd::noncopyable { public: class Worker { public: - explicit Worker(ThreadPool* tp) : start_(false), thread_pool_(tp){}; + struct Arg { + Arg(void* p, int i) : arg(p), idx(i) {} + void* arg; + int idx; + }; + + explicit Worker(ThreadPool* tp, int idx = 0) : start_(false), thread_pool_(tp), idx_(idx), arg_(tp, idx){}; static void* WorkerMain(void* arg); int start(); @@ -48,9 +48,11 @@ class ThreadPool : public pstd::noncopyable { std::atomic start_; ThreadPool* const thread_pool_; std::string worker_name_; + int idx_; + Arg arg_; }; - explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool"); + explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool"); virtual ~ThreadPool(); int start_thread_pool(); @@ -67,21 +69,73 @@ class ThreadPool : public pstd::noncopyable { std::string thread_pool_name(); private: - void runInThread(); + void runInThread(const int idx = 0); + + public: + struct AdaptationContext { + std::atomic value; + + explicit AdaptationContext() : value(0) {} + }; + + private: + struct Node { + Node* link_older = nullptr; + Node* link_newer = nullptr; - size_t worker_num_; + // true if task is TimeTask + bool is_time_task; + TimeTask task; + + Node(TaskFunc func, void* arg) : is_time_task(false), task(0, func, arg) {} + Node(uint64_t exec_time, TaskFunc func, void* arg) : is_time_task(true), task(exec_time, func, arg) {} + + inline void Exec() { task.func(task.arg); } + inline Node* Next() { return link_newer; } + }; + + // re-push some timer tasks which has been poped + void ReDelaySchedule(Node* nodes); + + static inline void AsmVolatilePause() { +#if defined(__i386__) || defined(__x86_64__) + asm volatile("pause"); +#elif defined(__aarch64__) + asm volatile("wfe"); +#elif defined(__powerpc64__) + asm volatile("or 27,27,27"); +#endif + // it's okay for other platforms to be no-ops + } + + Node* CreateMissingNewerLinks(Node* head, int* cnt); + bool LinkOne(Node* node, std::atomic* newest_node); + + uint16_t task_idx_; + + const uint8_t nworkers_per_link_ = 1; // numer of workers per link + const uint8_t nlinks_; // number of links (upper around) + std::vector> newest_node_; + std::atomic node_cnt_; // for task + std::vector> time_newest_node_; + std::atomic time_node_cnt_; // for time task + + const int queue_slow_size_; // default value: min(worker_num_ * 10, max_queue_size_) size_t max_queue_size_; + + const uint64_t max_yield_usec_; + const uint64_t slow_yield_usec_; + + AdaptationContext adp_ctx; + + const size_t worker_num_; std::string thread_pool_name_; - std::queue queue_; - std::priority_queue time_queue_; std::vector workers_; std::atomic running_; std::atomic should_stop_; - pstd::Mutex mu_; - pstd::CondVar rsignal_; - pstd::CondVar wsignal_; - + std::vector mu_; + std::vector rsignal_; }; } // namespace net diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index 8e20694244..b7e4c988de 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -5,23 +5,27 @@ #include "net/include/thread_pool.h" #include "net/src/net_thread_name.h" +#include "pstd/include/env.h" #include #include +#include +#include #include namespace net { -void* ThreadPool::Worker::WorkerMain(void* arg) { - auto tp = static_cast(arg); - tp->runInThread(); +void* ThreadPool::Worker::WorkerMain(void* p) { + auto arg = static_cast(p); + auto tp = static_cast(arg->arg); + tp->runInThread(arg->idx); return nullptr; } int ThreadPool::Worker::start() { if (!start_.load()) { - if (pthread_create(&thread_id_, nullptr, &WorkerMain, thread_pool_) != 0) { + if (pthread_create(&thread_id_, nullptr, &WorkerMain, &arg_) != 0) { return -1; } else { start_.store(true); @@ -43,12 +47,30 @@ int ThreadPool::Worker::stop() { return 0; } -ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name) - : worker_num_(worker_num), - max_queue_size_(max_queue_size), +ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name) + : nlinks_(nworkers_per_link_ > worker_num ? 1 : (worker_num + nworkers_per_link_ - 1) / nworkers_per_link_), + // : nlinks_(worker_num), + newest_node_(nlinks_), + node_cnt_(0), + time_newest_node_(nlinks_), + time_node_cnt_(0), + max_queue_size_(1000 * nlinks_), + queue_slow_size_(worker_num > 10 ? (100 * nlinks_) : std::max(worker_num, 3UL)), + // queue_slow_size_(std::max(10UL, std::min(worker_num * max_queue_size / 100, max_queue_size))), + max_yield_usec_(100), + slow_yield_usec_(3), + adp_ctx(), + worker_num_(worker_num), thread_pool_name_(std::move(thread_pool_name)), running_(false), - should_stop_(false) {} + should_stop_(false), + mu_(nlinks_), + rsignal_(nlinks_) { + for (size_t i = 0; i < nlinks_; ++i) { + newest_node_[i] = nullptr; + time_newest_node_[i] = nullptr; + } +} ThreadPool::~ThreadPool() { stop_thread_pool(); } @@ -56,7 +78,7 @@ int ThreadPool::start_thread_pool() { if (!running_.load()) { should_stop_.store(false); for (size_t i = 0; i < worker_num_; ++i) { - workers_.push_back(new Worker(this)); + workers_.push_back(new Worker(this, i)); int res = workers_[i]->start(); if (res != 0) { return kCreateThreadError; @@ -71,8 +93,9 @@ int ThreadPool::stop_thread_pool() { int res = 0; if (running_.load()) { should_stop_.store(true); - rsignal_.notify_all(); - wsignal_.notify_all(); + for (auto& r : rsignal_) { + r.notify_all(); + } for (const auto worker : workers_) { res = worker->stop(); if (res != 0) { @@ -92,12 +115,22 @@ bool ThreadPool::should_stop() { return should_stop_.load(); } void ThreadPool::set_should_stop() { should_stop_.store(true); } void ThreadPool::Schedule(TaskFunc func, void* arg) { - std::unique_lock lock(mu_); - wsignal_.wait(lock, [this]() { return queue_.size() < max_queue_size_ || should_stop(); }); + node_cnt_++; + // stop until the size of tasks queue is not greater than max_queue_size_ + while (node_cnt_.load(std::memory_order_relaxed) >= max_queue_size_) { + std::this_thread::yield(); + // pstd::SleepForMicroseconds(1); + } + // slow like above + if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) { + std::this_thread::yield(); + } - if (!should_stop()) { - queue_.emplace(func, arg); - rsignal_.notify_one(); + if (LIKELY(!should_stop())) { + auto node = new Node(func, arg); + auto idx = ++task_idx_; + LinkOne(node, &newest_node_[idx % nlinks_]); + rsignal_[idx % nlinks_].notify_one(); } } @@ -109,59 +142,102 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { uint64_t unow = std::chrono::duration_cast(now.time_since_epoch()).count(); uint64_t exec_time = unow + timeout * 1000; - std::lock_guard lock(mu_); - if (!should_stop()) { - time_queue_.emplace(exec_time, func, arg); - rsignal_.notify_all(); + if (LIKELY(!should_stop())) { + auto idx = ++task_idx_; + auto node = new Node(exec_time, func, arg); + LinkOne(node, &newest_node_[idx % nlinks_]); + time_node_cnt_++; + rsignal_[idx % nlinks_].notify_all(); } } size_t ThreadPool::max_queue_size() { return max_queue_size_; } -void ThreadPool::cur_queue_size(size_t* qsize) { - std::lock_guard lock(mu_); - *qsize = queue_.size(); -} +size_t ThreadPool::worker_size() { return worker_num_; } -void ThreadPool::cur_time_queue_size(size_t* qsize) { - std::lock_guard lock(mu_); - *qsize = time_queue_.size(); -} +void ThreadPool::cur_queue_size(size_t* qsize) { *qsize = node_cnt_.load(std::memory_order_relaxed); } + +void ThreadPool::cur_time_queue_size(size_t* qsize) { *qsize = time_node_cnt_.load(std::memory_order_relaxed); } std::string ThreadPool::thread_pool_name() { return thread_pool_name_; } -void ThreadPool::runInThread() { - while (!should_stop()) { - std::unique_lock lock(mu_); - rsignal_.wait(lock, [this]() { return !queue_.empty() || !time_queue_.empty() || should_stop(); }); +void ThreadPool::runInThread(const int idx) { + Node* tmp = nullptr; + Node* last = nullptr; + Node* time_last = nullptr; + + auto& newest_node = newest_node_[idx % nlinks_]; + auto& mu = mu_[idx % nlinks_]; + auto& rsignal = rsignal_[idx % nlinks_]; - if (should_stop()) { + while (LIKELY(!should_stop())) { + std::unique_lock lock(mu); + rsignal.wait(lock, [this, &newest_node]() { + return newest_node.load(std::memory_order_relaxed) != nullptr || UNLIKELY(should_stop()); + }); + lock.unlock(); + + if (UNLIKELY(should_stop())) { break; } - if (!time_queue_.empty()) { - auto now = std::chrono::system_clock::now(); - uint64_t unow = std::chrono::duration_cast(now.time_since_epoch()).count(); - - auto [exec_time, func, arg] = time_queue_.top(); - if (unow >= exec_time) { - time_queue_.pop(); - lock.unlock(); - (*func)(arg); - continue; - } else if (queue_.empty() && !should_stop()) { - rsignal_.wait_for(lock, std::chrono::microseconds(exec_time - unow)); - lock.unlock(); - continue; - } + + retry: + last = newest_node.exchange(nullptr); + if (last == nullptr) { + continue; } - if (!queue_.empty()) { - auto [func, arg] = queue_.front(); - queue_.pop(); - wsignal_.notify_one(); - lock.unlock(); - (*func)(arg); + // do all normal tasks older than this task pointed last + int cnt = 1; + auto first = CreateMissingNewerLinks(last, &cnt); + assert(!first->is_time_task); + do { + first->Exec(); + tmp = first; + first = first->Next(); + node_cnt_--; + delete tmp; + } while (first != nullptr); + goto retry; + } +} + +void ThreadPool::ReDelaySchedule(Node* nodes) { + while (LIKELY(!should_stop()) && nodes != nullptr) { + auto idx = ++task_idx_; + auto nxt = nodes->Next(); + nodes->link_newer = nullptr; + // auto node = new Node(exec_time, func, arg); + LinkOne(nodes, &newest_node_[idx % nlinks_]); + time_node_cnt_++; + rsignal_[idx % nlinks_].notify_all(); + nodes = nxt; + } +} + +ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head, int* cnt) { + assert(head != nullptr); + assert(cnt != nullptr && *cnt == 1); + Node* next = nullptr; + while (true) { + next = head->link_older; + if (next == nullptr) { + return head; + } + ++(*cnt); + next->link_newer = head; + head = next; + } +} + +bool ThreadPool::LinkOne(Node* node, std::atomic* newest_node) { + assert(newest_node != nullptr); + auto nodes = newest_node->load(std::memory_order_relaxed); + while (true) { + node->link_older = nodes; + if (newest_node->compare_exchange_weak(nodes, node)) { + return (nodes == nullptr); } } } -} // namespace net +} // namespace net \ No newline at end of file diff --git a/tests/integration/start_codis.sh b/tests/integration/start_codis.sh index c686251eb3..2ab1ddbdfc 100755 --- a/tests/integration/start_codis.sh +++ b/tests/integration/start_codis.sh @@ -1,10 +1,16 @@ #!/bin/bash -#pkill -9 pika -#pkill -9 codis -#rm -rf /tmp/codis -#rm -rf codis_data_1 -#rm -rf codis_data_2 +# sugguest clear these before starting codis +# pkill -9 pika +# pkill -9 codis +# rm -rf /tmp/codis +# rm -rf codis_data* +# rm -rf acl*data +# rm -rf dump +# rm -rf db +# rm -rf log +# rm -rf slave_data +# rm -rf master_data CODIS_DASHBOARD_ADDR=127.0.0.1:18080 @@ -39,6 +45,8 @@ echo 'startup codis dashboard and codis proxy' sleep 20 +# if start failed, maybe for your struct/class that be defined NOT be used +# or addtional introduced packages echo 'assign codis slots to groups and resync groups' ./bin/codis-admin --dashboard=$CODIS_DASHBOARD_ADDR --create-group --gid=1 ./bin/codis-admin --dashboard=$CODIS_DASHBOARD_ADDR --create-group --gid=2