Unify expression evaluation for both Table Providers (#632)
closes #631
AdamGS authored Aug 15, 2024
1 parent 5b1ed72 commit 89c9e0f
Showing 6 changed files with 102 additions and 183 deletions.
48 changes: 0 additions & 48 deletions vortex-datafusion/src/eval.rs

This file was deleted.

36 changes: 35 additions & 1 deletion vortex-datafusion/src/expr.rs
@@ -1,11 +1,13 @@
 #![allow(dead_code)]
 
+use std::collections::HashSet;
 use std::fmt::Debug;
 use std::sync::Arc;
 
 use arrow_schema::{Schema, SchemaRef};
 use datafusion::optimizer::simplify_expressions::ExprSimplifier;
-use datafusion_common::{Result as DFResult, ToDFSchema};
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
+use datafusion_common::{DataFusionError, Result as DFResult, ToDFSchema};
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::simplify::SimplifyContext;
 use datafusion_expr::{and, lit, Expr, Operator as DFOperator};
@@ -173,6 +175,38 @@ pub fn convert_expr_to_vortex(
     vortex_bail!("Couldn't convert DataFusion physical expression to a vortex expression")
 }
 
+/// Extract all indexes of all columns referenced by the physical expressions from the schema
+pub(crate) fn extract_columns_from_expr(
+    expr: Option<&Arc<dyn PhysicalExpr>>,
+    schema_ref: SchemaRef,
+) -> DFResult<HashSet<usize>> {
+    let mut predicate_projection = HashSet::new();
+
+    if let Some(expr) = expr {
+        expr.apply(|expr| {
+            if let Some(column) = expr
+                .as_any()
+                .downcast_ref::<datafusion_physical_expr::expressions::Column>()
+            {
+                match schema_ref.column_with_name(column.name()) {
+                    Some(_) => {
+                        predicate_projection.insert(column.index());
+                    }
+                    None => {
+                        return Err(DataFusionError::External(
+                            format!("Could not find expected column {} in schema", column.name())
+                                .into(),
+                        ))
+                    }
+                }
+            }
+            Ok(TreeNodeRecursion::Continue)
+        })?;
+    }
+
+    Ok(predicate_projection)
+}
+
 #[cfg(test)]
 mod test {
     use std::sync::Arc;
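For orientation, a minimal sketch of how the now-shared helper could be exercised from a test inside the crate (it is `pub(crate)`). The schema, column names, and literal below are invented for illustration and do not come from this commit:

    use std::collections::HashSet;
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{binary, col, lit};

    use crate::expr::extract_columns_from_expr;

    #[test]
    fn extracts_referenced_column_indices() -> Result<()> {
        // A hypothetical two-column schema.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));

        // Build the physical expression `a > 0`.
        let predicate = binary(col("a", &schema)?, Operator::Gt, lit(0i32), &schema)?;

        // Only column `a` (index 0) is referenced, so the helper returns {0}.
        let indices = extract_columns_from_expr(Some(&predicate), schema)?;
        assert_eq!(indices, HashSet::from([0]));
        Ok(())
    }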
34 changes: 0 additions & 34 deletions vortex-datafusion/src/lib.rs
@@ -3,7 +3,6 @@
 #![allow(clippy::nonminimal_bool)]
 
 use std::any::Any;
-use std::collections::HashSet;
 use std::fmt::{Debug, Formatter};
 use std::pin::Pin;
 use std::sync::Arc;
@@ -13,13 +12,11 @@ use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Schema, SchemaRef};
 use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
 use datafusion::prelude::{DataFrame, SessionContext};
-use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult};
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::{Expr, Operator};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
 use futures::Stream;
-use itertools::Itertools;
 use memory::{VortexMemTable, VortexMemTableOptions};
 use persistent::config::VortexTableOptions;
 use persistent::provider::VortexFileTableProvider;
@@ -33,7 +30,6 @@ pub mod persistent;
 pub mod scalar;
 
 mod datatype;
-mod eval;
 mod plans;
 
 const SUPPORTED_BINARY_OPS: &[Operator] = &[
@@ -170,36 +166,6 @@ fn can_be_pushed_down(expr: &Expr, schema: &Schema) -> bool {
     }
 }
 
-fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec<usize> {
-    let referenced_columns: HashSet<String> =
-        exprs.iter().flat_map(get_column_references).collect();
-
-    let projection: Vec<usize> = referenced_columns
-        .iter()
-        .map(|col_name| schema.column_with_name(col_name).unwrap().0)
-        .sorted()
-        .collect();
-
-    projection
-}
-
-/// Extract out the columns from our table referenced by the expression.
-fn get_column_references(expr: &Expr) -> HashSet<String> {
-    let mut references = HashSet::new();
-
-    expr.apply(|node| match node {
-        Expr::Column(col) => {
-            references.insert(col.name.clone());
-
-            Ok(TreeNodeRecursion::Continue)
-        }
-        _ => Ok(TreeNodeRecursion::Continue),
-    })
-    .unwrap();
-
-    references
-}
-
 /// Physical plan node for scans against an in-memory, possibly chunked Vortex Array.
 #[derive(Clone)]
 struct VortexScanExec {
47 changes: 25 additions & 22 deletions vortex-datafusion/src/memory.rs
@@ -6,17 +6,19 @@ use async_trait::async_trait;
 use datafusion::catalog::Session;
 use datafusion::datasource::TableProvider;
 use datafusion::prelude::*;
-use datafusion_common::Result as DFResult;
+use datafusion_common::{Result as DFResult, ToDFSchema};
+use datafusion_expr::utils::conjunction;
 use datafusion_expr::{TableProviderFilterPushDown, TableType};
-use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_expr::{create_physical_expr, EquivalenceProperties, PhysicalExpr};
 use datafusion_physical_plan::{ExecutionMode, ExecutionPlan, Partitioning, PlanProperties};
 use itertools::Itertools;
 use vortex::array::ChunkedArray;
 use vortex::{Array, ArrayDType as _};
 
 use crate::datatype::infer_schema;
+use crate::expr::extract_columns_from_expr;
 use crate::plans::{RowSelectorExec, TakeRowsExec};
-use crate::{can_be_pushed_down, get_filter_projection, VortexScanExec};
+use crate::{can_be_pushed_down, VortexScanExec};
 
 /// A [`TableProvider`] that exposes an existing Vortex Array to the DataFusion SQL engine.
 ///
@@ -80,33 +82,33 @@ impl TableProvider for VortexMemTable {
         filters: &[Expr],
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        let filter_exprs = if filters.is_empty() {
-            None
-        } else {
-            Some(filters)
-        };
-
         let output_projection: Vec<usize> = match projection {
             None => (0..self.schema_ref.fields().len()).collect(),
            Some(proj) => proj.clone(),
         };
 
-        match filter_exprs {
+        match conjunction(filters.to_vec()) {
             // If there is a filter expression, we execute in two phases, first performing a filter
             // on the input to get back row indices, and then taking the remaining struct columns
             // using the calculated indices from the filter.
-            Some(filter_exprs) => {
+            Some(expr) => {
+                let df_schema = self.schema_ref.clone().to_dfschema()?;
+
+                let filter_expr = create_physical_expr(&expr, &df_schema, state.execution_props())?;
+
                 let filter_projection =
-                    get_filter_projection(filter_exprs, self.schema_ref.clone());
+                    extract_columns_from_expr(Some(&filter_expr), self.schema_ref.clone())?
+                        .into_iter()
+                        .collect();
 
-                Ok(make_filter_then_take_plan(
+                make_filter_then_take_plan(
                     self.schema_ref.clone(),
-                    filter_exprs,
+                    filter_expr,
                     filter_projection,
                     self.array.clone(),
                     output_projection.clone(),
                     state,
-                ))
+                )
             }
 
             // If no filters were pushed down, we materialize the entire StructArray into a
@@ -190,24 +192,25 @@ impl VortexMemTableOptions {
 /// columns.
 fn make_filter_then_take_plan(
     schema: SchemaRef,
-    filter_exprs: &[Expr],
+    filter_expr: Arc<dyn PhysicalExpr>,
     filter_projection: Vec<usize>,
     chunked_array: ChunkedArray,
     output_projection: Vec<usize>,
     _session_state: &dyn Session,
-) -> Arc<dyn ExecutionPlan> {
-    let row_selector_op = Arc::new(RowSelectorExec::new(
-        filter_exprs,
+) -> DFResult<Arc<dyn ExecutionPlan>> {
+    let row_selector_op = Arc::new(RowSelectorExec::try_new(
+        filter_expr,
         filter_projection,
         &chunked_array,
-    ));
+        schema.clone(),
+    )?);
 
-    Arc::new(TakeRowsExec::new(
+    Ok(Arc::new(TakeRowsExec::new(
         schema.clone(),
         &output_projection,
         row_selector_op.clone(),
         &chunked_array,
-    ))
+    )))
 }
 
 #[cfg(test)]
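A side note on the rewrite above: `datafusion_expr::utils::conjunction` folds the pushed-down filter list into a single `Expr`, replacing the old hand-rolled `Option<&[Expr]>` handling. A small standalone sketch of its behavior, with invented column names and values:

    use datafusion_expr::utils::conjunction;
    use datafusion_expr::{col, lit, Expr};

    fn main() {
        // Two pushed-down filters, as DataFusion would hand them to `scan`.
        let filters: Vec<Expr> = vec![col("a").gt(lit(1)), col("b").eq(lit("x"))];

        // Folded into a single `a > 1 AND b = 'x'` expression, ready for
        // `create_physical_expr` and the two-phase filter-then-take plan.
        assert!(conjunction(filters).is_some());

        // An empty filter list yields None, which falls through to the
        // full-materialization arm of the match.
        assert!(conjunction(Vec::<Expr>::new()).is_none());
    }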
38 changes: 2 additions & 36 deletions vortex-datafusion/src/persistent/opener.rs
@@ -1,12 +1,10 @@
-use std::collections::HashSet;
 use std::sync::Arc;
 
 use arrow_array::cast::AsArray;
 use arrow_array::{Array as _, BooleanArray, RecordBatch};
 use arrow_schema::SchemaRef;
 use datafusion::arrow::buffer::{buffer_bin_and, BooleanBuffer};
 use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
-use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{DataFusionError, Result as DFResult};
 use datafusion_physical_expr::PhysicalExpr;
 use futures::{FutureExt as _, TryStreamExt};
@@ -21,7 +19,7 @@ use vortex_serde::layouts::reader::builder::VortexLayoutReaderBuilder;
 use vortex_serde::layouts::reader::context::{LayoutContext, LayoutDeserializer};
 use vortex_serde::layouts::reader::projections::Projection;
 
-use crate::expr::{convert_expr_to_vortex, VortexPhysicalExpr};
+use crate::expr::{convert_expr_to_vortex, extract_columns_from_expr, VortexPhysicalExpr};
 
 pub struct VortexFileOpener {
     pub ctx: Arc<Context>,
@@ -47,7 +45,7 @@ impl FileOpener for VortexFileOpener {
         }
 
         let predicate_projection =
-            extract_column_from_expr(self.predicate.as_ref(), self.arrow_schema.clone())?;
+            extract_columns_from_expr(self.predicate.as_ref(), self.arrow_schema.clone())?;
 
         let predicate = self
             .predicate
@@ -131,38 +129,6 @@ fn null_as_false(array: BoolArray) -> VortexResult<Array> {
     Ok(Array::from_arrow(boolean_array, false))
 }
 
-/// Extract all indexes of all columns referenced by the physical expressions from the schema
-fn extract_column_from_expr(
-    expr: Option<&Arc<dyn PhysicalExpr>>,
-    schema_ref: SchemaRef,
-) -> DFResult<HashSet<usize>> {
-    let mut predicate_projection = HashSet::new();
-
-    if let Some(expr) = expr {
-        expr.apply(|expr| {
-            if let Some(column) = expr
-                .as_any()
-                .downcast_ref::<datafusion_physical_expr::expressions::Column>()
-            {
-                match schema_ref.column_with_name(column.name()) {
-                    Some(_) => {
-                        predicate_projection.insert(column.index());
-                    }
-                    None => {
-                        return Err(DataFusionError::External(
-                            format!("Could not find expected column {} in schema", column.name())
-                                .into(),
-                        ))
-                    }
-                }
-            }
-            Ok(TreeNodeRecursion::Continue)
-        })?;
-    }
-
-    Ok(predicate_projection)
-}
-
 #[cfg(test)]
 mod tests {
     use vortex::array::BoolArray;
