Skip to content

Commit

Permalink
refactor: add Metrics trait
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Sep 7, 2023
1 parent 5443cbe commit 8feaff0
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 46 deletions.
50 changes: 26 additions & 24 deletions operator-k8s/src/controller/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use clippy_utilities::NumericCast;
use prometheus::{Histogram, HistogramOpts, HistogramTimer, IntCounterVec, Opts, Registry};
use prometheus::{Error, Histogram, HistogramOpts, HistogramTimer, IntCounterVec, Opts, Registry};

use std::iter::repeat;
use std::ops::Mul;

use crate::controller::Metrics;

/// Controller v1alpha
mod v1alpha;
/// Controller v1alpha1
Expand All @@ -20,10 +22,28 @@ pub(crate) struct ClusterMetrics {
reconcile_failed_count: IntCounterVec,
}

/// Label error
trait LabelError {
/// Label
fn label(&self) -> &str;
/// `Default` for `ClusterMetrics` delegates to `ClusterMetrics::new`.
impl Default for ClusterMetrics {
/// Returns the value produced by `Self::new()`.
fn default() -> Self {
Self::new()
}
}

impl Metrics for ClusterMetrics {
    /// Registers the reconcile duration histogram and the reconcile failure
    /// counter with `registry`, in that order.
    ///
    /// # Errors
    ///
    /// Returns the first `prometheus::Error` reported by the registry; the
    /// failure counter is not registered if the histogram registration fails.
    fn register(&self, registry: &Registry) -> Result<(), Error> {
        let duration_collector = Box::new(self.reconcile_duration.clone());
        registry.register(duration_collector)?;

        let failed_collector = Box::new(self.reconcile_failed_count.clone());
        registry.register(failed_collector)
    }

    /// Starts a timer on the reconcile duration histogram and hands it to the caller.
    fn record_duration(&self) -> HistogramTimer {
        let histogram = &self.reconcile_duration;
        histogram.start_timer()
    }

