Skip to content

Commit

Permalink
feat: Add /internal-backstage/metricsbatch to view what Edge currently has stored of metrics (#420)
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher Kolstad authored Feb 28, 2024
1 parent 3ca7069 commit 98758be
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 97 deletions.
4 changes: 1 addition & 3 deletions server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use unleash_yggdrasil::EngineState;
use super::unleash_client::UnleashClient;
use crate::error::{EdgeError, FeatureError};
use crate::filters::{filter_client_features, FeatureFilterSet};
use crate::types::{
build, EdgeResult, TokenType, TokenValidationStatus,
};
use crate::types::{build, EdgeResult, TokenType, TokenValidationStatus};
use crate::{
persistence::EdgePersistence,
tokens::{cache_key, simplify},
Expand Down
3 changes: 1 addition & 2 deletions server/src/http/unleash_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use crate::error::{CertificateError, FeatureError};
use crate::metrics::client_metrics::MetricsBatch;
use crate::tls::build_upstream_certificate;
use crate::types::{
ClientFeaturesResponse, EdgeResult, EdgeToken,
TokenValidationStatus, ValidateTokensRequest,
ClientFeaturesResponse, EdgeResult, EdgeToken, TokenValidationStatus, ValidateTokensRequest,
};
use crate::urls::UnleashUrls;
use crate::{error::EdgeError, types::ClientFeaturesRequest};
Expand Down
30 changes: 29 additions & 1 deletion server/src/internal_backstage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ use std::collections::HashMap;
use crate::auth::token_validator::TokenValidator;
use crate::http::feature_refresher::FeatureRefresher;
use crate::metrics::actix_web_metrics::PrometheusMetricsHandler;
use crate::types::Status;
use crate::metrics::client_metrics::MetricsCache;
use crate::types::{BuildInfo, EdgeJsonResult, EdgeToken, TokenInfo, TokenRefresh};
use crate::types::{ClientMetric, MetricsInfo, Status};
use actix_web::{
get,
web::{self, Json},
};
use dashmap::DashMap;
use iter_tools::Itertools;
use serde::{Deserialize, Serialize};
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::ClientApplication;

#[derive(Debug, Serialize, Deserialize)]
pub struct EdgeStatus {
pub status: Status,
Expand Down Expand Up @@ -82,6 +86,27 @@ pub async fn tokens(
}))
}

#[get("/metricsbatch")]
pub async fn metrics_batch(metrics_cache: web::Data<MetricsCache>) -> EdgeJsonResult<MetricsInfo> {
let applications: Vec<ClientApplication> = metrics_cache
.applications
.iter()
.map(|e| e.value().clone())
.collect_vec();
let metrics = metrics_cache
.metrics
.iter()
.map(|e| ClientMetric {
key: e.key().clone(),
bucket: e.value().clone(),
})
.collect_vec();
Ok(Json(MetricsInfo {
applications,
metrics,
}))
}

