Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:change thread sheduling method in ThreadPool class #2648

Open
wants to merge 24 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8342c96
change thread sheduling method and the logic is based on rocksdb
QlQlqiqi May 12, 2024
79ca6c6
add Copyright and replace "#param once" with "#ifdef"
QlQlqiqi May 12, 2024
1bb167c
change the lisence Copyright start date
QlQlqiqi May 13, 2024
f9a15cd
add comment for the order between unlock and consumption
QlQlqiqi May 13, 2024
ce80eee
Merge branch 'OpenAtomFoundation:unstable' into unstable
QlQlqiqi May 16, 2024
d89cd2e
fix bug
QlQlqiqi May 16, 2024
3778461
add some tips for failing to start codis
QlQlqiqi May 16, 2024
a583036
Merge branch 'OpenAtomFoundation:unstable' into unstable
QlQlqiqi May 18, 2024
43bd1ab
fix bug: addtional introduced packages maybe cause core dump when sta…
QlQlqiqi May 18, 2024
14f59b3
fix bug: failed to start redis server
QlQlqiqi May 22, 2024
a107816
Merge branch 'OpenAtomFoundation:unstable' into unstable
QlQlqiqi May 22, 2024
807a1c4
one worker thread, one list
QlQlqiqi May 23, 2024
0d1b00f
Merge branch 'OpenAtomFoundation:unstable' into unstable
QlQlqiqi Jun 7, 2024
7020519
Merge branch 'std' into change-thread-shedule
QlQlqiqi Jun 16, 2024
c21fd6e
mutil threads per link and mutil links
QlQlqiqi Jun 19, 2024
cedbbca
Merge branch 'change-thread-shedule-with-mutil-list-per-worker' into …
QlQlqiqi Jun 26, 2024
9dab5c4
Merge branch 'std' into change-thread-shedule
QlQlqiqi Jun 26, 2024
8694da0
fix: timer task maybe block worker thread
QlQlqiqi Jun 27, 2024
6e6b808
change default queue_slow_size_ and nworkers_per_link_
QlQlqiqi Jul 13, 2024
055a479
Merge branch 'change-thread-shedule' into unstable
QlQlqiqi Jul 13, 2024
3d9f002
Merge branch 'std' into change-thread-shedule
QlQlqiqi Jul 22, 2024
ec24c39
remove timer_task and add some default value
QlQlqiqi Jul 22, 2024
30e2a96
Merge branch 'change-thread-shedule' into unstable
QlQlqiqi Jul 22, 2024
8946098
Merge branch 'std' into unstable
QlQlqiqi Sep 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/net/include/likely.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef LIKELY_H
#define LIKELY_H

#if defined(__GNUC__) && __GNUC__ >= 4
# define LIKELY(x) (__builtin_expect((x), 1))
# define UNLIKELY(x) (__builtin_expect((x), 0))
#else
# define LIKELY(x) (x)
# define UNLIKELY(x) (x)
#endif

#endif // LIKELY_H
2 changes: 0 additions & 2 deletions src/net/include/net_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
#ifndef NET_INCLUDE_NET_DEFINE_H_
#define NET_INCLUDE_NET_DEFINE_H_

#include <functional>
#include <iostream>
#include <map>

