Skip to content

Commit

Permalink
Add helper for conditionally recording metrics (#20143)
Browse files Browse the repository at this point in the history
Prompted by
#20138 (comment),
this adds two helpers for a common pattern: increment a metric counter
or record a metric observation _if_ the current thread has a workunit
handle set.

As a related drive-by, this also notices that `increment_counter` takes
`&mut self` but is happy with `&self` (and similarly for one
`record_observation` function), and so swaps it to use `&self`.
  • Loading branch information
huonw authored Nov 5, 2023
1 parent 73c64b2 commit 7d8f306
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 45 deletions.
18 changes: 8 additions & 10 deletions src/rust/engine/fs/store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,16 +973,14 @@ impl ByteStore {
dbs.load_bytes_with(digest.hash, len_checked_f).await?
};

if let Some(workunit_store_handle) = workunit_store::get_workunit_store_handle() {
workunit_store_handle.store.record_observation(
ObservationMetric::LocalStoreReadBlobSize,
digest.size_bytes as u64,
);
workunit_store_handle.store.record_observation(
ObservationMetric::LocalStoreReadBlobTimeMicros,
start.elapsed().as_micros() as u64,
);
}
workunit_store::record_observation_if_in_workunit(
ObservationMetric::LocalStoreReadBlobSize,
digest.size_bytes as u64,
);
workunit_store::record_observation_if_in_workunit(
ObservationMetric::LocalStoreReadBlobTimeMicros,
start.elapsed().as_micros() as u64,
);

Ok(result)
}
Expand Down
6 changes: 2 additions & 4 deletions src/rust/engine/grpc_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use tower::limit::ConcurrencyLimit;
use tower::timeout::{Timeout, TimeoutLayer};
use tower::ServiceBuilder;
use tower_service::Service;
use workunit_store::{get_workunit_store_handle, Metric, ObservationMetric};
use workunit_store::{increment_counter_if_in_workunit, Metric, ObservationMetric};

use crate::channel::Channel;
use crate::headers::{SetRequestHeaders, SetRequestHeadersLayer};
Expand Down Expand Up @@ -164,9 +164,7 @@ where
result
.inspect_err(move |_| {
if let Some(metric) = metric {
if let Some(mut workunit_store_handle) = get_workunit_store_handle() {
workunit_store_handle.store.increment_counter(metric, 1)
}
increment_counter_if_in_workunit(metric, 1);
}
})
.boxed()
Expand Down
9 changes: 2 additions & 7 deletions src/rust/engine/grpc_util/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use http::{Request, Response};
use pin_project::pin_project;
use tower_layer::Layer;
use tower_service::Service;
use workunit_store::{get_workunit_store_handle, ObservationMetric};
use workunit_store::{record_observation_if_in_workunit, ObservationMetric};

#[derive(Clone, Debug)]
pub struct NetworkMetricsLayer {
Expand Down Expand Up @@ -81,12 +81,7 @@ where
let this = self.project();
let result = ready!(this.inner.poll(cx));
if let Some((metric, start)) = metric_data {
let workunit_store_handle = get_workunit_store_handle();
if let Some(workunit_store_handle) = workunit_store_handle {
workunit_store_handle
.store
.record_observation(metric, start.elapsed().as_micros() as u64)
}
record_observation_if_in_workunit(metric, start.elapsed().as_micros() as u64)
}
Poll::Ready(result)
}
Expand Down
18 changes: 8 additions & 10 deletions src/rust/engine/remote_provider/remote_provider_opendal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,14 @@ impl Provider {
Err(e) => return Err(format!("failed to read {}: {}", path, e)),
};

if let Some(workunit_store_handle) = workunit_store::get_workunit_store_handle() {
// TODO: this pretends that the time-to-first-byte can be approximated by "time to create
// reader", which is often not really true.
let timing: Result<u64, _> =
Instant::now().duration_since(start).as_micros().try_into();
if let Ok(obs) = timing {
workunit_store_handle
.store
.record_observation(ObservationMetric::RemoteStoreTimeToFirstByteMicros, obs);
}
// TODO: this pretends that the time-to-first-byte can be approximated by "time to create
// reader", which is often not really true.
let timing: Result<u64, _> = Instant::now().duration_since(start).as_micros().try_into();
if let Ok(obs) = timing {
workunit_store::record_observation_if_in_workunit(
ObservationMetric::RemoteStoreTimeToFirstByteMicros,
obs,
);
}

match mode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,17 +341,14 @@ impl ByteStoreProvider for Provider {
let mut stream = response.into_inner().inspect(|_| {
// Record the observed time to receive the first response for this read.
if let Some(start) = start_opt.take() {
if let Some(workunit_store_handle) =
workunit_store::get_workunit_store_handle()
{
let timing: Result<u64, _> =
Instant::now().duration_since(start).as_micros().try_into();
if let Ok(obs) = timing {
workunit_store_handle.store.record_observation(
ObservationMetric::RemoteStoreTimeToFirstByteMicros,
obs,
);
}
let timing: Result<u64, _> =
Instant::now().duration_since(start).as_micros().try_into();

if let Ok(obs) = timing {
workunit_store::record_observation_if_in_workunit(
ObservationMetric::RemoteStoreTimeToFirstByteMicros,
obs,
);
}
}
});
Expand Down
19 changes: 16 additions & 3 deletions src/rust/engine/workunit_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ impl WorkunitStore {
.latest_workunits(max_verbosity)
}

pub fn increment_counter(&mut self, counter_name: Metric, change: u64) {
pub fn increment_counter(&self, counter_name: Metric, change: u64) {
self.metrics_data
.counters
.lock()
Expand Down Expand Up @@ -841,6 +841,19 @@ pub fn expect_workunit_store_handle() -> WorkunitStoreHandle {
get_workunit_store_handle().expect("A WorkunitStore has not been set for this thread.")
}

/// If this thread has a workunit set, increment `counter_name` by `change`.
pub fn increment_counter_if_in_workunit(counter_name: Metric, change: u64) {
if let Some(handle) = get_workunit_store_handle() {
handle.store.increment_counter(counter_name, change)
}
}
/// If this thread has a workunit set, record `value` as an observation of `metric`.
pub fn record_observation_if_in_workunit(metric: ObservationMetric, value: u64) {
if let Some(handle) = get_workunit_store_handle() {
handle.store.record_observation(metric, value)
}
}

/// Run the given async block. If the level given by the WorkunitMetadata is above a configured
/// threshold, the block will run inside of a workunit recorded in the workunit store.
///
Expand Down Expand Up @@ -900,11 +913,11 @@ impl RunningWorkunit {
}
}

pub fn record_observation(&mut self, metric: ObservationMetric, value: u64) {
pub fn record_observation(&self, metric: ObservationMetric, value: u64) {
self.store.record_observation(metric, value);
}

pub fn increment_counter(&mut self, counter_name: Metric, change: u64) {
pub fn increment_counter(&self, counter_name: Metric, change: u64) {
self.store.increment_counter(counter_name, change);
}

Expand Down

0 comments on commit 7d8f306

Please sign in to comment.