Skip to content

Commit

Permalink
[Serverless Mini Agent] Use DogStatsD in Serverless (#616)
Browse files Browse the repository at this point in the history
* use dogstatsd in serverless

* convert env var value to lowercase and nest dogstatsd logic

* refactor to move loop outside of conditional dogstatsd block

* add environment variables for hardcoded values and refactor start_dogstatsd

* fix unit test

* add env var for default dogstatsd port
  • Loading branch information
duncanpharvey authored Sep 13, 2024
1 parent 661432f commit dea9035
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 11 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions serverless/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ env_logger = "0.10.0"
datadog-trace-mini-agent = { path = "../trace-mini-agent" }
datadog-trace-protobuf = { path = "../trace-protobuf" }
datadog-trace-utils = { path = "../trace-utils" }
dogstatsd = { path = "../dogstatsd" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"]}
tokio-util = { version = "0.7", default-features = false }

[[bin]]
name = "datadog-serverless-trace-mini-agent"
Expand Down
105 changes: 99 additions & 6 deletions serverless/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,44 @@
// SPDX-License-Identifier: Apache-2.0

use env_logger::{Builder, Env, Target};
use log::{error, info};
use std::{env, sync::Arc};
use log::{debug, error, info};
use std::{env, sync::Arc, sync::Mutex};
use tokio::time::{interval, Duration};

use datadog_trace_mini_agent::{
config, env_verifier, mini_agent, stats_flusher, stats_processor, trace_flusher,
trace_processor,
};

pub fn main() {
use dogstatsd::{
aggregator::Aggregator as MetricsAggregator,
constants::CONTEXTS,
dogstatsd::{DogStatsD, DogStatsDConfig},
flusher::{build_fqdn_metrics, Flusher},
};

use tokio_util::sync::CancellationToken;

const DOGSTATSD_FLUSH_INTERVAL: u64 = 10;
const DEFAULT_DOGSTATSD_PORT: u16 = 8125;
const AGENT_HOST: &str = "0.0.0.0";

#[tokio::main]
pub async fn main() {
let env = Env::new().filter_or("DD_LOG_LEVEL", "info");
Builder::from_env(env).target(Target::Stdout).init();

info!("Starting serverless trace mini agent");
let dd_api_key: Option<String> = env::var("DD_API_KEY").ok();
let dd_dogstatsd_port: u16 = env::var("DD_DOGSTATSD_PORT")
.ok()
.and_then(|port| port.parse::<u16>().ok())
.unwrap_or(DEFAULT_DOGSTATSD_PORT);
let dd_site = env::var("DD_SITE").unwrap_or_else(|_| "datadoghq.com".to_string());
let dd_use_dogstatsd = env::var("DD_USE_DOGSTATSD")
.map(|val| val.to_lowercase() != "false")
.unwrap_or(true);

debug!("Starting serverless trace mini agent");

let mini_agent_version = env!("CARGO_PKG_VERSION").to_string();
env::set_var("DD_MINI_AGENT_VERSION", mini_agent_version);
Expand Down Expand Up @@ -44,7 +69,75 @@ pub fn main() {
stats_flusher,
});

if let Err(e) = mini_agent.start_mini_agent() {
error!("Error when starting serverless trace mini agent: {e}");
tokio::spawn(async move {
let res = mini_agent.start_mini_agent().await;
if let Err(e) = res {
error!("Error when starting serverless trace mini agent: {e:?}");
}
});

let mut metrics_flusher = if dd_use_dogstatsd {
debug!("Starting dogstatsd");
let (_, metrics_flusher) = start_dogstatsd(dd_dogstatsd_port, dd_api_key, dd_site).await;
info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}");
metrics_flusher
} else {
info!("dogstatsd disabled");
None
};

let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL));
flush_interval.tick().await; // discard first tick, which is instantaneous

loop {
flush_interval.tick().await;

if let Some(metrics_flusher) = metrics_flusher.as_mut() {
debug!("Flushing dogstatsd metrics");
metrics_flusher.flush().await;
}
}
}

async fn start_dogstatsd(
port: u16,
dd_api_key: Option<String>,
dd_site: String,
) -> (CancellationToken, Option<Flusher>) {
let metrics_aggr = Arc::new(Mutex::new(
MetricsAggregator::new(Vec::new(), CONTEXTS).expect("Failed to create metrics aggregator"),
));

let dogstatsd_config = DogStatsDConfig {
host: AGENT_HOST.to_string(),
port,
};
let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();
let dogstatsd_client = DogStatsD::new(
&dogstatsd_config,
Arc::clone(&metrics_aggr),
dogstatsd_cancel_token.clone(),
)
.await;

tokio::spawn(async move {
dogstatsd_client.spin().await;
});

let metrics_flusher = match dd_api_key {
Some(dd_api_key) => {
let metrics_flusher = Flusher::new(
dd_api_key,
Arc::clone(&metrics_aggr),
build_fqdn_metrics(dd_site),
);
Some(metrics_flusher)
}
None => {
error!("DD_API_KEY not set, won't flush metrics");
None
}
};

(dogstatsd_cancel_token, metrics_flusher)
}
37 changes: 37 additions & 0 deletions trace-mini-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ use datadog_trace_utils::config_utils::{
};
use datadog_trace_utils::trace_utils;

