Skip to content

Commit

Permalink
Exact support for more expressions (#628)
Browse files Browse the repository at this point in the history
Start moving from `TableProviderFilterPushDown::Inexact` support to
`TableProviderFilterPushDown::Exact` to allow for better performance in
supported expressions.

---------

Co-authored-by: Robert Kruszewski <[email protected]>
  • Loading branch information
AdamGS and robert3005 authored Aug 15, 2024
1 parent a61a052 commit 5b1ed72
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 37 deletions.
50 changes: 29 additions & 21 deletions bench-vortex/src/bin/tpch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::SystemTime;

use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::tpch::{load_datasets, tpch_queries, Format, EXPECTED_ROW_COUNTS};
use clap::Parser;
use clap::{ArgAction, Parser};
use futures::future::try_join_all;
use indicatif::ProgressBar;
use itertools::Itertools;
Expand All @@ -20,6 +20,10 @@ struct Args {
queries: Option<Vec<usize>>,
#[arg(short, long)]
threads: Option<usize>,
#[arg(short, long, default_value_t = true, default_missing_value = "true", action = ArgAction::Set)]
warmup: bool,
#[arg(short, long, default_value = "10")]
iterations: usize,
}

fn main() -> ExitCode {
Expand All @@ -40,10 +44,10 @@ fn main() -> ExitCode {
}
.expect("Failed building the Runtime");

runtime.block_on(bench_main(args.queries))
runtime.block_on(bench_main(args.queries, args.iterations, args.warmup))
}

async fn bench_main(queries: Option<Vec<usize>>) -> ExitCode {
async fn bench_main(queries: Option<Vec<usize>>, iterations: usize, warmup: bool) -> ExitCode {
// uncomment the below to enable trace logging of datafusion execution
// setup_logger(LevelFilter::Trace);

Expand Down Expand Up @@ -106,27 +110,31 @@ async fn bench_main(queries: Option<Vec<usize>>) -> ExitCode {
.build()
.unwrap();
for (ctx, format) in ctxs.iter().zip(formats.iter()) {
for i in 0..3 {
// warmup
let row_count: usize = rt.block_on(async {
ctx.sql(&query)
.await
.map_err(|e| println!("Failed to run {} {:?}: {}", q, format, e))
.unwrap()
.collect()
.await
.map_err(|e| println!("Failed to collect {} {:?}: {}", q, format, e))
.unwrap()
.iter()
.map(|r| r.num_rows())
.sum()
});
if i == 0 {
count_tx.send((q, *format, row_count)).unwrap();
if warmup {
for i in 0..3 {
let row_count: usize = rt.block_on(async {
ctx.sql(&query)
.await
.map_err(|e| println!("Failed to run {} {:?}: {}", q, format, e))
.unwrap()
.collect()
.await
.map_err(|e| {
println!("Failed to collect {} {:?}: {}", q, format, e)
})
.unwrap()
.iter()
.map(|r| r.num_rows())
.sum()
});
if i == 0 {
count_tx.send((q, *format, row_count)).unwrap();
}
}
}

let mut measure = Vec::new();
for _ in 0..10 {
for _ in 0..iterations {
let start = SystemTime::now();
rt.block_on(async {
ctx.sql(&query)
Expand Down
19 changes: 11 additions & 8 deletions vortex-array/src/arrow/recordbatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use itertools::Itertools;
use crate::array::StructArray;
use crate::arrow::FromArrowArray;
use crate::validity::Validity;
use crate::{Array, IntoArray, IntoCanonical};
use crate::{Array, IntoArrayVariant, IntoCanonical};

impl From<RecordBatch> for Array {
fn from(value: RecordBatch) -> Self {
Expand Down Expand Up @@ -33,17 +33,20 @@ impl From<RecordBatch> for Array {

impl From<Array> for RecordBatch {
fn from(value: Array) -> Self {
let array_ref = value
.into_canonical()
.expect("struct arrays must canonicalize")
.into_arrow();
let struct_array = as_struct_array(array_ref.as_ref());
RecordBatch::from(struct_array)
let struct_arr = value
.into_struct()
.expect("RecordBatch can only be constructed from a Vortex StructArray");
Self::from(struct_arr)
}
}

impl From<StructArray> for RecordBatch {
fn from(value: StructArray) -> Self {
RecordBatch::from(value.into_array())
let array_ref = value
.into_canonical()
.expect("Struct arrays must canonicalize")
.into_arrow();
let struct_array = as_struct_array(array_ref.as_ref());
Self::from(struct_array)
}
}
75 changes: 68 additions & 7 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::collections::HashSet;
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::{Array as _, BooleanArray, RecordBatch};
use arrow_schema::SchemaRef;
use datafusion::arrow::buffer::{buffer_bin_and, BooleanBuffer};
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion_common::Result as DFResult;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{DataFusionError, Result as DFResult};
use datafusion_physical_expr::PhysicalExpr;
use futures::{FutureExt as _, TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;
use vortex::array::BoolArray;
use vortex::arrow::FromArrowArray;
Expand All @@ -18,7 +21,7 @@ use vortex_serde::layouts::reader::builder::VortexLayoutReaderBuilder;
use vortex_serde::layouts::reader::context::{LayoutContext, LayoutDeserializer};
use vortex_serde::layouts::reader::projections::Projection;

use crate::expr::convert_expr_to_vortex;
use crate::expr::{convert_expr_to_vortex, VortexPhysicalExpr};

pub struct VortexFileOpener {
pub ctx: Arc<Context>,
Expand All @@ -43,22 +46,41 @@ impl FileOpener for VortexFileOpener {
builder = builder.with_batch_size(batch_size);
}

let predicate = self.predicate.clone().and_then(|predicate| {
convert_expr_to_vortex(predicate, self.arrow_schema.as_ref()).ok()
});
let predicate_projection =
extract_column_from_expr(self.predicate.as_ref(), self.arrow_schema.clone())?;

let predicate = self
.predicate
.clone()
.map(|predicate| -> DFResult<Arc<dyn VortexPhysicalExpr>> {
let vtx_expr = convert_expr_to_vortex(predicate, self.arrow_schema.as_ref())
.map_err(|e| DataFusionError::External(e.into()))?;

DFResult::Ok(vtx_expr)
})
.transpose()?;

if let Some(projection) = self.projection.as_ref() {
let mut projection = projection.clone();
for col_idx in predicate_projection.into_iter() {
if !projection.contains(&col_idx) {
projection.push(col_idx);
}
}

builder = builder.with_projection(Projection::new(projection))
}

let original_projection_len = self.projection.as_ref().map(|v| v.len());

Ok(async move {
let reader = builder.build().await?;

let stream = reader
.and_then(move |array| {
let predicate = predicate.clone();
async move {
let array = if let Some(predicate) = predicate.as_ref() {
let array = if let Some(predicate) = predicate {
let predicate_result = predicate.evaluate(&array)?;

let filter_array = null_as_false(predicate_result.into_bool()?)?;
Expand All @@ -67,7 +89,14 @@ impl FileOpener for VortexFileOpener {
array
};

Ok(RecordBatch::from(array))
let rb = RecordBatch::from(array);

// If we had a projection, we cut the record batch down to the desired columns
if let Some(len) = original_projection_len {
Ok(rb.project(&(0..len).collect_vec())?)
} else {
Ok(rb)
}
}
})
.map_err(|e| e.into());
Expand Down Expand Up @@ -102,6 +131,38 @@ fn null_as_false(array: BoolArray) -> VortexResult<Array> {
Ok(Array::from_arrow(boolean_array, false))
}

/// Extract all indexes of all columns referenced by the physical expressions from the schema
fn extract_column_from_expr(
expr: Option<&Arc<dyn PhysicalExpr>>,
schema_ref: SchemaRef,
) -> DFResult<HashSet<usize>> {
let mut predicate_projection = HashSet::new();

if let Some(expr) = expr {
expr.apply(|expr| {
if let Some(column) = expr
.as_any()
.downcast_ref::<datafusion_physical_expr::expressions::Column>()
{
match schema_ref.column_with_name(column.name()) {
Some(_) => {
predicate_projection.insert(column.index());
}
None => {
return Err(DataFusionError::External(
format!("Could not find expected column {} in schema", column.name())
.into(),
))
}
}
}
Ok(TreeNodeRecursion::Continue)
})?;
}

Ok(predicate_projection)
}

#[cfg(test)]
mod tests {
use vortex::array::BoolArray;
Expand Down
13 changes: 12 additions & 1 deletion vortex-datafusion/src/persistent/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::ExecutionPlan;
use itertools::Itertools;

use super::config::VortexTableOptions;
use crate::can_be_pushed_down;
use crate::persistent::execution::VortexExec;

pub struct VortexFileTableProvider {
Expand Down Expand Up @@ -100,7 +102,16 @@ impl TableProvider for VortexFileTableProvider {
&self,
filters: &[&Expr],
) -> DFResult<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
filters
.iter()
.map(|expr| {
if can_be_pushed_down(expr, self.schema().as_ref()) {
Ok(TableProviderFilterPushDown::Exact)
} else {
Ok(TableProviderFilterPushDown::Unsupported)
}
})
.try_collect()
}

fn statistics(&self) -> Option<Statistics> {
Expand Down

0 comments on commit 5b1ed72

Please sign in to comment.