Skip to content

Commit

Permalink
Skip normalization & obfuscation and coalesce instead
Browse files Browse the repository at this point in the history
Turns out the agent ignores the root_span_tags for v0.7 anyway.
This is useful to coalesce chunks with common data, allowing us to actually save us from doing redundant requests.
As it was currently, the agentful trace sender always sent one trace per request, instead of merging them.

We also skip normalization and rely on the agent to do it, for sake of consistency with other tracers and reduce the potentially duplicated work across tracer and agent. We may change that back in future, but for now we've determined it to be the easiest way to work, also with respect to normalization-unaware testing.

Signed-off-by: Bob Weinand <[email protected]>
  • Loading branch information
bwoebi committed Jun 7, 2024
1 parent ce04a4d commit 40e13c3
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 10 deletions.
1 change: 1 addition & 0 deletions data-pipeline/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl TraceExporter {
traces,
&header_tags,
|_chunk, _root_span_index| {},
self.endpoint.api_key.is_some(),
);

let endpoint = Endpoint {
Expand Down
8 changes: 6 additions & 2 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,12 @@ impl SidecarServer {
return;
}

let payload =
trace_utils::collect_trace_chunks(traces, &headers, |_chunk, _root_span_index| {});
let payload = trace_utils::collect_trace_chunks(
traces,
&headers,
|_chunk, _root_span_index| {},
target.api_key.is_some(),
);

// send trace payload to our trace flusher
let data = SendData::new(size, payload, headers, target);
Expand Down
1 change: 1 addition & 0 deletions trace-mini-agent/src/trace_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
obfuscate_span(span, &config.obfuscation_config);
}
},
true,
);

let send_data = SendData::new(body_size, payload, tracer_header_tags, &config.trace_intake);
Expand Down
4 changes: 2 additions & 2 deletions trace-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ datadog-trace-normalization = { path = "../trace-normalization" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
rand = "0.8.5"
bytes = "1.6.0"
# This should only be used for testing. It isn't under dev-dependencies because test-utils can't be under #[cfg(test)].
httpmock = { version = "0.7.0", optional = true}
httpmock = { version = "0.7.0", optional = true}

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde_json = "1.0"
httpmock = { version = "0.7.0"}

[features]
test-utils = ["httpmock"]
67 changes: 61 additions & 6 deletions trace-utils/src/trace_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use hyper::{body::Buf, Body};
use log::{error, info};
use std::cmp::Ordering;
use std::collections::HashMap;

pub use crate::send_data::SendData;
Expand Down Expand Up @@ -79,6 +80,18 @@ pub(crate) fn construct_tracer_payload(
}
}

fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering {
a.tracer_version
.cmp(&b.tracer_version)
.then(a.language_version.cmp(&b.language_version))
.then(a.language_name.cmp(&b.language_name))
.then(a.hostname.cmp(&b.hostname))
.then(a.container_id.cmp(&b.container_id))
.then(a.runtime_id.cmp(&b.runtime_id))
.then(a.env.cmp(&b.env))
.then(a.app_version.cmp(&b.app_version))
}

pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
// TODO trace payloads with identical data except for chunk could be merged?

Expand All @@ -103,6 +116,20 @@ pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
}
false
});
// Merge chunks with common properties. Reduces requests for agentful mode.
// And reduces a little bit of data for agentless.
for send_data in data.iter_mut() {
send_data
.tracer_payloads
.sort_unstable_by(cmp_send_data_payloads);
send_data.tracer_payloads.dedup_by(|a, b| {
if cmp_send_data_payloads(a, b) == Ordering::Equal {
b.chunks.append(&mut a.chunks);
return true;
}
false
})
}
data
}

Expand Down Expand Up @@ -288,10 +315,12 @@ pub fn collect_trace_chunks(
mut traces: Vec<Vec<pb::Span>>,
tracer_header_tags: &TracerHeaderTags,
process_chunk: impl Fn(&mut TraceChunk, usize),
is_agentless: bool,
) -> pb::TracerPayload {
let mut trace_chunks: Vec<pb::TraceChunk> = Vec::new();

let mut gathered_root_span_tags = false;
// We'll skip setting the global metadata and reply on the agent to unpack these
let mut gathered_root_span_tags = !is_agentless;
let mut root_span_tags = RootSpanTags::default();

for trace in traces.iter_mut() {
Expand All @@ -309,8 +338,10 @@ pub fn collect_trace_chunks(
}
};

if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
error!("Error normalizing trace chunk: {e}");
if !is_agentless {
if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
error!("Error normalizing trace chunk: {e}");
}
}

for span in chunk.spans.iter_mut() {
Expand Down Expand Up @@ -362,7 +393,7 @@ mod tests {
use ddcommon::Endpoint;

#[test]
fn test_coalescing_does_not_excced_max_size() {
fn test_coalescing_does_not_exceed_max_size() {
let dummy = SendData::new(
MAX_PAYLOAD_SIZE / 5 + 1,
pb::TracerPayload {
Expand All @@ -371,7 +402,13 @@ mod tests {
language_version: "".to_string(),
tracer_version: "".to_string(),
runtime_id: "".to_string(),
chunks: vec![],
chunks: vec![pb::TraceChunk {
priority: 0,
origin: "".to_string(),
spans: vec![],
tags: Default::default(),
dropped_trace: false,
}],
tags: Default::default(),
env: "".to_string(),
hostname: "".to_string(),
Expand All @@ -391,9 +428,27 @@ mod tests {
5,
coalesced
.iter()
.map(|s| s.tracer_payloads.len())
.map(|s| s
.tracer_payloads
.iter()
.map(|p| p.chunks.len())
.sum::<usize>())
.sum::<usize>()
);
// assert some chunks are actually coalesced
assert!(
coalesced
.iter()
.map(|s| s
.tracer_payloads
.iter()
.map(|p| p.chunks.len())
.max()
.unwrap())
.max()
.unwrap()
> 1
);
assert!(coalesced.len() > 1 && coalesced.len() < 5);
}

Expand Down

0 comments on commit 40e13c3

Please sign in to comment.