Skip to content

Commit

Permalink
[APMSP-1350] Add dropped-p0 headers (#695)
Browse files Browse the repository at this point in the history
* Add dropped-p0 headers

* Add default for missing fields
APMSP-1350
  • Loading branch information
VianneyRuhlmann authored Oct 29, 2024
1 parent 22c2da5 commit 490a276
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 18 deletions.
38 changes: 23 additions & 15 deletions data-pipeline/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,15 @@ fn add_path(url: &Uri, path: &str) -> Uri {
Uri::from_parts(parts).unwrap()
}

struct DroppedP0Counts {
pub dropped_p0_traces: usize,
pub dropped_p0_spans: usize,
}

/// Remove spans and chunks only keeping the ones that may be sampled by the agent
fn drop_chunks(traces: &mut Vec<Vec<pb::Span>>) {
fn drop_chunks(traces: &mut Vec<Vec<pb::Span>>) -> DroppedP0Counts {
let mut dropped_p0_traces = 0;
let mut dropped_p0_spans = 0;
traces.retain_mut(|chunk| {
// List of spans to keep even if the chunk is dropped
let mut sampled_indexes = Vec::new();
Expand All @@ -128,8 +135,10 @@ fn drop_chunks(traces: &mut Vec<Vec<pb::Span>>) {
sampled_indexes.push(index);
}
}
dropped_p0_spans += chunk.len() - sampled_indexes.len();
if sampled_indexes.is_empty() {
// If no spans were sampled we can drop the whole chunk
dropped_p0_traces += 1;
return false;
}
let sampled_spans = sampled_indexes
Expand All @@ -138,7 +147,11 @@ fn drop_chunks(traces: &mut Vec<Vec<pb::Span>>) {
.collect();
*chunk = sampled_spans;
true
})
});
DroppedP0Counts {
dropped_p0_traces,
dropped_p0_spans,
}
}

#[derive(Clone, Default)]
Expand Down Expand Up @@ -437,15 +450,6 @@ impl TraceExporter {
)
}

fn get_headers(&self) -> TracerHeaderTags<'_> {
let mut headers: TracerHeaderTags = self.metadata.borrow().into();
if let StatsComputationStatus::Enabled { .. } = &**self.client_side_stats.load() {
headers.client_computed_top_level = true;
headers.client_computed_stats = true;
};
headers
}

