Skip to content

Commit

Permalink
feat: add metrics for sidecar
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Nov 1, 2023
1 parent 65e1528 commit d7339d0
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sidecar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ async-trait = "0.1.68"
axum = "0.6.18"
bytes = "1.4.0"
clap = { version = "4.3.4", features = ["derive"] }
clippy-utilities = "0.2.0"
engine = { git = "https://github.com/xline-kv/Xline.git", package = "engine" }
futures = "0.3.28"
operator-api = { path = "../operator-api" }
prometheus = "0.13.3"
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0.130", features = ["derive"] }
tokio = { version = "1.0", features = [
Expand Down
72 changes: 72 additions & 0 deletions sidecar/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ use std::time::Duration;

use anyhow::{anyhow, Result};
use operator_api::registry::Registry;
use prometheus::{Histogram, HistogramOpts, IntCounter, IntCounterVec, Opts};
use tokio::select;
use tokio::sync::{Mutex, RwLock};
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, error, info};

use crate::types::{MemberConfig, State, StatePayload, StateStatus};
use crate::utils::exponential_time_bucket;
use crate::xline::XlineHandle;

/// Sidecar operator controller
Expand All @@ -25,10 +27,68 @@ pub(crate) struct Controller {
handle: Arc<RwLock<XlineHandle>>,
/// Check interval
reconcile_interval: Duration,
/// Controller metrics
metrics: ControllerMetrics,
/// Configuration Registry
registry: Arc<dyn Registry + Sync + Send>,
}

/// Controller metrics
pub(crate) struct ControllerMetrics {
/// Reconcile duration histogram
reconcile_duration: Histogram,
/// Reconcile failed count
reconcile_failed_count: IntCounterVec,
/// Xline restart count
restart_count: IntCounter,
/// Seed cluster count
seed_count: IntCounter,
}

impl ControllerMetrics {
/// New a controller metrics
#[allow(clippy::expect_used)]
pub(crate) fn new() -> Self {
Self {
reconcile_duration: Histogram::with_opts(
HistogramOpts::new(
"sidecar_reconcile_duration_seconds",
"Duration of sidecar reconcile loop in seconds",
)
.buckets(exponential_time_bucket(0.1, 2.0, 10)),
)
.expect("failed to create sidecar_reconcile_duration_seconds histogram"),
reconcile_failed_count: IntCounterVec::new(
Opts::new(
"sidecar_reconcile_failed_count",
"Number of failed times the sidecar reconcile loop has run",
),
&["reason"],
)
.expect("failed to create sidecar_reconcile_failed_count counter"),
restart_count: IntCounter::new(
"sidecar_restart_xline_count",
"Number of how many times the xline restarts by this sidecar",
)
.expect("failed to create sidecar_restart_xline_count counter"),
seed_count: IntCounter::new(
"sidecar_seed_count",
"Number of how many times the sidecar seeds the cluster",
)
.expect("failed to create sidecar_seed_count counter"),
}
}

/// Register the metrics into registry
pub(crate) fn register(&self, registry: &prometheus::Registry) -> Result<()> {
registry.register(Box::new(self.reconcile_duration.clone()))?;
registry.register(Box::new(self.reconcile_failed_count.clone()))?;
registry.register(Box::new(self.restart_count.clone()))?;
registry.register(Box::new(self.seed_count.clone()))?;
Ok(())
}
}

impl Debug for Controller {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Controller")
Expand All @@ -46,13 +106,15 @@ impl Controller {
state: Arc<Mutex<StatePayload>>,
handle: Arc<RwLock<XlineHandle>>,
reconcile_interval: Duration,
metrics: ControllerMetrics,
registry: Arc<dyn Registry + Sync + Send>,
) -> Self {
Self {
name,
state,
handle,
reconcile_interval,
metrics,
registry,
}
}
Expand Down Expand Up @@ -92,13 +154,15 @@ impl Controller {
..init_member_config
};

self.metrics.restart_count.inc();
self.handle
.write()
.await
.start(&member_config.xline_members())
.await?;

loop {
let timer = self.metrics.reconcile_duration.start_timer();
let instant = tick.tick().await;

let config = match self
Expand All @@ -116,12 +180,17 @@ impl Controller {

if let Err(err) = self.reconcile_once(&member_config).await {
error!("reconcile failed, error: {err}");
self.metrics
.reconcile_failed_count
.with_label_values(&[&err.to_string()])
.inc();
continue;
}
debug!(
"successfully reconcile the cluster states within {:?}",
instant.elapsed()
);
drop(timer);
}
}

Expand Down Expand Up @@ -149,6 +218,7 @@ impl Controller {
(true, false) => {
self.set_state(State::Pending).await;

self.metrics.restart_count.inc();
info!("status: cluster healthy + xline not running, joining the cluster");
handle.start(&xline_members).await?;
}
Expand Down Expand Up @@ -191,6 +261,8 @@ impl Controller {
info!(
"status: cluster unhealthy + xline not running + all start + seeder, seed cluster"
);
self.metrics.restart_count.inc();
self.metrics.seed_count.inc();
handle.start(&xline_members).await?;
}
}
Expand Down
22 changes: 22 additions & 0 deletions sidecar/src/routers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,34 @@ use std::sync::Arc;

use axum::http::StatusCode;
use axum::{Extension, Json};
use prometheus::Encoder;
use tokio::sync::{Mutex, RwLock};
use tracing::error;

use crate::types::{MembershipChange, StatePayload};
use crate::utils::{check_backup_volume, check_data_volume};
use crate::xline::XlineHandle;

/// metrics handler
#[allow(clippy::unused_async)] // require by axum
pub(crate) async fn metrics(Extension(registry): Extension<prometheus::Registry>) -> String {
let mut buf1 = Vec::new();
let encoder = prometheus::TextEncoder::new();
let metric_families = registry.gather();
if let Err(err) = encoder.encode(&metric_families, &mut buf1) {
error!("failed to encode custom metrics: {}", err);
return String::new();
}
let mut res = String::from_utf8(buf1).unwrap_or_default();
let mut buf2 = Vec::new();
if let Err(err) = encoder.encode(&prometheus::gather(), &mut buf2) {
error!("failed to encode prometheus metrics: {}", err);
return String::new();
}
res.push_str(&String::from_utf8_lossy(&buf2));
res
}

/// Return the current health condition according to the current node's storage volume and network status
/// The network status is verified upon returning the HTTP response.
#[allow(clippy::unused_async)] // This is required in axum
Expand Down
38 changes: 33 additions & 5 deletions sidecar/src/sidecar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@ use std::net::ToSocketAddrs;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use axum::routing::{get, post};
use axum::routing::{any, get, post};
use axum::{Extension, Router};
use futures::{FutureExt, TryFutureExt};
use operator_api::consts::{SIDECAR_BACKUP_ROUTE, SIDECAR_HEALTH_ROUTE, SIDECAR_STATE_ROUTE};
use operator_api::registry::{DummyRegistry, HttpRegistry, K8sStsRegistry, Registry};
use operator_api::{HeartbeatStatus, K8sXlineHandle, LocalXlineHandle};
use prometheus::{Histogram, HistogramOpts};
use tokio::sync::watch::{Receiver, Sender};
use tokio::sync::{Mutex, RwLock};
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, error, info, warn};

use crate::backup::pv::Pv;
use crate::backup::Provider;
use crate::controller::Controller;
use crate::controller::{Controller, ControllerMetrics};
use crate::routers;
use crate::types::{BackendConfig, BackupConfig, Config, RegistryConfig, State, StatePayload};
use crate::utils::exponential_time_bucket;
use crate::xline::XlineHandle;

/// Sidecar
Expand Down Expand Up @@ -59,19 +61,26 @@ impl Sidecar {
self.config.cluster_name.clone(),
)),
};
let metrics_registry = prometheus::Registry::new();

