native_layer: Add thread local cache for counters
dflemstr committed Nov 1, 2024
1 parent aeb4a52 commit 3ab0a68
Showing 1 changed file with 35 additions and 5 deletions.
--- a/crates/layer/src/native_layer.rs
+++ b/crates/layer/src/native_layer.rs
@@ -1,6 +1,6 @@
 //! The main tracing layer and related utils exposed by this crate.
 use std::sync::atomic;
-use std::{borrow, env, process, sync, thread, time};
+use std::{borrow, cell, env, process, sync, thread, time};
 
 use prost::encoding;
 #[cfg(feature = "tokio")]
@@ -42,8 +42,14 @@ pub struct Builder<'c, W> {
     enable_system: bool,
 }
 
+#[derive(Default)]
 struct ThreadLocalCtx {
     descriptor_sent: atomic::AtomicBool,
+    // We probably don't need a fancy data structure like `HashSet` or similar because this is
+    // expected to contain a small (<100 entries) set of short (<20 chars) strings, and they are
+    // all static (meaning high CPU cache coherence), so a linear scan + equals check is probably
+    // faster than hashing strings etc.
+    counters_sent_cache: cell::RefCell<Option<Vec<&'static str>>>,
 }
 
 struct Inner<W>
@@ -60,7 +66,7 @@ where
     process_descriptor_sent: atomic::AtomicBool,
     // Mutex is held very briefly for member check and to insert a string if it is missing.
     // We probably don't need a fancy data structure like `HashSet` or similar because this is
-    // expected to contain a small (<20 entries) set of short (<20 chars) strings, and they are
+    // expected to contain a small (<100 entries) set of short (<20 chars) strings, and they are
     // all static (meaning high CPU cache coherence), so a linear scan + equals check is probably
     // faster than hashing strings etc.
     counters_sent: sync::Mutex<Vec<&'static str>>,
@@ -270,9 +276,7 @@
     }
 
     fn ensure_thread_known(&self, meta: &tracing::Metadata) {
-        let thread_local_ctx = self.inner.thread_local_ctxs.get_or(|| ThreadLocalCtx {
-            descriptor_sent: atomic::AtomicBool::new(false),
-        });
+        let thread_local_ctx = self.inner.thread_local_ctxs.get_or(ThreadLocalCtx::default);
         let thread_descriptor_sent = thread_local_ctx
             .descriptor_sent
             .fetch_or(true, atomic::Ordering::Relaxed);
@@ -352,6 +356,23 @@
         meta: &tracing::Metadata,
         counters: &[debug_annotations::Counter],
     ) {
+        let thread_local_ctx = self.inner.thread_local_ctxs.get_or(ThreadLocalCtx::default);
+        if let Ok(mut counters_sent_cache) = thread_local_ctx.counters_sent_cache.try_borrow_mut() {
+            let counters_sent_cache = counters_sent_cache.get_or_insert_with(|| {
+                self.inner
+                    .counters_sent
+                    .lock()
+                    .map(|cs| cs.clone())
+                    .unwrap_or_default()
+            });
+            if counters
+                .iter()
+                .all(|c| counters_sent_cache.contains(&c.name))
+            {
+                return;
+            }
+        }
+
         // This might seem wasteful, but we want to minimize the time that we hold the
         // `counters_sent` mutex, and we can report the counters after we have released
         // the lock. Also remember that `Vec::new()` does not allocate until the first
@@ -360,10 +381,19 @@
 
         // Skip counters entirely if Mutex is poisoned -- can't afford to panic here
         if let Ok(mut counters_sent) = self.inner.counters_sent.lock() {
+            let mut had_new_counters = false;
             for counter in counters {
                 if !counters_sent.contains(&counter.name) {
                     new_counters.push(counter);
                     counters_sent.push(counter.name);
+                    had_new_counters = true;
                 }
             }
+            if had_new_counters {
+                if let Ok(mut counters_sent_cache) =
+                    thread_local_ctx.counters_sent_cache.try_borrow_mut()
+                {
+                    *counters_sent_cache = Some((*counters_sent).clone());
+                }
+            }
         }
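
Note on the pattern: the hunks above put a per-thread read-through cache in front of the shared `counters_sent` mutex. Each thread first consults its own snapshot of the sent-counter list without locking, and only falls back to the mutex when it sees a counter name missing from the snapshot; the snapshot is refreshed while the lock is held anyway. Below is a minimal standalone sketch of that pattern, under the assumption that std's `thread_local!` macro stands in for the layer's `thread_local_ctxs` container; `COUNTERS_SENT`, `COUNTERS_SENT_CACHE`, and `counter_needs_send` are illustrative names, not part of the crate.

use std::cell::RefCell;
use std::sync::Mutex;

// Global registry of counter names that have already been sent.
static COUNTERS_SENT: Mutex<Vec<&'static str>> = Mutex::new(Vec::new());

thread_local! {
    // Per-thread snapshot of COUNTERS_SENT; `None` until first populated.
    static COUNTERS_SENT_CACHE: RefCell<Option<Vec<&'static str>>> =
        RefCell::new(None);
}

// Returns true the first time `name` is seen process-wide; later calls on the
// same thread are answered from the snapshot without touching the mutex.
fn counter_needs_send(name: &'static str) -> bool {
    // Fast path: check the thread-local snapshot, no locking.
    let cached = COUNTERS_SENT_CACHE.with(|cache| {
        cache
            .borrow()
            .as_ref()
            .map_or(false, |names| names.contains(&name))
    });
    if cached {
        return false;
    }

    // Slow path: take the lock, insert if missing, refresh the snapshot.
    let mut sent = COUNTERS_SENT.lock().expect("mutex poisoned");
    let is_new = !sent.contains(&name);
    if is_new {
        sent.push(name);
    }
    let snapshot = sent.clone();
    drop(sent); // release the lock before writing the thread-local
    COUNTERS_SENT_CACHE.with(|cache| *cache.borrow_mut() = Some(snapshot));
    is_new
}

fn main() {
    assert!(counter_needs_send("frame_time"));
    assert!(!counter_needs_send("frame_time")); // served from the cache
}

A stale snapshot is harmless in this scheme: if another thread registered a counter first, the reader merely takes the slow path once, re-checks under the lock (which stays authoritative), and refreshes its snapshot. Since names are only ever added, never removed, the cache can never wrongly report a counter as already sent.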