diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index cc53333b7..6898fc8d8 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -226,6 +226,7 @@ impl TraceExporter { traces, &header_tags, |_chunk, _root_span_index| {}, + self.endpoint.api_key.is_some(), ); let endpoint = Endpoint { diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index d41250f21..51a7dd74f 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -309,8 +309,12 @@ impl SidecarServer { return; } - let payload = - trace_utils::collect_trace_chunks(traces, &headers, |_chunk, _root_span_index| {}); + let payload = trace_utils::collect_trace_chunks( + traces, + &headers, + |_chunk, _root_span_index| {}, + target.api_key.is_some(), + ); // send trace payload to our trace flusher let data = SendData::new(size, payload, headers, target); diff --git a/trace-mini-agent/src/trace_processor.rs b/trace-mini-agent/src/trace_processor.rs index 5b2fd5158..5ac49d969 100644 --- a/trace-mini-agent/src/trace_processor.rs +++ b/trace-mini-agent/src/trace_processor.rs @@ -85,6 +85,7 @@ impl TraceProcessor for ServerlessTraceProcessor { obfuscate_span(span, &config.obfuscation_config); } }, + true, ); let send_data = SendData::new(body_size, payload, tracer_header_tags, &config.trace_intake); diff --git a/trace-utils/Cargo.toml b/trace-utils/Cargo.toml index 9b74f6360..cb69a4271 100644 --- a/trace-utils/Cargo.toml +++ b/trace-utils/Cargo.toml @@ -23,12 +23,12 @@ datadog-trace-normalization = { path = "../trace-normalization" } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } rand = "0.8.5" bytes = "1.6.0" -# This should only be used for testing. It isn't under dev-dependencies because test-utils can't be under #[cfg(test)]. -httpmock = { version = "0.7.0", optional = true} +httpmock = { version = "0.7.0", optional = true} [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } serde_json = "1.0" +httpmock = { version = "0.7.0"} [features] test-utils = ["httpmock"] \ No newline at end of file diff --git a/trace-utils/src/trace_utils.rs b/trace-utils/src/trace_utils.rs index c29ad2525..4d8056fdb 100644 --- a/trace-utils/src/trace_utils.rs +++ b/trace-utils/src/trace_utils.rs @@ -3,6 +3,7 @@ use hyper::{body::Buf, Body}; use log::{error, info}; +use std::cmp::Ordering; use std::collections::HashMap; pub use crate::send_data::SendData; @@ -79,6 +80,18 @@ pub(crate) fn construct_tracer_payload( } } +fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering { + a.tracer_version + .cmp(&b.tracer_version) + .then(a.language_version.cmp(&b.language_version)) + .then(a.language_name.cmp(&b.language_name)) + .then(a.hostname.cmp(&b.hostname)) + .then(a.container_id.cmp(&b.container_id)) + .then(a.runtime_id.cmp(&b.runtime_id)) + .then(a.env.cmp(&b.env)) + .then(a.app_version.cmp(&b.app_version)) +} + pub fn coalesce_send_data(mut data: Vec) -> Vec { // TODO trace payloads with identical data except for chunk could be merged? @@ -103,6 +116,20 @@ pub fn coalesce_send_data(mut data: Vec) -> Vec { } false }); + // Merge chunks with common properties. Reduces requests for agentful mode. + // And reduces a little bit of data for agentless. + for send_data in data.iter_mut() { + send_data + .tracer_payloads + .sort_unstable_by(cmp_send_data_payloads); + send_data.tracer_payloads.dedup_by(|a, b| { + if cmp_send_data_payloads(a, b) == Ordering::Equal { + b.chunks.append(&mut a.chunks); + return true; + } + false + }) + } data } @@ -288,10 +315,12 @@ pub fn collect_trace_chunks( mut traces: Vec>, tracer_header_tags: &TracerHeaderTags, process_chunk: impl Fn(&mut TraceChunk, usize), + is_agentless: bool, ) -> pb::TracerPayload { let mut trace_chunks: Vec = Vec::new(); - let mut gathered_root_span_tags = false; + // We'll skip setting the global metadata and reply on the agent to unpack these + let mut gathered_root_span_tags = !is_agentless; let mut root_span_tags = RootSpanTags::default(); for trace in traces.iter_mut() { @@ -309,8 +338,10 @@ pub fn collect_trace_chunks( } }; - if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) { - error!("Error normalizing trace chunk: {e}"); + if !is_agentless { + if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) { + error!("Error normalizing trace chunk: {e}"); + } } for span in chunk.spans.iter_mut() { @@ -362,7 +393,7 @@ mod tests { use ddcommon::Endpoint; #[test] - fn test_coalescing_does_not_excced_max_size() { + fn test_coalescing_does_not_exceed_max_size() { let dummy = SendData::new( MAX_PAYLOAD_SIZE / 5 + 1, pb::TracerPayload { @@ -371,7 +402,13 @@ mod tests { language_version: "".to_string(), tracer_version: "".to_string(), runtime_id: "".to_string(), - chunks: vec![], + chunks: vec![pb::TraceChunk { + priority: 0, + origin: "".to_string(), + spans: vec![], + tags: Default::default(), + dropped_trace: false, + }], tags: Default::default(), env: "".to_string(), hostname: "".to_string(), @@ -391,9 +428,27 @@ mod tests { 5, coalesced .iter() - .map(|s| s.tracer_payloads.len()) + .map(|s| s + .tracer_payloads + .iter() + .map(|p| p.chunks.len()) + .sum::()) .sum::() ); + // assert some chunks are actually coalesced + assert!( + coalesced + .iter() + .map(|s| s + .tracer_payloads + .iter() + .map(|p| p.chunks.len()) + .max() + .unwrap()) + .max() + .unwrap() + > 1 + ); assert!(coalesced.len() > 1 && coalesced.len() < 5); }