From 8342c96a265e52edfe5e630e36cdeeb85b3e961e Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Sun, 12 May 2024 12:51:43 +0800 Subject: [PATCH 01/13] change thread sheduling method and the logic is based on rocksdb --- src/net/include/likely.h | 12 +++ src/net/include/random.h | 78 +++++++++++++++++ src/net/include/thread_pool.h | 51 ++++++++++- src/net/src/thread_pool.cc | 159 +++++++++++++++++++++++++++------- 4 files changed, 264 insertions(+), 36 deletions(-) create mode 100644 src/net/include/likely.h create mode 100644 src/net/include/random.h diff --git a/src/net/include/likely.h b/src/net/include/likely.h new file mode 100644 index 0000000000..ac27a8168f --- /dev/null +++ b/src/net/include/likely.h @@ -0,0 +1,12 @@ +#ifndef PORT_LIKELY_H_ +#define PORT_LIKELY_H_ + +#if defined(__GNUC__) && __GNUC__ >= 4 +#define LIKELY(x) (__builtin_expect((x), 1)) +#define UNLIKELY(x) (__builtin_expect((x), 0)) +#else +#define LIKELY(x) (x) +#define UNLIKELY(x) (x) +#endif + +#endif // PORT_LIKELY_H_ \ No newline at end of file diff --git a/src/net/include/random.h b/src/net/include/random.h new file mode 100644 index 0000000000..515ca7252b --- /dev/null +++ b/src/net/include/random.h @@ -0,0 +1,78 @@ +#pragma once +#include +#include +#include +#include +#include + +#include "net/include/likely.h" + +class Random { + private: + enum : uint32_t { + M = 2147483647L // 2^31-1 + }; + enum : uint64_t { + A = 16807 // bits 14, 8, 7, 5, 2, 1, 0 + }; + + uint32_t seed_; + + static uint32_t GoodSeed(uint32_t s) { return (s & M) != 0 ? (s & M) : 1; } + + public: + // This is the largest value that can be returned from Next() + enum : uint32_t { kMaxNext = M }; + + explicit Random(uint32_t s) : seed_(GoodSeed(s)) {} + + void Reset(uint32_t s) { seed_ = GoodSeed(s); } + + uint32_t Next() { + // We are computing + // seed_ = (seed_ * A) % M, where M = 2^31-1 + // + // seed_ must not be zero or M, or else all subsequent computed values + // will be zero or M respectively. 
For all other values, seed_ will end + // up cycling through every number in [1,M-1] + uint64_t product = seed_ * A; + + // Compute (product % M) using the fact that ((x << 31) % M) == x. + seed_ = static_cast((product >> 31) + (product & M)); + // The first reduction may overflow by 1 bit, so we may need to + // repeat. mod == M is not possible; using > allows the faster + // sign-bit-based test. + if (seed_ > M) { + seed_ -= M; + } + return seed_; + } + + // Returns a uniformly distributed value in the range [0..n-1] + // REQUIRES: n > 0 + uint32_t Uniform(int n) { return Next() % n; } + + // Randomly returns true ~"1/n" of the time, and false otherwise. + // REQUIRES: n > 0 + bool OneIn(int n) { return (Next() % n) == 0; } + + // Skewed: pick "base" uniformly from range [0,max_log] and then + // return "base" random bits. The effect is to pick a number in the + // range [0,2^max_log-1] with exponential bias towards smaller numbers. + uint32_t Skewed(int max_log) { return Uniform(1 << Uniform(max_log + 1)); } + + // Returns a Random instance for use by the current thread without + // additional locking + static Random* GetTLSInstance() { + static __thread Random* tls_instance; + static __thread std::aligned_storage::type tls_instance_bytes; + + auto rv = tls_instance; + if (UNLIKELY(rv == nullptr)) { + size_t seed = std::hash()(std::this_thread::get_id()); + rv = new (&tls_instance_bytes) Random((uint32_t)seed); + tls_instance = rv; + } + return rv; + } +}; \ No newline at end of file diff --git a/src/net/include/thread_pool.h b/src/net/include/thread_pool.h index 0ec3d1bcb1..aa53e5ec77 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -12,11 +12,12 @@ #include #include "net/include/net_define.h" +#include "net/include/random.h" #include "pstd/include/pstd_mutex.h" namespace net { -using TaskFunc = void (*)(void *); +using TaskFunc = void (*)(void*); struct Task { Task() = default; @@ -50,7 +51,7 @@ class ThreadPool : public 
pstd::noncopyable { std::string worker_name_; }; - explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool"); + explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool"); virtual ~ThreadPool(); int start_thread_pool(); @@ -69,8 +70,51 @@ class ThreadPool : public pstd::noncopyable { private: void runInThread(); - size_t worker_num_; + public: + struct AdaptationContext { + std::atomic value; + + explicit AdaptationContext() : value(0) {} + }; + + private: + struct Node { + Node* link_older = nullptr; + Node* link_newer = nullptr; + + Task task; + + Node(TaskFunc func, void* arg) : task(func, arg) {} + + inline void Exec() { task.func(task.arg); } + inline Node* Next() { return link_newer; } + }; + + static inline void AsmVolatilePause() { +#if defined(__i386__) || defined(__x86_64__) + asm volatile("pause"); +#elif defined(__aarch64__) + asm volatile("wfe"); +#elif defined(__powerpc64__) + asm volatile("or 27,27,27"); +#endif + // it's okay for other platforms to be no-ops + } + + Node* CreateMissingNewerLinks(Node* head, int* cnt); + bool LinkOne(Node* node, std::atomic* newest_node); + + std::atomic newest_node_; + std::atomic node_cnt_; + const int queue_slow_size_; // default value: worker_num_ * 100 size_t max_queue_size_; + + const uint64_t max_yield_usec_; + const uint64_t slow_yield_usec_; + + AdaptationContext adp_ctx; + + size_t worker_num_; std::string thread_pool_name_; std::queue queue_; std::priority_queue time_queue_; @@ -81,7 +125,6 @@ class ThreadPool : public pstd::noncopyable { pstd::Mutex mu_; pstd::CondVar rsignal_; pstd::CondVar wsignal_; - }; } // namespace net diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index 4ea4b82125..656cb2cdc6 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -8,6 +8,8 @@ #include +#include +#include #include namespace net { @@ -41,8 +43,14 @@ int ThreadPool::Worker::stop() 
{ return 0; } -ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name) - : worker_num_(worker_num), +ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name) + : newest_node_(nullptr), + node_cnt_(0), + queue_slow_size_(worker_num * 50), + max_yield_usec_(100), + slow_yield_usec_(3), + adp_ctx(), + worker_num_(worker_num), max_queue_size_(max_queue_size), thread_pool_name_(std::move(thread_pool_name)), running_(false), @@ -90,13 +98,18 @@ bool ThreadPool::should_stop() { return should_stop_.load(); } void ThreadPool::set_should_stop() { should_stop_.store(true); } void ThreadPool::Schedule(TaskFunc func, void* arg) { - std::unique_lock lock(mu_); - wsignal_.wait(lock, [this]() { return queue_.size() < max_queue_size_ || should_stop(); }); - - if (!should_stop()) { - queue_.emplace(func, arg); - rsignal_.notify_one(); + // stop until the size of tasks queue is not greater than max_queue_size_ + while (node_cnt_.load(std::memory_order_relaxed) >= max_queue_size_) { + std::this_thread::yield(); } + // slow like above + if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) { + std::this_thread::yield(); + } + auto node = new Node(func, arg); + LinkOne(node, &newest_node_); + node_cnt_++; + rsignal_.notify_one(); } /* @@ -109,6 +122,8 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { std::lock_guard lock(mu_); if (!should_stop()) { + // now the member time_queue_ maybe NOT be used + throw std::logic_error("unreachable logic"); time_queue_.emplace(exec_time, func, arg); rsignal_.notify_all(); } @@ -116,10 +131,7 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { size_t ThreadPool::max_queue_size() { return max_queue_size_; } -void ThreadPool::cur_queue_size(size_t* qsize) { - std::lock_guard lock(mu_); - *qsize = queue_.size(); -} +void ThreadPool::cur_queue_size(size_t* qsize) { *qsize = 
node_cnt_.load(std::memory_order_relaxed); } void ThreadPool::cur_time_queue_size(size_t* qsize) { std::lock_guard lock(mu_); @@ -131,34 +143,117 @@ std::string ThreadPool::thread_pool_name() { return thread_pool_name_; } void ThreadPool::runInThread() { while (!should_stop()) { std::unique_lock lock(mu_); - rsignal_.wait(lock, [this]() { return !queue_.empty() || !time_queue_.empty() || should_stop(); }); + rsignal_.wait(lock, [this]() { return newest_node_.load(std::memory_order_relaxed) != nullptr || should_stop(); }); + lock.unlock(); + retry: if (should_stop()) { break; } - if (!time_queue_.empty()) { - auto now = std::chrono::system_clock::now(); - uint64_t unow = std::chrono::duration_cast(now.time_since_epoch()).count(); - - auto [exec_time, func, arg] = time_queue_.top(); - if (unow >= exec_time) { - time_queue_.pop(); - lock.unlock(); - (*func)(arg); - continue; - } else if (queue_.empty() && !should_stop()) { - rsignal_.wait_for(lock, std::chrono::microseconds(exec_time - unow)); - lock.unlock(); + + auto last = newest_node_.exchange(nullptr); + if (last == nullptr) { + // 1. loop for short time + for (uint32_t tries = 0; tries < 200; ++tries) { + if (newest_node_.load(std::memory_order_acquire) != nullptr) { + last = newest_node_.exchange(nullptr); + if (last != nullptr) { + goto exec; + } + } + AsmVolatilePause(); + } + + // 2. 
loop for a little short time again + const size_t kMaxSlowYieldsWhileSpinning = 3; + auto& yield_credit = adp_ctx.value; + bool update_ctx = false; + bool would_spin_again = false; + const int sampling_base = 256; + + update_ctx = Random::GetTLSInstance()->OneIn(sampling_base); + + if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) { + auto spin_begin = std::chrono::steady_clock::now(); + + size_t slow_yield_count = 0; + + auto iter_begin = spin_begin; + while ((iter_begin - spin_begin) <= std::chrono::microseconds(max_yield_usec_)) { + std::this_thread::yield(); + + if (newest_node_.load(std::memory_order_acquire) != nullptr) { + last = newest_node_.exchange(nullptr); + if (last != nullptr) { + would_spin_again = true; + // success + break; + } + } + + auto now = std::chrono::steady_clock::now(); + if (now == iter_begin || now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) { + ++slow_yield_count; + if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) { + update_ctx = true; + break; + } + } + iter_begin = now; + } + } + + // update percentage of next loop 2 + if (update_ctx) { + auto v = yield_credit.load(std::memory_order_relaxed); + v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072; + yield_credit.store(v, std::memory_order_relaxed); + } + + if (!would_spin_again) { + // 3. 
wait for new task continue; } } - if (!queue_.empty()) { - auto [func, arg] = queue_.front(); - queue_.pop(); - wsignal_.notify_one(); - lock.unlock(); - (*func)(arg); + exec: + // do all tasks older than this task pointed last + int cnt = 0; + auto first = CreateMissingNewerLinks(last, &cnt); + node_cnt_ -= cnt; + Node* tmp = nullptr; + do { + first->Exec(); + tmp = first; + first = first->Next(); + delete tmp; + } while (first != nullptr); + goto retry; + } +} + +ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head, int* cnt) { + assert(head != nullptr); + Node* next = nullptr; + cnt++; + while (true) { + next = head->link_older; + if (next == nullptr) { + return head; + } + cnt++; + next->link_newer = head; + head = next; + } +} + +bool ThreadPool::LinkOne(Node* node, std::atomic* newest_node) { + assert(newest_node != nullptr); + auto nodes = newest_node->load(std::memory_order_relaxed); + while (true) { + node->link_older = nodes; + if (newest_node->compare_exchange_weak(nodes, node)) { + return (nodes == nullptr); } } } From 79ca6c6306bbdc20e99182cce834c8cb42147327 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Sun, 12 May 2024 14:42:46 +0800 Subject: [PATCH 02/13] add Copyright and replace "#param once" with "#ifdef" --- src/net/include/likely.h | 19 ++++++++++++------- src/net/include/random.h | 13 +++++++++++-- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/net/include/likely.h b/src/net/include/likely.h index ac27a8168f..4dd0865ac6 100644 --- a/src/net/include/likely.h +++ b/src/net/include/likely.h @@ -1,12 +1,17 @@ -#ifndef PORT_LIKELY_H_ -#define PORT_LIKELY_H_ +// Copyright (c) 2018-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#ifndef LIKELY_H +#define LIKELY_H #if defined(__GNUC__) && __GNUC__ >= 4 -#define LIKELY(x) (__builtin_expect((x), 1)) -#define UNLIKELY(x) (__builtin_expect((x), 0)) +# define LIKELY(x) (__builtin_expect((x), 1)) +# define UNLIKELY(x) (__builtin_expect((x), 0)) #else -#define LIKELY(x) (x) -#define UNLIKELY(x) (x) +# define LIKELY(x) (x) +# define UNLIKELY(x) (x) #endif -#endif // PORT_LIKELY_H_ \ No newline at end of file +#endif // LIKELY_H diff --git a/src/net/include/random.h b/src/net/include/random.h index 515ca7252b..6112f6bf31 100644 --- a/src/net/include/random.h +++ b/src/net/include/random.h @@ -1,4 +1,11 @@ -#pragma once +// Copyright (c) 2018-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef RANDOM_H +#define RANDOM_H + #include #include #include @@ -75,4 +82,6 @@ class Random { } return rv; } -}; \ No newline at end of file +}; + +#endif // LIKELY_H From 1bb167cf42b7fab3d8e6f20b7e9a3120aff91529 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Mon, 13 May 2024 14:42:13 +0800 Subject: [PATCH 03/13] change the lisence Copyright start date --- src/net/include/likely.h | 2 +- src/net/include/random.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net/include/likely.h b/src/net/include/likely.h index 4dd0865ac6..6e07121f0b 100644 --- a/src/net/include/likely.h +++ b/src/net/include/likely.h @@ -1,4 +1,4 @@ -// Copyright (c) 2018-present, Qihoo, Inc. All rights reserved. +// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. 
diff --git a/src/net/include/random.h b/src/net/include/random.h index 6112f6bf31..94140a8bae 100644 --- a/src/net/include/random.h +++ b/src/net/include/random.h @@ -1,4 +1,4 @@ -// Copyright (c) 2018-present, Qihoo, Inc. All rights reserved. +// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. From f9a15cde5ecdc5ec16f4a54cd9381b502406e522 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Mon, 13 May 2024 23:18:50 +0800 Subject: [PATCH 04/13] add comment for the order between unlock and consumption --- src/net/src/thread_pool.cc | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index 656cb2cdc6..d7c975207a 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -116,14 +116,15 @@ void ThreadPool::Schedule(TaskFunc func, void* arg) { * timeout is in millisecond */ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { + // now the member time_queue_ maybe NOT be used + throw std::logic_error("unreachable logic"); + auto now = std::chrono::system_clock::now(); uint64_t unow = std::chrono::duration_cast(now.time_since_epoch()).count(); uint64_t exec_time = unow + timeout * 1000; std::lock_guard lock(mu_); if (!should_stop()) { - // now the member time_queue_ maybe NOT be used - throw std::logic_error("unreachable logic"); time_queue_.emplace(exec_time, func, arg); rsignal_.notify_all(); } @@ -134,6 +135,10 @@ size_t ThreadPool::max_queue_size() { return max_queue_size_; } void ThreadPool::cur_queue_size(size_t* qsize) { *qsize = node_cnt_.load(std::memory_order_relaxed); } void ThreadPool::cur_time_queue_size(size_t* qsize) { + // now the member time_queue_ maybe NOT be used + *qsize = 0; 
+ return; + std::lock_guard lock(mu_); *qsize = time_queue_.size(); } @@ -144,6 +149,15 @@ void ThreadPool::runInThread() { while (!should_stop()) { std::unique_lock lock(mu_); rsignal_.wait(lock, [this]() { return newest_node_.load(std::memory_order_relaxed) != nullptr || should_stop(); }); + // Consuming tasks before unlock can cause spurious wakeup + // with little possibility in low throughput. Because when + // all worker threads are waiting and a task coming, only one + // worker thread can be awakened. Only when a thread is awakened + // but the task has not been consumed, and at this time a new task + // is coming and make a new worker thread awakened, the spurious + // wakeup occur. Because the former thread can consume all tasks + // and the latter thread will consume NOTHING. So, for high + // throughput it MUST consume before unlock. lock.unlock(); retry: From d89cd2e4a286d9e92523de4c9eaf68e5967511d2 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Thu, 16 May 2024 22:36:12 +0800 Subject: [PATCH 05/13] fix bug --- src/net/include/random.h | 6 +- src/net/include/thread_pool.h | 14 ++-- src/net/src/thread_pool.cc | 127 ++++++++++++++++++++++------------ 3 files changed, 96 insertions(+), 51 deletions(-) diff --git a/src/net/include/random.h b/src/net/include/random.h index 94140a8bae..54d067f548 100644 --- a/src/net/include/random.h +++ b/src/net/include/random.h @@ -14,6 +14,8 @@ #include "net/include/likely.h" +namespace net { + class Random { private: enum : uint32_t { @@ -84,4 +86,6 @@ class Random { } }; -#endif // LIKELY_H +} // namespace net + +#endif // RANDOM_H \ No newline at end of file diff --git a/src/net/include/thread_pool.h b/src/net/include/thread_pool.h index aa53e5ec77..9187faa3f2 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -82,9 +82,12 @@ class ThreadPool : public pstd::noncopyable { Node* link_older = nullptr; Node* link_newer = nullptr; - Task task; + // true if task is TimeTask + 
bool is_time_task; + TimeTask task; - Node(TaskFunc func, void* arg) : task(func, arg) {} + Node(TaskFunc func, void* arg) : is_time_task(false), task(0, func, arg) {} + Node(uint64_t exec_time, TaskFunc func, void* arg) : is_time_task(true), task(exec_time, func, arg) {} inline void Exec() { task.func(task.arg); } inline Node* Next() { return link_newer; } @@ -105,8 +108,11 @@ class ThreadPool : public pstd::noncopyable { bool LinkOne(Node* node, std::atomic* newest_node); std::atomic newest_node_; - std::atomic node_cnt_; - const int queue_slow_size_; // default value: worker_num_ * 100 + std::atomic node_cnt_; // for task + std::atomic time_newest_node_; + std::atomic time_node_cnt_; // for time task + + const int queue_slow_size_; // default value: max(worker_num_ * 100, max_queue_size_) size_t max_queue_size_; const uint64_t max_yield_usec_; diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index d7c975207a..3f146566da 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -46,7 +46,7 @@ int ThreadPool::Worker::stop() { ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name) : newest_node_(nullptr), node_cnt_(0), - queue_slow_size_(worker_num * 50), + queue_slow_size_(std::max(worker_num_ * 100, max_queue_size_)), max_yield_usec_(100), slow_yield_usec_(3), adp_ctx(), @@ -106,26 +106,26 @@ void ThreadPool::Schedule(TaskFunc func, void* arg) { if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) { std::this_thread::yield(); } - auto node = new Node(func, arg); - LinkOne(node, &newest_node_); - node_cnt_++; - rsignal_.notify_one(); + if (LIKELY(!should_stop())) { + auto node = new Node(func, arg); + LinkOne(node, &newest_node_); + node_cnt_++; + rsignal_.notify_one(); + } } /* * timeout is in millisecond */ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { - // now the member time_queue_ maybe NOT be used - throw std::logic_error("unreachable 
logic"); - auto now = std::chrono::system_clock::now(); uint64_t unow = std::chrono::duration_cast(now.time_since_epoch()).count(); uint64_t exec_time = unow + timeout * 1000; - std::lock_guard lock(mu_); - if (!should_stop()) { - time_queue_.emplace(exec_time, func, arg); + if (LIKELY(!should_stop())) { + auto node = new Node(exec_time, func, arg); + LinkOne(node, &time_newest_node_); + time_node_cnt_++; rsignal_.notify_all(); } } @@ -134,39 +134,30 @@ size_t ThreadPool::max_queue_size() { return max_queue_size_; } void ThreadPool::cur_queue_size(size_t* qsize) { *qsize = node_cnt_.load(std::memory_order_relaxed); } -void ThreadPool::cur_time_queue_size(size_t* qsize) { - // now the member time_queue_ maybe NOT be used - *qsize = 0; - return; - - std::lock_guard lock(mu_); - *qsize = time_queue_.size(); -} +void ThreadPool::cur_time_queue_size(size_t* qsize) { *qsize = time_node_cnt_.load(std::memory_order_relaxed); } std::string ThreadPool::thread_pool_name() { return thread_pool_name_; } void ThreadPool::runInThread() { - while (!should_stop()) { + Node* tmp = nullptr; + Node* last = nullptr; + Node* time_last = nullptr; + while (LIKELY(!should_stop())) { std::unique_lock lock(mu_); - rsignal_.wait(lock, [this]() { return newest_node_.load(std::memory_order_relaxed) != nullptr || should_stop(); }); - // Consuming tasks before unlock can cause spurious wakeup - // with little possibility in low throughput. Because when - // all worker threads are waiting and a task coming, only one - // worker thread can be awakened. Only when a thread is awakened - // but the task has not been consumed, and at this time a new task - // is coming and make a new worker thread awakened, the spurious - // wakeup occur. Because the former thread can consume all tasks - // and the latter thread will consume NOTHING. So, for high - // throughput it MUST consume before unlock. 
+ rsignal_.wait(lock, [this]() { + return newest_node_.load(std::memory_order_relaxed) != nullptr || + UNLIKELY(time_newest_node_.load(std::memory_order_relaxed) != nullptr) || UNLIKELY(should_stop()); + }); lock.unlock(); retry: - if (should_stop()) { + if (UNLIKELY(should_stop())) { break; } - auto last = newest_node_.exchange(nullptr); - if (last == nullptr) { + last = newest_node_.exchange(nullptr); + time_last = time_newest_node_.exchange(nullptr); + if (last == nullptr && LIKELY(time_last == nullptr)) { // 1. loop for short time for (uint32_t tries = 0; tries < 200; ++tries) { if (newest_node_.load(std::memory_order_acquire) != nullptr) { @@ -175,6 +166,12 @@ void ThreadPool::runInThread() { goto exec; } } + if (UNLIKELY(time_newest_node_.load(std::memory_order_acquire) != nullptr)) { + time_last = time_newest_node_.exchange(nullptr); + if (time_last != nullptr) { + goto exec; + } + } AsmVolatilePause(); } @@ -204,6 +201,14 @@ void ThreadPool::runInThread() { break; } } + if (UNLIKELY(time_newest_node_.load(std::memory_order_acquire) != nullptr)) { + time_last = time_newest_node_.exchange(nullptr); + if (time_last != nullptr) { + would_spin_again = true; + // success + break; + } + } auto now = std::chrono::steady_clock::now(); if (now == iter_begin || now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) { @@ -231,17 +236,47 @@ void ThreadPool::runInThread() { } exec: - // do all tasks older than this task pointed last - int cnt = 0; - auto first = CreateMissingNewerLinks(last, &cnt); - node_cnt_ -= cnt; - Node* tmp = nullptr; - do { - first->Exec(); - tmp = first; - first = first->Next(); - delete tmp; - } while (first != nullptr); + // do all normal tasks older than this task pointed last + if (LIKELY(last != nullptr)) { + int cnt = 0; + auto first = CreateMissingNewerLinks(last, &cnt); + assert(!first->is_time_task); + node_cnt_ -= cnt; + do { + first->Exec(); + // node_cnt_--; + tmp = first; + first = first->Next(); + delete tmp; + } while 
(first != nullptr); + } + + // do all time tasks older than this task pointed time_last + if (UNLIKELY(time_last != nullptr)) { + int time_cnt = 0; + auto time_first = CreateMissingNewerLinks(time_last, &time_cnt); + // time_node_cnt_ -= time_cnt; + do { + // time task may block normal task + auto now = std::chrono::system_clock::now(); + uint64_t unow = std::chrono::duration_cast(now.time_since_epoch()).count(); + + auto [exec_time, func, arg] = time_first->task; + assert(time_first->is_time_task); + if (unow >= exec_time) { + time_first->Exec(); + time_node_cnt_--; + } else { + lock.lock(); + rsignal_.wait_for(lock, std::chrono::microseconds(exec_time - unow)); + time_first->Exec(); + time_node_cnt_--; + } + tmp = time_first; + time_first = time_first->Next(); + delete tmp; + } while (time_first != nullptr); + } goto retry; } } @@ -249,13 +284,13 @@ void ThreadPool::runInThread() { ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head, int* cnt) { assert(head != nullptr); Node* next = nullptr; - cnt++; + *cnt = 1; while (true) { next = head->link_older; if (next == nullptr) { return head; } - cnt++; + ++(*cnt); next->link_newer = head; head = next; } From 37784619d8b397cd0b6c09578f05f0ea95173d46 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Thu, 16 May 2024 23:17:20 +0800 Subject: [PATCH 06/13] add some tips for failing to start codis --- tests/integration/start_codis.sh | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/integration/start_codis.sh b/tests/integration/start_codis.sh index c686251eb3..cbea0bdbd5 100755 --- a/tests/integration/start_codis.sh +++ b/tests/integration/start_codis.sh @@ -1,10 +1,16 @@ #!/bin/bash -#pkill -9 pika -#pkill -9 codis -#rm -rf /tmp/codis -#rm -rf codis_data_1 -#rm -rf codis_data_2 +# sugguest clear these before starting codis +# pkill -9 pika +# pkill -9 codis +# rm -rf /tmp/codis +# rm -rf codis_data* +# rm -rf acl*data +# rm -rf dump +# rm -rf db +# rm -rf 
log +# rm -rf slave_data +# rm -rf master_data CODIS_DASHBOARD_ADDR=127.0.0.1:18080 @@ -39,6 +45,7 @@ echo 'startup codis dashboard and codis proxy' sleep 20 +# if start failed, maybe for your struct/class that be defined NOT be used echo 'assign codis slots to groups and resync groups' ./bin/codis-admin --dashboard=$CODIS_DASHBOARD_ADDR --create-group --gid=1 ./bin/codis-admin --dashboard=$CODIS_DASHBOARD_ADDR --create-group --gid=2 From 43bd1ab11edcca523268f37c61cfb0283180f06d Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Sat, 18 May 2024 10:45:44 +0800 Subject: [PATCH 07/13] fix bug: addtional introduced packages maybe cause core dump when starting pika in centos --- src/net/include/thread_pool.h | 21 ++++++++++----------- src/net/src/thread_pool.cc | 2 +- tests/integration/start_codis.sh | 1 + 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/net/include/thread_pool.h b/src/net/include/thread_pool.h index 9187faa3f2..6ca84006d7 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -8,23 +8,22 @@ #include #include -#include #include #include "net/include/net_define.h" #include "net/include/random.h" #include "pstd/include/pstd_mutex.h" - + namespace net { using TaskFunc = void (*)(void*); -struct Task { - Task() = default; - TaskFunc func = nullptr; - void* arg = nullptr; - Task(TaskFunc _func, void* _arg) : func(_func), arg(_arg) {} -}; +// struct Task { +// Task() = default; +// TaskFunc func = nullptr; +// void* arg = nullptr; +// Task(TaskFunc _func, void* _arg) : func(_func), arg(_arg) {} +// }; struct TimeTask { uint64_t exec_time; @@ -122,15 +121,15 @@ class ThreadPool : public pstd::noncopyable { size_t worker_num_; std::string thread_pool_name_; - std::queue queue_; - std::priority_queue time_queue_; + // std::queue queue_; + // std::priority_queue time_queue_; std::vector workers_; std::atomic running_; std::atomic should_stop_; pstd::Mutex mu_; pstd::CondVar rsignal_; - pstd::CondVar 
wsignal_; + // pstd::CondVar wsignal_; }; } // namespace net diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index 3f146566da..6b4b43e134 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -78,7 +78,7 @@ int ThreadPool::stop_thread_pool() { if (running_.load()) { should_stop_.store(true); rsignal_.notify_all(); - wsignal_.notify_all(); + // wsignal_.notify_all(); for (const auto worker : workers_) { res = worker->stop(); if (res != 0) { diff --git a/tests/integration/start_codis.sh b/tests/integration/start_codis.sh index cbea0bdbd5..2ab1ddbdfc 100755 --- a/tests/integration/start_codis.sh +++ b/tests/integration/start_codis.sh @@ -46,6 +46,7 @@ echo 'startup codis dashboard and codis proxy' sleep 20 # if start failed, maybe for your struct/class that be defined NOT be used +# or addtional introduced packages echo 'assign codis slots to groups and resync groups' ./bin/codis-admin --dashboard=$CODIS_DASHBOARD_ADDR --create-group --gid=1 ./bin/codis-admin --dashboard=$CODIS_DASHBOARD_ADDR --create-group --gid=2 From 14f59b307c68ce06316dffba7150151f9482ceab Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Wed, 22 May 2024 15:37:23 +0800 Subject: [PATCH 08/13] fix bug: failed to start redis server --- src/net/include/net_define.h | 2 -- src/net/include/random.h | 5 +---- src/net/include/thread_pool.h | 14 ++------------ src/net/src/thread_pool.cc | 31 +++++++++++++++---------------- 4 files changed, 18 insertions(+), 34 deletions(-) diff --git a/src/net/include/net_define.h b/src/net/include/net_define.h index 4ec16cc4e3..3974f07517 100644 --- a/src/net/include/net_define.h +++ b/src/net/include/net_define.h @@ -6,9 +6,7 @@ #ifndef NET_INCLUDE_NET_DEFINE_H_ #define NET_INCLUDE_NET_DEFINE_H_ -#include #include -#include namespace net { diff --git a/src/net/include/random.h b/src/net/include/random.h index 54d067f548..595f0e9e3d 100644 --- a/src/net/include/random.h +++ b/src/net/include/random.h @@ -6,11 +6,8 @@ 
#ifndef RANDOM_H #define RANDOM_H -#include -#include #include #include -#include #include "net/include/likely.h" @@ -88,4 +85,4 @@ class Random { } // namespace net -#endif // RANDOM_H \ No newline at end of file +#endif // RANDOM_H diff --git a/src/net/include/thread_pool.h b/src/net/include/thread_pool.h index 6ca84006d7..bd2e6d8609 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -18,13 +18,6 @@ namespace net { using TaskFunc = void (*)(void*); -// struct Task { -// Task() = default; -// TaskFunc func = nullptr; -// void* arg = nullptr; -// Task(TaskFunc _func, void* _arg) : func(_func), arg(_arg) {} -// }; - struct TimeTask { uint64_t exec_time; TaskFunc func; @@ -103,7 +96,7 @@ class ThreadPool : public pstd::noncopyable { // it's okay for other platforms to be no-ops } - Node* CreateMissingNewerLinks(Node* head, int* cnt); + Node* CreateMissingNewerLinks(Node* head); bool LinkOne(Node* node, std::atomic* newest_node); std::atomic newest_node_; @@ -111,7 +104,7 @@ class ThreadPool : public pstd::noncopyable { std::atomic time_newest_node_; std::atomic time_node_cnt_; // for time task - const int queue_slow_size_; // default value: max(worker_num_ * 100, max_queue_size_) + const int queue_slow_size_; // default value: min(worker_num_ * 10, max_queue_size_) size_t max_queue_size_; const uint64_t max_yield_usec_; @@ -121,15 +114,12 @@ class ThreadPool : public pstd::noncopyable { size_t worker_num_; std::string thread_pool_name_; - // std::queue queue_; - // std::priority_queue time_queue_; std::vector workers_; std::atomic running_; std::atomic should_stop_; pstd::Mutex mu_; pstd::CondVar rsignal_; - // pstd::CondVar wsignal_; }; } // namespace net diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index 6b4b43e134..0a55ff9e0a 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -46,12 +46,14 @@ int ThreadPool::Worker::stop() { ThreadPool::ThreadPool(size_t worker_num, size_t 
max_queue_size, std::string thread_pool_name) : newest_node_(nullptr), node_cnt_(0), - queue_slow_size_(std::max(worker_num_ * 100, max_queue_size_)), + time_newest_node_(nullptr), + time_node_cnt_(0), + queue_slow_size_(std::min(worker_num * 10, max_queue_size)), + max_queue_size_(max_queue_size), max_yield_usec_(100), slow_yield_usec_(3), adp_ctx(), worker_num_(worker_num), - max_queue_size_(max_queue_size), thread_pool_name_(std::move(thread_pool_name)), running_(false), should_stop_(false) {} @@ -78,7 +80,6 @@ int ThreadPool::stop_thread_pool() { if (running_.load()) { should_stop_.store(true); rsignal_.notify_all(); - // wsignal_.notify_all(); for (const auto worker : workers_) { res = worker->stop(); if (res != 0) { @@ -106,6 +107,7 @@ void ThreadPool::Schedule(TaskFunc func, void* arg) { if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) { std::this_thread::yield(); } + // std::unique_lock lock(mu_); if (LIKELY(!should_stop())) { auto node = new Node(func, arg); LinkOne(node, &newest_node_); @@ -122,6 +124,7 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { uint64_t unow = std::chrono::duration_cast(now.time_since_epoch()).count(); uint64_t exec_time = unow + timeout * 1000; + // std::unique_lock lock(mu_); if (LIKELY(!should_stop())) { auto node = new Node(exec_time, func, arg); LinkOne(node, &time_newest_node_); @@ -132,6 +135,8 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { size_t ThreadPool::max_queue_size() { return max_queue_size_; } +size_t ThreadPool::worker_size() { return worker_num_; } + void ThreadPool::cur_queue_size(size_t* qsize) { *qsize = node_cnt_.load(std::memory_order_relaxed); } void ThreadPool::cur_time_queue_size(size_t* qsize) { *qsize = time_node_cnt_.load(std::memory_order_relaxed); } @@ -174,7 +179,7 @@ void ThreadPool::runInThread() { } AsmVolatilePause(); } - + // 2. 
loop for a little short time again const size_t kMaxSlowYieldsWhileSpinning = 3; auto& yield_credit = adp_ctx.value; @@ -238,24 +243,20 @@ void ThreadPool::runInThread() { exec: // do all normal tasks older than this task pointed last if (LIKELY(last != nullptr)) { - int cnt = 0; - auto first = CreateMissingNewerLinks(last, &cnt); + auto first = CreateMissingNewerLinks(last); assert(!first->is_time_task); - node_cnt_ -= cnt; do { first->Exec(); - // node_cnt_--; tmp = first; first = first->Next(); + node_cnt_--; delete tmp; } while (first != nullptr); } // do all time tasks older than this task pointed time_last if (UNLIKELY(time_last != nullptr)) { - int time_cnt = 0; - auto time_first = CreateMissingNewerLinks(time_last, &time_cnt); - // time_node_cnt_ -= time_cnt; + auto time_first = CreateMissingNewerLinks(time_last); do { // time task may block normal task auto now = std::chrono::system_clock::now(); @@ -265,15 +266,15 @@ void ThreadPool::runInThread() { assert(time_first->is_time_task); if (unow >= exec_time) { time_first->Exec(); - time_node_cnt_--; } else { lock.lock(); rsignal_.wait_for(lock, std::chrono::microseconds(exec_time - unow)); + lock.unlock(); time_first->Exec(); - time_node_cnt_--; } tmp = time_first; time_first = time_first->Next(); + time_node_cnt_--; delete tmp; } while (time_first != nullptr); } @@ -281,16 +282,14 @@ void ThreadPool::runInThread() { } } -ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head, int* cnt) { +ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head) { assert(head != nullptr); Node* next = nullptr; - *cnt = 1; while (true) { next = head->link_older; if (next == nullptr) { return head; } - ++(*cnt); next->link_newer = head; head = next; } From 807a1c48dd689767dfb287ba2eedbc6ad503c4d6 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Fri, 24 May 2024 00:17:55 +0800 Subject: [PATCH 09/13] one worker thread, one list --- src/net/include/thread_pool.h | 29 +++++++---- 
src/net/src/thread_pool.cc | 91 +++++++++++++++++++++-------------- 2 files changed, 76 insertions(+), 44 deletions(-) diff --git a/src/net/include/thread_pool.h b/src/net/include/thread_pool.h index bd2e6d8609..8909a1c071 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -9,11 +9,12 @@ #include #include #include +#include #include "net/include/net_define.h" #include "net/include/random.h" #include "pstd/include/pstd_mutex.h" - + namespace net { using TaskFunc = void (*)(void*); @@ -30,7 +31,13 @@ class ThreadPool : public pstd::noncopyable { public: class Worker { public: - explicit Worker(ThreadPool* tp) : start_(false), thread_pool_(tp){}; + struct Arg { + Arg(void* p, int i) : arg(p), idx(i) {} + void* arg; + int idx; + }; + + explicit Worker(ThreadPool* tp, int idx = 0) : start_(false), thread_pool_(tp), idx_(idx), arg_(tp, idx){}; static void* WorkerMain(void* arg); int start(); @@ -41,6 +48,8 @@ class ThreadPool : public pstd::noncopyable { std::atomic start_; ThreadPool* const thread_pool_; std::string worker_name_; + int idx_; + Arg arg_; }; explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool"); @@ -60,7 +69,7 @@ class ThreadPool : public pstd::noncopyable { std::string thread_pool_name(); private: - void runInThread(); + void runInThread(const int idx = 0); public: struct AdaptationContext { @@ -96,12 +105,14 @@ class ThreadPool : public pstd::noncopyable { // it's okay for other platforms to be no-ops } - Node* CreateMissingNewerLinks(Node* head); + Node* CreateMissingNewerLinks(Node* head, int* cnt); bool LinkOne(Node* node, std::atomic* newest_node); - std::atomic newest_node_; + int task_idx_; +std::vector> asd; + std::vector> newest_node_; std::atomic node_cnt_; // for task - std::atomic time_newest_node_; + std::vector> time_newest_node_; std::atomic time_node_cnt_; // for time task const int queue_slow_size_; // default value: min(worker_num_ * 10, max_queue_size_) 
@@ -112,14 +123,14 @@ class ThreadPool : public pstd::noncopyable { AdaptationContext adp_ctx; - size_t worker_num_; + const size_t worker_num_; std::string thread_pool_name_; std::vector workers_; std::atomic running_; std::atomic should_stop_; - pstd::Mutex mu_; - pstd::CondVar rsignal_; + std::vector mu_; + std::vector rsignal_; }; } // namespace net diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index 0a55ff9e0a..a1f615c831 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -14,15 +14,16 @@ namespace net { -void* ThreadPool::Worker::WorkerMain(void* arg) { - auto tp = static_cast(arg); - tp->runInThread(); +void* ThreadPool::Worker::WorkerMain(void* p) { + auto arg = static_cast(p); + auto tp = static_cast(arg->arg); + tp->runInThread(arg->idx); return nullptr; } int ThreadPool::Worker::start() { if (!start_.load()) { - if (pthread_create(&thread_id_, nullptr, &WorkerMain, thread_pool_) != 0) { + if (pthread_create(&thread_id_, nullptr, &WorkerMain, &arg_) != 0) { return -1; } else { start_.store(true); @@ -44,9 +45,9 @@ int ThreadPool::Worker::stop() { } ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name) - : newest_node_(nullptr), + : newest_node_(worker_num), node_cnt_(0), - time_newest_node_(nullptr), + time_newest_node_(worker_num), time_node_cnt_(0), queue_slow_size_(std::min(worker_num * 10, max_queue_size)), max_queue_size_(max_queue_size), @@ -56,7 +57,14 @@ ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thr worker_num_(worker_num), thread_pool_name_(std::move(thread_pool_name)), running_(false), - should_stop_(false) {} + should_stop_(false), + mu_(worker_num), + rsignal_(worker_num) { + for (size_t i = 0; i < worker_num_; ++i) { + newest_node_[i] = nullptr; + time_newest_node_[i] = nullptr; + } +} ThreadPool::~ThreadPool() { stop_thread_pool(); } @@ -64,7 +72,7 @@ int ThreadPool::start_thread_pool() { if (!running_.load()) { 
should_stop_.store(false); for (size_t i = 0; i < worker_num_; ++i) { - workers_.push_back(new Worker(this)); + workers_.push_back(new Worker(this, i)); int res = workers_[i]->start(); if (res != 0) { return kCreateThreadError; @@ -79,7 +87,9 @@ int ThreadPool::stop_thread_pool() { int res = 0; if (running_.load()) { should_stop_.store(true); - rsignal_.notify_all(); + for (auto& r : rsignal_) { + r.notify_all(); + } for (const auto worker : workers_) { res = worker->stop(); if (res != 0) { @@ -107,12 +117,13 @@ void ThreadPool::Schedule(TaskFunc func, void* arg) { if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) { std::this_thread::yield(); } - // std::unique_lock lock(mu_); + if (LIKELY(!should_stop())) { auto node = new Node(func, arg); - LinkOne(node, &newest_node_); + auto idx = ++task_idx_ % worker_num_; + LinkOne(node, &newest_node_[idx]); node_cnt_++; - rsignal_.notify_one(); + rsignal_[idx].notify_one(); } } @@ -124,12 +135,12 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { uint64_t unow = std::chrono::duration_cast(now.time_since_epoch()).count(); uint64_t exec_time = unow + timeout * 1000; - // std::unique_lock lock(mu_); if (LIKELY(!should_stop())) { + auto idx = ++task_idx_ % worker_num_; auto node = new Node(exec_time, func, arg); - LinkOne(node, &time_newest_node_); + LinkOne(node, &newest_node_[idx]); time_node_cnt_++; - rsignal_.notify_all(); + rsignal_[idx].notify_all(); } } @@ -143,15 +154,20 @@ void ThreadPool::cur_time_queue_size(size_t* qsize) { *qsize = time_node_cnt_.lo std::string ThreadPool::thread_pool_name() { return thread_pool_name_; } -void ThreadPool::runInThread() { +void ThreadPool::runInThread(const int idx) { Node* tmp = nullptr; Node* last = nullptr; Node* time_last = nullptr; + auto& newest_node = newest_node_[idx]; + auto& time_newest_node = time_newest_node_[idx]; + auto& mu = mu_[idx]; + auto& rsignal = rsignal_[idx]; + while (LIKELY(!should_stop())) { - std::unique_lock 
lock(mu_); - rsignal_.wait(lock, [this]() { - return newest_node_.load(std::memory_order_relaxed) != nullptr || - UNLIKELY(time_newest_node_.load(std::memory_order_relaxed) != nullptr) || UNLIKELY(should_stop()); + std::unique_lock lock(mu); + rsignal.wait(lock, [this, &newest_node, &time_newest_node]() { + return newest_node.load(std::memory_order_relaxed) != nullptr || + UNLIKELY(time_newest_node.load(std::memory_order_relaxed) != nullptr) || UNLIKELY(should_stop()); }); lock.unlock(); @@ -160,26 +176,26 @@ void ThreadPool::runInThread() { break; } - last = newest_node_.exchange(nullptr); - time_last = time_newest_node_.exchange(nullptr); + last = newest_node.exchange(nullptr); + time_last = time_newest_node.exchange(nullptr); if (last == nullptr && LIKELY(time_last == nullptr)) { // 1. loop for short time for (uint32_t tries = 0; tries < 200; ++tries) { - if (newest_node_.load(std::memory_order_acquire) != nullptr) { - last = newest_node_.exchange(nullptr); + if (newest_node.load(std::memory_order_acquire) != nullptr) { + last = newest_node.exchange(nullptr); if (last != nullptr) { goto exec; } } - if (UNLIKELY(time_newest_node_.load(std::memory_order_acquire) != nullptr)) { - time_last = time_newest_node_.exchange(nullptr); + if (UNLIKELY(time_newest_node.load(std::memory_order_acquire) != nullptr)) { + time_last = time_newest_node.exchange(nullptr); if (time_last != nullptr) { goto exec; } } AsmVolatilePause(); } - + // 2. 
loop for a little short time again const size_t kMaxSlowYieldsWhileSpinning = 3; auto& yield_credit = adp_ctx.value; @@ -198,16 +214,16 @@ void ThreadPool::runInThread() { while ((iter_begin - spin_begin) <= std::chrono::microseconds(max_yield_usec_)) { std::this_thread::yield(); - if (newest_node_.load(std::memory_order_acquire) != nullptr) { - last = newest_node_.exchange(nullptr); + if (newest_node.load(std::memory_order_acquire) != nullptr) { + last = newest_node.exchange(nullptr); if (last != nullptr) { would_spin_again = true; // success break; } } - if (UNLIKELY(time_newest_node_.load(std::memory_order_acquire) != nullptr)) { - time_last = time_newest_node_.exchange(nullptr); + if (UNLIKELY(time_newest_node.load(std::memory_order_acquire) != nullptr)) { + time_last = time_newest_node.exchange(nullptr); if (time_last != nullptr) { would_spin_again = true; // success @@ -243,7 +259,9 @@ void ThreadPool::runInThread() { exec: // do all normal tasks older than this task pointed last if (LIKELY(last != nullptr)) { - auto first = CreateMissingNewerLinks(last); + int cnt = 1; + auto first = CreateMissingNewerLinks(last, &cnt); + // node_cnt_ -= cnt; assert(!first->is_time_task); do { first->Exec(); @@ -256,7 +274,8 @@ void ThreadPool::runInThread() { // do all time tasks older than this task pointed time_last if (UNLIKELY(time_last != nullptr)) { - auto time_first = CreateMissingNewerLinks(time_last); + int cnt = 1; + auto time_first = CreateMissingNewerLinks(time_last, &cnt); do { // time task may block normal task auto now = std::chrono::system_clock::now(); @@ -268,7 +287,7 @@ void ThreadPool::runInThread() { time_first->Exec(); } else { lock.lock(); - rsignal_.wait_for(lock, std::chrono::microseconds(exec_time - unow)); + rsignal.wait_for(lock, std::chrono::microseconds(exec_time - unow)); lock.unlock(); time_first->Exec(); } @@ -282,14 +301,16 @@ void ThreadPool::runInThread() { } } -ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head) { 
+ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head, int* cnt) { assert(head != nullptr); + assert(cnt != nullptr && *cnt == 1); Node* next = nullptr; while (true) { next = head->link_older; if (next == nullptr) { return head; } + ++(*cnt); next->link_newer = head; head = next; } From c21fd6eef4c29ce9aaae456306ba1688bcfd1212 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Wed, 19 Jun 2024 22:06:02 +0800 Subject: [PATCH 10/13] multiple threads per link and multiple links --- src/net/include/thread_pool.h | 6 ++++-- src/net/src/thread_pool.cc | 35 +++++++++++++++++++---------------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/net/include/thread_pool.h index 8909a1c071..9935512a9f 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -108,8 +108,10 @@ class ThreadPool : public pstd::noncopyable { Node* CreateMissingNewerLinks(Node* head, int* cnt); bool LinkOne(Node* node, std::atomic* newest_node); - int task_idx_; -std::vector> asd; + uint16_t task_idx_; + + const uint8_t nworkers_per_link_ = 2; // numer of workers per link + const uint8_t nlinks_; // number of links (upper around) std::vector> newest_node_; std::atomic node_cnt_; // for task std::vector> time_newest_node_; diff --git a/src/net/src/thread_pool.cc index a1f615c831..5ca9723947 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -45,9 +45,11 @@ int ThreadPool::Worker::stop() { } ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name) - : newest_node_(worker_num), + : nlinks_((worker_num + nworkers_per_link_ - 1) / nworkers_per_link_), + // : nlinks_(worker_num), + newest_node_(nlinks_), node_cnt_(0), - time_newest_node_(worker_num), + time_newest_node_(nlinks_), time_node_cnt_(0), queue_slow_size_(std::min(worker_num * 10, max_queue_size)), max_queue_size_(max_queue_size), @@ -58,9 +60,9 @@
ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thr thread_pool_name_(std::move(thread_pool_name)), running_(false), should_stop_(false), - mu_(worker_num), - rsignal_(worker_num) { - for (size_t i = 0; i < worker_num_; ++i) { + mu_(nlinks_), + rsignal_(nlinks_) { + for (size_t i = 0; i < nlinks_; ++i) { newest_node_[i] = nullptr; time_newest_node_[i] = nullptr; } @@ -71,7 +73,7 @@ ThreadPool::~ThreadPool() { stop_thread_pool(); } int ThreadPool::start_thread_pool() { if (!running_.load()) { should_stop_.store(false); - for (size_t i = 0; i < worker_num_; ++i) { + for (size_t i = 0; i < nlinks_; ++i) { workers_.push_back(new Worker(this, i)); int res = workers_[i]->start(); if (res != 0) { @@ -120,10 +122,10 @@ void ThreadPool::Schedule(TaskFunc func, void* arg) { if (LIKELY(!should_stop())) { auto node = new Node(func, arg); - auto idx = ++task_idx_ % worker_num_; - LinkOne(node, &newest_node_[idx]); + auto idx = ++task_idx_; + LinkOne(node, &newest_node_[idx % nlinks_]); node_cnt_++; - rsignal_[idx].notify_one(); + rsignal_[idx % nlinks_].notify_one(); } } @@ -136,11 +138,11 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) { uint64_t exec_time = unow + timeout * 1000; if (LIKELY(!should_stop())) { - auto idx = ++task_idx_ % worker_num_; + auto idx = ++task_idx_; auto node = new Node(exec_time, func, arg); - LinkOne(node, &newest_node_[idx]); + LinkOne(node, &newest_node_[idx % nlinks_]); time_node_cnt_++; - rsignal_[idx].notify_all(); + rsignal_[idx % nlinks_].notify_all(); } } @@ -158,10 +160,11 @@ void ThreadPool::runInThread(const int idx) { Node* tmp = nullptr; Node* last = nullptr; Node* time_last = nullptr; - auto& newest_node = newest_node_[idx]; - auto& time_newest_node = time_newest_node_[idx]; - auto& mu = mu_[idx]; - auto& rsignal = rsignal_[idx]; + + auto& newest_node = newest_node_[idx % nlinks_]; + auto& time_newest_node = time_newest_node_[idx % nlinks_]; + auto& mu = mu_[idx % nlinks_]; + 
auto& rsignal = rsignal_[idx % nlinks_]; while (LIKELY(!should_stop())) { std::unique_lock lock(mu); From 8694da02e530bf6e74893a99ff6766258e39c066 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Thu, 27 Jun 2024 19:01:01 +0800 Subject: [PATCH 11/13] fix: timer task maybe block worker thread --- src/net/include/thread_pool.h | 3 +++ src/net/src/thread_pool.cc | 23 ++++++++++++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/net/include/thread_pool.h b/src/net/include/thread_pool.h index 9935512a9f..fed8f42ddb 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -94,6 +94,9 @@ class ThreadPool : public pstd::noncopyable { inline Node* Next() { return link_newer; } }; + // re-push some timer tasks which has been poped + void ReDelaySchedule(Node* nodes); + static inline void AsmVolatilePause() { #if defined(__i386__) || defined(__x86_64__) asm volatile("pause"); diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index b0497a8606..5f067b1148 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -291,8 +291,16 @@ void ThreadPool::runInThread(const int idx) { time_first->Exec(); } else { lock.lock(); - rsignal.wait_for(lock, std::chrono::microseconds(exec_time - unow)); + // if task is coming now, do task immediately + auto res = rsignal.wait_for(lock, std::chrono::microseconds(exec_time - unow), [this, &newest_node]() { + return newest_node.load(std::memory_order_relaxed) != nullptr || UNLIKELY(should_stop()); + }); lock.unlock(); + if (res) { + // re-push the timer tasks + ReDelaySchedule(time_first); + goto retry; + } time_first->Exec(); } tmp = time_first; @@ -305,6 +313,19 @@ void ThreadPool::runInThread(const int idx) { } } +void ThreadPool::ReDelaySchedule(Node* nodes) { + while (LIKELY(!should_stop()) && nodes != nullptr) { + auto idx = ++task_idx_; + auto nxt = nodes->Next(); + nodes->link_newer = nullptr; + // auto node = new Node(exec_time, func, 
arg); + LinkOne(nodes, &newest_node_[idx % nlinks_]); + time_node_cnt_++; + rsignal_[idx % nlinks_].notify_all(); + nodes = nxt; + } +} + ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head, int* cnt) { assert(head != nullptr); assert(cnt != nullptr && *cnt == 1); From 6e6b808cad731426b0f7fd5d76abc12fcf8142c0 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Sat, 13 Jul 2024 11:44:54 +0800 Subject: [PATCH 12/13] change default queue_slow_size_ and nworkers_per_link_ --- src/net/include/thread_pool.h | 2 +- src/net/src/thread_pool.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net/include/thread_pool.h index fed8f42ddb..8d20957194 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -113,7 +113,7 @@ class ThreadPool : public pstd::noncopyable { uint16_t task_idx_; - const uint8_t nworkers_per_link_ = 2; // numer of workers per link + const uint8_t nworkers_per_link_ = 1; // number of workers per link const uint8_t nlinks_; // number of links (upper around) std::vector> newest_node_; std::atomic node_cnt_; // for task diff --git a/src/net/src/thread_pool.cc index 5f067b1148..c37f08df15 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -52,7 +52,7 @@ ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thr node_cnt_(0), time_newest_node_(nlinks_), time_node_cnt_(0), - queue_slow_size_(std::min(worker_num * 10, max_queue_size)), + queue_slow_size_(std::max(10UL, std::min(worker_num * max_queue_size / 100, max_queue_size))), max_queue_size_(max_queue_size), max_yield_usec_(100), slow_yield_usec_(3), From ec24c39dfe4cf7be79174ffec0b642c722631a16 Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Mon, 22 Jul 2024 21:26:45 +0800 Subject: [PATCH 13/13] remove timer_task and add some default value --- conf/pika.conf | 1357 +++++++++++++++++++-----------
src/net/src/thread_pool.cc | 161 +---- 2 files changed, 757 insertions(+), 761 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index 496d974174..4d1c4f9519 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -1,639 +1,746 @@ -########################### -# Pika configuration file # -########################### +# ########################### +# # Pika configuration file # +# ########################### + +# # Pika port, the default value is 9221. +# # [NOTICE] Port Magic offsets of port+1000 / port+2000 are used by Pika at present. +# # Port 10221 is used for Rsync, and port 11221 is used for Replication, while the listening port is 9221. +# port : 9221 + +# db-instance-num : 3 +# rocksdb-ttl-second : 86400 * 7; +# rocksdb-periodic-second : 86400 * 3; + +# # Random value identifying the Pika server, its string length must be 40. +# # If not set, Pika will generate a random string with a length of 40 random characters. +# # run-id : + +# # Master's run-id +# # master-run-id : + +# # The number of threads for running Pika. +# # It's not recommended to set this value exceeds +# # the number of CPU cores on the deployment server. +# thread-num : 1 + +# # Size of the thread pool, The threads within this pool +# # are dedicated to handling user requests. +# thread-pool-size : 12 + +# # This parameter is used to control whether to separate fast and slow commands. +# # When slow-cmd-pool is set to yes, fast and slow commands are separated. +# # When set to no, they are not separated. +# slow-cmd-pool : no + +# # Size of the low level thread pool, The threads within this pool +# # are dedicated to handling slow user requests. +# slow-cmd-thread-pool-size : 1 + +# # Size of the low level thread pool, The threads within this pool +# # are dedicated to handling slow user requests. +# admin-thread-pool-size : 2 + +# # Slow cmd list e.g. hgetall, mset +# slow-cmd-list : + +# # List of commands considered as administrative. 
These commands will be handled by the admin thread pool. Modify this list as needed. +# # Default commands: info, ping, monitor +# # This parameter is only supported by the CONFIG GET command and not by CONFIG SET. +# admin-cmd-list : info, ping, monitor + +# # The number of threads to write DB in slaveNode when replicating. +# # It's preferable to set slave's sync-thread-num value close to master's thread-pool-size. +# sync-thread-num : 6 + +# # The num of threads to write binlog in slaveNode when replicating, +# # each DB cloud only bind to one sync-binlog-thread to write binlog in maximum +# #[NOTICE] It's highly recommended to set sync-binlog-thread-num equal to conf item 'database'(then each DB cloud have a exclusive thread to write binlog), +# # eg. if you use 8 DBs(databases_ is 8), sync-binlog-thread-num is preferable to be 8 +# # Valid range of sync-binlog-thread-num is [1, databases], the final value of it is Min(sync-binlog-thread-num, databases) +# sync-binlog-thread-num : 1 + +# # Directory to store log files of Pika, which contains multiple types of logs, +# # Including: INFO, WARNING, ERROR log, as well as binglog(write2fine) file which +# # is used for replication. +# log-path : ./log/ + +# # Directory to store the data of Pika. +# db-path : ./db/ + +# # The size of a single RocksDB memtable at the Pika's bottom layer(Pika use RocksDB to store persist data). +# # [Tip] Big write-buffer-size can improve writing performance, +# # but this will generate heavier IO load when flushing from buffer to disk, +# # you should configure it based on you usage scenario. +# # Supported Units [K|M|G], write-buffer-size default unit is in [bytes]. +# write-buffer-size : 256M + +# # The size of one block in arena memory allocation. +# # If <= 0, a proper value is automatically calculated. +# # (usually 1/8 of writer-buffer-size, rounded up to a multiple of 4KB) +# # Supported Units [K|M|G], arena-block-size default unit is in [bytes]. 
+# arena-block-size : + +# # Timeout of Pika's connection, counting down starts When there are no requests +# # on a connection (it enters sleep state), when the countdown reaches 0, the connection +# # will be closed by Pika. +# # [Tip] The issue of running out of Pika's connections may be avoided if this value +# # is configured properly. +# # The Unit of timeout is in [seconds] and its default value is 60(s). +# timeout : 60 + +# # The [password of administrator], which is empty by default. +# # [NOTICE] If this admin password is the same as user password (including both being empty), +# # the value of userpass will be ignored and all users are considered as administrators, +# # in this scenario, users are not subject to the restrictions imposed by the userblacklist. +# # PS: "user password" refers to value of the parameter below: userpass. +# requirepass : + +# # Password for replication verify, used for authentication when a slave +# # connects to a master to request replication. +# # [NOTICE] The value of this parameter must match the "requirepass" setting on the master. +# masterauth : + +# # The [password of user], which is empty by default. +# # [NOTICE] If this user password is the same as admin password (including both being empty), +# # the value of this parameter will be ignored and all users are considered as administrators, +# # in this scenario, users are not subject to the restrictions imposed by the userblacklist. +# # PS: "admin password" refers to value of the parameter above: requirepass. +# # userpass : + +# # The blacklist of commands for users that logged in by userpass, +# # the commands that added to this list will not be available for users except for administrator. +# # [Advice] It's recommended to add high-risk commands to this list. +# # [Format] Commands should be separated by ",". For example: FLUSHALL, SHUTDOWN, KEYS, CONFIG +# # By default, this list is empty. 
+# # userblacklist : + +# # Running Mode of Pika, The current version only supports running in "classic mode". +# # If set to 'classic', Pika will create multiple DBs whose number is the value of configure item "databases". +# instance-mode : classic + +# # The number of databases when Pika runs in classic mode. +# # The default database id is DB 0. You can select a different one on +# # a per-connection by using SELECT. The db id range is [0, 'databases' value -1]. +# # The value range of this parameter is [1, 8]. +# # [NOTICE] It's RECOMMENDED to set sync-binlog-thread-num equal to DB num(databases), +# # if you've changed the value of databases, remember to check if the value of sync-binlog-thread-num is proper. +# databases : 1 + +# # The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present. +# # By default, this num is set to 0, which means this feature is [not enabled] +# # and the Pika runs in standalone mode. +# replication-num : 0 + +# # consensus level defines the num of confirms(ACKs) the leader node needs to receive from +# # follower nodes before returning the result to the client that sent the request. +# # The [value range] of this parameter is: [0, ...replicaiton-num]. +# # The default value of consensus-level is 0, which means this feature is not enabled. +# consensus-level : 0 + +# # The Prefix of dump file's name. +# # All the files that generated by command "bgsave" will be name with this prefix. +# dump-prefix : + +# # daemonize [yes | no]. +# #daemonize : yes + +# # The directory to stored dump files that generated by command "bgsave". +# dump-path : ./dump/ + +# # TTL of dump files that generated by command "bgsave". +# # Any dump files which exceed this TTL will be deleted. +# # Unit of dump-expire is in [days] and the default value is 0(day), +# # which means dump files never expire. +# dump-expire : 0 + +# # Pid file Path of Pika. +# pidfile : ./pika.pid + +# # The Maximum number of Pika's Connection. 
+# maxclients : 20000 + +# # The size of sst file in RocksDB(Pika is based on RocksDB). +# # sst files are hierarchical, the smaller the sst file size, the higher the performance and the lower the merge cost, +# # the price is that the number of sst files could be huge. On the contrary, the bigger the sst file size, the lower +# # the performance and the higher the merge cost, while the number of files is fewer. +# # Supported Units [K|M|G], target-file-size-base default unit is in [bytes] and the default value is 20M. +# target-file-size-base : 20M + +# # Expire-time of binlog(write2file) files that stored within log-path. +# # Any binlog(write2file) files that exceed this expire time will be cleaned up. +# # The unit of expire-logs-days is in [days] and the default value is 7(days). +# # The [Minimum value] of this parameter is 1(day). +# expire-logs-days : 7 + +# # The maximum number of binlog(write2file) files. +# # Once the total number of binlog files exceed this value, +# # automatic cleaning will start to ensure the maximum number +# # of binlog files is equal to expire-logs-nums. +# # The [Minimum value] of this parameter is 10. +# expire-logs-nums : 10 + +# # The number of guaranteed connections for root user. +# # This parameter guarantees that there are 2(By default) connections available +# # for root user to log in Pika from 127.0.0.1, even if the maximum connection limit is reached. +# # PS: The maximum connection refers to the parameter above: maxclients. +# # The default value of root-connection-num is 2. +# root-connection-num : 2 + +# # Slowlog-write-errorlog +# slowlog-write-errorlog : no + +# # The time threshold for slow log recording. +# # Any command whose execution time exceeds this threshold will be recorded in pika-ERROR.log, +# # which is stored in log-path. +# # The unit of slowlog-log-slower-than is in [microseconds(μs)] and the default value is 10000 μs / 10 ms. 
+# slowlog-log-slower-than : 10000 + +# # Slowlog-max-len +# slowlog-max-len : 128 + +# # Pika db sync path +# db-sync-path : ./dbsync/ + +# # The maximum Transmission speed during full synchronization. +# # The exhaustion of network can be prevented by setting this parameter properly. +# # The value range of this parameter is [1,1024] with unit in [MB/s]. +# # [NOTICE] If this parameter is set to an invalid value(smaller than 0 or bigger than 1024), +# # it will be automatically reset to 1024. +# # The default value of db-sync-speed is -1 (1024MB/s). +# db-sync-speed : -1 + +# # The priority of slave node when electing new master node. +# # The slave node with [lower] value of slave-priority will have [higher priority] to be elected as the new master node. +# # This parameter is only used in conjunction with sentinel and serves no other purpose. +# # The default value of slave-priority is 100. +# slave-priority : 100 + +# # Specify network interface that work with Pika. +# #network-interface : eth1 + +# # The IP and port of the master node are specified by this parameter for +# # replication between master and slaves. +# # [Format] is "ip:port" , for example: "192.168.1.2:6666" indicates that +# # the slave instances that configured with this value will automatically send +# # SLAVEOF command to port 6666 of 192.168.1.2 after startup. +# # This parameter should be configured on slave nodes. +# #slaveof : master-ip:master-port + + +# # Daily/Weekly Automatic full compaction task is configured by compact-cron. +# # +# # [Format-daily]: start time(hour)-end time(hour)/disk-free-space-ratio, +# # example: with value of "02-04/60", Pika will perform full compaction task between 2:00-4:00 AM everyday if +# # the disk-free-size / disk-size > 60%. 
+# # +# # [Format-weekly]: week/start time(hour)-end time(hour)/disk-free-space-ratio, +# # example: with value of "3/02-04/60", Pika will perform full compaction task between 2:00-4:00 AM every Wednesday if +# # the disk-free-size / disk-size > 60%. +# # +# # [Tip] Automatic full compaction is suitable for scenarios with multiple data structures +# # and lots of items are expired or deleted, or key names are frequently reused. +# # +# # [NOTICE]: If compact-interval is set, compact-cron will be masked and disabled. +# # +# #compact-cron : 3/02-04/60 + + +# # Automatic full synchronization task between a time interval is configured by compact-interval. +# # [Format]: time interval(hour)/disk-free-space-ratio, example: "6/60", Pika will perform full compaction every 6 hours, +# # if the disk-free-size / disk-size > 60%. +# # [NOTICE]: compact-interval is prior than compact-cron. +# #compact-interval : + +# # The disable_auto_compactions option is [true | false] +# disable_auto_compactions : false + +# # Rocksdb max_subcompactions, increasing this value can accelerate the exec speed of a single compaction task +# # it's recommended to increase it's value if large compaction is found in you instance +# max-subcompactions : 1 +# # The minimum disk usage ratio for checking resume. +# # If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume. +# # Its default value is 0.7. +# #min-check-resume-ratio : 0.7 + +# # The minimum free disk space to trigger db resume. +# # If the db has a background error, only the free disk size is larger than this configuration can trigger manually resume db. +# # Its default value is 256MB. +# # [NOTICE]: least-free-disk-resume-size should not smaller than write-buffer-size! +# #least-free-disk-resume-size : 256M + +# # Manually trying to resume db interval is configured by manually-resume-interval. 
+# # If db has a background error, it will try to manually call resume() to resume db if satisfy the least free disk to resume. +# # Its default value is 60 seconds. +# #manually-resume-interval : 60 + +# # This window-size determines the amount of data that can be transmitted in a single synchronization process. +# # [Tip] In the scenario of high network latency. Increasing this size can improve synchronization efficiency. +# # Its default value is 9000. the [maximum] value is 90000. +# sync-window-size : 9000 + +# # Maximum buffer size of a client connection. +# # [NOTICE] Master and slaves must have exactly the same value for the max-conn-rbuf-size. +# # Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB]. +# max-conn-rbuf-size : 268435456 + + +# #######################################################################E####### +# #! Critical Settings !# +# #######################################################################E####### + +# # write_binlog [yes | no] +# write-binlog : yes + +# # The size of binlog file, which can not be modified once Pika instance started. +# # [NOTICE] Master and slaves must have exactly the same value for the binlog-file-size. +# # The [value range] of binlog-file-size is [1K, 2G]. +# # Supported Units [K|M|G], binlog-file-size default unit is in [bytes] and the default value is 100M. +# binlog-file-size : 104857600 + +# # Automatically triggers a small compaction according to statistics +# # Use the cache to store up to 'max-cache-statistic-keys' keys +# # If 'max-cache-statistic-keys' set to '0', that means turn off the statistics function +# # and this automatic small compaction feature is disabled. +# max-cache-statistic-keys : 0 + +# # When 'delete' or 'overwrite' a specific multi-data structure key 'small-compaction-threshold' times, +# # a small compact is triggered automatically if the small compaction feature is enabled. 
+# # small-compaction-threshold default value is 5000 and the value range is [1, 100000]. +# small-compaction-threshold : 5000 +# small-compaction-duration-threshold : 10000 + +# # The maximum total size of all live memtables of the RocksDB instance that owned by Pika. +# # Flushing from memtable to disk will be triggered if the actual memory usage of RocksDB +# # exceeds max-write-buffer-size when next write operation is issued. +# # [RocksDB-Basic-Tuning](https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning) +# # Supported Units [K|M|G], max-write-buffer-size default unit is in [bytes]. +# max-write-buffer-size : 10737418240 + +# # The maximum number of write buffers(memtables) that are built up in memory for one ColumnFamily in DB. +# # The default and the minimum number is 2. It means that Pika(RocksDB) will write to a write buffer +# # when it flushes the data of another write buffer to storage. +# # If max-write-buffer-num > 3, writing will be slowed down. +# max-write-buffer-num : 2 + +# # `min_write_buffer_number_to_merge` is the minimum number of memtables +# # that need to be merged before placing the order. For example, if the +# # option is set to 2, immutable memtables will only be flushed if there +# # are two of them - a single immutable memtable will never be flushed. +# # If multiple memtables are merged together, less data will be written +# # to storage because the two updates are merged into a single key. However, +# # each Get() must linearly traverse all unmodifiable memtables and check +# # whether the key exists. Setting this value too high may hurt performance. +# min-write-buffer-number-to-merge : 1 + +# # The total size of wal files, when reaches this limit, rocksdb will force the flush of column-families +# # whose memtables are backed by the oldest live WAL file. Also used to control the rocksdb open time when +# # process restart. 
+# max-total-wal-size : 1073741824 + +# # rocksdb level0_stop_writes_trigger +# level0-stop-writes-trigger : 36 + +# # rocksdb level0_slowdown_writes_trigger +# level0-slowdown-writes-trigger : 20 + +# # rocksdb level0_file_num_compaction_trigger +# level0-file-num-compaction-trigger : 4 + +# # The maximum size of the response package to client to prevent memory +# # exhaustion caused by commands like 'keys *' and 'Scan' which can generate huge response. +# # Supported Units [K|M|G]. The default unit is in [bytes]. +# max-client-response-size : 1073741824 + +# # The compression algorithm. You can not change it when Pika started. +# # Supported types: [snappy, zlib, lz4, zstd]. If you do not wanna compress the SST file, please set its value as none. +# # [NOTICE] The Pika official binary release just linking the snappy library statically, which means that +# # you should compile the Pika from the source code and then link it with other compression algorithm library statically by yourself. 
+# compression : snappy + +# # if the vector size is smaller than the level number, the undefined lower level uses the +# # last option in the configurable array, for example, for 3 level +# # LSM tree the following settings are the same: +# # configurable array: [none:snappy] +# # LSM settings: [none:snappy:snappy] +# # When this configurable is enabled, compression is ignored, +# # default l0 l1 noCompression, l2 and more use `compression` option +# # https://github.com/facebook/rocksdb/wiki/Compression +# #compression_per_level : [none:none:snappy:lz4:lz4] + +# # The number of rocksdb background threads(sum of max-background-compactions and max-background-flushes) +# # If max-background-jobs has a valid value AND both 'max-background-flushes' and 'max-background-compactions' are set to -1, +# # then 'max-background-flushes' and 'max-background-compactions' will be auto config by rocksdb, specifically: +# # 1/4 of max-background-jobs will be given to 'max-background-flushes' and the rest(3/4) will be given to 'max-background-compactions'. +# # 'max-background-jobs' default value is 3 and the value range is [2, 12]. +# max-background-jobs : 3 + +# # The number of background flushing threads. +# # max-background-flushes default value is -1 and the value range is [1, 4] or -1. +# # if 'max-background-flushes' is set to -1, the 'max-background-compactions' should also be set to -1, +# # which means let rocksdb to auto config them based on the value of 'max-background-jobs' +# max-background-flushes : -1 + +# # [NOTICE] you MUST NOT set one of the max-background-flushes or max-background-compactions to -1 while setting another one to other values(not -1). +# # They SHOULD both be -1 or both not(if you want to config them manually). + +# # The number of background compacting threads. +# # max-background-compactions default value is -1 and the value range is [1, 8] or -1. 
+# # if 'max-background-compactions' is set to -1, the 'max-background-flushes' should also be set to -1, +# # which means let rocksdb to auto config them based on the value of 'max-background-jobs' +# max-background-compactions : -1 + +# # RocksDB delayed-write-rate, default is 0(infer from rate-limiter by RocksDB) +# # Ref from rocksdb: Whenever stall conditions are triggered, RocksDB will reduce write rate to delayed_write_rate, +# # and could possibly reduce write rate to even lower than delayed_write_rate if estimated pending compaction bytes accumulates. +# # If the value is 0, RocksDB will infer a value from `rate_limiter` value if it is not empty, or 16MB if `rate_limiter` is empty. +# # Note that if users change the rate in `rate_limiter` after DB is opened, delayed_write_rate won't be adjusted. +# # [Support Dynamically changeable] send 'config set delayed-write-rate' to a running pika can change its value dynamically +# delayed-write-rate : 0 + + +# # RocksDB will try to limit number of bytes in one compaction to be lower than this max-compaction-bytes. +# # But it's NOT guaranteed. +# # default value is -1, means let it be 25 * target-file-size-base (Which is RocksDB's default value) +# max-compaction-bytes : -1 + + +# # maximum value of RocksDB cached open file descriptors +# max-cache-files : 5000 + +# # The ratio between the total size of RocksDB level-(L+1) files and the total size of RocksDB level-L files for all L. +# # Its default value is 10(x). You can also change it to 5(x). +# max-bytes-for-level-multiplier : 10 + +# # slotmigrate is mainly used to migrate slots, usually we will set it to no. +# # When you migrate slots, you need to set it to yes, and reload slotskeys before. 
+# # slotmigrate [yes | no] +# slotmigrate : no + +# # slotmigrate thread num +# slotmigrate-thread-num : 1 + +# # thread-migrate-keys-num 1/8 of the write_buffer_size_ +# thread-migrate-keys-num : 64 + +# # BlockBasedTable block_size, default 4k +# # block-size: 4096 + +# # block LRU cache, default 8M, 0 to disable +# # Supported Units [K|M|G], default unit [bytes] +# # block-cache: 8388608 + +# # num-shard-bits default -1, the number of bits from cache keys to be use as shard id. +# # The cache will be sharded into 2^num_shard_bits shards. +# # https://github.com/EighteenZi/rocksdb_wiki/blob/master/Block-Cache.md#lru-cache +# # num-shard-bits: -1 + +# # whether the block cache is shared among the RocksDB instances, default is per CF +# # share-block-cache: no + +# # The slot number of pika when used with codis. +# default-slot-num : 1024 + +# # enable-partitioned-index-filters [yes | no] +# # When `cache-index-and-filter-blocks` is enabled, `pin_l0_filter_and_index_blocks_in_cache` +# # and `cache-index-and-filter-blocks` is suggested to be enabled +# # https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters +# # enable-partitioned-index-filters: default no + +# # whether or not index and filter blocks is stored in block cache +# # cache-index-and-filter-blocks: no + +# # pin_l0_filter_and_index_blocks_in_cache [yes | no] +# # When `cache-index-and-filter-blocks` is enabled, `pin_l0_filter_and_index_blocks_in_cache` is suggested to be enabled +# # pin_l0_filter_and_index_blocks_in_cache : no + +# # when set to yes, bloomfilter of the last level will not be built +# # optimize-filters-for-hits: no +# # https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#levels-target-size +# # level-compaction-dynamic-level-bytes: no + +# ################################## RocksDB Rate Limiter ####################### +# # rocksdb rate limiter +# # https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html +# # 
https://github.com/EighteenZi/rocksdb_wiki/blob/master/Rate-Limiter.md +# #######################################################################E####### + +# # rate limiter mode +# # 0: Read 1: Write 2: ReadAndWrite +# # rate-limiter-mode : default 1 + +# # rate limiter bandwidth, units in bytes, default 1024GB/s (No limit) +# # [Support Dynamically changeable] send 'rate-limiter-bandwidth' to a running pika can change it's value dynamically +# #rate-limiter-bandwidth : 1099511627776 + +# #rate-limiter-refill-period-us : 100000 +# # +# #rate-limiter-fairness: 10 + +# # if auto_tuned is true: Enables dynamic adjustment of rate limit within the range +# #`[rate-limiter-bandwidth / 20, rate-limiter-bandwidth]`, according to the recent demand for background I/O. +# # rate limiter auto tune https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html. the default value is true. +# #rate-limiter-auto-tuned : true + +# ################################## RocksDB Blob Configure ##################### +# # rocksdb blob configure +# # https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html +# # wiki https://github.com/facebook/rocksdb/wiki/BlobDB +# #######################################################################E####### + +# # enable rocksdb blob, default no +# # enable-blob-files : yes + +# # values at or above this threshold will be written to blob files during flush or compaction. +# # Supported Units [K|M|G], default unit is in [bytes]. +# # min-blob-size : 4K + +# # the size limit for blob files +# # Supported Units [K|M|G], default unit is in [bytes]. +# # blob-file-size : 256M + +# # the compression type to use for blob files. All blobs in the same file are compressed using the same algorithm. +# # Supported types: [snappy, zlib, lz4, zstd]. If you do not wanna compress the SST file, please set its value as none. 
+# # [NOTICE] The Pika official binary release just link the snappy library statically, which means that +# # you should compile the Pika from the source code and then link it with other compression algorithm library statically by yourself. +# # blob-compression-type : lz4 + +# # set this to open to make BlobDB actively relocate valid blobs from the oldest blob files as they are encountered during compaction. +# # The value option is [yes | no] +# # enable-blob-garbage-collection : no + +# # the cutoff that the GC logic uses to determine which blob files should be considered “old“. +# # This parameter can be tuned to adjust the trade-off between write amplification and space amplification. +# # blob-garbage-collection-age-cutoff : 0.25 + +# # if the ratio of garbage in the oldest blob files exceeds this threshold, +# # targeted compactions are scheduled in order to force garbage collecting the blob files in question +# # blob_garbage_collection_force_threshold : 1.0 + +# # the Cache object to use for blobs, default not open +# # blob-cache : 0 + +# # blob-num-shard-bits default -1, the number of bits from cache keys to be use as shard id. +# # The cache will be sharded into 2^blob-num-shard-bits shards. +# # blob-num-shard-bits : -1 + +# # Rsync Rate limiting configuration [Default value is 200MB/s] +# # [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit. +# # [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes). +# throttle-bytes-per-second : 207200000 +# # Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small. 
+# # [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command +# # [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust. +# rsync-timeout-ms : 1000 +# # The valid range for max-rsync-parallel-num is [1, 4]. +# # If an invalid value is provided, max-rsync-parallel-num will automatically be reset to 4. +# max-rsync-parallel-num : 4 + +# # The synchronization mode of Pika primary/secondary replication is determined by ReplicationID. ReplicationID in one replication_cluster are the same +# # replication-id : + +# ################### +# ## Cache Settings +# ################### +# # the number of caches for every db +# cache-num : 16 + +# # cache-model 0:cache_none 1:cache_read +# cache-model : 1 +# # cache-type: string, set, zset, list, hash, bit +# cache-type: string, set, zset, list, hash, bit -# Pika port, the default value is 9221. -# [NOTICE] Port Magic offsets of port+1000 / port+2000 are used by Pika at present. -# Port 10221 is used for Rsync, and port 11221 is used for Replication, while the listening port is 9221. +# # Maximum number of keys in the zset redis cache +# # On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum +# # number of keys that can exist in a zset, which is zset-zset-cache-field-num-per-key, with a +# # default value of 512. +# zset-cache-field-num-per-key : 512 + +# # If the number of elements in a zset in the DB exceeds zset-cache-field-num-per-key, +# # we determine whether to cache the first 512[zset-cache-field-num-per-key] elements +# # or the last 512[zset-cache-field-num-per-key] elements in the zset based on zset-cache-start-direction. 
+# # +# # If zset-cache-start-direction is 0, cache the first 512[zset-cache-field-num-per-key] elements from the header +# # If zset-cache-start-direction is -1, cache the last 512[zset-cache-field-num-per-key] elements +# zset-cache-start-direction : 0 + + +# # the cache maxmemory of every db, configuration 10G +# cache-maxmemory : 10737418240 + +# # cache-maxmemory-policy +# # 0: volatile-lru -> Evict using approximated LRU among the keys with an expire set. +# # 1: allkeys-lru -> Evict any key using approximated LRU. +# # 2: volatile-lfu -> Evict using approximated LFU among the keys with an expire set. +# # 3: allkeys-lfu -> Evict any key using approximated LFU. +# # 4: volatile-random -> Remove a random key among the ones with an expire set. +# # 5: allkeys-random -> Remove a random key, any key. +# # 6: volatile-ttl -> Remove the key with the nearest expire time (minor TTL) +# # 7: noeviction -> Don't evict anything, just return an error on write operations. +# cache-maxmemory-policy : 1 + +# # cache-maxmemory-samples +# cache-maxmemory-samples: 5 + +# # cache-lfu-decay-time +# cache-lfu-decay-time: 1 + + +# # is possible to manage access to Pub/Sub channels with ACL rules as well. The +# # default Pub/Sub channels permission if new users is controlled by the +# # acl-pubsub-default configuration directive, which accepts one of these values: +# # +# # allchannels: grants access to all Pub/Sub channels +# # resetchannels: revokes access to all Pub/Sub channels +# # +# # acl-pubsub-default defaults to 'resetchannels' permission. +# # acl-pubsub-default : resetchannels + +# # ACL users are defined in the following format: +# # user : ... acl rules ... +# # +# # For example: +# # +# # user : worker on >password ~key* +@all + +# # Using an external ACL file +# # +# # Instead of configuring users here in this file, it is possible to use +# # a stand-alone file just listing users. 
The two methods cannot be mixed: +# # if you configure users here and at the same time you activate the external +# # ACL file, the server will refuse to start. +# # +# # The format of the external ACL user file is exactly the same as the +# # format that is used inside pika.conf to describe users. +# # +# # aclfile : ../conf/users.acl + +# # (experimental) +# # It is possible to change the name of dangerous commands in a shared environment. +# # For instance the CONFIG command may be renamed into something Warning: To prevent +# # data inconsistency caused by different configuration files, do not use the rename +# # command to modify write commands on the primary and secondary servers. If necessary, +# # ensure that the configuration files of the primary and secondary servers are consistent +# # In addition, when using the command rename, you must not use "" to modify the command, +# # for example, rename-command: FLUSHDB "360flushdb" is incorrect; instead, use +# # rename-command: FLUSHDB 360flushdb is correct. After the rename command is executed, +# # it is most appropriate to use a numeric string with uppercase or lowercase letters +# # for example: rename-command : FLUSHDB joYAPNXRPmcarcR4ZDgC81TbdkSmLAzRPmcarcR +# # Warning: Currently only applies to flushdb, slaveof, bgsave, shutdown, config command +# # Warning: Ensure that the Settings of rename-command on the master and slave servers are consistent +# # +# # Example: +# # rename-command : FLUSHDB 360flushdb + +# # [You can ignore this item] +# # This is NOT a regular conf item, it is a internal used metric that relies on pika.conf for persistent storage. 
+# # 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election' +# # which serves for the scenario of codis-pika cluster reelection +# # You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING] +# internal-used-unfinished-full-sync : + + + + + + +block_cache_size +# 360 dba pika conf pika3.5.2 port : 9221 - -db-instance-num : 3 -rocksdb-ttl-second : 86400 * 7; -rocksdb-periodic-second : 86400 * 3; - -# Random value identifying the Pika server, its string length must be 40. -# If not set, Pika will generate a random string with a length of 40 random characters. -# run-id : - -# Master's run-id -# master-run-id : - -# The number of threads for running Pika. -# It's not recommended to set this value exceeds -# the number of CPU cores on the deployment server. -thread-num : 1 - -# Size of the thread pool, The threads within this pool -# are dedicated to handling user requests. -thread-pool-size : 12 - -# This parameter is used to control whether to separate fast and slow commands. -# When slow-cmd-pool is set to yes, fast and slow commands are separated. -# When set to no, they are not separated. -slow-cmd-pool : no - -# Size of the low level thread pool, The threads within this pool -# are dedicated to handling slow user requests. -slow-cmd-thread-pool-size : 1 - -# Size of the low level thread pool, The threads within this pool -# are dedicated to handling slow user requests. -admin-thread-pool-size : 2 - -# Slow cmd list e.g. hgetall, mset -slow-cmd-list : - -# List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed. -# Default commands: info, ping, monitor -# This parameter is only supported by the CONFIG GET command and not by CONFIG SET. -admin-cmd-list : info, ping, monitor - -# The number of threads to write DB in slaveNode when replicating. -# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size. 
-sync-thread-num : 6 - -# The num of threads to write binlog in slaveNode when replicating, -# each DB cloud only bind to one sync-binlog-thread to write binlog in maximum -#[NOTICE] It's highly recommended to set sync-binlog-thread-num equal to conf item 'database'(then each DB cloud have a exclusive thread to write binlog), -# eg. if you use 8 DBs(databases_ is 8), sync-binlog-thread-num is preferable to be 8 -# Valid range of sync-binlog-thread-num is [1, databases], the final value of it is Min(sync-binlog-thread-num, databases) -sync-binlog-thread-num : 1 - -# Directory to store log files of Pika, which contains multiple types of logs, -# Including: INFO, WARNING, ERROR log, as well as binglog(write2fine) file which -# is used for replication. +thread-num : 8 log-path : ./log/ - -# Directory to store the data of Pika. +loglevel : info db-path : ./db/ - -# The size of a single RocksDB memtable at the Pika's bottom layer(Pika use RocksDB to store persist data). -# [Tip] Big write-buffer-size can improve writing performance, -# but this will generate heavier IO load when flushing from buffer to disk, -# you should configure it based on you usage scenario. -# Supported Units [K|M|G], write-buffer-size default unit is in [bytes]. write-buffer-size : 256M - -# The size of one block in arena memory allocation. -# If <= 0, a proper value is automatically calculated. -# (usually 1/8 of writer-buffer-size, rounded up to a multiple of 4KB) -# Supported Units [K|M|G], arena-block-size default unit is in [bytes]. -arena-block-size : - -# Timeout of Pika's connection, counting down starts When there are no requests -# on a connection (it enters sleep state), when the countdown reaches 0, the connection -# will be closed by Pika. -# [Tip] The issue of running out of Pika's connections may be avoided if this value -# is configured properly. -# The Unit of timeout is in [seconds] and its default value is 60(s). 
-timeout : 60 - -# The [password of administrator], which is empty by default. -# [NOTICE] If this admin password is the same as user password (including both being empty), -# the value of userpass will be ignored and all users are considered as administrators, -# in this scenario, users are not subject to the restrictions imposed by the userblacklist. -# PS: "user password" refers to value of the parameter below: userpass. -requirepass : - -# Password for replication verify, used for authentication when a slave -# connects to a master to request replication. -# [NOTICE] The value of this parameter must match the "requirepass" setting on the master. -masterauth : - -# The [password of user], which is empty by default. -# [NOTICE] If this user password is the same as admin password (including both being empty), -# the value of this parameter will be ignored and all users are considered as administrators, -# in this scenario, users are not subject to the restrictions imposed by the userblacklist. -# PS: "admin password" refers to value of the parameter above: requirepass. -# userpass : - -# The blacklist of commands for users that logged in by userpass, -# the commands that added to this list will not be available for users except for administrator. -# [Advice] It's recommended to add high-risk commands to this list. -# [Format] Commands should be separated by ",". For example: FLUSHALL, SHUTDOWN, KEYS, CONFIG -# By default, this list is empty. -# userblacklist : - -# Running Mode of Pika, The current version only supports running in "classic mode". -# If set to 'classic', Pika will create multiple DBs whose number is the value of configure item "databases". -instance-mode : classic - -# The number of databases when Pika runs in classic mode. -# The default database id is DB 0. You can select a different one on -# a per-connection by using SELECT. The db id range is [0, 'databases' value -1]. -# The value range of this parameter is [1, 8]. 
-# [NOTICE] It's RECOMMENDED to set sync-binlog-thread-num equal to DB num(databases), -# if you've changed the value of databases, remember to check if the value of sync-binlog-thread-num is proper. -databases : 1 - -# The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present. -# By default, this num is set to 0, which means this feature is [not enabled] -# and the Pika runs in standalone mode. -replication-num : 0 - -# consensus level defines the num of confirms(ACKs) the leader node needs to receive from -# follower nodes before returning the result to the client that sent the request. -# The [value range] of this parameter is: [0, ...replicaiton-num]. -# The default value of consensus-level is 0, which means this feature is not enabled. -consensus-level : 0 - -# The Prefix of dump file's name. -# All the files that generated by command "bgsave" will be name with this prefix. -dump-prefix : - -# daemonize [yes | no]. -#daemonize : yes - -# The directory to stored dump files that generated by command "bgsave". -dump-path : ./dump/ - -# TTL of dump files that generated by command "bgsave". -# Any dump files which exceed this TTL will be deleted. -# Unit of dump-expire is in [days] and the default value is 0(day), -# which means dump files never expire. -dump-expire : 0 - -# Pid file Path of Pika. -pidfile : ./pika.pid - -# The Maximum number of Pika's Connection. +timeout : 30 +#requirepass : 06154eee364854d5 +#masterauth : 06154eee364854d5 +#userpass : 06154eee364854d5360 +#userblacklist : bgsave,dumpoff,client +dump-prefix : pika- +dump-expire : 1 +pidfile : .pika.pid +daemonize : yes +dump-path : ./dump/block_cache_size maxclients : 20000 - -# The size of sst file in RocksDB(Pika is based on RocksDB). -# sst files are hierarchical, the smaller the sst file size, the higher the performance and the lower the merge cost, -# the price is that the number of sst files could be huge. 
On the contrary, the bigger the sst file size, the lower -# the performance and the higher the merge cost, while the number of files is fewer. -# Supported Units [K|M|G], target-file-size-base default unit is in [bytes] and the default value is 20M. -target-file-size-base : 20M - -# Expire-time of binlog(write2file) files that stored within log-path. -# Any binlog(write2file) files that exceed this expire time will be cleaned up. -# The unit of expire-logs-days is in [days] and the default value is 7(days). -# The [Minimum value] of this parameter is 1(day). +target-file-size-base : 20971520 expire-logs-days : 7 - -# The maximum number of binlog(write2file) files. -# Once the total number of binlog files exceed this value, -# automatic cleaning will start to ensure the maximum number -# of binlog files is equal to expire-logs-nums. -# The [Minimum value] of this parameter is 10. -expire-logs-nums : 10 - -# The number of guaranteed connections for root user. -# This parameter guarantees that there are 2(By default) connections available -# for root user to log in Pika from 127.0.0.1, even if the maximum connection limit is reached. -# PS: The maximum connection refers to the parameter above: maxclients. -# The default value of root-connection-num is 2. -root-connection-num : 2 - -# Slowlog-write-errorlog -slowlog-write-errorlog : no - -# The time threshold for slow log recording. -# Any command whose execution time exceeds this threshold will be recorded in pika-ERROR.log, -# which is stored in log-path. -# The unit of slowlog-log-slower-than is in [microseconds(μs)] and the default value is 10000 μs / 10 ms. -slowlog-log-slower-than : 10000 - -# Slowlog-max-len -slowlog-max-len : 128 - -# Pika db sync path -db-sync-path : ./dbsync/ - -# The maximum Transmission speed during full synchronization. -# The exhaustion of network can be prevented by setting this parameter properly. -# The value range of this parameter is [1,1024] with unit in [MB/s]. 
-# [NOTICE] If this parameter is set to an invalid value(smaller than 0 or bigger than 1024), -# it will be automatically reset to 1024. -# The default value of db-sync-speed is -1 (1024MB/s). -db-sync-speed : -1 - -# The priority of slave node when electing new master node. -# The slave node with [lower] value of slave-priority will have [higher priority] to be elected as the new master node. -# This parameter is only used in conjunction with sentinel and serves no other purpose. -# The default value of slave-priority is 100. -slave-priority : 100 - -# Specify network interface that work with Pika. -#network-interface : eth1 - -# The IP and port of the master node are specified by this parameter for -# replication between master and slaves. -# [Format] is "ip:port" , for example: "192.168.1.2:6666" indicates that -# the slave instances that configured with this value will automatically send -# SLAVEOF command to port 6666 of 192.168.1.2 after startup. -# This parameter should be configured on slave nodes. -#slaveof : master-ip:master-port - - -# Daily/Weekly Automatic full compaction task is configured by compact-cron. -# -# [Format-daily]: start time(hour)-end time(hour)/disk-free-space-ratio, -# example: with value of "02-04/60", Pika will perform full compaction task between 2:00-4:00 AM everyday if -# the disk-free-size / disk-size > 60%. -# -# [Format-weekly]: week/start time(hour)-end time(hour)/disk-free-space-ratio, -# example: with value of "3/02-04/60", Pika will perform full compaction task between 2:00-4:00 AM every Wednesday if -# the disk-free-size / disk-size > 60%. -# -# [Tip] Automatic full compaction is suitable for scenarios with multiple data structures -# and lots of items are expired or deleted, or key names are frequently reused. -# -# [NOTICE]: If compact-interval is set, compact-cron will be masked and disabled. 
-# -#compact-cron : 3/02-04/60 - - -# Automatic full synchronization task between a time interval is configured by compact-interval. -# [Format]: time interval(hour)/disk-free-space-ratio, example: "6/60", Pika will perform full compaction every 6 hours, -# if the disk-free-size / disk-size > 60%. -# [NOTICE]: compact-interval is prior than compact-cron. -#compact-interval : - -# The disable_auto_compactions option is [true | false] -disable_auto_compactions : false - -# Rocksdb max_subcompactions, increasing this value can accelerate the exec speed of a single compaction task -# it's recommended to increase it's value if large compaction is found in you instance -max-subcompactions : 1 -# The minimum disk usage ratio for checking resume. -# If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume. -# Its default value is 0.7. -#min-check-resume-ratio : 0.7 - -# The minimum free disk space to trigger db resume. -# If the db has a background error, only the free disk size is larger than this configuration can trigger manually resume db. -# Its default value is 256MB. -# [NOTICE]: least-free-disk-resume-size should not smaller than write-buffer-size! -#least-free-disk-resume-size : 256M - -# Manually trying to resume db interval is configured by manually-resume-interval. -# If db has a background error, it will try to manually call resume() to resume db if satisfy the least free disk to resume. -# Its default value is 60 seconds. -#manually-resume-interval : 60 - -# This window-size determines the amount of data that can be transmitted in a single synchronization process. -# [Tip] In the scenario of high network latency. Increasing this size can improve synchronization efficiency. -# Its default value is 9000. the [maximum] value is 90000. -sync-window-size : 9000 - -# Maximum buffer size of a client connection. -# [NOTICE] Master and slaves must have exactly the same value for the max-conn-rbuf-size. 
-# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB]. -max-conn-rbuf-size : 268435456 - - -#######################################################################E####### -#! Critical Settings !# -#######################################################################E####### - -# write_binlog [yes | no] -write-binlog : yes - -# The size of binlog file, which can not be modified once Pika instance started. -# [NOTICE] Master and slaves must have exactly the same value for the binlog-file-size. -# The [value range] of binlog-file-size is [1K, 2G]. -# Supported Units [K|M|G], binlog-file-size default unit is in [bytes] and the default value is 100M. +expire-logs-nums : 300 +root-connection-num : 10 +slowlog-log-slower-than : 100000 binlog-file-size : 104857600 - -# Automatically triggers a small compaction according to statistics -# Use the cache to store up to 'max-cache-statistic-keys' keys -# If 'max-cache-statistic-keys' set to '0', that means turn off the statistics function -# and this automatic small compaction feature is disabled. -max-cache-statistic-keys : 0 - -# When 'delete' or 'overwrite' a specific multi-data structure key 'small-compaction-threshold' times, -# a small compact is triggered automatically if the small compaction feature is enabled. -# small-compaction-threshold default value is 5000 and the value range is [1, 100000]. -small-compaction-threshold : 5000 -small-compaction-duration-threshold : 10000 - -# The maximum total size of all live memtables of the RocksDB instance that owned by Pika. -# Flushing from memtable to disk will be triggered if the actual memory usage of RocksDB -# exceeds max-write-buffer-size when next write operation is issued. -# [RocksDB-Basic-Tuning](https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning) -# Supported Units [K|M|G], max-write-buffer-size default unit is in [bytes]. 
-max-write-buffer-size : 10737418240 - -# The maximum number of write buffers(memtables) that are built up in memory for one ColumnFamily in DB. -# The default and the minimum number is 2. It means that Pika(RocksDB) will write to a write buffer -# when it flushes the data of another write buffer to storage. -# If max-write-buffer-num > 3, writing will be slowed down. -max-write-buffer-num : 2 - -# `min_write_buffer_number_to_merge` is the minimum number of memtables -# that need to be merged before placing the order. For example, if the -# option is set to 2, immutable memtables will only be flushed if there -# are two of them - a single immutable memtable will never be flushed. -# If multiple memtables are merged together, less data will be written -# to storage because the two updates are merged into a single key. However, -# each Get() must linearly traverse all unmodifiable memtables and check -# whether the key exists. Setting this value too high may hurt performance. -min-write-buffer-number-to-merge : 1 - -# The total size of wal files, when reaches this limit, rocksdb will force the flush of column-families -# whose memtables are backed by the oldest live WAL file. Also used to control the rocksdb open time when -# process restart. -max-total-wal-size : 1073741824 - -# rocksdb level0_stop_writes_trigger -level0-stop-writes-trigger : 36 - -# rocksdb level0_slowdown_writes_trigger -level0-slowdown-writes-trigger : 20 - -# rocksdb level0_file_num_compaction_trigger -level0-file-num-compaction-trigger : 4 - -# The maximum size of the response package to client to prevent memory -# exhaustion caused by commands like 'keys *' and 'Scan' which can generate huge response. -# Supported Units [K|M|G]. The default unit is in [bytes]. -max-client-response-size : 1073741824 - -# The compression algorithm. You can not change it when Pika started. -# Supported types: [snappy, zlib, lz4, zstd]. If you do not wanna compress the SST file, please set its value as none. 
-# [NOTICE] The Pika official binary release just linking the snappy library statically, which means that -# you should compile the Pika from the source code and then link it with other compression algorithm library statically by yourself. compression : snappy - -# if the vector size is smaller than the level number, the undefined lower level uses the -# last option in the configurable array, for example, for 3 level -# LSM tree the following settings are the same: -# configurable array: [none:snappy] -# LSM settings: [none:snappy:snappy] -# When this configurable is enabled, compression is ignored, -# default l0 l1 noCompression, l2 and more use `compression` option -# https://github.com/facebook/rocksdb/wiki/Compression -#compression_per_level : [none:none:snappy:lz4:lz4] - -# The number of rocksdb background threads(sum of max-background-compactions and max-background-flushes) -# If max-background-jobs has a valid value AND both 'max-background-flushs' and 'max-background-compactions' is set to -1, -# then max-background-flushs' and 'max-background-compactions will be auto config by rocksdb, specifically: -# 1/4 of max-background-jobs will be given to max-background-flushs' and the rest(3/4) will be given to 'max-background-compactions'. -# 'max-background-jobs' default value is 3 and the value range is [2, 12]. -max-background-jobs : 3 - -# The number of background flushing threads. -# max-background-flushes default value is -1 and the value range is [1, 4] or -1. -# if 'max-background-flushes' is set to -1, the 'max-background-compactions' should also be set to -1, -# which means let rocksdb to auto config them based on the value of 'max-background-jobs' -max-background-flushes : -1 - -# [NOTICE] you MUST NOT set one of the max-background-flushes or max-background-compactions to -1 while setting another one to other values(not -1). -# They SHOULD both be -1 or both not(if you want to config them manually). - -# The number of background compacting threads. 
-# max-background-compactions default value is -1 and the value range is [1, 8] or -1. -# if 'max-background-compactions' is set to -1, the 'max-background-flushes' should also be set to -1, -# which means let rocksdb to auto config them based on the value of 'max-background-jobs' -max-background-compactions : -1 - -# RocksDB delayed-write-rate, default is 0(infer from rate-limiter by RocksDB) -# Ref from rocksdb: Whenever stall conditions are triggered, RocksDB will reduce write rate to delayed_write_rate, -# and could possibly reduce write rate to even lower than delayed_write_rate if estimated pending compaction bytes accumulates. -# If the value is 0, RcoksDB will infer a value from `rater_limiter` value if it is not empty, or 16MB if `rater_limiter` is empty. -# Note that if users change the rate in `rate_limiter` after DB is opened, delayed_write_rate won't be adjusted. -# [Support Dynamically changeable] send 'config set delayed-write-rate' to a running pika can change it's value dynamically -delayed-write-rate : 0 - - -# RocksDB will try to limit number of bytes in one compaction to be lower than this max-compaction-bytes. -# But it's NOT guaranteed. -# default value is -1, means let it be 25 * target-file-size-base (Which is RocksDB's default value) -max-compaction-bytes : -1 - - -# maximum value of RocksDB cached open file descriptors -max-cache-files : 5000 - -# The ratio between the total size of RocksDB level-(L+1) files and the total size of RocksDB level-L files for all L. -# Its default value is 10(x). You can also change it to 5(x). -max-bytes-for-level-multiplier : 10 - -# slotmigrate is mainly used to migrate slots, usually we will set it to no. -# When you migrate slots, you need to set it to yes, and reload slotskeys before. 
-# slotmigrate [yes | no] -slotmigrate : no - -# slotmigrate thread num -slotmigrate-thread-num : 1 - -# thread-migrate-keys-num 1/8 of the write_buffer_size_ -thread-migrate-keys-num : 64 - -# BlockBasedTable block_size, default 4k -# block-size: 4096 - -# block LRU cache, default 8M, 0 to disable -# Supported Units [K|M|G], default unit [bytes] -# block-cache: 8388608 - -# num-shard-bits default -1, the number of bits from cache keys to be use as shard id. -# The cache will be sharded into 2^num_shard_bits shards. -# https://github.com/EighteenZi/rocksdb_wiki/blob/master/Block-Cache.md#lru-cache -# num-shard-bits: -1 - -# whether the block cache is shared among the RocksDB instances, default is per CF -# share-block-cache: no - -# The slot number of pika when used with codis. +db-sync-path : ./dbsync +db-sync-speed : 60 +slowlog-write-errorlog : yes +small-compaction-threshold : 5000 +max-write-buffer-size : 20737418240 +max-cache-files : 8000 +replication-num : 0 +consensus-level : 0 +max-cache-statistic-keys : 0 +thread-pool-size : 50 +slowlog-write-errorlog : yes default-slot-num : 1024 - -# enable-partitioned-index-filters [yes | no] -# When `cache-index-and-filter-blocks` is enabled, `pin_l0_filter_and_index_blocks_in_cache` -# and `cache-index-and-filter-blocks` is suggested to be enabled -# https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters -# enable-partitioned-index-filters: default no - -# whether or not index and filter blocks is stored in block cache -# cache-index-and-filter-blocks: no - -# pin_l0_filter_and_index_blocks_in_cache [yes | no] -# When `cache-index-and-filter-blocks` is enabled, `pin_l0_filter_and_index_blocks_in_cache` is suggested to be enabled -# pin_l0_filter_and_index_blocks_in_cache : no - -# when set to yes, bloomfilter of the last level will not be built -# optimize-filters-for-hits: no -# https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#levels-target-size -# level-compaction-dynamic-level-bytes: no - 
-################################## RocksDB Rate Limiter ####################### -# rocksdb rate limiter -# https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html -# https://github.com/EighteenZi/rocksdb_wiki/blob/master/Rate-Limiter.md -#######################################################################E####### - -# rate limiter mode -# 0: Read 1: Write 2: ReadAndWrite -# rate-limiter-mode : default 1 - -# rate limiter bandwidth, units in bytes, default 1024GB/s (No limit) -# [Support Dynamically changeable] send 'rate-limiter-bandwidth' to a running pika can change it's value dynamically -#rate-limiter-bandwidth : 1099511627776 - -#rate-limiter-refill-period-us : 100000 -# -#rate-limiter-fairness: 10 - -# if auto_tuned is true: Enables dynamic adjustment of rate limit within the range -#`[rate-limiter-bandwidth / 20, rate-limiter-bandwidth]`, according to the recent demand for background I/O. -# rate limiter auto tune https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html. the default value is true. -#rate-limiter-auto-tuned : true - -################################## RocksDB Blob Configure ##################### -# rocksdb blob configure -# https://rocksdb.org/blog/2021/05/26/integrated-blob-db.html -# wiki https://github.com/facebook/rocksdb/wiki/BlobDB -#######################################################################E####### - -# enable rocksdb blob, default no -# enable-blob-files : yes - -# values at or above this threshold will be written to blob files during flush or compaction. -# Supported Units [K|M|G], default unit is in [bytes]. -# min-blob-size : 4K - -# the size limit for blob files -# Supported Units [K|M|G], default unit is in [bytes]. -# blob-file-size : 256M - -# the compression type to use for blob files. All blobs in the same file are compressed using the same algorithm. -# Supported types: [snappy, zlib, lz4, zstd]. If you do not wanna compress the SST file, please set its value as none. 
-# [NOTICE] The Pika official binary release just link the snappy library statically, which means that -# you should compile the Pika from the source code and then link it with other compression algorithm library statically by yourself. -# blob-compression-type : lz4 - -# set this to open to make BlobDB actively relocate valid blobs from the oldest blob files as they are encountered during compaction. -# The value option is [yes | no] -# enable-blob-garbage-collection : no - -# the cutoff that the GC logic uses to determine which blob files should be considered “old“. -# This parameter can be tuned to adjust the trade-off between write amplification and space amplification. -# blob-garbage-collection-age-cutoff : 0.25 - -# if the ratio of garbage in the oldest blob files exceeds this threshold, -# targeted compactions are scheduled in order to force garbage collecting the blob files in question -# blob_garbage_collection_force_threshold : 1.0 - -# the Cache object to use for blobs, default not open -# blob-cache : 0 - -# blob-num-shard-bits default -1, the number of bits from cache keys to be use as shard id. -# The cache will be sharded into 2^blob-num-shard-bits shards. -# blob-num-shard-bits : -1 - -# Rsync Rate limiting configuration [Default value is 200MB/s] -# [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit. -# [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes). -throttle-bytes-per-second : 207200000 -# Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small. 
-# [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command -# [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust. -rsync-timeout-ms : 1000 -# The valid range for max-rsync-parallel-num is [1, 4]. -# If an invalid value is provided, max-rsync-parallel-num will automatically be reset to 4. -max-rsync-parallel-num : 4 - -# The synchronization mode of Pika primary/secondary replication is determined by ReplicationID. ReplicationID in one replication_cluster are the same -# replication-id : - -################### -## Cache Settings -################### -# the number of caches for every db -cache-num : 16 - -# cache-model 0:cache_none 1:cache_read -cache-model : 1 -# cache-type: string, set, zset, list, hash, bit -cache-type: string, set, zset, list, hash, bit - -# Maximum number of keys in the zset redis cache -# On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum -# number of keys that can exist in a zset, which is zset-zset-cache-field-num-per-key, with a -# default value of 512. +instance-mode : classic +databases : 1 +sync-thread-num : 1 +arena-block-size : 33554432 +max-background-jobs : 12 +max-background-flushes : 3 +max-background-compactions : 9 +rate-limiter-bandwidth : 1099511627776 +db-instance-num : 1 +block-size : 4096 +#block-cache : 5368709120 +block-cache : 4294967296 +max-subcompactions : 8 + +#cache-maxmemory : 5368709120 +cache-lfu-decay-time : 1 +cache-maxmemory-samples : 5 +cache-maxmemory-policy : 1 +cache-num : 8 +cache-model : 0 zset-cache-field-num-per-key : 512 - -# If the number of elements in a zset in the DB exceeds zset-cache-field-num-per-key, -# we determine whether to cache the first 512[zset-cache-field-num-per-key] elements -# or the last 512[zset-cache-field-num-per-key] elements in the zset based on zset-cache-start-direction. 
-# -# If zset-cache-start-direction is 0, cache the first 512[zset-cache-field-num-per-key] elements from the header -# If zset-cache-start-direction is -1, cache the last 512[zset-cache-field-num-per-key] elements zset-cache-start-direction : 0 +cache-type : +share-block-cache : yes +throttle-bytes-per-second : 102400000 +max-rsync-parallel-num : 4 +write-binlog : no +slotmigrate : no -# the cache maxmemory of every db, configuration 10G -cache-maxmemory : 10737418240 - -# cache-maxmemory-policy -# 0: volatile-lru -> Evict using approximated LRU among the keys with an expire set. -# 1: allkeys-lru -> Evict any key using approximated LRU. -# 2: volatile-lfu -> Evict using approximated LFU among the keys with an expire set. -# 3: allkeys-lfu -> Evict any key using approximated LFU. -# 4: volatile-random -> Remove a random key among the ones with an expire set. -# 5: allkeys-random -> Remove a random key, any key. -# 6: volatile-ttl -> Remove the key with the nearest expire time (minor TTL) -# 7: noeviction -> Don't evict anything, just return an error on write operations. -cache-maxmemory-policy : 1 - -# cache-maxmemory-samples -cache-maxmemory-samples: 5 - -# cache-lfu-decay-time -cache-lfu-decay-time: 1 - - -# is possible to manage access to Pub/Sub channels with ACL rules as well. The -# default Pub/Sub channels permission if new users is controlled by the -# acl-pubsub-default configuration directive, which accepts one of these values: -# -# allchannels: grants access to all Pub/Sub channels -# resetchannels: revokes access to all Pub/Sub channels -# -# acl-pubsub-default defaults to 'resetchannels' permission. -# acl-pubsub-default : resetchannels - -# ACL users are defined in the following format: -# user : ... acl rules ... -# -# For example: -# -# user : worker on >password ~key* +@all - -# Using an external ACL file -# -# Instead of configuring users here in this file, it is possible to use -# a stand-alone file just listing users. 
The two methods cannot be mixed: -# if you configure users here and at the same time you activate the external -# ACL file, the server will refuse to start. -# -# The format of the external ACL user file is exactly the same as the -# format that is used inside pika.conf to describe users. -# -# aclfile : ../conf/users.acl - -# (experimental) -# It is possible to change the name of dangerous commands in a shared environment. -# For instance the CONFIG command may be renamed into something Warning: To prevent -# data inconsistency caused by different configuration files, do not use the rename -# command to modify write commands on the primary and secondary servers. If necessary, -# ensure that the configuration files of the primary and secondary servers are consistent -# In addition, when using the command rename, you must not use "" to modify the command, -# for example, rename-command: FLUSHDB "360flushdb" is incorrect; instead, use -# rename-command: FLUSHDB 360flushdb is correct. After the rename command is executed, -# it is most appropriate to use a numeric string with uppercase or lowercase letters -# for example: rename-command : FLUSHDB joYAPNXRPmcarcR4ZDgC81TbdkSmLAzRPmcarcR -# Warning: Currently only applies to flushdb, slaveof, bgsave, shutdown, config command -# Warning: Ensure that the Settings of rename-command on the master and slave servers are consistent -# -# Example: -# rename-command : FLUSHDB 360flushdb - -# [You can ignore this item] -# This is NOT a regular conf item, it is a internal used metric that relies on pika.conf for persistent storage. -# 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election' -# which serves for the scenario of codis-pika cluster reelection -# You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING] -internal-used-unfinished-full-sync : \ No newline at end of file +# Pika automatic compact compact strategy, a complement to rocksdb compact. 
+# Trigger the compact background task periodically according to `compact-interval` +# Can choose `full-compact` or `obd-compact`. +# obd-compact https://github.com/OpenAtomFoundation/pika/issues/2255 +compaction-strategy : obd-compact + +# For OBD_Compact +# According to the number of sst files in rocksdb, +# compact every `compact-every-num-of-files` file. +compact-every-num-of-files : 50 + +# For OBD_Compact +# In another search, if the file creation time is +# greater than `force-compact-file-age-seconds`, +# a compaction of the upper and lower boundaries +# of the file will be performed at the same time +# `compact-every-num-of-files` -1 +force-compact-file-age-seconds : 300 + +# For OBD_Compact +# According to the number of sst files in rocksdb, +# compact every `compact-every-num-of-files` file. +force-compact-min-delete-ratio : 10 + +# For OBD_Compact +# According to the number of sst files in rocksdb, +# compact every `compact-every-num-of-files` file. +dont-compact-sst-created-in-seconds : 20 + +# For OBD_Compact +# According to the number of sst files in rocksdb, +# compact every `compact-every-num-of-files` file. +best-delete-min-ratio : 40 \ No newline at end of file diff --git a/src/net/src/thread_pool.cc b/src/net/src/thread_pool.cc index c37f08df15..b7e4c988de 100644 --- a/src/net/src/thread_pool.cc +++ b/src/net/src/thread_pool.cc @@ -5,9 +5,11 @@ #include "net/include/thread_pool.h" #include "net/src/net_thread_name.h" +#include "pstd/include/env.h" #include +#include #include #include #include @@ -46,14 +48,15 @@ int ThreadPool::Worker::stop() { } ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name) - : nlinks_((worker_num + nworkers_per_link_ - 1) / nworkers_per_link_), + : nlinks_(nworkers_per_link_ > worker_num ? 
1 : (worker_num + nworkers_per_link_ - 1) / nworkers_per_link_), // : nlinks_(worker_num), newest_node_(nlinks_), node_cnt_(0), time_newest_node_(nlinks_), time_node_cnt_(0), - queue_slow_size_(std::max(10UL, std::min(worker_num * max_queue_size / 100, max_queue_size))), - max_queue_size_(max_queue_size), + max_queue_size_(1000 * nlinks_), + queue_slow_size_(worker_num > 10 ? (100 * nlinks_) : std::max(worker_num, 3UL)), + // queue_slow_size_(std::max(10UL, std::min(worker_num * max_queue_size / 100, max_queue_size))), max_yield_usec_(100), slow_yield_usec_(3), adp_ctx(), @@ -74,7 +77,7 @@ ThreadPool::~ThreadPool() { stop_thread_pool(); } int ThreadPool::start_thread_pool() { if (!running_.load()) { should_stop_.store(false); - for (size_t i = 0; i < nlinks_; ++i) { + for (size_t i = 0; i < worker_num_; ++i) { workers_.push_back(new Worker(this, i)); int res = workers_[i]->start(); if (res != 0) { @@ -112,9 +115,11 @@ bool ThreadPool::should_stop() { return should_stop_.load(); } void ThreadPool::set_should_stop() { should_stop_.store(true); } void ThreadPool::Schedule(TaskFunc func, void* arg) { + node_cnt_++; // stop until the size of tasks queue is not greater than max_queue_size_ while (node_cnt_.load(std::memory_order_relaxed) >= max_queue_size_) { std::this_thread::yield(); + // pstd::SleepForMicroseconds(1); } // slow like above if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) { @@ -125,7 +130,6 @@ void ThreadPool::Schedule(TaskFunc func, void* arg) { auto node = new Node(func, arg); auto idx = ++task_idx_; LinkOne(node, &newest_node_[idx % nlinks_]); - node_cnt_++; rsignal_[idx % nlinks_].notify_one(); } } @@ -163,152 +167,37 @@ void ThreadPool::runInThread(const int idx) { Node* time_last = nullptr; auto& newest_node = newest_node_[idx % nlinks_]; - auto& time_newest_node = time_newest_node_[idx % nlinks_]; auto& mu = mu_[idx % nlinks_]; auto& rsignal = rsignal_[idx % nlinks_]; while (LIKELY(!should_stop())) { std::unique_lock lock(mu); - 
rsignal.wait(lock, [this, &newest_node, &time_newest_node]() { - return newest_node.load(std::memory_order_relaxed) != nullptr || - UNLIKELY(time_newest_node.load(std::memory_order_relaxed) != nullptr) || UNLIKELY(should_stop()); + rsignal.wait(lock, [this, &newest_node]() { + return newest_node.load(std::memory_order_relaxed) != nullptr || UNLIKELY(should_stop()); }); lock.unlock(); - retry: if (UNLIKELY(should_stop())) { break; } + retry: last = newest_node.exchange(nullptr); - time_last = time_newest_node.exchange(nullptr); - if (last == nullptr && LIKELY(time_last == nullptr)) { - // 1. loop for short time - for (uint32_t tries = 0; tries < 200; ++tries) { - if (newest_node.load(std::memory_order_acquire) != nullptr) { - last = newest_node.exchange(nullptr); - if (last != nullptr) { - goto exec; - } - } - if (UNLIKELY(time_newest_node.load(std::memory_order_acquire) != nullptr)) { - time_last = time_newest_node.exchange(nullptr); - if (time_last != nullptr) { - goto exec; - } - } - AsmVolatilePause(); - } - - // 2. 
loop for a little short time again - const size_t kMaxSlowYieldsWhileSpinning = 3; - auto& yield_credit = adp_ctx.value; - bool update_ctx = false; - bool would_spin_again = false; - const int sampling_base = 256; - - update_ctx = Random::GetTLSInstance()->OneIn(sampling_base); - - if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) { - auto spin_begin = std::chrono::steady_clock::now(); - - size_t slow_yield_count = 0; - - auto iter_begin = spin_begin; - while ((iter_begin - spin_begin) <= std::chrono::microseconds(max_yield_usec_)) { - std::this_thread::yield(); - - if (newest_node.load(std::memory_order_acquire) != nullptr) { - last = newest_node.exchange(nullptr); - if (last != nullptr) { - would_spin_again = true; - // success - break; - } - } - if (UNLIKELY(time_newest_node.load(std::memory_order_acquire) != nullptr)) { - time_last = time_newest_node.exchange(nullptr); - if (time_last != nullptr) { - would_spin_again = true; - // success - break; - } - } - - auto now = std::chrono::steady_clock::now(); - if (now == iter_begin || now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) { - ++slow_yield_count; - if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) { - update_ctx = true; - break; - } - } - iter_begin = now; - } - } - - // update percentage of next loop 2 - if (update_ctx) { - auto v = yield_credit.load(std::memory_order_relaxed); - v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072; - yield_credit.store(v, std::memory_order_relaxed); - } - - if (!would_spin_again) { - // 3. 
wait for new task - continue; - } + if (last == nullptr) { + continue; } - exec: // do all normal tasks older than this task pointed last - if (LIKELY(last != nullptr)) { - int cnt = 1; - auto first = CreateMissingNewerLinks(last, &cnt); - // node_cnt_ -= cnt; - assert(!first->is_time_task); - do { - first->Exec(); - tmp = first; - first = first->Next(); - node_cnt_--; - delete tmp; - } while (first != nullptr); - } - - // do all time tasks older than this task pointed time_last - if (UNLIKELY(time_last != nullptr)) { - int cnt = 1; - auto time_first = CreateMissingNewerLinks(time_last, &cnt); - do { - // time task may block normal task - auto now = std::chrono::system_clock::now(); - uint64_t unow = std::chrono::duration_cast(now.time_since_epoch()).count(); - - auto [exec_time, func, arg] = time_first->task; - assert(time_first->is_time_task); - if (unow >= exec_time) { - time_first->Exec(); - } else { - lock.lock(); - // if task is coming now, do task immediately - auto res = rsignal.wait_for(lock, std::chrono::microseconds(exec_time - unow), [this, &newest_node]() { - return newest_node.load(std::memory_order_relaxed) != nullptr || UNLIKELY(should_stop()); - }); - lock.unlock(); - if (res) { - // re-push the timer tasks - ReDelaySchedule(time_first); - goto retry; - } - time_first->Exec(); - } - tmp = time_first; - time_first = time_first->Next(); - time_node_cnt_--; - delete tmp; - } while (time_first != nullptr); - } + int cnt = 1; + auto first = CreateMissingNewerLinks(last, &cnt); + assert(!first->is_time_task); + do { + first->Exec(); + tmp = first; + first = first->Next(); + node_cnt_--; + delete tmp; + } while (first != nullptr); goto retry; } } @@ -351,4 +240,4 @@ bool ThreadPool::LinkOne(Node* node, std::atomic* newest_node) { } } } -} // namespace net +} // namespace net \ No newline at end of file