feat: change thread scheduling method in ThreadPool class (OSPP 2024) #2648

Merged: 24 commits, Oct 29, 2024

Commits (24)
8342c96
change thread scheduling method; the logic is based on RocksDB
QlQlqiqi May 12, 2024
79ca6c6
add copyright and replace "#pragma once" with "#ifdef"
QlQlqiqi May 12, 2024
1bb167c
change the license copyright start date
QlQlqiqi May 13, 2024
f9a15cd
add comment for the order between unlock and consumption
QlQlqiqi May 13, 2024
ce80eee
Merge branch 'OpenAtomFoundation:unstable' into unstable
QlQlqiqi May 16, 2024
d89cd2e
fix bug
QlQlqiqi May 16, 2024
3778461
add some tips for failing to start codis
QlQlqiqi May 16, 2024
a583036
Merge branch 'OpenAtomFoundation:unstable' into unstable
QlQlqiqi May 18, 2024
43bd1ab
fix bug: additionally introduced packages may cause core dump when sta…
QlQlqiqi May 18, 2024
14f59b3
fix bug: failed to start redis server
QlQlqiqi May 22, 2024
a107816
Merge branch 'OpenAtomFoundation:unstable' into unstable
QlQlqiqi May 22, 2024
807a1c4
one worker thread, one list
QlQlqiqi May 23, 2024
0d1b00f
Merge branch 'OpenAtomFoundation:unstable' into unstable
QlQlqiqi Jun 7, 2024
7020519
Merge branch 'std' into change-thread-shedule
QlQlqiqi Jun 16, 2024
c21fd6e
multiple threads per link and multiple links
QlQlqiqi Jun 19, 2024
cedbbca
Merge branch 'change-thread-shedule-with-mutil-list-per-worker' into …
QlQlqiqi Jun 26, 2024
9dab5c4
Merge branch 'std' into change-thread-shedule
QlQlqiqi Jun 26, 2024
8694da0
fix: timer task may block worker thread
QlQlqiqi Jun 27, 2024
6e6b808
change default queue_slow_size_ and nworkers_per_link_
QlQlqiqi Jul 13, 2024
055a479
Merge branch 'change-thread-shedule' into unstable
QlQlqiqi Jul 13, 2024
3d9f002
Merge branch 'std' into change-thread-shedule
QlQlqiqi Jul 22, 2024
ec24c39
remove timer_task and add some default value
QlQlqiqi Jul 22, 2024
30e2a96
Merge branch 'change-thread-shedule' into unstable
QlQlqiqi Jul 22, 2024
8946098
Merge branch 'std' into unstable
QlQlqiqi Sep 6, 2024
12 changes: 12 additions & 0 deletions src/net/include/likely.h
@@ -0,0 +1,12 @@
#ifndef PORT_LIKELY_H_
#define PORT_LIKELY_H_

#if defined(__GNUC__) && __GNUC__ >= 4
#define LIKELY(x) (__builtin_expect((x), 1))
#define UNLIKELY(x) (__builtin_expect((x), 0))
#else
#define LIKELY(x) (x)
#define UNLIKELY(x) (x)
#endif

#endif // PORT_LIKELY_H_
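
For context, LIKELY/UNLIKELY wrap GCC's __builtin_expect so hot-path branches can hint the compiler about the common case. A minimal usage sketch, not part of this PR (the function below is hypothetical):

#include <cstddef>

#include "net/include/likely.h"

// Hypothetical hot-path check: the error branch is rare, so hint the
// compiler to optimize for the common (non-error) case.
int ProcessBuffer(const char* buf, size_t len) {
  if (UNLIKELY(buf == nullptr || len == 0)) {
    return -1;  // rare error path
  }
  return 0;  // common fast path
}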
78 changes: 78 additions & 0 deletions src/net/include/random.h
@@ -0,0 +1,78 @@
#pragma once
#include <stdint.h>
#include <string.h>
#include <random>
#include <thread>
#include <utility>

#include "net/include/likely.h"

class Random {
private:
enum : uint32_t {
M = 2147483647L // 2^31-1
};
enum : uint64_t {
A = 16807 // bits 14, 8, 7, 5, 2, 1, 0
};

uint32_t seed_;

static uint32_t GoodSeed(uint32_t s) { return (s & M) != 0 ? (s & M) : 1; }

public:
// This is the largest value that can be returned from Next()
enum : uint32_t { kMaxNext = M };

explicit Random(uint32_t s) : seed_(GoodSeed(s)) {}

void Reset(uint32_t s) { seed_ = GoodSeed(s); }

uint32_t Next() {
// We are computing
// seed_ = (seed_ * A) % M, where M = 2^31-1
//
// seed_ must not be zero or M, or else all subsequent computed values
// will be zero or M respectively. For all other values, seed_ will end
// up cycling through every number in [1,M-1]
uint64_t product = seed_ * A;

// Compute (product % M) using the fact that ((x << 31) % M) == x.
seed_ = static_cast<uint32_t>((product >> 31) + (product & M));
// The first reduction may overflow by 1 bit, so we may need to
// repeat. mod == M is not possible; using > allows the faster
// sign-bit-based test.
if (seed_ > M) {
seed_ -= M;
}
return seed_;
}

// Returns a uniformly distributed value in the range [0..n-1]
// REQUIRES: n > 0
uint32_t Uniform(int n) { return Next() % n; }

// Randomly returns true ~"1/n" of the time, and false otherwise.
// REQUIRES: n > 0
bool OneIn(int n) { return (Next() % n) == 0; }

// Skewed: pick "base" uniformly from range [0,max_log] and then
// return "base" random bits. The effect is to pick a number in the
// range [0,2^max_log-1] with exponential bias towards smaller numbers.
uint32_t Skewed(int max_log) { return Uniform(1 << Uniform(max_log + 1)); }

// Returns a Random instance for use by the current thread without
// additional locking
static Random* GetTLSInstance() {
static __thread Random* tls_instance;
static __thread std::aligned_storage<sizeof(Random)>::type tls_instance_bytes;

auto rv = tls_instance;
if (UNLIKELY(rv == nullptr)) {
size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
rv = new (&tls_instance_bytes) Random((uint32_t)seed);
tls_instance = rv;
}
Review comment (on lines +73 to +81):

Use smart pointers for thread-local storage.

To manage memory more safely and efficiently, consider using smart pointers instead of raw pointers for the thread-local instance of the Random class.

- static __thread Random* tls_instance;
- static __thread std::aligned_storage<sizeof(Random)>::type tls_instance_bytes;
+ static thread_local std::unique_ptr<Random> tls_instance;

- auto rv = tls_instance;
- if (UNLIKELY(rv == nullptr)) {
-   size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
-   rv = new (&tls_instance_bytes) Random((uint32_t)seed);
-   tls_instance = rv;
- }
+ if (!tls_instance) {
+   size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
+   tls_instance = std::make_unique<Random>((uint32_t)seed);
+ }
+ return tls_instance.get();

return rv;
}
};
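
For context, this Random class is the Park-Miller (Lehmer) generator carried over from RocksDB: Next() computes seed_ = (seed_ * 16807) mod (2^31 - 1) with a shift-and-add reduction instead of a division. A minimal sketch of how the thread-local instance might be used for the 1-in-N sampling that thread_pool.cc performs below (the wrapper function is hypothetical):

#include "net/include/random.h"

// Hypothetical helper: sample roughly one call in 256, the same pattern the
// thread pool uses when deciding whether to update its yield credit.
bool SampledThisCall() {
  const int sampling_base = 256;
  // GetTLSInstance() lazily creates one generator per thread, so no locking.
  return Random::GetTLSInstance()->OneIn(sampling_base);
}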
51 changes: 47 additions & 4 deletions src/net/include/thread_pool.h
@@ -12,11 +12,12 @@
#include <string>

#include "net/include/net_define.h"
#include "net/include/random.h"
#include "pstd/include/pstd_mutex.h"

namespace net {

using TaskFunc = void (*)(void *);
using TaskFunc = void (*)(void*);

struct Task {
Task() = default;
@@ -50,7 +51,7 @@ class ThreadPool : public pstd::noncopyable {
std::string worker_name_;
};

explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool");
explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool");
Review comment:

ThreadPool constructor lacks explicit member initializations.

The constructor should explicitly initialize all its members to ensure that all objects are in a predictable state. This is particularly important for synchronization primitives like mutexes and condition variables.

- explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool");
+ explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool") : worker_num_(worker_num), max_queue_size_(max_queue_size), thread_pool_name_(std::move(thread_pool_name)), running_(false), should_stop_(false) {}

virtual ~ThreadPool();

int start_thread_pool();
@@ -69,8 +70,51 @@
private:
void runInThread();

size_t worker_num_;
public:
struct AdaptationContext {
std::atomic<int32_t> value;

explicit AdaptationContext() : value(0) {}
};

private:
struct Node {
Node* link_older = nullptr;
Node* link_newer = nullptr;

Task task;

Node(TaskFunc func, void* arg) : task(func, arg) {}

inline void Exec() { task.func(task.arg); }
inline Node* Next() { return link_newer; }
};

static inline void AsmVolatilePause() {
#if defined(__i386__) || defined(__x86_64__)
asm volatile("pause");
#elif defined(__aarch64__)
asm volatile("wfe");
#elif defined(__powerpc64__)
asm volatile("or 27,27,27");
#endif
// it's okay for other platforms to be no-ops
}

Node* CreateMissingNewerLinks(Node* head, int* cnt);
bool LinkOne(Node* node, std::atomic<Node*>* newest_node);
Review comment (on lines +111 to +112), codebase verification:

Link management methods need thorough testing.

The methods CreateMissingNewerLinks and LinkOne are critical for maintaining the integrity of the task list. Ensure these methods are covered by unit tests to prevent issues in task scheduling.

  • No unit tests found for CreateMissingNewerLinks.
  • No unit tests found for LinkOne.

Scripts executed for the analysis:

#!/bin/bash
# Description: Verify unit tests for link management methods.
# Test: Search for tests related to `CreateMissingNewerLinks` and `LinkOne`.
rg --type py 'test_CreateMissingNewerLinks|test_LinkOne'

#!/bin/bash
# Description: Verify unit tests for link management methods.
# Test: Search for the symbols in all C++, header, and Python test files.
rg 'CreateMissingNewerLinks|LinkOne' --type-add 'test:*.{cpp,h,py}' --type test


std::atomic<Node*> newest_node_;
std::atomic<int> node_cnt_;
const int queue_slow_size_; // default value: worker_num_ * 100
size_t max_queue_size_;

const uint64_t max_yield_usec_;
const uint64_t slow_yield_usec_;

AdaptationContext adp_ctx;

size_t worker_num_;
std::string thread_pool_name_;
std::queue<Task> queue_;
std::priority_queue<TimeTask> time_queue_;
@@ -81,7 +125,6 @@
pstd::Mutex mu_;
pstd::CondVar rsignal_;
pstd::CondVar wsignal_;

};

} // namespace net
159 changes: 127 additions & 32 deletions src/net/src/thread_pool.cc
@@ -8,6 +8,8 @@

#include <sys/time.h>

#include <cassert>
#include <thread>
#include <utility>

namespace net {
@@ -41,8 +43,14 @@ int ThreadPool::Worker::stop() {
return 0;
}

ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name)
: worker_num_(worker_num),
ThreadPool::ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name)
: newest_node_(nullptr),
node_cnt_(0),
queue_slow_size_(worker_num * 50),
max_yield_usec_(100),
slow_yield_usec_(3),
adp_ctx(),
worker_num_(worker_num),
max_queue_size_(max_queue_size),
thread_pool_name_(std::move(thread_pool_name)),
running_(false),
@@ -90,13 +98,18 @@ bool ThreadPool::should_stop() { return should_stop_.load(); }
void ThreadPool::set_should_stop() { should_stop_.store(true); }

void ThreadPool::Schedule(TaskFunc func, void* arg) {
std::unique_lock lock(mu_);
wsignal_.wait(lock, [this]() { return queue_.size() < max_queue_size_ || should_stop(); });

if (!should_stop()) {
queue_.emplace(func, arg);
rsignal_.notify_one();
// busy-wait until the task queue size drops below max_queue_size_
while (node_cnt_.load(std::memory_order_relaxed) >= max_queue_size_) {
std::this_thread::yield();
}
// queue is getting long: yield once to ease the pressure
if (node_cnt_.load(std::memory_order_relaxed) >= queue_slow_size_) {
std::this_thread::yield();
}
auto node = new Node(func, arg);
LinkOne(node, &newest_node_);
node_cnt_++;
rsignal_.notify_one();
}
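
For illustration, a hedged caller-side sketch of the Schedule API above (the job struct and functions are hypothetical, not from this PR): Schedule takes a plain function pointer plus an opaque argument, and with the new lock-free queue it may briefly busy-wait when node_cnt_ reaches max_queue_size_.

#include <memory>

#include "net/include/thread_pool.h"

// Hypothetical per-task context passed through the void* argument.
struct JobCtx {
  int job_id;
};

static void DoJob(void* arg) {
  std::unique_ptr<JobCtx> ctx(static_cast<JobCtx*>(arg));  // reclaim ownership
  // ... process ctx->job_id ...
}

void SubmitJobs(net::ThreadPool* pool) {
  for (int i = 0; i < 10; ++i) {
    // May spin inside Schedule() if the queue is at max_queue_size_.
    pool->Schedule(DoJob, new JobCtx{i});
  }
}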

/*
@@ -109,17 +122,16 @@ void ThreadPool::DelaySchedule(uint64_t timeout, TaskFunc func, void* arg) {

std::lock_guard lock(mu_);
if (!should_stop()) {
// the member time_queue_ may no longer be used here
throw std::logic_error("unreachable logic");
time_queue_.emplace(exec_time, func, arg);
rsignal_.notify_all();
}
}

size_t ThreadPool::max_queue_size() { return max_queue_size_; }

void ThreadPool::cur_queue_size(size_t* qsize) {
std::lock_guard lock(mu_);
*qsize = queue_.size();
}
void ThreadPool::cur_queue_size(size_t* qsize) { *qsize = node_cnt_.load(std::memory_order_relaxed); }

void ThreadPool::cur_time_queue_size(size_t* qsize) {
std::lock_guard lock(mu_);
@@ -131,34 +143,117 @@ std::string ThreadPool::thread_pool_name() { return thread_pool_name_; }
void ThreadPool::runInThread() {
while (!should_stop()) {
std::unique_lock lock(mu_);
rsignal_.wait(lock, [this]() { return !queue_.empty() || !time_queue_.empty() || should_stop(); });
rsignal_.wait(lock, [this]() { return newest_node_.load(std::memory_order_relaxed) != nullptr || should_stop(); });
lock.unlock();

retry:
if (should_stop()) {
break;
}
if (!time_queue_.empty()) {
auto now = std::chrono::system_clock::now();
uint64_t unow = std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();

auto [exec_time, func, arg] = time_queue_.top();
if (unow >= exec_time) {
time_queue_.pop();
lock.unlock();
(*func)(arg);
continue;
} else if (queue_.empty() && !should_stop()) {
rsignal_.wait_for(lock, std::chrono::microseconds(exec_time - unow));
lock.unlock();

auto last = newest_node_.exchange(nullptr);
if (last == nullptr) {
// 1. spin briefly, polling for newly linked tasks
for (uint32_t tries = 0; tries < 200; ++tries) {
if (newest_node_.load(std::memory_order_acquire) != nullptr) {
last = newest_node_.exchange(nullptr);
Collaborator:

Here the thread that arrives first detaches the whole linked list, takes it for itself, and then consumes it sequentially. This can cause large latency fluctuations; I suggest distributing tasks as evenly as possible across the workers in the pool. After all, on Pika's read/write path these are all its own threads, which is quite different from RocksDB's threading model (there it is the application threads that act concurrently on each writer), so this part may need more consideration.

Contributor Author:

I thought about it; there are roughly two ways to take only a bounded number of tasks at a time:
1. Give each worker its own lock-free list, and add new tasks to these lists either randomly or by iterating over them;
2. Keep a single lock-free list, but give it a small capacity, e.g. 10, so that a worker takes at most 10 tasks at a time.

Contributor Author:

The second approach can be tested directly; for the first approach see my new branch: https://github.com/QlQlqiqi/pika/tree/change-thread-shedule-with-mutil-list

Contributor Author:

In my tests the two approaches are roughly equal in speed; with suitable parameter tuning there would likely be a bigger gap. A sketch of approach 1 follows this thread.
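
To make approach 1 from the discussion concrete, here is a minimal sketch (not the PR's code; the names PerWorkerQueues, Push, and Drain are hypothetical): one lock-free list per worker, producers pick a list at random, and each worker detaches only its own list, so no single thread grabs the entire backlog.

#include <atomic>
#include <random>
#include <vector>

struct Node {
  void (*func)(void*);
  void* arg;
  Node* link_older = nullptr;
};

class PerWorkerQueues {
 public:
  explicit PerWorkerQueues(size_t worker_num) : heads_(worker_num) {
    for (auto& h : heads_) h.store(nullptr, std::memory_order_relaxed);
  }

  // Producer side: pick one worker's list at random and push onto it.
  void Push(void (*func)(void*), void* arg) {
    thread_local std::mt19937 rng{std::random_device{}()};
    auto& head = heads_[rng() % heads_.size()];
    auto* node = new Node{func, arg};
    node->link_older = head.load(std::memory_order_relaxed);
    while (!head.compare_exchange_weak(node->link_older, node,
                                       std::memory_order_release,
                                       std::memory_order_relaxed)) {
    }
  }

  // Consumer side: worker i detaches only its own list, so one swap grabs at
  // most the tasks routed to that worker (a real consumer would then execute
  // and delete the nodes, as the PR does).
  Node* Drain(size_t worker_idx) {
    return heads_[worker_idx].exchange(nullptr, std::memory_order_acquire);
  }

 private:
  std::vector<std::atomic<Node*>> heads_;
};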

if (last != nullptr) {
goto exec;
}
}
AsmVolatilePause();
}

// 2. spin a little longer, yielding between checks
const size_t kMaxSlowYieldsWhileSpinning = 3;
auto& yield_credit = adp_ctx.value;
bool update_ctx = false;
bool would_spin_again = false;
const int sampling_base = 256;

update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);

if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
auto spin_begin = std::chrono::steady_clock::now();

size_t slow_yield_count = 0;

auto iter_begin = spin_begin;
while ((iter_begin - spin_begin) <= std::chrono::microseconds(max_yield_usec_)) {
std::this_thread::yield();

if (newest_node_.load(std::memory_order_acquire) != nullptr) {
last = newest_node_.exchange(nullptr);
if (last != nullptr) {
would_spin_again = true;
// success
break;
}
}

auto now = std::chrono::steady_clock::now();
if (now == iter_begin || now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
++slow_yield_count;
if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
update_ctx = true;
break;
}
}
iter_begin = now;
}
}

// update the yield credit that gates loop 2: decay it by ~1/1024 per update
// and move it by +/-131072 depending on whether spinning found work
if (update_ctx) {
auto v = yield_credit.load(std::memory_order_relaxed);
v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
yield_credit.store(v, std::memory_order_relaxed);
}

if (!would_spin_again) {
// 3. wait for new task
continue;
}
}

if (!queue_.empty()) {
auto [func, arg] = queue_.front();
queue_.pop();
wsignal_.notify_one();
lock.unlock();
(*func)(arg);
exec:
// execute every task from the oldest up to the node pointed to by last
int cnt = 0;
auto first = CreateMissingNewerLinks(last, &cnt);
node_cnt_ -= cnt;
Node* tmp = nullptr;
do {
first->Exec();
tmp = first;
first = first->Next();
delete tmp;
} while (first != nullptr);
goto retry;
}
}

ThreadPool::Node* ThreadPool::CreateMissingNewerLinks(Node* head, int* cnt) {
assert(head != nullptr);
Node* next = nullptr;
(*cnt)++;
while (true) {
next = head->link_older;
if (next == nullptr) {
return head;
}
(*cnt)++;
next->link_newer = head;
head = next;
}
}

bool ThreadPool::LinkOne(Node* node, std::atomic<Node*>* newest_node) {
assert(newest_node != nullptr);
auto nodes = newest_node->load(std::memory_order_relaxed);
while (true) {
node->link_older = nodes;
if (newest_node->compare_exchange_weak(nodes, node)) {
return (nodes == nullptr);
}
}
}
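
Following up on the earlier review note about missing tests, here is a hedged, standalone sketch (not part of the PR) that mirrors LinkOne and CreateMissingNewerLinks on a local Node type, to illustrate that the lock-free LIFO push plus link reconstruction yields an oldest-first traversal even with concurrent producers:

#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

struct Node {
  int id = 0;
  Node* link_older = nullptr;
  Node* link_newer = nullptr;
};

// Same CAS loop as ThreadPool::LinkOne: push a node onto the newest end.
static bool LinkOne(Node* node, std::atomic<Node*>* newest_node) {
  Node* nodes = newest_node->load(std::memory_order_relaxed);
  while (true) {
    node->link_older = nodes;
    if (newest_node->compare_exchange_weak(nodes, node)) {
      return nodes == nullptr;  // true if the list was empty
    }
  }
}

// Same walk as ThreadPool::CreateMissingNewerLinks: follow link_older back to
// the oldest node, filling in link_newer pointers and counting nodes.
static Node* CreateMissingNewerLinks(Node* head, int* cnt) {
  assert(head != nullptr);
  (*cnt)++;
  while (Node* next = head->link_older) {
    (*cnt)++;
    next->link_newer = head;
    head = next;
  }
  return head;
}

int main() {
  std::atomic<Node*> newest{nullptr};
  std::vector<Node> nodes(1000);

  // Two producers push 500 nodes each onto the shared head.
  auto produce = [&](int base) {
    for (int i = 0; i < 500; ++i) {
      nodes[base + i].id = base + i;
      LinkOne(&nodes[base + i], &newest);
    }
  };
  std::thread t1(produce, 0), t2(produce, 500);
  t1.join();
  t2.join();

  // Consumer detaches the whole list and walks it oldest-first.
  Node* last = newest.exchange(nullptr);
  int cnt = 0;
  Node* first = CreateMissingNewerLinks(last, &cnt);
  assert(cnt == 1000);
  int seen = 0;
  for (Node* n = first; n != nullptr; n = n->link_newer) ++seen;
  assert(seen == 1000);
  return 0;
}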