self.start_controller(
Arc::clone(&handle),
&metrics_registry,
Arc::clone(&registry),
Arc::clone(&state),
graceful_shutdown_event.subscribe(),
);
)?;
self.start_heartbeat(
&metrics_registry,
registry,
graceful_shutdown_event.subscribe(),
)?;
self.start_web_server(
Arc::clone(&handle),
Arc::clone(&state),
metrics_registry,
graceful_shutdown_event.subscribe(),
)?;
self.start_heartbeat(registry, graceful_shutdown_event.subscribe())?;

tokio::pin!(forceful_shutdown);

Expand Down Expand Up @@ -145,9 +154,18 @@ impl Sidecar {
/// Start heartbeat
fn start_heartbeat(
&self,
metrics_registry: &prometheus::Registry,
registry: Arc<dyn Registry + Send + Sync>,
graceful_shutdown: Receiver<()>,
) -> Result<()> {
let heartbeat_histogram = Histogram::with_opts(
HistogramOpts::new(
"sidecar_heartbeat_duration_histogram",
"Duration of sidecar heartbeat loop in seconds",
)
.buckets(exponential_time_bucket(0.1, 2.0, 10)),
)?;
metrics_registry.register(Box::new(heartbeat_histogram.clone()))?;
let Some(monitor) = self.config.monitor.clone() else {
info!("monitor did not set, disable heartbeat");
return Ok(());
Expand All @@ -171,6 +189,7 @@ impl Sidecar {
// ensure a fixed heartbeat interval
tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
let timer = heartbeat_histogram.start_timer();
let instant = tick.tick().await;

let config = match registry
Expand All @@ -197,6 +216,7 @@ impl Sidecar {
if let Err(e) = status.report(&monitor.monitor_addr).await {
error!("heartbeat report failed, error {e}");
}
drop(timer);
}
};

Expand All @@ -215,15 +235,19 @@ impl Sidecar {
fn start_controller(
&self,
handle: Arc<RwLock<XlineHandle>>,
metrics_registry: &prometheus::Registry,
registry: Arc<dyn Registry + Sync + Send>,
state: Arc<Mutex<StatePayload>>,
graceful_shutdown: Receiver<()>,
) {
) -> Result<()> {
let metrics = ControllerMetrics::new();
metrics.register(metrics_registry)?;
let controller = Controller::new(
self.config.name.clone(),
state,
handle,
self.config.reconcile_interval,
metrics,
registry,
);
let init_member_config = self.config.init_member.clone();
Expand All @@ -238,13 +262,15 @@ impl Sidecar {
info!("controller shutdown");
}
});
Ok(())
}

/// Run a web server to expose current state to other sidecar operators and k8s
fn start_web_server(
&self,
handle: Arc<RwLock<XlineHandle>>,
state: Arc<Mutex<StatePayload>>,
metrics_registry: prometheus::Registry,
graceful_shutdown: Receiver<()>,
) -> Result<()> {
let members = self.config.init_member.sidecar_members();
Expand All @@ -260,8 +286,10 @@ impl Sidecar {
.route(SIDECAR_HEALTH_ROUTE, get(routers::health))
.route(SIDECAR_BACKUP_ROUTE, get(routers::backup))
.route(SIDECAR_STATE_ROUTE, get(routers::state))
.route("/metrics", any(routers::metrics))
.route("/membership", post(routers::membership))
.layer(Extension(handle))
.layer(Extension(metrics_registry))
.layer(Extension(state));

debug!("web server listen addr: {addr}");
Expand Down
13 changes: 13 additions & 0 deletions sidecar/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::fs::{read_to_string, remove_file, write};
use std::iter::repeat;
use std::ops::Mul;
use std::path::Path;

use clippy_utilities::NumericCast;
use operator_api::consts::{DEFAULT_BACKUP_DIR, DEFAULT_DATA_DIR};

/// Check if the volume under the path is working fine
Expand Down Expand Up @@ -50,6 +53,16 @@ pub(crate) fn check_backup_volume() -> bool {
check_volume(backup_volume)
}

/// Returns a vector of time buckets for the reconcile duration histogram.
#[allow(clippy::as_conversions)] // count will not too large
pub(crate) fn exponential_time_bucket(start: f64, factor: f64, count: usize) -> Vec<f64> {
repeat(factor)
.enumerate()
.take(count)
.map(|(i, f)| start.mul(f.powi(i.numeric_cast())))
.collect::<Vec<_>>()
}

#[cfg(test)]
mod test {
use super::*;
Expand Down

0 comments on commit d7339d0

Please sign in to comment.