#[get("/features")]
pub async fn features(
features_cache: web::Data<DashMap<String, ClientFeatures>>,
Expand All @@ -92,6 +117,7 @@ pub async fn features(
.collect();
Ok(Json(features))
}

pub fn configure_internal_backstage(
cfg: &mut web::ServiceConfig,
metrics_handler: PrometheusMetricsHandler,
Expand All @@ -100,6 +126,7 @@ pub fn configure_internal_backstage(
.service(info)
.service(tokens)
.service(ready)
.service(metrics_batch)
.service(web::resource("/metrics").route(web::get().to(metrics_handler)))
.service(features);
}
Expand All @@ -125,6 +152,7 @@ mod tests {
use dashmap::DashMap;
use unleash_types::client_features::{ClientFeature, ClientFeatures};
use unleash_yggdrasil::EngineState;

#[actix_web::test]
async fn test_health_ok() {
let app = test::init_service(
Expand Down
2 changes: 1 addition & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ mod tests {
.configure(crate::client_api::configure_client_api)
.configure(|cfg| {
crate::frontend_api::configure_frontend_api(cfg, false)
})
}),
)
.service(web::scope("/edge").configure(crate::edge_api::configure_edge_api)),
|_| AppConfig::default(),
Expand Down
6 changes: 2 additions & 4 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ use unleash_edge::middleware::request_tracing::RequestTracing;
use unleash_edge::offline::offline_hotload;
use unleash_edge::persistence::{persist_data, EdgePersistence};
use unleash_edge::types::{EdgeToken, TokenRefresh, TokenValidationStatus};
use unleash_edge::{
cli, client_api, frontend_api, health_checker, openapi, ready_checker,
};
use unleash_edge::{cli, client_api, frontend_api, health_checker, openapi, ready_checker};
use unleash_edge::{edge_api, prom_metrics};
use unleash_edge::{internal_backstage, tls};

Expand Down Expand Up @@ -119,7 +117,7 @@ async fn main() -> Result<(), anyhow::Error> {
.configure(client_api::configure_client_api)
.configure(|cfg| {
frontend_api::configure_frontend_api(cfg, disable_all_endpoint)
})
}),
)
.service(web::scope("/edge").configure(edge_api::configure_edge_api))
.service(
Expand Down
192 changes: 113 additions & 79 deletions server/src/metrics/actix_web_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,35 +363,39 @@ where

let request_metrics = self.metrics.clone();
Box::pin(self.service.call(req).map(move |res| {
request_metrics.http_server_active_requests.add(-1, &attributes);

let status_code = match &res {
Ok(res) => res.status(),
Err(e) => e.as_response_error().status_code(),
}
.as_u16() as i64;

attributes.push(HTTP_RESPONSE_STATUS_CODE.i64(status_code));

let response_size = res
.as_ref()
.map(|res| {
res.response()
.headers()
.get(CONTENT_LENGTH)
.and_then(|len| len.to_str().ok().and_then(|s| s.parse().ok()))
.unwrap_or(0u64)
})
.unwrap_or(0);
request_metrics.http_server_response_size.record(response_size, &attributes);

request_metrics.http_server_duration.record(
timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
&attributes,
);

res
}))
request_metrics
.http_server_active_requests
.add(-1, &attributes);

let status_code = match &res {
Ok(res) => res.status(),
Err(e) => e.as_response_error().status_code(),
}
.as_u16() as i64;

attributes.push(HTTP_RESPONSE_STATUS_CODE.i64(status_code));

let response_size = res
.as_ref()
.map(|res| {
res.response()
.headers()
.get(CONTENT_LENGTH)
.and_then(|len| len.to_str().ok().and_then(|s| s.parse().ok()))
.unwrap_or(0u64)
})
.unwrap_or(0);
request_metrics
.http_server_response_size
.record(response_size, &attributes);

request_metrics.http_server_duration.record(
timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
&attributes,
);

res
}))
}
}

