diff --git a/Cargo.lock b/Cargo.lock index 483b8a2e7..b40f3af89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1585,8 +1585,11 @@ dependencies = [ "datadog-trace-mini-agent", "datadog-trace-protobuf", "datadog-trace-utils", + "dogstatsd", "env_logger", "log", + "tokio", + "tokio-util 0.7.11", ] [[package]] @@ -3106,7 +3109,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] diff --git a/serverless/Cargo.toml b/serverless/Cargo.toml index d984f4d73..3256b7479 100644 --- a/serverless/Cargo.toml +++ b/serverless/Cargo.toml @@ -9,6 +9,9 @@ env_logger = "0.10.0" datadog-trace-mini-agent = { path = "../trace-mini-agent" } datadog-trace-protobuf = { path = "../trace-protobuf" } datadog-trace-utils = { path = "../trace-utils" } +dogstatsd = { path = "../dogstatsd" } +tokio = { version = "1", features = ["macros", "rt-multi-thread"]} +tokio-util = { version = "0.7", default-features = false } [[bin]] name = "datadog-serverless-trace-mini-agent" diff --git a/serverless/src/main.rs b/serverless/src/main.rs index b0b5acfe8..1c1a96e09 100644 --- a/serverless/src/main.rs +++ b/serverless/src/main.rs @@ -2,19 +2,44 @@ // SPDX-License-Identifier: Apache-2.0 use env_logger::{Builder, Env, Target}; -use log::{error, info}; -use std::{env, sync::Arc}; +use log::{debug, error, info}; +use std::{env, sync::Arc, sync::Mutex}; +use tokio::time::{interval, Duration}; use datadog_trace_mini_agent::{ config, env_verifier, mini_agent, stats_flusher, stats_processor, trace_flusher, trace_processor, }; -pub fn main() { +use dogstatsd::{ + aggregator::Aggregator as MetricsAggregator, + constants::CONTEXTS, + dogstatsd::{DogStatsD, DogStatsDConfig}, + flusher::{build_fqdn_metrics, Flusher}, +}; + +use tokio_util::sync::CancellationToken; + +const DOGSTATSD_FLUSH_INTERVAL: u64 = 10; +const DEFAULT_DOGSTATSD_PORT: u16 = 8125; +const AGENT_HOST: &str = "0.0.0.0"; + +#[tokio::main] +pub async fn main() { let env = Env::new().filter_or("DD_LOG_LEVEL", "info"); Builder::from_env(env).target(Target::Stdout).init(); - info!("Starting serverless trace mini agent"); + let dd_api_key: Option = env::var("DD_API_KEY").ok(); + let dd_dogstatsd_port: u16 = env::var("DD_DOGSTATSD_PORT") + .ok() + .and_then(|port| port.parse::().ok()) + .unwrap_or(DEFAULT_DOGSTATSD_PORT); + let dd_site = env::var("DD_SITE").unwrap_or_else(|_| "datadoghq.com".to_string()); + let dd_use_dogstatsd = env::var("DD_USE_DOGSTATSD") + .map(|val| val.to_lowercase() != "false") + .unwrap_or(true); + + debug!("Starting serverless trace mini agent"); let mini_agent_version = env!("CARGO_PKG_VERSION").to_string(); env::set_var("DD_MINI_AGENT_VERSION", mini_agent_version); @@ -44,7 +69,75 @@ pub fn main() { stats_flusher, }); - if let Err(e) = mini_agent.start_mini_agent() { - error!("Error when starting serverless trace mini agent: {e}"); + tokio::spawn(async move { + let res = mini_agent.start_mini_agent().await; + if let Err(e) = res { + error!("Error when starting serverless trace mini agent: {e:?}"); + } + }); + + let mut metrics_flusher = if dd_use_dogstatsd { + debug!("Starting dogstatsd"); + let (_, metrics_flusher) = start_dogstatsd(dd_dogstatsd_port, dd_api_key, dd_site).await; + info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}"); + metrics_flusher + } else { + info!("dogstatsd disabled"); + None + }; + + let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); + flush_interval.tick().await; // discard first tick, which is instantaneous + + loop { + flush_interval.tick().await; + + if let Some(metrics_flusher) = metrics_flusher.as_mut() { + debug!("Flushing dogstatsd metrics"); + metrics_flusher.flush().await; + } } } + +async fn start_dogstatsd( + port: u16, + dd_api_key: Option, + dd_site: String, +) -> (CancellationToken, Option) { + let metrics_aggr = Arc::new(Mutex::new( + MetricsAggregator::new(Vec::new(), CONTEXTS).expect("Failed to create metrics aggregator"), + )); + + let dogstatsd_config = DogStatsDConfig { + host: AGENT_HOST.to_string(), + port, + }; + let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); + let dogstatsd_client = DogStatsD::new( + &dogstatsd_config, + Arc::clone(&metrics_aggr), + dogstatsd_cancel_token.clone(), + ) + .await; + + tokio::spawn(async move { + dogstatsd_client.spin().await; + }); + + let metrics_flusher = match dd_api_key { + Some(dd_api_key) => { + let metrics_flusher = Flusher::new( + dd_api_key, + Arc::clone(&metrics_aggr), + build_fqdn_metrics(dd_site), + ); + Some(metrics_flusher) + } + None => { + error!("DD_API_KEY not set, won't flush metrics"); + None + } + }; + + (dogstatsd_cancel_token, metrics_flusher) +} diff --git a/trace-mini-agent/src/config.rs b/trace-mini-agent/src/config.rs index 418c6495a..101e2af50 100644 --- a/trace-mini-agent/src/config.rs +++ b/trace-mini-agent/src/config.rs @@ -13,9 +13,12 @@ use datadog_trace_utils::config_utils::{ }; use datadog_trace_utils::trace_utils; +const DEFAULT_DOGSTATSD_PORT: u16 = 8125; + #[derive(Debug)] pub struct Config { pub dd_site: String, + pub dd_dogstatsd_port: u16, pub env_type: trace_utils::EnvironmentType, pub function_name: Option, pub max_request_content_length: usize, @@ -41,6 +44,10 @@ impl Config { anyhow::anyhow!("Unable to identify environment. Shutting down Mini Agent.") })?; + let dd_dogstatsd_port: u16 = env::var("DD_DOGSTATSD_PORT") + .ok() + .and_then(|port| port.parse::().ok()) + .unwrap_or(DEFAULT_DOGSTATSD_PORT); let dd_site = env::var("DD_SITE").unwrap_or_else(|_| "datadoghq.com".to_string()); // construct the trace & trace stats intake urls based on DD_SITE env var (to flush traces & @@ -69,6 +76,7 @@ impl Config { trace_flush_interval: 3, stats_flush_interval: 3, verify_env_timeout: 100, + dd_dogstatsd_port, dd_site, trace_intake: Endpoint { url: hyper::Uri::from_str(&trace_intake_url).unwrap(), @@ -207,4 +215,33 @@ mod tests { env::remove_var("DD_APM_DD_URL"); env::remove_var("K_SERVICE"); } + + #[test] + #[serial] + fn test_default_dogstatsd_port() { + env::set_var("DD_API_KEY", "_not_a_real_key_"); + env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); + let config_res = config::Config::new(); + assert!(config_res.is_ok()); + let config = config_res.unwrap(); + assert_eq!(config.dd_dogstatsd_port, 8125); + env::remove_var("DD_API_KEY"); + env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME"); + } + + #[test] + #[serial] + fn test_custom_dogstatsd_port() { + env::set_var("DD_API_KEY", "_not_a_real_key_"); + env::set_var("ASCSVCRT_SPRING__APPLICATION__NAME", "test-spring-app"); + env::set_var("DD_DOGSTATSD_PORT", "18125"); + let config_res = config::Config::new(); + println!("{:?}", config_res); + assert!(config_res.is_ok()); + let config = config_res.unwrap(); + assert_eq!(config.dd_dogstatsd_port, 18125); + env::remove_var("DD_API_KEY"); + env::remove_var("ASCSVCRT_SPRING__APPLICATION__NAME"); + env::remove_var("DD_DOGSTATSD_PORT"); + } } diff --git a/trace-mini-agent/src/mini_agent.rs b/trace-mini-agent/src/mini_agent.rs index e62b53c30..e6e24b5bf 100644 --- a/trace-mini-agent/src/mini_agent.rs +++ b/trace-mini-agent/src/mini_agent.rs @@ -34,7 +34,6 @@ pub struct MiniAgent { } impl MiniAgent { - #[tokio::main] pub async fn start_mini_agent(&self) -> Result<(), Box> { let now = Instant::now(); @@ -168,7 +167,7 @@ impl MiniAgent { ), } } - (_, INFO_ENDPOINT_PATH) => match Self::info_handler() { + (_, INFO_ENDPOINT_PATH) => match Self::info_handler(config.dd_dogstatsd_port) { Ok(res) => Ok(res), Err(err) => log_and_create_http_response( &format!("Info endpoint error: {err}"), @@ -183,7 +182,7 @@ impl MiniAgent { } } - fn info_handler() -> http::Result> { + fn info_handler(dd_dogstatsd_port: u16) -> http::Result> { let response_json = json!( { "endpoints": [ @@ -193,7 +192,7 @@ impl MiniAgent { ], "client_drop_p0s": true, "config": { - "statsd_port": MINI_AGENT_PORT + "statsd_port": dd_dogstatsd_port } } ); diff --git a/trace-mini-agent/src/trace_processor.rs b/trace-mini-agent/src/trace_processor.rs index be1fafb2b..59baca694 100644 --- a/trace-mini-agent/src/trace_processor.rs +++ b/trace-mini-agent/src/trace_processor.rs @@ -168,6 +168,7 @@ mod tests { ..Default::default() }, dd_site: "datadoghq.com".to_string(), + dd_dogstatsd_port: 8125, env_type: trace_utils::EnvironmentType::CloudFunction, os: "linux".to_string(), obfuscation_config: ObfuscationConfig::new().unwrap(),