From 3214287a5f93bdfb5f43c2cd63117af4dac6ae7a Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Jun 2024 08:52:36 -0400 Subject: [PATCH] save --- vortex-datafusion/src/lib.rs | 100 ++++++++++++++++++++++++++++++++--- 1 file changed, 94 insertions(+), 6 deletions(-) diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index ba7330848f..6361d36869 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -9,13 +9,20 @@ use std::task::{Context, Poll}; use arrow_array::{RecordBatch, StructArray as ArrowStructArray}; use arrow_schema::SchemaRef; use async_trait::async_trait; +use datafusion::arrow::buffer::NullBuffer; use datafusion::dataframe::DataFrame; use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion::optimizer::simplify_expressions::ExprSimplifier; use datafusion::prelude::SessionContext; -use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result as DFResult}; -use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion_common::{ + exec_datafusion_err, DataFusionError, Result as DFResult, ScalarValue, ToDFSchema, +}; +use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::simplify::SimplifyContext; +use datafusion_expr::{BinaryExpr, Expr, Operator, TableProviderFilterPushDown, TableType}; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, @@ -99,9 +106,16 @@ impl TableProvider for VortexInMemoryTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { - if !filters.is_empty() { - return exec_err!("vortex does not support filter pushdown"); - } + let filter_expr = if filters.is_empty() { + None + } else { + Some(make_simplified_conjunction( + filters, + self.schema_ref.clone(), + )?) + }; + + println!("simplified filter: {filter_expr:?}"); let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { Partitioning::RoundRobinBatch(chunked_array.nchunks()) @@ -118,6 +132,7 @@ impl TableProvider for VortexInMemoryTableProvider { Ok(Arc::new(VortexMemoryExec { array: self.array.clone(), projection: projection.cloned(), + filter_expr, plan_properties, })) } @@ -129,15 +144,88 @@ impl TableProvider for VortexInMemoryTableProvider { // TODO(aduffy): add support for filter pushdown Ok(filters .iter() - .map(|_| TableProviderFilterPushDown::Unsupported) + .map(|expr| { + match expr { + // Several expressions can be pushed down. + Expr::BinaryExpr(_) + | Expr::IsNotNull(_) + | Expr::IsNull(_) + | Expr::IsTrue(_) + | Expr::IsFalse(_) + | Expr::IsNotTrue(_) + | Expr::IsNotFalse(_) + | Expr::Cast(_) => TableProviderFilterPushDown::Exact, + + // All other expressions should be handled outside of the TableProvider + // via the normal DataFusion operator chain. + _ => TableProviderFilterPushDown::Unsupported, + } + + TableProviderFilterPushDown::Exact + }) .collect()) } } +struct ValidationVisitor {} + +impl ValidationVisitor { + +} + +impl TreeNodeVisitor for ValidationVisitor { + type Node = Expr; + + fn f_down(&mut self, node: &Self::Node) -> DFResult { + + } +} + +/// A mask determining the rows in an Array that should be treated as valid for query processing. +/// The vector is used to determine the take order of a set of things, or otherwise we determine +/// that we want to perform cross-filtering of the larger columns, if we so choose. +pub(crate) struct RowSelection { + selection: NullBuffer, +} + +/// Convert a set of expressions that must all match into a single AND expression. +/// +/// # Returns +/// +/// If conversion is successful, the result will be a +/// [binary expression node][datafusion_expr::Expr::BinaryExpr] containing the conjunction. +/// +/// Note that the set of operators must be provided here instead. +/// +/// # Simplification +/// +/// Simplification will occur as part of this process, so constant folding and similar optimizations +/// will be applied before returning the final expression. +fn make_simplified_conjunction(filters: &[Expr], schema: SchemaRef) -> DFResult { + let init = Box::new(Expr::Literal(ScalarValue::Boolean(Some(true)))); + let conjunction = filters.iter().fold(init, |conj, item| { + Box::new(Expr::BinaryExpr(BinaryExpr::new( + conj, + Operator::And, + Box::new(item.clone()), + ))) + }); + + let schema = schema.to_dfschema_ref()?; + + // simplify the expression. + let props = ExecutionProps::new(); + let context = SimplifyContext::new(&props).with_schema(schema); + let simplifier = ExprSimplifier::new(context); + + simplifier.simplify(*conjunction) +} + /// Physical plan node for scans against an in-memory, possibly chunked Vortex Array. #[derive(Debug, Clone)] struct VortexMemoryExec { array: Array, + filter_expr: Option, projection: Option>, plan_properties: PlanProperties, }