Skip to content

Commit

Permalink
ThreadPool: Spend less time busy waiting. (microsoft#21545)
Browse files Browse the repository at this point in the history
The purpose of the patch is primarily to save power, but it also has
nice perf benefits (mostly from allowing the system to better distribute
power to cores doing meaningful work).

Changes are twofold:

1)    Decrease WorkerLoop spin count dramatically ~10^6 -> ~10^4. The
       reality is after ~10^4 spins, if there hasn't been any new work
       added its unlikely any new work is imminent so sleep to
       preserve power. This aligns more closely with upstream EigenV3.

2)   Use exponential backoff for waiting on memory. This saves a bit
       more power, and important increases the time between iterations
       in WorkerLoop to help accomidate the dramatically lowering spin
       counts.

Since the tuning for both the iteration counts / backoff counts are
dramatically different for hybrid/non-hybrid systems, this patch
templates the affected functions and dynamically choses based on
`CPUIDInfo::IsHybrid()`. This seemed like the "lightest weight" way of
getting the change in, although its likely we could incur less dynamic
overhead if we added the template argument to the entirety of
`ThreadPoolTempl`.

Measured performance on an [Intel Meteor Lake
CPU](https://www.intel.com/content/www/us/en/products/sku/237329/intel-core-ultra-7-processor-165u-12m-cache-up-to-4-90-ghz/specifications.html)
across a range of models.

Below are the result of 3 runs with each metric being the
value-before-patch / value-after-patch (so for something like inference
time, lower is better).
<div align="center">
<table>
<tr>
<th>Session creation time cost</th>
<td>0.7179</td>
</tr>
<tr>
<th>First inference time cost</th>
<td>0.7156</td>
</tr>
<tr>
<th>Total inference time cost</th>
<td>1.0146</td>
</tr>
<tr>
<th>Total inference requests</th>
<td>0.8874</td>
</tr>
<tr>
<th>Average inference time cost</th>
<td>0.8800</td>
</tr>
<tr>
<th>Total inference run time</th>
<td>1.0146</td>
</tr>
<tr>
<th>Number of inferences per second</th>
<td>0.8955</td>
</tr>
<tr>
<th>Avg CPU usage</th>
<td>0.9462</td>
</tr>
<tr>
<th>Peak working set size</th>
<td>0.9922</td>
</tr>
<tr>
<th>Runs</th>
<td>1.1552</td>
</tr>
<tr>
<th>Min Latency</th>
<td>0.7283</td>
</tr>
<tr>
<th>Max Latency</th>
<td>0.9258</td>
</tr>
<tr>
<th>P50 Latency</th>
<td>0.9534</td>
</tr>
<tr>
<th>P90 Latency</th>
<td>0.9639</td>
</tr>
<tr>
<th>P95 Latency</th>
<td>0.9659</td>
</tr>
<tr>
<th>P99 Latency</th>
<td>0.9640</td>
</tr>
</table>
</div>

So the net result is a 1.16x improvement in throughput and between
1.08-1.37x improvement in latency.
  • Loading branch information
goldsteinn authored Oct 2, 2024
1 parent 14d1bfc commit 4e15b22
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 25 deletions.
65 changes: 55 additions & 10 deletions include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ class RunQueue {

static std::atomic<uint32_t> next_tag{1};

template <typename Environment>
template <typename Environment, bool kIsHybrid>
class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInterface {
private:
struct PerThread;
Expand Down Expand Up @@ -767,6 +767,29 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
typedef std::function<void()> Task;
typedef RunQueue<Task, Tag, 1024> Queue;

// Class for waiting w/ exponential backoff.
// Template argument is maximum number of spins in backoff loop.
template <unsigned kMaxBackoff>
class ThreadPoolWaiter {
// Current number if spins in backoff loop
unsigned pause_time_;

public:
void wait() {
// If kMaxBackoff is zero don't do any pausing.
if constexpr (kMaxBackoff == 1) {
onnxruntime::concurrency::SpinPause();
} else if constexpr (kMaxBackoff > 1) {
// Exponential backoff
unsigned pause_time = pause_time_ + 1U;
for (unsigned i = 0; i < pause_time; ++i) {
onnxruntime::concurrency::SpinPause();
}
pause_time_ = (pause_time * 2U) % kMaxBackoff;
}
}
};

ThreadPoolTempl(const CHAR_TYPE* name, int num_threads, bool allow_spinning, Environment& env,
const ThreadOptions& thread_options)
: profiler_(num_threads, name),
Expand Down Expand Up @@ -908,8 +931,9 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
// finish dispatch work. This avoids new tasks being started
// concurrently with us attempting to end the parallel section.
if (ps.dispatch_q_idx != -1) {
ThreadPoolWaiter<4> waiter{};
while (!ps.dispatch_done.load(std::memory_order_acquire)) {
onnxruntime::concurrency::SpinPause();
waiter.wait();
}
}

Expand All @@ -931,15 +955,17 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter

// Wait for the dispatch task's own work...
if (ps.dispatch_q_idx > -1) {
ThreadPoolWaiter<kIsHybrid ? 0 : 1> waiter{};
while (!ps.work_done.load(std::memory_order_acquire)) {
onnxruntime::concurrency::SpinPause();
waiter.wait();
}
}

// ...and wait for any other tasks not revoked to finish their work
auto tasks_to_wait_for = tasks_started - ps.tasks_revoked;
ThreadPoolWaiter<kIsHybrid ? 0 : 1> waiter{};
while (ps.tasks_finished < tasks_to_wait_for) {
onnxruntime::concurrency::SpinPause();
waiter.wait();
}

// Clear status to allow the ThreadPoolParallelSection to be
Expand Down Expand Up @@ -1257,9 +1283,10 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
// Increase the worker count if needed. Each worker will pick up
// loops to execute from the current parallel section.
std::function<void(unsigned)> worker_fn = [&ps](unsigned par_idx) {
ThreadPoolWaiter<kIsHybrid ? 4 : 0> waiter{};
while (ps.active) {
if (ps.current_loop.load() == nullptr) {
onnxruntime::concurrency::SpinPause();
waiter.wait();
} else {
ps.workers_in_loop++;
ThreadPoolLoop* work_item = ps.current_loop;
Expand All @@ -1280,8 +1307,9 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter

// Wait for workers to exit the loop
ps.current_loop = 0;
ThreadPoolWaiter<kIsHybrid ? 1 : 4> waiter{};
while (ps.workers_in_loop) {
onnxruntime::concurrency::SpinPause();
waiter.wait();
}
profiler_.LogEnd(ThreadPoolProfiler::WAIT);
}
Expand Down Expand Up @@ -1532,13 +1560,30 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter

assert(td.GetStatus() == WorkerData::ThreadStatus::Spinning);

constexpr int log2_spin = 20;
const int spin_count = allow_spinning_ ? (1ull << log2_spin) : 0;
const int steal_count = spin_count / 100;
// The exact value of spin_count and steal_count are arbitrary and
// were experimentally determined. These numbers yielded the best
// performance across a range of workloads and
// machines. Generally, the goal of tuning spin_count is to make
// the number as small as possible while ensuring there is enough
// slack so that if each core is doing the same amount of work it
// won't sleep before they have all finished. The idea here is
// that in pipelined workloads, it won't sleep during each stage
// if it's done a bit faster than its neighbors, but that if there
// are non-equal sizes of work distributed, it won't take too long
// to reach sleep giving power (and thus frequency/performance) to
// its neighbors. Since hybrid has P/E cores, a lower value is
// chosen. On hybrid systems, even with equal sized workloads
// distributed the compute time won't stay synced. Typically in
// the hybrid case the P cores finish first (and are thus waiting)
// which is essentially a priority inversion.
constexpr int pref_spin_count = kIsHybrid ? 5000 : 10000;
const int spin_count = allow_spinning_ ? pref_spin_count : 0;
constexpr int steal_count = pref_spin_count / (kIsHybrid ? 25 : 100);

SetDenormalAsZero(set_denormal_as_zero_);
profiler_.LogThreadId(thread_id);

ThreadPoolWaiter<kIsHybrid ? 1 : 8> waiter{};
while (!should_exit) {
Task t = q.PopFront();
if (!t) {
Expand All @@ -1554,7 +1599,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
if (spin_loop_status_.load(std::memory_order_relaxed) == SpinLoopStatus::kIdle) {
break;
}
onnxruntime::concurrency::SpinPause();
waiter.wait();
}

// Attempt to block
Expand Down
5 changes: 3 additions & 2 deletions include/onnxruntime/core/platform/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct TensorOpCost {

namespace concurrency {

template <typename Environment>
template <typename Environment, bool kIsHybrid>
class ThreadPoolTempl;

class ExtendedThreadPoolInterface;
Expand Down Expand Up @@ -424,7 +424,8 @@ class ThreadPool {
ExtendedThreadPoolInterface* underlying_threadpool_ = nullptr;

// If used, underlying_threadpool_ is instantiated and owned by the ThreadPool.
std::unique_ptr<ThreadPoolTempl<Env> > extended_eigen_threadpool_;
std::unique_ptr<ThreadPoolTempl<Env, true>> extended_eigen_hybrid_threadpool_;
std::unique_ptr<ThreadPoolTempl<Env, false>> extended_eigen_normal_threadpool_;

// Force the thread pool to run in hybrid mode on a normal cpu.
bool force_hybrid_ = false;
Expand Down
38 changes: 25 additions & 13 deletions onnxruntime/core/common/threadpool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,13 +389,23 @@ ThreadPool::ThreadPool(Env* env,
assert(thread_options_.affinities.size() >= size_t(threads_to_create));
}

extended_eigen_threadpool_ =
std::make_unique<ThreadPoolTempl<Env> >(name,
threads_to_create,
low_latency_hint,
*env,
thread_options_);
underlying_threadpool_ = extended_eigen_threadpool_.get();
if (force_hybrid_) {
extended_eigen_hybrid_threadpool_ =
std::make_unique<ThreadPoolTempl<Env, true> >(name,
threads_to_create,
low_latency_hint,
*env,
thread_options_);
underlying_threadpool_ = extended_eigen_hybrid_threadpool_.get();
} else {
extended_eigen_normal_threadpool_ =
std::make_unique<ThreadPoolTempl<Env, false> >(name,
threads_to_create,
low_latency_hint,
*env,
thread_options_);
underlying_threadpool_ = extended_eigen_normal_threadpool_.get();
}
}
}

Expand Down Expand Up @@ -664,15 +674,17 @@ std::string ThreadPool::StopProfiling(concurrency::ThreadPool* tp) {
}

void ThreadPool::EnableSpinning() {
if (extended_eigen_threadpool_) {
extended_eigen_threadpool_->EnableSpinning();
}
if (extended_eigen_hybrid_threadpool_)
extended_eigen_hybrid_threadpool_->EnableSpinning();
else if (extended_eigen_normal_threadpool_)
extended_eigen_normal_threadpool_->EnableSpinning();
}

void ThreadPool::DisableSpinning() {
if (extended_eigen_threadpool_) {
extended_eigen_threadpool_->DisableSpinning();
}
if (extended_eigen_hybrid_threadpool_)
extended_eigen_hybrid_threadpool_->DisableSpinning();
else if (extended_eigen_normal_threadpool_)
extended_eigen_normal_threadpool_->DisableSpinning();
}

// Return the number of threads created by the pool.
Expand Down

0 comments on commit 4e15b22

Please sign in to comment.