Skip to content

Commit

Permalink
[APMSP-1512] Add metadata headers for stats (#712)
Browse files Browse the repository at this point in the history
* Add metadata headers for stats

* Add commit_sha and interpreter_vendor to metadata

* Add doc for TraceExporterBuilder

* Rename version to app_version
APMSP-1512
  • Loading branch information
VianneyRuhlmann authored Nov 8, 2024
1 parent 6943925 commit 573e678
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 85 deletions.
2 changes: 1 addition & 1 deletion data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
.set_language_interpreter(language_interpreter.to_utf8_lossy().as_ref())
.set_hostname(hostname.to_utf8_lossy().as_ref())
.set_env(env.to_utf8_lossy().as_ref())
.set_version(version.to_utf8_lossy().as_ref())
.set_app_version(version.to_utf8_lossy().as_ref())
.set_service(service.to_utf8_lossy().as_ref())
.set_input_format(input_format)
.set_output_format(output_format)
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/examples/send-traces-with-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() {
.set_url("http://localhost:8126")
.set_hostname("test")
.set_env("testing")
.set_version(env!("CARGO_PKG_VERSION"))
.set_app_version(env!("CARGO_PKG_VERSION"))
.set_service("data-pipeline-test")
.set_tracer_version(env!("CARGO_PKG_VERSION"))
.set_language("rust")
Expand Down
76 changes: 35 additions & 41 deletions data-pipeline/src/stats_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
borrow::Borrow,
collections::HashMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
Expand All @@ -10,38 +12,22 @@ use std::{
};

use datadog_trace_protobuf::pb;
use ddcommon::{connector, tag::Tag, Endpoint};
use ddcommon::{connector, Endpoint};
use hyper;
use tokio::select;
use tokio_util::sync::CancellationToken;

use crate::span_concentrator::SpanConcentrator;
use crate::{span_concentrator::SpanConcentrator, trace_exporter::TracerMetadata};

const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";

/// Metadata required in a ClientStatsPayload
#[derive(Debug, Default, Clone)]
pub struct LibraryMetadata {
pub hostname: String,
pub env: String,
pub version: String,
pub lang: String,
pub tracer_version: String,
pub runtime_id: String,
pub service: String,
pub container_id: String,
pub git_commit_sha: String,
/// Should be left empty by client, except for some specific environment
pub tags: Vec<Tag>,
}

/// An exporter that concentrates and sends stats to the agent
#[derive(Debug)]
pub struct StatsExporter {
flush_interval: time::Duration,
concentrator: Arc<Mutex<SpanConcentrator>>,
endpoint: Endpoint,
meta: LibraryMetadata,
meta: TracerMetadata,
sequence_id: AtomicU64,
client: ddcommon::HttpClient,
cancellation_token: CancellationToken,
Expand All @@ -52,14 +38,14 @@ impl StatsExporter {
///
/// - `flush_interval` the interval on which the concentrator is flushed
/// - `concentrator` SpanConcentrator storing the stats to be sent to the agent
/// - `meta` the metadata used when sending the ClientStatsPayload to the agent
/// - `meta` metadata used in ClientStatsPayload and as headers to send stats to the agent
/// - `endpoint` the Endpoint used to send stats to the agent
/// - `cancellation_token` Token used to safely shutdown the exporter by force flushing the
/// concentrator
pub fn new(
flush_interval: time::Duration,
concentrator: Arc<Mutex<SpanConcentrator>>,
meta: LibraryMetadata,
meta: TracerMetadata,
endpoint: Endpoint,
cancellation_token: CancellationToken,
) -> Self {
Expand Down Expand Up @@ -95,15 +81,23 @@ impl StatsExporter {
return Ok(());
}
let body = rmp_serde::encode::to_vec_named(&payload)?;
let req = self

let headers: HashMap<&'static str, String> = self.meta.borrow().into();

let mut req_builder = self
.endpoint
.into_request_builder(concat!("Libdatadog/", env!("CARGO_PKG_VERSION")))?
.header(
hyper::header::CONTENT_TYPE,
ddcommon::header::APPLICATION_MSGPACK,
)
.method(hyper::Method::POST)
.body(hyper::Body::from(body))?;
.method(hyper::Method::POST);

for (key, value) in &headers {
req_builder = req_builder.header(*key, value);
}

let req = req_builder.body(hyper::Body::from(body))?;

let resp = self.client.request(req).await?;

Expand All @@ -128,7 +122,7 @@ impl StatsExporter {
fn flush(&self, force_flush: bool) -> pb::ClientStatsPayload {
let sequence = self.sequence_id.fetch_add(1, Ordering::Relaxed);
encode_stats_payload(
self.meta.clone(),
self.meta.borrow(),
sequence,
self.concentrator
.lock()
Expand Down Expand Up @@ -158,24 +152,24 @@ impl StatsExporter {
}

fn encode_stats_payload(
meta: LibraryMetadata,
meta: &TracerMetadata,
sequence: u64,
buckets: Vec<pb::ClientStatsBucket>,
) -> pb::ClientStatsPayload {
pb::ClientStatsPayload {
hostname: meta.hostname,
env: meta.env,
lang: meta.lang,
version: meta.version,
runtime_id: meta.runtime_id,
tracer_version: meta.tracer_version,
service: meta.service,
container_id: meta.container_id,
git_commit_sha: meta.git_commit_sha,
tags: meta.tags.into_iter().map(|t| t.to_string()).collect(),
hostname: meta.hostname.clone(),
env: meta.env.clone(),
lang: meta.language.clone(),
version: meta.app_version.clone(),
runtime_id: meta.runtime_id.clone(),
tracer_version: meta.tracer_version.clone(),
service: meta.service.clone(),
sequence,
stats: buckets,
// Agent-only field
git_commit_sha: meta.git_commit_sha.clone(),
// These fields will be set by the Agent
container_id: String::new(),
tags: Vec::new(),
agent_aggregation: String::new(),
image_tag: String::new(),
}
Expand Down Expand Up @@ -211,12 +205,12 @@ mod tests {
let _ = is_sync::<StatsExporter>;
}

fn get_test_metadata() -> LibraryMetadata {
LibraryMetadata {
fn get_test_metadata() -> TracerMetadata {
TracerMetadata {
hostname: "libdatadog-test".into(),
env: "test".into(),
version: "0.0.0".into(),
lang: "rust".into(),
app_version: "0.0.0".into(),
language: "rust".into(),
tracer_version: "0.0.0".into(),
runtime_id: "e39d6d12-0752-489f-b488-cf80006c0378".into(),
service: "stats_exporter_test".into(),
Expand Down
Loading

0 comments on commit 573e678

Please sign in to comment.