From 7677a0a0a70d6e0fc1c92dce70c7c038f47078bb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Flemstr=C3=B6m?=
Date: Fri, 22 Nov 2024 13:15:19 +0100
Subject: [PATCH] native_layer: Add support for creating async task tracks

---
 Cargo.lock                       |  61 +++++++-
 Cargo.toml                       |   1 +
 crates/layer/Cargo.toml          |   1 +
 crates/layer/src/ids.rs          |   9 +-
 crates/layer/src/native_layer.rs | 257 +++++++++++++++++++------------
 5 files changed, 230 insertions(+), 99 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e176258..9404120 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -172,6 +172,20 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "dashmap"
+version = "6.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
+dependencies = [
+ "cfg-if",
+ "crossbeam-utils",
+ "hashbrown 0.14.5",
+ "lock_api",
+ "once_cell",
+ "parking_lot_core",
+]
+
 [[package]]
 name = "deranged"
 version = "0.3.11"
@@ -310,6 +324,12 @@ version = "0.31.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
 
+[[package]]
+name = "hashbrown"
+version = "0.14.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
+
 [[package]]
 name = "hashbrown"
 version = "0.15.0"
@@ -335,7 +355,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
 dependencies = [
  "equivalent",
- "hashbrown",
+ "hashbrown 0.15.0",
 ]
 
 [[package]]
@@ -398,6 +418,16 @@ version = "0.4.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
 
+[[package]]
+name = "lock_api"
+version = "0.4.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
+dependencies = [
+ "autocfg",
+ "scopeguard",
+]
+
 [[package]]
 name = "log"
 version = "0.4.22"
@@ -483,6 +513,19 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
 
+[[package]]
+name = "parking_lot_core"
+version = "0.9.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "redox_syscall",
+ "smallvec",
+ "windows-targets",
+]
+
 [[package]]
 name = "pbjson"
 version = "0.6.0"
@@ -629,6 +672,15 @@ dependencies = [
  "proc-macro2",
 ]
 
+[[package]]
+name = "redox_syscall"
+version = "0.5.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f"
+dependencies = [
+ "bitflags",
+]
+
 [[package]]
 name = "regex"
 version = "1.11.1"
@@ -683,6 +735,12 @@ version = "1.0.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
 
+[[package]]
+name = "scopeguard"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+
 [[package]]
 name = "scratch"
 version = "1.0.7"
@@ -940,6 +998,7 @@ dependencies = [
  "anyhow",
  "bytes",
  "cxx",
+ "dashmap",
  "nix",
  "prost",
  "serde_yaml",
diff --git a/Cargo.toml b/Cargo.toml
index c5f63c6..ddfc9dd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,7 @@ anyhow = "1.0.90"
 bytes = "1.8.0"
 cxx = { version = "1.0.129", features = ["c++17"] }
 cxx-build = { version = "1.0.129", features = ["parallel"] }
+dashmap = "6.1.0"
 futures = "0.3.31"
 nix = "0.29.0"
 pbjson = "0.6.0"
diff --git a/crates/layer/Cargo.toml b/crates/layer/Cargo.toml
index 2dc71b3..16d76c7 100644
--- a/crates/layer/Cargo.toml
+++ b/crates/layer/Cargo.toml
@@ -9,6 +9,7 @@ edition.workspace = true
 [dependencies]
 bytes.workspace = true
 cxx.workspace = true
+dashmap.workspace = true
 nix = { workspace = true, features = ["time"] }
 prost.workspace = true
 thiserror.workspace = true
diff --git a/crates/layer/src/ids.rs b/crates/layer/src/ids.rs
index ec25d20..9f38770 100644
--- a/crates/layer/src/ids.rs
+++ b/crates/layer/src/ids.rs
@@ -38,6 +38,13 @@ impl TrackUuid {
         TrackUuid(h.finish())
     }
 
+    #[cfg(feature = "tokio")]
+    pub fn for_task(id: task::Id) -> TrackUuid {
+        let mut h = hash::DefaultHasher::new();
+        (TRACK_UUID_NS, TASK_NS, id).hash(&mut h);
+        TrackUuid(h.finish())
+    }
+
     #[cfg(feature = "tokio")]
     pub fn for_tokio() -> TrackUuid {
         let mut h = hash::DefaultHasher::new();
@@ -66,7 +73,7 @@ impl SequenceId {
     #[cfg(feature = "tokio")]
     pub fn for_task(id: task::Id) -> SequenceId {
         let mut h = hash::DefaultHasher::new();
-        (TRACK_UUID_NS, TASK_NS, id).hash(&mut h);
+        (SEQUENCE_ID_NS, TASK_NS, id).hash(&mut h);
         SequenceId(h.finish() as u32)
     }
 
diff --git a/crates/layer/src/native_layer.rs b/crates/layer/src/native_layer.rs
index 8294b30..6b636fe 100644
--- a/crates/layer/src/native_layer.rs
+++ b/crates/layer/src/native_layer.rs
@@ -1,6 +1,6 @@
 //! The main tracing layer and related utils exposed by this crate.
 use std::sync::atomic;
-use std::{borrow, cell, env, marker, mem, process, sync, thread, time};
+use std::{borrow, env, marker, mem, process, sync, thread, time};
 
 use prost::encoding;
 #[cfg(feature = "tokio")]
@@ -44,21 +44,12 @@ pub struct Builder<'c, W> {
     force_flavor: Option<flavor::Flavor>,
     delay_slice_begin: bool,
     discard_tracing_data: bool,
+    create_async_tracks: Option<String>,
     enable_in_process: bool,
     enable_system: bool,
     _phantom: marker::PhantomData<&'c ()>,
 }
 
-#[derive(Default)]
-struct ThreadLocalCtx {
-    descriptor_sent: atomic::AtomicBool,
-    // We probably don't need a fancy data structure like `HashSet` or similar because this is
-    // expected to contain a small (<100 entries) set of short (<20 chars) strings, and they are
-    // all static (meaning high CPU cache coherence), so a linear scan + equals check is probably
-    // faster than hashing strings etc.
-    counters_sent_cache: cell::RefCell<Option<Vec<&'static str>>>,
-}
-
 struct Inner<W>
 where
     W: for<'w> fmt::MakeWriter<'w>,
@@ -72,19 +63,17 @@ where
     force_flavor: Option<flavor::Flavor>,
     delay_slice_begin: bool,
     discard_tracing_data: bool,
+    create_async_tracks: Option<String>,
     process_track_uuid: ids::TrackUuid,
     process_descriptor_sent: atomic::AtomicBool,
-    // Mutex is held very briefly for member check and to insert a string if it is missing.
-    // We probably don't need a fancy data structure like `HashSet` or similar because this is
-    // expected to contain a small (<100 entries) set of short (<20 chars) strings, and they are
-    // all static (meaning high CPU cache coherence), so a linear scan + equals check is probably
-    // faster than hashing strings etc.
-    counters_sent: sync::Mutex<Vec<&'static str>>,
     #[cfg(feature = "tokio")]
     tokio_descriptor_sent: atomic::AtomicBool,
     #[cfg(feature = "tokio")]
     tokio_track_uuid: ids::TrackUuid,
-    thread_local_ctxs: thread_local::ThreadLocal<ThreadLocalCtx>,
+    counter_tracks_sent: dashmap::DashSet<&'static str>,
+    thread_tracks_sent: dashmap::DashSet<usize>,
+    #[cfg(feature = "tokio")]
+    task_tracks_sent: dashmap::DashSet<task::Id>,
 }
 
 // Does not contain DebugAnnotations; they are shipped separately as a span
@@ -163,15 +152,18 @@ where
         let force_flavor = builder.force_flavor;
         let delay_slice_begin = builder.delay_slice_begin;
         let discard_tracing_data = builder.discard_tracing_data;
+        let create_async_tracks = builder.create_async_tracks;
         let pid = process::id();
        let process_track_uuid = ids::TrackUuid::for_process(pid);
         let process_descriptor_sent = atomic::AtomicBool::new(false);
-        let counters_sent = sync::Mutex::new(Vec::new());
         #[cfg(feature = "tokio")]
         let tokio_descriptor_sent = atomic::AtomicBool::new(false);
         #[cfg(feature = "tokio")]
         let tokio_track_uuid = ids::TrackUuid::for_tokio();
-        let thread_local_ctxs = thread_local::ThreadLocal::new();
+        let counter_tracks_sent = dashmap::DashSet::new();
+        let thread_tracks_sent = dashmap::DashSet::new();
+        #[cfg(feature = "tokio")]
+        let task_tracks_sent = dashmap::DashSet::new();
 
         let inner = sync::Arc::new(Inner {
             #[cfg(feature = "sdk")]
@@ -182,14 +174,17 @@ where
             force_flavor,
             delay_slice_begin,
             discard_tracing_data,
+            create_async_tracks,
             process_track_uuid,
             process_descriptor_sent,
-            counters_sent,
             #[cfg(feature = "tokio")]
             tokio_descriptor_sent,
             #[cfg(feature = "tokio")]
             tokio_track_uuid,
-            thread_local_ctxs,
+            counter_tracks_sent,
+            thread_tracks_sent,
+            #[cfg(feature = "tokio")]
+            task_tracks_sent,
         });
         Ok(Self { inner })
     }
@@ -208,17 +203,20 @@ where
                 }
                 flavor::Flavor::Async => {
                     #[cfg(feature = "tokio")]
-                    if let Some(id) = task::try_id() {
-                        return (
-                            self.inner.process_track_uuid,
-                            ids::SequenceId::for_task(id),
-                            flavor::Flavor::Async,
-                        );
+                    if let Some(res) =
+                        self.tokio_trace_track_sequence(self.inner.process_track_uuid)
+                    {
+                        return res;
                     }
 
                     let tid = thread_id::get();
+                    let track_uuid = if self.inner.create_async_tracks.is_some() {
+                        ids::TrackUuid::for_thread(tid)
+                    } else {
+                        self.inner.process_track_uuid
+                    };
                     (
-                        self.inner.process_track_uuid,
+                        track_uuid,
                         ids::SequenceId::for_thread(tid),
                         flavor::Flavor::Async,
                     )
@@ -226,12 +224,8 @@ where
             }
         } else {
             #[cfg(feature = "tokio")]
-            if let Some(id) = task::try_id() {
-                return (
-                    self.inner.tokio_track_uuid,
-                    ids::SequenceId::for_task(id),
-                    flavor::Flavor::Async,
-                );
+            if let Some(res) = self.tokio_trace_track_sequence(self.inner.tokio_track_uuid) {
+                return res;
             }
 
             let tid = thread_id::get();
@@ -243,6 +237,25 @@ where
         }
     }
 
+    #[cfg(feature = "tokio")]
+    fn tokio_trace_track_sequence(
+        &self,
+        default_track: ids::TrackUuid,
+    ) -> Option<(ids::TrackUuid, ids::SequenceId, flavor::Flavor)> {
+        let id = task::try_id()?;
+        let track_uuid = if self.inner.create_async_tracks.is_some() {
+            ids::TrackUuid::for_task(id)
+        } else {
+            default_track
+        };
+
+        Some((
+            track_uuid,
+            ids::SequenceId::for_task(id),
+            flavor::Flavor::Async,
+        ))
+    }
+
     /// Flush internal buffers, making the best effort for all pending writes to
     /// be visible on this layer's `writer`.
     pub fn flush(
@@ -361,7 +374,11 @@ where
         self.ensure_process_known(meta);
         self.ensure_thread_known(meta);
         #[cfg(feature = "tokio")]
-        self.ensure_tokio_runtime_known(meta);
+        if let Some(ref name) = self.inner.create_async_tracks {
+            self.ensure_task_track_known(meta, name);
+        } else {
+            self.ensure_tokio_runtime_known(meta);
+        }
     }
 
     fn ensure_process_known(&self, meta: &tracing::Metadata) {
@@ -387,17 +404,17 @@ where
     }
 
     fn ensure_thread_known(&self, meta: &tracing::Metadata) {
-        let thread_local_ctx = self.inner.thread_local_ctxs.get_or(ThreadLocalCtx::default);
-        let thread_descriptor_sent = thread_local_ctx
-            .descriptor_sent
-            .fetch_or(true, atomic::Ordering::Relaxed);
-        if !thread_descriptor_sent {
-            let thread_id = thread_id::get();
+        let thread_id = thread_id::get();
+        if self.inner.thread_tracks_sent.insert(thread_id) {
             let thread_name = thread::current()
                 .name()
                 .map(|s| s.to_owned())
                 .unwrap_or_else(|| format!("(unnamed thread {thread_id})"));
-            let packet = self.create_thread_track_descriptor(thread_id, thread_name);
+            let packet = if let Some(ref name) = self.inner.create_async_tracks {
+                self.create_thread_track_descriptor(thread_id, name.to_owned(), false)
+            } else {
+                self.create_thread_track_descriptor(thread_id, thread_name, true)
+            };
             self.write_packet(meta, packet);
         }
     }
@@ -414,55 +431,27 @@ where
         }
     }
 
+    #[cfg(feature = "tokio")]
+    fn ensure_task_track_known(&self, meta: &tracing::Metadata, name: &str) {
+        if let Some(task_id) = task::try_id() {
+            if self.inner.task_tracks_sent.insert(task_id) {
+                let packet = self.create_task_track_descriptor(task_id, name.to_owned());
+                self.write_packet(meta, packet);
+            }
+        }
+    }
+
     fn ensure_counters_known(
         &self,
         meta: &tracing::Metadata,
         counters: &[debug_annotations::Counter],
     ) {
-        let thread_local_ctx = self.inner.thread_local_ctxs.get_or(ThreadLocalCtx::default);
-        if let Ok(mut counters_sent_cache) = thread_local_ctx.counters_sent_cache.try_borrow_mut() {
-            let counters_sent_cache = counters_sent_cache.get_or_insert_with(|| {
-                self.inner
-                    .counters_sent
-                    .lock()
-                    .map(|cs| cs.clone())
-                    .unwrap_or_default()
-            });
-            if counters
-                .iter()
-                .all(|c| counters_sent_cache.contains(&c.name))
-            {
-                return;
-            }
-        }
-
-        // This might seem wasteful, but we want to minimize the time that we hold the
-        // `counters_sent` mutex, and we can report the counters after we have released
-        // the lock. Also remember that `Vec::new()` does not allocate until the first
-        // push!
-        let mut new_counters = Vec::new();
-
-        // Skip counters entirely if Mutex is poisoned -- can't afford to panic here
-        if let Ok(mut counters_sent) = self.inner.counters_sent.lock() {
-            for counter in counters {
-                if !counters_sent.contains(&counter.name) {
-                    new_counters.push(counter);
-                    counters_sent.push(counter.name);
-                }
-            }
-            if !new_counters.is_empty() {
-                if let Ok(mut counters_sent_cache) =
-                    thread_local_ctx.counters_sent_cache.try_borrow_mut()
-                {
-                    *counters_sent_cache = Some((*counters_sent).clone());
-                }
+        for counter in counters {
+            if self.inner.counter_tracks_sent.insert(counter.name) {
+                let packet = self.create_counter_track_descriptor(counter);
+                self.write_packet(meta, packet);
             }
         }
-
-        for counter in new_counters {
-            let packet = self.create_counter_track_descriptor(counter);
-            self.write_packet(meta, packet);
-        }
     }
 
     #[must_use]
@@ -496,18 +485,24 @@ where
         &self,
         thread_id: usize,
         thread_name: String,
+        include_thread_metadata: bool,
     ) -> schema::TracePacket {
+        let thread = if include_thread_metadata {
+            Some(schema::ThreadDescriptor {
+                pid: Some(process::id() as i32),
+                tid: Some((thread_id as i32).saturating_abs()),
+                thread_name: Some(thread_name.clone()),
+                ..Default::default()
+            })
+        } else {
+            None
+        };
         schema::TracePacket {
             data: Some(trace_packet::Data::TrackDescriptor(
                 schema::TrackDescriptor {
                     parent_uuid: Some(self.inner.process_track_uuid.as_raw()),
                     uuid: Some(ids::TrackUuid::for_thread(thread_id).as_raw()),
-                    thread: Some(schema::ThreadDescriptor {
-                        pid: Some(process::id() as i32),
-                        tid: Some((thread_id as i32).saturating_abs()),
-                        thread_name: Some(thread_name.clone()),
-                        ..Default::default()
-                    }),
+                    thread,
                     static_or_dynamic_name: Some(track_descriptor::StaticOrDynamicName::Name(
                         thread_name,
                     )),
@@ -544,6 +539,29 @@ where
         }
     }
 
+    #[cfg(feature = "tokio")]
+    #[must_use]
+    fn create_task_track_descriptor(&self, task_id: task::Id, name: String) -> schema::TracePacket {
+        let parent_uuid = if self.inner.force_flavor == Some(flavor::Flavor::Async) {
+            self.inner.process_track_uuid
+        } else {
+            self.inner.tokio_track_uuid
+        };
+        schema::TracePacket {
+            data: Some(trace_packet::Data::TrackDescriptor(
+                schema::TrackDescriptor {
+                    parent_uuid: Some(parent_uuid.as_raw()),
+                    uuid: Some(ids::TrackUuid::for_task(task_id).as_raw()),
+                    static_or_dynamic_name: Some(track_descriptor::StaticOrDynamicName::Name(
+                        name.to_owned(),
+                    )),
+                    ..Default::default()
+                },
+            )),
+            ..Default::default()
+        }
+    }
+
     #[must_use]
     fn create_counter_track_descriptor(
         &self,
@@ -901,6 +919,7 @@ where
             force_flavor: None,
             delay_slice_begin: false,
             discard_tracing_data: false,
+            create_async_tracks: None,
             enable_in_process: true,
             enable_system: false,
             _phantom: marker::PhantomData,
@@ -934,6 +953,20 @@ where
         self
     }
 
+    /// Creates a separate track in the Perfetto UI for every "thread" even in
+    /// async mode. This means there will be a separate "track" for each OS
+    /// thread *and* Tokio task, but they will be lightweight tracks without as
+    /// much info as a thread track would have. Instead, all of these tracks
+    /// will have the specified name.
+    ///
+    /// This makes it easier for Perfetto to keep the necessary context window
+    /// per track so that some async events do not get dropped. However, the
+    /// trade-off is some data inflation and a laggier UI as a result.
+    pub fn with_create_async_tracks(mut self, create_async_tracks: Option<String>) -> Self {
+        self.create_async_tracks = create_async_tracks;
+        self
+    }
+
     /// Enable in-process collection, where traces will be collected by buffers
     /// in the Perfetto SDK and spilled to file in-process.
     pub fn with_enable_in_process(mut self, enable_in_process: bool) -> Self {
@@ -1047,11 +1080,21 @@ where
 
 #[cfg(not(feature = "sdk"))]
 static HAS_BOOTTIME: sync::LazyLock<bool> = sync::LazyLock::new(|| {
-    #[cfg(any(linux_android, target_os = "emscripten", target_os = "fuchsia"))]
+    #[cfg(any(
+        target_os = "linux",
+        target_os = "android",
+        target_os = "emscripten",
+        target_os = "fuchsia"
+    ))]
     {
         nix::time::clock_gettime(nix::time::ClockId::CLOCK_BOOTTIME).is_ok()
     }
-    #[cfg(not(any(linux_android, target_os = "emscripten", target_os = "fuchsia")))]
+    #[cfg(not(any(
+        target_os = "linux",
+        target_os = "android",
+        target_os = "emscripten",
+        target_os = "fuchsia"
+    )))]
     {
         false
     }
@@ -1065,7 +1108,12 @@ fn trace_time_ns() -> u64 {
 #[cfg(not(feature = "sdk"))]
 fn trace_time_ns() -> u64 {
     use nix::time;
-    #[cfg(any(linux_android, target_os = "emscripten", target_os = "fuchsia"))]
+    #[cfg(any(
+        target_os = "linux",
+        target_os = "android",
+        target_os = "emscripten",
+        target_os = "fuchsia"
+    ))]
     {
         if *HAS_BOOTTIME {
             std::time::Duration::from(time::clock_gettime(time::ClockId::CLOCK_BOOTTIME).unwrap())
@@ -1075,7 +1123,12 @@ fn trace_time_ns() -> u64 {
                 .as_nanos() as u64
         }
     }
-    #[cfg(not(any(linux_android, target_os = "emscripten", target_os = "fuchsia")))]
+    #[cfg(not(any(
+        target_os = "linux",
+        target_os = "android",
+        target_os = "emscripten",
+        target_os = "fuchsia"
+    )))]
     {
         std::time::Duration::from(time::clock_gettime(time::ClockId::CLOCK_MONOTONIC).unwrap())
             .as_nanos() as u64
@@ -1089,7 +1142,12 @@ fn trace_clock_id() -> u32 {
 
 #[cfg(not(feature = "sdk"))]
 fn trace_clock_id() -> u32 {
-    #[cfg(any(linux_android, target_os = "emscripten", target_os = "fuchsia"))]
+    #[cfg(any(
+        target_os = "linux",
+        target_os = "android",
+        target_os = "emscripten",
+        target_os = "fuchsia"
+    ))]
     {
         if *HAS_BOOTTIME {
             schema::BuiltinClock::Boottime as u32
@@ -1097,7 +1155,12 @@ fn trace_clock_id() -> u32 {
             schema::BuiltinClock::Monotonic as u32
         }
     }
-    #[cfg(not(any(linux_android, target_os = "emscripten", target_os = "fuchsia")))]
+    #[cfg(not(any(
+        target_os = "linux",
+        target_os = "android",
+        target_os = "emscripten",
+        target_os = "fuchsia"
+    )))]
     {
         schema::BuiltinClock::Monotonic as u32
    }
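Usage note (not part of the patch itself): downstream code opts in by passing `Some(name)` to the new `with_create_async_tracks(Option<String>)` builder method; `None` keeps the previous behaviour. The sketch below only illustrates the `DashSet`-based "emit each track descriptor exactly once" idea that the patch switches to for counter, thread and task tracks. It is a minimal standalone example under assumed names: `TrackRegistry`, the `u64` task key and the `emit_descriptor` callback are illustrative stand-ins, not the crate's API; only the `dashmap::DashSet::insert` behaviour (returns `true` on first insertion) is taken from the patch.

    use dashmap::DashSet;

    // Illustrative stand-in for the layer's shared state; the patch itself keys the
    // task set by `tokio::task::Id` and stores it inside the layer's `Inner` struct.
    struct TrackRegistry {
        task_tracks_sent: DashSet<u64>,
    }

    impl TrackRegistry {
        // `DashSet::insert` returns `true` only for the first insertion of a key, so
        // exactly one caller emits the track descriptor for a given task, without a
        // mutex or a per-thread cache.
        fn ensure_task_track_known(&self, task_id: u64, emit_descriptor: impl FnOnce(u64)) {
            if self.task_tracks_sent.insert(task_id) {
                emit_descriptor(task_id);
            }
        }
    }

    fn main() {
        let registry = TrackRegistry {
            task_tracks_sent: DashSet::new(),
        };
        registry.ensure_task_track_known(42, |id| println!("descriptor emitted for task {id}"));
        registry.ensure_task_track_known(42, |_| println!("not reached; descriptor already sent"));
    }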