From 89c9e0f788b243481a15e8a9cd261e0fd683f84a Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 15 Aug 2024 16:20:44 +0100 Subject: [PATCH] Unify expression evaluation for both Table Providers (#632) closes #631 --- vortex-datafusion/src/eval.rs | 48 ------------- vortex-datafusion/src/expr.rs | 36 +++++++++- vortex-datafusion/src/lib.rs | 34 --------- vortex-datafusion/src/memory.rs | 47 +++++++------ vortex-datafusion/src/persistent/opener.rs | 38 +--------- vortex-datafusion/src/plans.rs | 82 +++++++++++----------- 6 files changed, 102 insertions(+), 183 deletions(-) delete mode 100644 vortex-datafusion/src/eval.rs diff --git a/vortex-datafusion/src/eval.rs b/vortex-datafusion/src/eval.rs deleted file mode 100644 index 4112085826..0000000000 --- a/vortex-datafusion/src/eval.rs +++ /dev/null @@ -1,48 +0,0 @@ -use arrow_schema::Schema; -use datafusion_expr::{Expr, Operator as DFOperator}; -use vortex::array::ConstantArray; -use vortex::compute::{and, compare, or}; -use vortex::{Array, IntoArray}; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; -use vortex_expr::Operator; - -use crate::can_be_pushed_down; -use crate::scalar::dfvalue_to_scalar; - -pub struct ExpressionEvaluator; - -impl ExpressionEvaluator { - pub fn eval(array: Array, expr: &Expr, schema: &Schema) -> VortexResult { - debug_assert!(can_be_pushed_down(expr, schema)); - - match expr { - Expr::BinaryExpr(expr) => { - let lhs = ExpressionEvaluator::eval(array.clone(), expr.left.as_ref(), schema)?; - let rhs = ExpressionEvaluator::eval(array, expr.right.as_ref(), schema)?; - // TODO(adamg): turn and/or into more general compute functions - match expr.op { - DFOperator::And => and(&lhs, &rhs), - DFOperator::Or => or(&lhs, &rhs), - DFOperator::Eq => compare(&lhs, &rhs, Operator::Eq), - DFOperator::Gt => compare(&lhs, &rhs, Operator::Gt), - DFOperator::GtEq => compare(&lhs, &rhs, Operator::Gte), - DFOperator::Lt => compare(&lhs, &rhs, Operator::Lt), - DFOperator::LtEq => compare(&lhs, &rhs, Operator::Lte), - DFOperator::NotEq => compare(&lhs, &rhs, Operator::NotEq), - _ => vortex_bail!("{} is an unsupported operator", expr.op), - } - } - Expr::Column(col) => array.with_dyn(|a| { - let name = col.name(); - a.as_struct_array() - .and_then(|a| a.field_by_name(name)) - .ok_or(vortex_err!("Missing field {name} in struct array")) - }), - Expr::Literal(lit) => { - let lit = dfvalue_to_scalar(lit.clone()); - Ok(ConstantArray::new(lit, array.len()).into_array()) - } - _ => unreachable!(), - } - } -} diff --git a/vortex-datafusion/src/expr.rs b/vortex-datafusion/src/expr.rs index 58bfca8efd..260f935be6 100644 --- a/vortex-datafusion/src/expr.rs +++ b/vortex-datafusion/src/expr.rs @@ -1,11 +1,13 @@ #![allow(dead_code)] +use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; use arrow_schema::{Schema, SchemaRef}; use datafusion::optimizer::simplify_expressions::ExprSimplifier; -use datafusion_common::{Result as DFResult, ToDFSchema}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion_common::{DataFusionError, Result as DFResult, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{and, lit, Expr, Operator as DFOperator}; @@ -173,6 +175,38 @@ pub fn convert_expr_to_vortex( vortex_bail!("Couldn't convert DataFusion physical expression to a vortex expression") } +/// Extract all indexes of all columns referenced by the physical expressions from the schema +pub(crate) fn extract_columns_from_expr( + expr: Option<&Arc>, + schema_ref: SchemaRef, +) -> DFResult> { + let mut predicate_projection = HashSet::new(); + + if let Some(expr) = expr { + expr.apply(|expr| { + if let Some(column) = expr + .as_any() + .downcast_ref::() + { + match schema_ref.column_with_name(column.name()) { + Some(_) => { + predicate_projection.insert(column.index()); + } + None => { + return Err(DataFusionError::External( + format!("Could not find expected column {} in schema", column.name()) + .into(), + )) + } + } + } + Ok(TreeNodeRecursion::Continue) + })?; + } + + Ok(predicate_projection) +} + #[cfg(test)] mod test { use std::sync::Arc; diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index f4a3f382c6..6a8d0137d6 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -3,7 +3,6 @@ #![allow(clippy::nonminimal_bool)] use std::any::Any; -use std::collections::HashSet; use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::Arc; @@ -13,13 +12,11 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType, Schema, SchemaRef}; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::prelude::{DataFrame, SessionContext}; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_expr::{Expr, Operator}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::Stream; -use itertools::Itertools; use memory::{VortexMemTable, VortexMemTableOptions}; use persistent::config::VortexTableOptions; use persistent::provider::VortexFileTableProvider; @@ -33,7 +30,6 @@ pub mod persistent; pub mod scalar; mod datatype; -mod eval; mod plans; const SUPPORTED_BINARY_OPS: &[Operator] = &[ @@ -170,36 +166,6 @@ fn can_be_pushed_down(expr: &Expr, schema: &Schema) -> bool { } } -fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec { - let referenced_columns: HashSet = - exprs.iter().flat_map(get_column_references).collect(); - - let projection: Vec = referenced_columns - .iter() - .map(|col_name| schema.column_with_name(col_name).unwrap().0) - .sorted() - .collect(); - - projection -} - -/// Extract out the columns from our table referenced by the expression. -fn get_column_references(expr: &Expr) -> HashSet { - let mut references = HashSet::new(); - - expr.apply(|node| match node { - Expr::Column(col) => { - references.insert(col.name.clone()); - - Ok(TreeNodeRecursion::Continue) - } - _ => Ok(TreeNodeRecursion::Continue), - }) - .unwrap(); - - references -} - /// Physical plan node for scans against an in-memory, possibly chunked Vortex Array. #[derive(Clone)] struct VortexScanExec { diff --git a/vortex-datafusion/src/memory.rs b/vortex-datafusion/src/memory.rs index d5dae48352..761483b835 100644 --- a/vortex-datafusion/src/memory.rs +++ b/vortex-datafusion/src/memory.rs @@ -6,17 +6,19 @@ use async_trait::async_trait; use datafusion::catalog::Session; use datafusion::datasource::TableProvider; use datafusion::prelude::*; -use datafusion_common::Result as DFResult; +use datafusion_common::{Result as DFResult, ToDFSchema}; +use datafusion_expr::utils::conjunction; use datafusion_expr::{TableProviderFilterPushDown, TableType}; -use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr::{create_physical_expr, EquivalenceProperties, PhysicalExpr}; use datafusion_physical_plan::{ExecutionMode, ExecutionPlan, Partitioning, PlanProperties}; use itertools::Itertools; use vortex::array::ChunkedArray; use vortex::{Array, ArrayDType as _}; use crate::datatype::infer_schema; +use crate::expr::extract_columns_from_expr; use crate::plans::{RowSelectorExec, TakeRowsExec}; -use crate::{can_be_pushed_down, get_filter_projection, VortexScanExec}; +use crate::{can_be_pushed_down, VortexScanExec}; /// A [`TableProvider`] that exposes an existing Vortex Array to the DataFusion SQL engine. /// @@ -80,33 +82,33 @@ impl TableProvider for VortexMemTable { filters: &[Expr], _limit: Option, ) -> DFResult> { - let filter_exprs = if filters.is_empty() { - None - } else { - Some(filters) - }; - let output_projection: Vec = match projection { None => (0..self.schema_ref.fields().len()).collect(), Some(proj) => proj.clone(), }; - match filter_exprs { + match conjunction(filters.to_vec()) { // If there is a filter expression, we execute in two phases, first performing a filter // on the input to get back row indices, and then taking the remaining struct columns // using the calculated indices from the filter. - Some(filter_exprs) => { + Some(expr) => { + let df_schema = self.schema_ref.clone().to_dfschema()?; + + let filter_expr = create_physical_expr(&expr, &df_schema, state.execution_props())?; + let filter_projection = - get_filter_projection(filter_exprs, self.schema_ref.clone()); + extract_columns_from_expr(Some(&filter_expr), self.schema_ref.clone())? + .into_iter() + .collect(); - Ok(make_filter_then_take_plan( + make_filter_then_take_plan( self.schema_ref.clone(), - filter_exprs, + filter_expr, filter_projection, self.array.clone(), output_projection.clone(), state, - )) + ) } // If no filters were pushed down, we materialize the entire StructArray into a @@ -190,24 +192,25 @@ impl VortexMemTableOptions { /// columns. fn make_filter_then_take_plan( schema: SchemaRef, - filter_exprs: &[Expr], + filter_expr: Arc, filter_projection: Vec, chunked_array: ChunkedArray, output_projection: Vec, _session_state: &dyn Session, -) -> Arc { - let row_selector_op = Arc::new(RowSelectorExec::new( - filter_exprs, +) -> DFResult> { + let row_selector_op = Arc::new(RowSelectorExec::try_new( + filter_expr, filter_projection, &chunked_array, - )); + schema.clone(), + )?); - Arc::new(TakeRowsExec::new( + Ok(Arc::new(TakeRowsExec::new( schema.clone(), &output_projection, row_selector_op.clone(), &chunked_array, - )) + ))) } #[cfg(test)] diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 0c547f0042..af50f59608 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::sync::Arc; use arrow_array::cast::AsArray; @@ -6,7 +5,6 @@ use arrow_array::{Array as _, BooleanArray, RecordBatch}; use arrow_schema::SchemaRef; use datafusion::arrow::buffer::{buffer_bin_and, BooleanBuffer}; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; -use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{DataFusionError, Result as DFResult}; use datafusion_physical_expr::PhysicalExpr; use futures::{FutureExt as _, TryStreamExt}; @@ -21,7 +19,7 @@ use vortex_serde::layouts::reader::builder::VortexLayoutReaderBuilder; use vortex_serde::layouts::reader::context::{LayoutContext, LayoutDeserializer}; use vortex_serde::layouts::reader::projections::Projection; -use crate::expr::{convert_expr_to_vortex, VortexPhysicalExpr}; +use crate::expr::{convert_expr_to_vortex, extract_columns_from_expr, VortexPhysicalExpr}; pub struct VortexFileOpener { pub ctx: Arc, @@ -47,7 +45,7 @@ impl FileOpener for VortexFileOpener { } let predicate_projection = - extract_column_from_expr(self.predicate.as_ref(), self.arrow_schema.clone())?; + extract_columns_from_expr(self.predicate.as_ref(), self.arrow_schema.clone())?; let predicate = self .predicate @@ -131,38 +129,6 @@ fn null_as_false(array: BoolArray) -> VortexResult { Ok(Array::from_arrow(boolean_array, false)) } -/// Extract all indexes of all columns referenced by the physical expressions from the schema -fn extract_column_from_expr( - expr: Option<&Arc>, - schema_ref: SchemaRef, -) -> DFResult> { - let mut predicate_projection = HashSet::new(); - - if let Some(expr) = expr { - expr.apply(|expr| { - if let Some(column) = expr - .as_any() - .downcast_ref::() - { - match schema_ref.column_with_name(column.name()) { - Some(_) => { - predicate_projection.insert(column.index()); - } - None => { - return Err(DataFusionError::External( - format!("Could not find expected column {} in schema", column.name()) - .into(), - )) - } - } - } - Ok(TreeNodeRecursion::Continue) - })?; - } - - Ok(predicate_projection) -} - #[cfg(test)] mod tests { use vortex::array::BoolArray; diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index 3187f91561..25ecdabbb9 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -1,7 +1,6 @@ //! Physical operators needed to implement scanning of Vortex arrays with pushdown. use std::any::Any; -use std::backtrace::Backtrace; use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::Arc; @@ -13,8 +12,7 @@ use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion_common::{DataFusionError, Result as DFResult}; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; -use datafusion_expr::Expr; -use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, }; @@ -24,25 +22,20 @@ use pin_project::pin_project; use vortex::array::ChunkedArray; use vortex::arrow::FromArrowArray; use vortex::compute::take; -use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant, IntoCanonical}; +use vortex::{Array, AsArray as _, IntoArray, IntoArrayVariant, IntoCanonical}; use vortex_error::vortex_err; -use crate::datatype::infer_schema; -use crate::eval::ExpressionEvaluator; -use crate::expr::{make_conjunction, simplify_expr}; +use crate::expr::{convert_expr_to_vortex, VortexPhysicalExpr}; /// Physical plan operator that applies a set of [filters][Expr] against the input, producing a /// row mask that can be used downstream to force a take against the corresponding struct array /// chunks but for different columns. pub(crate) struct RowSelectorExec { - filter_exprs: Vec, - + filter_expr: Arc, filter_projection: Vec, - - // cached PlanProperties object. We do not make use of this. + /// cached PlanProperties object. We do not make use of this. cached_plan_props: PlanProperties, - - // Full array. We only access partitions of this data. + /// Full array. We only access partitions of this data. chunked_array: ChunkedArray, } @@ -55,30 +48,33 @@ lazy_static! { } impl RowSelectorExec { - pub(crate) fn new( - filter_exprs: &[Expr], + pub(crate) fn try_new( + filter_expr: Arc, filter_projection: Vec, chunked_array: &ChunkedArray, - ) -> Self { + schema: SchemaRef, + ) -> DFResult { let cached_plan_props = PlanProperties::new( EquivalenceProperties::new(ROW_SELECTOR_SCHEMA_REF.clone()), Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded, ); - Self { - filter_exprs: filter_exprs.to_owned(), + let filter_expr = convert_expr_to_vortex(filter_expr, schema.as_ref())?; + + Ok(Self { + filter_expr, filter_projection: filter_projection.clone(), chunked_array: chunked_array.clone(), cached_plan_props, - } + }) } } impl Debug for RowSelectorExec { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("RowSelectorExec") - .field("filter_exprs", &self.filter_exprs) + .field("filter_expr", &self.filter_expr) .finish() } } @@ -132,23 +128,11 @@ impl ExecutionPlan for RowSelectorExec { .into()); } - // Derive a schema using the provided set of fields. - let filter_schema = Arc::new( - infer_schema(self.chunked_array.dtype()) - .project(self.filter_projection.as_slice()) - .map_err(|err| { - DataFusionError::ArrowError(err, Some(Backtrace::capture().to_string())) - })?, - ); - - let conjunction_expr = - simplify_expr(&make_conjunction(&self.filter_exprs)?, filter_schema)?; - Ok(Box::pin(RowIndicesStream { chunked_array: self.chunked_array.clone(), chunk_idx: 0, filter_projection: self.filter_projection.clone(), - conjunction_expr, + conjunction_expr: self.filter_expr.clone(), })) } } @@ -157,7 +141,7 @@ impl ExecutionPlan for RowSelectorExec { pub(crate) struct RowIndicesStream { chunked_array: ChunkedArray, chunk_idx: usize, - conjunction_expr: Expr, + conjunction_expr: Arc, filter_projection: Vec, } @@ -186,13 +170,13 @@ impl Stream for RowIndicesStream { .project(this.filter_projection.as_slice()) .expect("projection should succeed"); - let schema = infer_schema(vortex_struct.dtype()); - - // TODO(adamg): Filter on vortex arrays - let array = - ExpressionEvaluator::eval(vortex_struct.into_array(), &this.conjunction_expr, &schema) - .unwrap(); - let selection = array.into_canonical().unwrap().into_arrow(); + let selection = this + .conjunction_expr + .evaluate(vortex_struct.as_array_ref()) + .map_err(|e| DataFusionError::External(e.into()))? + .into_canonical() + .unwrap() + .into_arrow(); // Convert the `selection` BooleanArray into a UInt64Array of indices. let selection_indices = selection @@ -411,13 +395,18 @@ mod test { use std::sync::Arc; use arrow_array::{RecordBatch, UInt64Array}; + use datafusion_common::ToDFSchema; + use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{and, col, lit}; + use datafusion_physical_expr::create_physical_expr; use itertools::Itertools; use vortex::array::{BoolArray, ChunkedArray, PrimitiveArray, StructArray}; use vortex::validity::Validity; use vortex::{ArrayDType, IntoArray}; use vortex_dtype::FieldName; + use crate::datatype::infer_schema; + use crate::expr::convert_expr_to_vortex; use crate::plans::{RowIndicesStream, ROW_SELECTOR_SCHEMA_REF}; #[tokio::test] @@ -439,10 +428,19 @@ mod test { let chunked_array = ChunkedArray::try_new(vec![chunk.clone(), chunk.clone()], dtype).unwrap(); + let schema = infer_schema(chunk.dtype()); + let logical_expr = and((col("a")).eq(lit(2u64)), col("b").eq(lit(true))); + let df_expr = create_physical_expr( + &logical_expr, + &schema.clone().to_dfschema().unwrap(), + &ExecutionProps::new(), + ) + .unwrap(); + let filtering_stream = RowIndicesStream { chunked_array: chunked_array.clone(), chunk_idx: 0, - conjunction_expr: and((col("a")).eq(lit(2u64)), col("b").eq(lit(true))), + conjunction_expr: convert_expr_to_vortex(df_expr, &schema).unwrap(), filter_projection: vec![0, 1], };