-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stabilize worker_total_busy_duration #6899
base: master
Are you sure you want to change the base?
Changes from 1 commit
8f1fcb4
9b47cf9
e2f4f33
b6974ba
86f019f
64f626d
489003c
8a134a2
4bc00cf
7eb6b97
7239af5
57f6b9b
c8b2c7d
d7333cf
14543de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,34 +1,47 @@ | ||
use crate::runtime::metrics::{HistogramBatch, WorkerMetrics}; | ||
use crate::runtime::metrics::WorkerMetrics; | ||
|
||
cfg_unstable_metrics! { | ||
use crate::runtime::metrics::HistogramBatch; | ||
} | ||
|
||
use std::sync::atomic::Ordering::Relaxed; | ||
use std::time::{Duration, Instant}; | ||
|
||
pub(crate) struct MetricsBatch { | ||
#[cfg(tokio_unstable)] | ||
/// Number of times the worker parked. | ||
park_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of times the worker parked and unparked. | ||
park_unpark_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of times the worker woke w/o doing work. | ||
noop_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of tasks stolen. | ||
steal_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of times tasks where stolen. | ||
steal_operations: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of tasks that were polled by the worker. | ||
poll_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of tasks polled when the worker entered park. This is used to | ||
/// track the noop count. | ||
poll_count_on_last_park: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of tasks that were scheduled locally on this worker. | ||
local_schedule_count: u64, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// Number of tasks moved to the global queue to make space in the local | ||
/// queue | ||
overflow_count: u64, | ||
|
@@ -39,87 +52,107 @@ pub(crate) struct MetricsBatch { | |
/// Instant at which work last resumed (continued after park). | ||
processing_scheduled_tasks_started_at: Instant, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// If `Some`, tracks poll times in nanoseconds | ||
poll_timer: Option<PollTimer>, | ||
} | ||
|
||
struct PollTimer { | ||
/// Histogram of poll counts within each band. | ||
poll_counts: HistogramBatch, | ||
cfg_unstable_metrics! { | ||
struct PollTimer { | ||
/// Histogram of poll counts within each band. | ||
poll_counts: HistogramBatch, | ||
|
||
/// Instant when the most recent task started polling. | ||
poll_started_at: Instant, | ||
/// Instant when the most recent task started polling. | ||
poll_started_at: Instant, | ||
} | ||
} | ||
|
||
impl MetricsBatch { | ||
pub(crate) fn new(worker_metrics: &WorkerMetrics) -> MetricsBatch { | ||
pub(crate) fn new(_worker_metrics: &WorkerMetrics) -> MetricsBatch { | ||
let now = Instant::now(); | ||
#[cfg(not(tokio_unstable))] | ||
{ | ||
MetricsBatch { | ||
busy_duration_total: 0, | ||
processing_scheduled_tasks_started_at: now, | ||
} | ||
} | ||
|
||
MetricsBatch { | ||
park_count: 0, | ||
park_unpark_count: 0, | ||
noop_count: 0, | ||
steal_count: 0, | ||
steal_operations: 0, | ||
poll_count: 0, | ||
poll_count_on_last_park: 0, | ||
local_schedule_count: 0, | ||
overflow_count: 0, | ||
busy_duration_total: 0, | ||
processing_scheduled_tasks_started_at: now, | ||
poll_timer: worker_metrics | ||
.poll_count_histogram | ||
.as_ref() | ||
.map(|worker_poll_counts| PollTimer { | ||
poll_counts: HistogramBatch::from_histogram(worker_poll_counts), | ||
poll_started_at: now, | ||
}), | ||
#[cfg(tokio_unstable)] | ||
{ | ||
MetricsBatch { | ||
park_count: 0, | ||
park_unpark_count: 0, | ||
noop_count: 0, | ||
steal_count: 0, | ||
steal_operations: 0, | ||
poll_count: 0, | ||
poll_count_on_last_park: 0, | ||
local_schedule_count: 0, | ||
overflow_count: 0, | ||
busy_duration_total: 0, | ||
processing_scheduled_tasks_started_at: now, | ||
poll_timer: _worker_metrics.poll_count_histogram.as_ref().map( | ||
|worker_poll_counts| PollTimer { | ||
poll_counts: HistogramBatch::from_histogram(worker_poll_counts), | ||
poll_started_at: now, | ||
}, | ||
), | ||
} | ||
} | ||
} | ||
|
||
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { | ||
worker.mean_poll_time.store(mean_poll_time, Relaxed); | ||
worker.park_count.store(self.park_count, Relaxed); | ||
worker | ||
.park_unpark_count | ||
.store(self.park_unpark_count, Relaxed); | ||
worker.noop_count.store(self.noop_count, Relaxed); | ||
worker.steal_count.store(self.steal_count, Relaxed); | ||
worker | ||
.steal_operations | ||
.store(self.steal_operations, Relaxed); | ||
worker.poll_count.store(self.poll_count, Relaxed); | ||
|
||
pub(crate) fn submit(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) { | ||
#[cfg(tokio_unstable)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As per the comment on pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) {
worker
.busy_duration_total
.store(self.busy_duration_total, Relaxed);
self.submit_unstable(worker, mean_poll_time);
}
cfg_not_unstable_metrics! {
#[inline(always)]
fn submit_unstable(&mut self, _worker: &WorkerMetrics, _mean_poll_time: u64) {}
}
cfg_unstable_metrics! {
#[inline(always)]
fn submit_unstable(&mut self, worker: &WorkerMetrics, mean_poll_time: u64){
worker.mean_poll_time.store(_mean_poll_time, Relaxed);
worker.park_count.store(self.park_count, Relaxed);
worker
.park_unpark_count
.store(self.park_unpark_count, Relaxed);
worker.noop_count.store(self.noop_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
worker
.steal_operations
.store(self.steal_operations, Relaxed);
worker.poll_count.store(self.poll_count, Relaxed);
worker
.local_schedule_count
.store(self.local_schedule_count, Relaxed);
worker.overflow_count.store(self.overflow_count, Relaxed);
if let Some(poll_timer) = &self.poll_timer {
let dst = worker.poll_count_histogram.as_ref().unwrap();
poll_timer.poll_counts.submit(dst);
}
}
} Use the same |
||
{ | ||
worker.mean_poll_time.store(_mean_poll_time, Relaxed); | ||
worker.park_count.store(self.park_count, Relaxed); | ||
worker | ||
.park_unpark_count | ||
.store(self.park_unpark_count, Relaxed); | ||
worker.noop_count.store(self.noop_count, Relaxed); | ||
worker.steal_count.store(self.steal_count, Relaxed); | ||
worker | ||
.steal_operations | ||
.store(self.steal_operations, Relaxed); | ||
worker.poll_count.store(self.poll_count, Relaxed); | ||
|
||
worker | ||
.local_schedule_count | ||
.store(self.local_schedule_count, Relaxed); | ||
worker.overflow_count.store(self.overflow_count, Relaxed); | ||
|
||
if let Some(poll_timer) = &self.poll_timer { | ||
let dst = worker.poll_count_histogram.as_ref().unwrap(); | ||
poll_timer.poll_counts.submit(dst); | ||
} | ||
} | ||
worker | ||
.busy_duration_total | ||
.store(self.busy_duration_total, Relaxed); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's move the stablized items up to the top of the function. |
||
worker | ||
.local_schedule_count | ||
.store(self.local_schedule_count, Relaxed); | ||
worker.overflow_count.store(self.overflow_count, Relaxed); | ||
|
||
if let Some(poll_timer) = &self.poll_timer { | ||
let dst = worker.poll_count_histogram.as_ref().unwrap(); | ||
poll_timer.poll_counts.submit(dst); | ||
} | ||
} | ||
|
||
/// The worker is about to park. | ||
pub(crate) fn about_to_park(&mut self) { | ||
self.park_count += 1; | ||
self.park_unpark_count += 1; | ||
|
||
if self.poll_count_on_last_park == self.poll_count { | ||
self.noop_count += 1; | ||
} else { | ||
self.poll_count_on_last_park = self.poll_count; | ||
#[cfg(tokio_unstable)] | ||
{ | ||
self.park_count += 1; | ||
self.park_unpark_count += 1; | ||
|
||
if self.poll_count_on_last_park == self.poll_count { | ||
self.noop_count += 1; | ||
} else { | ||
self.poll_count_on_last_park = self.poll_count; | ||
} | ||
} | ||
} | ||
|
||
/// The worker was unparked. | ||
pub(crate) fn unparked(&mut self) { | ||
self.park_unpark_count += 1; | ||
#[cfg(tokio_unstable)] | ||
{ | ||
self.park_unpark_count += 1; | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the rest of the Tokio code base, we do this a different way, so let's stick to that convension. Instead of gating functionality within a function, we have a separate empty function definition when the cfg flag isn't enabled. So this function would become: cfg_unstable_metrics! {
/// The worker was unparked.
pub(crate) fn unparked(&mut self) {
self.park_unpark_count += 1;
}
}
cfg_not_unstable_metrics! {
/// The worker was unparked.
pub(crate) fn unparked(&mut self) {}
} Please do the same here. Keep a single For the more complex functions above that have a mix of stablized and unstablized implementation, split the unstablized part out into a separate function with an impl in each of the macro blocks (see example in the comment on |
||
/// Start processing a batch of tasks | ||
|
@@ -135,38 +168,50 @@ impl MetricsBatch { | |
|
||
/// Start polling an individual task | ||
pub(crate) fn start_poll(&mut self) { | ||
self.poll_count += 1; | ||
|
||
if let Some(poll_timer) = &mut self.poll_timer { | ||
poll_timer.poll_started_at = Instant::now(); | ||
#[cfg(tokio_unstable)] | ||
{ | ||
self.poll_count += 1; | ||
if let Some(poll_timer) = &mut self.poll_timer { | ||
poll_timer.poll_started_at = Instant::now(); | ||
} | ||
} | ||
} | ||
|
||
/// Stop polling an individual task | ||
pub(crate) fn end_poll(&mut self) { | ||
#[cfg(tokio_unstable)] | ||
if let Some(poll_timer) = &mut self.poll_timer { | ||
let elapsed = duration_as_u64(poll_timer.poll_started_at.elapsed()); | ||
poll_timer.poll_counts.measure(elapsed, 1); | ||
} | ||
} | ||
|
||
pub(crate) fn inc_local_schedule_count(&mut self) { | ||
self.local_schedule_count += 1; | ||
#[cfg(tokio_unstable)] | ||
{ | ||
self.local_schedule_count += 1; | ||
} | ||
} | ||
} | ||
|
||
cfg_rt_multi_thread! { | ||
impl MetricsBatch { | ||
pub(crate) fn incr_steal_count(&mut self, by: u16) { | ||
self.steal_count += by as u64; | ||
pub(crate) fn incr_steal_count(&mut self, _by: u16) { | ||
#[cfg(tokio_unstable)] { | ||
self.steal_count += _by as u64; | ||
} | ||
} | ||
|
||
pub(crate) fn incr_steal_operations(&mut self) { | ||
self.steal_operations += 1; | ||
#[cfg(tokio_unstable)] { | ||
self.steal_operations += 1; | ||
} | ||
} | ||
|
||
pub(crate) fn incr_overflow_count(&mut self) { | ||
self.overflow_count += 1; | ||
#[cfg(tokio_unstable)] { | ||
self.overflow_count += 1; | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Split this into 2 implementations each gated by
cfg_(not_)unstable_metrics!
.