From 4e15b229a0ab8334ba7639b03b42c15edb968613 Mon Sep 17 00:00:00 2001
From: goldsteinn <35538541+goldsteinn@users.noreply.github.com>
Date: Tue, 1 Oct 2024 19:25:02 -0500
Subject: [PATCH] ThreadPool: Spend less time busy waiting. (#21545)
The purpose of the patch is primarily to save power, but it also has
nice perf benefits (mostly from allowing the system to better distribute
power to cores doing meaningful work).
Changes are twofold:
1) Decrease WorkerLoop spin count dramatically ~10^6 -> ~10^4. The
reality is after ~10^4 spins, if there hasn't been any new work
added its unlikely any new work is imminent so sleep to
preserve power. This aligns more closely with upstream EigenV3.
2) Use exponential backoff for waiting on memory. This saves a bit
more power, and important increases the time between iterations
in WorkerLoop to help accomidate the dramatically lowering spin
counts.
Since the tuning for both the iteration counts / backoff counts are
dramatically different for hybrid/non-hybrid systems, this patch
templates the affected functions and dynamically choses based on
`CPUIDInfo::IsHybrid()`. This seemed like the "lightest weight" way of
getting the change in, although its likely we could incur less dynamic
overhead if we added the template argument to the entirety of
`ThreadPoolTempl`.
Measured performance on an [Intel Meteor Lake
CPU](https://www.intel.com/content/www/us/en/products/sku/237329/intel-core-ultra-7-processor-165u-12m-cache-up-to-4-90-ghz/specifications.html)
across a range of models.
Below are the result of 3 runs with each metric being the
value-before-patch / value-after-patch (so for something like inference
time, lower is better).
Session creation time cost |
0.7179 |
First inference time cost |
0.7156 |
Total inference time cost |
1.0146 |
Total inference requests |
0.8874 |
Average inference time cost |
0.8800 |
Total inference run time |
1.0146 |
Number of inferences per second |
0.8955 |
Avg CPU usage |
0.9462 |
Peak working set size |
0.9922 |
Runs |
1.1552 |
Min Latency |
0.7283 |
Max Latency |
0.9258 |
P50 Latency |
0.9534 |
P90 Latency |
0.9639 |
P95 Latency |
0.9659 |
P99 Latency |
0.9640 |
So the net result is a 1.16x improvement in throughput and between
1.08-1.37x improvement in latency.
---
.../platform/EigenNonBlockingThreadPool.h | 65 ++++++++++++++++---
.../onnxruntime/core/platform/threadpool.h | 5 +-
onnxruntime/core/common/threadpool.cc | 38 +++++++----
3 files changed, 83 insertions(+), 25 deletions(-)
diff --git a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h
index d4411a6d72356..53d65f4ade9a4 100644
--- a/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h
+++ b/include/onnxruntime/core/platform/EigenNonBlockingThreadPool.h
@@ -695,7 +695,7 @@ class RunQueue {
static std::atomic next_tag{1};
-template
+template
class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInterface {
private:
struct PerThread;
@@ -767,6 +767,29 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
typedef std::function Task;
typedef RunQueue Queue;
+ // Class for waiting w/ exponential backoff.
+ // Template argument is maximum number of spins in backoff loop.
+ template
+ class ThreadPoolWaiter {
+ // Current number if spins in backoff loop
+ unsigned pause_time_;
+
+ public:
+ void wait() {
+ // If kMaxBackoff is zero don't do any pausing.
+ if constexpr (kMaxBackoff == 1) {
+ onnxruntime::concurrency::SpinPause();
+ } else if constexpr (kMaxBackoff > 1) {
+ // Exponential backoff
+ unsigned pause_time = pause_time_ + 1U;
+ for (unsigned i = 0; i < pause_time; ++i) {
+ onnxruntime::concurrency::SpinPause();
+ }
+ pause_time_ = (pause_time * 2U) % kMaxBackoff;
+ }
+ }
+ };
+
ThreadPoolTempl(const CHAR_TYPE* name, int num_threads, bool allow_spinning, Environment& env,
const ThreadOptions& thread_options)
: profiler_(num_threads, name),
@@ -908,8 +931,9 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
// finish dispatch work. This avoids new tasks being started
// concurrently with us attempting to end the parallel section.
if (ps.dispatch_q_idx != -1) {
+ ThreadPoolWaiter<4> waiter{};
while (!ps.dispatch_done.load(std::memory_order_acquire)) {
- onnxruntime::concurrency::SpinPause();
+ waiter.wait();
}
}
@@ -931,15 +955,17 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
// Wait for the dispatch task's own work...
if (ps.dispatch_q_idx > -1) {
+ ThreadPoolWaiter waiter{};
while (!ps.work_done.load(std::memory_order_acquire)) {
- onnxruntime::concurrency::SpinPause();
+ waiter.wait();
}
}
// ...and wait for any other tasks not revoked to finish their work
auto tasks_to_wait_for = tasks_started - ps.tasks_revoked;
+ ThreadPoolWaiter waiter{};
while (ps.tasks_finished < tasks_to_wait_for) {
- onnxruntime::concurrency::SpinPause();
+ waiter.wait();
}
// Clear status to allow the ThreadPoolParallelSection to be
@@ -1257,9 +1283,10 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
// Increase the worker count if needed. Each worker will pick up
// loops to execute from the current parallel section.
std::function worker_fn = [&ps](unsigned par_idx) {
+ ThreadPoolWaiter waiter{};
while (ps.active) {
if (ps.current_loop.load() == nullptr) {
- onnxruntime::concurrency::SpinPause();
+ waiter.wait();
} else {
ps.workers_in_loop++;
ThreadPoolLoop* work_item = ps.current_loop;
@@ -1280,8 +1307,9 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
// Wait for workers to exit the loop
ps.current_loop = 0;
+ ThreadPoolWaiter waiter{};
while (ps.workers_in_loop) {
- onnxruntime::concurrency::SpinPause();
+ waiter.wait();
}
profiler_.LogEnd(ThreadPoolProfiler::WAIT);
}
@@ -1532,13 +1560,30 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
assert(td.GetStatus() == WorkerData::ThreadStatus::Spinning);
- constexpr int log2_spin = 20;
- const int spin_count = allow_spinning_ ? (1ull << log2_spin) : 0;
- const int steal_count = spin_count / 100;
+ // The exact value of spin_count and steal_count are arbitrary and
+ // were experimentally determined. These numbers yielded the best
+ // performance across a range of workloads and
+ // machines. Generally, the goal of tuning spin_count is to make
+ // the number as small as possible while ensuring there is enough
+ // slack so that if each core is doing the same amount of work it
+ // won't sleep before they have all finished. The idea here is
+ // that in pipelined workloads, it won't sleep during each stage
+ // if it's done a bit faster than its neighbors, but that if there
+ // are non-equal sizes of work distributed, it won't take too long
+ // to reach sleep giving power (and thus frequency/performance) to
+ // its neighbors. Since hybrid has P/E cores, a lower value is
+ // chosen. On hybrid systems, even with equal sized workloads
+ // distributed the compute time won't stay synced. Typically in
+ // the hybrid case the P cores finish first (and are thus waiting)
+ // which is essentially a priority inversion.
+ constexpr int pref_spin_count = kIsHybrid ? 5000 : 10000;
+ const int spin_count = allow_spinning_ ? pref_spin_count : 0;
+ constexpr int steal_count = pref_spin_count / (kIsHybrid ? 25 : 100);
SetDenormalAsZero(set_denormal_as_zero_);
profiler_.LogThreadId(thread_id);
+ ThreadPoolWaiter waiter{};
while (!should_exit) {
Task t = q.PopFront();
if (!t) {
@@ -1554,7 +1599,7 @@ class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInter
if (spin_loop_status_.load(std::memory_order_relaxed) == SpinLoopStatus::kIdle) {
break;
}
- onnxruntime::concurrency::SpinPause();
+ waiter.wait();
}
// Attempt to block
diff --git a/include/onnxruntime/core/platform/threadpool.h b/include/onnxruntime/core/platform/threadpool.h
index 04df6dc982c6a..8b0f8044b2351 100644
--- a/include/onnxruntime/core/platform/threadpool.h
+++ b/include/onnxruntime/core/platform/threadpool.h
@@ -129,7 +129,7 @@ struct TensorOpCost {
namespace concurrency {
-template
+template
class ThreadPoolTempl;
class ExtendedThreadPoolInterface;
@@ -424,7 +424,8 @@ class ThreadPool {
ExtendedThreadPoolInterface* underlying_threadpool_ = nullptr;
// If used, underlying_threadpool_ is instantiated and owned by the ThreadPool.
- std::unique_ptr > extended_eigen_threadpool_;
+ std::unique_ptr> extended_eigen_hybrid_threadpool_;
+ std::unique_ptr> extended_eigen_normal_threadpool_;
// Force the thread pool to run in hybrid mode on a normal cpu.
bool force_hybrid_ = false;
diff --git a/onnxruntime/core/common/threadpool.cc b/onnxruntime/core/common/threadpool.cc
index 7b62de799b6fc..f7b511fc45e72 100644
--- a/onnxruntime/core/common/threadpool.cc
+++ b/onnxruntime/core/common/threadpool.cc
@@ -389,13 +389,23 @@ ThreadPool::ThreadPool(Env* env,
assert(thread_options_.affinities.size() >= size_t(threads_to_create));
}
- extended_eigen_threadpool_ =
- std::make_unique >(name,
- threads_to_create,
- low_latency_hint,
- *env,
- thread_options_);
- underlying_threadpool_ = extended_eigen_threadpool_.get();
+ if (force_hybrid_) {
+ extended_eigen_hybrid_threadpool_ =
+ std::make_unique >(name,
+ threads_to_create,
+ low_latency_hint,
+ *env,
+ thread_options_);
+ underlying_threadpool_ = extended_eigen_hybrid_threadpool_.get();
+ } else {
+ extended_eigen_normal_threadpool_ =
+ std::make_unique >(name,
+ threads_to_create,
+ low_latency_hint,
+ *env,
+ thread_options_);
+ underlying_threadpool_ = extended_eigen_normal_threadpool_.get();
+ }
}
}
@@ -664,15 +674,17 @@ std::string ThreadPool::StopProfiling(concurrency::ThreadPool* tp) {
}
void ThreadPool::EnableSpinning() {
- if (extended_eigen_threadpool_) {
- extended_eigen_threadpool_->EnableSpinning();
- }
+ if (extended_eigen_hybrid_threadpool_)
+ extended_eigen_hybrid_threadpool_->EnableSpinning();
+ else if (extended_eigen_normal_threadpool_)
+ extended_eigen_normal_threadpool_->EnableSpinning();
}
void ThreadPool::DisableSpinning() {
- if (extended_eigen_threadpool_) {
- extended_eigen_threadpool_->DisableSpinning();
- }
+ if (extended_eigen_hybrid_threadpool_)
+ extended_eigen_hybrid_threadpool_->DisableSpinning();
+ else if (extended_eigen_normal_threadpool_)
+ extended_eigen_normal_threadpool_->DisableSpinning();
}
// Return the number of threads created by the pool.