diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index 6c774c35cd..2e0daf3159 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -11,7 +11,6 @@ use arrow_array::cast::AsArray; use arrow_array::types::UInt64Type; use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; -use datafusion::arrow::compute::cast; use datafusion_common::{DFSchema, Result as DFResult}; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion_expr::Expr; @@ -25,7 +24,7 @@ use pin_project::pin_project; use vortex::array::struct_::StructArray; use vortex::arrow::FromArrowArray; use vortex::compute::take::take; -use vortex::{ArrayDType, ArrayData, IntoArray, IntoCanonical}; +use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical}; use crate::datatype::infer_schema; use crate::expr::{make_conjunction, simplify_expr}; @@ -120,10 +119,7 @@ impl ExecutionPlan for RowSelectorExec { let stream_schema = Arc::new(infer_schema(self.filter_struct.dtype())); let filter_struct = self.filter_struct.clone(); - let inner = Box::pin(async move { - let arrow_array = filter_struct.into_canonical().unwrap().into_arrow(); - Ok(RecordBatch::from(arrow_array.as_struct())) - }); + let one_shot = Box::pin(async move { filter_struct.into_array() }); let conjunction_expr = simplify_expr( &make_conjunction(&self.filter_exprs)?, @@ -131,7 +127,7 @@ impl ExecutionPlan for RowSelectorExec { )?; Ok(Box::pin(RowIndicesStream { - inner, + one_shot, polled_inner: false, conjunction_expr, schema_ref: stream_schema, @@ -144,8 +140,9 @@ impl ExecutionPlan for RowSelectorExec { #[pin_project::pin_project] pub(crate) struct RowIndicesStream { /// The inner future that returns `DFResult`. + /// This future should only poll one time. #[pin] - inner: F, + one_shot: F, polled_inner: bool, @@ -156,14 +153,14 @@ pub(crate) struct RowIndicesStream { impl Stream for RowIndicesStream where - F: Future>, + F: Future, { type Item = DFResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - // If we have already polled the one-shot future with the filter records, indicate + // If we have already polled the one-shot future of filter records, indicate // that the stream has finished. if *this.polled_inner { return Poll::Ready(None); @@ -172,13 +169,20 @@ where // Get the unfiltered record batch. // Since this is a one-shot, we only want to poll the inner future once, to create the // initial batch for us to process. - // - // We want to avoid ever calling it again. - let record_batch = ready!(this.inner.poll(cx))?; + let vortex_struct = ready!(this.one_shot.poll(cx)); *this.polled_inner = true; - // Using a local SessionContext, generate a physical plan to execute the conjunction query - // against the filter columns. + // Immediately convert to Arrow RecordBatch for processing. + // TODO(aduffy): attempt to pushdown the filter to Vortex without decoding. + let record_batch = RecordBatch::from( + vortex_struct + .into_canonical() + .unwrap() + .into_arrow() + .as_struct(), + ); + + // Generate a physical plan to execute the conjunction query against the filter columns. // // The result of a conjunction expression is a BooleanArray containing `true` for rows // where the conjunction was satisfied, and `false` otherwise. @@ -207,7 +211,7 @@ where impl RecordBatchStream for RowIndicesStream where - F: Future>, + F: Future, { fn schema(&self) -> SchemaRef { self.schema_ref.clone() @@ -238,14 +242,13 @@ impl TakeRowsExec { row_indices: Arc, table: &StructArray, ) -> Self { + let output_schema = Arc::new(schema_ref.project(projection).unwrap()); let plan_properties = PlanProperties::new( - EquivalenceProperties::new(schema_ref.clone()), + EquivalenceProperties::new(output_schema.clone()), Partitioning::RoundRobinBatch(1), ExecutionMode::Bounded, ); - let output_schema = Arc::new(schema_ref.project(projection).unwrap()); - Self { plan_properties, projection: projection.to_owned(), @@ -373,24 +376,18 @@ where .unwrap()))); } - // Assemble the output columns using the row indices. - // NOTE(aduffy): this re-decodes the fields from the filter schema, which is unnecessary. - let mut columns = Vec::new(); - for (output_idx, src_idx) in this.output_projection.iter().enumerate() { - let encoded = this.vortex_array.field(*src_idx).expect("field access"); - let decoded = take(&encoded, &row_indices) - .expect("take") - .into_canonical() - .expect("into_canonical") - .into_arrow(); - let data_type = this.output_schema.field(output_idx).data_type(); - - columns.push(cast(&decoded, data_type).expect("cast")); - } + // TODO(aduffy): this re-decodes the fields from the filter schema, which is wasteful. + // We should find a way to avoid decoding the filter columns and only decode the other + // columns, then stitch the StructArray back together from those. + let projected_for_output = this.vortex_array.project(this.output_projection).unwrap(); + let decoded = take(&projected_for_output.into_array(), &row_indices) + .expect("take") + .into_canonical() + .expect("into_canonical") + .into_arrow(); // Send back a single record batch of the decoded data. - let output_batch = RecordBatch::try_new(this.output_schema.clone(), columns) - .expect("RecordBatch::try_new"); + let output_batch = RecordBatch::from(decoded.as_struct()); Poll::Ready(Some(Ok(output_batch))) } @@ -409,10 +406,16 @@ where mod test { use std::sync::Arc; - use arrow_array::{BooleanArray, RecordBatch, UInt64Array}; + use arrow_array::{RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use datafusion_expr::{and, col, lit}; use itertools::Itertools; + use vortex::array::bool::BoolArray; + use vortex::array::primitive::PrimitiveArray; + use vortex::array::struct_::StructArray; + use vortex::validity::Validity; + use vortex::IntoArray; + use vortex_dtype::FieldName; use crate::plans::{RowIndicesStream, ROW_SELECTOR_SCHEMA_REF}; @@ -424,20 +427,23 @@ mod test { ])); let _schema = schema.clone(); - let inner = Box::pin(async move { - Ok(RecordBatch::try_new( - _schema, + let one_shot = Box::pin(async move { + StructArray::try_new( + Arc::new([FieldName::from("a"), FieldName::from("b")]), vec![ - Arc::new(UInt64Array::from(vec![0u64, 1, 2])), - Arc::new(BooleanArray::from(vec![false, false, true])), + PrimitiveArray::from(vec![0u64, 1, 2]).into_array(), + BoolArray::from(vec![false, false, true]).into_array(), ], + 3, + Validity::NonNullable, ) - .unwrap()) + .unwrap() + .into_array() }); let _schema = schema.clone(); let filtering_stream = RowIndicesStream { - inner, + one_shot, polled_inner: false, conjunction_expr: and((col("a") % lit(2u64)).eq(lit(0u64)), col("b").is_true()), schema_ref: _schema,