namespace net {

Expand Down
88 changes: 88 additions & 0 deletions src/net/include/random.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a license declaration at the start of the file.

As suggested in previous comments, a license declaration should be consistently present at the start of each file.

+ // Copyright (c) 2024-present, OpenAtom Foundation. All rights reserved.
+ // This source code is licensed under the BSD-style license found in the
+ // LICENSE file in the root directory of this source tree. An additional grant
+ // of patent rights can be found in the PATENTS file in the same directory.
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.
// Copyright (c) 2024-present, OpenAtom Foundation. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// Copyright (c) 2024-present, Qihoo, Inc. All rights reserved.

// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef RANDOM_H
#define RANDOM_H

#include <random>
#include <thread>

#include "net/include/likely.h"

namespace net {

class Random {
private:
enum : uint32_t {
M = 2147483647L // 2^31-1
};
enum : uint64_t {
A = 16807 // bits 14, 8, 7, 5, 2, 1, 0
};

uint32_t seed_;

static uint32_t GoodSeed(uint32_t s) { return (s & M) != 0 ? (s & M) : 1; }

public:
// This is the largest value that can be returned from Next()
enum : uint32_t { kMaxNext = M };

explicit Random(uint32_t s) : seed_(GoodSeed(s)) {}

void Reset(uint32_t s) { seed_ = GoodSeed(s); }

uint32_t Next() {
// We are computing
// seed_ = (seed_ * A) % M, where M = 2^31-1
//
// seed_ must not be zero or M, or else all subsequent computed values
// will be zero or M respectively. For all other values, seed_ will end
// up cycling through every number in [1,M-1]
uint64_t product = seed_ * A;

// Compute (product % M) using the fact that ((x << 31) % M) == x.
seed_ = static_cast<uint32_t>((product >> 31) + (product & M));
// The first reduction may overflow by 1 bit, so we may need to
// repeat. mod == M is not possible; using > allows the faster
// sign-bit-based test.
if (seed_ > M) {
seed_ -= M;
}
return seed_;
}

// Returns a uniformly distributed value in the range [0..n-1]
// REQUIRES: n > 0
uint32_t Uniform(int n) { return Next() % n; }

// Randomly returns true ~"1/n" of the time, and false otherwise.
// REQUIRES: n > 0
bool OneIn(int n) { return (Next() % n) == 0; }

// Skewed: pick "base" uniformly from range [0,max_log] and then
// return "base" random bits. The effect is to pick a number in the
// range [0,2^max_log-1] with exponential bias towards smaller numbers.
uint32_t Skewed(int max_log) { return Uniform(1 << Uniform(max_log + 1)); }

// Returns a Random instance for use by the current thread without
// additional locking
static Random* GetTLSInstance() {
static __thread Random* tls_instance;
static __thread std::aligned_storage<sizeof(Random)>::type tls_instance_bytes;

auto rv = tls_instance;
if (UNLIKELY(rv == nullptr)) {
size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
rv = new (&tls_instance_bytes) Random((uint32_t)seed);
tls_instance = rv;
}
Comment on lines +73 to +81
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use smart pointers for thread-local storage.

To manage memory more safely and efficiently, consider using smart pointers instead of raw pointers for the thread-local instance of the Random class.

- static __thread Random* tls_instance;
- static __thread std::aligned_storage<sizeof(Random)>::type tls_instance_bytes;
+ static thread_local std::unique_ptr<Random> tls_instance;

- auto rv = tls_instance;
- if (UNLIKELY(rv == nullptr)) {
-   size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
-   rv = new (&tls_instance_bytes) Random((uint32_t)seed);
-   tls_instance = rv;
- }
+ if (!tls_instance) {
+   size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
+   tls_instance = std::make_unique<Random>((uint32_t)seed);
+ }
+ return tls_instance.get();
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
static __thread Random* tls_instance;
static __thread std::aligned_storage<sizeof(Random)>::type tls_instance_bytes;
auto rv = tls_instance;
if (UNLIKELY(rv == nullptr)) {
size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
rv = new (&tls_instance_bytes) Random((uint32_t)seed);
tls_instance = rv;
}
static thread_local std::unique_ptr<Random> tls_instance;
if (!tls_instance) {
size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
tls_instance = std::make_unique<Random>((uint32_t)seed);
}
return tls_instance.get();

return rv;
}
};

} // namespace net

#endif // RANDOM_H
92 changes: 73 additions & 19 deletions src/net/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,16 @@

#include <pthread.h>
#include <atomic>
#include <queue>
#include <string>
#include <vector>

#include "net/include/net_define.h"
#include "net/include/random.h"
#include "pstd/include/pstd_mutex.h"

namespace net {

using TaskFunc = void (*)(void *);

struct Task {
Task() = default;
TaskFunc func = nullptr;
void* arg = nullptr;
Task(TaskFunc _func, void* _arg) : func(_func), arg(_arg) {}
};
using TaskFunc = void (*)(void*);

struct TimeTask {
uint64_t exec_time;
Expand All @@ -37,7 +31,13 @@ class ThreadPool : public pstd::noncopyable {
public:
class Worker {
public:
explicit Worker(ThreadPool* tp) : start_(false), thread_pool_(tp){};
struct Arg {
Arg(void* p, int i) : arg(p), idx(i) {}
void* arg;
int idx;
};

explicit Worker(ThreadPool* tp, int idx = 0) : start_(false), thread_pool_(tp), idx_(idx), arg_(tp, idx){};
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure thread safety during worker initialization.

The initialization of Worker objects includes setting start_ to false, which is correct. However, consider using std::atomic for start_ to ensure thread safety during checks and updates.

static void* WorkerMain(void* arg);

int start();
Expand All @@ -48,9 +48,11 @@ class ThreadPool : public pstd::noncopyable {
std::atomic<bool> start_;
ThreadPool* const thread_pool_;
std::string worker_name_;
int idx_;
Arg arg_;
};

explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool");
explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThreadPool constructor lacks explicit member initializations.

The constructor should explicitly initialize all its members to ensure that all objects are in a predictable state. This is particularly important for synchronization primitives like mutexes and condition variables.

- explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool");
+ explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool") : worker_num_(worker_num), max_queue_size_(max_queue_size), thread_pool_name_(std::move(thread_pool_name)), running_(false), should_stop_(false) {}
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool");
explicit ThreadPool(size_t worker_num, size_t max_queue_size, std::string thread_pool_name = "ThreadPool") : worker_num_(worker_num), max_queue_size_(max_queue_size), thread_pool_name_(std::move(thread_pool_name)), running_(false), should_stop_(false) {}

virtual ~ThreadPool();

int start_thread_pool();
Expand All @@ -67,21 +69,73 @@ class ThreadPool : public pstd::noncopyable {
std::string thread_pool_name();

private:
void runInThread();
void runInThread(const int idx = 0);

public:
struct AdaptationContext {
std::atomic<int32_t> value;

explicit AdaptationContext() : value(0) {}
};

private:
struct Node {
Node* link_older = nullptr;
Node* link_newer = nullptr;

size_t worker_num_;
// true if task is TimeTask
bool is_time_task;
TimeTask task;

Node(TaskFunc func, void* arg) : is_time_task(false), task(0, func, arg) {}
Node(uint64_t exec_time, TaskFunc func, void* arg) : is_time_task(true), task(exec_time, func, arg) {}

inline void Exec() { task.func(task.arg); }
inline Node* Next() { return link_newer; }
};
Comment on lines +82 to +95
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory management for Node objects needs attention.

The Node struct handles tasks but does not use smart pointers for managing dynamic memory, which can lead to memory leaks.

- Node* link_older = nullptr;
- Node* link_newer = nullptr;
+ std::unique_ptr<Node> link_older;
+ std::unique_ptr<Node> link_newer;
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
struct Node {
Node* link_older = nullptr;
Node* link_newer = nullptr;
size_t worker_num_;
// true if task is TimeTask
bool is_time_task;
TimeTask task;
Node(TaskFunc func, void* arg) : is_time_task(false), task(0, func, arg) {}
Node(uint64_t exec_time, TaskFunc func, void* arg) : is_time_task(true), task(exec_time, func, arg) {}
inline void Exec() { task.func(task.arg); }
inline Node* Next() { return link_newer; }
};
struct Node {
std::unique_ptr<Node> link_older;
std::unique_ptr<Node> link_newer;
// true if task is TimeTask
bool is_time_task;
TimeTask task;
Node(TaskFunc func, void* arg) : is_time_task(false), task(0, func, arg) {}
Node(uint64_t exec_time, TaskFunc func, void* arg) : is_time_task(true), task(exec_time, func, arg) {}
inline void Exec() { task.func(task.arg); }
inline Node* Next() { return link_newer.get(); }
};


// re-push some timer tasks which has been poped
void ReDelaySchedule(Node* nodes);

static inline void AsmVolatilePause() {
#if defined(__i386__) || defined(__x86_64__)
asm volatile("pause");
#elif defined(__aarch64__)
asm volatile("wfe");
#elif defined(__powerpc64__)
asm volatile("or 27,27,27");
#endif
// it's okay for other platforms to be no-ops
}

Node* CreateMissingNewerLinks(Node* head, int* cnt);
bool LinkOne(Node* node, std::atomic<Node*>* newest_node);
Comment on lines +111 to +112
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Link management methods need thorough testing.

The methods CreateMissingNewerLinks and LinkOne are critical for maintaining the integrity of the task list. Ensure these methods are covered by unit tests to prevent issues in task scheduling.

  • No unit tests found for CreateMissingNewerLinks.
  • No unit tests found for LinkOne.
Analysis chain

Link management methods need thorough testing.

The methods CreateMissingNewerLinks and LinkOne are critical for maintaining the integrity of the task list. Ensure these methods are covered by unit tests to prevent issues in task scheduling.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify unit tests for link management methods.

# Test: Search for tests related to `CreateMissingNewerLinks` and `LinkOne`.
rg --type py 'test_CreateMissingNewerLinks|test_LinkOne'

Length of output: 58



Script:

#!/bin/bash
# Description: Verify unit tests for link management methods.

# Test: Search for tests related to `CreateMissingNewerLinks` and `LinkOne` in all test files.
rg 'CreateMissingNewerLinks|LinkOne' --type-add 'test:*.{cpp,h,py}' --type test

Length of output: 257


uint16_t task_idx_;

const uint8_t nworkers_per_link_ = 2; // numer of workers per link
const uint8_t nlinks_; // number of links (upper around)
std::vector<std::atomic<Node*>> newest_node_;
std::atomic<int> node_cnt_; // for task
std::vector<std::atomic<Node*>> time_newest_node_;
std::atomic<int> time_node_cnt_; // for time task

const int queue_slow_size_; // default value: min(worker_num_ * 10, max_queue_size_)
size_t max_queue_size_;

const uint64_t max_yield_usec_;
const uint64_t slow_yield_usec_;

AdaptationContext adp_ctx;

const size_t worker_num_;
std::string thread_pool_name_;
std::queue<Task> queue_;
std::priority_queue<TimeTask> time_queue_;
std::vector<Worker*> workers_;
std::atomic<bool> running_;
std::atomic<bool> should_stop_;

pstd::Mutex mu_;
pstd::CondVar rsignal_;
pstd::CondVar wsignal_;

std::vector<pstd::Mutex> mu_;
std::vector<pstd::CondVar> rsignal_;
};

} // namespace net
Expand Down
Loading
Loading