Skip to content

Commit

Permalink
Skip normalization & obfuscation and coalesce instead (#475)
Browse files Browse the repository at this point in the history
* Skip normalization & obfuscation and coalesce instead

Turns out the agent ignores the root_span_tags for v0.7 anyway.
This is useful to coalesce chunks with common data, allowing us to actually save us from doing redundant requests.
As it was currently, the agentful trace sender always sent one trace per request, instead of merging them.

We also skip normalization and rely on the agent to do it, for sake of consistency with other tracers and reduce the potentially duplicated work across tracer and agent. We may change that back in future, but for now we've determined it to be the easiest way to work, also with respect to normalization-unaware testing.

Signed-off-by: Bob Weinand <[email protected]>

* Update trace-utils/src/trace_utils.rs

Co-authored-by: Pierre Bonet <[email protected]>

* Update trace-utils/src/trace_utils.rs

Co-authored-by: Pierre Bonet <[email protected]>

* Update trace-mini-agent/src/trace_processor.rs

Co-authored-by: Pierre Bonet <[email protected]>

* Add comment back

Signed-off-by: Bob Weinand <[email protected]>

---------

Signed-off-by: Bob Weinand <[email protected]>
Co-authored-by: Pierre Bonet <[email protected]>
  • Loading branch information
bwoebi and pierotibou authored Jun 7, 2024
1 parent ce04a4d commit c539a1d
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 9 deletions.
1 change: 1 addition & 0 deletions data-pipeline/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl TraceExporter {
traces,
&header_tags,
|_chunk, _root_span_index| {},
self.endpoint.api_key.is_some(),
);

let endpoint = Endpoint {
Expand Down
8 changes: 6 additions & 2 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,12 @@ impl SidecarServer {
return;
}

let payload =
trace_utils::collect_trace_chunks(traces, &headers, |_chunk, _root_span_index| {});
let payload = trace_utils::collect_trace_chunks(
traces,
&headers,
|_chunk, _root_span_index| {},
target.api_key.is_some(),
);

// send trace payload to our trace flusher
let data = SendData::new(size, payload, headers, target);
Expand Down
1 change: 1 addition & 0 deletions trace-mini-agent/src/trace_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
obfuscate_span(span, &config.obfuscation_config);
}
},
true, // In mini agent, we always send agentless
);

let send_data = SendData::new(body_size, payload, tracer_header_tags, &config.trace_intake);
Expand Down
3 changes: 2 additions & 1 deletion trace-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
rand = "0.8.5"
bytes = "1.6.0"
# This should only be used for testing. It isn't under dev-dependencies because test-utils can't be under #[cfg(test)].
httpmock = { version = "0.7.0", optional = true}
httpmock = { version = "0.7.0", optional = true}

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde_json = "1.0"
httpmock = { version = "0.7.0"}

[features]
test-utils = ["httpmock"]
68 changes: 62 additions & 6 deletions trace-utils/src/trace_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use hyper::{body::Buf, Body};
use log::{error, info};
use std::cmp::Ordering;
use std::collections::HashMap;

pub use crate::send_data::SendData;
Expand Down Expand Up @@ -79,6 +80,18 @@ pub(crate) fn construct_tracer_payload(
}
}

fn cmp_send_data_payloads(a: &pb::TracerPayload, b: &pb::TracerPayload) -> Ordering {
a.tracer_version
.cmp(&b.tracer_version)
.then(a.language_version.cmp(&b.language_version))
.then(a.language_name.cmp(&b.language_name))
.then(a.hostname.cmp(&b.hostname))
.then(a.container_id.cmp(&b.container_id))
.then(a.runtime_id.cmp(&b.runtime_id))
.then(a.env.cmp(&b.env))
.then(a.app_version.cmp(&b.app_version))
}

pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
// TODO trace payloads with identical data except for chunk could be merged?

Expand All @@ -103,6 +116,21 @@ pub fn coalesce_send_data(mut data: Vec<SendData>) -> Vec<SendData> {
}
false
});
// Merge chunks with common properties. Reduces requests for agentful mode.
// And reduces a little bit of data for agentless.
for send_data in data.iter_mut() {
send_data
.tracer_payloads
.sort_unstable_by(cmp_send_data_payloads);
send_data.tracer_payloads.dedup_by(|a, b| {
if cmp_send_data_payloads(a, b) == Ordering::Equal {
// Note: dedup_by drops a, and retains b.
b.chunks.append(&mut a.chunks);
return true;
}
false
})
}
data
}

Expand Down Expand Up @@ -288,10 +316,12 @@ pub fn collect_trace_chunks(
mut traces: Vec<Vec<pb::Span>>,
tracer_header_tags: &TracerHeaderTags,
process_chunk: impl Fn(&mut TraceChunk, usize),
is_agentless: bool,
) -> pb::TracerPayload {
let mut trace_chunks: Vec<pb::TraceChunk> = Vec::new();

let mut gathered_root_span_tags = false;
// We'll skip setting the global metadata and rely on the agent to unpack these
let mut gathered_root_span_tags = !is_agentless;
let mut root_span_tags = RootSpanTags::default();

for trace in traces.iter_mut() {
Expand All @@ -309,8 +339,10 @@ pub fn collect_trace_chunks(
}
};

if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
error!("Error normalizing trace chunk: {e}");
if is_agentless {
if let Err(e) = normalizer::normalize_chunk(&mut chunk, root_span_index) {
error!("Error normalizing trace chunk: {e}");
}
}

for span in chunk.spans.iter_mut() {
Expand Down Expand Up @@ -362,7 +394,7 @@ mod tests {
use ddcommon::Endpoint;

#[test]
fn test_coalescing_does_not_excced_max_size() {
fn test_coalescing_does_not_exceed_max_size() {
let dummy = SendData::new(
MAX_PAYLOAD_SIZE / 5 + 1,
pb::TracerPayload {
Expand All @@ -371,7 +403,13 @@ mod tests {
language_version: "".to_string(),
tracer_version: "".to_string(),
runtime_id: "".to_string(),
chunks: vec![],
chunks: vec![pb::TraceChunk {
priority: 0,
origin: "".to_string(),
spans: vec![],
tags: Default::default(),
dropped_trace: false,
}],
tags: Default::default(),
env: "".to_string(),
hostname: "".to_string(),
Expand All @@ -391,9 +429,27 @@ mod tests {
5,
coalesced
.iter()
.map(|s| s.tracer_payloads.len())
.map(|s| s
.tracer_payloads
.iter()
.map(|p| p.chunks.len())
.sum::<usize>())
.sum::<usize>()
);
// assert some chunks are actually coalesced
assert!(
coalesced
.iter()
.map(|s| s
.tracer_payloads
.iter()
.map(|p| p.chunks.len())
.max()
.unwrap())
.max()
.unwrap()
> 1
);
assert!(coalesced.len() > 1 && coalesced.len() < 5);
}

Expand Down

0 comments on commit c539a1d

Please sign in to comment.