Skip to content

Commit

Permalink
Replace static metrics with metrics with namespace from options
Browse files Browse the repository at this point in the history
Metric subsystem_request_duration_seconds and cluster_health_status should also use custom namespace from opts instead of hardcoded elasticsearch.
  • Loading branch information
krisleipus committed Oct 31, 2023
1 parent feac9db commit 0acc473
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 45 deletions.
11 changes: 0 additions & 11 deletions src/exporter_metrics.rs

This file was deleted.

51 changes: 44 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,13 @@
unused_mut
)]

#[macro_use]
extern crate prometheus;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
use elasticsearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
use elasticsearch::Elasticsearch;
use prometheus::{default_registry, HistogramOpts, HistogramVec, IntGaugeVec, Opts};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -58,9 +55,6 @@ pub mod collection;
/// Metric
pub mod metric;

/// Exporter metrics
mod exporter_metrics;

mod options;
pub use options::ExporterOptions;

Expand Down Expand Up @@ -102,6 +96,18 @@ struct Inner {
/// Node ID to node name map for adding extra metadata labels
/// {"U-WnGaTpRxucgde3miiDWw": "m1-supernode.example.com"}
nodes_metadata: metadata::IdToMetadata,

// Exporter metrics
metrics: ExporterMetrics,
}

/// Global metrics for Elasticsearch exporter
#[derive(Debug)]
pub struct ExporterMetrics {
/// Subsystem request histogram
subsystem_req_histogram: HistogramVec,
/// Cluster health status
cluster_health_status: IntGaugeVec,
}

impl Exporter {
Expand Down Expand Up @@ -131,6 +137,11 @@ impl Exporter {
&self.0.nodes_metadata
}

/// Exporter metrics
pub fn metrics(&self) -> &ExporterMetrics {
&self.0.metrics
}

/// Spawn exporter
pub async fn new(options: ExporterOptions) -> Result<Self, Box<dyn std::error::Error>> {
let connection_pool = SingleNodeConnectionPool::new(options.elasticsearch_url.clone());
Expand All @@ -155,12 +166,38 @@ impl Exporter {
let mut const_labels = HashMap::new();
let _ = const_labels.insert("cluster".into(), cluster_name.clone());

let metrics = ExporterMetrics {
subsystem_req_histogram: HistogramVec::new(
HistogramOpts::new(
"subsystem_request_duration_seconds",
"The Elasticsearch subsystem request latencies in seconds.",
)
.namespace(options.exporter_metrics_namespace.as_str()),
&["subsystem", "cluster"],
)
.expect("valid histogram vec metric"),

cluster_health_status: prometheus::IntGaugeVec::new(
Opts::new(
"cluster_health_status",
"Whether all primary and replica shards are allocated.",
)
.namespace(options.exporter_metrics_namespace.as_str()),
&["cluster", "color"],
)
.expect("valid prometheus metric"),
};

default_registry().register(Box::new(metrics.cluster_health_status.clone()))?;
default_registry().register(Box::new(metrics.subsystem_req_histogram.clone()))?;

Ok(Self(Arc::new(Inner {
cluster_name,
client,
options,
const_labels,
nodes_metadata,
metrics,
})))
}

Expand Down
6 changes: 4 additions & 2 deletions src/metadata/node_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use elasticsearch::{Elasticsearch, Error};
use std::collections::HashMap;
use tokio::sync::RwLock;

use crate::{exporter_metrics::SUBSYSTEM_REQ_HISTOGRAM, Exporter};
use crate::Exporter;

pub(crate) type IdToMetadata = RwLock<NodeDataMap>;

Expand Down Expand Up @@ -59,7 +59,9 @@ pub(crate) async fn poll(exporter: Exporter) {
loop {
let _ = interval.tick().await;

let timer = SUBSYSTEM_REQ_HISTOGRAM
let timer: prometheus::HistogramTimer = exporter
.metrics()
.subsystem_req_histogram
.with_label_values(&["/_nodes/os", exporter.cluster_name()])
.start_timer();

Expand Down
30 changes: 13 additions & 17 deletions src/metrics/_cluster/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,23 @@ async fn metrics(exporter: &Exporter) -> Result<Vec<Metrics>, elasticsearch::Err
.send()
.await?;

let values = response
.json::<CluserHealthResponse>()
.await?
.into_value(inject_cluster_health);
let values = response.json::<CluserHealthResponse>().await?.into_value();

Ok(metric::from_value(values))
}
update_health_metrics_from_value(&values, exporter.metrics().cluster_health_status.clone());

lazy_static! {
static ref CLUSTER_STATUS: IntGaugeVec = register_int_gauge_vec!(
"elasticsearch_cluster_health_status",
"Whether all primary and replica shards are allocated.",
&["cluster", "color"]
)
.expect("valid prometheus metric");
Ok(metric::from_value(values))
}

const COLORS: [&str; 3] = ["red", "green", "yellow"];

fn update_health_metrics_from_value(value: &Value, health_metric: IntGaugeVec) {
if let Some(map) = value.as_object() {
update_health_metrics(map, health_metric)
}
}

// elasticsearch_cluster_health_cluster_status{cluster="some",status="green"} 1
fn inject_cluster_health(map: &mut SerdeMap<String, Value>) {
fn update_health_metrics(map: &SerdeMap<String, Value>, health_metric: IntGaugeVec) {
let cluster_status: String = map
.get("status")
.and_then(|v| v.as_str())
Expand All @@ -52,11 +48,11 @@ fn inject_cluster_health(map: &mut SerdeMap<String, Value>) {

for color in COLORS.iter() {
if color == &cluster_status {
CLUSTER_STATUS
health_metric
.with_label_values(&[&cluster_name, &cluster_status])
.set(1);
} else {
CLUSTER_STATUS
health_metric
.with_label_values(&[&cluster_name, color])
.set(0);
}
Expand All @@ -71,7 +67,7 @@ async fn test_cluster_health() {
serde_json::from_str(include_str!("../../tests/files/cluster_health.json"))
.expect("valid json");

let values = cluster_health.into_value(inject_cluster_health);
let values = cluster_health.into_value();

let metrics = metric::from_value(values);
assert!(!metrics.is_empty());
Expand Down
8 changes: 2 additions & 6 deletions src/metrics/_cluster/responses/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use serde_json::{Map, Value};
use serde_json::Value;

#[derive(Debug, Deserialize)]
pub(crate) struct CluserHealthResponse(Value);

impl CluserHealthResponse {
/// Inject labels into nodes response
pub(crate) fn into_value(mut self, value_mangle: fn(&mut Map<String, Value>)) -> Value {
if let Some(map) = self.0.as_object_mut() {
value_mangle(map)
}

pub(crate) fn into_value(self) -> Value {
self.0
}
}
5 changes: 3 additions & 2 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ macro_rules! poll_metrics {
use std::time::Duration;

use $crate::collection::{lifetime, lifetime::MetricLifetimeMap, Collection};
use $crate::exporter_metrics::SUBSYSTEM_REQ_HISTOGRAM;
use $crate::metric::{self, Metrics};
use $crate::Exporter;

Expand Down Expand Up @@ -74,7 +73,9 @@ macro_rules! poll_metrics {

let _ = interval.tick().await;

let timer = SUBSYSTEM_REQ_HISTOGRAM
let timer = exporter
.metrics()
.subsystem_req_histogram
.with_label_values(&[&format!("/{}", SUBSYSTEM), exporter.cluster_name()])
.start_timer();

Expand Down

0 comments on commit 0acc473

Please sign in to comment.