diff --git a/src/server/mod.rs b/src/server/mod.rs index 4aabf99..adbebb5 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -21,7 +21,8 @@ use crate::execution::AppExecution; use crate::test_utils::trailers_layer::TrailersLayer; use color_eyre::Result; use log::info; -use metrics_exporter_prometheus::PrometheusBuilder; +use metrics::{describe_counter, describe_histogram}; +use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; use std::net::SocketAddr; use std::time::Duration; use tokio::net::TcpListener; @@ -29,6 +30,22 @@ use tokio::task::JoinHandle; const DEFAULT_TIMEOUT_SECONDS: u64 = 60; +fn initialize_metrics() { + describe_counter!("requests", "Incoming requests by FlightSQL endpoint"); + + describe_histogram!( + "get_flight_info_latency_ms", + metrics::Unit::Milliseconds, + "Get flight info latency ms" + ); + + describe_histogram!( + "do_get_fallback_latency_ms", + metrics::Unit::Milliseconds, + "Do get fallback latency ms" + ) +} + /// Creates and manages a running FlightSqlServer with a background task pub struct FlightSqlApp { /// channel to send shutdown command @@ -79,14 +96,17 @@ impl FlightSqlApp { info!("Listening to metrics on {addr}"); builder .with_http_listener(addr) + .set_buckets_for_metric( + Matcher::Suffix("latency_ms".to_string()), + &[ + 1.0, 3.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, + 5000.0, 10000.0, 20000.0, + ], + )? .install() .expect("failed to install metrics recorder/exporter"); - metrics::describe_histogram!( - "logical_planning_ms", - metrics::Unit::Milliseconds, - "Logical planning ms" - ); + initialize_metrics(); } // Run the server in its own background task diff --git a/src/server/services/flightsql.rs b/src/server/services/flightsql.rs index 99401ca..4034b83 100644 --- a/src/server/services/flightsql.rs +++ b/src/server/services/flightsql.rs @@ -26,10 +26,12 @@ use datafusion::logical_expr::LogicalPlan; use datafusion::sql::parser::DFParser; use futures::{StreamExt, TryStreamExt}; use log::{debug, error, info}; +use metrics::{counter, histogram}; use prost::Message; use std::collections::HashMap; use std::str::FromStr; use std::sync::{Arc, Mutex}; +use std::time::Instant; use tonic::{Request, Response, Status}; use uuid::Uuid; @@ -99,18 +101,18 @@ impl FlightSqlServiceImpl { Ok(Response::new(info)) } else { - error!("Error encoding ticket"); + error!("error encoding ticket"); Err(Status::internal("Error encoding ticket")) } } Err(e) => { - error!("Error planning SQL query: {:?}", e); + error!("error planning SQL query: {:?}", e); Err(Status::internal("Error planning SQL query")) } } } Err(e) => { - error!("Error parsing SQL query: {:?}", e); + error!("error parsing SQL query: {:?}", e); Err(Status::internal("Error parsing SQL query")) } } @@ -172,7 +174,7 @@ impl FlightSqlServiceImpl { } } Err(e) => { - error!("Error decoding ticket: {:?}", e); + error!("error decoding ticket: {:?}", e); Err(Status::internal("Error decoding ticket")) } } @@ -188,7 +190,12 @@ impl FlightSqlService for FlightSqlServiceImpl { query: CommandStatementQuery, request: Request, ) -> Result, Status> { - self.get_flight_info_statement_handler(query, request).await + counter!("requests", "endpoint" => "get_flight_info").increment(1); + let start = Instant::now(); + let res = self.get_flight_info_statement_handler(query, request).await; + let duration = start.elapsed(); + histogram!("get_flight_info_latency_ms").record(duration.as_millis() as f64); + res } async fn do_get_statement( @@ -204,7 +211,12 @@ impl FlightSqlService for FlightSqlServiceImpl { request: Request, message: Any, ) -> Result::DoGetStream>, Status> { - self.do_get_fallback_handler(request, message).await + counter!("requests", "endpoint" => "do_get_fallback").increment(1); + let start = Instant::now(); + let res = self.do_get_fallback_handler(request, message).await; + let duration = start.elapsed(); + histogram!("do_get_fallback_latency_ms").record(duration.as_millis() as f64); + res } async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}