From 0060fb204cdcf33d5d36636c4307a90ef32b8e1e Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 8 Nov 2024 08:04:08 -0500 Subject: [PATCH] Add `--analyze` param to CLI (#216) TLDR: Execution plan output from EXPLAIN ANALYZE tells a story of how a query was executed and the goal of this command is to output a summary of that story. I am working on adding this feature because I often look at the output of EXPLAIN ANALYZE and need to connect several dots myself to see what is important which can be quite manual and then if you are trying to do this across different queries or with different source data layouts you end up redoing a lot of work. The new analyze parameter generates output like the below which is designed to make it easier to analyze queries and find hot spots (IO vs Compute and a breakdown of the core parts for each) and / or optimization opportunities. It is still very much a WIP (organization, stats to show, wording, etc) but it is now starting to take shape into something close to what I envision the v1 version will look like. IO stats will have some relevant metrics available for different io execs (parquet, csv, ndjson, arrow, etc). Starting with just parquet for now. For compute stats I think I'm going to have 4 sections (projection, filter, sort, join, and aggregate, and other which will include everything else - getting more insight from others on potential execs to extract would be interesting). One of the open points is about how I aggregate stats on the same nodes / execs. I'm probably going to just rollup details per exec for v1 but you do lose some interesting information by doing that so ideally I'd like to improve on that. An example of using this could be benchmarking and analyzing parquet files with different layouts (for example file sizes, row group sizes, bloom filters, page sizes, etc). Ideally running this command and comparing the results should make it easier to choose the optimal layout. 
Todo for v1 - [x] Selectivity based on rows - [x] Selectivity based on bytes - [x] Add "Other" compute stats - [x] Add "Join" compute stats - [x] Add "Aggregate" compute stats - [x] Cleanup display (alignment, same header widths, fixed decimals, etc) - [x] ~For IO add % of total for scanning and opening~ Going to revisit this - [x] ~Add throughput to summary based on output bytes from IO layer. Would be cool to do a pass that pulled actual file sizes that could be used.~ Going to revisit this - [x] Ratio of selectivity to pruning for parquet - [x] Avg/median time to scan single row group (total scanning time / row groups) - [x] Update README Future - identify nodes where some partitions are "hot" (i.e doing much more compute than others) image --- .gitignore | 3 + README.md | 10 +- f.sql | 1 - src/args.rs | 6 + src/cli/mod.rs | 108 ++++- src/execution/local.rs | 58 +++ src/execution/mod.rs | 4 +- src/execution/stats.rs | 738 ++++++++++++++++++++++++++++++++-- src/tui/state/tabs/history.rs | 2 +- 9 files changed, 875 insertions(+), 55 deletions(-) delete mode 100644 f.sql diff --git a/.gitignore b/.gitignore index f4b34b8..f5b7dae 100644 --- a/.gitignore +++ b/.gitignore @@ -95,3 +95,6 @@ Cargo.lock # For local development of benchmarking benchmark_results/ + +# For local query testing +queries/** diff --git a/README.md b/README.md index e866e16..f457cb2 100644 --- a/README.md +++ b/README.md @@ -98,13 +98,21 @@ Both of the commands above support the `--flightsql` parameter to run the SQL wi The CLI can also run your configured DDL prior to executing the query by adding the `--run-ddl` parameter. -#### Benchmarking +#### Benchmarking Queries You can benchmark queries by adding the `--bench` parameter. This will run the query a configurable number of times and output a breakdown of the queries execution time with summary statistics for each component of the query (logical planning, physical planning, execution time, and total time). 
Optionally you can use the `--run-before` param to run a query before the benchmark is run. This is useful in cases where you want to hit a temp table or write a file to disk that your benchmark query will use. To save benchmark results to a file use the `--save` parameter with a file path. Further, you can use the `--append` parameter to append to the file instead of overwriting it. +#### Analyze Queries + +The output from `EXPLAIN ANALYZE` provides a wealth of information on a query's execution - however, the amount of information and connecting the dots can be difficult and manual. Further, there is detail in the `MetricSet`s of the underlying `ExecutionPlan`s that is lost in the output. + +To help with this the `--analyze` flag can be used to generate a summary of the underlying `ExecutionPlan` `MetricSet`s. The summary presents the information in a way that is hopefully easier to understand and easier to draw conclusions on a query's performance. + +This feature is still in its early stages and is expected to evolve. Once it has gone through enough real world testing and it has been confirmed the metrics make sense, documentation will be added on the exact calculations - until then the source will need to be inspected to see the calculations. + ## `dft` FlightSQL Server The `dft` FlightSQL server (feature flag `experimental-flightsql-server`) is a Flight service that can be used to execute SQL queries against DataFusion. The server is started by running `dft --serve` and can optionally run your configured DDL with the `--run-ddl` parameter. 
diff --git a/f.sql b/f.sql deleted file mode 100644 index 2e3761f..0000000 --- a/f.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT 1 diff --git a/src/args.rs b/src/args.rs index ae93b07..a329bce 100644 --- a/src/args.rs +++ b/src/args.rs @@ -73,6 +73,12 @@ pub struct DftArgs { #[clap(long, short, help = "Benchmark the provided query")] pub bench: bool, + #[clap( + long, + help = "Print a summary of the query's execution plan and statistics" + )] + pub analyze: bool, + #[clap(long, help = "Run the provided query before running the benchmark")] pub run_before: Option, diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 8239d81..db95adb 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -34,6 +34,7 @@ use {crate::execution::flightsql_benchmarks::FlightSQLBenchmarkStats, tonic::Int const LOCAL_BENCHMARK_HEADER_ROW: &str = "query,runs,logical_planning_min,logical_planning_max,logical_planning_mean,logical_planning_median,logical_planning_percent_of_total,physical_planning_min,physical_planning_max,physical_planning,mean,physical_planning_median,physical_planning_percent_of_total,execution_min,execution_max,execution_execution_mean,execution_median,execution_percent_of_total,total_min,total_max,total_mean,total_median,total_percent_of_total"; +#[cfg(feature = "flightsql")] const FLIGHTSQL_BENCHMARK_HEADER_ROW: &str = "query,runs,get_flight_info_min,get_flight_info_max,get_flight_info_mean,get_flight_info_median,get_flight_info_percent_of_total,ttfb_min,ttfb_max,ttfb,mean,ttfb_median,ttfb_percent_of_total,do_get_min,do_get_max,do_get_mean,do_get_median,do_get_percent_of_total,total_min,total_max,total_mean,total_median,total_percent_of_total"; @@ -66,19 +67,34 @@ impl CliApp { self.args.commands.is_empty(), self.args.flightsql, self.args.bench, + self.args.analyze, ) { - (_, _, true, _) => Err(eyre!( + // Error cases + (_, _, true, _, _) => Err(eyre!( "FLightSQL feature isn't enabled. 
Reinstall `dft` with `--features=flightsql`" )), - (true, true, _, _) => Err(eyre!("No files or commands provided to execute")), - (false, true, _, false) => self.execute_files(&self.args.files).await, - (false, true, _, true) => self.benchmark_files(&self.args.files).await, - (true, false, _, false) => self.execute_commands(&self.args.commands).await, - (true, false, _, true) => self.benchmark_commands(&self.args.commands).await, - (false, false, _, false) => Err(eyre!( + (false, false, false, true, _) => { + Err(eyre!("Cannot benchmark without a command or file")) + } + (true, true, _, _, _) => Err(eyre!("No files or commands provided to execute")), + (false, false, _, false, _) => Err(eyre!( "Cannot execute both files and commands at the same time" )), - (false, false, false, true) => Err(eyre!("Cannot benchmark without a command or file")), + (_, _, false, true, true) => Err(eyre!( + "The `benchmark` and `analyze` flags are mutually exclusive" + )), + + // Execution cases + (false, true, _, false, false) => self.execute_files(&self.args.files).await, + (true, false, _, false, false) => self.execute_commands(&self.args.commands).await, + + // Benchmark cases + (false, true, _, true, false) => self.benchmark_files(&self.args.files).await, + (true, false, _, true, false) => self.benchmark_commands(&self.args.commands).await, + + // Analyze cases + (false, true, _, false, true) => self.analyze_files(&self.args.files).await, + (true, false, _, false, true) => self.analyze_commands(&self.args.commands).await, } #[cfg(feature = "flightsql")] match ( @@ -86,25 +102,46 @@ impl CliApp { self.args.commands.is_empty(), self.args.flightsql, self.args.bench, + self.args.analyze, ) { - (true, true, _, _) => Err(eyre!("No files or commands provided to execute")), - (false, true, true, false) => self.flightsql_execute_files(&self.args.files).await, - (false, true, true, true) => self.flightsql_benchmark_files(&self.args.files).await, - (false, true, false, false) => 
self.execute_files(&self.args.files).await, - (false, true, false, true) => self.benchmark_files(&self.args.files).await, + // Error cases + (true, true, _, _, _) => Err(eyre!("No files or commands provided to execute")), + (false, false, false, true, _) => { + Err(eyre!("Cannot benchmark without a command or file")) + } + (false, false, _, _, _) => Err(eyre!( + "Cannot execute both files and commands at the same time" + )), + (_, _, _, true, true) => Err(eyre!( + "The `benchmark` and `analyze` flags are mutually exclusive" + )), + (_, _, true, false, true) => Err(eyre!( + "The `analyze` flag is not currently supported with FlightSQL" + )), - (true, false, true, false) => { + // Execution cases + (true, false, false, false, false) => self.execute_commands(&self.args.commands).await, + (false, true, false, false, false) => self.execute_files(&self.args.files).await, + (false, true, true, false, false) => { + self.flightsql_execute_files(&self.args.files).await + } + (true, false, true, false, false) => { self.flightsql_execute_commands(&self.args.commands).await } - (true, false, true, true) => { + + // Benchmark cases + (false, true, false, true, false) => self.benchmark_files(&self.args.files).await, + (false, true, true, true, false) => { + self.flightsql_benchmark_files(&self.args.files).await + } + (true, false, true, true, false) => { self.flightsql_benchmark_commands(&self.args.commands).await } - (true, false, false, false) => self.execute_commands(&self.args.commands).await, - (true, false, false, true) => self.benchmark_commands(&self.args.commands).await, - (false, false, false, true) => Err(eyre!("Cannot benchmark without a command or file")), - (false, false, _, _) => Err(eyre!( - "Cannot execute both files and commands at the same time" - )), + (true, false, false, true, false) => self.benchmark_commands(&self.args.commands).await, + + // Analyze cases + (true, false, false, false, true) => self.analyze_commands(&self.args.commands).await, + (false, 
true, false, false, true) => self.analyze_files(&self.args.files).await, } } @@ -133,6 +170,15 @@ impl CliApp { Ok(()) } + async fn analyze_files(&self, files: &[PathBuf]) -> Result<()> { + info!("Analyzing files: {:?}", files); + for file in files { + let query = std::fs::read_to_string(file)?; + self.analyze_from_string(&query).await?; + } + Ok(()) + } + #[cfg(feature = "flightsql")] async fn flightsql_execute_files(&self, files: &[PathBuf]) -> color_eyre::Result<()> { info!("Executing FlightSQL files: {:?}", files); @@ -257,6 +303,15 @@ impl CliApp { Ok(()) } + async fn analyze_commands(&self, commands: &[String]) -> color_eyre::Result<()> { + info!("Analyzing commands: {:?}", commands); + for command in commands { + self.analyze_from_string(command).await?; + } + + Ok(()) + } + #[cfg(feature = "flightsql")] async fn flightsql_execute_commands(&self, commands: &[String]) -> color_eyre::Result<()> { info!("Executing FlightSQL commands: {:?}", commands); @@ -336,6 +391,17 @@ impl CliApp { Ok(stats) } + async fn analyze_from_string(&self, sql: &str) -> Result<()> { + let mut stats = self + .app_execution + .execution_ctx() + .analyze_query(sql) + .await?; + stats.collect_stats(); + println!("{}", stats); + Ok(()) + } + #[cfg(feature = "flightsql")] async fn flightsql_benchmark_from_string(&self, sql: &str) -> Result { let stats = self diff --git a/src/execution/local.rs b/src/execution/local.rs index c1737e2..a84d76c 100644 --- a/src/execution/local.rs +++ b/src/execution/local.rs @@ -33,6 +33,7 @@ use datafusion::sql::parser::{DFParser, Statement}; use tokio_stream::StreamExt; use super::local_benchmarks::LocalBenchmarkStats; +use super::stats::{ExecutionDurationStats, ExecutionStats}; use super::AppType; /// Structure for executing queries locally @@ -220,6 +221,10 @@ impl ExecutionContext { if statement.trim().is_empty() { continue; } + if statement.trim().starts_with("--") { + continue; + } + debug!("Executing DDL statement: {:?}", statement); match 
self.execute_sql_and_discard_results(statement).await { Ok(_) => { @@ -292,4 +297,57 @@ impl ExecutionContext { total_durations, )) } + + pub async fn analyze_query(&self, query: &str) -> Result { + let dialect = datafusion::sql::sqlparser::dialect::GenericDialect {}; + let start = std::time::Instant::now(); + let statements = DFParser::parse_sql_with_dialect(query, &dialect)?; + let parsing_duration = start.elapsed(); + if statements.len() == 1 { + let statement = statements[0].clone(); + let logical_plan = self + .session_ctx() + .state() + .statement_to_plan(statement.clone()) + .await?; + let logical_planning_duration = start.elapsed(); + let physical_plan = self + .session_ctx() + .state() + .create_physical_plan(&logical_plan) + .await?; + let physical_planning_duration = start.elapsed(); + let task_ctx = self.session_ctx().task_ctx(); + let mut stream = execute_stream(Arc::clone(&physical_plan), task_ctx)?; + let mut rows = 0; + let mut batches = 0; + let mut bytes = 0; + while let Some(b) = stream.next().await { + let batch = b?; + rows += batch.num_rows(); + batches += 1; + bytes += batch.get_array_memory_size(); + } + let execution_duration = start.elapsed(); + let durations = ExecutionDurationStats::new( + parsing_duration, + logical_planning_duration - parsing_duration, + physical_planning_duration - logical_planning_duration, + execution_duration - physical_planning_duration, + start.elapsed(), + ); + ExecutionStats::try_new( + query.to_string(), + durations, + rows, + batches, + bytes, + physical_plan, + ) + } else { + Err(eyre::eyre!("Only a single statement can be benchmarked")) + } + + // Ok(()) + } } diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 7b20f0b..90b7c8d 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -21,11 +21,11 @@ pub mod flightsql; pub mod flightsql_benchmarks; pub mod local; pub mod sql_utils; +pub mod stats; pub mod local_benchmarks; -mod stats; -pub use stats::{collect_plan_stats, ExecutionStats}; 
+pub use stats::{collect_plan_io_stats, ExecutionStats}; #[cfg(feature = "flightsql")] use self::flightsql::{FlightSQLClient, FlightSQLContext}; diff --git a/src/execution/stats.rs b/src/execution/stats.rs index bff4f29..580c03a 100644 --- a/src/execution/stats.rs +++ b/src/execution/stats.rs @@ -15,63 +15,743 @@ // specific language governing permissions and limitations // under the License. -use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor}; -use log::info; -use std::sync::Arc; +use datafusion::{ + datasource::physical_plan::ParquetExec, + physical_plan::{ + aggregates::AggregateExec, + filter::FilterExec, + joins::{ + CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, + SymmetricHashJoinExec, + }, + metrics::MetricValue, + projection::ProjectionExec, + sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, + visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor, + }, +}; +use itertools::Itertools; +use std::{sync::Arc, time::Duration}; #[derive(Clone, Debug)] pub struct ExecutionStats { - bytes_scanned: usize, - // exec_metrics: Vec, + query: String, + rows: usize, + batches: i32, + bytes: usize, + durations: ExecutionDurationStats, + io: Option, + compute: Option, + plan: Arc, } impl ExecutionStats { - pub fn bytes_scanned(&self) -> usize { - self.bytes_scanned + pub fn try_new( + query: String, + durations: ExecutionDurationStats, + rows: usize, + batches: i32, + bytes: usize, + plan: Arc, + ) -> color_eyre::Result { + Ok(Self { + query, + durations, + rows, + batches, + bytes, + plan, + io: None, + compute: None, + }) + } + + pub fn collect_stats(&mut self) { + if let Some(io) = collect_plan_io_stats(Arc::clone(&self.plan)) { + self.io = Some(io) + } + if let Some(compute) = collect_plan_compute_stats(Arc::clone(&self.plan)) { + self.compute = Some(compute) + } + } + + pub fn rows_selectivity(&self) -> f64 { + let maybe_io_output_rows = self.io.as_ref().and_then(|io| 
io.parquet_output_rows); + if let Some(io_output_rows) = maybe_io_output_rows { + self.rows as f64 / io_output_rows as f64 + } else { + 0.0 + } + } + + pub fn bytes_selectivity(&self) -> f64 { + let maybe_io_output_bytes = self.io.as_ref().and_then(|io| io.bytes_scanned.clone()); + if let Some(io_output_bytes) = maybe_io_output_bytes { + self.bytes as f64 / io_output_bytes.as_usize() as f64 + } else { + 0.0 + } + } + + pub fn selectivity_efficiency(&self) -> f64 { + if let Some(io) = &self.io { + io.parquet_rg_pruned_stats_ratio() / self.rows_selectivity() + } else { + 0.0 + } } } -#[derive(Default)] -struct PlanVisitor { - total_bytes_scanned: usize, - // exec_metrics: Vec, +impl std::fmt::Display for ExecutionStats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!( + f, + "========================= Query ===========================" + )?; + writeln!(f, "{}", self.query)?; + writeln!( + f, + "==================== Execution Summary ====================" + )?; + writeln!( + f, + "{:<20} {:<20} {:<20}", + "Output Rows (%)", "Output Bytes (%)", "Batches Processed", + )?; + writeln!( + f, + "{:<20} {:<20} {:<20}", + format!("{} ({:.2})", self.rows, self.rows_selectivity()), + format!("{} ({:.2})", self.bytes, self.bytes_selectivity()), + self.batches, + )?; + writeln!(f)?; + writeln!(f, "{}", self.durations)?; + writeln!(f, "{:<20}", "Parquet Efficiency (Pruning / Selectivity)")?; + writeln!(f, "{:<20.2}", self.selectivity_efficiency())?; + writeln!(f)?; + if let Some(io_stats) = &self.io { + writeln!(f, "{}", io_stats)?; + }; + if let Some(compute_stats) = &self.compute { + writeln!(f, "{}", compute_stats)?; + }; + Ok(()) + } } -impl From for ExecutionStats { - fn from(value: PlanVisitor) -> Self { +#[derive(Clone, Debug)] +pub struct ExecutionDurationStats { + parsing: Duration, + logical_planning: Duration, + physical_planning: Duration, + execution: Duration, + total: Duration, +} + +impl ExecutionDurationStats { + pub fn new( + 
parsing: Duration, + logical_planning: Duration, + physical_planning: Duration, + execution: Duration, + total: Duration, + ) -> Self { Self { - bytes_scanned: value.total_bytes_scanned, + parsing, + logical_planning, + physical_planning, + execution, + total, + } + } +} + +impl std::fmt::Display for ExecutionDurationStats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!( + f, + "{:<20} {:<20} {:<20}", + "Parsing", "Logical Planning", "Physical Planning" + )?; + writeln!( + f, + "{:<20?} {:<20?} {:<20?}", + self.parsing, self.logical_planning, self.physical_planning + )?; + writeln!(f)?; + writeln!(f, "{:<20} {:<20}", "Execution", "Total",)?; + writeln!(f, "{:<20?} {:<20?}", self.execution, self.total)?; + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub struct ExecutionIOStats { + bytes_scanned: Option, + time_opening: Option, + time_scanning: Option, + parquet_output_rows: Option, + parquet_pruned_page_index: Option, + parquet_matched_page_index: Option, + parquet_rg_pruned_stats: Option, + parquet_rg_matched_stats: Option, + parquet_rg_pruned_bloom_filter: Option, + parquet_rg_matched_bloom_filter: Option, +} + +impl ExecutionIOStats { + fn parquet_rg_pruned_stats_ratio(&self) -> f64 { + if let (Some(pruned), Some(matched)) = ( + self.parquet_rg_matched_stats.as_ref(), + self.parquet_rg_pruned_stats.as_ref(), + ) { + let pruned = pruned.as_usize() as f64; + let matched = matched.as_usize() as f64; + matched / (pruned + matched) + } else { + 0.0 + } + } + + fn parquet_rg_pruned_bloom_filter_ratio(&self) -> f64 { + if let (Some(pruned), Some(matched)) = ( + self.parquet_rg_matched_bloom_filter.as_ref(), + self.parquet_rg_pruned_bloom_filter.as_ref(), + ) { + let pruned = pruned.as_usize() as f64; + let matched = matched.as_usize() as f64; + matched / (pruned + matched) + } else { + 0.0 + } + } + + fn parquet_rg_pruned_page_index_ratio(&self) -> f64 { + if let (Some(pruned), Some(matched)) = ( + 
self.parquet_matched_page_index.as_ref(), + self.parquet_pruned_page_index.as_ref(), + ) { + let pruned = pruned.as_usize() as f64; + let matched = matched.as_usize() as f64; + matched / (pruned + matched) + } else { + 0.0 + } + } + + fn row_group_count(&self) -> usize { + if let (Some(pruned), Some(matched)) = ( + self.parquet_rg_matched_stats.as_ref(), + self.parquet_rg_pruned_stats.as_ref(), + ) { + let pruned = pruned.as_usize(); + let matched = matched.as_usize(); + pruned + matched + } else { + 0 } } } -impl ExecutionPlanVisitor for PlanVisitor { +impl std::fmt::Display for ExecutionIOStats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!( + f, + "======================= IO Summary ========================" + )?; + writeln!( + f, + "{:<20} {:<20} {:<20}", + "Bytes Scanned", "Time Opening", "Time Scanning" + )?; + writeln!( + f, + "{:<20} {:<20} {:<20}", + self.bytes_scanned + .as_ref() + .map(|m| m.to_string()) + .unwrap_or("None".to_string()), + self.time_opening + .as_ref() + .map(|m| m.to_string()) + .unwrap_or("None".to_string()), + self.time_scanning + .as_ref() + .map(|m| m.to_string()) + .unwrap_or("None".to_string()) + )?; + writeln!(f)?; + writeln!( + f, + "Parquet Pruning Stats (Output Rows: {}, Row Groups: {} [{}ms per row group])", + self.parquet_output_rows + .as_ref() + .map(|m| m.to_string()) + .unwrap_or("None".to_string()), + self.row_group_count(), + self.time_scanning + .as_ref() + .map(|ts| format!( + "{:.2}", + (ts.as_usize() / 1_000_000) as f64 / self.row_group_count() as f64 + )) + .unwrap_or("None".to_string()) + )?; + writeln!( + f, + "{:<20} {:<20} {:<20}", + "Matched RG Stats %", "Matched RG Bloom %", "Matched Page Index %" + )?; + writeln!( + f, + "{:<20.2} {:<20.2} {:<20.2}", + self.parquet_rg_pruned_stats_ratio(), + self.parquet_rg_pruned_bloom_filter_ratio(), + self.parquet_rg_pruned_page_index_ratio() + )?; + Ok(()) + } +} + +/// Visitor to collect IO metrics from an execution plan +/// +/// 
IO metrics are collected from nodes that perform IO operations, such as +/// `CsvExec`, `ParquetExec`, and `ArrowExec`. +struct PlanIOVisitor { + bytes_scanned: Option, + time_opening: Option, + time_scanning: Option, + parquet_output_rows: Option, + parquet_pruned_page_index: Option, + parquet_matched_page_index: Option, + parquet_rg_pruned_stats: Option, + parquet_rg_matched_stats: Option, + parquet_rg_pruned_bloom_filter: Option, + parquet_rg_matched_bloom_filter: Option, +} + +impl PlanIOVisitor { + fn new() -> Self { + Self { + bytes_scanned: None, + time_opening: None, + time_scanning: None, + parquet_output_rows: None, + parquet_pruned_page_index: None, + parquet_matched_page_index: None, + parquet_rg_pruned_stats: None, + parquet_rg_matched_stats: None, + parquet_rg_pruned_bloom_filter: None, + parquet_rg_matched_bloom_filter: None, + } + } + + fn collect_io_metrics(&mut self, plan: &dyn ExecutionPlan) { + let io_metrics = plan.metrics(); + if let Some(metrics) = io_metrics { + self.bytes_scanned = metrics.sum_by_name("bytes_scanned"); + self.time_opening = metrics.sum_by_name("time_elapsed_opening"); + self.time_scanning = metrics.sum_by_name("time_elapsed_scanning_total"); + + if plan.as_any().downcast_ref::().is_some() { + self.parquet_output_rows = metrics.output_rows(); + self.parquet_rg_pruned_stats = metrics.sum_by_name("row_groups_pruned_statistics"); + self.parquet_rg_matched_stats = + metrics.sum_by_name("row_groups_matched_statistics"); + } + } + } +} + +impl From for ExecutionIOStats { + fn from(value: PlanIOVisitor) -> Self { + Self { + bytes_scanned: value.bytes_scanned, + time_opening: value.time_opening, + time_scanning: value.time_scanning, + parquet_output_rows: value.parquet_output_rows, + parquet_pruned_page_index: value.parquet_pruned_page_index, + parquet_matched_page_index: value.parquet_matched_page_index, + parquet_rg_pruned_stats: value.parquet_rg_pruned_stats, + parquet_rg_matched_stats: value.parquet_rg_matched_stats, + 
parquet_rg_pruned_bloom_filter: value.parquet_rg_pruned_bloom_filter, + parquet_rg_matched_bloom_filter: value.parquet_rg_matched_bloom_filter, + } + } +} + +impl ExecutionPlanVisitor for PlanIOVisitor { type Error = datafusion_common::DataFusionError; fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result { - match plan.metrics() { - Some(metrics) => match metrics.sum_by_name("bytes_scanned") { - Some(bytes_scanned) => { - info!("Adding {} to total_bytes_scanned", bytes_scanned.as_usize()); - self.total_bytes_scanned += bytes_scanned.as_usize(); - } - None => { - info!("No bytes_scanned for {}", plan.name()) - } - }, - None => { - info!("No MetricsSet for {}", plan.name()) + if is_io_plan(plan) { + self.collect_io_metrics(plan); + } + Ok(true) + } +} + +#[derive(Clone, Debug)] +pub struct PartitionsComputeStats { + name: String, + /// Sorted elapsed compute times + elapsed_computes: Vec, +} + +impl PartitionsComputeStats { + fn summary_stats(&self) -> (usize, usize, usize, usize, usize) { + if self.elapsed_computes.is_empty() { + (0, 0, 0, 0, 0) + } else { + let min = self.elapsed_computes[0]; + let median = self.elapsed_computes[self.elapsed_computes.len() / 2]; + let max = self.elapsed_computes[self.elapsed_computes.len() - 1]; + let total: usize = self.elapsed_computes.iter().sum(); + let mean = total / self.elapsed_computes.len(); + (min, median, mean, max, total) + } + } + + fn partitions(&self) -> usize { + self.elapsed_computes.len() + } +} + +#[derive(Clone, Debug)] +pub struct ExecutionComputeStats { + elapsed_compute: Option, + projection_compute: Option>, + filter_compute: Option>, + sort_compute: Option>, + join_compute: Option>, + aggregate_compute: Option>, + other_compute: Option>, +} + +impl ExecutionComputeStats { + fn display_compute( + &self, + f: &mut std::fmt::Formatter<'_>, + compute: &Option>, + label: &str, + ) -> std::fmt::Result { + if let (Some(filter_compute), Some(elapsed_compute)) = (compute, &self.elapsed_compute) { 
+ let partitions = filter_compute.iter().fold(0, |acc, c| acc + c.partitions()); + writeln!( + f, + "{label} Stats ({} nodes, {} partitions)", + filter_compute.len(), + partitions + )?; + writeln!( + f, + "{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}", + "Node(Partitions)", "Min", "Median", "Mean", "Max", "Total (%)" + )?; + filter_compute.iter().try_for_each(|node| { + let (min, median, mean, max, total) = node.summary_stats(); + let total = format!( + "{} ({:.2}%)", + total, + (total as f32 / *elapsed_compute as f32) * 100.0 + ); + writeln!( + f, + "{:<30} {:<16} {:<16} {:<16} {:<16} {:<16}", + format!("{}({})", node.name, node.elapsed_computes.len()), + min, + median, + mean, + max, + total, + )?; + Ok(()) + })? + } else { + writeln!(f, "No {label} Stats")?; + }; + Ok(()) + } +} + +impl std::fmt::Display for ExecutionComputeStats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!( + f, + "==================================== Compute Summary =====================================" + )?; + writeln!(f, "{:<20}", "Elapsed Compute",)?; + writeln!( + f, + "{:<20}", + self.elapsed_compute + .as_ref() + .map(|m| m.to_string()) + .unwrap_or("None".to_string()), + )?; + writeln!(f)?; + self.display_compute(f, &self.projection_compute, "Projection")?; + writeln!(f)?; + self.display_compute(f, &self.filter_compute, "Filter")?; + writeln!(f)?; + self.display_compute(f, &self.sort_compute, "Sort")?; + writeln!(f)?; + self.display_compute(f, &self.join_compute, "Join")?; + writeln!(f)?; + self.display_compute(f, &self.aggregate_compute, "Aggregate")?; + writeln!(f)?; + self.display_compute(f, &self.other_compute, "Other")?; + writeln!(f) + } +} + +#[derive(Default)] +pub struct PlanComputeVisitor { + elapsed_compute: Option, + filter_computes: Vec, + sort_computes: Vec, + projection_computes: Vec, + join_computes: Vec, + aggregate_computes: Vec, + other_computes: Vec, +} + +impl PlanComputeVisitor { + fn add_elapsed_compute(&mut self, 
node_elapsed_compute: Option) { + match (self.elapsed_compute, node_elapsed_compute) { + (Some(agg_elapsed_compute), Some(node_elapsed_compute)) => { + self.elapsed_compute = Some(agg_elapsed_compute + node_elapsed_compute) + } + (Some(_), None) | (None, None) => {} + (None, Some(node_elapsed_compute)) => self.elapsed_compute = Some(node_elapsed_compute), + } + } + + fn collect_compute_metrics(&mut self, plan: &dyn ExecutionPlan) { + let compute_metrics = plan.metrics(); + if let Some(metrics) = compute_metrics { + self.add_elapsed_compute(metrics.elapsed_compute()); + } + self.collect_filter_metrics(plan); + self.collect_sort_metrics(plan); + self.collect_projection_metrics(plan); + self.collect_join_metrics(plan); + self.collect_aggregate_metrics(plan); + self.collect_other_metrics(plan); + } + + // TODO: Refactor to have a single function that takes predicate and collector + fn collect_filter_metrics(&mut self, plan: &dyn ExecutionPlan) { + if is_filter_plan(plan) { + if let Some(metrics) = plan.metrics() { + let sorted_computes: Vec = metrics + .iter() + .filter_map(|m| match m.value() { + MetricValue::ElapsedCompute(t) => Some(t.value()), + _ => None, + }) + .sorted() + .collect(); + let p = PartitionsComputeStats { + name: plan.name().to_string(), + elapsed_computes: sorted_computes, + }; + self.filter_computes.push(p) + } + } + } + + fn collect_sort_metrics(&mut self, plan: &dyn ExecutionPlan) { + if is_sort_plan(plan) { + if let Some(metrics) = plan.metrics() { + let sorted_computes: Vec = metrics + .iter() + .filter_map(|m| match m.value() { + MetricValue::ElapsedCompute(t) => Some(t.value()), + _ => None, + }) + .sorted() + .collect(); + let p = PartitionsComputeStats { + name: plan.name().to_string(), + elapsed_computes: sorted_computes, + }; + self.sort_computes.push(p) + } + } + } + + fn collect_projection_metrics(&mut self, plan: &dyn ExecutionPlan) { + if is_projection_plan(plan) { + if let Some(metrics) = plan.metrics() { + let sorted_computes: Vec 
= metrics + .iter() + .filter_map(|m| match m.value() { + MetricValue::ElapsedCompute(t) => Some(t.value()), + _ => None, + }) + .sorted() + .collect(); + let p = PartitionsComputeStats { + name: plan.name().to_string(), + elapsed_computes: sorted_computes, + }; + self.projection_computes.push(p) + } + } + } + + fn collect_join_metrics(&mut self, plan: &dyn ExecutionPlan) { + if is_join_plan(plan) { + if let Some(metrics) = plan.metrics() { + let sorted_computes: Vec = metrics + .iter() + .filter_map(|m| match m.value() { + MetricValue::ElapsedCompute(t) => Some(t.value()), + _ => None, + }) + .sorted() + .collect(); + let p = PartitionsComputeStats { + name: plan.name().to_string(), + elapsed_computes: sorted_computes, + }; + self.join_computes.push(p) + } + } + } + + fn collect_aggregate_metrics(&mut self, plan: &dyn ExecutionPlan) { + if is_aggregate_plan(plan) { + if let Some(metrics) = plan.metrics() { + let sorted_computes: Vec = metrics + .iter() + .filter_map(|m| match m.value() { + MetricValue::ElapsedCompute(t) => Some(t.value()), + _ => None, + }) + .sorted() + .collect(); + let p = PartitionsComputeStats { + name: plan.name().to_string(), + elapsed_computes: sorted_computes, + }; + self.aggregate_computes.push(p) + } + } + } + + fn collect_other_metrics(&mut self, plan: &dyn ExecutionPlan) { + if !is_filter_plan(plan) + && !is_sort_plan(plan) + && !is_projection_plan(plan) + && !is_aggregate_plan(plan) + && !is_join_plan(plan) + { + if let Some(metrics) = plan.metrics() { + let sorted_computes: Vec = metrics + .iter() + .filter_map(|m| match m.value() { + MetricValue::ElapsedCompute(t) => Some(t.value()), + _ => None, + }) + .sorted() + .collect(); + let p = PartitionsComputeStats { + name: plan.name().to_string(), + elapsed_computes: sorted_computes, + }; + self.other_computes.push(p) } } + } +} + +fn is_filter_plan(plan: &dyn ExecutionPlan) -> bool { + plan.as_any().downcast_ref::().is_some() +} + +fn is_sort_plan(plan: &dyn ExecutionPlan) -> bool { + 
plan.as_any().downcast_ref::().is_some() + || plan + .as_any() + .downcast_ref::() + .is_some() +} + +fn is_projection_plan(plan: &dyn ExecutionPlan) -> bool { + plan.as_any().downcast_ref::().is_some() +} + +fn is_join_plan(plan: &dyn ExecutionPlan) -> bool { + plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() + || plan.as_any().downcast_ref::().is_some() + || plan + .as_any() + .downcast_ref::() + .is_some() +} + +fn is_aggregate_plan(plan: &dyn ExecutionPlan) -> bool { + plan.as_any().downcast_ref::().is_some() +} + +impl From for ExecutionComputeStats { + fn from(value: PlanComputeVisitor) -> Self { + Self { + elapsed_compute: value.elapsed_compute, + filter_compute: Some(value.filter_computes), + sort_compute: Some(value.sort_computes), + projection_compute: Some(value.projection_computes), + join_compute: Some(value.join_computes), + aggregate_compute: Some(value.aggregate_computes), + other_compute: Some(value.other_computes), + } + } +} + +impl ExecutionPlanVisitor for PlanComputeVisitor { + type Error = datafusion_common::DataFusionError; + + fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> color_eyre::Result { + if !is_io_plan(plan) { + self.collect_compute_metrics(plan); + } Ok(true) } } -pub fn collect_plan_stats(plan: Arc) -> Option { - let mut visitor = PlanVisitor::default(); +fn is_io_plan(plan: &dyn ExecutionPlan) -> bool { + let io_plans = ["CsvExec", "ParquetExec", "ArrowExec"]; + io_plans.contains(&plan.name()) +} + +pub fn collect_plan_io_stats(plan: Arc) -> Option { + let mut visitor = PlanIOVisitor::new(); + if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() { + Some(visitor.into()) + } else { + None + } +} + +pub fn collect_plan_compute_stats(plan: Arc) -> Option { + let mut visitor = PlanComputeVisitor::default(); if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() { Some(visitor.into()) } else { None } } + +pub fn 
print_io_summary(plan: Arc) { + println!("======================= IO Summary ========================"); + if let Some(stats) = collect_plan_io_stats(plan) { + println!("IO Stats: {:#?}", stats); + } else { + println!("No IO metrics found"); + } +} diff --git a/src/tui/state/tabs/history.rs b/src/tui/state/tabs/history.rs index 7ed6d77..c12a6b7 100644 --- a/src/tui/state/tabs/history.rs +++ b/src/tui/state/tabs/history.rs @@ -20,7 +20,7 @@ use std::time::Duration; use ratatui::widgets::TableState; -use crate::execution::ExecutionStats; +use crate::execution::stats::ExecutionStats; #[derive(Debug)] pub enum Context {