diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs index 318be778cb..d9c46232ac 100644 --- a/bench-vortex/src/bin/tpch_benchmark.rs +++ b/bench-vortex/src/bin/tpch_benchmark.rs @@ -55,10 +55,13 @@ async fn q1_vortex(base_dir: &PathBuf) -> anyhow::Result<()> { #[tokio::main(flavor = "current_thread")] async fn main() { + // uncomment the below to enable trace logging of datafusion execution + // setup_logger(LevelFilter::Trace); + // Run TPC-H data gen. let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap(); - // q1_csv(&data_dir).await.unwrap(); + q1_csv(&data_dir).await.unwrap(); q1_arrow(&data_dir).await.unwrap(); - // q1_vortex(&data_dir).await.unwrap(); + q1_vortex(&data_dir).await.unwrap(); } diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index ed439572cb..473905c433 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -7,7 +7,7 @@ use datafusion::datasource::MemTable; use datafusion::prelude::{CsvReadOptions, SessionContext}; use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowArray; -use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical}; +use vortex::{Array, ArrayDType, ArrayData, IntoArray}; use vortex_datafusion::{SessionContextExt, VortexMemTableOptions}; pub mod dbgen; @@ -146,17 +146,11 @@ async fn register_vortex( .collect(); let dtype = chunks[0].dtype().clone(); - let chunked_array = ChunkedArray::try_new(chunks, dtype)? - .into_canonical()? - .into_array(); - - // Convert to Arrow to concat all chunks, then flip back to Vortex for processing. - let arrow = chunked_array.into_canonical()?.into_arrow(); - let array = ArrayData::from_arrow(arrow, false).into_array(); + let chunked_array = ChunkedArray::try_new(chunks, dtype)?.into_array(); session.register_vortex_opts( name, - array, + chunked_array, VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown), )?; diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 8670340d05..a61fd18c02 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -294,7 +294,7 @@ fn local_date_time_to_arrow(local_date_time_array: LocalDateTimeArray) -> ArrayR .to_null_buffer() .expect("null buffer"); let timestamps_len = timestamps.len(); - let buffer = ScalarBuffer::::new(timestamps.into_buffer().into(), 0, timestamps_len); + let buffer = ScalarBuffer::::new(timestamps.into_buffer().into_arrow(), 0, timestamps_len); match local_date_time_array.time_unit() { TimeUnit::Ns => Arc::new(TimestampNanosecondArray::new(buffer, validity)), diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index 1e2da7eacc..cb3c8a75fc 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -22,11 +22,9 @@ use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, }; -use futures::{Stream, StreamExt}; +use futures::Stream; use itertools::Itertools; -use pin_project::pin_project; use vortex::array::chunked::ChunkedArray; -use vortex::array::struct_::StructArray; use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical}; use vortex_dtype::DType; @@ -109,7 +107,7 @@ impl SessionContextExt for SessionContext { /// a table to DataFusion. #[derive(Debug, Clone)] pub struct VortexMemTable { - array: Array, + array: ChunkedArray, schema_ref: SchemaRef, options: VortexMemTableOptions, } @@ -124,6 +122,14 @@ impl VortexMemTable { let arrow_schema = infer_schema(array.dtype()); let schema_ref = SchemaRef::new(arrow_schema); + let array = match ChunkedArray::try_from(&array) { + Ok(a) => a, + _ => { + let dtype = array.dtype().clone(); + ChunkedArray::try_new(vec![array], dtype).unwrap() + } + }; + Self { array, schema_ref, @@ -176,12 +182,6 @@ impl TableProvider for VortexMemTable { Some(filters) }; - let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { - Partitioning::RoundRobinBatch(chunked_array.nchunks()) - } else { - Partitioning::UnknownPartitioning(1) - }; - let output_projection: Vec = match projection { None => (0..self.schema_ref.fields().len()).collect(), Some(proj) => proj.clone(), @@ -215,7 +215,9 @@ impl TableProvider for VortexMemTable { ); let plan_properties = PlanProperties::new( EquivalenceProperties::new(output_schema), - partitioning, + // non-pushdown scans execute in single partition, where the partition + // yields one RecordBatch per chunk in the input ChunkedArray + Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded, ); @@ -265,23 +267,21 @@ fn make_filter_then_take_plan( schema: SchemaRef, filter_exprs: &[Expr], filter_projection: Vec, - array: Array, + chunked_array: ChunkedArray, output_projection: Vec, _session_state: &SessionState, ) -> Arc { - let struct_array = StructArray::try_from(array).unwrap(); - - let filter_struct = struct_array - .project(filter_projection.as_slice()) - .expect("projecting filter struct"); - - let row_selector_op = Arc::new(RowSelectorExec::new(filter_exprs, &filter_struct)); + let row_selector_op = Arc::new(RowSelectorExec::new( + filter_exprs, + filter_projection, + &chunked_array, + )); Arc::new(TakeRowsExec::new( schema.clone(), &output_projection, row_selector_op.clone(), - &struct_array, + &chunked_array, )) } @@ -372,86 +372,83 @@ fn get_column_references(expr: &Expr) -> HashSet { } /// Physical plan node for scans against an in-memory, possibly chunked Vortex Array. -#[derive(Debug, Clone)] +#[derive(Clone)] struct VortexScanExec { - array: Array, + array: ChunkedArray, scan_projection: Vec, plan_properties: PlanProperties, } +impl Debug for VortexScanExec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("VortexScanExec") + .field("array_length", &self.array.len()) + .field("array_dtype", &self.array.dtype()) + .field("scan_projection", &self.scan_projection) + .field("plan_properties", &self.plan_properties) + .finish_non_exhaustive() + } +} + impl DisplayAs for VortexScanExec { fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { write!(f, "{:?}", self) } } -/// Read a single array chunk from the source as a RecordBatch. -/// -/// # Errors -/// This function will return an Error if `array` is not struct-typed. It will also return an -/// error if the projection references columns -fn execute_unfiltered( - array: Array, - projection: &Vec, -) -> DFResult { - // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef. - let struct_array = array - .clone() - .into_struct() - .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?; - - let projected_struct = struct_array - .project(projection.as_slice()) - .map_err(|vortex_err| { - exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") - })?; - let batch = RecordBatch::from( - projected_struct - .into_canonical() - .expect("struct arrays must canonicalize") - .into_arrow() - .as_any() - .downcast_ref::() - .expect("vortex StructArray must convert to arrow StructArray"), - ); - Ok(Box::pin(VortexRecordBatchStream { - schema_ref: batch.schema(), - inner: futures::stream::iter(vec![batch]), - })) -} - -// Row selector stream. -// I.e., send a stream of RowSelector which allows us to pass in a bunch of binary arrays -// back down to the other systems here instead. - -#[pin_project] -pub(crate) struct VortexRecordBatchStream { +pub(crate) struct VortexRecordBatchStream { schema_ref: SchemaRef, - #[pin] - inner: I, + idx: usize, + num_chunks: usize, + chunks: ChunkedArray, + + projection: Vec, } -impl Stream for VortexRecordBatchStream -where - I: Stream, -{ +impl Stream for VortexRecordBatchStream { type Item = DFResult; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - match this.inner.poll_next_unpin(cx) { - Poll::Ready(Some(batch)) => Poll::Ready(Some(Ok(batch))), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + if this.idx >= this.num_chunks { + return Poll::Ready(None); } + + // Grab next chunk, project and convert to Arrow. + let chunk = this + .chunks + .chunk(this.idx) + .expect("nchunks should match precomputed"); + this.idx += 1; + + let struct_array = chunk + .clone() + .into_struct() + .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?; + + let projected_struct = + struct_array + .project(this.projection.as_slice()) + .map_err(|vortex_err| { + exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") + })?; + + let batch = RecordBatch::from( + projected_struct + .into_canonical() + .expect("struct arrays must canonicalize") + .into_arrow() + .as_any() + .downcast_ref::() + .expect("vortex StructArray must convert to arrow StructArray"), + ); + + Poll::Ready(Some(Ok(batch))) } } -impl RecordBatchStream for VortexRecordBatchStream -where - I: Stream, -{ +impl RecordBatchStream for VortexRecordBatchStream { fn schema(&self) -> SchemaRef { Arc::clone(&self.schema_ref) } @@ -480,18 +477,17 @@ impl ExecutionPlan for VortexScanExec { fn execute( &self, - partition: usize, + _partition: usize, _context: Arc, ) -> DFResult { - let chunk = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { - chunked_array - .chunk(partition) - .ok_or_else(|| exec_datafusion_err!("partition not found"))? - } else { - self.array.clone() - }; - - execute_unfiltered(chunk, &self.scan_projection) + // Send back a stream of RecordBatch that returns the next element of the chunk each time. + Ok(Box::pin(VortexRecordBatchStream { + schema_ref: self.schema().clone(), + idx: 0, + num_chunks: self.array.nchunks(), + chunks: self.array.clone(), + projection: self.scan_projection.clone(), + })) } } diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index 7623538585..05391b8e69 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -2,7 +2,6 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; -use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -21,10 +20,10 @@ use datafusion_physical_plan::{ use futures::{ready, Stream}; use lazy_static::lazy_static; use pin_project::pin_project; -use vortex::array::struct_::StructArray; +use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowArray; use vortex::compute::take::take; -use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical}; +use vortex::{ArrayDType, ArrayData, IntoArray, IntoArrayVariant, IntoCanonical}; use crate::datatype::infer_schema; use crate::expr::{make_conjunction, simplify_expr}; @@ -35,12 +34,13 @@ use crate::expr::{make_conjunction, simplify_expr}; pub(crate) struct RowSelectorExec { filter_exprs: Vec, + filter_projection: Vec, + // cached PlanProperties object. We do not make use of this. cached_plan_props: PlanProperties, - // A Vortex struct array that contains all columns necessary for executing the filter - // expressions. - filter_struct: StructArray, + // Full array. We only access partitions of this data. + chunked_array: ChunkedArray, } lazy_static! { @@ -52,16 +52,21 @@ lazy_static! { } impl RowSelectorExec { - pub(crate) fn new(filter_exprs: &[Expr], filter_struct: &StructArray) -> Self { + pub(crate) fn new( + filter_exprs: &[Expr], + filter_projection: Vec, + chunked_array: &ChunkedArray, + ) -> Self { let cached_plan_props = PlanProperties::new( EquivalenceProperties::new(ROW_SELECTOR_SCHEMA_REF.clone()), - Partitioning::RoundRobinBatch(1), + Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded, ); Self { filter_exprs: filter_exprs.to_owned(), - filter_struct: filter_struct.clone(), + filter_projection: filter_projection.clone(), + chunked_array: chunked_array.clone(), cached_plan_props, } } @@ -109,68 +114,69 @@ impl ExecutionPlan for RowSelectorExec { fn execute( &self, partition: usize, - context: Arc, + _context: Arc, ) -> DFResult { assert_eq!( partition, 0, - "single partitioning only supported by TakeOperator" + "single partitioning only supported by RowSelectorExec" ); - let stream_schema = Arc::new(infer_schema(self.filter_struct.dtype())); + // Derive a schema using the provided set of fields. - let filter_struct = self.filter_struct.clone(); - let one_shot = Box::pin(async move { filter_struct.into_array() }); + let filter_schema = Arc::new( + infer_schema(self.chunked_array.dtype()) + .project(self.filter_projection.as_slice()) + .unwrap(), + ); let conjunction_expr = simplify_expr( &make_conjunction(&self.filter_exprs)?, - stream_schema.clone(), + filter_schema.clone(), )?; Ok(Box::pin(RowIndicesStream { - one_shot, - polled_inner: false, + chunked_array: self.chunked_array.clone(), + chunk_idx: 0, + filter_projection: self.filter_projection.clone(), conjunction_expr, - schema_ref: stream_schema, - context: context.clone(), + filter_schema, })) } } /// [RecordBatchStream] of row indices, emitted by the [RowSelectorExec] physical plan node. -#[pin_project::pin_project] -pub(crate) struct RowIndicesStream { - /// The inner future that returns `DFResult`. - /// This future should only poll one time. - #[pin] - one_shot: F, - - polled_inner: bool, - +pub(crate) struct RowIndicesStream { + chunked_array: ChunkedArray, + chunk_idx: usize, conjunction_expr: Expr, - schema_ref: SchemaRef, - context: Arc, + filter_projection: Vec, + filter_schema: SchemaRef, } -impl Stream for RowIndicesStream -where - F: Future, -{ +impl Stream for RowIndicesStream { type Item = DFResult; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); - // If we have already polled the one-shot future of filter records, indicate - // that the stream has finished. - if *this.polled_inner { + if this.chunk_idx >= this.chunked_array.nchunks() { return Poll::Ready(None); } + let next_chunk = this + .chunked_array + .chunk(this.chunk_idx) + .expect("chunk index in-bounds"); + this.chunk_idx += 1; + // Get the unfiltered record batch. // Since this is a one-shot, we only want to poll the inner future once, to create the // initial batch for us to process. - let vortex_struct = ready!(this.one_shot.poll(cx)); - *this.polled_inner = true; + let vortex_struct = next_chunk + .into_struct() + .expect("chunks must be StructArray") + .project(this.filter_projection.as_slice()) + .expect("projection should succeed"); // Immediately convert to Arrow RecordBatch for processing. // TODO(aduffy): attempt to pushdown the filter to Vortex without decoding. @@ -186,9 +192,9 @@ where // // The result of a conjunction expression is a BooleanArray containing `true` for rows // where the conjunction was satisfied, and `false` otherwise. - let df_schema = DFSchema::try_from(this.schema_ref.clone())?; + let df_schema = DFSchema::try_from(this.filter_schema.clone())?; let physical_expr = - create_physical_expr(this.conjunction_expr, &df_schema, &Default::default())?; + create_physical_expr(&this.conjunction_expr, &df_schema, &Default::default())?; let selection = physical_expr .evaluate(&record_batch)? .into_array(record_batch.num_rows())?; @@ -209,12 +215,9 @@ where } } -impl RecordBatchStream for RowIndicesStream -where - F: Future, -{ +impl RecordBatchStream for RowIndicesStream { fn schema(&self) -> SchemaRef { - self.schema_ref.clone() + ROW_SELECTOR_SCHEMA_REF.clone() } } @@ -232,7 +235,7 @@ pub(crate) struct TakeRowsExec { output_schema: SchemaRef, // The original Vortex array holding the fields we have not decoded yet. - table: StructArray, + table: ChunkedArray, } impl TakeRowsExec { @@ -240,12 +243,12 @@ impl TakeRowsExec { schema_ref: SchemaRef, projection: &[usize], row_indices: Arc, - table: &StructArray, + table: &ChunkedArray, ) -> Self { let output_schema = Arc::new(schema_ref.project(projection).unwrap()); let plan_properties = PlanProperties::new( EquivalenceProperties::new(output_schema.clone()), - Partitioning::RoundRobinBatch(1), + Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded, ); @@ -299,16 +302,12 @@ impl ExecutionPlan for TakeRowsExec { partition: usize, context: Arc, ) -> DFResult { - assert_eq!( - partition, 0, - "single partitioning only supported by TakeOperator" - ); - + // Get the row indices for the given chunk. let row_indices_stream = self.input.execute(partition, context)?; Ok(Box::pin(TakeRowsStream { row_indices_stream, - completed: false, + chunk_idx: 0, output_projection: self.projection.clone(), output_schema: self.output_schema.clone(), vortex_array: self.table.clone(), @@ -323,14 +322,17 @@ pub(crate) struct TakeRowsStream { #[pin] row_indices_stream: F, - completed: bool, + // The current chunk. Every time we receive a new RecordBatch from the upstream operator + // we treat it as a set of row-indices that are zero-indexed relative to this chunk number + // in the `vortex_array`. + chunk_idx: usize, // Projection based on the schema here output_projection: Vec, output_schema: SchemaRef, // The original Vortex array we're taking from - vortex_array: StructArray, + vortex_array: ChunkedArray, } impl Stream for TakeRowsStream @@ -342,24 +344,20 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - // If `poll_next` has already fired, return None indicating end of the stream. - if *this.completed { - return Poll::Ready(None); - } - // Get the indices provided by the upstream operator. let record_batch = match ready!(this.row_indices_stream.poll_next(cx)) { None => { // Row indices stream is complete, we are also complete. - // This should never happen right now given we only emit one recordbatch upstream. return Poll::Ready(None); } - Some(result) => { - *this.completed = true; - result? - } + Some(result) => result?, }; + assert!( + *this.chunk_idx <= this.vortex_array.nchunks(), + "input yielded too many RecordBatches" + ); + let row_indices = ArrayData::from_arrow(record_batch.column(0).as_primitive::(), false) .into_array(); @@ -376,10 +374,19 @@ where .unwrap()))); } + let chunk = this + .vortex_array + .chunk(*this.chunk_idx) + .expect("streamed too many chunks") + .into_struct() + .expect("chunks must be struct-encoded"); + + *this.chunk_idx += 1; + // TODO(aduffy): this re-decodes the fields from the filter schema, which is wasteful. // We should find a way to avoid decoding the filter columns and only decode the other // columns, then stitch the StructArray back together from those. - let projected_for_output = this.vortex_array.project(this.output_projection).unwrap(); + let projected_for_output = chunk.project(this.output_projection).unwrap(); let decoded = take(&projected_for_output.into_array(), &row_indices) .expect("take") .into_canonical() @@ -411,10 +418,11 @@ mod test { use datafusion_expr::{and, col, lit}; use itertools::Itertools; use vortex::array::bool::BoolArray; + use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; use vortex::array::struct_::StructArray; use vortex::validity::Validity; - use vortex::IntoArray; + use vortex::{ArrayDType, IntoArray}; use vortex_dtype::FieldName; use crate::plans::{RowIndicesStream, ROW_SELECTOR_SCHEMA_REF}; @@ -426,35 +434,36 @@ mod test { Field::new("b", DataType::Boolean, false), ])); - let _schema = schema.clone(); - let one_shot = Box::pin(async move { - StructArray::try_new( - Arc::new([FieldName::from("a"), FieldName::from("b")]), - vec![ - PrimitiveArray::from(vec![0u64, 1, 2]).into_array(), - BoolArray::from(vec![false, false, true]).into_array(), - ], - 3, - Validity::NonNullable, - ) - .unwrap() - .into_array() - }); + let chunk = StructArray::try_new( + Arc::new([FieldName::from("a"), FieldName::from("b")]), + vec![ + PrimitiveArray::from(vec![0u64, 1, 2]).into_array(), + BoolArray::from(vec![false, false, true]).into_array(), + ], + 3, + Validity::NonNullable, + ) + .unwrap() + .into_array(); + + let dtype = chunk.dtype().clone(); + let chunked_array = + ChunkedArray::try_new(vec![chunk.clone(), chunk.clone()], dtype).unwrap(); let _schema = schema.clone(); let filtering_stream = RowIndicesStream { - one_shot, - polled_inner: false, + chunked_array: chunked_array.clone(), + chunk_idx: 0, conjunction_expr: and((col("a") % lit(2u64)).eq(lit(0u64)), col("b").is_true()), - schema_ref: _schema, - context: Arc::new(Default::default()), + filter_projection: vec![0, 1], + filter_schema: _schema, }; let rows: Vec = futures::executor::block_on_stream(filtering_stream) .try_collect() .unwrap(); - assert_eq!(rows.len(), 1); + assert_eq!(rows.len(), 2); // The output of row selection is a RecordBatch of indices that can be used as selectors // against the original RecordBatch. @@ -466,5 +475,14 @@ mod test { ) .unwrap() ); + + assert_eq!( + rows[1], + RecordBatch::try_new( + ROW_SELECTOR_SCHEMA_REF.clone(), + vec![Arc::new(UInt64Array::from(vec![2u64])),] + ) + .unwrap() + ); } }