-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathThreadPool.hpp
168 lines (143 loc) · 4.74 KB
/
ThreadPool.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
#include <utility>
namespace dpool
{
/// A thread pool whose worker count grows on demand and shrinks when idle.
///
/// Workers are created lazily by submit(), never exceeding the limit given at
/// construction. A worker that waits WAIT_SECONDS seconds without receiving a
/// task retires itself; its std::thread object is joined later by the next
/// worker that retires, or by the destructor.
///
/// All shared state is protected by a single mutex, so submit() and
/// threadsNum() may be called from several threads concurrently.
class ThreadPool
{
private:
    // Result type of calling (with no arguments) the std::bind wrapper that
    // submit() builds from its callable and arguments. Derived from the bind
    // expression itself so that it matches what the worker really invokes and
    // does not depend on std::result_of (deprecated in C++17, removed in C++20).
    template <typename Func, typename... Ts>
    using BoundResult = decltype(std::bind(std::declval<Func>(), std::declval<Ts>()...)());

public:
    using MutexGuard = std::lock_guard<std::mutex>;
    using UniqueLock = std::unique_lock<std::mutex>;
    using Thread = std::thread;
    using ThreadID = std::thread::id;
    using Task = std::function<void()>;

    /// Creates a pool limited to the number of concurrent threads reported by
    /// std::thread::hardware_concurrency().
    ThreadPool()
        : ThreadPool(Thread::hardware_concurrency())
    {
    }

    /// Creates a pool that runs at most `maxThreads` workers at a time.
    ///
    /// @param maxThreads upper bound on the number of live workers. A value of
    ///        0 is raised to 1: hardware_concurrency() is allowed to return 0,
    ///        and a pool without any worker would never run a submitted task,
    ///        leaving every returned future blocked forever.
    explicit ThreadPool(size_t maxThreads)
        : quit_(false),
          currentThreads_(0),
          idleThreads_(0),
          maxThreads_(maxThreads == 0 ? 1 : maxThreads)
    {
    }

    // disable the copy operations
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;

    /// Signals shutdown and joins every worker thread.
    ///
    /// Workers only exit once the task queue is empty, so tasks that are
    /// already queued are still executed before the destructor returns.
    ~ThreadPool()
    {
        {
            MutexGuard guard(mutex_);
            quit_ = true;
        }
        cv_.notify_all();

        // threads_ still holds retired-but-unjoined workers as well as live
        // ones; all of them are joinable here.
        for (auto &elem : threads_)
        {
            assert(elem.second.joinable());
            elem.second.join();
        }
    }

    /// Queues `func(params...)` for asynchronous execution.
    ///
    /// The callable and its arguments are bound with std::bind (so arguments
    /// are stored by decayed copy; use std::ref to pass references).
    ///
    /// @param func   callable to run on a worker thread.
    /// @param params arguments bound to `func`.
    /// @return a future that yields the call's result, or rethrows the
    ///         exception it raised (std::packaged_task semantics).
    /// @note Must not be called once destruction has begun (asserted).
    template <typename Func, typename... Ts>
    auto submit(Func &&func, Ts &&...params)
        -> std::future<BoundResult<Func, Ts...>>
    {
        using ReturnType = BoundResult<Func, Ts...>;
        using PackagedTask = std::packaged_task<ReturnType()>;

        auto execute = std::bind(std::forward<Func>(func), std::forward<Ts>(params)...);

        // std::packaged_task is move-only while std::function requires a
        // copyable target, hence the shared_ptr indirection.
        auto task = std::make_shared<PackagedTask>(std::move(execute));
        auto result = task->get_future();

        MutexGuard guard(mutex_);
        assert(!quit_);
        tasks_.emplace([task]()
                       { (*task)(); });

        if (idleThreads_ > 0)
        {
            cv_.notify_one();
        }

        // Spawn a worker when more tasks are queued than there are idle
        // waiters. Checking only "is anyone idle" would let a burst of
        // submissions pile up behind a single waiter even though the pool is
        // allowed to grow, and deadlocks if an earlier task waits on a later one.
        if (tasks_.size() > idleThreads_ && currentThreads_ < maxThreads_)
        {
            Thread t(&ThreadPool::worker, this);
            // Read the id before the thread object is moved into the map.
            const ThreadID id = t.get_id();
            assert(threads_.find(id) == threads_.end());
            threads_.emplace(id, std::move(t));
            ++currentThreads_;
        }
        return result;
    }

    /// @return the number of workers that have been started and have not yet
    ///         retired or shut down.
    size_t threadsNum() const
    {
        MutexGuard guard(mutex_);
        return currentThreads_;
    }

private:
    /// Worker loop: waits for a task, runs it outside the lock, and repeats
    /// until shutdown is requested or the idle timeout expires.
    void worker()
    {
        while (true)
        {
            Task task;
            {
                UniqueLock uniqueLock(mutex_);
                ++idleThreads_;
                // wait_for returns the predicate's value, so `false` means the
                // timeout elapsed with neither shutdown nor a pending task.
                auto hasTimedout = !cv_.wait_for(uniqueLock,
                                                 std::chrono::seconds(WAIT_SECONDS),
                                                 [this]()
                                                 {
                                                     return quit_ || !tasks_.empty();
                                                 });
                --idleThreads_;

                if (tasks_.empty())
                {
                    if (quit_)
                    {
                        // Shutdown: the destructor joins this thread.
                        --currentThreads_;
                        return;
                    }
                    if (hasTimedout)
                    {
                        // Idle for too long: retire. A thread cannot join
                        // itself, so it reaps the previously retired workers
                        // and then registers its own id for a later reaper.
                        --currentThreads_;
                        joinFinishedThreads();
                        finishedThreadIDs_.emplace(std::this_thread::get_id());
                        return;
                    }
                }

                task = std::move(tasks_.front());
                tasks_.pop();
            }
            // Run the task without holding the mutex.
            task();
        }
    }

    /// Joins and forgets every worker recorded in finishedThreadIDs_.
    /// The caller must hold mutex_.
    void joinFinishedThreads()
    {
        while (!finishedThreadIDs_.empty())
        {
            auto id = std::move(finishedThreadIDs_.front());
            finishedThreadIDs_.pop();

            auto iter = threads_.find(id);
            assert(iter != threads_.end());
            assert(iter->second.joinable());
            iter->second.join();
            threads_.erase(iter);
        }
    }

    // Seconds a worker waits for a task before retiring.
    static constexpr size_t WAIT_SECONDS = 2;

    bool quit_;             // set by the destructor to request shutdown
    size_t currentThreads_; // workers started and not yet retired or shut down
    size_t idleThreads_;    // workers currently blocked waiting for a task
    size_t maxThreads_;     // upper bound for currentThreads_ (at least 1)

    mutable std::mutex mutex_; // guards every member below and the counters above
    std::condition_variable cv_;
    std::queue<Task> tasks_;                       // pending tasks, FIFO
    std::queue<ThreadID> finishedThreadIDs_;       // retired workers awaiting join
    std::unordered_map<ThreadID, Thread> threads_; // every thread not yet joined
};
// Out-of-class definition of the static constexpr data member declared in
// ThreadPool. Before C++17 such a definition is required when the member is
// odr-used (e.g. bound to a const reference parameter); since C++17 the
// in-class declaration is implicitly inline and this line is redundant.
// NOTE(review): when built as pre-C++17, a non-inline definition in a header
// included from several translation units may cause duplicate-symbol link
// errors - confirm the targeted language standard.
constexpr size_t ThreadPool::WAIT_SECONDS;
} // namespace dpool
#endif /* THREADPOOL_H */