fn send_data_to_url(
&self,
data: &[u8],
Expand All @@ -462,7 +466,7 @@ impl TraceExporter {
)
.method(Method::POST);

let headers: HashMap<&'static str, String> = self.get_headers().into();
let headers: HashMap<&'static str, String> = self.metadata.borrow().into();

for (key, value) in &headers {
req_builder = req_builder.header(*key, value);
Expand Down Expand Up @@ -597,6 +601,8 @@ impl TraceExporter {
None,
);

let mut header_tags: TracerHeaderTags = self.metadata.borrow().into();

// Stats computation
if let StatsComputationStatus::Enabled { .. } = &**self.client_side_stats.load() {
if !self.client_computed_top_level {
Expand All @@ -607,11 +613,13 @@ impl TraceExporter {
self.add_spans_to_stats(traces.iter().flat_map(|trace| trace.iter()));
// Once stats have been computed we can drop all chunks that are not going to be
// sampled by the agent
drop_chunks(&mut traces);
let dropped_counts = drop_chunks(&mut traces);
header_tags.client_computed_top_level = true;
header_tags.client_computed_stats = true;
header_tags.dropped_p0_traces = dropped_counts.dropped_p0_traces;
header_tags.dropped_p0_spans = dropped_counts.dropped_p0_spans;
}

let header_tags: TracerHeaderTags<'_> = self.get_headers();

match self.output_format {
TraceExporterOutputFormat::V04 => rmp_serde::to_vec_named(&traces)
.map_err(|err| {
Expand Down
1 change: 1 addition & 0 deletions sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ impl<'a> TryInto<SerializedTracerHeaderTags> for &'a TracerHeaderTags<'a> {
container_id: &self.container_id.to_utf8_lossy(),
client_computed_top_level: self.client_computed_top_level,
client_computed_stats: self.client_computed_stats,
..Default::default()
};

tags.try_into().map_err(|_| {
Expand Down
4 changes: 4 additions & 0 deletions sidecar/src/service/serialized_tracer_header_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct SerializedTracerHeaderTags {
/// container_id: "1234567890",
/// client_computed_top_level: true,
/// client_computed_stats: false,
/// ..Default::default()
/// };
///
/// let serialized: SerializedTracerHeaderTags = tracer_header_tags.try_into().unwrap();
Expand Down Expand Up @@ -73,6 +74,7 @@ impl<'a> TryFrom<&'a SerializedTracerHeaderTags> for TracerHeaderTags<'a> {
/// container_id: "1234567890",
/// client_computed_top_level: true,
/// client_computed_stats: false,
/// ..Default::default()
/// };
///
/// let serialized: Result<SerializedTracerHeaderTags, _> = tracer_header_tags.try_into();
Expand Down Expand Up @@ -106,6 +108,7 @@ mod tests {
container_id: "1234567890",
client_computed_top_level: true,
client_computed_stats: false,
..Default::default()
};

let serialized: Result<SerializedTracerHeaderTags, _> = tracer_header_tags.try_into();
Expand All @@ -124,6 +127,7 @@ mod tests {
container_id: "1234567890",
client_computed_top_level: true,
client_computed_stats: false,
..Default::default()
};

let data = bincode::serialize(&tracer_header_tags).unwrap();
Expand Down
2 changes: 2 additions & 0 deletions trace-utils/src/send_data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,8 @@ mod tests {
container_id: "id",
client_computed_top_level: false,
client_computed_stats: false,
dropped_p0_traces: 0,
dropped_p0_spans: 0,
};

fn setup_payload(header_tags: &TracerHeaderTags) -> TracerPayload {
Expand Down
43 changes: 42 additions & 1 deletion trace-utils/src/tracer_header_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub struct TracerHeaderTags<'a> {
// specifies whether the client has computed stats so that the agent doesn't have to. Any
// non-empty value will mean 'yes'.
pub client_computed_stats: bool,
// number of trace chunks dropped in the tracer
pub dropped_p0_traces: usize,
// number of spans dropped in the tracer
pub dropped_p0_spans: usize,
}

impl<'a> From<TracerHeaderTags<'a>> for HashMap<&'static str, String> {
Expand Down Expand Up @@ -71,6 +75,22 @@ impl<'a> From<TracerHeaderTags<'a>> for HashMap<&'static str, String> {
String::new()
},
),
(
"datadog-client-dropped-p0-traces",
if tags.dropped_p0_traces > 0 {
tags.dropped_p0_traces.to_string()
} else {
String::new()
},
),
(
"datadog-client-dropped-p0-spans",
if tags.dropped_p0_spans > 0 {
tags.dropped_p0_spans.to_string()
} else {
String::new()
},
),
]);
headers.retain(|_, v| !v.is_empty());
headers
Expand All @@ -97,6 +117,16 @@ impl<'a> From<&'a HeaderMap<HeaderValue>> for TracerHeaderTags<'a> {
if headers.get("datadog-client-computed-stats").is_some() {
tags.client_computed_stats = true;
}
if let Some(count) = headers.get("datadog-client-dropped-p0-traces") {
tags.dropped_p0_traces = count
.to_str()
.unwrap_or_default()
.parse()
.unwrap_or_default();
}
if let Some(count) = headers.get("datadog-client-dropped-p0-spans") {
tags.dropped_p0_spans = count.to_str().map_or(0, |c| c.parse().unwrap_or(0));
}
tags
}
}
Expand All @@ -117,11 +147,13 @@ mod tests {
container_id: "id",
client_computed_top_level: true,
client_computed_stats: true,
dropped_p0_traces: 12,
dropped_p0_spans: 120,
};

let map: HashMap<&'static str, String> = header_tags.into();

assert_eq!(map.len(), 8);
assert_eq!(map.len(), 10);
assert_eq!(map.get("datadog-meta-lang").unwrap(), "test-lang");
assert_eq!(map.get("datadog-meta-lang-version").unwrap(), "2.0");
assert_eq!(
Expand All @@ -139,6 +171,8 @@ mod tests {
"true"
);
assert_eq!(map.get("datadog-client-computed-stats").unwrap(), "true");
assert_eq!(map.get("datadog-client-dropped-p0-traces").unwrap(), "12");
assert_eq!(map.get("datadog-client-dropped-p0-spans").unwrap(), "120");
}
#[test]
fn tags_to_hashmap_empty_value() {
Expand All @@ -151,6 +185,8 @@ mod tests {
container_id: "",
client_computed_top_level: false,
client_computed_stats: false,
dropped_p0_spans: 0,
dropped_p0_traces: 0,
};

let map: HashMap<&'static str, String> = header_tags.into();
Expand All @@ -170,6 +206,8 @@ mod tests {
assert_eq!(map.get("datadog-container-id"), None);
assert_eq!(map.get("datadog-client-computed-top-level"), None);
assert_eq!(map.get("datadog-client-computed-stats"), None);
assert_eq!(map.get("datadog-client-dropped-p0-traces"), None);
assert_eq!(map.get("datadog-client-dropped-p0-spans"), None);
}

#[test]
Expand All @@ -189,6 +227,7 @@ mod tests {
header_map.insert("datadog-meta-tracer-version", "1.0".parse().unwrap());
header_map.insert("datadog-container-id", "id".parse().unwrap());
header_map.insert("datadog-client-computed-stats", "true".parse().unwrap());
header_map.insert("datadog-client-dropped-p0-traces", "12".parse().unwrap());

let tags: TracerHeaderTags = (&header_map).into();

Expand All @@ -200,5 +239,7 @@ mod tests {
assert_eq!(tags.container_id, "id");
assert!(tags.client_computed_stats);
assert!(!tags.client_computed_top_level);
assert_eq!(tags.dropped_p0_traces, 12);
assert_eq!(tags.dropped_p0_spans, 0);
}
}
3 changes: 1 addition & 2 deletions trace-utils/tests/test_send_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ mod tracing_integration_tests {
lang_vendor: "vendor",
tracer_version: "1.0",
container_id: "id",
client_computed_top_level: false,
client_computed_stats: false,
..Default::default()
};

let endpoint = Endpoint::from_url(
Expand Down

0 comments on commit 490a276

Please sign in to comment.