From d7339d06e20a7ac48203ce72761ecb7892cda03d Mon Sep 17 00:00:00 2001
From: iGxnon
Date: Wed, 1 Nov 2023 23:03:58 +0800
Subject: [PATCH] feat: add metrics for sidecar

Signed-off-by: iGxnon
---
 Cargo.lock                |  2 ++
 sidecar/Cargo.toml        |  2 ++
 sidecar/src/controller.rs | 72 +++++++++++++++++++++++++++++++++++++++
 sidecar/src/routers.rs    | 22 ++++++++++++
 sidecar/src/sidecar.rs    | 38 ++++++++++++++++++---
 sidecar/src/utils.rs      | 13 +++++++
 6 files changed, 144 insertions(+), 5 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7fa61d0e..97ee62fd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4132,9 +4132,11 @@ dependencies = [
  "axum",
  "bytes",
  "clap 4.4.6",
+ "clippy-utilities 0.2.0",
  "engine",
  "futures",
  "operator-api",
+ "prometheus",
  "reqwest",
  "serde",
  "tokio",
diff --git a/sidecar/Cargo.toml b/sidecar/Cargo.toml
index 94bba3d3..036c7da9 100644
--- a/sidecar/Cargo.toml
+++ b/sidecar/Cargo.toml
@@ -17,9 +17,11 @@ async-trait = "0.1.68"
 axum = "0.6.18"
 bytes = "1.4.0"
 clap = { version = "4.3.4", features = ["derive"] }
+clippy-utilities = "0.2.0"
 engine = { git = "https://github.com/xline-kv/Xline.git", package = "engine" }
 futures = "0.3.28"
 operator-api = { path = "../operator-api" }
+prometheus = "0.13.3"
 reqwest = { version = "0.11", features = ["json"] }
 serde = { version = "1.0.130", features = ["derive"] }
 tokio = { version = "1.0", features = [
diff --git a/sidecar/src/controller.rs b/sidecar/src/controller.rs
index cfd3fa9e..5e671d1c 100644
--- a/sidecar/src/controller.rs
+++ b/sidecar/src/controller.rs
@@ -7,12 +7,14 @@ use std::time::Duration;
 
 use anyhow::{anyhow, Result};
 use operator_api::registry::Registry;
+use prometheus::{Histogram, HistogramOpts, IntCounter, IntCounterVec, Opts};
 use tokio::select;
 use tokio::sync::{Mutex, RwLock};
 use tokio::time::{interval, MissedTickBehavior};
 use tracing::{debug, error, info};
 
 use crate::types::{MemberConfig, State, StatePayload, StateStatus};
+use crate::utils::exponential_time_bucket;
 use crate::xline::XlineHandle;
 
 /// Sidecar operator controller
@@ -25,10 +27,68 @@ pub(crate) struct Controller {
     handle: Arc<RwLock<XlineHandle>>,
     /// Check interval
     reconcile_interval: Duration,
+    /// Controller metrics
+    metrics: ControllerMetrics,
     /// Configuration Registry
     registry: Arc<dyn Registry + Send + Sync>,
 }
 
+/// Controller metrics
+pub(crate) struct ControllerMetrics {
+    /// Reconcile duration histogram
+    reconcile_duration: Histogram,
+    /// Reconcile failed count
+    reconcile_failed_count: IntCounterVec,
+    /// Xline restart count
+    restart_count: IntCounter,
+    /// Seed cluster count
+    seed_count: IntCounter,
+}
+
+impl ControllerMetrics {
+    /// New a controller metrics
+    #[allow(clippy::expect_used)]
+    pub(crate) fn new() -> Self {
+        Self {
+            reconcile_duration: Histogram::with_opts(
+                HistogramOpts::new(
+                    "sidecar_reconcile_duration_seconds",
+                    "Duration of sidecar reconcile loop in seconds",
+                )
+                .buckets(exponential_time_bucket(0.1, 2.0, 10)),
+            )
+            .expect("failed to create sidecar_reconcile_duration_seconds histogram"),
+            reconcile_failed_count: IntCounterVec::new(
+                Opts::new(
+                    "sidecar_reconcile_failed_count",
+                    "Number of failed times the sidecar reconcile loop has run",
+                ),
+                &["reason"],
+            )
+            .expect("failed to create sidecar_reconcile_failed_count counter"),
+            restart_count: IntCounter::new(
+                "sidecar_restart_xline_count",
+                "Number of how many times the xline restarts by this sidecar",
+            )
+            .expect("failed to create sidecar_restart_xline_count counter"),
+            seed_count: IntCounter::new(
+                "sidecar_seed_count",
+                "Number of how many times the sidecar seeds the cluster",
+            )
+ .expect("failed to create sidecar_seed_count counter"), + } + } + + /// Register the metrics into registry + pub(crate) fn register(&self, registry: &prometheus::Registry) -> Result<()> { + registry.register(Box::new(self.reconcile_duration.clone()))?; + registry.register(Box::new(self.reconcile_failed_count.clone()))?; + registry.register(Box::new(self.restart_count.clone()))?; + registry.register(Box::new(self.seed_count.clone()))?; + Ok(()) + } +} + impl Debug for Controller { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("Controller") @@ -46,6 +106,7 @@ impl Controller { state: Arc>, handle: Arc>, reconcile_interval: Duration, + metrics: ControllerMetrics, registry: Arc, ) -> Self { Self { @@ -53,6 +114,7 @@ impl Controller { state, handle, reconcile_interval, + metrics, registry, } } @@ -92,6 +154,7 @@ impl Controller { ..init_member_config }; + self.metrics.restart_count.inc(); self.handle .write() .await @@ -99,6 +162,7 @@ impl Controller { .await?; loop { + let timer = self.metrics.reconcile_duration.start_timer(); let instant = tick.tick().await; let config = match self @@ -116,12 +180,17 @@ impl Controller { if let Err(err) = self.reconcile_once(&member_config).await { error!("reconcile failed, error: {err}"); + self.metrics + .reconcile_failed_count + .with_label_values(&[&err.to_string()]) + .inc(); continue; } debug!( "successfully reconcile the cluster states within {:?}", instant.elapsed() ); + drop(timer); } } @@ -149,6 +218,7 @@ impl Controller { (true, false) => { self.set_state(State::Pending).await; + self.metrics.restart_count.inc(); info!("status: cluster healthy + xline not running, joining the cluster"); handle.start(&xline_members).await?; } @@ -191,6 +261,8 @@ impl Controller { info!( "status: cluster unhealthy + xline not running + all start + seeder, seed cluster" ); + self.metrics.restart_count.inc(); + self.metrics.seed_count.inc(); handle.start(&xline_members).await?; } } diff --git a/sidecar/src/routers.rs b/sidecar/src/routers.rs index fdcb3261..4156bf97 100644 --- a/sidecar/src/routers.rs +++ b/sidecar/src/routers.rs @@ -2,12 +2,34 @@ use std::sync::Arc; use axum::http::StatusCode; use axum::{Extension, Json}; +use prometheus::Encoder; use tokio::sync::{Mutex, RwLock}; +use tracing::error; use crate::types::{MembershipChange, StatePayload}; use crate::utils::{check_backup_volume, check_data_volume}; use crate::xline::XlineHandle; +/// metrics handler +#[allow(clippy::unused_async)] // require by axum +pub(crate) async fn metrics(Extension(registry): Extension) -> String { + let mut buf1 = Vec::new(); + let encoder = prometheus::TextEncoder::new(); + let metric_families = registry.gather(); + if let Err(err) = encoder.encode(&metric_families, &mut buf1) { + error!("failed to encode custom metrics: {}", err); + return String::new(); + } + let mut res = String::from_utf8(buf1).unwrap_or_default(); + let mut buf2 = Vec::new(); + if let Err(err) = encoder.encode(&prometheus::gather(), &mut buf2) { + error!("failed to encode prometheus metrics: {}", err); + return String::new(); + } + res.push_str(&String::from_utf8_lossy(&buf2)); + res +} + /// Return the current health condition according to the current node's storage volume and network status /// The network status is verified upon returning the HTTP response. 
 #[allow(clippy::unused_async)] // This is required in axum
diff --git a/sidecar/src/sidecar.rs b/sidecar/src/sidecar.rs
index 6a512459..1576ca61 100644
--- a/sidecar/src/sidecar.rs
+++ b/sidecar/src/sidecar.rs
@@ -2,12 +2,13 @@ use std::net::ToSocketAddrs;
 use std::sync::Arc;
 
 use anyhow::{anyhow, Result};
-use axum::routing::{get, post};
+use axum::routing::{any, get, post};
 use axum::{Extension, Router};
 use futures::{FutureExt, TryFutureExt};
 use operator_api::consts::{SIDECAR_BACKUP_ROUTE, SIDECAR_HEALTH_ROUTE, SIDECAR_STATE_ROUTE};
 use operator_api::registry::{DummyRegistry, HttpRegistry, K8sStsRegistry, Registry};
 use operator_api::{HeartbeatStatus, K8sXlineHandle, LocalXlineHandle};
+use prometheus::{Histogram, HistogramOpts};
 use tokio::sync::watch::{Receiver, Sender};
 use tokio::sync::{Mutex, RwLock};
 use tokio::time::{interval, MissedTickBehavior};
@@ -15,9 +16,10 @@ use tracing::{debug, error, info, warn};
 
 use crate::backup::pv::Pv;
 use crate::backup::Provider;
-use crate::controller::Controller;
+use crate::controller::{Controller, ControllerMetrics};
 use crate::routers;
 use crate::types::{BackendConfig, BackupConfig, Config, RegistryConfig, State, StatePayload};
+use crate::utils::exponential_time_bucket;
 use crate::xline::XlineHandle;
 
 /// Sidecar
@@ -59,19 +61,26 @@ impl Sidecar {
                 self.config.cluster_name.clone(),
             )),
         };
+        let metrics_registry = prometheus::Registry::new();
         self.start_controller(
             Arc::clone(&handle),
+            &metrics_registry,
             Arc::clone(&registry),
             Arc::clone(&state),
             graceful_shutdown_event.subscribe(),
-        );
+        )?;
+        self.start_heartbeat(
+            &metrics_registry,
+            registry,
+            graceful_shutdown_event.subscribe(),
+        )?;
         self.start_web_server(
             Arc::clone(&handle),
             Arc::clone(&state),
+            metrics_registry,
             graceful_shutdown_event.subscribe(),
         )?;
-        self.start_heartbeat(registry, graceful_shutdown_event.subscribe())?;
 
         tokio::pin!(forceful_shutdown);
 
@@ -145,9 +154,18 @@ impl Sidecar {
     /// Start heartbeat
     fn start_heartbeat(
         &self,
+        metrics_registry: &prometheus::Registry,
        registry: Arc<dyn Registry + Send + Sync>,
         graceful_shutdown: Receiver<()>,
     ) -> Result<()> {
+        let heartbeat_histogram = Histogram::with_opts(
+            HistogramOpts::new(
+                "sidecar_heartbeat_duration_histogram",
+                "Duration of sidecar heartbeat loop in seconds",
+            )
+            .buckets(exponential_time_bucket(0.1, 2.0, 10)),
+        )?;
+        metrics_registry.register(Box::new(heartbeat_histogram.clone()))?;
         let Some(monitor) = self.config.monitor.clone() else {
             info!("monitor did not set, disable heartbeat");
             return Ok(());
         };
@@ -171,6 +189,7 @@ impl Sidecar {
             // ensure a fixed heartbeat interval
             tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
             loop {
+                let timer = heartbeat_histogram.start_timer();
                 let instant = tick.tick().await;
 
                 let config = match registry
@@ -197,6 +216,7 @@ impl Sidecar {
                 if let Err(e) = status.report(&monitor.monitor_addr).await {
                     error!("heartbeat report failed, error {e}");
                 }
+                drop(timer);
             }
         };
 
@@ -215,15 +235,19 @@ impl Sidecar {
     fn start_controller(
         &self,
         handle: Arc<RwLock<XlineHandle>>,
+        metrics_registry: &prometheus::Registry,
        registry: Arc<dyn Registry + Send + Sync>,
         state: Arc<Mutex<StatePayload>>,
         graceful_shutdown: Receiver<()>,
-    ) {
+    ) -> Result<()> {
+        let metrics = ControllerMetrics::new();
+        metrics.register(metrics_registry)?;
         let controller = Controller::new(
             self.config.name.clone(),
             state,
             handle,
             self.config.reconcile_interval,
+            metrics,
             registry,
         );
         let init_member_config = self.config.init_member.clone();
@@ -238,6 +262,7 @@ impl Sidecar {
                 info!("controller shutdown");
             }
         });
+        Ok(())
     }
 
     /// Run a web server to expose current state to other sidecar operators and k8s
@@ -245,6 +270,7 @@ impl Sidecar {
         &self,
         handle: Arc<RwLock<XlineHandle>>,
         state: Arc<Mutex<StatePayload>>,
+        metrics_registry: prometheus::Registry,
         graceful_shutdown: Receiver<()>,
     ) -> Result<()> {
         let members = self.config.init_member.sidecar_members();
@@ -260,8 +286,10 @@ impl Sidecar {
             .route(SIDECAR_HEALTH_ROUTE, get(routers::health))
             .route(SIDECAR_BACKUP_ROUTE, get(routers::backup))
             .route(SIDECAR_STATE_ROUTE, get(routers::state))
+            .route("/metrics", any(routers::metrics))
             .route("/membership", post(routers::membership))
             .layer(Extension(handle))
+            .layer(Extension(metrics_registry))
             .layer(Extension(state));
 
         debug!("web server listen addr: {addr}");
diff --git a/sidecar/src/utils.rs b/sidecar/src/utils.rs
index fa08cf1e..8b38560f 100644
--- a/sidecar/src/utils.rs
+++ b/sidecar/src/utils.rs
@@ -1,6 +1,9 @@
 use std::fs::{read_to_string, remove_file, write};
+use std::iter::repeat;
+use std::ops::Mul;
 use std::path::Path;
 
+use clippy_utilities::NumericCast;
 use operator_api::consts::{DEFAULT_BACKUP_DIR, DEFAULT_DATA_DIR};
 
 /// Check if the volume under the path is working fine
@@ -50,6 +53,16 @@ pub(crate) fn check_backup_volume() -> bool {
     check_volume(backup_volume)
 }
 
+/// Returns a vector of time buckets for the reconcile duration histogram.
+#[allow(clippy::as_conversions)] // count will not too large
+pub(crate) fn exponential_time_bucket(start: f64, factor: f64, count: usize) -> Vec<f64> {
+    repeat(factor)
+        .enumerate()
+        .take(count)
+        .map(|(i, f)| start.mul(f.powi(i.numeric_cast())))
+        .collect::<Vec<_>>()
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
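
The snippet below is not part of the patch; it is a minimal standalone sketch of the same metric plumbing the patch wires into the controller, using only the prometheus 0.13 crate. It builds the two metric shapes ControllerMetrics uses (a duration histogram and a failure counter labelled by "reason"), registers clones of them in a dedicated prometheus::Registry, records one loop iteration, and then renders the custom registry followed by the process-global default registry, the way the new routers::metrics handler concatenates both. The "connection refused" reason value is illustrative, and prometheus::exponential_buckets stands in for the patch's private exponential_time_bucket helper.

use prometheus::{Encoder, Histogram, HistogramOpts, IntCounterVec, Opts, Registry, TextEncoder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A private registry, playing the role of the sidecar's `metrics_registry`.
    let registry = Registry::new();

    // Same metric shapes as ControllerMetrics in the patch.
    let reconcile_duration = Histogram::with_opts(
        HistogramOpts::new(
            "sidecar_reconcile_duration_seconds",
            "Duration of sidecar reconcile loop in seconds",
        )
        .buckets(prometheus::exponential_buckets(0.1, 2.0, 10)?),
    )?;
    let reconcile_failed_count = IntCounterVec::new(
        Opts::new(
            "sidecar_reconcile_failed_count",
            "Number of failed times the sidecar reconcile loop has run",
        ),
        &["reason"],
    )?;

    // Registering clones works because prometheus metrics share state internally.
    registry.register(Box::new(reconcile_duration.clone()))?;
    registry.register(Box::new(reconcile_failed_count.clone()))?;

    // One loop iteration, the way the controller records it: start a timer,
    // count a failure under its reason label, observe the duration on drop.
    let timer = reconcile_duration.start_timer();
    reconcile_failed_count
        .with_label_values(&["connection refused"]) // illustrative reason value
        .inc();
    drop(timer);

    // Render the custom registry followed by the default one, matching the
    // concatenation done by the /metrics handler.
    let encoder = TextEncoder::new();
    let mut buf = Vec::new();
    encoder.encode(&registry.gather(), &mut buf)?;
    encoder.encode(&prometheus::gather(), &mut buf)?;
    println!("{}", String::from_utf8_lossy(&buf));
    Ok(())
}

One design note this makes visible: because the handler also appends prometheus::gather(), any metric registered via the default registry elsewhere in the process will show up on the sidecar's /metrics output alongside the registry created here.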
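
For reference, exponential_time_bucket(start, factor, count) produces bucket i = start * factor^i for i in 0..count, so the (0.1, 2.0, 10) call used by both histograms covers roughly 0.1 s up to 51.2 s. The sketch below is not from the patch; it re-implements the same arithmetic with only std so the bucket bounds can be checked in isolation.

// Illustrative re-implementation of the bucket math, not code from the patch.
fn time_buckets(start: f64, factor: f64, count: usize) -> Vec<f64> {
    (0..count).map(|i| start * factor.powi(i as i32)).collect()
}

fn main() {
    let buckets = time_buckets(0.1, 2.0, 10);
    // The patch's (0.1, 2.0, 10) call therefore spans 0.1 s .. 51.2 s.
    assert!((buckets[0] - 0.1).abs() < 1e-9);
    assert!((buckets[9] - 51.2).abs() < 1e-9);
    println!("{buckets:?}");
}

The prometheus crate ships a comparable exponential_buckets(start, factor, count) helper; the patch keeps its own copy in utils.rs, which also gives it a place to cast the exponent via clippy_utilities::NumericCast.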
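
Finally, a sketch of how the new /metrics route is expected to be served and scraped. This is not the sidecar's code: it is a bare axum 0.6 server that passes a prometheus::Registry to the handler through an Extension layer, mirroring routers::metrics and start_web_server. The bind address, the use of get instead of the patch's any, the handler rendering only the custom registry, and the tokio runtime features are all assumptions made for the sake of a self-contained example.

use axum::{routing::get, Extension, Router};
use prometheus::{Encoder, Registry, TextEncoder};

// Hypothetical handler with the same shape as routers::metrics; the patch
// additionally appends prometheus::gather() to the response body.
async fn metrics(Extension(registry): Extension<Registry>) -> String {
    let encoder = TextEncoder::new();
    let mut buf = Vec::new();
    if encoder.encode(&registry.gather(), &mut buf).is_err() {
        return String::new();
    }
    String::from_utf8(buf).unwrap_or_default()
}

#[tokio::main]
async fn main() {
    let registry = Registry::new();
    // Register sidecar metrics into `registry` here, as start_controller and
    // start_heartbeat do in the patch before the web server starts.
    let app = Router::new()
        .route("/metrics", get(metrics))
        .layer(Extension(registry));

    // Illustrative bind address; the sidecar derives its own from config.
    axum::Server::bind(&"127.0.0.1:9100".parse().expect("valid addr"))
        .serve(app.into_make_service())
        .await
        .expect("server error");
}

With the server running, `curl http://127.0.0.1:9100/metrics` returns the text exposition format that a Prometheus scrape job can consume directly.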