Skip to content

Commit

Permalink
native_layer: Add support for delaying SliceBegin (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
dflemstr authored Nov 4, 2024
1 parent 6268535 commit 51a25b5
Showing 1 changed file with 99 additions and 19 deletions.
118 changes: 99 additions & 19 deletions crates/layer/src/native_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct Builder<'c, W> {
background_poll_timeout: time::Duration,
background_poll_interval: time::Duration,
force_flavor: Option<flavor::Flavor>,
delay_slice_begin: bool,
enable_in_process: bool,
enable_system: bool,
}
Expand All @@ -62,6 +63,7 @@ where
drop_flush_timeout: time::Duration,
drop_poll_timeout: time::Duration,
force_flavor: Option<flavor::Flavor>,
delay_slice_begin: bool,
process_track_uuid: ids::TrackUuid,
process_descriptor_sent: atomic::AtomicBool,
// Mutex is held very briefly for member check and to insert a string if it is missing.
Expand All @@ -77,6 +79,15 @@ where
thread_local_ctxs: thread_local::ThreadLocal<ThreadLocalCtx>,
}

// Does not contain DebugAnnotations; they are shipped separately as a span
// extension
struct DelayedSliceBegin {
timestamp_ns: u64,
meta: &'static tracing::Metadata<'static>,
track_uuid: ids::TrackUuid,
sequence_id: ids::SequenceId,
}

impl<W> NativeLayer<W>
where
W: for<'w> fmt::MakeWriter<'w> + Send + Sync + 'static,
Expand Down Expand Up @@ -137,6 +148,7 @@ where
})?;

