Skip to content

Commit

Permalink
Add --analyze param to CLI (datafusion-contrib#216)
Browse files Browse the repository at this point in the history
TLDR: Execution plan output from EXPLAIN ANALYZE tells a story of how a
query was executed and the goal of this command is to output a summary
of that story.

I am working on adding this feature because I often look at the output
of EXPLAIN ANALYZE and need to connect several dots myself to see what
is important which can be quite manual and then if you are trying to do
this across different queries or with different source data layouts you
end up redoing a lot of work.

The new analyze parameter generates output like the below which is
designed to make it easier to analyze queries and find hot spots (IO vs
Compute and a breakdown of the core parts for each) and / or
optimization opportunties. It is still very much a WIP (organization,
stats to shows, wording, etc) but it is now starting to take shape into
something close to what i envision the v1 version will look like.

IO stats will have some relevant metrics available for different io
execs (parquet, csv, ndjson, arrow, etc). Starting with just parquet for
now.

For compute stats i think im going to have 4 sections (projection,
filter, sort, join, and aggregate, and other which will include
everything else - getting more insight from others on potential execs to
extract would be interesting).

One of the open points is about how i aggregate stats on the same nodes
/ execs. Im probably going to just rollup details per exec for v1 but
you do lose some interesting information by doing that so ideally id
like to improve on that.

An example of using this could be benchmarking and analyzing parquet
files with different layouts (for example file sizes, row group sizes,
bloom filters, page sizes, etc). Ideally running this command and
comparing the results should make it easier to choose the optimal
layout.

Todo for v1
- [x] Selectivity based on rows
- [x] Selectivity based on bytes
- [x] Add "Other" compute stats
- [x] Add "Join" compute stats
- [x] Add "Aggregate" compute stats
- [x] Cleanup display (alignment, same header widths, fixed decimals,
etc)
- [x] ~For IO add % of total for scanning and opening~ Going to revisit
this
- [x] ~Add throughput to summary based on output bytes from IO layer.
Would be cool to do a pass that pulled actual file sizes that could be
used.~ Going to revisit this
- [x] Ratio of selectivity to pruning for parquet
- [x] Avg/median time to scan single row group (total scanning time /
row groups)
- [x] Update README


Future 
- identify nodes where some partitions are "hot" (i.e doing much more
compute than others)

<img width="805" alt="image"
src="https://github.com/user-attachments/assets/3a853ed6-8db6-4d23-91dd-96aa959d7936">
  • Loading branch information
matthewmturner authored Nov 8, 2024
1 parent 38d5c25 commit 0060fb2
Show file tree
Hide file tree
Showing 9 changed files with 875 additions and 55 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,6 @@ Cargo.lock

# For local development of benchmarking
benchmark_results/

# For local query testing
queries/**
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,21 @@ Both of the commands above support the `--flightsql` parameter to run the SQL wi

The CLI can also run your configured DDL prior to executing the query by adding the `--run-ddl` parameter.

#### Benchmarking
#### Benchmarking Queries
You can benchmark queries by adding the `--bench` parameter. This will run the query a configurable number of times and output a breakdown of the queries execution time with summary statistics for each component of the query (logical planning, physical planning, execution time, and total time).

Optionally you can use the `--run-before` param to run a query before the benchmark is run. This is useful in cases where you want to hit a temp table or write a file to disk that your benchmark query will use.

To save benchmark results to a file use the `--save` parameter with a file path. Further, you can use the `--append` parameter to append to the file instead of overwriting it.

#### Analyze Queries

The output from `EXPLAIN ANALYZE` provides a wealth of information on a queries execution - however, the amount of information and connecting the dots can be difficult and manual. Further, there is detail in the `MetricSet`'s of the underlying `ExecutionPlan`'s that is lost in the output.

To help with this the `--analyze` flag can used to generate a summary of the underlying `ExecutionPlan` `MetricSet`s. The summary presents the information in a way that is hopefully easier to understand and easier to draw conclusions on a query's performance.

This feature is still in it's early stages and is expected to evolve. Once it has gone through enough real world testing and it has been confirmed the metrics make sense documentation will be added on the exact calculations - until then the source will need to be inspected to see the calculations.

## `dft` FlightSQL Server

The `dft` FlightSQL server (feature flag `experimental-flightsql-server`) is a Flight service that can be used to execute SQL queries against DataFusion. The server is started by running `dft --serve` and can optionally run your configured DDL with the `--run-ddl` parameter.
Expand Down
1 change: 0 additions & 1 deletion f.sql

This file was deleted.

6 changes: 6 additions & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ pub struct DftArgs {
#[clap(long, short, help = "Benchmark the provided query")]
pub bench: bool,

#[clap(
long,
help = "Print a summary of the query's execution plan and statistics"
)]
pub analyze: bool,

#[clap(long, help = "Run the provided query before running the benchmark")]
pub run_before: Option<String>,

Expand Down
108 changes: 87 additions & 21 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use {crate::execution::flightsql_benchmarks::FlightSQLBenchmarkStats, tonic::Int
const LOCAL_BENCHMARK_HEADER_ROW: &str =
"query,runs,logical_planning_min,logical_planning_max,logical_planning_mean,logical_planning_median,logical_planning_percent_of_total,physical_planning_min,physical_planning_max,physical_planning,mean,physical_planning_median,physical_planning_percent_of_total,execution_min,execution_max,execution_execution_mean,execution_median,execution_percent_of_total,total_min,total_max,total_mean,total_median,total_percent_of_total";

#[cfg(feature = "flightsql")]
const FLIGHTSQL_BENCHMARK_HEADER_ROW: &str =
"query,runs,get_flight_info_min,get_flight_info_max,get_flight_info_mean,get_flight_info_median,get_flight_info_percent_of_total,ttfb_min,ttfb_max,ttfb,mean,ttfb_median,ttfb_percent_of_total,do_get_min,do_get_max,do_get_mean,do_get_median,do_get_percent_of_total,total_min,total_max,total_mean,total_median,total_percent_of_total";

Expand Down Expand Up @@ -66,45 +67,81 @@ impl CliApp {
self.args.commands.is_empty(),
self.args.flightsql,
self.args.bench,
self.args.analyze,
) {
(_, _, true, _) => Err(eyre!(
// Error cases
(_, _, true, _, _) => Err(eyre!(
"FLightSQL feature isn't enabled. Reinstall `dft` with `--features=flightsql`"
)),
(true, true, _, _) => Err(eyre!("No files or commands provided to execute")),
(false, true, _, false) => self.execute_files(&self.args.files).await,
(false, true, _, true) => self.benchmark_files(&self.args.files).await,
(true, false, _, false) => self.execute_commands(&self.args.commands).await,
(true, false, _, true) => self.benchmark_commands(&self.args.commands).await,
(false, false, _, false) => Err(eyre!(
(false, false, false, true, _) => {
Err(eyre!("Cannot benchmark without a command or file"))
}
(true, true, _, _, _) => Err(eyre!("No files or commands provided to execute")),
(false, false, _, false, _) => Err(eyre!(
"Cannot execute both files and commands at the same time"
)),
(false, false, false, true) => Err(eyre!("Cannot benchmark without a command or file")),
(_, _, false, true, true) => Err(eyre!(
"The `benchmark` and `analyze` flags are mutually exclusive"
)),

// Execution cases
(false, true, _, false, false) => self.execute_files(&self.args.files).await,
(true, false, _, false, false) => self.execute_commands(&self.args.commands).await,

// Benchmark cases
(false, true, _, true, false) => self.benchmark_files(&self.args.files).await,
(true, false, _, true, false) => self.benchmark_commands(&self.args.commands).await,

// Analyze cases
(false, true, _, false, true) => self.analyze_files(&self.args.files).await,
(true, false, _, false, true) => self.analyze_commands(&self.args.commands).await,
}
#[cfg(feature = "flightsql")]
match (
self.args.files.is_empty(),
self.args.commands.is_empty(),
self.args.flightsql,
self.args.bench,
self.args.analyze,
) {
(true, true, _, _) => Err(eyre!("No files or commands provided to execute")),
(false, true, true, false) => self.flightsql_execute_files(&self.args.files).await,
(false, true, true, true) => self.flightsql_benchmark_files(&self.args.files).await,
(false, true, false, false) => self.execute_files(&self.args.files).await,
(false, true, false, true) => self.benchmark_files(&self.args.files).await,
// Error cases
(true, true, _, _, _) => Err(eyre!("No files or commands provided to execute")),
(false, false, false, true, _) => {
Err(eyre!("Cannot benchmark without a command or file"))
}
(false, false, _, _, _) => Err(eyre!(
"Cannot execute both files and commands at the same time"
)),
(_, _, _, true, true) => Err(eyre!(
"The `benchmark` and `analyze` flags are mutually exclusive"
)),
(_, _, true, false, true) => Err(eyre!(
"The `analyze` flag is not currently supported with FlightSQL"
)),

(true, false, true, false) => {
// Execution cases
(true, false, false, false, false) => self.execute_commands(&self.args.commands).await,
(false, true, false, false, false) => self.execute_files(&self.args.files).await,
(false, true, true, false, false) => {
self.flightsql_execute_files(&self.args.files).await
}
(true, false, true, false, false) => {
self.flightsql_execute_commands(&self.args.commands).await
}
(true, false, true, true) => {

// Benchmark cases
(false, true, false, true, false) => self.benchmark_files(&self.args.files).await,
(false, true, true, true, false) => {
self.flightsql_benchmark_files(&self.args.files).await
}
(true, false, true, true, false) => {
self.flightsql_benchmark_commands(&self.args.commands).await
}
(true, false, false, false) => self.execute_commands(&self.args.commands).await,
(true, false, false, true) => self.benchmark_commands(&self.args.commands).await,
(false, false, false, true) => Err(eyre!("Cannot benchmark without a command or file")),
(false, false, _, _) => Err(eyre!(
"Cannot execute both files and commands at the same time"
)),
(true, false, false, true, false) => self.benchmark_commands(&self.args.commands).await,

// Analyze cases
(true, false, false, false, true) => self.analyze_commands(&self.args.commands).await,
(false, true, false, false, true) => self.analyze_files(&self.args.files).await,
}
}

Expand Down Expand Up @@ -133,6 +170,15 @@ impl CliApp {
Ok(())
}

async fn analyze_files(&self, files: &[PathBuf]) -> Result<()> {
info!("Analyzing files: {:?}", files);
for file in files {
let query = std::fs::read_to_string(file)?;
self.analyze_from_string(&query).await?;
}
Ok(())
}

#[cfg(feature = "flightsql")]
async fn flightsql_execute_files(&self, files: &[PathBuf]) -> color_eyre::Result<()> {
info!("Executing FlightSQL files: {:?}", files);
Expand Down Expand Up @@ -257,6 +303,15 @@ impl CliApp {
Ok(())
}

async fn analyze_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
info!("Analyzing commands: {:?}", commands);
for command in commands {
self.analyze_from_string(command).await?;
}

Ok(())
}

#[cfg(feature = "flightsql")]
async fn flightsql_execute_commands(&self, commands: &[String]) -> color_eyre::Result<()> {
info!("Executing FlightSQL commands: {:?}", commands);
Expand Down Expand Up @@ -336,6 +391,17 @@ impl CliApp {
Ok(stats)
}

async fn analyze_from_string(&self, sql: &str) -> Result<()> {
let mut stats = self
.app_execution
.execution_ctx()
.analyze_query(sql)
.await?;
stats.collect_stats();
println!("{}", stats);
Ok(())
}

#[cfg(feature = "flightsql")]
async fn flightsql_benchmark_from_string(&self, sql: &str) -> Result<FlightSQLBenchmarkStats> {
let stats = self
Expand Down
58 changes: 58 additions & 0 deletions src/execution/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use datafusion::sql::parser::{DFParser, Statement};
use tokio_stream::StreamExt;

use super::local_benchmarks::LocalBenchmarkStats;
use super::stats::{ExecutionDurationStats, ExecutionStats};
use super::AppType;

/// Structure for executing queries locally
Expand Down Expand Up @@ -220,6 +221,10 @@ impl ExecutionContext {
if statement.trim().is_empty() {
continue;
}
if statement.trim().starts_with("--") {
continue;
}

debug!("Executing DDL statement: {:?}", statement);
match self.execute_sql_and_discard_results(statement).await {
Ok(_) => {
Expand Down Expand Up @@ -292,4 +297,57 @@ impl ExecutionContext {
total_durations,
))
}

pub async fn analyze_query(&self, query: &str) -> Result<ExecutionStats> {
let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {};
let start = std::time::Instant::now();
let statements = DFParser::parse_sql_with_dialect(query, &dialect)?;
let parsing_duration = start.elapsed();
if statements.len() == 1 {
let statement = statements[0].clone();
let logical_plan = self
.session_ctx()
.state()
.statement_to_plan(statement.clone())
.await?;
let logical_planning_duration = start.elapsed();
let physical_plan = self
.session_ctx()
.state()
.create_physical_plan(&logical_plan)
.await?;
let physical_planning_duration = start.elapsed();
let task_ctx = self.session_ctx().task_ctx();
let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx)?;
let mut rows = 0;
let mut batches = 0;
let mut bytes = 0;
while let Some(b) = stream.next().await {
let batch = b?;
rows += batch.num_rows();
batches += 1;
bytes += batch.get_array_memory_size();
}
let execution_duration = start.elapsed();
let durations = ExecutionDurationStats::new(
parsing_duration,
logical_planning_duration - parsing_duration,
physical_planning_duration - logical_planning_duration,
execution_duration - physical_planning_duration,
start.elapsed(),
);
ExecutionStats::try_new(
query.to_string(),
durations,
rows,
batches,
bytes,
physical_plan,
)
} else {
Err(eyre::eyre!("Only a single statement can be benchmarked"))
}

// Ok(())
}
}
4 changes: 2 additions & 2 deletions src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ pub mod flightsql;
pub mod flightsql_benchmarks;
pub mod local;
pub mod sql_utils;
pub mod stats;

pub mod local_benchmarks;
mod stats;

pub use stats::{collect_plan_stats, ExecutionStats};
pub use stats::{collect_plan_io_stats, ExecutionStats};

#[cfg(feature = "flightsql")]
use self::flightsql::{FlightSQLClient, FlightSQLContext};
Expand Down
Loading

0 comments on commit 0060fb2

Please sign in to comment.