Skip to content

Commit

Permalink
Add files bench
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed Oct 15, 2024
1 parent 1600dd8 commit 838c5c1
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl CliApp {
#[cfg(feature = "flightsql")]
async fn flightsql_benchmark_files(&self, files: &[PathBuf]) -> Result<()> {
info!("Benchmarking FlightSQL files: {:?}", files);
for file in files {
let query = std::fs::read_to_string(file)?;
self.flightsql_benchmark_from_string(&query).await?;
}

Ok(())
}
Expand Down
201 changes: 201 additions & 0 deletions src/execution/benchmarks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ExecutionContext`]: DataFusion based execution context for running SQL queries
use std::time::Duration;

/// Duration summary statistics
#[derive(Debug)]
pub struct LocalDurationsSummary {
pub min: Duration,
pub max: Duration,
pub mean: Duration,
pub median: Duration,
pub percent_of_total: f64,
}

impl std::fmt::Display for LocalDurationsSummary {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "Min: {:?}", self.min)?;
writeln!(f, "Max: {:?}", self.max)?;
writeln!(f, "Median: {:?}", self.median)?;
writeln!(f, "Mean: {:?} ({:.2}%)", self.mean, self.percent_of_total)
}
}

/// Contains stats for all runs of a benchmarked query and provides methods for aggregating
#[derive(Debug, Default)]
pub struct LocalBenchmarkStats {
query: String,
runs: usize,
logical_planning_durations: Vec<Duration>,
physical_planning_durations: Vec<Duration>,
execution_durations: Vec<Duration>,
total_durations: Vec<Duration>,
}

impl LocalBenchmarkStats {
pub fn new(
query: String,
logical_planning_durations: Vec<Duration>,
physical_planning_durations: Vec<Duration>,
execution_durations: Vec<Duration>,
total_durations: Vec<Duration>,
) -> Self {
let runs = logical_planning_durations.len();
Self {
query,
runs,
logical_planning_durations,
physical_planning_durations,
execution_durations,
total_durations,
}
}

fn summarize(&self, durations: &[Duration]) -> LocalDurationsSummary {
let mut sorted = durations.to_vec();
sorted.sort();
let len = sorted.len();
let min = *sorted.first().unwrap();
let max = *sorted.last().unwrap();
let mean = sorted.iter().sum::<Duration>() / len as u32;
let median = sorted[len / 2];
let this_total = durations.iter().map(|d| d.as_nanos()).sum::<u128>();
let total_duration = self
.total_durations
.iter()
.map(|d| d.as_nanos())
.sum::<u128>();
let percent_of_total = (this_total as f64 / total_duration as f64) * 100.0;
LocalDurationsSummary {
min,
max,
mean,
median,
percent_of_total,
}
}
}

impl std::fmt::Display for LocalBenchmarkStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f)?;
writeln!(f, "----------------------------")?;
writeln!(f, "Benchmark Stats ({} runs)", self.runs)?;
writeln!(f, "----------------------------")?;
writeln!(f, "{}", self.query)?;
writeln!(f, "----------------------------")?;

let logical_planning_summary = self.summarize(&self.logical_planning_durations);
writeln!(f, "Logical Planning")?;
writeln!(f, "{}", logical_planning_summary)?;

let physical_planning_summary = self.summarize(&self.physical_planning_durations);
writeln!(f, "Physical Planning")?;
writeln!(f, "{}", physical_planning_summary)?;

let execution_summary = self.summarize(&self.execution_durations);
writeln!(f, "Execution")?;
writeln!(f, "{}", execution_summary)?;

let total_summary = self.summarize(&self.total_durations);
writeln!(f, "Total")?;
writeln!(f, "{}", total_summary)
}
}

pub struct FlightSQLBenchmarkStats {
query: String,
runs: usize,
get_flight_info_durations: Vec<Duration>,
ttfb_durations: Vec<Duration>,
do_get_durations: Vec<Duration>,
total_durations: Vec<Duration>,
}

impl FlightSQLBenchmarkStats {
pub fn new(
query: String,
get_flight_info_durations: Vec<Duration>,
ttfb_durations: Vec<Duration>,
do_get_durations: Vec<Duration>,
total_durations: Vec<Duration>,
) -> Self {
let runs = get_flight_info_durations.len();
Self {
query,
runs,
get_flight_info_durations,
ttfb_durations,
do_get_durations,
total_durations,
}
}

fn summarize(&self, durations: &[Duration]) -> LocalDurationsSummary {
let mut sorted = durations.to_vec();
sorted.sort();
let len = sorted.len();
let min = *sorted.first().unwrap();
let max = *sorted.last().unwrap();
let mean = sorted.iter().sum::<Duration>() / len as u32;
let median = sorted[len / 2];
let this_total = durations.iter().map(|d| d.as_nanos()).sum::<u128>();
let total_duration = self
.total_durations
.iter()
.map(|d| d.as_nanos())
.sum::<u128>();
let percent_of_total = (this_total as f64 / total_duration as f64) * 100.0;
LocalDurationsSummary {
min,
max,
mean,
median,
percent_of_total,
}
}
}

impl std::fmt::Display for FlightSQLBenchmarkStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f)?;
writeln!(f, "----------------------------")?;
writeln!(f, "Benchmark Stats ({} runs)", self.runs)?;
writeln!(f, "----------------------------")?;
writeln!(f, "{}", self.query)?;
writeln!(f, "----------------------------")?;

let logical_planning_summary = self.summarize(&self.get_flight_info_durations);
writeln!(f, "Get Flight Info")?;
writeln!(f, "{}", logical_planning_summary)?;

let physical_planning_summary = self.summarize(&self.ttfb_durations);
writeln!(f, "Time to First Byte")?;
writeln!(f, "{}", physical_planning_summary)?;

let execution_summary = self.summarize(&self.do_get_durations);
writeln!(f, "Do Get")?;
writeln!(f, "{}", execution_summary)?;

let total_summary = self.summarize(&self.total_durations);
writeln!(f, "Total")?;
writeln!(f, "{}", total_summary)
}
}

0 comments on commit 838c5c1

Please sign in to comment.