Expand Down Expand Up @@ -434,69 +438,99 @@ impl dev::Handler<actix_web::HttpRequest> for PrometheusMetricsHandler {

#[cfg(test)]
mod tests {
use actix_web::{test, web, App, HttpResponse, http::StatusCode};
use prometheus::{Encoder, Registry, TextEncoder};
use crate::prom_metrics;

use actix_web::{http::StatusCode, test, web, App, HttpResponse};
use prometheus::{Encoder, Registry, TextEncoder};

async fn test_ok_endpoint() -> HttpResponse {
HttpResponse::Ok().append_header(("Content-length", 7)).body("Test OK")
HttpResponse::Ok()
.append_header(("Content-length", 7))
.body("Test OK")
}

async fn test_client_error_endpoint() -> HttpResponse {
HttpResponse::BadRequest().append_header(("Content-length", 17)).body("Test Client Error")
HttpResponse::BadRequest()
.append_header(("Content-length", 17))
.body("Test Client Error")
}

async fn test_server_error_endpoint() -> HttpResponse {
HttpResponse::InternalServerError().append_header(("Content-length", 17)).body("Test Server Error")
HttpResponse::InternalServerError()
.append_header(("Content-length", 17))
.body("Test Server Error")
}

fn parse_metrics_for_status_code(metrics_output: &str, status_code: i64) -> Option<f64> {
metrics_output.lines()
.filter(|line| line.contains("http_server_response_size_bytes_sum") && line.contains(&format!("http_response_status_code=\"{}\"", status_code)))
.flat_map(|line| line.split_whitespace().last())
.flat_map(|value| value.parse::<f64>().ok())
.next()
}
metrics_output
.lines()
.filter(|line| {
line.contains("http_server_response_size_bytes_sum")
&& line.contains(&format!("http_response_status_code=\"{}\"", status_code))
})
.flat_map(|line| line.split_whitespace().last())
.flat_map(|value| value.parse::<f64>().ok())
.next()
}

#[tokio::test]
async fn test_middleware_response_metrics() {
let registry = Registry::new();
let (_, request_metrics) = prom_metrics::test_instantiate_without_tracing_and_logging(Some(registry.clone()));

let mut app = test::init_service(
App::new()
.wrap(request_metrics.clone())
.service(web::resource("/test_ok").to(test_ok_endpoint))
.service(web::resource("/test_client_error").to(test_client_error_endpoint))
.service(web::resource("/test_server_error").to(test_server_error_endpoint))
).await;

let req_ok = test::TestRequest::get().uri("/test_ok").to_request();
let resp_ok = test::call_service(&mut app, req_ok).await;
assert_eq!(resp_ok.status(), StatusCode::OK);

let req_client_error = test::TestRequest::get().uri("/test_client_error").to_request();
let resp_client_error = test::call_service(&mut app, req_client_error).await;
assert_eq!(resp_client_error.status(), StatusCode::BAD_REQUEST);

let req_server_error = test::TestRequest::get().uri("/test_server_error").to_request();
let resp_server_error = test::call_service(&mut app, req_server_error).await;
assert_eq!(resp_server_error.status(), StatusCode::INTERNAL_SERVER_ERROR);

let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = registry.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
let metrics_output = String::from_utf8(buffer).unwrap();

let value_ok = parse_metrics_for_status_code(&metrics_output, 200).expect("Metric with status code 200 not found");
assert_eq!(value_ok, 7.0, "Metric value for status code 200 did not match expected");

let value_client_error = parse_metrics_for_status_code(&metrics_output, 400).expect("Metric with status code 400 not found");
assert_eq!(value_client_error, 17.0, "Metric value for status code 400 did not match expected");

let value_server_error = parse_metrics_for_status_code(&metrics_output, 500).expect("Metric with status code 500 not found");
assert_eq!(value_server_error, 17.0, "Metric value for status code 500 did not match expected");
let registry = Registry::new();
let (_, request_metrics) =
prom_metrics::test_instantiate_without_tracing_and_logging(Some(registry.clone()));

let mut app = test::init_service(
App::new()
.wrap(request_metrics.clone())
.service(web::resource("/test_ok").to(test_ok_endpoint))
.service(web::resource("/test_client_error").to(test_client_error_endpoint))
.service(web::resource("/test_server_error").to(test_server_error_endpoint)),
)
.await;

let req_ok = test::TestRequest::get().uri("/test_ok").to_request();
let resp_ok = test::call_service(&mut app, req_ok).await;
assert_eq!(resp_ok.status(), StatusCode::OK);

let req_client_error = test::TestRequest::get()
.uri("/test_client_error")
.to_request();
let resp_client_error = test::call_service(&mut app, req_client_error).await;
assert_eq!(resp_client_error.status(), StatusCode::BAD_REQUEST);

let req_server_error = test::TestRequest::get()
.uri("/test_server_error")
.to_request();
let resp_server_error = test::call_service(&mut app, req_server_error).await;
assert_eq!(
resp_server_error.status(),
StatusCode::INTERNAL_SERVER_ERROR
);

let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = registry.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
let metrics_output = String::from_utf8(buffer).unwrap();

let value_ok = parse_metrics_for_status_code(&metrics_output, 200)
.expect("Metric with status code 200 not found");
assert_eq!(
value_ok, 7.0,
"Metric value for status code 200 did not match expected"
);

let value_client_error = parse_metrics_for_status_code(&metrics_output, 400)
.expect("Metric with status code 400 not found");
assert_eq!(
value_client_error, 17.0,
"Metric value for status code 400 did not match expected"
);

let value_server_error = parse_metrics_for_status_code(&metrics_output, 500)
.expect("Metric with status code 500 not found");
assert_eq!(
value_server_error, 17.0,
"Metric value for status code 500 did not match expected"
);
}
}
}
2 changes: 1 addition & 1 deletion server/src/metrics/client_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl From<ClientMetricsEnv> for MetricsKey {
}
}

#[derive(Debug, Clone, Eq)]
#[derive(Debug, Clone, Eq, Deserialize, Serialize)]
pub struct MetricsKey {
pub app_name: String,
pub feature_name: String,
Expand Down
2 changes: 1 addition & 1 deletion server/src/middleware/client_token_from_frontend_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ mod tests {
.configure(crate::client_api::configure_client_api)
.configure(|cfg| {
crate::frontend_api::configure_frontend_api(cfg, false)
})
}),
)
.service(web::scope("/edge").configure(crate::edge_api::configure_edge_api)),
|_| AppConfig::default(),
Expand Down
12 changes: 7 additions & 5 deletions server/src/prom_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ fn register_custom_metrics(registry: &prometheus::Registry) {
}

#[cfg(test)]
pub fn test_instantiate_without_tracing_and_logging(registry: Option<prometheus::Registry>) -> (PrometheusMetricsHandler, RequestMetrics) {
let registry = registry.unwrap_or_else(instantiate_registry);
register_custom_metrics(&registry);
instantiate_prometheus_metrics_handler(registry)
}
pub fn test_instantiate_without_tracing_and_logging(
registry: Option<prometheus::Registry>,
) -> (PrometheusMetricsHandler, RequestMetrics) {
let registry = registry.unwrap_or_else(instantiate_registry);
register_custom_metrics(&registry);
instantiate_prometheus_metrics_handler(registry)
}
Loading

0 comments on commit 98758be

Please sign in to comment.