    /// Increments the reconcile failure counter selected by `labels`.
    fn record_failed_count(&self, labels: &[&str]) {
        let counter = self.reconcile_failed_count.with_label_values(labels);
        counter.inc();
    }
}

impl ClusterMetrics {
Expand All @@ -38,7 +58,7 @@ impl ClusterMetrics {
)
.buckets(exponential_time_bucket(0.1, 2.0, 10)),
)
.expect(""),
.expect("failed to create operator_reconcile_duration_seconds histogram"),
reconcile_failed_count: IntCounterVec::new(
Opts::new(
"operator_reconcile_failed_count",
Expand All @@ -49,24 +69,6 @@ impl ClusterMetrics {
.expect("failed to create operator_reconcile_failed_count counter"),
}
}

/// Register metrics
pub(crate) fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> {
registry.register(Box::new(self.reconcile_duration.clone()))?;
registry.register(Box::new(self.reconcile_failed_count.clone()))
}

/// Record duration
fn record_duration(&self) -> HistogramTimer {
self.reconcile_duration.start_timer()
}

/// Increment failed count
fn incr_failed_count(&self, reason: &impl LabelError) {
self.reconcile_failed_count
.with_label_values(&[reason.label()])
.inc();
}
}

/// Returns a vector of time buckets for the reconcile duration histogram.
Expand Down
23 changes: 13 additions & 10 deletions operator-k8s/src/controller/cluster/v1alpha.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use kube::{Api, Client, Resource, ResourceExt};
use tracing::{debug, error};
use utils::consts::{DEFAULT_BACKUP_DIR, DEFAULT_DATA_DIR};

use crate::controller::cluster::{ClusterMetrics, LabelError};
use crate::controller::cluster::ClusterMetrics;
use crate::controller::consts::{
DATA_EMPTY_DIR_NAME, DEFAULT_XLINE_PORT, FIELD_MANAGER, XLINE_POD_NAME_ENV, XLINE_PORT_NAME,
};
use crate::controller::Controller;
use crate::controller::{Controller, MetricsLabeled};
use crate::crd::v1alpha::Cluster;

/// CRD `XlineCluster` controller
Expand Down Expand Up @@ -51,13 +51,13 @@ pub(crate) enum Error {
InvalidVolumeName(&'static str),
}

impl LabelError for Error {
fn label(&self) -> &str {
impl MetricsLabeled for Error {
fn labels(&self) -> Vec<&str> {
match *self {
Self::MissingObject(_) => "missing_object",
Self::Kube(_) => "kube",
Self::CannotMount(_) => "cannot_mount",
Self::InvalidVolumeName(_) => "invalid_volume_name",
Self::MissingObject(_) => vec!["missing_object"],
Self::Kube(_) => vec!["kube"],
Self::CannotMount(_) => vec!["cannot_mount"],
Self::InvalidVolumeName(_) => vec!["invalid_volume_name"],
}
}
}
Expand Down Expand Up @@ -382,9 +382,13 @@ impl ClusterController {
#[async_trait]
impl Controller<Cluster> for ClusterController {
type Error = Error;
type Metrics = ClusterMetrics;

fn metrics(&self) -> &Self::Metrics {
&self.metrics
}

async fn reconcile_once(&self, cluster: &Arc<Cluster>) -> Result<()> {
let _timer = self.metrics.record_duration();
debug!(
"Reconciling cluster: \n{}",
serde_json::to_string_pretty(cluster.as_ref()).unwrap_or_default()
Expand All @@ -403,7 +407,6 @@ impl Controller<Cluster> for ClusterController {
}

fn handle_error(&self, resource: &Arc<Cluster>, err: &Self::Error) {
self.metrics.incr_failed_count(err);
error!("{:?} reconciliation error: {}", resource.metadata.name, err);
}
}
23 changes: 13 additions & 10 deletions operator-k8s/src/controller/cluster/v1alpha1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ use kube::{Api, Client, Resource, ResourceExt};
use tracing::{debug, error};
use utils::consts::{DEFAULT_BACKUP_DIR, DEFAULT_DATA_DIR};

use crate::controller::cluster::{ClusterMetrics, LabelError};
use crate::controller::cluster::ClusterMetrics;
use crate::controller::consts::{
CRONJOB_IMAGE, DATA_EMPTY_DIR_NAME, DEFAULT_SIDECAR_PORT, DEFAULT_XLINE_PORT, FIELD_MANAGER,
SIDECAR_PORT_NAME, XLINE_POD_NAME_ENV, XLINE_PORT_NAME,
};
use crate::controller::Controller;
use crate::controller::{Controller, MetricsLabeled};
use crate::crd::v1alpha1::{Cluster, StorageSpec};

/// CRD `XlineCluster` controller
Expand Down Expand Up @@ -53,13 +53,13 @@ pub(crate) enum Error {
InvalidVolumeName(&'static str),
}

impl LabelError for Error {
fn label(&self) -> &str {
impl MetricsLabeled for Error {
fn labels(&self) -> Vec<&str> {
match *self {
Self::MissingObject(_) => "missing_object",
Self::Kube(_) => "kube",
Self::CannotMount(_) => "cannot_mount",
Self::InvalidVolumeName(_) => "invalid_volume_name",
Self::MissingObject(_) => vec!["missing_object"],
Self::Kube(_) => vec!["kube"],
Self::CannotMount(_) => vec!["cannot_mount"],
Self::InvalidVolumeName(_) => vec!["invalid_volume_name"],
}
}
}
Expand Down Expand Up @@ -469,9 +469,13 @@ impl ClusterController {
#[async_trait]
impl Controller<Cluster> for ClusterController {
type Error = Error;
type Metrics = ClusterMetrics;

fn metrics(&self) -> &Self::Metrics {
&self.metrics
}

async fn reconcile_once(&self, cluster: &Arc<Cluster>) -> Result<()> {
let _timer = self.metrics.record_duration();
debug!(
"Reconciling cluster: \n{}",
serde_json::to_string_pretty(cluster.as_ref()).unwrap_or_default()
Expand Down Expand Up @@ -502,7 +506,6 @@ impl Controller<Cluster> for ClusterController {
}

fn handle_error(&self, resource: &Arc<Cluster>, err: &Self::Error) {
self.metrics.incr_failed_count(err);
error!("{:?} reconciliation error: {}", resource.metadata.name, err);
}
}
34 changes: 33 additions & 1 deletion operator-k8s/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,30 @@ impl<C> Context<C> {
}
}

/// Supplies the label values attached to a value when it is recorded in metrics.
///
/// Implementors provide `labels`; `label` is a provided convenience method
/// that returns the first entry of `labels`.
pub(crate) trait MetricsLabeled {
/// Returns the first label produced by `labels`.
///
/// # Panics
///
/// Panics if `labels` returns an empty vector, because the first element
/// is read by index.
#[allow(clippy::indexing_slicing)] // labels should always have at least one element
fn label(&self) -> &str {
self.labels()[0]
}

/// Returns all label values for this value.
///
/// Implementations should return at least one element; the provided
/// `label` method indexes the first one.
fn labels(&self) -> Vec<&str>;
}

/// The common metrics shared by all controllers
///
/// Implementors must also implement `Default`.
pub(crate) trait Metrics: Default {
/// Registers this value's metrics with `registry`.
///
/// # Errors
///
/// Returns a `prometheus::Error` if registration fails.
fn register(&self, registry: &prometheus::Registry) -> Result<(), prometheus::Error>;

/// Starts timing an operation and returns the `prometheus::HistogramTimer` for it.
fn record_duration(&self) -> prometheus::HistogramTimer;

/// Records one failure under the given label values.
fn record_failed_count(&self, labels: &[&str]);
}

/// The controller
#[async_trait]
pub(crate) trait Controller<R>: Sized + Send + Sync + 'static
Expand All @@ -38,7 +62,13 @@ where
R::DynamicType: Hash + Eq + Clone + Default + Unpin + Debug,
{
/// The error generated by this controller
type Error: Error + Send + Sync + 'static;
type Error: MetricsLabeled + Error + Send + Sync + 'static;

/// The metrics used by this controller
type Metrics: Metrics;

/// Get the metrics
fn metrics(&self) -> &Self::Metrics;

/// Use &self to execute a reconcile
async fn reconcile_once(&self, resource: &Arc<R>) -> Result<(), Self::Error>;
Expand All @@ -49,13 +79,15 @@ where
/// The reconcile function used in kube::runtime::Controller
///
/// Starts a duration timer from the controller's metrics, runs
/// `reconcile_once` on `resource`, and on success asks for a requeue after
/// `DEFAULT_REQUEUE_DURATION`.
///
/// # Errors
///
/// Propagates any error returned by `reconcile_once`.
async fn reconcile(resource: Arc<R>, ctx: Arc<Context<Self>>) -> Result<Action, Self::Error> {
let controller = &ctx.controller;
// Bound to a named variable so the timer stays alive until this function
// returns (including the early return on error); prometheus's
// `HistogramTimer` records the elapsed time when it is dropped.
let _timer = controller.metrics().record_duration();
controller.reconcile_once(&resource).await?;
Ok(Action::requeue(DEFAULT_REQUEUE_DURATION))
}

/// The on_error function used in kube::runtime::Controller
///
/// Records the failure in the controller's metrics using the error's
/// `labels()`, then calls the controller's `handle_error`, and finally asks
/// for a requeue after `DEFAULT_REQUEUE_DURATION`.
fn on_error(resource: Arc<R>, err: &Self::Error, ctx: Arc<Context<Self>>) -> Action {
let controller = &ctx.controller;
// The failure count is recorded here, before the controller-specific hook runs.
controller.metrics().record_failed_count(&err.labels());
controller.handle_error(&resource, err);
Action::requeue(DEFAULT_REQUEUE_DURATION)
}
Expand Down
2 changes: 1 addition & 1 deletion operator-k8s/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use utils::migration::ApiVersion;

use crate::config::{Config, Namespace};
use crate::controller::cluster::{ClusterMetrics, Controller as ClusterController};
use crate::controller::{Context, Controller};
use crate::controller::{Context, Controller, Metrics};
use crate::crd::Cluster;
use crate::router::{healthz, metrics, sidecar_state};
use crate::sidecar_state::SidecarState;
Expand Down

0 comments on commit 8feaff0

Please sign in to comment.