From eae0df0a4759e025dc1d0d9e74a758bf607effd1 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Fri, 7 Jun 2024 12:56:05 +0200 Subject: [PATCH] Skip normalization & obfuscation and coalesce instead Turns out the agent ignores the root_span_tags for v0.7 anyway. This is useful to coalesce chunks with common data, allowing us to actually avoid redundant requests. Previously, the agentful trace sender always sent one trace per request instead of merging them. We also skip normalization and rely on the agent to do it, for the sake of consistency with other tracers and to reduce potentially duplicated work across tracer and agent. We may change that back in the future, but for now we've determined it to be the easiest way to work, also with respect to normalization-unaware testing. Signed-off-by: Bob Weinand --- data-pipeline/src/trace_exporter.rs | 1 + sidecar/src/service/sidecar_server.rs | 8 ++- trace-mini-agent/src/trace_processor.rs | 1 + trace-utils/Cargo.toml | 4 +- trace-utils/src/trace_utils.rs | 67 ++++++++++++++++++++++--- 5 files changed, 71 insertions(+), 10 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index cc53333b7..6898fc8d8 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -226,6 +226,7 @@ impl TraceExporter { traces, &header_tags, |_chunk, _root_span_index| {}, + self.endpoint.api_key.is_some(), ); let endpoint = Endpoint { diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index d41250f21..51a7dd74f 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -309,8 +309,12 @@ impl SidecarServer { return; } - let payload = - trace_utils::collect_trace_chunks(traces, &headers, |_chunk, _root_span_index| {}); + let payload = trace_utils::collect_trace_chunks( + traces, + &headers, + |_chunk, _root_span_index| {}, + target.api_key.is_some(), + ); // send trace 
payload to our trace flusher let data = SendData::new(size, payload, headers, target); diff --git a/trace-mini-agent/src/trace_processor.rs b/trace-mini-agent/src/trace_processor.rs index 5b2fd5158..5ac49d969 100644 --- a/trace-mini-agent/src/trace_processor.rs +++ b/trace-mini-agent/src/trace_processor.rs @@ -85,6 +85,7 @@ impl TraceProcessor for ServerlessTraceProcessor { obfuscate_span(span, &config.obfuscation_config); } }, + true, ); let send_data = SendData::new(body_size, payload, tracer_header_tags, &config.trace_intake); diff --git a/trace-utils/Cargo.toml b/trace-utils/Cargo.toml index 9b74f6360..cb69a4271 100644 --- a/trace-utils/Cargo.toml +++ b/trace-utils/Cargo.toml @@ -23,12 +23,12 @@ datadog-trace-normalization = { path = "../trace-normalization" } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } rand = "0.8.5" bytes = "1.6.0" -# This should only be used for testing. It isn't under dev-dependencies because test-utils can't be under #[cfg(test)]. -httpmock = { version = "0.7.0", optional = true} +httpmock = { version = "0.7.0", optional = true} [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } serde_json = "1.0" +httpmock = { version = "0.7.0"} [features] test-utils = ["httpmock"] \ No newline at end of file diff --git a/trace-utils/src/trace_utils.rs b/trace-utils/src/trace_utils.rs index c29ad2525..cf14d00bd 100644 --- a/trace-utils/src/trace_utils.rs +++ b/trace-utils/src/trace_utils.rs @@ -3,6 +3,7 @@ use hyper::{body::Buf, Body}; use log::{error, info}; +use std::cmp::Ordering; use std::collections::HashMap; pub use crate::send_data::SendData; @@ -79,6 +80,18 @@ pub(crate) fn construct_tracer_payload( } } +fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering { + a.tracer_version + .cmp(&b.tracer_version) + .then(a.language_version.cmp(&b.language_version)) + .then(a.language_name.cmp(&b.language_name)) + .then(a.hostname.cmp(&b.hostname)) + 
.then(a.container_id.cmp(&b.container_id)) + .then(a.runtime_id.cmp(&b.runtime_id)) + .then(a.env.cmp(&b.env)) + .then(a.app_version.cmp(&b.app_version)) +} + pub fn coalesce_send_data(mut data: Vec) -> Vec { // TODO trace payloads with identical data except for chunk could be merged? @@ -103,6 +116,20 @@ pub fn coalesce_send_data(mut data: Vec) -> Vec { } false }); + // Merge chunks with common properties. Reduces requests for agentful mode. + // And reduces a little bit of data for agentless. + for send_data in data.iter_mut() { + send_data + .tracer_payloads + .sort_unstable_by(cmp_send_data_payloads); + send_data.tracer_payloads.dedup_by(|a, b| { + if cmp_send_data_payloads(a, b) == Ordering::Equal { + b.chunks.append(&mut a.chunks); + return true; + } + false + }) + } data } @@ -288,10 +315,12 @@ pub fn collect_trace_chunks( mut traces: Vec>, tracer_header_tags: &TracerHeaderTags, process_chunk: impl Fn(&mut TraceChunk, usize), + is_agentless: bool, ) -> pb::TracerPayload { let mut trace_chunks: Vec = Vec::new(); - let mut gathered_root_span_tags = false; + // We'll skip setting the global metadata and rely on the agent to unpack these + let mut gathered_root_span_tags = !is_agentless; let mut root_span_tags = RootSpanTags::default(); for trace in traces.iter_mut() { @@ -309,8 +338,10 @@ pub fn collect_trace_chunks( } }; - if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) { - error!("Error normalizing trace chunk: {e}"); + if is_agentless { + if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) { + error!("Error normalizing trace chunk: {e}"); + } } for span in chunk.spans.iter_mut() { @@ -362,7 +393,7 @@ mod tests { use ddcommon::Endpoint; #[test] - fn test_coalescing_does_not_excced_max_size() { + fn test_coalescing_does_not_exceed_max_size() { let dummy = SendData::new( MAX_PAYLOAD_SIZE / 5 + 1, pb::TracerPayload { @@ -371,7 +402,13 @@ mod tests { language_version: "".to_string(), tracer_version: "".to_string(), 
runtime_id: "".to_string(), - chunks: vec![], + chunks: vec![pb::TraceChunk { + priority: 0, + origin: "".to_string(), + spans: vec![], + tags: Default::default(), + dropped_trace: false, + }], tags: Default::default(), env: "".to_string(), hostname: "".to_string(), @@ -391,9 +428,27 @@ mod tests { 5, coalesced .iter() - .map(|s| s.tracer_payloads.len()) + .map(|s| s + .tracer_payloads + .iter() + .map(|p| p.chunks.len()) + .sum::()) .sum::() ); + // assert some chunks are actually coalesced + assert!( + coalesced + .iter() + .map(|s| s + .tracer_payloads + .iter() + .map(|p| p.chunks.len()) + .max() + .unwrap()) + .max() + .unwrap() + > 1 + ); assert!(coalesced.len() > 1 && coalesced.len() < 5); }