Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
a10y committed Jun 25, 2024
1 parent af10b71 commit 3214287
Showing 1 changed file with 94 additions and 6 deletions.
100 changes: 94 additions & 6 deletions vortex-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@ use std::task::{Context, Poll};
use arrow_array::{RecordBatch, StructArray as ArrowStructArray};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::arrow::buffer::NullBuffer;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::prelude::SessionContext;
use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result as DFResult};
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion_common::{
exec_datafusion_err, DataFusionError, Result as DFResult, ScalarValue, ToDFSchema,
};
use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{BinaryExpr, Expr, Operator, TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
Expand Down Expand Up @@ -99,9 +106,16 @@ impl TableProvider for VortexInMemoryTableProvider {
filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
if !filters.is_empty() {
return exec_err!("vortex does not support filter pushdown");
}
let filter_expr = if filters.is_empty() {
None
} else {
Some(make_simplified_conjunction(
filters,
self.schema_ref.clone(),
)?)
};

println!("simplified filter: {filter_expr:?}");

let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) {
Partitioning::RoundRobinBatch(chunked_array.nchunks())
Expand All @@ -118,6 +132,7 @@ impl TableProvider for VortexInMemoryTableProvider {
Ok(Arc::new(VortexMemoryExec {
array: self.array.clone(),
projection: projection.cloned(),
filter_expr,
plan_properties,
}))
}
Expand All @@ -129,15 +144,88 @@ impl TableProvider for VortexInMemoryTableProvider {
// TODO(aduffy): add support for filter pushdown
Ok(filters
.iter()
.map(|_| TableProviderFilterPushDown::Unsupported)
.map(|expr| {
match expr {
// Several expressions can be pushed down.
Expr::BinaryExpr(_)
| Expr::IsNotNull(_)
| Expr::IsNull(_)
| Expr::IsTrue(_)
| Expr::IsFalse(_)
| Expr::IsNotTrue(_)
| Expr::IsNotFalse(_)
| Expr::Cast(_) => TableProviderFilterPushDown::Exact,

// All other expressions should be handled outside of the TableProvider
// via the normal DataFusion operator chain.
_ => TableProviderFilterPushDown::Unsupported,
}

TableProviderFilterPushDown::Exact
})
.collect())
}
}

struct ValidationVisitor {}

impl ValidationVisitor {

}

impl TreeNodeVisitor for ValidationVisitor {
type Node = Expr;

fn f_down(&mut self, node: &Self::Node) -> DFResult<TreeNodeRecursion> {

}
}

/// A mask determining the rows in an Array that should be treated as valid for query processing.
/// The vector is used to determine the take order of a set of things, or otherwise we determine
/// that we want to perform cross-filtering of the larger columns, if we so choose.
pub(crate) struct RowSelection {
selection: NullBuffer,
}

/// Convert a set of expressions that must all match into a single AND expression.
///
/// # Returns
///
/// If conversion is successful, the result will be a
/// [binary expression node][datafusion_expr::Expr::BinaryExpr] containing the conjunction.
///
/// Note that the set of operators must be provided here instead.
///
/// # Simplification
///
/// Simplification will occur as part of this process, so constant folding and similar optimizations
/// will be applied before returning the final expression.
fn make_simplified_conjunction(filters: &[Expr], schema: SchemaRef) -> DFResult<Expr> {
let init = Box::new(Expr::Literal(ScalarValue::Boolean(Some(true))));
let conjunction = filters.iter().fold(init, |conj, item| {
Box::new(Expr::BinaryExpr(BinaryExpr::new(
conj,
Operator::And,
Box::new(item.clone()),
)))
});

let schema = schema.to_dfschema_ref()?;

// simplify the expression.
let props = ExecutionProps::new();
let context = SimplifyContext::new(&props).with_schema(schema);
let simplifier = ExprSimplifier::new(context);

simplifier.simplify(*conjunction)
}

/// Physical plan node for scans against an in-memory, possibly chunked Vortex Array.
#[derive(Debug, Clone)]
struct VortexMemoryExec {
array: Array,
filter_expr: Option<Expr>,
projection: Option<Vec<usize>>,
plan_properties: PlanProperties,
}
Expand Down

0 comments on commit 3214287

Please sign in to comment.