From 5b1ed72a0a9aef6b6b32d4e8b2ce14110cbb5bde Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 15 Aug 2024 14:17:58 +0100 Subject: [PATCH] `Exact` support for more expressions (#628) Start moving from `TableProviderFilterPushDown::Inexact` support to `TableProviderFilterPushDown::Exact` to allow for better performance in supported expressions. --------- Co-authored-by: Robert Kruszewski --- bench-vortex/src/bin/tpch_benchmark.rs | 50 +++++++------ vortex-array/src/arrow/recordbatch.rs | 19 ++--- vortex-datafusion/src/persistent/opener.rs | 75 ++++++++++++++++++-- vortex-datafusion/src/persistent/provider.rs | 13 +++- 4 files changed, 120 insertions(+), 37 deletions(-) diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs index 2b70eb4362..befe6c69e6 100644 --- a/bench-vortex/src/bin/tpch_benchmark.rs +++ b/bench-vortex/src/bin/tpch_benchmark.rs @@ -7,7 +7,7 @@ use std::time::SystemTime; use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions}; use bench_vortex::tpch::{load_datasets, tpch_queries, Format, EXPECTED_ROW_COUNTS}; -use clap::Parser; +use clap::{ArgAction, Parser}; use futures::future::try_join_all; use indicatif::ProgressBar; use itertools::Itertools; @@ -20,6 +20,10 @@ struct Args { queries: Option>, #[arg(short, long)] threads: Option, + #[arg(short, long, default_value_t = true, default_missing_value = "true", action = ArgAction::Set)] + warmup: bool, + #[arg(short, long, default_value = "10")] + iterations: usize, } fn main() -> ExitCode { @@ -40,10 +44,10 @@ fn main() -> ExitCode { } .expect("Failed building the Runtime"); - runtime.block_on(bench_main(args.queries)) + runtime.block_on(bench_main(args.queries, args.iterations, args.warmup)) } -async fn bench_main(queries: Option>) -> ExitCode { +async fn bench_main(queries: Option>, iterations: usize, warmup: bool) -> ExitCode { // uncomment the below to enable trace logging of datafusion execution // setup_logger(LevelFilter::Trace); @@ -106,27 +110,31 @@ async fn bench_main(queries: Option>) -> ExitCode { .build() .unwrap(); for (ctx, format) in ctxs.iter().zip(formats.iter()) { - for i in 0..3 { - // warmup - let row_count: usize = rt.block_on(async { - ctx.sql(&query) - .await - .map_err(|e| println!("Failed to run {} {:?}: {}", q, format, e)) - .unwrap() - .collect() - .await - .map_err(|e| println!("Failed to collect {} {:?}: {}", q, format, e)) - .unwrap() - .iter() - .map(|r| r.num_rows()) - .sum() - }); - if i == 0 { - count_tx.send((q, *format, row_count)).unwrap(); + if warmup { + for i in 0..3 { + let row_count: usize = rt.block_on(async { + ctx.sql(&query) + .await + .map_err(|e| println!("Failed to run {} {:?}: {}", q, format, e)) + .unwrap() + .collect() + .await + .map_err(|e| { + println!("Failed to collect {} {:?}: {}", q, format, e) + }) + .unwrap() + .iter() + .map(|r| r.num_rows()) + .sum() + }); + if i == 0 { + count_tx.send((q, *format, row_count)).unwrap(); + } } } + let mut measure = Vec::new(); - for _ in 0..10 { + for _ in 0..iterations { let start = SystemTime::now(); rt.block_on(async { ctx.sql(&query) diff --git a/vortex-array/src/arrow/recordbatch.rs b/vortex-array/src/arrow/recordbatch.rs index 7a23d6f913..020988890e 100644 --- a/vortex-array/src/arrow/recordbatch.rs +++ b/vortex-array/src/arrow/recordbatch.rs @@ -5,7 +5,7 @@ use itertools::Itertools; use crate::array::StructArray; use crate::arrow::FromArrowArray; use crate::validity::Validity; -use crate::{Array, IntoArray, IntoCanonical}; +use crate::{Array, IntoArrayVariant, IntoCanonical}; impl From for Array { fn from(value: RecordBatch) -> Self { @@ -33,17 +33,20 @@ impl From for Array { impl From for RecordBatch { fn from(value: Array) -> Self { - let array_ref = value - .into_canonical() - .expect("struct arrays must canonicalize") - .into_arrow(); - let struct_array = as_struct_array(array_ref.as_ref()); - RecordBatch::from(struct_array) + let struct_arr = value + .into_struct() + .expect("RecordBatch can only be constructed from a Vortex StructArray"); + Self::from(struct_arr) } } impl From for RecordBatch { fn from(value: StructArray) -> Self { - RecordBatch::from(value.into_array()) + let array_ref = value + .into_canonical() + .expect("Struct arrays must canonicalize") + .into_arrow(); + let struct_array = as_struct_array(array_ref.as_ref()); + Self::from(struct_array) } } diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 047799559f..0c547f0042 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::sync::Arc; use arrow_array::cast::AsArray; @@ -5,9 +6,11 @@ use arrow_array::{Array as _, BooleanArray, RecordBatch}; use arrow_schema::SchemaRef; use datafusion::arrow::buffer::{buffer_bin_and, BooleanBuffer}; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; -use datafusion_common::Result as DFResult; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion_common::{DataFusionError, Result as DFResult}; use datafusion_physical_expr::PhysicalExpr; use futures::{FutureExt as _, TryStreamExt}; +use itertools::Itertools; use object_store::ObjectStore; use vortex::array::BoolArray; use vortex::arrow::FromArrowArray; @@ -18,7 +21,7 @@ use vortex_serde::layouts::reader::builder::VortexLayoutReaderBuilder; use vortex_serde::layouts::reader::context::{LayoutContext, LayoutDeserializer}; use vortex_serde::layouts::reader::projections::Projection; -use crate::expr::convert_expr_to_vortex; +use crate::expr::{convert_expr_to_vortex, VortexPhysicalExpr}; pub struct VortexFileOpener { pub ctx: Arc, @@ -43,14 +46,33 @@ impl FileOpener for VortexFileOpener { builder = builder.with_batch_size(batch_size); } - let predicate = self.predicate.clone().and_then(|predicate| { - convert_expr_to_vortex(predicate, self.arrow_schema.as_ref()).ok() - }); + let predicate_projection = + extract_column_from_expr(self.predicate.as_ref(), self.arrow_schema.clone())?; + + let predicate = self + .predicate + .clone() + .map(|predicate| -> DFResult> { + let vtx_expr = convert_expr_to_vortex(predicate, self.arrow_schema.as_ref()) + .map_err(|e| DataFusionError::External(e.into()))?; + + DFResult::Ok(vtx_expr) + }) + .transpose()?; if let Some(projection) = self.projection.as_ref() { + let mut projection = projection.clone(); + for col_idx in predicate_projection.into_iter() { + if !projection.contains(&col_idx) { + projection.push(col_idx); + } + } + builder = builder.with_projection(Projection::new(projection)) } + let original_projection_len = self.projection.as_ref().map(|v| v.len()); + Ok(async move { let reader = builder.build().await?; @@ -58,7 +80,7 @@ impl FileOpener for VortexFileOpener { .and_then(move |array| { let predicate = predicate.clone(); async move { - let array = if let Some(predicate) = predicate.as_ref() { + let array = if let Some(predicate) = predicate { let predicate_result = predicate.evaluate(&array)?; let filter_array = null_as_false(predicate_result.into_bool()?)?; @@ -67,7 +89,14 @@ impl FileOpener for VortexFileOpener { array }; - Ok(RecordBatch::from(array)) + let rb = RecordBatch::from(array); + + // If we had a projection, we cut the record batch down to the desired columns + if let Some(len) = original_projection_len { + Ok(rb.project(&(0..len).collect_vec())?) + } else { + Ok(rb) + } } }) .map_err(|e| e.into()); @@ -102,6 +131,38 @@ fn null_as_false(array: BoolArray) -> VortexResult { Ok(Array::from_arrow(boolean_array, false)) } +/// Extract all indexes of all columns referenced by the physical expressions from the schema +fn extract_column_from_expr( + expr: Option<&Arc>, + schema_ref: SchemaRef, +) -> DFResult> { + let mut predicate_projection = HashSet::new(); + + if let Some(expr) = expr { + expr.apply(|expr| { + if let Some(column) = expr + .as_any() + .downcast_ref::() + { + match schema_ref.column_with_name(column.name()) { + Some(_) => { + predicate_projection.insert(column.index()); + } + None => { + return Err(DataFusionError::External( + format!("Could not find expected column {} in schema", column.name()) + .into(), + )) + } + } + } + Ok(TreeNodeRecursion::Continue) + })?; + } + + Ok(predicate_projection) +} + #[cfg(test)] mod tests { use vortex::array::BoolArray; diff --git a/vortex-datafusion/src/persistent/provider.rs b/vortex-datafusion/src/persistent/provider.rs index baeefd4f35..b4206160fb 100644 --- a/vortex-datafusion/src/persistent/provider.rs +++ b/vortex-datafusion/src/persistent/provider.rs @@ -15,8 +15,10 @@ use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::ExecutionPlan; +use itertools::Itertools; use super::config::VortexTableOptions; +use crate::can_be_pushed_down; use crate::persistent::execution::VortexExec; pub struct VortexFileTableProvider { @@ -100,7 +102,16 @@ impl TableProvider for VortexFileTableProvider { &self, filters: &[&Expr], ) -> DFResult> { - Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + filters + .iter() + .map(|expr| { + if can_be_pushed_down(expr, self.schema().as_ref()) { + Ok(TableProviderFilterPushDown::Exact) + } else { + Ok(TableProviderFilterPushDown::Unsupported) + } + }) + .try_collect() } fn statistics(&self) -> Option {