Skip to content

Commit

Permalink
Start setting up metrics (datafusion-contrib#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner authored Nov 24, 2024
1 parent e8c8e20 commit 6c9073a
Show file tree
Hide file tree
Showing 10 changed files with 542 additions and 165 deletions.
352 changes: 314 additions & 38 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ http-body = {version = "0.4.5" }
itertools = "0.13.0"
lazy_static = "1.4.0"
log = "0.4.22"
metrics = {version = "0.24.0", optional = true }
metrics-exporter-prometheus = {version = "0.16.0", optional = true }
num_cpus = "1.16.0"
object_store = { version = "0.10.2", features = ["aws"], optional = true }
parking_lot = "0.12.3"
Expand Down Expand Up @@ -63,10 +65,11 @@ url = "2.5.2"
default = ["functions-parquet"]
deltalake = ["dep:deltalake"]
flightsql = ["dep:arrow-flight", "dep:tonic"]
experimental-flightsql-server = ["dep:arrow-flight", "dep:tonic"]
experimental-flightsql-server = ["flightsql"]
s3 = ["object_store/aws", "url"]
functions-json = ["dep:datafusion-functions-json"]
functions-parquet = ["dep:datafusion-functions-parquet"]
metrics = ["dep:metrics", "dep:metrics-exporter-prometheus"]

[[bin]]
name = "dft"
Expand Down
1 change: 1 addition & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub struct DftArgs {
#[clap(long, short, help = "Only show how long the query took to run")]
pub time: bool,

#[cfg(feature = "experimental-flightsql-server")]
#[clap(long, help = "Start a FlightSQL server")]
pub serve: bool,

Expand Down
10 changes: 10 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ pub struct FlightSQLConfig {
pub connection_url: String,
#[serde(default = "default_benchmark_iterations")]
pub benchmark_iterations: usize,
#[cfg(feature = "metrics")]
#[serde(default = "default_server_metrics_port")]
pub server_metrics_port: String,
}

#[cfg(feature = "flightsql")]
Expand All @@ -266,6 +269,8 @@ impl Default for FlightSQLConfig {
Self {
connection_url: default_connection_url(),
benchmark_iterations: default_benchmark_iterations(),
#[cfg(feature = "metrics")]
server_metrics_port: default_server_metrics_port(),
}
}
}
Expand All @@ -275,6 +280,11 @@ pub fn default_connection_url() -> String {
"http://localhost:50051".to_string()
}

/// Serde default for `FlightSQLConfig::server_metrics_port`.
///
/// Despite the "port" in the name, the value is a full socket address (`host:port`).
/// The default host `0.0.0.0` makes the listener reachable on every network interface.
///
/// The gate matches the places that reference this function: the
/// `server_metrics_port` field and the `Default` impl of `FlightSQLConfig` only require
/// the `metrics` feature on top of `flightsql`. Gating on the server feature instead left
/// the function undefined for `flightsql` + `metrics` builds.
// NOTE(review): assumes the `FlightSQLConfig` struct itself is gated on `flightsql`, like its
// `Default` impl - confirm against the struct's attributes.
#[cfg(all(feature = "flightsql", feature = "metrics"))]
fn default_server_metrics_port() -> String {
    "0.0.0.0:9000".to_string()
}

#[derive(Clone, Debug, Default, Deserialize)]
pub struct EditorConfig {
pub experimental_syntax_highlighting: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod config;
pub mod execution;
pub mod extensions;
#[cfg(feature = "experimental-flightsql-server")]
pub mod flightsql_server;
pub mod server;
pub mod telemetry;
pub mod test_utils;
pub mod tui;
65 changes: 29 additions & 36 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,17 @@ use dft::cli::CliApp;
use dft::execution::flightsql::FlightSQLContext;
use dft::execution::{local::ExecutionContext, AppExecution, AppType};
#[cfg(feature = "experimental-flightsql-server")]
use dft::flightsql_server::{FlightSqlApp, FlightSqlServiceImpl};
use dft::server::FlightSqlApp;
use dft::telemetry;
use dft::tui::state::AppState;
use dft::tui::{state, App};
#[cfg(feature = "experimental-flightsql-server")]
use log::info;

#[allow(unused_mut)]
fn main() -> Result<()> {
let cli = DftArgs::parse();

if !cli.files.is_empty() || !cli.commands.is_empty() || cli.serve {
env_logger::init();
}

let state = state::initialize(cli.config_path());

// With Runtimes configured correctly the main Tokio runtime should only be used for network
Expand All @@ -53,8 +50,31 @@ fn main() -> Result<()> {
}

async fn app_entry_point(cli: DftArgs, state: AppState<'_>) -> Result<()> {
#[cfg(feature = "experimental-flightsql-server")]
if cli.serve {
env_logger::init();
const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051";
info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS);
let state = state::initialize(cli.config_path());
let execution_ctx =
ExecutionContext::try_new(&state.config.execution, AppType::FlightSQLServer)?;
if cli.run_ddl {
execution_ctx.execute_ddl().await;
}
let app_execution = AppExecution::new(execution_ctx);
let app = FlightSqlApp::try_new(
app_execution,
&cli.flightsql_host
.unwrap_or(DEFAULT_SERVER_ADDRESS.to_string()),
&state.config.flightsql.server_metrics_port,
)
.await?;
app.run_app().await;
return Ok(());
}
// CLI mode: executing commands from files or CLI arguments
if !cli.files.is_empty() || !cli.commands.is_empty() {
env_logger::init();
let execution_ctx = ExecutionContext::try_new(&state.config.execution, AppType::Cli)?;
#[allow(unused_mut)]
let mut app_execution = AppExecution::new(execution_ctx);
Expand All @@ -70,37 +90,10 @@ async fn app_entry_point(cli: DftArgs, state: AppState<'_>) -> Result<()> {
}
let app = CliApp::new(app_execution, cli.clone());
app.execute_files_or_commands().await?;
// FlightSQL Server mode: start a FlightSQL server
} else if cli.serve {
#[cfg(not(feature = "experimental-flightsql-server"))]
{
panic!("FlightSQL feature is not enabled");
}
#[cfg(feature = "experimental-flightsql-server")]
{
const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:50051";
info!("Starting FlightSQL server on {}", DEFAULT_SERVER_ADDRESS);
let state = state::initialize(cli.config_path());
let execution_ctx =
ExecutionContext::try_new(&state.config.execution, AppType::FlightSQLServer)?;
if cli.run_ddl {
execution_ctx.execute_ddl().await;
}
let app_execution = AppExecution::new(execution_ctx);
let server = FlightSqlServiceImpl::new(app_execution);
let app = FlightSqlApp::new(
server.service(),
&cli.flightsql_host
.unwrap_or(DEFAULT_SERVER_ADDRESS.to_string()),
)
.await;
app.run_app().await;
}
}
// TUI mode: running the TUI
else {
// use alternate logging for TUI
telemetry::initialize_logs()?;
// FlightSQL Server mode: start a FlightSQL server
} else {
// TUI mode: running the TUI
telemetry::initialize_logs()?; // use alternate logging for TUI
let state = state::initialize(cli.config_path());
let execution_ctx = ExecutionContext::try_new(&state.config.execution, AppType::Tui)?;
let app_execution = AppExecution::new(execution_ctx);
Expand Down
146 changes: 146 additions & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

pub mod services;

use crate::execution::AppExecution;
use crate::test_utils::trailers_layer::TrailersLayer;
use color_eyre::Result;
#[cfg(feature = "metrics")]
use log::info;
#[cfg(feature = "metrics")]
use metrics::{describe_counter, describe_histogram};
#[cfg(feature = "metrics")]
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder};
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::task::JoinHandle;

const DEFAULT_TIMEOUT_SECONDS: u64 = 60;

/// Registers human-readable descriptions for the metrics this server reports.
///
/// Describes the `requests` counter and the two `*_latency_ms` histograms
/// (both declared in milliseconds).
///
/// Only compiled with the `metrics` feature: the `metrics` crate is an optional
/// dependency, and the sole call site is itself behind `#[cfg(feature = "metrics")]`.
#[cfg(feature = "metrics")]
fn initialize_metrics() {
    describe_counter!("requests", "Incoming requests by FlightSQL endpoint");

    describe_histogram!(
        "get_flight_info_latency_ms",
        metrics::Unit::Milliseconds,
        "Get flight info latency ms"
    );

    describe_histogram!(
        "do_get_fallback_latency_ms",
        metrics::Unit::Milliseconds,
        "Do get fallback latency ms"
    );
}

/// A FlightSQL server running on a background task, bundled with what is needed to stop it
/// or wait for it to finish.
pub struct FlightSqlApp {
    /// Sending half of the one-shot channel used to request shutdown
    shutdown: Option<tokio::sync::oneshot::Sender<()>>,

    /// Socket address the server is listening on
    pub addr: SocketAddr,

    /// Join handle of the task that runs the server
    handle: Option<JoinHandle<Result<(), tonic::transport::Error>>>,
}

impl FlightSqlApp {
    /// Binds the FlightSQL server to `addr` and starts serving it on a background task.
    ///
    /// With the `metrics` feature enabled, a Prometheus exporter is also installed with its
    /// HTTP listener on `metrics_addr`, and the server's metric descriptions are registered.
    /// Without that feature `metrics_addr` is ignored.
    ///
    /// # Errors
    ///
    /// Returns an error if `addr` cannot be bound or the bound address cannot be read back
    /// from the listener. With `metrics` enabled, also returns an error if `metrics_addr`
    /// does not parse as a socket address or the exporter cannot be configured or installed.
    pub async fn try_new(
        app_execution: AppExecution,
        addr: &str,
        metrics_addr: &str,
    ) -> Result<Self> {
        let flightsql = services::flightsql::FlightSqlServiceImpl::new(app_execution);

        // Bind to the caller-supplied address and read the bound address back from the
        // listener, so `self.addr` reports what is actually in use (relevant when the caller
        // asked for port 0). Failures are returned to the caller rather than panicking:
        // an address that is already in use is an ordinary runtime condition.
        let listener = TcpListener::bind(addr).await?;
        let local_addr = listener.local_addr()?;

        // prepare the shutdown channel
        let (tx, rx) = tokio::sync::oneshot::channel();

        let server_timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECONDS);

        // Resolves when a shutdown is requested or the sender is dropped.
        let shutdown_future = async move {
            rx.await.ok();
        };

        let serve_future = tonic::transport::Server::builder()
            .timeout(server_timeout)
            .layer(TrailersLayer)
            .add_service(flightsql.service())
            .serve_with_incoming_shutdown(
                tokio_stream::wrappers::TcpListenerStream::new(listener),
                shutdown_future,
            );

        #[cfg(feature = "metrics")]
        {
            let metrics_socket_addr: SocketAddr = metrics_addr.parse()?;
            info!("Listening to metrics on {}", metrics_socket_addr);
            PrometheusBuilder::new()
                .with_http_listener(metrics_socket_addr)
                // Explicit buckets for every metric whose name ends in `latency_ms`.
                .set_buckets_for_metric(
                    Matcher::Suffix("latency_ms".to_string()),
                    &[
                        1.0, 3.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 1000.0, 2500.0,
                        5000.0, 10000.0, 20000.0,
                    ],
                )?
                .install()?;

            initialize_metrics();
        }

        // The metrics address is only consumed when the `metrics` feature is enabled.
        #[cfg(not(feature = "metrics"))]
        let _ = metrics_addr;

        // Run the server in its own background task
        let handle = tokio::task::spawn(serve_future);

        Ok(Self {
            shutdown: Some(tx),
            addr: local_addr,
            handle: Some(handle),
        })
    }

    /// Stops the server and waits for the server to shutdown
    ///
    /// # Panics
    ///
    /// Panics if the shutdown signal cannot be delivered, if the server task cannot be
    /// joined, or if the server finished with an error.
    pub async fn shutdown_and_wait(mut self) {
        if let Some(shutdown) = self.shutdown.take() {
            shutdown.send(()).expect("server quit early");
        }
        if let Some(handle) = self.handle.take() {
            handle
                .await
                .expect("task join error (panic?)")
                .expect("Server Error found at shutdown");
        }
    }

    /// Waits for the server task to finish, i.e. runs the server in the foreground.
    ///
    /// # Panics
    ///
    /// Panics if there is no server task to wait on, if the task cannot be joined, or if the
    /// server finished with an error.
    pub async fn run_app(self) {
        let Some(handle) = self.handle else {
            panic!("Server task not found");
        };
        handle
            .await
            .expect("Unable to run server task")
            .expect("Server Error found at shutdown");
    }
}
Loading

0 comments on commit 6c9073a

Please sign in to comment.