let force_flavor = builder.force_flavor;
let delay_slice_begin = builder.delay_slice_begin;
let pid = process::id();
let process_track_uuid = ids::TrackUuid::for_process(pid);
let process_descriptor_sent = atomic::AtomicBool::new(false);
Expand All @@ -153,6 +165,7 @@ where
drop_flush_timeout,
drop_poll_timeout,
force_flavor,
delay_slice_begin,
process_track_uuid,
process_descriptor_sent,
counters_sent,
Expand Down Expand Up @@ -237,6 +250,7 @@ where
debug_annotations: debug_annotations::ProtoDebugAnnotations,
) {
let packet = self.create_slice_begin_track_event_packet(
ffi::trace_time_ns(),
meta,
track_uuid,
sequence_id,
Expand All @@ -253,21 +267,66 @@ where
sequence_id: ids::SequenceId,
extensions: registry::Extensions,
) {
let entered_track_uuid = extensions.get::<ids::TrackUuid>();
let track_uuid = *entered_track_uuid.unwrap_or(&track_uuid);
let entered_sequence_id = extensions.get::<ids::SequenceId>();
let sequence_id = *entered_sequence_id.unwrap_or(&sequence_id);
let track_uuid = extensions
.get::<ids::TrackUuid>()
.copied()
.unwrap_or(track_uuid);
let sequence_id = extensions
.get::<ids::SequenceId>()
.copied()
.unwrap_or(sequence_id);
let debug_annotations = extensions
.get::<debug_annotations::ProtoDebugAnnotations>()
.cloned()
.unwrap_or_default();

if let Some(delayed_slice_begin) = extensions.get::<DelayedSliceBegin>() {
let slice_begin_packet = self.create_slice_begin_track_event_packet(
delayed_slice_begin.timestamp_ns,
delayed_slice_begin.meta,
delayed_slice_begin.track_uuid,
delayed_slice_begin.sequence_id,
debug_annotations.clone(),
);
self.ensure_context_known(delayed_slice_begin.meta);
self.write_packet(delayed_slice_begin.meta, slice_begin_packet);
}

let packet = self.create_slice_end_track_event_packet(meta, track_uuid, sequence_id);
let packet = self.create_slice_end_track_event_packet(
ffi::trace_time_ns(),
meta,
track_uuid,
sequence_id,
debug_annotations,
);
self.ensure_context_known(meta);
self.write_packet(meta, packet);
}

fn report_event(
&self,
meta: &tracing::Metadata,
debug_annotations: debug_annotations::ProtoDebugAnnotations,
track_uuid: ids::TrackUuid,
sequence_id: ids::SequenceId,
) {
let packet = self.create_event_track_event_packet(
ffi::trace_time_ns(),
meta,
debug_annotations,
track_uuid,
sequence_id,
);
self.ensure_context_known(meta);
self.write_packet(meta, packet);
}

fn report_counters(&self, meta: &tracing::Metadata, counters: Vec<debug_annotations::Counter>) {
if !counters.is_empty() {
let timestamp_ns = ffi::trace_time_ns();
self.ensure_counters_known(meta, &counters);
for counter in counters {
let packet = self.create_counter_track_event_packet(counter);
let packet = self.create_counter_track_event_packet(timestamp_ns, counter);
self.write_packet(meta, packet);
}
}
Expand Down Expand Up @@ -494,13 +553,14 @@ where
#[must_use]
fn create_slice_begin_track_event_packet(
&self,
timestamp_ns: u64,
meta: &tracing::Metadata,
track_uuid: ids::TrackUuid,
sequence_id: ids::SequenceId,
debug_annotations: debug_annotations::ProtoDebugAnnotations,
) -> schema::TracePacket {
schema::TracePacket {
timestamp: Some(ffi::trace_time_ns()),
timestamp: Some(timestamp_ns),
optional_trusted_packet_sequence_id: Some(
trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(
sequence_id.as_raw(),
Expand All @@ -521,12 +581,14 @@ where
#[must_use]
fn create_slice_end_track_event_packet(
&self,
timestamp_ns: u64,
meta: &tracing::Metadata,
track_uuid: ids::TrackUuid,
sequence_id: ids::SequenceId,
debug_annotations: debug_annotations::ProtoDebugAnnotations,
) -> schema::TracePacket {
schema::TracePacket {
timestamp: Some(ffi::trace_time_ns()),
timestamp: Some(timestamp_ns),
optional_trusted_packet_sequence_id: Some(
trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(
sequence_id.as_raw(),
Expand All @@ -536,6 +598,7 @@ where
r#type: Some(track_event::Type::SliceEnd as i32),
track_uuid: Some(track_uuid.as_raw()),
name_field: Some(track_event::NameField::Name(meta.name().to_owned())),
debug_annotations: debug_annotations.into_proto(),
source_location_field: Self::source_location_field(meta),
..Default::default()
})),
Expand All @@ -546,13 +609,14 @@ where
#[must_use]
fn create_event_track_event_packet(
&self,
timestamp_ns: u64,
meta: &tracing::Metadata,
mut debug_annotations: debug_annotations::ProtoDebugAnnotations,
debug_annotations: debug_annotations::ProtoDebugAnnotations,
track_uuid: ids::TrackUuid,
sequence_id: ids::SequenceId,
) -> schema::TracePacket {
schema::TracePacket {
timestamp: Some(ffi::trace_time_ns()),
timestamp: Some(timestamp_ns),
optional_trusted_packet_sequence_id: Some(
trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(
sequence_id.as_raw(),
Expand All @@ -573,10 +637,11 @@ where
#[must_use]
fn create_counter_track_event_packet(
&self,
timestamp_ns: u64,
counter: debug_annotations::Counter,
) -> schema::TracePacket {
schema::TracePacket {
timestamp: Some(ffi::trace_time_ns()),
timestamp: Some(timestamp_ns),
optional_trusted_packet_sequence_id: Some(
trace_packet::OptionalTrustedPacketSequenceId::TrustedPacketSequenceId(
ids::SequenceId::for_counter(counter.name).as_raw(),
Expand Down Expand Up @@ -671,10 +736,19 @@ where
let mut debug_annotations = debug_annotations::ProtoDebugAnnotations::default();
attrs.record(&mut debug_annotations);
self.report_counters(meta, debug_annotations.take_counters());
span.extensions_mut().insert(debug_annotations.clone());

span.extensions_mut().insert(debug_annotations.clone());
if flavor == flavor::Flavor::Async {
self.report_slice_begin(meta, track_uuid, sequence_id, debug_annotations);
if self.inner.delay_slice_begin {
span.extensions_mut().insert(DelayedSliceBegin {
timestamp_ns: ffi::trace_time_ns(),
meta,
track_uuid,
sequence_id,
})
} else {
self.report_slice_begin(meta, track_uuid, sequence_id, debug_annotations);
}
}
}

Expand Down Expand Up @@ -702,12 +776,7 @@ where
self.report_counters(meta, debug_annotations.take_counters());

let (track_uuid, sequence_id, _) = self.pick_trace_track_sequence();

self.ensure_context_known(meta);

let packet =
self.create_event_track_event_packet(meta, debug_annotations, track_uuid, sequence_id);
self.write_packet(meta, packet);
self.report_event(meta, debug_annotations, track_uuid, sequence_id);
}

fn on_enter(&self, id: &span::Id, ctx: layer::Context<'_, S>) {
Expand Down Expand Up @@ -768,6 +837,7 @@ where
background_poll_timeout: time::Duration::from_millis(100),
background_poll_interval: time::Duration::from_millis(100),
force_flavor: None,
delay_slice_begin: true,
enable_in_process: true,
enable_system: false,
}
Expand All @@ -780,6 +850,16 @@ where
self
}

/// Delays the emission of `SliceBegin` events.
///
/// This allows more fields to be recorded onto the span before it is
/// written to the trace file, but the trade-off is that spans that never
/// end will not be reported to the trace at all.
pub fn with_delay_slice_begin(mut self, delay_slice_begin: bool) -> Self {
self.delay_slice_begin = delay_slice_begin;
self
}

/// Enable in-process collection, where traces will be collected by buffers
/// in the Perfetto SDK and spilled to file in-process.
pub fn with_enable_in_process(mut self, enable_in_process: bool) -> Self {
Expand Down

0 comments on commit 51a25b5

Please sign in to comment.