Skip to content

Commit

Permalink
Add flightsql save bench results (datafusion-contrib#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner authored Nov 5, 2024
1 parent a4a275a commit 38d5c25
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 8 deletions.
2 changes: 2 additions & 0 deletions f.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
query,runs,get_flight_info_min,get_flight_info_max,get_flight_info_mean,get_flight_info_median,get_flight_info_percent_of_total,ttfb_min,ttfb_max,ttfb,mean,ttfb_median,ttfb_percent_of_total,do_get_min,do_get_max,do_get_mean,do_get_median,do_get_percent_of_total,total_min,total_max,total_mean,total_median,total_percent_of_total
SELECT 1,10,7,28,11,9,33.44,9,127,22,10,66.29,9,127,22,10,66.43,17,139,33,19,100.00
1 change: 1 addition & 0 deletions f.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT 1
68 changes: 60 additions & 8 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ use std::error::Error;
use std::io::Write;
use std::path::{Path, PathBuf};
#[cfg(feature = "flightsql")]
use tonic::IntoRequest;
use {crate::execution::flightsql_benchmarks::FlightSQLBenchmarkStats, tonic::IntoRequest};

const DEFAULT_LOCAL_BENCHMARK_HEADER_ROW: &str =
const LOCAL_BENCHMARK_HEADER_ROW: &str =
"query,runs,logical_planning_min,logical_planning_max,logical_planning_mean,logical_planning_median,logical_planning_percent_of_total,physical_planning_min,physical_planning_max,physical_planning,mean,physical_planning_median,physical_planning_percent_of_total,execution_min,execution_max,execution_execution_mean,execution_median,execution_percent_of_total,total_min,total_max,total_mean,total_median,total_percent_of_total";

const FLIGHTSQL_BENCHMARK_HEADER_ROW: &str =
"query,runs,get_flight_info_min,get_flight_info_max,get_flight_info_mean,get_flight_info_median,get_flight_info_percent_of_total,ttfb_min,ttfb_max,ttfb,mean,ttfb_median,ttfb_percent_of_total,do_get_min,do_get_max,do_get_mean,do_get_median,do_get_percent_of_total,total_min,total_max,total_mean,total_median,total_percent_of_total";

/// Encapsulates the command line interface
pub struct CliApp {
/// Execution context for running queries
Expand Down Expand Up @@ -144,9 +147,34 @@ impl CliApp {
#[cfg(feature = "flightsql")]
async fn flightsql_benchmark_files(&self, files: &[PathBuf]) -> Result<()> {
info!("Benchmarking FlightSQL files: {:?}", files);

let mut open_opts = std::fs::OpenOptions::new();
let mut results_file = if let Some(p) = &self.args.save {
if !p.exists() {
if let Some(parent) = p.parent() {
std::fs::DirBuilder::new().recursive(true).create(parent)?;
}
};
if self.args.append && p.exists() {
open_opts.append(true).create(true);
Some(open_opts.open(p)?)
} else {
open_opts.write(true).create(true).truncate(true);
let mut file = open_opts.open(p)?;
writeln!(file, "{}", FLIGHTSQL_BENCHMARK_HEADER_ROW)?;
Some(file)
}
} else {
None
};

for file in files {
let query = std::fs::read_to_string(file)?;
self.flightsql_benchmark_from_string(&query).await?;
let stats = self.flightsql_benchmark_from_string(&query).await?;
println!("{}", stats);
if let Some(ref mut results_file) = &mut results_file {
writeln!(results_file, "{}", stats.to_summary_csv_row())?
}
}

Ok(())
Expand Down Expand Up @@ -212,7 +240,7 @@ impl CliApp {
} else {
open_opts.write(true).create(true).truncate(true);
let mut file = open_opts.open(p)?;
writeln!(file, "{}", DEFAULT_LOCAL_BENCHMARK_HEADER_ROW)?;
writeln!(file, "{}", LOCAL_BENCHMARK_HEADER_ROW)?;
Some(file)
}
} else {
Expand Down Expand Up @@ -242,8 +270,33 @@ impl CliApp {
#[cfg(feature = "flightsql")]
async fn flightsql_benchmark_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
info!("Benchmark FlightSQL commands: {:?}", commands);

let mut open_opts = std::fs::OpenOptions::new();
let mut file = if let Some(p) = &self.args.save {
if !p.exists() {
if let Some(parent) = p.parent() {
std::fs::DirBuilder::new().recursive(true).create(parent)?;
}
};
if self.args.append && p.exists() {
open_opts.append(true).create(true);
Some(open_opts.open(p)?)
} else {
open_opts.write(true).create(true).truncate(true);
let mut file = open_opts.open(p)?;
writeln!(file, "{}", FLIGHTSQL_BENCHMARK_HEADER_ROW)?;
Some(file)
}
} else {
None
};

for command in commands {
self.flightsql_benchmark_from_string(command).await?;
let stats = self.flightsql_benchmark_from_string(command).await?;
println!("{}", stats);
if let Some(ref mut file) = &mut file {
writeln!(file, "{}", stats.to_summary_csv_row())?
}
}

Ok(())
Expand Down Expand Up @@ -284,14 +337,13 @@ impl CliApp {
}

#[cfg(feature = "flightsql")]
async fn flightsql_benchmark_from_string(&self, sql: &str) -> Result<()> {
async fn flightsql_benchmark_from_string(&self, sql: &str) -> Result<FlightSQLBenchmarkStats> {
let stats = self
.app_execution
.flightsql_ctx()
.benchmark_query(sql)
.await?;
println!("{}", stats);
Ok(())
Ok(stats)
}

/// run and execute SQL statements and commands from a file, against a context
Expand Down
21 changes: 21 additions & 0 deletions src/execution/flightsql_benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,27 @@ impl FlightSQLBenchmarkStats {
percent_of_total,
}
}

pub fn to_summary_csv_row(&self) -> String {
let mut csv = String::new();
let logical_planning_summary = self.summarize(&self.get_flight_info_durations);
let physical_planning_summary = self.summarize(&self.ttfb_durations);
let execution_summary = self.summarize(&self.do_get_durations);
let total_summary = self.summarize(&self.total_durations);

csv.push_str(&self.query);
csv.push(',');
csv.push_str(&self.runs.to_string());
csv.push(',');
csv.push_str(logical_planning_summary.to_csv_fields().as_str());
csv.push(',');
csv.push_str(physical_planning_summary.to_csv_fields().as_str());
csv.push(',');
csv.push_str(execution_summary.to_csv_fields().as_str());
csv.push(',');
csv.push_str(total_summary.to_csv_fields().as_str());
csv
}
}

impl std::fmt::Display for FlightSQLBenchmarkStats {
Expand Down
125 changes: 125 additions & 0 deletions tests/extension_cases/flightsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,128 @@ async fn test_custom_config_benchmark_iterations() {
assert.stdout(contains_str(expected));
fixture.shutdown_and_wait().await;
}

#[tokio::test]
pub async fn test_bench_command_and_save() {
let test_server = TestFlightSqlServiceImpl::new();
let fixture = TestFixture::new(test_server.service(), "127.0.0.1:50051").await;

let temp_dir = tempfile::tempdir().unwrap();
let file = temp_dir.path().join("results.csv");
let cloned = file.clone();
let assert = tokio::task::spawn_blocking(move || {
Command::cargo_bin("dft")
.unwrap()
.arg("-c")
.arg("SELECT 1")
.arg("--bench")
.arg("--flightsql")
.arg("--save")
.arg(cloned.to_str().unwrap())
.assert()
.success()
})
.await
.unwrap();

let expected = r##"
----------------------------
Benchmark Stats (10 runs)
----------------------------
SELECT 1
----------------------------"##;
assert.stdout(contains_str(expected));
assert!(file.exists());
fixture.shutdown_and_wait().await;
}

#[tokio::test]
pub async fn test_bench_files_and_save() {
let test_server = TestFlightSqlServiceImpl::new();
let fixture = TestFixture::new(test_server.service(), "127.0.0.1:50051").await;
let file = sql_in_file(r#"SELECT 1 + 1;"#);

let temp_dir = tempfile::tempdir().unwrap();
let results_file = temp_dir.path().join("results.csv");
let cloned = results_file.clone();
let assert = tokio::task::spawn_blocking(move || {
Command::cargo_bin("dft")
.unwrap()
.arg("-f")
.arg(file.path())
.arg("--bench")
.arg("--flightsql")
.arg("--save")
.arg(cloned.to_str().unwrap())
.assert()
.success()
})
.await
.unwrap();

let expected_err = r##"
----------------------------
Benchmark Stats (10 runs)
----------------------------
SELECT 1 + 1;
----------------------------"##;
assert.code(0).stdout(contains_str(expected_err));
assert!(results_file.exists());
fixture.shutdown_and_wait().await;
}

#[tokio::test]
pub async fn test_bench_command_and_save_then_append() {
let test_server = TestFlightSqlServiceImpl::new();
let fixture = TestFixture::new(test_server.service(), "127.0.0.1:50051").await;

let temp_dir = tempfile::tempdir().unwrap();
let file = temp_dir.path().join("results.csv");
let cloned = file.clone();
let assert = tokio::task::spawn_blocking(move || {
Command::cargo_bin("dft")
.unwrap()
.arg("-c")
.arg("SELECT 1")
.arg("--bench")
.arg("--flightsql")
.arg("--save")
.arg(cloned.to_str().unwrap())
.assert()
.success()
})
.await
.unwrap();

let expected = r##"
----------------------------
Benchmark Stats (10 runs)
----------------------------
SELECT 1
----------------------------"##;
assert.stdout(contains_str(expected));
assert!(file.exists());

let cloned_again = file.clone();
tokio::task::spawn_blocking(move || {
Command::cargo_bin("dft")
.unwrap()
.arg("-c")
.arg("SELECT 1")
.arg("--bench")
.arg("--flightsql")
.arg("--save")
.arg(cloned_again.to_str().unwrap())
.arg("--append")
.assert()
.success()
})
.await
.unwrap();

let contents = std::fs::read_to_string(file).unwrap();
let lines: Vec<&str> = contents.lines().collect();
assert_eq!(3, lines.len());

fixture.shutdown_and_wait().await;
}

0 comments on commit 38d5c25

Please sign in to comment.