From c539a1da6be0aaf72ca652e89278cd9bae32a355 Mon Sep 17 00:00:00 2001 From: Bob Weinand Date: Fri, 7 Jun 2024 15:12:40 +0200 Subject: [PATCH] Skip normalization & obfuscation and coalesce instead (#475) * Skip normalization & obfuscation and coalesce instead Turns out the agent ignores the root_span_tags for v0.7 anyway. This is useful to coalesce chunks with common data, allowing us to actually save us from doing redundant requests. As it was currently, the agentful trace sender always sent one trace per request, instead of merging them. We also skip normalization and rely on the agent to do it, for sake of consistency with other tracers and reduce the potentially duplicated work across tracer and agent. We may change that back in future, but for now we've determined it to be the easiest way to work, also with respect to normalization-unaware testing. Signed-off-by: Bob Weinand * Update trace-utils/src/trace_utils.rs Co-authored-by: Pierre Bonet * Update trace-utils/src/trace_utils.rs Co-authored-by: Pierre Bonet * Update trace-mini-agent/src/trace_processor.rs Co-authored-by: Pierre Bonet * Add comment back Signed-off-by: Bob Weinand --------- Signed-off-by: Bob Weinand Co-authored-by: Pierre Bonet --- data-pipeline/src/trace_exporter.rs | 1 + sidecar/src/service/sidecar_server.rs | 8 ++- trace-mini-agent/src/trace_processor.rs | 1 + trace-utils/Cargo.toml | 3 +- trace-utils/src/trace_utils.rs | 68 ++++++++++++++++++++++--- 5 files changed, 72 insertions(+), 9 deletions(-) diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index cc53333b7..6898fc8d8 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -226,6 +226,7 @@ impl TraceExporter { traces, &header_tags, |_chunk, _root_span_index| {}, + self.endpoint.api_key.is_some(), ); let endpoint = Endpoint { diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index d41250f21..51a7dd74f 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -309,8 +309,12 @@ impl SidecarServer { return; } - let payload = - trace_utils::collect_trace_chunks(traces, &headers, |_chunk, _root_span_index| {}); + let payload = trace_utils::collect_trace_chunks( + traces, + &headers, + |_chunk, _root_span_index| {}, + target.api_key.is_some(), + ); // send trace payload to our trace flusher let data = SendData::new(size, payload, headers, target); diff --git a/trace-mini-agent/src/trace_processor.rs b/trace-mini-agent/src/trace_processor.rs index 5b2fd5158..3edddca66 100644 --- a/trace-mini-agent/src/trace_processor.rs +++ b/trace-mini-agent/src/trace_processor.rs @@ -85,6 +85,7 @@ impl TraceProcessor for ServerlessTraceProcessor { obfuscate_span(span, &config.obfuscation_config); } }, + true, // In mini agent, we always send agentless ); let send_data = SendData::new(body_size, payload, tracer_header_tags, &config.trace_intake); diff --git a/trace-utils/Cargo.toml b/trace-utils/Cargo.toml index 9b74f6360..4ef27bebf 100644 --- a/trace-utils/Cargo.toml +++ b/trace-utils/Cargo.toml @@ -24,11 +24,12 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] } rand = "0.8.5" bytes = "1.6.0" # This should only be used for testing. It isn't under dev-dependencies because test-utils can't be under #[cfg(test)]. -httpmock = { version = "0.7.0", optional = true} +httpmock = { version = "0.7.0", optional = true} [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } serde_json = "1.0" +httpmock = { version = "0.7.0"} [features] test-utils = ["httpmock"] \ No newline at end of file diff --git a/trace-utils/src/trace_utils.rs b/trace-utils/src/trace_utils.rs index c29ad2525..18a20d8a2 100644 --- a/trace-utils/src/trace_utils.rs +++ b/trace-utils/src/trace_utils.rs @@ -3,6 +3,7 @@ use hyper::{body::Buf, Body}; use log::{error, info}; +use std::cmp::Ordering; use std::collections::HashMap; pub use crate::send_data::SendData; @@ -79,6 +80,18 @@ pub(crate) fn construct_tracer_payload( } } +fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering { + a.tracer_version + .cmp(&b.tracer_version) + .then(a.language_version.cmp(&b.language_version)) + .then(a.language_name.cmp(&b.language_name)) + .then(a.hostname.cmp(&b.hostname)) + .then(a.container_id.cmp(&b.container_id)) + .then(a.runtime_id.cmp(&b.runtime_id)) + .then(a.env.cmp(&b.env)) + .then(a.app_version.cmp(&b.app_version)) +} + pub fn coalesce_send_data(mut data: Vec) -> Vec { // TODO trace payloads with identical data except for chunk could be merged? @@ -103,6 +116,21 @@ pub fn coalesce_send_data(mut data: Vec) -> Vec { } false }); + // Merge chunks with common properties. Reduces requests for agentful mode. + // And reduces a little bit of data for agentless. + for send_data in data.iter_mut() { + send_data + .tracer_payloads + .sort_unstable_by(cmp_send_data_payloads); + send_data.tracer_payloads.dedup_by(|a, b| { + if cmp_send_data_payloads(a, b) == Ordering::Equal { + // Note: dedup_by drops a, and retains b. + b.chunks.append(&mut a.chunks); + return true; + } + false + }) + } data } @@ -288,10 +316,12 @@ pub fn collect_trace_chunks( mut traces: Vec>, tracer_header_tags: &TracerHeaderTags, process_chunk: impl Fn(&mut TraceChunk, usize), + is_agentless: bool, ) -> pb::TracerPayload { let mut trace_chunks: Vec = Vec::new(); - let mut gathered_root_span_tags = false; + // We'll skip setting the global metadata and rely on the agent to unpack these + let mut gathered_root_span_tags = !is_agentless; let mut root_span_tags = RootSpanTags::default(); for trace in traces.iter_mut() { @@ -309,8 +339,10 @@ pub fn collect_trace_chunks( } }; - if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) { - error!("Error normalizing trace chunk: {e}"); + if is_agentless { + if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) { + error!("Error normalizing trace chunk: {e}"); + } } for span in chunk.spans.iter_mut() { @@ -362,7 +394,7 @@ mod tests { use ddcommon::Endpoint; #[test] - fn test_coalescing_does_not_excced_max_size() { + fn test_coalescing_does_not_exceed_max_size() { let dummy = SendData::new( MAX_PAYLOAD_SIZE / 5 + 1, pb::TracerPayload { @@ -371,7 +403,13 @@ mod tests { language_version: "".to_string(), tracer_version: "".to_string(), runtime_id: "".to_string(), - chunks: vec![], + chunks: vec![pb::TraceChunk { + priority: 0, + origin: "".to_string(), + spans: vec![], + tags: Default::default(), + dropped_trace: false, + }], tags: Default::default(), env: "".to_string(), hostname: "".to_string(), @@ -391,9 +429,27 @@ mod tests { 5, coalesced .iter() - .map(|s| s.tracer_payloads.len()) + .map(|s| s + .tracer_payloads + .iter() + .map(|p| p.chunks.len()) + .sum::()) .sum::() ); + // assert some chunks are actually coalesced + assert!( + coalesced + .iter() + .map(|s| s + .tracer_payloads + .iter() + .map(|p| p.chunks.len()) + .max() + .unwrap()) + .max() + .unwrap() + > 1 + ); assert!(coalesced.len() > 1 && coalesced.len() < 5); }