From e65857e11f5bd30b2dfe1bdbc8dfcd99344298ef Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 7 Aug 2024 23:45:01 -0400 Subject: [PATCH] more --- vortex-array/benches/compare.rs | 2 +- vortex-array/benches/scalar_subtract.rs | 2 +- vortex-datafusion/examples/table_provider.rs | 95 ++++ vortex-datafusion/src/datatype.rs | 4 +- vortex-datafusion/src/eval.rs | 2 +- vortex-datafusion/src/lib.rs | 426 ++---------------- vortex-datafusion/src/memory.rs | 425 +++++++++++++++++ vortex-datafusion/src/persistent/config.rs | 44 ++ vortex-datafusion/src/persistent/execution.rs | 103 +++++ vortex-datafusion/src/persistent/mod.rs | 4 + vortex-datafusion/src/persistent/opener.rs | 60 +++ vortex-datafusion/src/persistent/provider.rs | 92 ++++ vortex-datafusion/src/plans.rs | 12 +- vortex-error/Cargo.toml | 5 + vortex-error/src/lib.rs | 17 + 15 files changed, 887 insertions(+), 406 deletions(-) create mode 100644 vortex-datafusion/examples/table_provider.rs create mode 100644 vortex-datafusion/src/memory.rs create mode 100644 vortex-datafusion/src/persistent/config.rs create mode 100644 vortex-datafusion/src/persistent/execution.rs create mode 100644 vortex-datafusion/src/persistent/mod.rs create mode 100644 vortex-datafusion/src/persistent/opener.rs create mode 100644 vortex-datafusion/src/persistent/provider.rs diff --git a/vortex-array/benches/compare.rs b/vortex-array/benches/compare.rs index 5c3ca3d959..b8d7c35184 100644 --- a/vortex-array/benches/compare.rs +++ b/vortex-array/benches/compare.rs @@ -2,7 +2,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use itertools::Itertools; use rand::distributions::Uniform; use rand::{thread_rng, Rng}; -use vortex::array::bool::BoolArray; +use vortex::array::BoolArray; use vortex::IntoArray; use vortex_error::VortexError; use vortex_expr::Operator; diff --git a/vortex-array/benches/scalar_subtract.rs b/vortex-array/benches/scalar_subtract.rs index 2722d623a7..f6608f850e 100644 --- a/vortex-array/benches/scalar_subtract.rs +++ b/vortex-array/benches/scalar_subtract.rs @@ -2,7 +2,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use itertools::Itertools; use rand::distributions::Uniform; use rand::{thread_rng, Rng}; -use vortex::array::chunked::ChunkedArray; +use vortex::array::ChunkedArray; use vortex::IntoArray; use vortex_error::VortexError; diff --git a/vortex-datafusion/examples/table_provider.rs b/vortex-datafusion/examples/table_provider.rs new file mode 100644 index 0000000000..b8c6866e05 --- /dev/null +++ b/vortex-datafusion/examples/table_provider.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; + +use arrow_schema::{DataType, Field, Schema}; +use datafusion::prelude::SessionContext; +use datafusion_execution::object_store::ObjectStoreUrl; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::ObjectStore; +use tempfile::tempdir; +use tokio::fs::OpenOptions; +use url::Url; +use vortex::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray}; +use vortex::validity::Validity; +use vortex::IntoArray; +use vortex_datafusion::persistent::config::{VortexFile, VortexTableConfig}; +use vortex_datafusion::persistent::provider::VortexFileTableProvider; +use vortex_serde::layouts::writer::LayoutWriter; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let temp_dir = tempdir()?; + let strings = ChunkedArray::from_iter([ + VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), + VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), + ]) 
+    .into_array();
+
+    let numbers = ChunkedArray::from_iter([
+        PrimitiveArray::from(vec![1u32, 2, 3, 4]).into_array(),
+        PrimitiveArray::from(vec![5u32, 6, 7, 8]).into_array(),
+    ])
+    .into_array();
+
+    let st = StructArray::try_new(
+        ["strings".into(), "numbers".into()].into(),
+        vec![strings, numbers],
+        8,
+        Validity::NonNullable,
+    )
+    .unwrap();
+
+    let filepath = temp_dir.path().join("a.vtx");
+
+    let f = OpenOptions::new()
+        .write(true)
+        .truncate(true)
+        .create(true)
+        .open(&filepath)
+        .await?;
+
+    let writer = LayoutWriter::new(f);
+    let writer = writer.write_array_columns(st.into_array()).await?;
+    writer.finalize().await?;
+
+    let f = tokio::fs::File::open(&filepath).await?;
+    let file_size = f.metadata().await?.len();
+
+    let object_store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+    let url = ObjectStoreUrl::local_filesystem();
+
+    let p = Path::from_filesystem_path(filepath)?;
+
+    let config = VortexTableConfig::new(
+        Arc::new(Schema::new(vec![
+            Field::new("strings", DataType::Utf8, false),
+            Field::new("numbers", DataType::UInt32, false),
+        ])),
+        vec![VortexFile::new(p, file_size)],
+    );
+
+    let provider = Arc::new(VortexFileTableProvider::try_new(url, config)?);
+
+    let ctx = SessionContext::new();
+    ctx.register_table("vortex_tbl", Arc::clone(&provider) as _)?;
+
+    let url = Url::try_from("file://").unwrap();
+    ctx.register_object_store(&url, object_store);
+
+    run_query(&ctx, "SELECT * FROM vortex_tbl").await?;
+
+    Ok(())
+}
+
+async fn run_query(ctx: &SessionContext, query_string: impl AsRef<str>) -> anyhow::Result<()> {
+    let query_string = query_string.as_ref();
+
+    ctx.sql(&format!("EXPLAIN {query_string}"))
+        .await?
+        .show()
+        .await?;
+
+    ctx.sql(query_string).await?.show().await?;
+
+    Ok(())
+}
diff --git a/vortex-datafusion/src/datatype.rs b/vortex-datafusion/src/datatype.rs
index 0a04e4e875..7ed6599176 100644
--- a/vortex-datafusion/src/datatype.rs
+++ b/vortex-datafusion/src/datatype.rs
@@ -15,8 +15,8 @@ use std::sync::Arc;
 use arrow_schema::{
     DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, TimeUnit as ArrowTimeUnit,
 };
-use vortex::array::datetime::temporal::TemporalMetadata;
-use vortex::array::datetime::TimeUnit;
+use vortex::array::temporal::TemporalMetadata;
+use vortex::array::TimeUnit;
 use vortex_dtype::{DType, Nullability, PType};
 
 /// Convert a Vortex [struct DType][DType] to an Arrow [Schema].
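For orientation, `infer_schema` in this module is what turns a top-level Vortex struct DType into the Arrow schema that DataFusion sees. A minimal sketch of the scalar half of that mapping, using a hypothetical helper `arrow_type_of` (the real implementation also handles nullability, the temporal metadata imported above, and nested dtypes):

use arrow_schema::DataType;
use vortex_dtype::{DType, PType};

// Hypothetical reduced mapping, for illustration only; arms are shown
// just for variants that appear elsewhere in this patch.
fn arrow_type_of(dtype: &DType) -> DataType {
    match dtype {
        DType::Bool(_) => DataType::Boolean,
        DType::Primitive(PType::U32, _) => DataType::UInt32,
        DType::Primitive(PType::I64, _) => DataType::Int64,
        DType::Utf8(_) => DataType::Utf8,
        DType::Binary(_) => DataType::Binary,
        other => unimplemented!("temporal and nested types elided: {other:?}"),
    }
}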
diff --git a/vortex-datafusion/src/eval.rs b/vortex-datafusion/src/eval.rs
index 187a33c423..49034f92b2 100644
--- a/vortex-datafusion/src/eval.rs
+++ b/vortex-datafusion/src/eval.rs
@@ -1,5 +1,5 @@
 use datafusion_expr::{Expr, Operator as DFOperator};
-use vortex::array::constant::ConstantArray;
+use vortex::array::ConstantArray;
 use vortex::compute::{and, compare, or};
 use vortex::{Array, IntoArray};
 use vortex_error::{vortex_bail, vortex_err, VortexResult};
diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs
index d3f8e23d5a..0c29b47794 100644
--- a/vortex-datafusion/src/lib.rs
+++ b/vortex-datafusion/src/lib.rs
@@ -11,27 +11,20 @@ use std::task::{Context, Poll};
 
 use arrow_array::{RecordBatch, StructArray as ArrowStructArray};
 use arrow_schema::{DataType, SchemaRef};
-use async_trait::async_trait;
-use datafusion::dataframe::DataFrame;
-use datafusion::datasource::TableProvider;
-use datafusion::execution::context::SessionState;
 use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{DataFrame, SessionContext};
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult};
-use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType};
-use datafusion_physical_expr::EquivalenceProperties;
-use datafusion_physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
-};
+use datafusion_expr::{Expr, Operator};
+use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
 use futures::Stream;
 use itertools::Itertools;
-use vortex::array::chunked::ChunkedArray;
+use memory::{VortexMemTable, VortexMemTableOptions};
+use vortex::array::ChunkedArray;
 use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical};
-use vortex_dtype::DType;
 
-use crate::datatype::infer_schema;
-use crate::plans::{RowSelectorExec, TakeRowsExec};
+pub mod memory;
+pub mod persistent;
 
 mod datatype;
 mod eval;
@@ -47,25 +40,16 @@ const SUPPORTED_BINARY_OPS: &[Operator] = &[
     Operator::LtEq,
 ];
 
-/// Optional configurations to pass when loading a [VortexMemTable].
-#[derive(Debug, Clone)]
-pub struct VortexMemTableOptions {
-    pub enable_pushdown: bool,
-}
-
-impl Default for VortexMemTableOptions {
-    fn default() -> Self {
-        Self {
-            enable_pushdown: true,
-        }
-    }
-}
-
-impl VortexMemTableOptions {
-    pub fn with_pushdown(mut self, enable_pushdown: bool) -> Self {
-        self.enable_pushdown = enable_pushdown;
-        self
-    }
+fn supported_data_types(dt: DataType) -> bool {
+    dt.is_integer()
+        || dt.is_signed_integer()
+        || dt.is_floating()
+        || dt.is_null()
+        || dt == DataType::Boolean
+        || dt == DataType::Binary
+        || dt == DataType::Utf8
+        || dt == DataType::BinaryView
+        || dt == DataType::Utf8View
 }
 
 pub trait SessionContextExt {
@@ -96,7 +81,7 @@ impl SessionContextExt for SessionContext {
         options: VortexMemTableOptions,
     ) -> DFResult<()> {
         assert!(
-            matches!(array.dtype(), DType::Struct(_, _)),
+            array.dtype().is_struct(),
             "Vortex arrays must have struct type"
         );
 
@@ -111,7 +96,7 @@ impl SessionContextExt for SessionContext {
         options: VortexMemTableOptions,
     ) -> DFResult<DataFrame> {
         assert!(
-            matches!(array.dtype(), DType::Struct(_, _)),
+            array.dtype().is_struct(),
             "Vortex arrays must have struct type"
         );
 
@@ -121,203 +106,6 @@ impl SessionContextExt for SessionContext {
     }
 }
 
-/// A [`TableProvider`] that exposes an existing Vortex Array to the DataFusion SQL engine.
-///
-/// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as
-/// a table to DataFusion.
-#[derive(Debug, Clone)]
-pub struct VortexMemTable {
-    array: ChunkedArray,
-    schema_ref: SchemaRef,
-    options: VortexMemTableOptions,
-}
-
-impl VortexMemTable {
-    /// Build a new table provider from an existing [struct type](vortex_dtype::StructDType) array.
-    ///
-    /// # Panics
-    ///
-    /// Creation will panic if the provided array is not of `DType::Struct` type.
-    pub fn new(array: Array, options: VortexMemTableOptions) -> Self {
-        let arrow_schema = infer_schema(array.dtype());
-        let schema_ref = SchemaRef::new(arrow_schema);
-
-        let array = match ChunkedArray::try_from(&array) {
-            Ok(a) => a,
-            _ => {
-                let dtype = array.dtype().clone();
-                ChunkedArray::try_new(vec![array], dtype).unwrap()
-            }
-        };
-
-        Self {
-            array,
-            schema_ref,
-            options,
-        }
-    }
-}
-
-#[async_trait]
-impl TableProvider for VortexMemTable {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema_ref)
-    }
-
-    fn table_type(&self) -> TableType {
-        TableType::Base
-    }
-
-    /// Plan an array scan.
-    ///
-    /// Currently, projection pushdown is supported, but not filter pushdown.
-    /// The array is flattened directly into the nearest Arrow-compatible encoding.
-    async fn scan(
-        &self,
-        state: &SessionState,
-        projection: Option<&Vec<usize>>,
-        filters: &[Expr],
-        _limit: Option<usize>,
-    ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec<usize> {
-            let referenced_columns: HashSet<String> =
-                exprs.iter().flat_map(get_column_references).collect();
-
-            let projection: Vec<usize> = referenced_columns
-                .iter()
-                .map(|col_name| schema.column_with_name(col_name).unwrap().0)
-                .sorted()
-                .collect();
-
-            projection
-        }
-
-        let filter_exprs = if filters.is_empty() {
-            None
-        } else {
-            Some(filters)
-        };
-
-        let output_projection: Vec<usize> = match projection {
-            None => (0..self.schema_ref.fields().len()).collect(),
-            Some(proj) => proj.clone(),
-        };
-
-        match filter_exprs {
-            // If there is a filter expression, we execute in two phases, first performing a filter
-            // on the input to get back row indices, and then taking the remaining struct columns
-            // using the calculated indices from the filter.
-            Some(filter_exprs) => {
-                let filter_projection =
-                    get_filter_projection(filter_exprs, self.schema_ref.clone());
-
-                Ok(make_filter_then_take_plan(
-                    self.schema_ref.clone(),
-                    filter_exprs,
-                    filter_projection,
-                    self.array.clone(),
-                    output_projection.clone(),
-                    state,
-                ))
-            }
-
-            // If no filters were pushed down, we materialize the entire StructArray into a
-            // RecordBatch and let DataFusion process the entire query.
-            _ => {
-                let output_schema = Arc::new(
-                    self.schema_ref
-                        .project(output_projection.as_slice())
-                        .expect("project output schema"),
-                );
-                let plan_properties = PlanProperties::new(
-                    EquivalenceProperties::new(output_schema),
-                    // non-pushdown scans execute in single partition, where the partition
-                    // yields one RecordBatch per chunk in the input ChunkedArray
-                    Partitioning::UnknownPartitioning(1),
-                    ExecutionMode::Bounded,
-                );
-
-                Ok(Arc::new(VortexScanExec {
-                    array: self.array.clone(),
-                    scan_projection: output_projection.clone(),
-                    plan_properties,
-                }))
-            }
-        }
-    }
-
-    fn supports_filters_pushdown(
-        &self,
-        filters: &[&Expr],
-    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
-        // In the case the caller has configured this provider with filter pushdown disabled,
-        // do not attempt to apply any filters at scan time.
-        if !self.options.enable_pushdown {
-            return Ok(filters
-                .iter()
-                .map(|_| TableProviderFilterPushDown::Unsupported)
-                .collect());
-        }
-
-        filters
-            .iter()
-            .map(|expr| {
-                if can_be_pushed_down(expr) {
-                    Ok(TableProviderFilterPushDown::Exact)
-                } else {
-                    Ok(TableProviderFilterPushDown::Unsupported)
-                }
-            })
-            .try_collect()
-    }
-}
-
-/// Construct an operator plan that executes in two stages.
-///
-/// The first plan stage only materializes the columns related to the provided set of filter
-/// expressions. It evaluates the filters into a row selection.
-///
-/// The second stage receives the row selection above and dispatches a `take` on the remaining
-/// columns.
-fn make_filter_then_take_plan(
-    schema: SchemaRef,
-    filter_exprs: &[Expr],
-    filter_projection: Vec<usize>,
-    chunked_array: ChunkedArray,
-    output_projection: Vec<usize>,
-    _session_state: &SessionState,
-) -> Arc<dyn ExecutionPlan> {
-    let row_selector_op = Arc::new(RowSelectorExec::new(
-        filter_exprs,
-        filter_projection,
-        &chunked_array,
-    ));
-
-    Arc::new(TakeRowsExec::new(
-        schema.clone(),
-        &output_projection,
-        row_selector_op.clone(),
-        &chunked_array,
-    ))
-}
-
-fn supported_data_types(dt: DataType) -> bool {
-    dt.is_integer()
-        || dt.is_signed_integer()
-        || dt.is_floating()
-        || dt.is_null()
-        || dt == DataType::Boolean
-        || dt == DataType::Binary
-        || dt == DataType::Utf8
-        || dt == DataType::Binary
-        || dt == DataType::BinaryView
-        || dt == DataType::Utf8View
-}
-
 fn can_be_pushed_down(expr: &Expr) -> bool {
     match expr {
         Expr::BinaryExpr(expr)
@@ -331,6 +119,19 @@ fn can_be_pushed_down(expr: &Expr) -> bool {
     }
 }
 
+fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec<usize> {
+    let referenced_columns: HashSet<String> =
+        exprs.iter().flat_map(get_column_references).collect();
+
+    let projection: Vec<usize> = referenced_columns
+        .iter()
+        .map(|col_name| schema.column_with_name(col_name).unwrap().0)
+        .sorted()
+        .collect();
+
+    projection
+}
+
 /// Extract out the columns from our table referenced by the expression.
 fn get_column_references(expr: &Expr) -> HashSet<String> {
     let mut references = HashSet::new();
@@ -475,164 +276,3 @@ impl ExecutionPlan for VortexScanExec {
         }))
     }
 }
-
-#[cfg(test)]
-mod test {
-    use arrow_array::types::Int64Type;
-    use datafusion::arrow::array::AsArray;
-    use datafusion::functions_aggregate::count::count_distinct;
-    use datafusion::prelude::SessionContext;
-    use datafusion_common::{Column, TableReference};
-    use datafusion_expr::{and, col, lit, BinaryExpr, Expr, Operator};
-    use vortex::array::primitive::PrimitiveArray;
-    use vortex::array::struct_::StructArray;
-    use vortex::array::varbin::VarBinArray;
-    use vortex::validity::Validity;
-    use vortex::{Array, IntoArray};
-    use vortex_dtype::{DType, Nullability};
-
-    use crate::{can_be_pushed_down, SessionContextExt, VortexMemTableOptions};
-
-    fn presidents_array() -> Array {
-        let names = VarBinArray::from_vec(
-            vec![
-                "Washington",
-                "Adams",
-                "Jefferson",
-                "Madison",
-                "Monroe",
-                "Adams",
-            ],
-            DType::Utf8(Nullability::NonNullable),
-        );
-        let term_start = PrimitiveArray::from_vec(
-            vec![1789u16, 1797, 1801, 1809, 1817, 1825],
-            Validity::NonNullable,
-        );
-
-        StructArray::from_fields(&[
-            ("president", names.into_array()),
-            ("term_start", term_start.into_array()),
-        ])
-        .into_array()
-    }
-
-    #[tokio::test]
-    #[cfg_attr(miri, ignore)]
-    async fn test_datafusion_pushdown() {
-        let ctx = SessionContext::new();
-
-        let df = ctx.read_vortex(presidents_array()).unwrap();
-
-        let distinct_names = df
-            .filter(col("term_start").gt_eq(lit(1795)))
-            .unwrap()
-            .aggregate(vec![], vec![count_distinct(col("president"))])
-            .unwrap()
-            .collect()
-            .await
-            .unwrap();
-
-        assert_eq!(distinct_names.len(), 1);
-
-        assert_eq!(
-            *distinct_names[0]
-                .column(0)
-                .as_primitive::<Int64Type>()
-                .values()
-                .first()
-                .unwrap(),
-            4i64
-        );
-    }
-
-    #[tokio::test]
-    #[cfg_attr(miri, ignore)]
-    async fn test_datafusion_no_pushdown() {
-        let ctx = SessionContext::new();
-
-        let df = ctx
-            .read_vortex_opts(
-                presidents_array(),
-                // Disable pushdown. We run this test to make sure that the naive codepath also
-                // produces correct results and does not panic anywhere.
-                VortexMemTableOptions::default().with_pushdown(false),
-            )
-            .unwrap();
-
-        let distinct_names = df
-            .filter(col("term_start").gt_eq(lit(1795)))
-            .unwrap()
-            .filter(col("term_start").lt(lit(2000)))
-            .unwrap()
-            .aggregate(vec![], vec![count_distinct(col("president"))])
-            .unwrap()
-            .collect()
-            .await
-            .unwrap();
-
-        assert_eq!(distinct_names.len(), 1);
-
-        assert_eq!(
-            *distinct_names[0]
-                .column(0)
-                .as_primitive::<Int64Type>()
-                .values()
-                .first()
-                .unwrap(),
-            4i64
-        );
-    }
-
-    #[test]
-    fn test_can_be_pushed_down0() {
-        let e = BinaryExpr {
-            left: Box::new(
-                Column {
-                    relation: Some(TableReference::Bare {
-                        table: "orders".into(),
-                    }),
-                    name: "o_orderstatus".to_string(),
-                }
-                .into(),
-            ),
-            op: Operator::Eq,
-            right: Box::new(lit("F")),
-        };
-        let e = Expr::BinaryExpr(e);
-
-        assert!(can_be_pushed_down(&e));
-    }
-
-    #[test]
-    fn test_can_be_pushed_down1() {
-        let e = lit("hello");
-
-        assert!(can_be_pushed_down(&e));
-    }
-
-    #[test]
-    fn test_can_be_pushed_down2() {
-        let e = lit(3);
-
-        assert!(can_be_pushed_down(&e));
-    }
-
-    #[test]
-    fn test_can_be_pushed_down3() {
-        let e = BinaryExpr {
-            left: Box::new(col("nums")),
-            op: Operator::Modulo,
-            right: Box::new(lit(5)),
-        };
-        let e = Expr::BinaryExpr(e);
-
-        assert!(!can_be_pushed_down(&e));
-    }
-
-    #[test]
-    fn test_can_be_pushed_down4() {
-        let e = and((col("a")).eq(lit(2u64)), col("b").eq(lit(true)));
-        assert!(can_be_pushed_down(&e));
-    }
-}
diff --git a/vortex-datafusion/src/memory.rs b/vortex-datafusion/src/memory.rs
new file mode 100644
index 0000000000..47fc413730
--- /dev/null
+++ b/vortex-datafusion/src/memory.rs
@@ -0,0 +1,425 @@
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion::datasource::TableProvider;
+use datafusion::execution::context::SessionState;
+use datafusion::prelude::*;
+use datafusion_common::Result as DFResult;
+use datafusion_expr::{TableProviderFilterPushDown, TableType};
+use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_plan::{ExecutionMode, ExecutionPlan, Partitioning, PlanProperties};
+use itertools::Itertools;
+use vortex::array::ChunkedArray;
+use vortex::{Array, ArrayDType as _};
+use vortex_dtype::DType;
+
+use crate::datatype::infer_schema;
+use crate::plans::{RowSelectorExec, TakeRowsExec};
+use crate::{can_be_pushed_down, get_filter_projection, VortexScanExec};
+
+/// A [`TableProvider`] that exposes an existing Vortex Array to the DataFusion SQL engine.
+///
+/// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as
+/// a table to DataFusion.
+#[derive(Debug, Clone)]
+pub struct VortexMemTable {
+    array: ChunkedArray,
+    schema_ref: SchemaRef,
+    options: VortexMemTableOptions,
+}
+
+impl VortexMemTable {
+    /// Build a new table provider from an existing [struct type](vortex_dtype::StructDType) array.
+    ///
+    /// # Panics
+    ///
+    /// Creation will panic if the provided array is not of `DType::Struct` type.
+    pub fn new(array: Array, options: VortexMemTableOptions) -> Self {
+        let arrow_schema = infer_schema(array.dtype());
+        let schema_ref = SchemaRef::new(arrow_schema);
+
+        let array = match ChunkedArray::try_from(&array) {
+            Ok(a) => a,
+            _ => {
+                let dtype = array.dtype().clone();
+                ChunkedArray::try_new(vec![array], dtype).unwrap()
+            }
+        };
+
+        Self {
+            array,
+            schema_ref,
+            options,
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for VortexMemTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema_ref)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    /// Plan an array scan.
+    ///
+    /// Projection pushdown is supported. Filters accepted by `supports_filters_pushdown` are
+    /// evaluated here as a two-stage filter-then-take plan; in the no-filter case the array is
+    /// flattened directly into the nearest Arrow-compatible encoding.
+    async fn scan(
+        &self,
+        state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let filter_exprs = if filters.is_empty() {
+            None
+        } else {
+            Some(filters)
+        };
+
+        let output_projection: Vec<usize> = match projection {
+            None => (0..self.schema_ref.fields().len()).collect(),
+            Some(proj) => proj.clone(),
+        };
+
+        match filter_exprs {
+            // If there is a filter expression, we execute in two phases, first performing a filter
+            // on the input to get back row indices, and then taking the remaining struct columns
+            // using the calculated indices from the filter.
+            Some(filter_exprs) => {
+                let filter_projection =
+                    get_filter_projection(filter_exprs, self.schema_ref.clone());
+
+                Ok(make_filter_then_take_plan(
+                    self.schema_ref.clone(),
+                    filter_exprs,
+                    filter_projection,
+                    self.array.clone(),
+                    output_projection.clone(),
+                    state,
+                ))
+            }
+
+            // If no filters were pushed down, we materialize the entire StructArray into a
+            // RecordBatch and let DataFusion process the entire query.
+            _ => {
+                let output_schema = Arc::new(
+                    self.schema_ref
+                        .project(output_projection.as_slice())
+                        .expect("project output schema"),
+                );
+                let plan_properties = PlanProperties::new(
+                    EquivalenceProperties::new(output_schema),
+                    // non-pushdown scans execute in single partition, where the partition
+                    // yields one RecordBatch per chunk in the input ChunkedArray
+                    Partitioning::UnknownPartitioning(1),
+                    ExecutionMode::Bounded,
+                );
+
+                Ok(Arc::new(VortexScanExec {
+                    array: self.array.clone(),
+                    scan_projection: output_projection.clone(),
+                    plan_properties,
+                }))
+            }
+        }
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
+        // In the case the caller has configured this provider with filter pushdown disabled,
+        // do not attempt to apply any filters at scan time.
+        if !self.options.enable_pushdown {
+            return Ok(filters
+                .iter()
+                .map(|_| TableProviderFilterPushDown::Unsupported)
+                .collect());
+        }
+
+        filters
+            .iter()
+            .map(|expr| {
+                if can_be_pushed_down(expr) {
+                    Ok(TableProviderFilterPushDown::Exact)
+                } else {
+                    Ok(TableProviderFilterPushDown::Unsupported)
+                }
+            })
+            .try_collect()
+    }
+}
+
+/// Optional configurations to pass when loading a [VortexMemTable].
+#[derive(Debug, Clone)]
+pub struct VortexMemTableOptions {
+    pub enable_pushdown: bool,
+}
+
+impl Default for VortexMemTableOptions {
+    fn default() -> Self {
+        Self {
+            enable_pushdown: true,
+        }
+    }
+}
+
+impl VortexMemTableOptions {
+    pub fn with_pushdown(mut self, enable_pushdown: bool) -> Self {
+        self.enable_pushdown = enable_pushdown;
+        self
+    }
+}
+
+pub trait SessionContextExt {
+    fn register_vortex<S: AsRef<str>>(&self, name: S, array: Array) -> DFResult<()> {
+        self.register_vortex_opts(name, array, VortexMemTableOptions::default())
+    }
+
+    fn register_vortex_opts<S: AsRef<str>>(
+        &self,
+        name: S,
+        array: Array,
+        options: VortexMemTableOptions,
+    ) -> DFResult<()>;
+
+    fn read_vortex(&self, array: Array) -> DFResult<DataFrame> {
+        self.read_vortex_opts(array, VortexMemTableOptions::default())
+    }
+
+    fn read_vortex_opts(&self, array: Array, options: VortexMemTableOptions)
+        -> DFResult<DataFrame>;
+}
+
+impl SessionContextExt for SessionContext {
+    fn register_vortex_opts<S: AsRef<str>>(
+        &self,
+        name: S,
+        array: Array,
+        options: VortexMemTableOptions,
+    ) -> DFResult<()> {
+        assert!(
+            matches!(array.dtype(), DType::Struct(_, _)),
+            "Vortex arrays must have struct type"
+        );
+
+        let vortex_table = VortexMemTable::new(array, options);
+        self.register_table(name.as_ref(), Arc::new(vortex_table))
+            .map(|_| ())
+    }
+
+    fn read_vortex_opts(
+        &self,
+        array: Array,
+        options: VortexMemTableOptions,
+    ) -> DFResult<DataFrame> {
+        assert!(
+            matches!(array.dtype(), DType::Struct(_, _)),
+            "Vortex arrays must have struct type"
+        );
+
+        let vortex_table = VortexMemTable::new(array, options);
+
+        self.read_table(Arc::new(vortex_table))
+    }
+}
+
+/// Construct an operator plan that executes in two stages.
+///
+/// The first plan stage only materializes the columns related to the provided set of filter
+/// expressions. It evaluates the filters into a row selection.
+///
+/// The second stage receives the row selection above and dispatches a `take` on the remaining
+/// columns.
+fn make_filter_then_take_plan(
+    schema: SchemaRef,
+    filter_exprs: &[Expr],
+    filter_projection: Vec<usize>,
+    chunked_array: ChunkedArray,
+    output_projection: Vec<usize>,
+    _session_state: &SessionState,
+) -> Arc<dyn ExecutionPlan> {
+    let row_selector_op = Arc::new(RowSelectorExec::new(
+        filter_exprs,
+        filter_projection,
+        &chunked_array,
+    ));
+
+    Arc::new(TakeRowsExec::new(
+        schema.clone(),
+        &output_projection,
+        row_selector_op.clone(),
+        &chunked_array,
+    ))
+}
+
+#[cfg(test)]
+mod test {
+    use arrow_array::cast::AsArray as _;
+    use arrow_array::types::Int64Type;
+    use datafusion::functions_aggregate::count::count_distinct;
+    use datafusion::prelude::SessionContext;
+    use datafusion_common::{Column, TableReference};
+    use datafusion_expr::{and, col, lit, BinaryExpr, Expr, Operator};
+    use vortex::array::{PrimitiveArray, StructArray, VarBinArray};
+    use vortex::validity::Validity;
+    use vortex::{Array, IntoArray};
+    use vortex_dtype::{DType, Nullability};
+
+    use crate::can_be_pushed_down;
+    use crate::memory::{SessionContextExt as _, VortexMemTableOptions};
+
+    fn presidents_array() -> Array {
+        let names = VarBinArray::from_vec(
+            vec![
+                "Washington",
+                "Adams",
+                "Jefferson",
+                "Madison",
+                "Monroe",
+                "Adams",
+            ],
+            DType::Utf8(Nullability::NonNullable),
+        );
+        let term_start = PrimitiveArray::from_vec(
+            vec![1789u16, 1797, 1801, 1809, 1817, 1825],
+            Validity::NonNullable,
+        );
+
+        StructArray::from_fields(&[
+            ("president", names.into_array()),
+            ("term_start", term_start.into_array()),
+        ])
+        .into_array()
+    }
+
+    #[tokio::test]
+    #[cfg_attr(miri, ignore)]
+    async fn test_datafusion_pushdown() {
+        let ctx = SessionContext::new();
+
+        let df = ctx.read_vortex(presidents_array()).unwrap();
+
+        let distinct_names = df
+            .filter(col("term_start").gt_eq(lit(1795)))
+            .unwrap()
+            .aggregate(vec![], vec![count_distinct(col("president"))])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        assert_eq!(distinct_names.len(), 1);
+
+        assert_eq!(
+            *distinct_names[0]
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .values()
+                .first()
+                .unwrap(),
+            4i64
+        );
+    }
+
+    #[tokio::test]
+    #[cfg_attr(miri, ignore)]
+    async fn test_datafusion_no_pushdown() {
+        let ctx = SessionContext::new();
+
+        let df = ctx
+            .read_vortex_opts(
+                presidents_array(),
+                // Disable pushdown. We run this test to make sure that the naive codepath also
+                // produces correct results and does not panic anywhere.
+                VortexMemTableOptions::default().with_pushdown(false),
+            )
+            .unwrap();
+
+        let distinct_names = df
+            .filter(col("term_start").gt_eq(lit(1795)))
+            .unwrap()
+            .filter(col("term_start").lt(lit(2000)))
+            .unwrap()
+            .aggregate(vec![], vec![count_distinct(col("president"))])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        assert_eq!(distinct_names.len(), 1);
+
+        assert_eq!(
+            *distinct_names[0]
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .values()
+                .first()
+                .unwrap(),
+            4i64
+        );
+    }
+
+    #[test]
+    fn test_can_be_pushed_down0() {
+        let e = BinaryExpr {
+            left: Box::new(
+                Column {
+                    relation: Some(TableReference::Bare {
+                        table: "orders".into(),
+                    }),
+                    name: "o_orderstatus".to_string(),
+                }
+                .into(),
+            ),
+            op: Operator::Eq,
+            right: Box::new(lit("F")),
+        };
+        let e = Expr::BinaryExpr(e);
+
+        assert!(can_be_pushed_down(&e));
+    }
+
+    #[test]
+    fn test_can_be_pushed_down1() {
+        let e = lit("hello");
+
+        assert!(can_be_pushed_down(&e));
+    }
+
+    #[test]
+    fn test_can_be_pushed_down2() {
+        let e = lit(3);
+
+        assert!(can_be_pushed_down(&e));
+    }
+
+    #[test]
+    fn test_can_be_pushed_down3() {
+        let e = BinaryExpr {
+            left: Box::new(col("nums")),
+            op: Operator::Modulo,
+            right: Box::new(lit(5)),
+        };
+        let e = Expr::BinaryExpr(e);
+
+        assert!(!can_be_pushed_down(&e));
+    }
+
+    #[test]
+    fn test_can_be_pushed_down4() {
+        let e = and((col("a")).eq(lit(2u64)), col("b").eq(lit(true)));
+        assert!(can_be_pushed_down(&e));
+    }
+}
diff --git a/vortex-datafusion/src/persistent/config.rs b/vortex-datafusion/src/persistent/config.rs
new file mode 100644
index 0000000000..f60e7ed943
--- /dev/null
+++ b/vortex-datafusion/src/persistent/config.rs
@@ -0,0 +1,44 @@
+use arrow_schema::SchemaRef;
+use chrono::TimeZone as _;
+use datafusion::datasource::listing::PartitionedFile;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+
+#[derive(Clone)]
+pub struct VortexFile {
+    pub(crate) object_meta: ObjectMeta,
+}
+
+impl From<VortexFile> for PartitionedFile {
+    fn from(value: VortexFile) -> Self {
+        PartitionedFile::new(value.object_meta.location, value.object_meta.size as u64)
+    }
+}
+
+impl VortexFile {
+    pub fn new(path: impl Into<String>, size: u64) -> Self {
+        Self {
+            object_meta: ObjectMeta {
+                location: Path::from(path.into()),
+                last_modified: chrono::Utc.timestamp_nanos(0),
+                size: size as usize,
+                e_tag: None,
+                version: None,
+            },
+        }
+    }
+}
+
+pub struct VortexTableConfig {
+    pub(crate) data_files: Vec<VortexFile>,
+    pub(crate) schema: Option<SchemaRef>,
+}
+
+impl VortexTableConfig {
+    pub fn new(schema: SchemaRef, data_files: Vec<VortexFile>) -> Self {
+        Self {
+            data_files,
+            schema: Some(schema),
+        }
+    }
+}
diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs
new file mode 100644
index 0000000000..51f8f84003
--- /dev/null
+++ b/vortex-datafusion/src/persistent/execution.rs
@@ -0,0 +1,103 @@
+use std::fmt;
+use std::sync::Arc;
+
+use datafusion::datasource::physical_plan::{FileScanConfig, FileStream};
+use datafusion_common::Result as DFResult;
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
+};
+
+use crate::persistent::opener::VortexFileOpener;
+
+#[derive(Debug)]
+pub struct VortexExec {
+    file_scan_config: FileScanConfig,
+    metrics: ExecutionPlanMetricsSet,
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+    plan_properties: PlanProperties,
+    projection: Option<Vec<usize>>,
+}
+
+impl VortexExec {
+    pub fn try_new(
+        file_scan_config: FileScanConfig,
+        metrics: ExecutionPlanMetricsSet,
+        projection: Option<&Vec<usize>>,
+        predicate: Option<Arc<dyn PhysicalExpr>>,
+    ) -> DFResult<Self> {
+        let partitioning = Partitioning::UnknownPartitioning(1);
+        let plan_properties = PlanProperties::new(
+            EquivalenceProperties::new(file_scan_config.file_schema.clone()),
+            partitioning,
+            ExecutionMode::Bounded,
+        );
+        let projection = projection.cloned();
+
+        Ok(Self {
+            file_scan_config,
+            metrics,
+            predicate,
+            projection,
+            plan_properties,
+        })
+    }
+    pub(crate) fn into_arc(self) -> Arc<dyn ExecutionPlan> {
+        Arc::new(self) as _
+    }
+}
+
+impl DisplayAs for VortexExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "VortexExec: ")?;
+        self.file_scan_config.fmt_as(t, f)?;
+
+        Ok(())
+    }
+}
+
+impl ExecutionPlan for VortexExec {
+    fn name(&self) -> &str {
+        "VortexExec"
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.file_scan_config.object_store_url)?;
+        let opener = VortexFileOpener {
+            object_store,
+            projection: self.projection.clone(),
+            batch_size: None,
+            predicate: self.predicate.clone(),
+        };
+        let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?;
+
+        Ok(Box::pin(stream))
+    }
+}
diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs
new file mode 100644
index 0000000000..bed3bd9d43
--- /dev/null
+++ b/vortex-datafusion/src/persistent/mod.rs
@@ -0,0 +1,4 @@
+pub mod config;
+pub mod execution;
+pub mod opener;
+pub mod provider;
diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
new file mode 100644
index 0000000000..5173e82c14
--- /dev/null
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -0,0 +1,60 @@
+use std::sync::Arc;
+
+use arrow_array::cast::as_struct_array;
+use arrow_array::RecordBatch;
+use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
+use datafusion_common::Result as DFResult;
+use datafusion_physical_expr::PhysicalExpr;
+use futures::{FutureExt as _, TryStreamExt};
+use object_store::ObjectStore;
+use vortex::IntoCanonical;
+use vortex_serde::io::ObjectStoreReadAt;
+use vortex_serde::layouts::reader::builder::VortexLayoutReaderBuilder;
+use vortex_serde::layouts::reader::context::LayoutDeserializer;
+use vortex_serde::layouts::reader::projections::Projection;
+
+pub struct VortexFileOpener {
+    pub object_store: Arc<dyn ObjectStore>,
+    pub batch_size: Option<usize>,
+    pub projection: Option<Vec<usize>>,
+    pub predicate: Option<Arc<dyn PhysicalExpr>>,
+}
+
+impl FileOpener for VortexFileOpener {
+    fn open(&self, file_meta: FileMeta) -> DFResult<FileOpenFuture> {
+        let read_at =
+            ObjectStoreReadAt::new(self.object_store.clone(), file_meta.location().clone());
+
+        let mut builder = VortexLayoutReaderBuilder::new(read_at, LayoutDeserializer::default());
+
+        if let Some(batch_size) = self.batch_size {
+            builder = builder.with_batch_size(batch_size);
+        }
+
+        if let Some(_predicate) = self.predicate.as_ref() {
+            log::warn!("Missing logic to turn a physical expression into a RowFilter");
+        }
+
+        if let Some(projection) = self.projection.as_ref() {
+            builder = builder.with_projection(Projection::new(projection))
+        }
+
+        Ok(async move {
+            let reader = builder.build().await?;
+
+            let stream = reader
+                .map_ok(|array| {
+                    let arrow = array
+                        .into_canonical()
+                        .expect("struct arrays must canonicalize")
+                        .into_arrow();
+                    let struct_array = as_struct_array(arrow.as_ref());
+                    RecordBatch::from(struct_array)
+                })
+                .map_err(|e| e.into());
+
+            Ok(Box::pin(stream) as _)
+        }
+        .boxed())
+    }
+}
diff --git a/vortex-datafusion/src/persistent/provider.rs b/vortex-datafusion/src/persistent/provider.rs
new file mode 100644
index 0000000000..460d350803
--- /dev/null
+++ b/vortex-datafusion/src/persistent/provider.rs
@@ -0,0 +1,92 @@
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::TableProvider;
+use datafusion::execution::context::SessionState;
+use datafusion_common::{DFSchema, Result as DFResult, Statistics};
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_expr::utils::conjunction;
+use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::ExecutionPlan;
+
+use super::config::VortexTableConfig;
+use crate::persistent::execution::VortexExec;
+
+pub struct VortexFileTableProvider {
+    schema_ref: SchemaRef,
+    object_store_url: ObjectStoreUrl,
+    config: VortexTableConfig,
+}
+
+impl VortexFileTableProvider {
+    pub fn try_new(object_store_url: ObjectStoreUrl, config: VortexTableConfig) -> DFResult<Self> {
+        Ok(Self {
+            schema_ref: config.schema.clone().unwrap(),
+            object_store_url,
+            config,
+        })
+    }
+}
+
+#[async_trait]
+impl TableProvider for VortexFileTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema_ref)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let df_schema = DFSchema::try_from(self.schema())?;
+        let predicate = conjunction(filters.to_vec());
+        let predicate = predicate
+            .map(|predicate| state.create_physical_expr(predicate, &df_schema))
+            .transpose()?;
+
+        let metrics = ExecutionPlanMetricsSet::new();
+
+        // TODO: Point at some files and/or ranges
+        let file_scan_config = FileScanConfig::new(self.object_store_url.clone(), self.schema())
+            .with_file_group(
+                self.config
+                    .data_files
+                    .iter()
+                    .cloned()
+                    .map(|f| f.into())
+                    .collect(),
+            )
+            .with_projection(projection.cloned());
+
+        let exec =
+            VortexExec::try_new(file_scan_config, metrics, projection, predicate)?.into_arc();
+
+        Ok(exec)
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
+        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
+    }
+
+    fn statistics(&self) -> Option<Statistics> {
+        None
+    }
+}
diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs
index 5ac038a2e1..d9380d0912 100644
--- a/vortex-datafusion/src/plans.rs
+++ b/vortex-datafusion/src/plans.rs
@@ -20,10 +20,10 @@ use datafusion_physical_plan::{
 use futures::{ready, Stream};
 use lazy_static::lazy_static;
 use pin_project::pin_project;
-use vortex::array::chunked::ChunkedArray;
+use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowArray;
 use vortex::compute::take;
-use vortex::{ArrayDType, ArrayData, IntoArray, IntoArrayVariant, IntoCanonical};
+use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant, IntoCanonical};
 
 use crate::datatype::infer_schema;
 use crate::eval::ExpressionEvaluator;
@@ -346,8 +346,7 @@
             );
 
             let row_indices =
-                ArrayData::from_arrow(record_batch.column(0).as_primitive::<UInt64Type>(), false)
-                    .into_array();
+                Array::from_arrow(record_batch.column(0).as_primitive::<UInt64Type>(), false);
 
             // If no columns in the output projection, we send back a RecordBatch with empty schema.
             // This is common for COUNT queries.
@@ -403,10 +402,7 @@ mod test {
     use arrow_array::{RecordBatch, UInt64Array};
     use datafusion_expr::{and, col, lit};
     use itertools::Itertools;
-    use vortex::array::bool::BoolArray;
-    use vortex::array::chunked::ChunkedArray;
-    use vortex::array::primitive::PrimitiveArray;
-    use vortex::array::struct_::StructArray;
+    use vortex::array::{BoolArray, ChunkedArray, PrimitiveArray, StructArray};
     use vortex::validity::Validity;
     use vortex::{ArrayDType, IntoArray};
     use vortex_dtype::FieldName;
diff --git a/vortex-error/Cargo.toml b/vortex-error/Cargo.toml
index 646c42e3bf..f2bdb9e6d4 100644
--- a/vortex-error/Cargo.toml
+++ b/vortex-error/Cargo.toml
@@ -15,8 +15,13 @@ rust-version = { workspace = true }
 name = "vortex_error"
 path = "src/lib.rs"
 
+[features]
+flatbuffers = ["dep:flatbuffers"]
+datafusion = ["datafusion-common"]
+
 [dependencies]
 arrow-schema = { workspace = true }
+datafusion-common = { workspace = true, optional = true }
 flatbuffers = { workspace = true, optional = true }
 flexbuffers = { workspace = true, optional = true }
 object_store = { workspace = true, optional = true }
diff --git a/vortex-error/src/lib.rs b/vortex-error/src/lib.rs
index 842cc9a9de..fe1ca3167e 100644
--- a/vortex-error/src/lib.rs
+++ b/vortex-error/src/lib.rs
@@ -195,6 +195,23 @@ macro_rules! vortex_bail {
     };
 }
 
+#[cfg(feature = "datafusion")]
+impl From<VortexError> for datafusion_common::DataFusionError {
+    fn from(value: VortexError) -> Self {
+        Self::External(Box::new(value))
+    }
+}
+
+#[cfg(feature = "datafusion")]
+impl From<VortexError> for datafusion_common::arrow::error::ArrowError {
+    fn from(value: VortexError) -> Self {
+        match value {
+            VortexError::ArrowError(e) => e,
+            _ => Self::from_external_error(Box::new(value)),
+        }
+    }
+}
+
 // Not public, referenced by macros only.
 #[doc(hidden)]
 pub mod __private {
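With the new `datafusion` feature enabled, these `From` impls are what let the `?` operator carry a Vortex failure across the crate boundary into DataFusion plan code. A minimal sketch of the intended use, where `decode_chunk` is a hypothetical stand-in for any `VortexResult`-returning call:

use datafusion_common::Result as DFResult;
use vortex_error::{vortex_err, VortexError};

// Hypothetical fallible Vortex operation, for illustration only.
fn decode_chunk() -> Result<(), VortexError> {
    Err(vortex_err!("decode failed"))
}

fn poll_next_batch() -> DFResult<()> {
    // `?` converts VortexError into DataFusionError::External
    // through the From impl added in this patch.
    decode_chunk()?;
    Ok(())
}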