From cc4d32fe40a1719ee9d63fbeb5d835af2d7f5d42 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 11 Jul 2024 13:59:43 -0400 Subject: [PATCH 1/3] Make into_arrow truly zero-copy for Primitive, VarBin Before we would go through the ArrowBuffer::from(&[u8]) constructor, which would actually do a full data copy. Now we're just doing pointer tricks. Shaved about 30ms (~7%) off of the tpc-h benchmark. --- bench-vortex/benches/tpch_benchmark.rs | 41 +++++++++++++++++---- bench-vortex/src/bin/tpch_benchmark.rs | 15 +++++--- bench-vortex/src/tpch/mod.rs | 29 +++++++++++---- vortex-array/src/arrow/wrappers.rs | 4 +-- vortex-array/src/canonical.rs | 50 +++++++++++++++----------- vortex-buffer/src/lib.rs | 20 ++++++++--- vortex-datafusion/src/plans.rs | 2 +- 7 files changed, 116 insertions(+), 45 deletions(-) diff --git a/bench-vortex/benches/tpch_benchmark.rs b/bench-vortex/benches/tpch_benchmark.rs index 3e56cf2aea..7f33574b48 100644 --- a/bench-vortex/benches/tpch_benchmark.rs +++ b/bench-vortex/benches/tpch_benchmark.rs @@ -2,10 +2,19 @@ use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions}; use bench_vortex::tpch::query::Q1; use bench_vortex::tpch::{load_datasets, Format}; use criterion::{criterion_group, criterion_main, Criterion}; -use tokio::runtime::Runtime; +use tokio::runtime::Builder; fn benchmark(c: &mut Criterion) { - let runtime = Runtime::new().unwrap(); + let runtime = Builder::new_current_thread() + .thread_name("bench-worker") + .enable_all() + .build() + .unwrap(); + let prework_runtime = Builder::new_current_thread() + .thread_name("prework") + .enable_all() + .build() + .unwrap(); // Run TPC-H data gen. let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap(); @@ -13,15 +22,33 @@ fn benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("tpch q1"); group.sample_size(10); - let ctx = runtime - .block_on(load_datasets(&data_dir, Format::VortexUncompressed)) + let ctx = prework_runtime + .block_on(load_datasets( + &data_dir, + Format::Vortex { + disable_pushdown: false, + }, + )) + .unwrap(); + group.bench_function("vortex-pushdown", |b| { + b.to_async(&runtime) + .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() }) + }); + + let ctx = prework_runtime + .block_on(load_datasets( + &data_dir, + Format::Vortex { + disable_pushdown: true, + }, + )) .unwrap(); - group.bench_function("vortex", |b| { + group.bench_function("vortex-nopushdown", |b| { b.to_async(&runtime) .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() }) }); - let ctx = runtime + let ctx = prework_runtime .block_on(load_datasets(&data_dir, Format::Csv)) .unwrap(); group.bench_function("csv", |b| { @@ -29,7 +56,7 @@ fn benchmark(c: &mut Criterion) { .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() }) }); - let ctx = runtime + let ctx = prework_runtime .block_on(load_datasets(&data_dir, Format::Arrow)) .unwrap(); group.bench_function("arrow", |b| { diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs index 3919a56426..318be778cb 100644 --- a/bench-vortex/src/bin/tpch_benchmark.rs +++ b/bench-vortex/src/bin/tpch_benchmark.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] use std::path::PathBuf; use std::time::SystemTime; @@ -33,7 +34,13 @@ async fn q1_arrow(base_dir: &PathBuf) -> anyhow::Result<()> { } async fn q1_vortex(base_dir: &PathBuf) -> anyhow::Result<()> { - let ctx = load_datasets(base_dir, Format::VortexUncompressed).await?; + let ctx = load_datasets( + base_dir, + Format::Vortex { + disable_pushdown: 
true, + }, + ) + .await?; println!("BEGIN: Q1(VORTEX)"); let start = SystemTime::now(); @@ -46,12 +53,12 @@ async fn q1_vortex(base_dir: &PathBuf) -> anyhow::Result<()> { Ok(()) } -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() { // Run TPC-H data gen. let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap(); - q1_csv(&data_dir).await.unwrap(); + // q1_csv(&data_dir).await.unwrap(); q1_arrow(&data_dir).await.unwrap(); - q1_vortex(&data_dir).await.unwrap(); + // q1_vortex(&data_dir).await.unwrap(); } diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index 93fce99c45..ed439572cb 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -8,7 +8,7 @@ use datafusion::prelude::{CsvReadOptions, SessionContext}; use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowArray; use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical}; -use vortex_datafusion::SessionContextExt; +use vortex_datafusion::{SessionContextExt, VortexMemTableOptions}; pub mod dbgen; pub mod query; @@ -17,7 +17,7 @@ pub mod schema; pub enum Format { Csv, Arrow, - VortexUncompressed, + Vortex { disable_pushdown: bool }, } // Generate table dataset. @@ -42,8 +42,17 @@ pub async fn load_datasets>( match format { Format::Csv => register_csv(&context, stringify!($name), &$name, $schema).await, Format::Arrow => register_arrow(&context, stringify!($name), &$name, $schema).await, - Format::VortexUncompressed => { - register_vortex(&context, stringify!($name), &$name, $schema).await + Format::Vortex { + disable_pushdown, .. + } => { + register_vortex( + &context, + stringify!($name), + &$name, + $schema, + disable_pushdown, + ) + .await } } }; @@ -113,7 +122,7 @@ async fn register_vortex( name: &str, file: &Path, schema: &Schema, - // TODO(aduffy): add compression option + disable_pushdown: bool, ) -> anyhow::Result<()> { let record_batches = session .read_csv( @@ -141,7 +150,15 @@ async fn register_vortex( .into_canonical()? .into_array(); - session.register_vortex(name, chunked_array)?; + // Convert to Arrow to concat all chunks, then flip back to Vortex for processing. 
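+    // Round-tripping through Arrow produces a single contiguous array, so the table
+    // registered below is no longer chunked.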
+ let arrow = chunked_array.into_canonical()?.into_arrow(); + let array = ArrayData::from_arrow(arrow, false).into_array(); + + session.register_vortex_opts( + name, + array, + VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown), + )?; Ok(()) } diff --git a/vortex-array/src/arrow/wrappers.rs b/vortex-array/src/arrow/wrappers.rs index e353eacc7f..39c3c00bc9 100644 --- a/vortex-array/src/arrow/wrappers.rs +++ b/vortex-array/src/arrow/wrappers.rs @@ -1,4 +1,4 @@ -use arrow_buffer::{ArrowNativeType, Buffer as ArrowBuffer, OffsetBuffer, ScalarBuffer}; +use arrow_buffer::{ArrowNativeType, OffsetBuffer, ScalarBuffer}; use vortex_dtype::NativePType; use crate::array::primitive::PrimitiveArray; @@ -7,7 +7,7 @@ pub fn as_scalar_buffer( array: PrimitiveArray, ) -> ScalarBuffer { assert_eq!(array.ptype(), T::PTYPE); - ScalarBuffer::from(ArrowBuffer::from(array.buffer())) + ScalarBuffer::from(array.buffer().clone().into_arrow()) } pub fn as_offset_buffer( diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index c21497fe2e..8670340d05 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -152,7 +152,7 @@ fn primitive_to_arrow(primitive_array: PrimitiveArray) -> ArrayRef { array: &PrimitiveArray, ) -> ArrowPrimitiveArray { ArrowPrimitiveArray::new( - ScalarBuffer::::new(array.buffer().clone().into(), 0, array.len()), + ScalarBuffer::::new(array.buffer().clone().into_arrow(), 0, array.len()), array .logical_validity() .to_null_buffer() @@ -242,29 +242,37 @@ fn varbin_to_arrow(varbin_array: VarBinArray) -> ArrayRef { // Switch on Arrow DType. match varbin_array.dtype() { DType::Binary(_) => match offsets.ptype() { - PType::I32 => Arc::new(BinaryArray::new( - as_offset_buffer::(offsets), - data.into(), - nulls, - )), - PType::I64 => Arc::new(LargeBinaryArray::new( - as_offset_buffer::(offsets), - data.into(), - nulls, - )), + PType::I32 => Arc::new(unsafe { + BinaryArray::new_unchecked( + as_offset_buffer::(offsets), + data.clone().into_arrow(), + nulls, + ) + }), + PType::I64 => Arc::new(unsafe { + LargeBinaryArray::new_unchecked( + as_offset_buffer::(offsets), + data.clone().into_arrow(), + nulls, + ) + }), _ => panic!("Invalid offsets type"), }, DType::Utf8(_) => match offsets.ptype() { - PType::I32 => Arc::new(StringArray::new( - as_offset_buffer::(offsets), - data.into(), - nulls, - )), - PType::I64 => Arc::new(LargeStringArray::new( - as_offset_buffer::(offsets), - data.into(), - nulls, - )), + PType::I32 => Arc::new(unsafe { + StringArray::new_unchecked( + as_offset_buffer::(offsets), + data.clone().into_arrow(), + nulls, + ) + }), + PType::I64 => Arc::new(unsafe { + LargeStringArray::new_unchecked( + as_offset_buffer::(offsets), + data.clone().into_arrow(), + nulls, + ) + }), _ => panic!("Invalid offsets type"), }, _ => panic!( diff --git a/vortex-buffer/src/lib.rs b/vortex-buffer/src/lib.rs index 7dcddd4bdf..1d7122f3db 100644 --- a/vortex-buffer/src/lib.rs +++ b/vortex-buffer/src/lib.rs @@ -1,13 +1,13 @@ -mod flexbuffers; -pub mod io_buf; -mod string; - use std::cmp::Ordering; use std::ops::{Deref, Range}; use arrow_buffer::{ArrowNativeType, Buffer as ArrowBuffer}; pub use string::*; +mod flexbuffers; +pub mod io_buf; +mod string; + #[derive(Debug, Clone)] pub enum Buffer { // TODO(ngates): we could add Aligned(Arc) from aligned-vec package @@ -56,6 +56,18 @@ impl Buffer { Self::Bytes(_) => Err(self), } } + + /// Convert a Buffer into an ArrowBuffer with no copying. 
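+    ///
+    /// `Buffer::Arrow` is returned as-is; `Buffer::Bytes` is converted to a `Vec<u8>` and
+    /// handed to `ArrowBuffer::from_vec`, which takes ownership of the allocation rather
+    /// than copying the data.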
+ pub fn into_arrow(self) -> ArrowBuffer { + match self { + Buffer::Arrow(a) => a, + Buffer::Bytes(b) => { + let v: Vec = b.into(); + + ArrowBuffer::from_vec(v) + } + } + } } impl Deref for Buffer { diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index 2e0daf3159..7623538585 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -139,7 +139,7 @@ impl ExecutionPlan for RowSelectorExec { /// [RecordBatchStream] of row indices, emitted by the [RowSelectorExec] physical plan node. #[pin_project::pin_project] pub(crate) struct RowIndicesStream { - /// The inner future that returns `DFResult`. + /// The inner future that returns `DFResult`. /// This future should only poll one time. #[pin] one_shot: F, From 076be970839037d69f229ca38d37327a224982c2 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Fri, 12 Jul 2024 13:03:27 -0400 Subject: [PATCH 2/3] draw the rest of the f*cking owl --- bench-vortex/src/bin/tpch_benchmark.rs | 7 +- bench-vortex/src/tpch/mod.rs | 12 +- vortex-array/src/canonical.rs | 2 +- vortex-datafusion/src/lib.rs | 174 +++++++++++----------- vortex-datafusion/src/plans.rs | 198 ++++++++++++++----------- 5 files changed, 202 insertions(+), 191 deletions(-) diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs index 318be778cb..d9c46232ac 100644 --- a/bench-vortex/src/bin/tpch_benchmark.rs +++ b/bench-vortex/src/bin/tpch_benchmark.rs @@ -55,10 +55,13 @@ async fn q1_vortex(base_dir: &PathBuf) -> anyhow::Result<()> { #[tokio::main(flavor = "current_thread")] async fn main() { + // uncomment the below to enable trace logging of datafusion execution + // setup_logger(LevelFilter::Trace); + // Run TPC-H data gen. let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap(); - // q1_csv(&data_dir).await.unwrap(); + q1_csv(&data_dir).await.unwrap(); q1_arrow(&data_dir).await.unwrap(); - // q1_vortex(&data_dir).await.unwrap(); + q1_vortex(&data_dir).await.unwrap(); } diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs index ed439572cb..473905c433 100644 --- a/bench-vortex/src/tpch/mod.rs +++ b/bench-vortex/src/tpch/mod.rs @@ -7,7 +7,7 @@ use datafusion::datasource::MemTable; use datafusion::prelude::{CsvReadOptions, SessionContext}; use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowArray; -use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical}; +use vortex::{Array, ArrayDType, ArrayData, IntoArray}; use vortex_datafusion::{SessionContextExt, VortexMemTableOptions}; pub mod dbgen; @@ -146,17 +146,11 @@ async fn register_vortex( .collect(); let dtype = chunks[0].dtype().clone(); - let chunked_array = ChunkedArray::try_new(chunks, dtype)? - .into_canonical()? - .into_array(); - - // Convert to Arrow to concat all chunks, then flip back to Vortex for processing. 
- let arrow = chunked_array.into_canonical()?.into_arrow(); - let array = ArrayData::from_arrow(arrow, false).into_array(); + let chunked_array = ChunkedArray::try_new(chunks, dtype)?.into_array(); session.register_vortex_opts( name, - array, + chunked_array, VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown), )?; diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 8670340d05..a61fd18c02 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -294,7 +294,7 @@ fn local_date_time_to_arrow(local_date_time_array: LocalDateTimeArray) -> ArrayR .to_null_buffer() .expect("null buffer"); let timestamps_len = timestamps.len(); - let buffer = ScalarBuffer::::new(timestamps.into_buffer().into(), 0, timestamps_len); + let buffer = ScalarBuffer::::new(timestamps.into_buffer().into_arrow(), 0, timestamps_len); match local_date_time_array.time_unit() { TimeUnit::Ns => Arc::new(TimestampNanosecondArray::new(buffer, validity)), diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index 1e2da7eacc..cb3c8a75fc 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -22,11 +22,9 @@ use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, }; -use futures::{Stream, StreamExt}; +use futures::Stream; use itertools::Itertools; -use pin_project::pin_project; use vortex::array::chunked::ChunkedArray; -use vortex::array::struct_::StructArray; use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical}; use vortex_dtype::DType; @@ -109,7 +107,7 @@ impl SessionContextExt for SessionContext { /// a table to DataFusion. #[derive(Debug, Clone)] pub struct VortexMemTable { - array: Array, + array: ChunkedArray, schema_ref: SchemaRef, options: VortexMemTableOptions, } @@ -124,6 +122,14 @@ impl VortexMemTable { let arrow_schema = infer_schema(array.dtype()); let schema_ref = SchemaRef::new(arrow_schema); + let array = match ChunkedArray::try_from(&array) { + Ok(a) => a, + _ => { + let dtype = array.dtype().clone(); + ChunkedArray::try_new(vec![array], dtype).unwrap() + } + }; + Self { array, schema_ref, @@ -176,12 +182,6 @@ impl TableProvider for VortexMemTable { Some(filters) }; - let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { - Partitioning::RoundRobinBatch(chunked_array.nchunks()) - } else { - Partitioning::UnknownPartitioning(1) - }; - let output_projection: Vec = match projection { None => (0..self.schema_ref.fields().len()).collect(), Some(proj) => proj.clone(), @@ -215,7 +215,9 @@ impl TableProvider for VortexMemTable { ); let plan_properties = PlanProperties::new( EquivalenceProperties::new(output_schema), - partitioning, + // non-pushdown scans execute in single partition, where the partition + // yields one RecordBatch per chunk in the input ChunkedArray + Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded, ); @@ -265,23 +267,21 @@ fn make_filter_then_take_plan( schema: SchemaRef, filter_exprs: &[Expr], filter_projection: Vec, - array: Array, + chunked_array: ChunkedArray, output_projection: Vec, _session_state: &SessionState, ) -> Arc { - let struct_array = StructArray::try_from(array).unwrap(); - - let filter_struct = struct_array - .project(filter_projection.as_slice()) - .expect("projecting filter struct"); - - let row_selector_op = Arc::new(RowSelectorExec::new(filter_exprs, &filter_struct)); + let row_selector_op = 
Arc::new(RowSelectorExec::new( + filter_exprs, + filter_projection, + &chunked_array, + )); Arc::new(TakeRowsExec::new( schema.clone(), &output_projection, row_selector_op.clone(), - &struct_array, + &chunked_array, )) } @@ -372,86 +372,83 @@ fn get_column_references(expr: &Expr) -> HashSet { } /// Physical plan node for scans against an in-memory, possibly chunked Vortex Array. -#[derive(Debug, Clone)] +#[derive(Clone)] struct VortexScanExec { - array: Array, + array: ChunkedArray, scan_projection: Vec, plan_properties: PlanProperties, } +impl Debug for VortexScanExec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("VortexScanExec") + .field("array_length", &self.array.len()) + .field("array_dtype", &self.array.dtype()) + .field("scan_projection", &self.scan_projection) + .field("plan_properties", &self.plan_properties) + .finish_non_exhaustive() + } +} + impl DisplayAs for VortexScanExec { fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { write!(f, "{:?}", self) } } -/// Read a single array chunk from the source as a RecordBatch. -/// -/// # Errors -/// This function will return an Error if `array` is not struct-typed. It will also return an -/// error if the projection references columns -fn execute_unfiltered( - array: Array, - projection: &Vec, -) -> DFResult { - // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef. - let struct_array = array - .clone() - .into_struct() - .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?; - - let projected_struct = struct_array - .project(projection.as_slice()) - .map_err(|vortex_err| { - exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") - })?; - let batch = RecordBatch::from( - projected_struct - .into_canonical() - .expect("struct arrays must canonicalize") - .into_arrow() - .as_any() - .downcast_ref::() - .expect("vortex StructArray must convert to arrow StructArray"), - ); - Ok(Box::pin(VortexRecordBatchStream { - schema_ref: batch.schema(), - inner: futures::stream::iter(vec![batch]), - })) -} - -// Row selector stream. -// I.e., send a stream of RowSelector which allows us to pass in a bunch of binary arrays -// back down to the other systems here instead. - -#[pin_project] -pub(crate) struct VortexRecordBatchStream { +pub(crate) struct VortexRecordBatchStream { schema_ref: SchemaRef, - #[pin] - inner: I, + idx: usize, + num_chunks: usize, + chunks: ChunkedArray, + + projection: Vec, } -impl Stream for VortexRecordBatchStream -where - I: Stream, -{ +impl Stream for VortexRecordBatchStream { type Item = DFResult; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - match this.inner.poll_next_unpin(cx) { - Poll::Ready(Some(batch)) => Poll::Ready(Some(Ok(batch))), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + if this.idx >= this.num_chunks { + return Poll::Ready(None); } + + // Grab next chunk, project and convert to Arrow. 
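+        // Each poll converts exactly one chunk into a RecordBatch; all chunks are already
+        // in memory, so this stream never returns Pending and the Context goes unused.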
+ let chunk = this + .chunks + .chunk(this.idx) + .expect("nchunks should match precomputed"); + this.idx += 1; + + let struct_array = chunk + .clone() + .into_struct() + .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?; + + let projected_struct = + struct_array + .project(this.projection.as_slice()) + .map_err(|vortex_err| { + exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") + })?; + + let batch = RecordBatch::from( + projected_struct + .into_canonical() + .expect("struct arrays must canonicalize") + .into_arrow() + .as_any() + .downcast_ref::() + .expect("vortex StructArray must convert to arrow StructArray"), + ); + + Poll::Ready(Some(Ok(batch))) } } -impl RecordBatchStream for VortexRecordBatchStream -where - I: Stream, -{ +impl RecordBatchStream for VortexRecordBatchStream { fn schema(&self) -> SchemaRef { Arc::clone(&self.schema_ref) } @@ -480,18 +477,17 @@ impl ExecutionPlan for VortexScanExec { fn execute( &self, - partition: usize, + _partition: usize, _context: Arc, ) -> DFResult { - let chunk = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { - chunked_array - .chunk(partition) - .ok_or_else(|| exec_datafusion_err!("partition not found"))? - } else { - self.array.clone() - }; - - execute_unfiltered(chunk, &self.scan_projection) + // Send back a stream of RecordBatch that returns the next element of the chunk each time. + Ok(Box::pin(VortexRecordBatchStream { + schema_ref: self.schema().clone(), + idx: 0, + num_chunks: self.array.nchunks(), + chunks: self.array.clone(), + projection: self.scan_projection.clone(), + })) } } diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index 7623538585..05391b8e69 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -2,7 +2,6 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; -use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -21,10 +20,10 @@ use datafusion_physical_plan::{ use futures::{ready, Stream}; use lazy_static::lazy_static; use pin_project::pin_project; -use vortex::array::struct_::StructArray; +use vortex::array::chunked::ChunkedArray; use vortex::arrow::FromArrowArray; use vortex::compute::take::take; -use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical}; +use vortex::{ArrayDType, ArrayData, IntoArray, IntoArrayVariant, IntoCanonical}; use crate::datatype::infer_schema; use crate::expr::{make_conjunction, simplify_expr}; @@ -35,12 +34,13 @@ use crate::expr::{make_conjunction, simplify_expr}; pub(crate) struct RowSelectorExec { filter_exprs: Vec, + filter_projection: Vec, + // cached PlanProperties object. We do not make use of this. cached_plan_props: PlanProperties, - // A Vortex struct array that contains all columns necessary for executing the filter - // expressions. - filter_struct: StructArray, + // Full array. We only access partitions of this data. + chunked_array: ChunkedArray, } lazy_static! { @@ -52,16 +52,21 @@ lazy_static! 
{ } impl RowSelectorExec { - pub(crate) fn new(filter_exprs: &[Expr], filter_struct: &StructArray) -> Self { + pub(crate) fn new( + filter_exprs: &[Expr], + filter_projection: Vec, + chunked_array: &ChunkedArray, + ) -> Self { let cached_plan_props = PlanProperties::new( EquivalenceProperties::new(ROW_SELECTOR_SCHEMA_REF.clone()), - Partitioning::RoundRobinBatch(1), + Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded, ); Self { filter_exprs: filter_exprs.to_owned(), - filter_struct: filter_struct.clone(), + filter_projection: filter_projection.clone(), + chunked_array: chunked_array.clone(), cached_plan_props, } } @@ -109,68 +114,69 @@ impl ExecutionPlan for RowSelectorExec { fn execute( &self, partition: usize, - context: Arc, + _context: Arc, ) -> DFResult { assert_eq!( partition, 0, - "single partitioning only supported by TakeOperator" + "single partitioning only supported by RowSelectorExec" ); - let stream_schema = Arc::new(infer_schema(self.filter_struct.dtype())); + // Derive a schema using the provided set of fields. - let filter_struct = self.filter_struct.clone(); - let one_shot = Box::pin(async move { filter_struct.into_array() }); + let filter_schema = Arc::new( + infer_schema(self.chunked_array.dtype()) + .project(self.filter_projection.as_slice()) + .unwrap(), + ); let conjunction_expr = simplify_expr( &make_conjunction(&self.filter_exprs)?, - stream_schema.clone(), + filter_schema.clone(), )?; Ok(Box::pin(RowIndicesStream { - one_shot, - polled_inner: false, + chunked_array: self.chunked_array.clone(), + chunk_idx: 0, + filter_projection: self.filter_projection.clone(), conjunction_expr, - schema_ref: stream_schema, - context: context.clone(), + filter_schema, })) } } /// [RecordBatchStream] of row indices, emitted by the [RowSelectorExec] physical plan node. -#[pin_project::pin_project] -pub(crate) struct RowIndicesStream { - /// The inner future that returns `DFResult`. - /// This future should only poll one time. - #[pin] - one_shot: F, - - polled_inner: bool, - +pub(crate) struct RowIndicesStream { + chunked_array: ChunkedArray, + chunk_idx: usize, conjunction_expr: Expr, - schema_ref: SchemaRef, - context: Arc, + filter_projection: Vec, + filter_schema: SchemaRef, } -impl Stream for RowIndicesStream -where - F: Future, -{ +impl Stream for RowIndicesStream { type Item = DFResult; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); - // If we have already polled the one-shot future of filter records, indicate - // that the stream has finished. - if *this.polled_inner { + if this.chunk_idx >= this.chunked_array.nchunks() { return Poll::Ready(None); } + let next_chunk = this + .chunked_array + .chunk(this.chunk_idx) + .expect("chunk index in-bounds"); + this.chunk_idx += 1; + // Get the unfiltered record batch. // Since this is a one-shot, we only want to poll the inner future once, to create the // initial batch for us to process. - let vortex_struct = ready!(this.one_shot.poll(cx)); - *this.polled_inner = true; + let vortex_struct = next_chunk + .into_struct() + .expect("chunks must be StructArray") + .project(this.filter_projection.as_slice()) + .expect("projection should succeed"); // Immediately convert to Arrow RecordBatch for processing. // TODO(aduffy): attempt to pushdown the filter to Vortex without decoding. 
@@ -186,9 +192,9 @@ where // // The result of a conjunction expression is a BooleanArray containing `true` for rows // where the conjunction was satisfied, and `false` otherwise. - let df_schema = DFSchema::try_from(this.schema_ref.clone())?; + let df_schema = DFSchema::try_from(this.filter_schema.clone())?; let physical_expr = - create_physical_expr(this.conjunction_expr, &df_schema, &Default::default())?; + create_physical_expr(&this.conjunction_expr, &df_schema, &Default::default())?; let selection = physical_expr .evaluate(&record_batch)? .into_array(record_batch.num_rows())?; @@ -209,12 +215,9 @@ where } } -impl RecordBatchStream for RowIndicesStream -where - F: Future, -{ +impl RecordBatchStream for RowIndicesStream { fn schema(&self) -> SchemaRef { - self.schema_ref.clone() + ROW_SELECTOR_SCHEMA_REF.clone() } } @@ -232,7 +235,7 @@ pub(crate) struct TakeRowsExec { output_schema: SchemaRef, // The original Vortex array holding the fields we have not decoded yet. - table: StructArray, + table: ChunkedArray, } impl TakeRowsExec { @@ -240,12 +243,12 @@ impl TakeRowsExec { schema_ref: SchemaRef, projection: &[usize], row_indices: Arc, - table: &StructArray, + table: &ChunkedArray, ) -> Self { let output_schema = Arc::new(schema_ref.project(projection).unwrap()); let plan_properties = PlanProperties::new( EquivalenceProperties::new(output_schema.clone()), - Partitioning::RoundRobinBatch(1), + Partitioning::UnknownPartitioning(1), ExecutionMode::Bounded, ); @@ -299,16 +302,12 @@ impl ExecutionPlan for TakeRowsExec { partition: usize, context: Arc, ) -> DFResult { - assert_eq!( - partition, 0, - "single partitioning only supported by TakeOperator" - ); - + // Get the row indices for the given chunk. let row_indices_stream = self.input.execute(partition, context)?; Ok(Box::pin(TakeRowsStream { row_indices_stream, - completed: false, + chunk_idx: 0, output_projection: self.projection.clone(), output_schema: self.output_schema.clone(), vortex_array: self.table.clone(), @@ -323,14 +322,17 @@ pub(crate) struct TakeRowsStream { #[pin] row_indices_stream: F, - completed: bool, + // The current chunk. Every time we receive a new RecordBatch from the upstream operator + // we treat it as a set of row-indices that are zero-indexed relative to this chunk number + // in the `vortex_array`. + chunk_idx: usize, // Projection based on the schema here output_projection: Vec, output_schema: SchemaRef, // The original Vortex array we're taking from - vortex_array: StructArray, + vortex_array: ChunkedArray, } impl Stream for TakeRowsStream @@ -342,24 +344,20 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - // If `poll_next` has already fired, return None indicating end of the stream. - if *this.completed { - return Poll::Ready(None); - } - // Get the indices provided by the upstream operator. let record_batch = match ready!(this.row_indices_stream.poll_next(cx)) { None => { // Row indices stream is complete, we are also complete. - // This should never happen right now given we only emit one recordbatch upstream. return Poll::Ready(None); } - Some(result) => { - *this.completed = true; - result? 
- } + Some(result) => result?, }; + assert!( + *this.chunk_idx <= this.vortex_array.nchunks(), + "input yielded too many RecordBatches" + ); + let row_indices = ArrayData::from_arrow(record_batch.column(0).as_primitive::(), false) .into_array(); @@ -376,10 +374,19 @@ where .unwrap()))); } + let chunk = this + .vortex_array + .chunk(*this.chunk_idx) + .expect("streamed too many chunks") + .into_struct() + .expect("chunks must be struct-encoded"); + + *this.chunk_idx += 1; + // TODO(aduffy): this re-decodes the fields from the filter schema, which is wasteful. // We should find a way to avoid decoding the filter columns and only decode the other // columns, then stitch the StructArray back together from those. - let projected_for_output = this.vortex_array.project(this.output_projection).unwrap(); + let projected_for_output = chunk.project(this.output_projection).unwrap(); let decoded = take(&projected_for_output.into_array(), &row_indices) .expect("take") .into_canonical() @@ -411,10 +418,11 @@ mod test { use datafusion_expr::{and, col, lit}; use itertools::Itertools; use vortex::array::bool::BoolArray; + use vortex::array::chunked::ChunkedArray; use vortex::array::primitive::PrimitiveArray; use vortex::array::struct_::StructArray; use vortex::validity::Validity; - use vortex::IntoArray; + use vortex::{ArrayDType, IntoArray}; use vortex_dtype::FieldName; use crate::plans::{RowIndicesStream, ROW_SELECTOR_SCHEMA_REF}; @@ -426,35 +434,36 @@ mod test { Field::new("b", DataType::Boolean, false), ])); - let _schema = schema.clone(); - let one_shot = Box::pin(async move { - StructArray::try_new( - Arc::new([FieldName::from("a"), FieldName::from("b")]), - vec![ - PrimitiveArray::from(vec![0u64, 1, 2]).into_array(), - BoolArray::from(vec![false, false, true]).into_array(), - ], - 3, - Validity::NonNullable, - ) - .unwrap() - .into_array() - }); + let chunk = StructArray::try_new( + Arc::new([FieldName::from("a"), FieldName::from("b")]), + vec![ + PrimitiveArray::from(vec![0u64, 1, 2]).into_array(), + BoolArray::from(vec![false, false, true]).into_array(), + ], + 3, + Validity::NonNullable, + ) + .unwrap() + .into_array(); + + let dtype = chunk.dtype().clone(); + let chunked_array = + ChunkedArray::try_new(vec![chunk.clone(), chunk.clone()], dtype).unwrap(); let _schema = schema.clone(); let filtering_stream = RowIndicesStream { - one_shot, - polled_inner: false, + chunked_array: chunked_array.clone(), + chunk_idx: 0, conjunction_expr: and((col("a") % lit(2u64)).eq(lit(0u64)), col("b").is_true()), - schema_ref: _schema, - context: Arc::new(Default::default()), + filter_projection: vec![0, 1], + filter_schema: _schema, }; let rows: Vec = futures::executor::block_on_stream(filtering_stream) .try_collect() .unwrap(); - assert_eq!(rows.len(), 1); + assert_eq!(rows.len(), 2); // The output of row selection is a RecordBatch of indices that can be used as selectors // against the original RecordBatch. 
@@ -466,5 +475,14 @@ mod test { ) .unwrap() ); + + assert_eq!( + rows[1], + RecordBatch::try_new( + ROW_SELECTOR_SCHEMA_REF.clone(), + vec![Arc::new(UInt64Array::from(vec![2u64])),] + ) + .unwrap() + ); } } From 948f7c3da7840c53ab5564c359d9a5fee713cf57 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Fri, 12 Jul 2024 13:16:53 -0400 Subject: [PATCH 3/3] undo the two executors thing --- bench-vortex/benches/tpch_benchmark.rs | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/bench-vortex/benches/tpch_benchmark.rs b/bench-vortex/benches/tpch_benchmark.rs index 7f33574b48..fc98eb6470 100644 --- a/bench-vortex/benches/tpch_benchmark.rs +++ b/bench-vortex/benches/tpch_benchmark.rs @@ -5,16 +5,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use tokio::runtime::Builder; fn benchmark(c: &mut Criterion) { - let runtime = Builder::new_current_thread() - .thread_name("bench-worker") - .enable_all() - .build() - .unwrap(); - let prework_runtime = Builder::new_current_thread() - .thread_name("prework") - .enable_all() - .build() - .unwrap(); + let runtime = Builder::new_current_thread().enable_all().build().unwrap(); // Run TPC-H data gen. let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap(); @@ -22,7 +13,7 @@ fn benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("tpch q1"); group.sample_size(10); - let ctx = prework_runtime + let ctx = runtime .block_on(load_datasets( &data_dir, Format::Vortex { @@ -35,7 +26,7 @@ fn benchmark(c: &mut Criterion) { .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() }) }); - let ctx = prework_runtime + let ctx = runtime .block_on(load_datasets( &data_dir, Format::Vortex { @@ -48,7 +39,7 @@ fn benchmark(c: &mut Criterion) { .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() }) }); - let ctx = prework_runtime + let ctx = runtime .block_on(load_datasets(&data_dir, Format::Csv)) .unwrap(); group.bench_function("csv", |b| { @@ -56,7 +47,7 @@ fn benchmark(c: &mut Criterion) { .iter(|| async { ctx.sql(Q1).await.unwrap().collect().await.unwrap() }) }); - let ctx = prework_runtime + let ctx = runtime .block_on(load_datasets(&data_dir, Format::Arrow)) .unwrap(); group.bench_function("arrow", |b| {