From bd6a0153fee406e0f32495d973232fff2370c734 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 2 Aug 2024 10:15:44 +0100 Subject: [PATCH 01/20] Move the inmemory provider to a new mod --- vortex-datafusion/src/lib.rs | 430 +------------------------------ vortex-datafusion/src/memory.rs | 433 ++++++++++++++++++++++++++++++++ 2 files changed, 437 insertions(+), 426 deletions(-) create mode 100644 vortex-datafusion/src/memory.rs diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index af4cd1be7f..34133279f6 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -11,27 +11,16 @@ use std::task::{Context, Poll}; use arrow_array::{RecordBatch, StructArray as ArrowStructArray}; use arrow_schema::{DataType, SchemaRef}; -use async_trait::async_trait; -use datafusion::dataframe::DataFrame; -use datafusion::datasource::TableProvider; -use datafusion::execution::context::SessionState; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; -use datafusion::prelude::SessionContext; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult}; -use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType}; -use datafusion_physical_expr::EquivalenceProperties; -use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, -}; +use datafusion_expr::{Expr, Operator}; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::Stream; -use itertools::Itertools; use vortex::array::chunked::ChunkedArray; -use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical}; -use vortex_dtype::DType; +use vortex::{ArrayDType, IntoArrayVariant, IntoCanonical}; -use crate::datatype::infer_schema; -use crate::plans::{RowSelectorExec, TakeRowsExec}; +pub mod memory; mod datatype; mod eval; @@ -47,256 +36,6 @@ const SUPPORTED_BINARY_OPS: &[Operator] = &[ Operator::LtEq, ]; -/// Optional configurations to pass when loading a [VortexMemTable]. -#[derive(Default, Debug, Clone)] -pub struct VortexMemTableOptions { - pub disable_pushdown: bool, -} - -impl VortexMemTableOptions { - pub fn with_disable_pushdown(mut self, disable_pushdown: bool) -> Self { - self.disable_pushdown = disable_pushdown; - self - } -} - -pub trait SessionContextExt { - fn register_vortex>(&self, name: S, array: Array) -> DFResult<()> { - self.register_vortex_opts(name, array, VortexMemTableOptions::default()) - } - - fn register_vortex_opts>( - &self, - name: S, - array: Array, - options: VortexMemTableOptions, - ) -> DFResult<()>; - - fn read_vortex(&self, array: Array) -> DFResult { - self.read_vortex_opts(array, VortexMemTableOptions::default()) - } - - fn read_vortex_opts(&self, array: Array, options: VortexMemTableOptions) - -> DFResult; -} - -impl SessionContextExt for SessionContext { - fn register_vortex_opts>( - &self, - name: S, - array: Array, - options: VortexMemTableOptions, - ) -> DFResult<()> { - assert!( - matches!(array.dtype(), DType::Struct(_, _)), - "Vortex arrays must have struct type" - ); - - let vortex_table = VortexMemTable::new(array, options); - self.register_table(name.as_ref(), Arc::new(vortex_table)) - .map(|_| ()) - } - - fn read_vortex_opts( - &self, - array: Array, - options: VortexMemTableOptions, - ) -> DFResult { - assert!( - matches!(array.dtype(), DType::Struct(_, _)), - "Vortex arrays must have struct type" - ); - - let vortex_table = VortexMemTable::new(array, options); - - self.read_table(Arc::new(vortex_table)) - } -} - -/// A [`TableProvider`] that exposes an existing Vortex Array to the DataFusion SQL engine. -/// -/// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as -/// a table to DataFusion. -#[derive(Debug, Clone)] -pub struct VortexMemTable { - array: ChunkedArray, - schema_ref: SchemaRef, - options: VortexMemTableOptions, -} - -impl VortexMemTable { - /// Build a new table provider from an existing [struct type](vortex_dtype::StructDType) array. - /// - /// # Panics - /// - /// Creation will panic if the provided array is not of `DType::Struct` type. - pub fn new(array: Array, options: VortexMemTableOptions) -> Self { - let arrow_schema = infer_schema(array.dtype()); - let schema_ref = SchemaRef::new(arrow_schema); - - let array = match ChunkedArray::try_from(&array) { - Ok(a) => a, - _ => { - let dtype = array.dtype().clone(); - ChunkedArray::try_new(vec![array], dtype).unwrap() - } - }; - - Self { - array, - schema_ref, - options, - } - } -} - -#[async_trait] -impl TableProvider for VortexMemTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema_ref) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - /// Plan an array scan. - /// - /// Currently, projection pushdown is supported, but not filter pushdown. - /// The array is flattened directly into the nearest Arrow-compatible encoding. - async fn scan( - &self, - state: &SessionState, - projection: Option<&Vec>, - filters: &[Expr], - _limit: Option, - ) -> DFResult> { - fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec { - let referenced_columns: HashSet = - exprs.iter().flat_map(get_column_references).collect(); - - let projection: Vec = referenced_columns - .iter() - .map(|col_name| schema.column_with_name(col_name).unwrap().0) - .sorted() - .collect(); - - projection - } - - let filter_exprs = if filters.is_empty() { - None - } else { - Some(filters) - }; - - let output_projection: Vec = match projection { - None => (0..self.schema_ref.fields().len()).collect(), - Some(proj) => proj.clone(), - }; - - match filter_exprs { - // If there is a filter expression, we execute in two phases, first performing a filter - // on the input to get back row indices, and then taking the remaining struct columns - // using the calculated indices from the filter. - Some(filter_exprs) => { - let filter_projection = - get_filter_projection(filter_exprs, self.schema_ref.clone()); - - Ok(make_filter_then_take_plan( - self.schema_ref.clone(), - filter_exprs, - filter_projection, - self.array.clone(), - output_projection.clone(), - state, - )) - } - - // If no filters were pushed down, we materialize the entire StructArray into a - // RecordBatch and let DataFusion process the entire query. - _ => { - let output_schema = Arc::new( - self.schema_ref - .project(output_projection.as_slice()) - .expect("project output schema"), - ); - let plan_properties = PlanProperties::new( - EquivalenceProperties::new(output_schema), - // non-pushdown scans execute in single partition, where the partition - // yields one RecordBatch per chunk in the input ChunkedArray - Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, - ); - - Ok(Arc::new(VortexScanExec { - array: self.array.clone(), - scan_projection: output_projection.clone(), - plan_properties, - })) - } - } - } - - fn supports_filters_pushdown( - &self, - filters: &[&Expr], - ) -> DFResult> { - // In the case the caller has configured this provider with filter pushdown disabled, - // do not attempt to apply any filters at scan time. - if self.options.disable_pushdown { - return Ok(filters - .iter() - .map(|_| TableProviderFilterPushDown::Unsupported) - .collect()); - } - - filters - .iter() - .map(|expr| { - if can_be_pushed_down(expr) { - Ok(TableProviderFilterPushDown::Exact) - } else { - Ok(TableProviderFilterPushDown::Unsupported) - } - }) - .try_collect() - } -} - -/// Construct an operator plan that executes in two stages. -/// -/// The first plan stage only materializes the columns related to the provided set of filter -/// expressions. It evaluates the filters into a row selection. -/// -/// The second stage receives the row selection above and dispatches a `take` on the remaining -/// columns. -fn make_filter_then_take_plan( - schema: SchemaRef, - filter_exprs: &[Expr], - filter_projection: Vec, - chunked_array: ChunkedArray, - output_projection: Vec, - _session_state: &SessionState, -) -> Arc { - let row_selector_op = Arc::new(RowSelectorExec::new( - filter_exprs, - filter_projection, - &chunked_array, - )); - - Arc::new(TakeRowsExec::new( - schema.clone(), - &output_projection, - row_selector_op.clone(), - &chunked_array, - )) -} - fn supported_data_types(dt: DataType) -> bool { dt.is_integer() || dt.is_signed_integer() @@ -467,164 +206,3 @@ impl ExecutionPlan for VortexScanExec { })) } } - -#[cfg(test)] -mod test { - use arrow_array::types::Int64Type; - use datafusion::arrow::array::AsArray; - use datafusion::functions_aggregate::count::count_distinct; - use datafusion::prelude::SessionContext; - use datafusion_common::{Column, TableReference}; - use datafusion_expr::{and, col, lit, BinaryExpr, Expr, Operator}; - use vortex::array::primitive::PrimitiveArray; - use vortex::array::struct_::StructArray; - use vortex::array::varbin::VarBinArray; - use vortex::validity::Validity; - use vortex::{Array, IntoArray}; - use vortex_dtype::{DType, Nullability}; - - use crate::{can_be_pushed_down, SessionContextExt, VortexMemTableOptions}; - - fn presidents_array() -> Array { - let names = VarBinArray::from_vec( - vec![ - "Washington", - "Adams", - "Jefferson", - "Madison", - "Monroe", - "Adams", - ], - DType::Utf8(Nullability::NonNullable), - ); - let term_start = PrimitiveArray::from_vec( - vec![1789u16, 1797, 1801, 1809, 1817, 1825], - Validity::NonNullable, - ); - - StructArray::from_fields(&[ - ("president", names.into_array()), - ("term_start", term_start.into_array()), - ]) - .into_array() - } - - #[tokio::test] - #[cfg_attr(miri, ignore)] - async fn test_datafusion_pushdown() { - let ctx = SessionContext::new(); - - let df = ctx.read_vortex(presidents_array()).unwrap(); - - let distinct_names = df - .filter(col("term_start").gt_eq(lit(1795))) - .unwrap() - .aggregate(vec![], vec![count_distinct(col("president"))]) - .unwrap() - .collect() - .await - .unwrap(); - - assert_eq!(distinct_names.len(), 1); - - assert_eq!( - *distinct_names[0] - .column(0) - .as_primitive::() - .values() - .first() - .unwrap(), - 4i64 - ); - } - - #[tokio::test] - #[cfg_attr(miri, ignore)] - async fn test_datafusion_no_pushdown() { - let ctx = SessionContext::new(); - - let df = ctx - .read_vortex_opts( - presidents_array(), - // Disable pushdown. We run this test to make sure that the naive codepath also - // produces correct results and does not panic anywhere. - VortexMemTableOptions::default().with_disable_pushdown(true), - ) - .unwrap(); - - let distinct_names = df - .filter(col("term_start").gt_eq(lit(1795))) - .unwrap() - .filter(col("term_start").lt(lit(2000))) - .unwrap() - .aggregate(vec![], vec![count_distinct(col("president"))]) - .unwrap() - .collect() - .await - .unwrap(); - - assert_eq!(distinct_names.len(), 1); - - assert_eq!( - *distinct_names[0] - .column(0) - .as_primitive::() - .values() - .first() - .unwrap(), - 4i64 - ); - } - - #[test] - fn test_can_be_pushed_down0() { - let e = BinaryExpr { - left: Box::new( - Column { - relation: Some(TableReference::Bare { - table: "orders".into(), - }), - name: "o_orderstatus".to_string(), - } - .into(), - ), - op: Operator::Eq, - right: Box::new(lit("F")), - }; - let e = Expr::BinaryExpr(e); - - assert!(can_be_pushed_down(&e)); - } - - #[test] - fn test_can_be_pushed_down1() { - let e = lit("hello"); - - assert!(can_be_pushed_down(&e)); - } - - #[test] - fn test_can_be_pushed_down2() { - let e = lit(3); - - assert!(can_be_pushed_down(&e)); - } - - #[test] - fn test_can_be_pushed_down3() { - let e = BinaryExpr { - left: Box::new(col("nums")), - op: Operator::Modulo, - right: Box::new(lit(5)), - }; - let e = Expr::BinaryExpr(e); - - assert!(!can_be_pushed_down(&e)); - } - - #[test] - fn test_can_be_pushed_down4() { - let e = and((col("a")).eq(lit(2u64)), col("b").eq(lit(true))); - assert!(can_be_pushed_down(&e)); - } -} diff --git a/vortex-datafusion/src/memory.rs b/vortex-datafusion/src/memory.rs new file mode 100644 index 0000000000..b51d41a24d --- /dev/null +++ b/vortex-datafusion/src/memory.rs @@ -0,0 +1,433 @@ +use std::any::Any; +use std::collections::HashSet; +use std::sync::Arc; + +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion::datasource::TableProvider; +use datafusion::execution::context::SessionState; +use datafusion::prelude::*; +use datafusion_common::Result as DFResult; +use datafusion_expr::{TableProviderFilterPushDown, TableType}; +use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_plan::{ExecutionMode, ExecutionPlan, Partitioning, PlanProperties}; +use itertools::Itertools; +use vortex::array::chunked::ChunkedArray; +use vortex::{Array, ArrayDType as _}; +use vortex_dtype::DType; + +use crate::datatype::infer_schema; +use crate::plans::{RowSelectorExec, TakeRowsExec}; +use crate::{can_be_pushed_down, get_column_references, VortexScanExec}; + +/// A [`TableProvider`] that exposes an existing Vortex Array to the DataFusion SQL engine. +/// +/// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as +/// a table to DataFusion. +#[derive(Debug, Clone)] +pub struct VortexMemTable { + array: ChunkedArray, + schema_ref: SchemaRef, + options: VortexMemTableOptions, +} + +impl VortexMemTable { + /// Build a new table provider from an existing [struct type](vortex_dtype::StructDType) array. + /// + /// # Panics + /// + /// Creation will panic if the provided array is not of `DType::Struct` type. + pub fn new(array: Array, options: VortexMemTableOptions) -> Self { + let arrow_schema = infer_schema(array.dtype()); + let schema_ref = SchemaRef::new(arrow_schema); + + let array = match ChunkedArray::try_from(&array) { + Ok(a) => a, + _ => { + let dtype = array.dtype().clone(); + ChunkedArray::try_new(vec![array], dtype).unwrap() + } + }; + + Self { + array, + schema_ref, + options, + } + } +} + +#[async_trait] +impl TableProvider for VortexMemTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema_ref) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + /// Plan an array scan. + /// + /// Currently, projection pushdown is supported, but not filter pushdown. + /// The array is flattened directly into the nearest Arrow-compatible encoding. + async fn scan( + &self, + state: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + _limit: Option, + ) -> DFResult> { + fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec { + let referenced_columns: HashSet = + exprs.iter().flat_map(get_column_references).collect(); + + let projection: Vec = referenced_columns + .iter() + .map(|col_name| schema.column_with_name(col_name).unwrap().0) + .sorted() + .collect(); + + projection + } + + let filter_exprs = if filters.is_empty() { + None + } else { + Some(filters) + }; + + let output_projection: Vec = match projection { + None => (0..self.schema_ref.fields().len()).collect(), + Some(proj) => proj.clone(), + }; + + match filter_exprs { + // If there is a filter expression, we execute in two phases, first performing a filter + // on the input to get back row indices, and then taking the remaining struct columns + // using the calculated indices from the filter. + Some(filter_exprs) => { + let filter_projection = + get_filter_projection(filter_exprs, self.schema_ref.clone()); + + Ok(make_filter_then_take_plan( + self.schema_ref.clone(), + filter_exprs, + filter_projection, + self.array.clone(), + output_projection.clone(), + state, + )) + } + + // If no filters were pushed down, we materialize the entire StructArray into a + // RecordBatch and let DataFusion process the entire query. + _ => { + let output_schema = Arc::new( + self.schema_ref + .project(output_projection.as_slice()) + .expect("project output schema"), + ); + let plan_properties = PlanProperties::new( + EquivalenceProperties::new(output_schema), + // non-pushdown scans execute in single partition, where the partition + // yields one RecordBatch per chunk in the input ChunkedArray + Partitioning::UnknownPartitioning(1), + ExecutionMode::Bounded, + ); + + Ok(Arc::new(VortexScanExec { + array: self.array.clone(), + scan_projection: output_projection.clone(), + plan_properties, + })) + } + } + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> DFResult> { + // In the case the caller has configured this provider with filter pushdown disabled, + // do not attempt to apply any filters at scan time. + if self.options.disable_pushdown { + return Ok(filters + .iter() + .map(|_| TableProviderFilterPushDown::Unsupported) + .collect()); + } + + filters + .iter() + .map(|expr| { + if can_be_pushed_down(expr) { + Ok(TableProviderFilterPushDown::Exact) + } else { + Ok(TableProviderFilterPushDown::Unsupported) + } + }) + .try_collect() + } +} + +/// Optional configurations to pass when loading a [VortexMemTable]. +#[derive(Default, Debug, Clone)] +pub struct VortexMemTableOptions { + pub disable_pushdown: bool, +} + +impl VortexMemTableOptions { + pub fn with_disable_pushdown(mut self, disable_pushdown: bool) -> Self { + self.disable_pushdown = disable_pushdown; + self + } +} + +pub trait SessionContextExt { + fn register_vortex>(&self, name: S, array: Array) -> DFResult<()> { + self.register_vortex_opts(name, array, VortexMemTableOptions::default()) + } + + fn register_vortex_opts>( + &self, + name: S, + array: Array, + options: VortexMemTableOptions, + ) -> DFResult<()>; + + fn read_vortex(&self, array: Array) -> DFResult { + self.read_vortex_opts(array, VortexMemTableOptions::default()) + } + + fn read_vortex_opts(&self, array: Array, options: VortexMemTableOptions) + -> DFResult; +} + +impl SessionContextExt for SessionContext { + fn register_vortex_opts>( + &self, + name: S, + array: Array, + options: VortexMemTableOptions, + ) -> DFResult<()> { + assert!( + matches!(array.dtype(), DType::Struct(_, _)), + "Vortex arrays must have struct type" + ); + + let vortex_table = VortexMemTable::new(array, options); + self.register_table(name.as_ref(), Arc::new(vortex_table)) + .map(|_| ()) + } + + fn read_vortex_opts( + &self, + array: Array, + options: VortexMemTableOptions, + ) -> DFResult { + assert!( + matches!(array.dtype(), DType::Struct(_, _)), + "Vortex arrays must have struct type" + ); + + let vortex_table = VortexMemTable::new(array, options); + + self.read_table(Arc::new(vortex_table)) + } +} + +/// Construct an operator plan that executes in two stages. +/// +/// The first plan stage only materializes the columns related to the provided set of filter +/// expressions. It evaluates the filters into a row selection. +/// +/// The second stage receives the row selection above and dispatches a `take` on the remaining +/// columns. +fn make_filter_then_take_plan( + schema: SchemaRef, + filter_exprs: &[Expr], + filter_projection: Vec, + chunked_array: ChunkedArray, + output_projection: Vec, + _session_state: &SessionState, +) -> Arc { + let row_selector_op = Arc::new(RowSelectorExec::new( + filter_exprs, + filter_projection, + &chunked_array, + )); + + Arc::new(TakeRowsExec::new( + schema.clone(), + &output_projection, + row_selector_op.clone(), + &chunked_array, + )) +} + +#[cfg(test)] +mod test { + use arrow_array::cast::AsArray as _; + use arrow_array::types::Int64Type; + use datafusion::functions_aggregate::count::count_distinct; + use datafusion::prelude::SessionContext; + use datafusion_common::{Column, TableReference}; + use datafusion_expr::{and, col, lit, BinaryExpr, Expr, Operator}; + use vortex::array::primitive::PrimitiveArray; + use vortex::array::struct_::StructArray; + use vortex::array::varbin::VarBinArray; + use vortex::validity::Validity; + use vortex::{Array, IntoArray}; + use vortex_dtype::{DType, Nullability}; + + use crate::can_be_pushed_down; + use crate::memory::{SessionContextExt as _, VortexMemTableOptions}; + + fn presidents_array() -> Array { + let names = VarBinArray::from_vec( + vec![ + "Washington", + "Adams", + "Jefferson", + "Madison", + "Monroe", + "Adams", + ], + DType::Utf8(Nullability::NonNullable), + ); + let term_start = PrimitiveArray::from_vec( + vec![1789u16, 1797, 1801, 1809, 1817, 1825], + Validity::NonNullable, + ); + + StructArray::from_fields(&[ + ("president", names.into_array()), + ("term_start", term_start.into_array()), + ]) + .into_array() + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn test_datafusion_pushdown() { + let ctx = SessionContext::new(); + + let df = ctx.read_vortex(presidents_array()).unwrap(); + + let distinct_names = df + .filter(col("term_start").gt_eq(lit(1795))) + .unwrap() + .aggregate(vec![], vec![count_distinct(col("president"))]) + .unwrap() + .collect() + .await + .unwrap(); + + assert_eq!(distinct_names.len(), 1); + + assert_eq!( + *distinct_names[0] + .column(0) + .as_primitive::() + .values() + .first() + .unwrap(), + 4i64 + ); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn test_datafusion_no_pushdown() { + let ctx = SessionContext::new(); + + let df = ctx + .read_vortex_opts( + presidents_array(), + // Disable pushdown. We run this test to make sure that the naive codepath also + // produces correct results and does not panic anywhere. + VortexMemTableOptions::default().with_disable_pushdown(true), + ) + .unwrap(); + + let distinct_names = df + .filter(col("term_start").gt_eq(lit(1795))) + .unwrap() + .filter(col("term_start").lt(lit(2000))) + .unwrap() + .aggregate(vec![], vec![count_distinct(col("president"))]) + .unwrap() + .collect() + .await + .unwrap(); + + assert_eq!(distinct_names.len(), 1); + + assert_eq!( + *distinct_names[0] + .column(0) + .as_primitive::() + .values() + .first() + .unwrap(), + 4i64 + ); + } + + #[test] + fn test_can_be_pushed_down0() { + let e = BinaryExpr { + left: Box::new( + Column { + relation: Some(TableReference::Bare { + table: "orders".into(), + }), + name: "o_orderstatus".to_string(), + } + .into(), + ), + op: Operator::Eq, + right: Box::new(lit("F")), + }; + let e = Expr::BinaryExpr(e); + + assert!(can_be_pushed_down(&e)); + } + + #[test] + fn test_can_be_pushed_down1() { + let e = lit("hello"); + + assert!(can_be_pushed_down(&e)); + } + + #[test] + fn test_can_be_pushed_down2() { + let e = lit(3); + + assert!(can_be_pushed_down(&e)); + } + + #[test] + fn test_can_be_pushed_down3() { + let e = BinaryExpr { + left: Box::new(col("nums")), + op: Operator::Modulo, + right: Box::new(lit(5)), + }; + let e = Expr::BinaryExpr(e); + + assert!(!can_be_pushed_down(&e)); + } + + #[test] + fn test_can_be_pushed_down4() { + let e = and((col("a")).eq(lit(2u64)), col("b").eq(lit(true))); + assert!(can_be_pushed_down(&e)); + } +} From cc49e53aed882d01d91f194187cb1afdffd27e6d Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 2 Aug 2024 10:44:37 +0100 Subject: [PATCH 02/20] Initial work --- vortex-datafusion/src/lib.rs | 59 ++++++++++++++++++++++++++++- vortex-datafusion/src/persistent.rs | 54 ++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 vortex-datafusion/src/persistent.rs diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index 34133279f6..c2804f7aa6 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -12,15 +12,19 @@ use std::task::{Context, Poll}; use arrow_array::{RecordBatch, StructArray as ArrowStructArray}; use arrow_schema::{DataType, SchemaRef}; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion::prelude::{DataFrame, SessionContext}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult}; use datafusion_expr::{Expr, Operator}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::Stream; +use memory::{VortexMemTable, VortexMemTableOptions}; use vortex::array::chunked::ChunkedArray; -use vortex::{ArrayDType, IntoArrayVariant, IntoCanonical}; +use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical}; +use vortex_dtype::DType; pub mod memory; +pub mod persistent; mod datatype; mod eval; @@ -49,6 +53,59 @@ fn supported_data_types(dt: DataType) -> bool { || dt == DataType::Utf8View } +pub trait SessionContextExt { + fn register_vortex>(&self, name: S, array: Array) -> DFResult<()> { + self.register_vortex_opts(name, array, VortexMemTableOptions::default()) + } + + fn register_vortex_opts>( + &self, + name: S, + array: Array, + options: VortexMemTableOptions, + ) -> DFResult<()>; + + fn read_vortex(&self, array: Array) -> DFResult { + self.read_vortex_opts(array, VortexMemTableOptions::default()) + } + + fn read_vortex_opts(&self, array: Array, options: VortexMemTableOptions) + -> DFResult; +} + +impl SessionContextExt for SessionContext { + fn register_vortex_opts>( + &self, + name: S, + array: Array, + options: VortexMemTableOptions, + ) -> DFResult<()> { + assert!( + matches!(array.dtype(), DType::Struct(_, _)), + "Vortex arrays must have struct type" + ); + + let vortex_table = VortexMemTable::new(array, options); + self.register_table(name.as_ref(), Arc::new(vortex_table)) + .map(|_| ()) + } + + fn read_vortex_opts( + &self, + array: Array, + options: VortexMemTableOptions, + ) -> DFResult { + assert!( + matches!(array.dtype(), DType::Struct(_, _)), + "Vortex arrays must have struct type" + ); + + let vortex_table = VortexMemTable::new(array, options); + + self.read_table(Arc::new(vortex_table)) + } +} + fn can_be_pushed_down(expr: &Expr) -> bool { match expr { Expr::BinaryExpr(expr) diff --git a/vortex-datafusion/src/persistent.rs b/vortex-datafusion/src/persistent.rs new file mode 100644 index 0000000000..9ca7ef24ef --- /dev/null +++ b/vortex-datafusion/src/persistent.rs @@ -0,0 +1,54 @@ +use std::any::Any; +use std::sync::Arc; + +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion::datasource::TableProvider; +use datafusion::execution::context::SessionState; +use datafusion_common::{Result as DFResult, Statistics}; +use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion_physical_plan::ExecutionPlan; + +pub struct VortexFileTableProvider {} + +#[async_trait] +impl TableProvider for VortexFileTableProvider { + fn as_any(&self) -> &dyn Any { + todo!() + } + + fn schema(&self) -> SchemaRef { + todo!() + } + + fn table_type(&self) -> TableType { + todo!() + } + + async fn scan( + &self, + _state: &SessionState, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DFResult> { + todo!() + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> DFResult> { + Ok(vec![ + TableProviderFilterPushDown::Unsupported; + filters.len() + ]) + } + + fn statistics(&self) -> Option { + None + } +} + +#[cfg(test)] +mod tests {} From 3090fb483476bf250d481fff35dd6989bbcbbb25 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 2 Aug 2024 11:25:27 +0100 Subject: [PATCH 03/20] Fix uses --- bench-vortex/benches/datafusion_benchmark.rs | 2 +- bench-vortex/src/tpch/mod.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs index 85494de987..d7e157208d 100644 --- a/bench-vortex/benches/datafusion_benchmark.rs +++ b/bench-vortex/benches/datafusion_benchmark.rs @@ -16,7 +16,7 @@ use lazy_static::lazy_static; use vortex::compress::CompressionStrategy; use vortex::encoding::EncodingRef; use vortex::{Array, Context, IntoArray, ToArrayData}; -use vortex_datafusion::{VortexMemTable, VortexMemTableOptions}; +use vortex_datafusion::memory::{VortexMemTable, VortexMemTableOptions}; use vortex_dict::DictEncoding; use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor; diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index fc81921483..8d2f061dac 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -10,7 +10,8 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext}; use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowArray; use vortex::{Array, ArrayDType, ArrayData, IntoArray}; -use vortex_datafusion::{SessionContextExt, VortexMemTableOptions}; +use vortex_datafusion::memory::VortexMemTableOptions; +use vortex_datafusion::SessionContextExt; use crate::idempotent_async; From e78bcc4491eb65f69bca0ac88f2494a376bc0fa9 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 2 Aug 2024 11:26:36 +0100 Subject: [PATCH 04/20] leftover from previous PR --- bench-vortex/benches/datafusion_benchmark.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs index d7e157208d..7e2ce50e8f 100644 --- a/bench-vortex/benches/datafusion_benchmark.rs +++ b/bench-vortex/benches/datafusion_benchmark.rs @@ -177,7 +177,7 @@ fn bench_datafusion(c: &mut Criterion) { bench_vortex( c.benchmark_group("vortex-pushdown-compressed"), &SessionContext::new(), - false, + true, true, ); @@ -185,7 +185,7 @@ fn bench_datafusion(c: &mut Criterion) { bench_vortex( c.benchmark_group("vortex-pushdown-uncompressed"), &SessionContext::new(), - false, + true, false, ); @@ -193,7 +193,7 @@ fn bench_datafusion(c: &mut Criterion) { bench_vortex( c.benchmark_group("vortex-nopushdown-compressed"), &SessionContext::new(), - true, + false, true, ); @@ -201,7 +201,7 @@ fn bench_datafusion(c: &mut Criterion) { bench_vortex( c.benchmark_group("vortex-nopushdown-uncompressed"), &SessionContext::new(), - true, + false, false, ); } From 37bcf9c3797c8bc624b226a64e3bc14f3c5c1a0e Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 2 Aug 2024 14:00:21 +0100 Subject: [PATCH 05/20] more minor changes --- Cargo.lock | 1 + vortex-datafusion/Cargo.toml | 6 ++ vortex-datafusion/src/lib.rs | 19 +++++- vortex-datafusion/src/memory.rs | 26 +++----- vortex-datafusion/src/persistent/execution.rs | 61 +++++++++++++++++++ vortex-datafusion/src/persistent/mod.rs | 16 +++++ .../{persistent.rs => persistent/table.rs} | 25 ++++++-- 7 files changed, 129 insertions(+), 25 deletions(-) create mode 100644 vortex-datafusion/src/persistent/execution.rs create mode 100644 vortex-datafusion/src/persistent/mod.rs rename vortex-datafusion/src/{persistent.rs => persistent/table.rs} (67%) diff --git a/Cargo.lock b/Cargo.lock index 289bf18460..1f8263d861 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4138,6 +4138,7 @@ dependencies = [ "vortex-error", "vortex-expr", "vortex-scalar", + "vortex-serde", ] [[package]] diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index abe626d007..b4214bb09a 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -10,11 +10,16 @@ include.workspace = true edition.workspace = true rust-version.workspace = true +[lib] +name = "vortex_datafusion" +path = "src/lib.rs" + [dependencies] vortex-array = { path = "../vortex-array" } vortex-dtype = { path = "../vortex-dtype" } vortex-expr = { path = "../vortex-expr" } vortex-error = { path = "../vortex-error" } +vortex-serde = { path = "../vortex-serde" } vortex-scalar = { path = "../vortex-scalar", features = ["datafusion"] } arrow-array = { workspace = true } @@ -32,6 +37,7 @@ itertools = { workspace = true } lazy_static = { workspace = true } pin-project = { workspace = true } + [dev-dependencies] tokio = { workspace = true, features = ["test-util"] } diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index c2804f7aa6..fee7fdf6ca 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -18,10 +18,10 @@ use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult use datafusion_expr::{Expr, Operator}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::Stream; +use itertools::Itertools; use memory::{VortexMemTable, VortexMemTableOptions}; use vortex::array::chunked::ChunkedArray; use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical}; -use vortex_dtype::DType; pub mod memory; pub mod persistent; @@ -81,7 +81,7 @@ impl SessionContextExt for SessionContext { options: VortexMemTableOptions, ) -> DFResult<()> { assert!( - matches!(array.dtype(), DType::Struct(_, _)), + array.dtype().is_struct(), "Vortex arrays must have struct type" ); @@ -96,7 +96,7 @@ impl SessionContextExt for SessionContext { options: VortexMemTableOptions, ) -> DFResult { assert!( - matches!(array.dtype(), DType::Struct(_, _)), + array.dtype().is_struct(), "Vortex arrays must have struct type" ); @@ -119,6 +119,19 @@ fn can_be_pushed_down(expr: &Expr) -> bool { } } +fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec { + let referenced_columns: HashSet = + exprs.iter().flat_map(get_column_references).collect(); + + let projection: Vec = referenced_columns + .iter() + .map(|col_name| schema.column_with_name(col_name).unwrap().0) + .sorted() + .collect(); + + projection +} + /// Extract out the columns from our table referenced by the expression. fn get_column_references(expr: &Expr) -> HashSet { let mut references = HashSet::new(); diff --git a/vortex-datafusion/src/memory.rs b/vortex-datafusion/src/memory.rs index 636b35eb86..abe924c873 100644 --- a/vortex-datafusion/src/memory.rs +++ b/vortex-datafusion/src/memory.rs @@ -1,5 +1,4 @@ use std::any::Any; -use std::collections::HashSet; use std::sync::Arc; use arrow_schema::SchemaRef; @@ -18,7 +17,7 @@ use vortex_dtype::DType; use crate::datatype::infer_schema; use crate::plans::{RowSelectorExec, TakeRowsExec}; -use crate::{can_be_pushed_down, get_column_references, VortexScanExec}; +use crate::{can_be_pushed_down, get_filter_projection, VortexScanExec}; /// A [`TableProvider`] that exposes an existing Vortex Array to the DataFusion SQL engine. /// @@ -82,19 +81,6 @@ impl TableProvider for VortexMemTable { filters: &[Expr], _limit: Option, ) -> DFResult> { - fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec { - let referenced_columns: HashSet = - exprs.iter().flat_map(get_column_references).collect(); - - let projection: Vec = referenced_columns - .iter() - .map(|col_name| schema.column_with_name(col_name).unwrap().0) - .sorted() - .collect(); - - projection - } - let filter_exprs = if filters.is_empty() { None } else { @@ -176,11 +162,19 @@ impl TableProvider for VortexMemTable { } /// Optional configurations to pass when loading a [VortexMemTable]. -#[derive(Default, Debug, Clone)] +#[derive(Debug, Clone)] pub struct VortexMemTableOptions { pub enable_pushdown: bool, } +impl Default for VortexMemTableOptions { + fn default() -> Self { + Self { + enable_pushdown: true, + } + } +} + impl VortexMemTableOptions { pub fn with_pushdown(mut self, enable_pushdown: bool) -> Self { self.enable_pushdown = enable_pushdown; diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs new file mode 100644 index 0000000000..9b72ba799d --- /dev/null +++ b/vortex-datafusion/src/persistent/execution.rs @@ -0,0 +1,61 @@ +use std::sync::Arc; + +use datafusion::datasource::physical_plan::{FileScanConfig, FileStream}; +use datafusion_common::Result as DFResult; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_physical_plan::{DisplayAs, ExecutionPlan, PlanProperties}; + +use crate::persistent::VortexFileOpener; + +#[derive(Debug)] +pub struct VortexExec { + file_scan_config: FileScanConfig, + metrics: ExecutionPlanMetricsSet, +} + +impl DisplayAs for VortexExec { + fn fmt_as( + &self, + _t: datafusion_physical_plan::DisplayFormatType, + _f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + todo!() + } +} + +impl ExecutionPlan for VortexExec { + fn name(&self) -> &str { + "VortexExec" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn properties(&self) -> &PlanProperties { + todo!() + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> DFResult> { + Ok(self) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> DFResult { + let opener = VortexFileOpener {}; + let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?; + + Ok(Box::pin(stream)) + } +} diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs new file mode 100644 index 0000000000..492724539d --- /dev/null +++ b/vortex-datafusion/src/persistent/mod.rs @@ -0,0 +1,16 @@ +use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; +use datafusion_common::Result as DFResult; + +pub mod execution; +pub mod table; + +pub struct VortexFileOpener {} + +impl FileOpener for VortexFileOpener { + fn open(&self, _file_meta: FileMeta) -> DFResult { + todo!() + } +} + +#[cfg(test)] +mod tests {} diff --git a/vortex-datafusion/src/persistent.rs b/vortex-datafusion/src/persistent/table.rs similarity index 67% rename from vortex-datafusion/src/persistent.rs rename to vortex-datafusion/src/persistent/table.rs index 9ca7ef24ef..c70f8baf2e 100644 --- a/vortex-datafusion/src/persistent.rs +++ b/vortex-datafusion/src/persistent/table.rs @@ -3,13 +3,20 @@ use std::sync::Arc; use arrow_schema::SchemaRef; use async_trait::async_trait; +use datafusion::datasource::physical_plan::{FileScanConfig, FileStream}; use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion_common::{Result as DFResult, Statistics}; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::ExecutionPlan; -pub struct VortexFileTableProvider {} +use super::VortexFileOpener; + +pub struct VortexFileTableProvider { + schema_ref: SchemaRef, + file_scan_config: FileScanConfig, +} #[async_trait] impl TableProvider for VortexFileTableProvider { @@ -18,11 +25,11 @@ impl TableProvider for VortexFileTableProvider { } fn schema(&self) -> SchemaRef { - todo!() + Arc::clone(&self.schema_ref) } fn table_type(&self) -> TableType { - todo!() + TableType::Base } async fn scan( @@ -32,6 +39,15 @@ impl TableProvider for VortexFileTableProvider { _filters: &[Expr], _limit: Option, ) -> DFResult> { + let opener = VortexFileOpener {}; + + let _stream = FileStream::new( + &self.file_scan_config, + 0, + opener, + &ExecutionPlanMetricsSet::new(), + )?; + todo!() } @@ -49,6 +65,3 @@ impl TableProvider for VortexFileTableProvider { None } } - -#[cfg(test)] -mod tests {} From 53675f1d2ebd1c42fd8c223accd214f3ec5a71ae Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 2 Aug 2024 14:52:11 +0100 Subject: [PATCH 06/20] More object_store boilerplate --- vortex-datafusion/Cargo.toml | 2 +- vortex-datafusion/src/persistent/opener.rs | 18 +++++++++++++++--- vortex-datafusion/src/persistent/table.rs | 5 +---- vortex-serde/src/file/reader/filtering.rs | 2 +- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index d718027887..e8cd0c415e 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -19,7 +19,7 @@ vortex-array = { path = "../vortex-array" } vortex-dtype = { path = "../vortex-dtype" } vortex-expr = { path = "../vortex-expr" } vortex-error = { path = "../vortex-error" } -vortex-serde = { path = "../vortex-serde" } +vortex-serde = { path = "../vortex-serde", features = ["object_store"] } vortex-scalar = { path = "../vortex-scalar", features = ["datafusion"] } arrow-array = { workspace = true } diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index d86a0df108..51831e497d 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -2,15 +2,27 @@ use std::sync::Arc; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use datafusion_common::Result as DFResult; +use futures::FutureExt; use object_store::ObjectStore; +use vortex_serde::file::reader::VortexBatchReaderBuilder; +use vortex_serde::io::ObjectStoreReadAt; pub struct VortexFileOpener { object_store: Arc, } impl FileOpener for VortexFileOpener { - fn open(&self, _file_meta: FileMeta) -> DFResult { - // let object_store = - todo!() + fn open(&self, file_meta: FileMeta) -> DFResult { + let object_store = self.object_store.clone(); + Ok(async move { + let read_at = ObjectStoreReadAt::new(&object_store, file_meta.location()); + + let _reader = VortexBatchReaderBuilder::new(read_at) + .build() + .await + .unwrap(); + todo!() + } + .boxed()) } } diff --git a/vortex-datafusion/src/persistent/table.rs b/vortex-datafusion/src/persistent/table.rs index 7a9bc63809..60094c938a 100644 --- a/vortex-datafusion/src/persistent/table.rs +++ b/vortex-datafusion/src/persistent/table.rs @@ -3,16 +3,13 @@ use std::sync::Arc; use arrow_schema::SchemaRef; use async_trait::async_trait; -use datafusion::datasource::physical_plan::{FileScanConfig, FileStream}; +use datafusion::datasource::physical_plan::FileScanConfig; use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion_common::{Result as DFResult, Statistics}; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; -use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::ExecutionPlan; -use super::opener::VortexFileOpener; - #[allow(dead_code)] pub struct VortexFileTableProvider { schema_ref: SchemaRef, diff --git a/vortex-serde/src/file/reader/filtering.rs b/vortex-serde/src/file/reader/filtering.rs index d66cbf499e..e3ddc3347f 100644 --- a/vortex-serde/src/file/reader/filtering.rs +++ b/vortex-serde/src/file/reader/filtering.rs @@ -3,7 +3,7 @@ use vortex_error::VortexResult; use super::projections::Projection; -pub trait FilteringPredicate { +pub trait FilteringPredicate: Send + Sync { fn projection(&self) -> &Projection; fn evaluate(&mut self, array: &Array) -> VortexResult; } From a14291c47d711571ef08b53750f6393a0bfa3172 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 2 Aug 2024 17:24:25 +0100 Subject: [PATCH 07/20] more work --- bench-vortex/src/reader.rs | 4 +-- vortex-datafusion/src/persistent/opener.rs | 41 ++++++++++++++++------ vortex-serde/src/io/object_store.rs | 23 ++++++------ 3 files changed, 43 insertions(+), 25 deletions(-) diff --git a/bench-vortex/src/reader.rs b/bench-vortex/src/reader.rs index bd00a4e9ee..79a9f06889 100644 --- a/bench-vortex/src/reader.rs +++ b/bench-vortex/src/reader.rs @@ -159,8 +159,8 @@ pub async fn read_vortex_footer_format( ) } -pub async fn take_vortex_object_store( - fs: &O, +pub async fn take_vortex_object_store( + fs: &Arc, path: &object_store::path::Path, indices: &[u64], ) -> VortexResult { diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 51831e497d..c9822a36da 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -1,28 +1,47 @@ use std::sync::Arc; +use arrow_array::{RecordBatch, StructArray as ArrowStructArray}; +use arrow_schema::ArrowError; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use datafusion_common::Result as DFResult; -use futures::FutureExt; +use futures::{FutureExt as _, TryStreamExt}; use object_store::ObjectStore; +use vortex::IntoCanonical; use vortex_serde::file::reader::VortexBatchReaderBuilder; use vortex_serde::io::ObjectStoreReadAt; pub struct VortexFileOpener { - object_store: Arc, + pub object_store: Arc, } impl FileOpener for VortexFileOpener { fn open(&self, file_meta: FileMeta) -> DFResult { let object_store = self.object_store.clone(); - Ok(async move { - let read_at = ObjectStoreReadAt::new(&object_store, file_meta.location()); + DFResult::Ok( + async move { + let read_at = ObjectStoreReadAt::new(object_store, file_meta.location().clone()); - let _reader = VortexBatchReaderBuilder::new(read_at) - .build() - .await - .unwrap(); - todo!() - } - .boxed()) + let reader = VortexBatchReaderBuilder::new(read_at) + .build() + .await + .unwrap(); + + let stream = reader + .map_ok(|a| { + RecordBatch::from( + a.into_canonical() + .expect("struct arrays must canonicalize") + .into_arrow() + .as_any() + .downcast_ref::() + .expect("vortex StructArray must convert to arrow StructArray"), + ) + }) + .map_err(|e| ArrowError::from_external_error(Box::new(e))); + + DFResult::Ok(Box::pin(stream) as _) + } + .boxed(), + ) } } diff --git a/vortex-serde/src/io/object_store.rs b/vortex-serde/src/io/object_store.rs index cc03b16136..5306ffd7df 100644 --- a/vortex-serde/src/io/object_store.rs +++ b/vortex-serde/src/io/object_store.rs @@ -1,8 +1,7 @@ -#![cfg(feature = "object_store")] - use std::future::Future; use std::io::Cursor; use std::ops::Range; +use std::sync::Arc; use std::{io, mem}; use bytes::BytesMut; @@ -29,7 +28,7 @@ pub trait ObjectStoreExt { ) -> impl Future>; } -impl ObjectStoreExt for O { +impl ObjectStoreExt for Arc { async fn vortex_read( &self, location: &Path, @@ -40,7 +39,7 @@ impl ObjectStoreExt for O { } fn vortex_reader(&self, location: &Path) -> impl VortexReadAt { - ObjectStoreReadAt::new(self, location) + ObjectStoreReadAt::new(self.clone(), location.clone()) } async fn vortex_writer(&self, location: &Path) -> VortexResult { @@ -51,13 +50,13 @@ impl ObjectStoreExt for O { } } -pub struct ObjectStoreReadAt<'a, 'b, O: ObjectStore> { - object_store: &'a O, - location: &'b Path, +pub struct ObjectStoreReadAt { + object_store: Arc, + location: Path, } -impl<'a, 'b, O: ObjectStore> ObjectStoreReadAt<'a, 'b, O> { - pub fn new(object_store: &'a O, location: &'b Path) -> Self { +impl ObjectStoreReadAt { + pub fn new(object_store: Arc, location: Path) -> Self { Self { object_store, location, @@ -65,19 +64,19 @@ impl<'a, 'b, O: ObjectStore> ObjectStoreReadAt<'a, 'b, O> { } } -impl<'a, 'b, O: ObjectStore> VortexReadAt for ObjectStoreReadAt<'a, 'b, O> { +impl VortexReadAt for ObjectStoreReadAt { async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result { let start_range = pos as usize; let bytes = self .object_store - .get_range(self.location, start_range..(start_range + buffer.len())) + .get_range(&self.location, start_range..(start_range + buffer.len())) .await?; buffer.as_mut().copy_from_slice(bytes.as_ref()); Ok(buffer) } async fn size(&self) -> u64 { - self.object_store.head(self.location).await.unwrap().size as u64 + self.object_store.head(&self.location).await.unwrap().size as u64 } } From a2980feca85d249da951aee6fc2f450d864a1119 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 2 Aug 2024 18:45:10 +0100 Subject: [PATCH 08/20] some error handling --- Cargo.lock | 1 + vortex-datafusion/Cargo.toml | 2 +- vortex-datafusion/src/persistent/execution.rs | 3 ++- vortex-datafusion/src/persistent/opener.rs | 27 ++++++++----------- vortex-error/Cargo.toml | 5 ++++ vortex-error/src/lib.rs | 17 ++++++++++++ 6 files changed, 37 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ece9fa309..7fc5db4fd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4194,6 +4194,7 @@ name = "vortex-error" version = "0.1.0" dependencies = [ "arrow-schema", + "datafusion-common", "flatbuffers", "flexbuffers", "object_store", diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index e8cd0c415e..4e82915c7b 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" vortex-array = { path = "../vortex-array" } vortex-dtype = { path = "../vortex-dtype" } vortex-expr = { path = "../vortex-expr" } -vortex-error = { path = "../vortex-error" } +vortex-error = { path = "../vortex-error", features = ["datafusion"] } vortex-serde = { path = "../vortex-serde", features = ["object_store"] } vortex-scalar = { path = "../vortex-scalar", features = ["datafusion"] } diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 033fb69017..a47ec616f9 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::sync::Arc; use datafusion::datasource::physical_plan::{FileScanConfig, FileStream}; @@ -15,7 +16,7 @@ pub struct VortexExec { } impl DisplayAs for VortexExec { - fn fmt_as(&self, _t: DisplayFormatType, _f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt_as(&self, _t: DisplayFormatType, _f: &mut fmt::Formatter) -> fmt::Result { todo!() } } diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index c9822a36da..99cc322e75 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use arrow_array::{RecordBatch, StructArray as ArrowStructArray}; -use arrow_schema::ArrowError; +use arrow_array::cast::as_struct_array; +use arrow_array::RecordBatch; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use datafusion_common::Result as DFResult; use futures::{FutureExt as _, TryStreamExt}; @@ -21,23 +21,18 @@ impl FileOpener for VortexFileOpener { async move { let read_at = ObjectStoreReadAt::new(object_store, file_meta.location().clone()); - let reader = VortexBatchReaderBuilder::new(read_at) - .build() - .await - .unwrap(); + let reader = VortexBatchReaderBuilder::new(read_at).build().await?; let stream = reader - .map_ok(|a| { - RecordBatch::from( - a.into_canonical() - .expect("struct arrays must canonicalize") - .into_arrow() - .as_any() - .downcast_ref::() - .expect("vortex StructArray must convert to arrow StructArray"), - ) + .map_ok(|array| { + let arrow = array + .into_canonical() + .expect("struct arrays must canonicalize") + .into_arrow(); + let struct_array = as_struct_array(arrow.as_ref()); + RecordBatch::from(struct_array) }) - .map_err(|e| ArrowError::from_external_error(Box::new(e))); + .map_err(|e| e.into()); DFResult::Ok(Box::pin(stream) as _) } diff --git a/vortex-error/Cargo.toml b/vortex-error/Cargo.toml index c3df57a1bc..2a5d1a2000 100644 --- a/vortex-error/Cargo.toml +++ b/vortex-error/Cargo.toml @@ -15,8 +15,13 @@ rust-version = { workspace = true } name = "vortex_error" path = "src/lib.rs" + +[features] +datafusion = ["datafusion-common"] + [dependencies] arrow-schema = { workspace = true } +datafusion-common = { workspace = true, optional = true } flatbuffers = { workspace = true } flexbuffers = { workspace = true, optional = true } parquet = { workspace = true, optional = true } diff --git a/vortex-error/src/lib.rs b/vortex-error/src/lib.rs index 505563d3b1..f9d44e74fa 100644 --- a/vortex-error/src/lib.rs +++ b/vortex-error/src/lib.rs @@ -194,6 +194,23 @@ macro_rules! vortex_bail { }; } +#[cfg(feature = "datafusion")] +impl From for datafusion_common::DataFusionError { + fn from(value: VortexError) -> Self { + Self::External(Box::new(value)) + } +} + +#[cfg(feature = "datafusion")] +impl From for datafusion_common::arrow::error::ArrowError { + fn from(value: VortexError) -> Self { + match value { + VortexError::ArrowError(e) => e, + _ => Self::from_external_error(Box::new(value)), + } + } +} + // Not public, referenced by macros only. #[doc(hidden)] pub mod __private { From 3e532a59c81c8572ed9bd355507658aabaefe43f Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 5 Aug 2024 14:30:54 +0100 Subject: [PATCH 09/20] minimal working example --- Cargo.lock | 4 + vortex-datafusion/Cargo.toml | 11 ++- vortex-datafusion/examples/table_provider.rs | 86 +++++++++++++++++ vortex-datafusion/src/persistent/config.rs | 46 +++++++++ vortex-datafusion/src/persistent/execution.rs | 46 ++++++++- vortex-datafusion/src/persistent/mod.rs | 3 +- vortex-datafusion/src/persistent/opener.rs | 27 +++++- vortex-datafusion/src/persistent/provider.rs | 95 +++++++++++++++++++ vortex-datafusion/src/persistent/table.rs | 56 ----------- 9 files changed, 307 insertions(+), 67 deletions(-) create mode 100644 vortex-datafusion/examples/table_provider.rs create mode 100644 vortex-datafusion/src/persistent/config.rs create mode 100644 vortex-datafusion/src/persistent/provider.rs delete mode 100644 vortex-datafusion/src/persistent/table.rs diff --git a/Cargo.lock b/Cargo.lock index 7fc5db4fd6..6c6fa051d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4119,9 +4119,11 @@ dependencies = [ name = "vortex-datafusion" version = "0.1.0" dependencies = [ + "anyhow", "arrow-array", "arrow-schema", "async-trait", + "chrono", "datafusion", "datafusion-common", "datafusion-execution", @@ -4133,7 +4135,9 @@ dependencies = [ "lazy_static", "object_store", "pin-project", + "tempfile", "tokio", + "url", "vortex-array", "vortex-dtype", "vortex-error", diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 4e82915c7b..0d0cea868a 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -26,6 +26,7 @@ arrow-array = { workspace = true } arrow-schema = { workspace = true } async-trait = { workspace = true } +chrono = { workspace = true } datafusion = { workspace = true } datafusion-common = { workspace = true } datafusion-expr = { workspace = true } @@ -40,7 +41,11 @@ pin-project = { workspace = true } [dev-dependencies] -tokio = { workspace = true, features = ["test-util"] } +anyhow = { workspace = true } +url = "2" +tempfile = "3" +tokio = { workspace = true, features = ["test-util", "rt-multi-thread"] } -[lints] -workspace = true + +# [lints] +# workspace = true diff --git a/vortex-datafusion/examples/table_provider.rs b/vortex-datafusion/examples/table_provider.rs new file mode 100644 index 0000000000..7ac20fa7d0 --- /dev/null +++ b/vortex-datafusion/examples/table_provider.rs @@ -0,0 +1,86 @@ +use std::env::temp_dir; +use std::sync::Arc; + +use arrow_schema::{DataType, Field, Schema}; +use datafusion::prelude::SessionContext; +use datafusion_execution::object_store::ObjectStoreUrl; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +use object_store::ObjectStore; +use tokio::fs::OpenOptions; +use url::Url; +use vortex::array::chunked::ChunkedArray; +use vortex::array::primitive::PrimitiveArray; +use vortex::array::struct_::StructArray; +use vortex::array::varbin::VarBinArray; +use vortex::validity::Validity; +use vortex::IntoArray; +use vortex_datafusion::persistent::config::{VortexFile, VortexTableConfig}; +use vortex_datafusion::persistent::provider::VortexFileTableProvider; +use vortex_serde::file::file_writer::FileWriter; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let tmp_path = temp_dir(); + let strings = ChunkedArray::from_iter([ + VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), + VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), + ]) + .into_array(); + + let numbers = ChunkedArray::from_iter([ + PrimitiveArray::from(vec![1u32, 2, 3, 4]).into_array(), + PrimitiveArray::from(vec![5u32, 6, 7, 8]).into_array(), + ]) + .into_array(); + + let st = StructArray::try_new( + ["strings".into(), "numbers".into()].into(), + vec![strings, numbers], + 8, + Validity::NonNullable, + ) + .unwrap(); + + let f = OpenOptions::new() + .write(true) + .truncate(true) + .open(tmp_path.join("a.vtx")) + .await?; + + let writer = FileWriter::new(f); + let writer = writer.write_array_columns(st.into_array()).await?; + writer.finalize().await?; + + let f = tokio::fs::File::open(tmp_path.join("a.vtx")).await?; + let file_size = f.metadata().await?.len(); + + let object_store: Arc = Arc::new(LocalFileSystem::new()); + let url = ObjectStoreUrl::local_filesystem(); + + let p = Path::from_filesystem_path(tmp_path.join("a.vtx"))?; + + let config = VortexTableConfig::new( + Arc::new(Schema::new(vec![ + Field::new("strings", DataType::Utf8, false), + Field::new("numbers", DataType::UInt32, false), + ])), + vec![VortexFile::new(p, file_size)], + ); + + let provider = Arc::new(VortexFileTableProvider::try_new(url, config)?); + + let ctx = SessionContext::new(); + ctx.register_table("vortex_tbl", Arc::clone(&provider) as _)?; + + let url = Url::try_from("file://").unwrap(); + ctx.register_object_store(&url, object_store); + + ctx.sql("SELECT * from vortex_tbl").await?.show().await?; + ctx.sql("SELECT * from vortex_tbl where numbers % 2 == 0") + .await? + .show() + .await?; + + Ok(()) +} diff --git a/vortex-datafusion/src/persistent/config.rs b/vortex-datafusion/src/persistent/config.rs new file mode 100644 index 0000000000..49ad782b3f --- /dev/null +++ b/vortex-datafusion/src/persistent/config.rs @@ -0,0 +1,46 @@ +use arrow_schema::SchemaRef; +use chrono::TimeZone as _; +use datafusion::datasource::listing::PartitionedFile; +use object_store::path::Path; +use object_store::ObjectMeta; + +pub struct Placeholder; + +#[derive(Clone)] +pub struct VortexFile { + pub(crate) object_meta: ObjectMeta, +} + +impl From for PartitionedFile { + fn from(value: VortexFile) -> Self { + PartitionedFile::new(value.object_meta.location, value.object_meta.size as u64) + } +} + +impl VortexFile { + pub fn new(path: impl Into, size: u64) -> Self { + Self { + object_meta: ObjectMeta { + location: Path::from(path.into()), + last_modified: chrono::Utc.timestamp_nanos(0), + size: size as usize, + e_tag: None, + version: None, + }, + } + } +} + +pub struct VortexTableConfig { + pub(crate) data_files: Vec, + pub(crate) schema: Option, +} + +impl VortexTableConfig { + pub fn new(schema: SchemaRef, data_files: Vec) -> Self { + Self { + data_files, + schema: Some(schema), + } + } +} diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index a47ec616f9..98eb519db1 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -4,15 +4,50 @@ use std::sync::Arc; use datafusion::datasource::physical_plan::{FileScanConfig, FileStream}; use datafusion_common::Result as DFResult; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; -use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, +}; use crate::persistent::opener::VortexFileOpener; +#[allow(dead_code)] #[derive(Debug)] pub struct VortexExec { file_scan_config: FileScanConfig, metrics: ExecutionPlanMetricsSet, + predicate: Option>, + plan_properties: PlanProperties, + projection: Option>, +} + +impl VortexExec { + pub fn try_new( + file_scan_config: FileScanConfig, + metrics: ExecutionPlanMetricsSet, + projection: Option<&Vec>, + predicate: Option>, + ) -> DFResult { + let partitioning = Partitioning::UnknownPartitioning(1); + let plan_properties = PlanProperties::new( + EquivalenceProperties::new(file_scan_config.file_schema.clone()), + partitioning, + ExecutionMode::Bounded, + ); + let projection = projection.cloned(); + + Ok(Self { + file_scan_config, + metrics, + predicate, + projection, + plan_properties, + }) + } + pub(crate) fn into_arc(self) -> Arc { + Arc::new(self) as _ + } } impl DisplayAs for VortexExec { @@ -31,7 +66,7 @@ impl ExecutionPlan for VortexExec { } fn properties(&self) -> &PlanProperties { - todo!() + &self.plan_properties } fn children(&self) -> Vec<&Arc> { @@ -53,7 +88,12 @@ impl ExecutionPlan for VortexExec { let object_store = context .runtime_env() .object_store(&self.file_scan_config.object_store_url)?; - let opener = VortexFileOpener { object_store }; + let opener = VortexFileOpener { + object_store, + projection: self.projection.clone(), + batch_size: None, + predicate: None, + }; let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?; Ok(Box::pin(stream)) diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 94acce4fde..bed3bd9d43 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -1,3 +1,4 @@ +pub mod config; pub mod execution; pub mod opener; -pub mod table; +pub mod provider; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 99cc322e75..f63272b39d 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -4,24 +4,43 @@ use arrow_array::cast::as_struct_array; use arrow_array::RecordBatch; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use datafusion_common::Result as DFResult; +use datafusion_physical_expr::PhysicalExpr; use futures::{FutureExt as _, TryStreamExt}; use object_store::ObjectStore; use vortex::IntoCanonical; +use vortex_serde::file::reader::projections::Projection; use vortex_serde::file::reader::VortexBatchReaderBuilder; use vortex_serde::io::ObjectStoreReadAt; pub struct VortexFileOpener { pub object_store: Arc, + pub batch_size: Option, + pub projection: Option>, + pub predicate: Option>, } impl FileOpener for VortexFileOpener { fn open(&self, file_meta: FileMeta) -> DFResult { - let object_store = self.object_store.clone(); + let read_at = + ObjectStoreReadAt::new(self.object_store.clone(), file_meta.location().clone()); + + let mut builder = VortexBatchReaderBuilder::new(read_at); + + if let Some(batch_size) = self.batch_size { + builder = builder.with_batch_size(batch_size); + } + + if let Some(_predicate) = self.predicate.as_ref() { + unimplemented!("Missing logic to turn a physical expression into a RowFilter"); + } + + if let Some(projection) = self.projection.as_ref() { + builder = builder.with_projection(Projection::new(projection)) + } + DFResult::Ok( async move { - let read_at = ObjectStoreReadAt::new(object_store, file_meta.location().clone()); - - let reader = VortexBatchReaderBuilder::new(read_at).build().await?; + let reader = builder.build().await?; let stream = reader .map_ok(|array| { diff --git a/vortex-datafusion/src/persistent/provider.rs b/vortex-datafusion/src/persistent/provider.rs new file mode 100644 index 0000000000..b8fd0e187e --- /dev/null +++ b/vortex-datafusion/src/persistent/provider.rs @@ -0,0 +1,95 @@ +use std::any::Any; +use std::sync::Arc; + +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion::datasource::physical_plan::FileScanConfig; +use datafusion::datasource::TableProvider; +use datafusion::execution::context::SessionState; +use datafusion_common::{DFSchema, Result as DFResult, Statistics}; +use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_expr::utils::conjunction; +use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_physical_plan::ExecutionPlan; + +use super::config::VortexTableConfig; +use crate::persistent::execution::VortexExec; + +#[allow(dead_code)] +pub struct VortexFileTableProvider { + schema_ref: SchemaRef, + object_store_url: ObjectStoreUrl, + config: VortexTableConfig, +} + +impl VortexFileTableProvider { + pub fn try_new(object_store_url: ObjectStoreUrl, config: VortexTableConfig) -> DFResult { + Ok(Self { + schema_ref: config.schema.clone().unwrap(), + object_store_url, + config, + }) + } +} + +#[async_trait] +impl TableProvider for VortexFileTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema_ref) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + _limit: Option, + ) -> DFResult> { + let df_schema = DFSchema::try_from(self.schema())?; + let predicate = conjunction(filters.to_vec()); + let predicate = predicate + .map(|predicate| state.create_physical_expr(predicate, &df_schema)) + .transpose()?; + + let metrics = ExecutionPlanMetricsSet::new(); + + // TODO: Point at some files and/or ranges + let file_scan_config = FileScanConfig::new(self.object_store_url.clone(), self.schema()) + .with_file_group( + self.config + .data_files + .iter() + .cloned() + .map(|f| f.into()) + .collect(), + ); + + let exec = + VortexExec::try_new(file_scan_config, metrics, projection, predicate)?.into_arc(); + + Ok(exec) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> DFResult> { + Ok(vec![ + TableProviderFilterPushDown::Unsupported; + filters.len() + ]) + } + + fn statistics(&self) -> Option { + None + } +} diff --git a/vortex-datafusion/src/persistent/table.rs b/vortex-datafusion/src/persistent/table.rs deleted file mode 100644 index 60094c938a..0000000000 --- a/vortex-datafusion/src/persistent/table.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::any::Any; -use std::sync::Arc; - -use arrow_schema::SchemaRef; -use async_trait::async_trait; -use datafusion::datasource::physical_plan::FileScanConfig; -use datafusion::datasource::TableProvider; -use datafusion::execution::context::SessionState; -use datafusion_common::{Result as DFResult, Statistics}; -use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; -use datafusion_physical_plan::ExecutionPlan; - -#[allow(dead_code)] -pub struct VortexFileTableProvider { - schema_ref: SchemaRef, - file_scan_config: FileScanConfig, -} - -#[async_trait] -impl TableProvider for VortexFileTableProvider { - fn as_any(&self) -> &dyn Any { - todo!() - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema_ref) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - _state: &SessionState, - _projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> DFResult> { - todo!() - } - - fn supports_filters_pushdown( - &self, - filters: &[&Expr], - ) -> DFResult> { - Ok(vec![ - TableProviderFilterPushDown::Unsupported; - filters.len() - ]) - } - - fn statistics(&self) -> Option { - None - } -} From f81b111edf2b9842d9effd9edb82e0977e578f61 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 5 Aug 2024 14:34:32 +0100 Subject: [PATCH 10/20] small change --- vortex-datafusion/src/persistent/config.rs | 2 -- vortex-datafusion/src/persistent/execution.rs | 5 ++--- vortex-datafusion/src/persistent/provider.rs | 1 - 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/vortex-datafusion/src/persistent/config.rs b/vortex-datafusion/src/persistent/config.rs index 49ad782b3f..f60e7ed943 100644 --- a/vortex-datafusion/src/persistent/config.rs +++ b/vortex-datafusion/src/persistent/config.rs @@ -4,8 +4,6 @@ use datafusion::datasource::listing::PartitionedFile; use object_store::path::Path; use object_store::ObjectMeta; -pub struct Placeholder; - #[derive(Clone)] pub struct VortexFile { pub(crate) object_meta: ObjectMeta, diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 98eb519db1..ce4f353be8 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -12,12 +12,11 @@ use datafusion_physical_plan::{ use crate::persistent::opener::VortexFileOpener; -#[allow(dead_code)] #[derive(Debug)] pub struct VortexExec { file_scan_config: FileScanConfig, metrics: ExecutionPlanMetricsSet, - predicate: Option>, + _predicate: Option>, plan_properties: PlanProperties, projection: Option>, } @@ -40,7 +39,7 @@ impl VortexExec { Ok(Self { file_scan_config, metrics, - predicate, + _predicate: predicate, projection, plan_properties, }) diff --git a/vortex-datafusion/src/persistent/provider.rs b/vortex-datafusion/src/persistent/provider.rs index b8fd0e187e..c1339788fd 100644 --- a/vortex-datafusion/src/persistent/provider.rs +++ b/vortex-datafusion/src/persistent/provider.rs @@ -16,7 +16,6 @@ use datafusion_physical_plan::ExecutionPlan; use super::config::VortexTableConfig; use crate::persistent::execution::VortexExec; -#[allow(dead_code)] pub struct VortexFileTableProvider { schema_ref: SchemaRef, object_store_url: ObjectStoreUrl, From 877b48a9eab11a035a4f13747c96968a6234d1df Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 5 Aug 2024 14:54:19 +0100 Subject: [PATCH 11/20] Some more minimal things --- Cargo.lock | 1 + vortex-datafusion/Cargo.toml | 1 + vortex-datafusion/examples/table_provider.rs | 6 +++++- vortex-datafusion/src/persistent/execution.rs | 6 +++--- vortex-datafusion/src/persistent/opener.rs | 2 +- vortex-datafusion/src/persistent/provider.rs | 8 +++----- 6 files changed, 14 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f7f925f6e2..dce4c091a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4179,6 +4179,7 @@ dependencies = [ "futures", "itertools 0.13.0", "lazy_static", + "log", "object_store", "pin-project", "tempfile", diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 0d0cea868a..1961b03748 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -36,6 +36,7 @@ datafusion-physical-plan = { workspace = true } futures = { workspace = true } itertools = { workspace = true } lazy_static = { workspace = true } +log = { workspace = true } object_store = { workspace = true } pin-project = { workspace = true } diff --git a/vortex-datafusion/examples/table_provider.rs b/vortex-datafusion/examples/table_provider.rs index 7ac20fa7d0..dd34ae876c 100644 --- a/vortex-datafusion/examples/table_provider.rs +++ b/vortex-datafusion/examples/table_provider.rs @@ -76,7 +76,11 @@ async fn main() -> anyhow::Result<()> { let url = Url::try_from("file://").unwrap(); ctx.register_object_store(&url, object_store); - ctx.sql("SELECT * from vortex_tbl").await?.show().await?; + ctx.sql("SELECT numbers, strings from vortex_tbl") + .await? + .show() + .await?; + ctx.sql("SELECT * from vortex_tbl where numbers % 2 == 0") .await? .show() diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index ce4f353be8..32c19ef1e2 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -16,7 +16,7 @@ use crate::persistent::opener::VortexFileOpener; pub struct VortexExec { file_scan_config: FileScanConfig, metrics: ExecutionPlanMetricsSet, - _predicate: Option>, + predicate: Option>, plan_properties: PlanProperties, projection: Option>, } @@ -39,7 +39,7 @@ impl VortexExec { Ok(Self { file_scan_config, metrics, - _predicate: predicate, + predicate, projection, plan_properties, }) @@ -91,7 +91,7 @@ impl ExecutionPlan for VortexExec { object_store, projection: self.projection.clone(), batch_size: None, - predicate: None, + predicate: self.predicate.clone(), }; let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index f63272b39d..a168638090 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -31,7 +31,7 @@ impl FileOpener for VortexFileOpener { } if let Some(_predicate) = self.predicate.as_ref() { - unimplemented!("Missing logic to turn a physical expression into a RowFilter"); + log::warn!("Missing logic to turn a physical expression into a RowFilter"); } if let Some(projection) = self.projection.as_ref() { diff --git a/vortex-datafusion/src/persistent/provider.rs b/vortex-datafusion/src/persistent/provider.rs index c1339788fd..460d350803 100644 --- a/vortex-datafusion/src/persistent/provider.rs +++ b/vortex-datafusion/src/persistent/provider.rs @@ -70,7 +70,8 @@ impl TableProvider for VortexFileTableProvider { .cloned() .map(|f| f.into()) .collect(), - ); + ) + .with_projection(projection.cloned()); let exec = VortexExec::try_new(file_scan_config, metrics, projection, predicate)?.into_arc(); @@ -82,10 +83,7 @@ impl TableProvider for VortexFileTableProvider { &self, filters: &[&Expr], ) -> DFResult> { - Ok(vec![ - TableProviderFilterPushDown::Unsupported; - filters.len() - ]) + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) } fn statistics(&self) -> Option { From c92bfdc7ad22bd56a6c577b1566778adea9259d1 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 5 Aug 2024 15:03:42 +0100 Subject: [PATCH 12/20] fmt_as --- vortex-datafusion/examples/table_provider.rs | 18 +++++++++++++----- vortex-datafusion/src/persistent/execution.rs | 7 +++++-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/vortex-datafusion/examples/table_provider.rs b/vortex-datafusion/examples/table_provider.rs index dd34ae876c..69101486fe 100644 --- a/vortex-datafusion/examples/table_provider.rs +++ b/vortex-datafusion/examples/table_provider.rs @@ -76,12 +76,20 @@ async fn main() -> anyhow::Result<()> { let url = Url::try_from("file://").unwrap(); ctx.register_object_store(&url, object_store); - ctx.sql("SELECT numbers, strings from vortex_tbl") - .await? - .show() - .await?; + run_query( + &ctx, + "SELECT strings from vortex_tbl where numbers % 2 == 0", + ) + .await?; + + Ok(()) +} + +async fn run_query(ctx: &SessionContext, query_string: impl AsRef) -> anyhow::Result<()> { + let query_string = query_string.as_ref(); + ctx.sql(query_string).await?.show().await?; - ctx.sql("SELECT * from vortex_tbl where numbers % 2 == 0") + ctx.sql(&format!("EXPLAIN {query_string}")) .await? .show() .await?; diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 32c19ef1e2..51f8f84003 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -50,8 +50,11 @@ impl VortexExec { } impl DisplayAs for VortexExec { - fn fmt_as(&self, _t: DisplayFormatType, _f: &mut fmt::Formatter) -> fmt::Result { - todo!() + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "VortexExec: ")?; + self.file_scan_config.fmt_as(t, f)?; + + Ok(()) } } From 81467dae6e6ac49e01ab42c52adfce685b5b7401 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 5 Aug 2024 15:05:45 +0100 Subject: [PATCH 13/20] re-enable clippy --- vortex-datafusion/Cargo.toml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 1961b03748..ff3ca59603 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -47,6 +47,5 @@ url = "2" tempfile = "3" tokio = { workspace = true, features = ["test-util", "rt-multi-thread"] } - -# [lints] -# workspace = true +[lints] +workspace = true From 5b402f1491f91553a491174c7706fb54a775eb4e Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 5 Aug 2024 15:12:00 +0100 Subject: [PATCH 14/20] fix code I broke somewhere else --- bench-vortex/benches/random_access.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bench-vortex/benches/random_access.rs b/bench-vortex/benches/random_access.rs index 7d856ecc26..3d69ad9c2c 100644 --- a/bench-vortex/benches/random_access.rs +++ b/bench-vortex/benches/random_access.rs @@ -8,6 +8,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use mimalloc::MiMalloc; use object_store::aws::AmazonS3Builder; use object_store::local::LocalFileSystem; +use object_store::ObjectStore; use tokio::runtime::Runtime; #[global_allocator] @@ -31,7 +32,7 @@ fn random_access_vortex(c: &mut Criterion) { .iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &INDICES).await.unwrap()) }) }); - let local_fs = LocalFileSystem::new(); + let local_fs = Arc::new(LocalFileSystem::new()) as Arc; let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap(); group.bench_function("localfs", |b| { b.to_async(Runtime::new().unwrap()).iter(|| async { @@ -43,7 +44,7 @@ fn random_access_vortex(c: &mut Criterion) { }) }); - let r2_fs = AmazonS3Builder::from_env().build().unwrap(); + let r2_fs = Arc::new(AmazonS3Builder::from_env().build().unwrap()) as Arc; let r2_path = object_store::path::Path::from_url_path(taxi_vortex.file_name().unwrap().to_str().unwrap()) .unwrap(); From 3232edcbed620878f54093763bce971919892236 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 5 Aug 2024 15:31:25 +0100 Subject: [PATCH 15/20] Push errors to datafusion --- vortex-array/src/array/struct_/mod.rs | 15 +++++++++------ vortex-datafusion/examples/table_provider.rs | 9 +++------ vortex-serde/src/file/reader/mod.rs | 19 +++++++++++-------- 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index 8893ee0c29..a1958bbae3 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use vortex_dtype::{DType, FieldName, FieldNames, Nullability, StructDType}; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; use crate::stats::{ArrayStatisticsCompute, StatsSet}; use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata}; @@ -97,12 +97,12 @@ impl StructArray { let mut children = Vec::with_capacity(projection.len()); let mut names = Vec::with_capacity(projection.len()); - for column_idx in projection { + for &column_idx in projection { children.push( - self.field(*column_idx) - .expect("column must not exceed bounds"), + self.field(column_idx) + .ok_or(vortex_err!(OutOfBounds: column_idx, 0, self.dtypes().len()))?, ); - names.push(self.names()[*column_idx].clone()); + names.push(self.names()[column_idx].clone()); } StructArray::try_new( @@ -124,7 +124,10 @@ impl ArrayVariants for StructArray { impl StructArrayTrait for StructArray { fn field(&self, idx: usize) -> Option { - self.array().child(idx, &self.dtypes()[idx], self.len()) + let dtype = self.dtypes().get(idx); + dtype + .map(|dtype| self.array().child(idx, dtype, self.len())) + .flatten() } } diff --git a/vortex-datafusion/examples/table_provider.rs b/vortex-datafusion/examples/table_provider.rs index 69101486fe..036e077360 100644 --- a/vortex-datafusion/examples/table_provider.rs +++ b/vortex-datafusion/examples/table_provider.rs @@ -76,23 +76,20 @@ async fn main() -> anyhow::Result<()> { let url = Url::try_from("file://").unwrap(); ctx.register_object_store(&url, object_store); - run_query( - &ctx, - "SELECT strings from vortex_tbl where numbers % 2 == 0", - ) - .await?; + run_query(&ctx, "SELECT * FROM vortex_tbl").await?; Ok(()) } async fn run_query(ctx: &SessionContext, query_string: impl AsRef) -> anyhow::Result<()> { let query_string = query_string.as_ref(); - ctx.sql(query_string).await?.show().await?; ctx.sql(&format!("EXPLAIN {query_string}")) .await? .show() .await?; + ctx.sql(query_string).await?.show().await?; + Ok(()) } diff --git a/vortex-serde/src/file/reader/mod.rs b/vortex-serde/src/file/reader/mod.rs index f6a4905bdf..618e3ee06f 100644 --- a/vortex-serde/src/file/reader/mod.rs +++ b/vortex-serde/src/file/reader/mod.rs @@ -299,16 +299,19 @@ impl Stream for VortexBatchStream { batch = filter(&batch, ¤t_predicate)?; - let projected = match &self.projection { - Projection::All => batch, - Projection::Partial(indices) => StructArray::try_from(batch.clone()) - .unwrap() - .project(indices.as_ref()) - .unwrap() - .into_array(), + let projected = { + let array = match &self.projection { + Projection::All => batch, + Projection::Partial(indices) => { + StructArray::try_from(batch.clone())? + .project(indices.as_ref())? + .into_array() + } + }; + Ok(array) }; - return Poll::Ready(Some(Ok(projected))); + return Poll::Ready(Some(projected)); } None => { From c5a2c927023eacf405dd5bbd55e5776305c09135 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 5 Aug 2024 16:01:40 +0100 Subject: [PATCH 16/20] . --- vortex-array/src/array/struct_/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index a1958bbae3..c8c04406d9 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -124,10 +124,9 @@ impl ArrayVariants for StructArray { impl StructArrayTrait for StructArray { fn field(&self, idx: usize) -> Option { - let dtype = self.dtypes().get(idx); - dtype - .map(|dtype| self.array().child(idx, dtype, self.len())) - .flatten() + self.dtypes() + .get(idx) + .and_then(|dtype| self.array().child(idx, dtype, self.len())) } } From 4199c344324f015feb8c9469fb996fbcef14f4d2 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 5 Aug 2024 16:10:49 +0100 Subject: [PATCH 17/20] make example more reproducable --- vortex-datafusion/examples/table_provider.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/vortex-datafusion/examples/table_provider.rs b/vortex-datafusion/examples/table_provider.rs index 036e077360..88e4aeee3e 100644 --- a/vortex-datafusion/examples/table_provider.rs +++ b/vortex-datafusion/examples/table_provider.rs @@ -1,4 +1,3 @@ -use std::env::temp_dir; use std::sync::Arc; use arrow_schema::{DataType, Field, Schema}; @@ -7,6 +6,7 @@ use datafusion_execution::object_store::ObjectStoreUrl; use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::ObjectStore; +use tempfile::tempdir; use tokio::fs::OpenOptions; use url::Url; use vortex::array::chunked::ChunkedArray; @@ -21,7 +21,7 @@ use vortex_serde::file::file_writer::FileWriter; #[tokio::main] async fn main() -> anyhow::Result<()> { - let tmp_path = temp_dir(); + let temp_dir = tempdir()?; let strings = ChunkedArray::from_iter([ VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), @@ -42,23 +42,26 @@ async fn main() -> anyhow::Result<()> { ) .unwrap(); + let filepath = temp_dir.path().join("a.vtx"); + let f = OpenOptions::new() .write(true) .truncate(true) - .open(tmp_path.join("a.vtx")) + .create(true) + .open(&filepath) .await?; let writer = FileWriter::new(f); let writer = writer.write_array_columns(st.into_array()).await?; writer.finalize().await?; - let f = tokio::fs::File::open(tmp_path.join("a.vtx")).await?; + let f = tokio::fs::File::open(&filepath).await?; let file_size = f.metadata().await?.len(); let object_store: Arc = Arc::new(LocalFileSystem::new()); let url = ObjectStoreUrl::local_filesystem(); - let p = Path::from_filesystem_path(tmp_path.join("a.vtx"))?; + let p = Path::from_filesystem_path(filepath)?; let config = VortexTableConfig::new( Arc::new(Schema::new(vec![ From d86336a8da6fb7fd2b2328416ab285f7917f45ec Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 5 Aug 2024 17:43:30 +0100 Subject: [PATCH 18/20] . --- Cargo.toml | 2 ++ vortex-datafusion/Cargo.toml | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a6576e6d89..af4feacf44 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -114,6 +114,8 @@ walkdir = "2.5.0" worker = "0.3.0" xshell = "0.2.6" zigzag = "0.1.0" +url = "2" +tempfile = "3" [workspace.lints.rust] warnings = "deny" diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index ff3ca59603..07c628f55c 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -43,8 +43,8 @@ pin-project = { workspace = true } [dev-dependencies] anyhow = { workspace = true } -url = "2" -tempfile = "3" +url = { workspace = true } +tempfile = { workspace = true } tokio = { workspace = true, features = ["test-util", "rt-multi-thread"] } [lints] From 2a1f9bfc6bd9e948b4bcb0e3189c9d2794211c48 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 5 Aug 2024 17:50:02 +0100 Subject: [PATCH 19/20] . --- vortex-datafusion/src/persistent/opener.rs | 36 ++++++++++------------ 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index a168638090..1ce621e026 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -38,24 +38,22 @@ impl FileOpener for VortexFileOpener { builder = builder.with_projection(Projection::new(projection)) } - DFResult::Ok( - async move { - let reader = builder.build().await?; - - let stream = reader - .map_ok(|array| { - let arrow = array - .into_canonical() - .expect("struct arrays must canonicalize") - .into_arrow(); - let struct_array = as_struct_array(arrow.as_ref()); - RecordBatch::from(struct_array) - }) - .map_err(|e| e.into()); - - DFResult::Ok(Box::pin(stream) as _) - } - .boxed(), - ) + Ok(async move { + let reader = builder.build().await?; + + let stream = reader + .map_ok(|array| { + let arrow = array + .into_canonical() + .expect("struct arrays must canonicalize") + .into_arrow(); + let struct_array = as_struct_array(arrow.as_ref()); + RecordBatch::from(struct_array) + }) + .map_err(|e| e.into()); + + Ok(Box::pin(stream) as _) + } + .boxed()) } } From 70c989036dc29b8c6fee49d749c4bfd46fd88b78 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 6 Aug 2024 10:53:48 +0100 Subject: [PATCH 20/20] . --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index bee65dae70..3c19d7289e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,8 @@ members = [ "encodings/*", "pyvortex", "vortex-array", - "vortex-build", "vortex-buffer", + "vortex-build", "vortex-datafusion", "vortex-dtype", "vortex-error",