const DEFAULT_DOGSTATSD_PORT: u16 = 8125;

#[derive(Debug)]
pub struct Config {
pub dd_site: String,
pub dd_dogstatsd_port: u16,
pub env_type: trace_utils::EnvironmentType,
pub function_name: Option<String>,
pub max_request_content_length: usize,
Expand All @@ -41,6 +44,10 @@ impl Config {
anyhow::anyhow!("Unable to identify environment. Shutting down Mini Agent.")
})?;

let dd_dogstatsd_port: u16 = env::var("DD_DOGSTATSD_PORT")
.ok()
.and_then(|port| port.parse::<u16>().ok())
.unwrap_or(DEFAULT_DOGSTATSD_PORT);
let dd_site = env::var("DD_SITE").unwrap_or_else(|_| "datadoghq.com".to_string());

// construct the trace & trace stats intake urls based on DD_SITE env var (to flush traces &
Expand Down Expand Up @@ -69,6 +76,7 @@ impl Config {
trace_flush_interval: 3,
stats_flush_interval: 3,
verify_env_timeout: 100,
dd_dogstatsd_port,
dd_site,
trace_intake: Endpoint {
url: hyper::Uri::from_str(&trace_intake_url).unwrap(),
Expand Down Expand Up @@ -207,4 +215,33 @@ mod tests {
env::remove_var("DD_APM_DD_URL");
env::remove_var("K_SERVICE");
}

#[test]
#[serial]
fn test_default_dogstatsd_port() {
env::set_var("DD_API_KEY", "_not_a_real_key_");
env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app");
let config_res = config::Config::new();
assert!(config_res.is_ok());
let config = config_res.unwrap();
assert_eq!(config.dd_dogstatsd_port, 8125);
env::remove_var("DD_API_KEY");
env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME");
}

#[test]
#[serial]
fn test_custom_dogstatsd_port() {
env::set_var("DD_API_KEY", "_not_a_real_key_");
env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app");
env::set_var("DD_DOGSTATSD_PORT", "18125");
let config_res = config::Config::new();
println!("{:?}", config_res);
assert!(config_res.is_ok());
let config = config_res.unwrap();
assert_eq!(config.dd_dogstatsd_port, 18125);
env::remove_var("DD_API_KEY");
env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME");
env::remove_var("DD_DOGSTATSD_PORT");
}
}
7 changes: 3 additions & 4 deletions trace-mini-agent/src/mini_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ pub struct MiniAgent {
}

impl MiniAgent {
#[tokio::main]
pub async fn start_mini_agent(&self) -> Result<(), Box<dyn std::error::Error>> {
let now = Instant::now();

Expand Down Expand Up @@ -168,7 +167,7 @@ impl MiniAgent {
),
}
}
(_, INFO_ENDPOINT_PATH) => match Self::info_handler() {
(_, INFO_ENDPOINT_PATH) => match Self::info_handler(config.dd_dogstatsd_port) {
Ok(res) => Ok(res),
Err(err) => log_and_create_http_response(
&format!("Info endpoint error: {err}"),
Expand All @@ -183,7 +182,7 @@ impl MiniAgent {
}
}

fn info_handler() -> http::Result<Response<Body>> {
fn info_handler(dd_dogstatsd_port: u16) -> http::Result<Response<Body>> {
let response_json = json!(
{
"endpoints": [
Expand All @@ -193,7 +192,7 @@ impl MiniAgent {
],
"client_drop_p0s": true,
"config": {
"statsd_port": MINI_AGENT_PORT
"statsd_port": dd_dogstatsd_port
}
}
);
Expand Down
1 change: 1 addition & 0 deletions trace-mini-agent/src/trace_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ mod tests {
..Default::default()
},
dd_site: "datadoghq.com".to_string(),
dd_dogstatsd_port: 8125,
env_type: trace_utils::EnvironmentType::CloudFunction,
os: "linux".to_string(),
obfuscation_config: ObfuscationConfig::new().unwrap(),
Expand Down

0 comments on commit dea9035

Please sign in to comment.