Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APMSP-1512] Add metadata headers for stats #712

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
.set_language_interpreter(language_interpreter.to_utf8_lossy().as_ref())
.set_hostname(hostname.to_utf8_lossy().as_ref())
.set_env(env.to_utf8_lossy().as_ref())
.set_version(version.to_utf8_lossy().as_ref())
.set_app_version(version.to_utf8_lossy().as_ref())
.set_service(service.to_utf8_lossy().as_ref())
.set_input_format(input_format)
.set_output_format(output_format)
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/examples/send-traces-with-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() {
.set_url("http://localhost:8126")
.set_hostname("test")
.set_env("testing")
.set_version(env!("CARGO_PKG_VERSION"))
.set_app_version(env!("CARGO_PKG_VERSION"))
.set_service("data-pipeline-test")
.set_tracer_version(env!("CARGO_PKG_VERSION"))
.set_language("rust")
Expand Down
76 changes: 35 additions & 41 deletions data-pipeline/src/stats_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
borrow::Borrow,
collections::HashMap,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
Expand All @@ -10,38 +12,22 @@ use std::{
};

use datadog_trace_protobuf::pb;
use ddcommon::{connector, tag::Tag, Endpoint};
use ddcommon::{connector, Endpoint};
use hyper;
use tokio::select;
use tokio_util::sync::CancellationToken;

use crate::span_concentrator::SpanConcentrator;
use crate::{span_concentrator::SpanConcentrator, trace_exporter::TracerMetadata};

const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";

/// Metadata required in a ClientStatsPayload
#[derive(Debug, Default, Clone)]
pub struct LibraryMetadata {
pub hostname: String,
pub env: String,
pub version: String,
pub lang: String,
pub tracer_version: String,
pub runtime_id: String,
pub service: String,
pub container_id: String,
pub git_commit_sha: String,
/// Should be left empty by client, except for some specific environment
pub tags: Vec<Tag>,
}

/// An exporter that concentrates and sends stats to the agent
#[derive(Debug)]
pub struct StatsExporter {
flush_interval: time::Duration,
concentrator: Arc<Mutex<SpanConcentrator>>,
endpoint: Endpoint,
meta: LibraryMetadata,
meta: TracerMetadata,
sequence_id: AtomicU64,
client: ddcommon::HttpClient,
cancellation_token: CancellationToken,
Expand All @@ -52,14 +38,14 @@ impl StatsExporter {
///
/// - `flush_interval` the interval on which the concentrator is flushed
/// - `concentrator` SpanConcentrator storing the stats to be sent to the agent
/// - `meta` the metadata used when sending the ClientStatsPayload to the agent
/// - `meta` metadata used in ClientStatsPayload and as headers to send stats to the agent
/// - `endpoint` the Endpoint used to send stats to the agent
/// - `cancellation_token` Token used to safely shutdown the exporter by force flushing the
/// concentrator
pub fn new(
flush_interval: time::Duration,
concentrator: Arc<Mutex<SpanConcentrator>>,
meta: LibraryMetadata,
meta: TracerMetadata,
endpoint: Endpoint,
cancellation_token: CancellationToken,
) -> Self {
Expand Down Expand Up @@ -95,15 +81,23 @@ impl StatsExporter {
return Ok(());
}
let body = rmp_serde::encode::to_vec_named(&payload)?;
let req = self

let headers: HashMap<&'static str, String> = self.meta.borrow().into();

let mut req_builder = self
.endpoint
.into_request_builder(concat!("Libdatadog/", env!("CARGO_PKG_VERSION")))?
.header(
hyper::header::CONTENT_TYPE,
ddcommon::header::APPLICATION_MSGPACK,
)
.method(hyper::Method::POST)
.body(hyper::Body::from(body))?;
.method(hyper::Method::POST);

for (key, value) in &headers {
req_builder = req_builder.header(*key, value);
}

let req = req_builder.body(hyper::Body::from(body))?;

let resp = self.client.request(req).await?;

Expand All @@ -128,7 +122,7 @@ impl StatsExporter {
fn flush(&self, force_flush: bool) -> pb::ClientStatsPayload {
let sequence = self.sequence_id.fetch_add(1, Ordering::Relaxed);
encode_stats_payload(
self.meta.clone(),
self.meta.borrow(),
sequence,
self.concentrator
.lock()
Expand Down Expand Up @@ -158,24 +152,24 @@ impl StatsExporter {
}

fn encode_stats_payload(
meta: LibraryMetadata,
meta: &TracerMetadata,
sequence: u64,
buckets: Vec<pb::ClientStatsBucket>,
) -> pb::ClientStatsPayload {
pb::ClientStatsPayload {
hostname: meta.hostname,
env: meta.env,
lang: meta.lang,
version: meta.version,
runtime_id: meta.runtime_id,
tracer_version: meta.tracer_version,
service: meta.service,
container_id: meta.container_id,
git_commit_sha: meta.git_commit_sha,
tags: meta.tags.into_iter().map(|t| t.to_string()).collect(),
hostname: meta.hostname.clone(),
env: meta.env.clone(),
lang: meta.language.clone(),
version: meta.app_version.clone(),
runtime_id: meta.runtime_id.clone(),
tracer_version: meta.tracer_version.clone(),
service: meta.service.clone(),
sequence,
stats: buckets,
// Agent-only field
git_commit_sha: meta.git_commit_sha.clone(),
// These fields will be set by the Agent
container_id: String::new(),
tags: Vec::new(),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 From trace-agent perspective

agent_aggregation: String::new(),
image_tag: String::new(),
}
Expand Down Expand Up @@ -211,12 +205,12 @@ mod tests {
let _ = is_sync::<StatsExporter>;
}

fn get_test_metadata() -> LibraryMetadata {
LibraryMetadata {
fn get_test_metadata() -> TracerMetadata {
TracerMetadata {
hostname: "libdatadog-test".into(),
env: "test".into(),
version: "0.0.0".into(),
lang: "rust".into(),
app_version: "0.0.0".into(),
language: "rust".into(),
tracer_version: "0.0.0".into(),
runtime_id: "e39d6d12-0752-489f-b488-cf80006c0378".into(),
service: "stats_exporter_test".into(),
Expand Down
Loading
Loading