Skip to content

Commit

Permalink
Some basic metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed Nov 23, 2024
1 parent 0c2daec commit 9bedad4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 12 deletions.
32 changes: 26 additions & 6 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,31 @@ use crate::execution::AppExecution;
use crate::test_utils::trailers_layer::TrailersLayer;
use color_eyre::Result;
use log::info;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics::{describe_counter, describe_histogram};
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;

const DEFAULT_TIMEOUT_SECONDS: u64 = 60;

fn initialize_metrics() {
describe_counter!("requests", "Incoming requests by FlightSQL endpoint");

describe_histogram!(
"get_flight_info_latency_ms",
metrics::Unit::Milliseconds,
"Get flight info latency ms"
);

describe_histogram!(
"do_get_fallback_latency_ms",
metrics::Unit::Milliseconds,
"Do get fallback latency ms"
)
}

/// Creates and manages a running FlightSqlServer with a background task
pub struct FlightSqlApp {
/// channel to send shutdown command
Expand Down Expand Up @@ -79,14 +96,17 @@ impl FlightSqlApp {
info!("Listening to metrics on {addr}");
builder
.with_http_listener(addr)
.set_buckets_for_metric(
Matcher::Suffix("latency_ms".to_string()),
&[
1.0, 3.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 1000.0, 2500.0,
5000.0, 10000.0, 20000.0,
],
)?
.install()
.expect("failed to install metrics recorder/exporter");

metrics::describe_histogram!(
"logical_planning_ms",
metrics::Unit::Milliseconds,
"Logical planning ms"
);
initialize_metrics();
}

// Run the server in its own background task
Expand Down
24 changes: 18 additions & 6 deletions src/server/services/flightsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ use datafusion::logical_expr::LogicalPlan;
use datafusion::sql::parser::DFParser;
use futures::{StreamExt, TryStreamExt};
use log::{debug, error, info};
use metrics::{counter, histogram};
use prost::Message;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tonic::{Request, Response, Status};
use uuid::Uuid;

Expand Down Expand Up @@ -99,18 +101,18 @@ impl FlightSqlServiceImpl {

Ok(Response::new(info))
} else {
error!("Error encoding ticket");
error!("error encoding ticket");
Err(Status::internal("Error encoding ticket"))
}
}
Err(e) => {
error!("Error planning SQL query: {:?}", e);
error!("error planning SQL query: {:?}", e);
Err(Status::internal("Error planning SQL query"))
}
}
}
Err(e) => {
error!("Error parsing SQL query: {:?}", e);
error!("error parsing SQL query: {:?}", e);
Err(Status::internal("Error parsing SQL query"))
}
}
Expand Down Expand Up @@ -172,7 +174,7 @@ impl FlightSqlServiceImpl {
}
}
Err(e) => {
error!("Error decoding ticket: {:?}", e);
error!("error decoding ticket: {:?}", e);
Err(Status::internal("Error decoding ticket"))
}
}
Expand All @@ -188,7 +190,12 @@ impl FlightSqlService for FlightSqlServiceImpl {
query: CommandStatementQuery,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
self.get_flight_info_statement_handler(query, request).await
counter!("requests", "endpoint" => "get_flight_info").increment(1);
let start = Instant::now();
let res = self.get_flight_info_statement_handler(query, request).await;
let duration = start.elapsed();
histogram!("get_flight_info_latency_ms").record(duration.as_millis() as f64);
res
}

async fn do_get_statement(
Expand All @@ -204,7 +211,12 @@ impl FlightSqlService for FlightSqlServiceImpl {
request: Request<Ticket>,
message: Any,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
self.do_get_fallback_handler(request, message).await
counter!("requests", "endpoint" => "do_get_fallback").increment(1);
let start = Instant::now();
let res = self.do_get_fallback_handler(request, message).await;
let duration = start.elapsed();
histogram!("do_get_fallback_latency_ms").record(duration.as_millis() as f64);
res
}

async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
Expand Down

0 comments on commit 9bedad4

Please sign in to comment.