Skip to content

Commit

Permalink
Add metrics for trace_chunks_sent, trace_chunks_dropped and trace_api…
Browse files Browse the repository at this point in the history
….bytes.

* Rework send_request method in order to return a meaningful enum in
  which pertinent information about metrics and response is held. This
  favours that all metrics are managed in the same place regardless if
  the trace sent either to the intake or through the agent.

* Limit the visibility for SendData fields in order to imporve their
  encapsulation.

* Add default trait implementation for SendData.

* Add support for the new metrics in the TraceFlusher service.
  • Loading branch information
hoolioh committed May 30, 2024
1 parent 5128ad6 commit 872637c
Show file tree
Hide file tree
Showing 6 changed files with 752 additions and 97 deletions.
46 changes: 45 additions & 1 deletion sidecar/src/self_telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ struct MetricData<'a> {
trace_api_requests: ContextKey,
trace_api_responses: ContextKey,
trace_api_errors: ContextKey,
trace_api_bytes: ContextKey,
trace_chunk_sent: ContextKey,
trace_chunk_dropped: ContextKey,
}
impl<'a> MetricData<'a> {
async fn send(&self, key: ContextKey, value: f64, tags: Vec<Tag>) {
Expand Down Expand Up @@ -68,7 +71,6 @@ impl<'a> MetricData<'a> {
vec![Tag::new("level", level.as_str().to_lowercase()).unwrap()],
));
}

if trace_metrics.api_requests > 0 {
futures.push(self.send(
self.trace_api_requests,
Expand Down Expand Up @@ -97,6 +99,27 @@ impl<'a> MetricData<'a> {
vec![Tag::new("type", "status_code").unwrap()],
));
}
if trace_metrics.bytes_sent > 0 {
futures.push(self.send(
self.trace_api_bytes,
trace_metrics.bytes_sent as f64,
vec![],
));
}
if trace_metrics.chunks_sent > 0 {
futures.push(self.send(
self.trace_chunk_sent,
trace_metrics.chunks_sent as f64,
vec![],
));
}
if trace_metrics.chunks_dropped > 0 {
futures.push(self.send(
self.trace_chunk_dropped,
trace_metrics.chunks_dropped as f64,
vec![],
));
}
for (status_code, count) in &trace_metrics.api_responses_count_per_code {
futures.push(self.send(
self.trace_api_responses,
Expand Down Expand Up @@ -218,6 +241,27 @@ impl SelfTelemetry {
true,
MetricNamespace::Tracers,
),
trace_api_bytes: worker.register_metric_context(
"trace_api_bytes".to_string(),
vec![],
MetricType::Count,
true,
MetricNamespace::Tracers,
),
trace_chunk_sent: worker.register_metric_context(
"trace_chunk_sent".to_string(),
vec![],
MetricType::Count,
true,
MetricNamespace::Tracers,
),
trace_chunk_dropped: worker.register_metric_context(
"trace_chunk_dropped".to_string(),
vec![],
MetricType::Count,
true,
MetricNamespace::Tracers,
),
};

let _ = worker
Expand Down
10 changes: 8 additions & 2 deletions sidecar/src/service/tracing/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ pub struct TraceFlusherMetrics {
pub api_errors_timeout: u64,
pub api_errors_network: u64,
pub api_errors_status_code: u64,
pub bytes_sent: u64,
pub chunks_sent: u64,
pub chunks_dropped: u64,
}

impl TraceFlusherMetrics {
Expand All @@ -69,6 +72,9 @@ impl TraceFlusherMetrics {
self.api_errors_timeout += result.errors_timeout;
self.api_errors_network += result.errors_network;
self.api_errors_status_code += result.errors_status_code;
self.bytes_sent += result.bytes_sent;
self.chunks_sent += result.chunks_sent;
self.chunks_dropped += result.chunks_dropped;

for (status_code, count) in &result.responses_count_per_code {
*self
Expand Down Expand Up @@ -113,7 +119,7 @@ impl TraceFlusher {
let mut flush_data = self.inner.lock().unwrap();
let flush_data = flush_data.deref_mut();

flush_data.traces.send_data_size += data.size;
flush_data.traces.send_data_size += data.len();

if flush_data.traces.send_data_size
> self.min_force_drop_size_bytes.load(Ordering::Relaxed) as usize
Expand Down Expand Up @@ -233,7 +239,7 @@ impl TraceFlusher {
let mut futures: Vec<_> = Vec::new();
let mut intake_target: Vec<_> = Vec::new();
for send_data in send_data {
intake_target.push(send_data.target.clone());
intake_target.push(send_data.get_target().clone());
futures.push(send_data.send());
}
for (endpoint, response) in zip(intake_target, join_all(futures).await) {
Expand Down
1 change: 1 addition & 0 deletions trace-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ rand = "0.8.5"
bytes = "1.6.0"

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
httpmock = "0.7.0"
serde_json = "1.0"
Loading

0 comments on commit 872637c

Please sign in to comment.