diff --git a/Cargo.lock b/Cargo.lock index 3fd8d68809..ce03c9eb3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -421,11 +421,13 @@ name = "bench-vortex" version = "0.1.0" dependencies = [ "arrow-array", + "arrow-schema", "arrow-select", "bytes", "bzip2", "criterion", "csv", + "datafusion", "enum-iterator", "flexbuffers", "futures", @@ -435,6 +437,7 @@ dependencies = [ "log", "mimalloc", "parquet", + "rand", "reqwest", "serde", "simplelog", @@ -443,6 +446,7 @@ dependencies = [ "vortex-alp", "vortex-array", "vortex-buffer", + "vortex-datafusion", "vortex-datetime-parts", "vortex-dict", "vortex-dtype", diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index cb222976d9..b314f0dcf4 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -16,10 +16,12 @@ workspace = true [dependencies] arrow-array = { workspace = true } +arrow-schema = { workspace = true } arrow-select = { workspace = true } bytes = { workspace = true } bzip2 = { workspace = true } csv = { workspace = true } +datafusion = { workspace = true } enum-iterator = { workspace = true } flexbuffers = { workspace = true } futures = { workspace = true } @@ -29,6 +31,7 @@ lazy_static = { workspace = true } log = { workspace = true } mimalloc = { workspace = true } parquet = { workspace = true, features = [] } +rand = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } simplelog = { workspace = true } @@ -37,6 +40,7 @@ uuid = { workspace = true, features = ["v4"] } vortex-alp = { path = "../encodings/alp" } vortex-array = { path = "../vortex-array" } vortex-buffer = { path = "../vortex-buffer" } +vortex-datafusion = { path = "../vortex-datafusion" } vortex-datetime-parts = { path = "../encodings/datetime-parts" } vortex-dict = { path = "../encodings/dict" } vortex-dtype = { path = "../vortex-dtype" } @@ -56,3 +60,7 @@ harness = false [[bench]] name = "random_access" harness = false + +[[bench]] +name = "datafusion_benchmark" +harness = false diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs new file mode 100644 index 0000000000..1e9204039c --- /dev/null +++ b/bench-vortex/benches/datafusion_benchmark.rs @@ -0,0 +1,160 @@ +use std::sync::Arc; + +use arrow_array::builder::{StringBuilder, UInt32Builder}; +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion::datasource::MemTable; +use datafusion::functions_aggregate::expr_fn::sum; +use datafusion::logical_expr::lit; +use datafusion::prelude::{col, SessionContext}; +use lazy_static::lazy_static; +use vortex::compress::Compressor; +use vortex::encoding::EncodingRef; +use vortex::{Array, Context, IntoArray, ToArrayData}; +use vortex_datafusion::{VortexMemTable, VortexMemTableOptions}; +use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; + +lazy_static! { + pub static ref CTX: Context = Context::default().with_encodings([ + &BitPackedEncoding as EncodingRef, + // &DictEncoding, + &FoREncoding, + &DeltaEncoding, + ]); +} + +fn toy_dataset_arrow() -> RecordBatch { + // 64,000 rows of string and numeric data. + // 8,000 values of first string, second string, third string, etc. + + let names = vec![ + "Alexander", + "Anastasia", + "Archibald", + "Bartholomew", + "Benjamin", + "Christopher", + "Elizabeth", + "Gabriella", + ]; + + let mut col1 = StringBuilder::with_capacity(64_000, 64_000_000); + let mut col2 = UInt32Builder::with_capacity(64_000); + for i in 0..64_000 { + col1.append_value(names[i % 8]); + col2.append_value(u32::try_from(i).unwrap()); + } + + let col1 = col1.finish(); + let col2 = col2.finish(); + + RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("names", DataType::Utf8, false), + Field::new("scores", DataType::UInt32, false), + ])), + vec![Arc::new(col1), Arc::new(col2)], + ) + .unwrap() +} + +fn toy_dataset_vortex() -> Array { + let uncompressed = toy_dataset_arrow().to_array_data().into_array(); + println!("uncompressed vortex size: {}B", uncompressed.nbytes()); + + let compressor = Compressor::new(&CTX); + let compressed = compressor.compress(&uncompressed, None).unwrap(); + println!("compressed vortex size: {} B", compressed.nbytes()); + compressed +} + +fn bench_datafusion(c: &mut Criterion) { + let mut group = c.benchmark_group("datafusion"); + + let session = SessionContext::new(); + + let arrow_dataset = toy_dataset_arrow(); + let arrow_table = + Arc::new(MemTable::try_new(arrow_dataset.schema(), vec![vec![arrow_dataset]]).unwrap()); + + group.bench_function("arrow", |b| { + let arrow_table = arrow_table.clone(); + b.to_async( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(), + ) + .iter(|| async { + black_box(session.read_table(arrow_table.clone()).unwrap()) + .filter(col("scores").gt_eq(lit(3_000))) + .unwrap() + .filter(col("scores").lt_eq(lit(4_000))) + .unwrap() + .aggregate(vec![], vec![sum(col("scores"))]) + .unwrap() + .collect() + .await + .unwrap(); + }) + }); + + let vortex_dataset = toy_dataset_vortex(); + let vortex_table_pushdown = Arc::new( + VortexMemTable::try_new(vortex_dataset, VortexMemTableOptions::default()).unwrap(), + ); + group.bench_function("vortex_pushdown", |b| { + let vortex_table_pushdown = vortex_table_pushdown.clone(); + b.to_async(tokio::runtime::Runtime::new().unwrap()) + .iter(|| async { + black_box(session.read_table(vortex_table_pushdown.clone()).unwrap()) + .filter(col("scores").gt_eq(lit(3_000))) + .unwrap() + .filter(col("scores").lt_eq(lit(4_000))) + .unwrap() + .aggregate(vec![], vec![sum(col("scores"))]) + .unwrap() + .collect() + .await + .unwrap(); + }) + }); + + let vortex_dataset = toy_dataset_vortex(); + let vortex_table_no_pushdown = Arc::new( + VortexMemTable::try_new( + vortex_dataset, + VortexMemTableOptions::default().with_disable_pushdown(true), + ) + .unwrap(), + ); + group.bench_function("vortex_no_pushdown", |b| { + let vortex_table_no_pushdown = vortex_table_no_pushdown.clone(); + b.to_async( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(), + ) + .iter(|| async { + black_box( + session + .read_table(vortex_table_no_pushdown.clone()) + .unwrap(), + ) + .filter(col("scores").gt_eq(lit(3_000))) + .unwrap() + .filter(col("scores").lt_eq(lit(4_000))) + .unwrap() + .aggregate(vec![], vec![sum(col("scores"))]) + .unwrap() + .collect() + .await + .unwrap(); + }) + }); +} + +criterion_group!(benches, bench_datafusion); +criterion_main!(benches); diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index e52a819688..b02c63f273 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -45,7 +45,7 @@ pub struct VortexMemTableOptions { } impl VortexMemTableOptions { - pub fn with_disable_pushdown(&mut self, disable_pushdown: bool) -> &mut Self { + pub fn with_disable_pushdown(mut self, disable_pushdown: bool) -> Self { self.disable_pushdown = disable_pushdown; self } @@ -83,7 +83,7 @@ impl SessionContextExt for SessionContext { /// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as /// a table to DataFusion. #[derive(Debug, Clone)] -pub(crate) struct VortexMemTable { +pub struct VortexMemTable { array: Array, schema_ref: SchemaRef, options: VortexMemTableOptions, @@ -297,6 +297,8 @@ fn can_be_pushed_down(expr: &Expr, schema_columns: &HashSet) -> DFResult | Expr::IsFalse(_) | Expr::IsNotTrue(_) | Expr::IsNotFalse(_) + | Expr::Column(_) + | Expr::Literal(_) // TODO(aduffy): ensure that cast can be pushed down. | Expr::Cast(_) => true, _ => false, @@ -374,17 +376,20 @@ fn execute_unfiltered( array: &Array, projection: &Vec, ) -> DFResult { + println!("EXECUTE_UNFILTERED"); // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef. let struct_array = array .clone() .into_struct() .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?; + println!("PROJECTION: {:?}", projection); let projected_struct = struct_array .project(projection.as_slice()) .map_err(|vortex_err| { exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") })?; + println!("PROJECTED SCHEMA: {:?}", projected_struct.dtype()); let batch = RecordBatch::from( projected_struct .into_canonical() @@ -463,6 +468,7 @@ impl ExecutionPlan for VortexScanExec { partition: usize, _context: Arc, ) -> DFResult { + println!("EXECUTE VortexScanExec"); let chunk = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { chunked_array .chunk(partition) @@ -485,13 +491,12 @@ mod test { use vortex::array::struct_::StructArray; use vortex::array::varbin::VarBinArray; use vortex::validity::Validity; - use vortex::IntoArray; + use vortex::{Array, IntoArray}; use vortex_dtype::{DType, Nullability}; - use crate::SessionContextExt; + use crate::{SessionContextExt, VortexMemTableOptions}; - #[tokio::test] - async fn test_datafusion_simple() { + fn presidents_array() -> Array { let names = VarBinArray::from_vec( vec![ "Washington", @@ -508,15 +513,53 @@ mod test { Validity::NonNullable, ); - let presidents = StructArray::from_fields(&[ + StructArray::from_fields(&[ ("president", names.into_array()), ("term_start", term_start.into_array()), ]) - .into_array(); + .into_array() + } + + #[tokio::test] + async fn test_datafusion_pushdown() { + let ctx = SessionContext::new(); + + let df = ctx.read_vortex(presidents_array()).unwrap(); + + let distinct_names = df + .filter(col("term_start").gt_eq(lit(1795))) + .unwrap() + .aggregate(vec![], vec![count_distinct(col("president"))]) + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(distinct_names.len(), 1); + + assert_eq!( + *distinct_names[0] + .column(0) + .as_primitive::() + .values() + .first() + .unwrap(), + 4i64 + ); + } + + #[tokio::test] + async fn test_datafusion_no_pushdown() { let ctx = SessionContext::new(); - let df = ctx.read_vortex(presidents).unwrap(); + let df = ctx + .read_vortex_opts( + presidents_array(), + // Disable pushdown. We run this test to make sure that the naive codepath also + // produces correct results and does not panic anywhere. + VortexMemTableOptions::default().with_disable_pushdown(true), + ) + .unwrap(); let distinct_names = df .filter(col("term_start").gt_eq(lit(1795))) diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index f4906afe6a..0b4c2f8f53 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -9,8 +9,9 @@ use std::task::{Context, Poll}; use arrow_array::cast::AsArray; use arrow_array::types::UInt64Type; -use arrow_array::{ArrayRef, RecordBatch, UInt64Array}; +use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::compute::cast; use datafusion::prelude::SessionContext; use datafusion_common::{DFSchema, Result as DFResult}; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; @@ -70,7 +71,9 @@ impl RowSelectorExec { impl Debug for RowSelectorExec { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RowSelectorExec").finish() + f.debug_struct("RowSelectorExec") + .field("filter_exprs", &self.filter_exprs) + .finish() } } @@ -102,7 +105,7 @@ impl ExecutionPlan for RowSelectorExec { self: Arc, _children: Vec>, ) -> DFResult> { - panic!("with_new_children not supported for RowSelectorExec") + Ok(self) } fn execute( @@ -171,13 +174,11 @@ where type Item = DFResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - println!("BEGIN poll_next"); let this = self.project(); // If we have already polled the one-shot future with the filter records, indicate // that the stream has finished. if *this.polled_inner { - println!("EXIT"); return Poll::Ready(None); } @@ -186,7 +187,6 @@ where // initial batch for us to process. // // We want to avoid ever calling it again. - println!("POLL record_batch"); let record_batch = ready!(this.inner.poll(cx))?; *this.polled_inner = true; @@ -195,7 +195,6 @@ where // // The result of a conjunction expression is a BooleanArray containing `true` for rows // where the conjunction was satisfied, and `false` otherwise. - println!("CREATE session"); let session = SessionContext::new(); let df_schema = DFSchema::try_from(this.schema_ref.clone())?; let physical_expr = @@ -216,7 +215,6 @@ where let indices: ArrayRef = Arc::new(UInt64Array::from(selection_indices)); let indices_batch = RecordBatch::try_new(ROW_SELECTOR_SCHEMA_REF.clone(), vec![indices])?; - println!("RETURNING Poll::Ready"); Poll::Ready(Some(Ok(indices_batch))) } } @@ -274,7 +272,10 @@ impl TakeRowsExec { impl Debug for TakeRowsExec { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Take").finish() + f.debug_struct("TakeRowsExec") + .field("projection", &self.projection) + .field("output_schema", &self.output_schema) + .finish() } } @@ -301,7 +302,7 @@ impl ExecutionPlan for TakeRowsExec { self: Arc, _children: Vec>, ) -> DFResult> { - panic!("unsupported with_new_children for {:?}", &self) + Ok(self) } fn execute( @@ -374,22 +375,36 @@ where ArrayData::from_arrow(record_batch.column(0).as_primitive::(), false) .into_array(); + // If no columns in the output projection, we send back a RecordBatch with empty schema. + // This is common for COUNT queries. + if this.output_projection.is_empty() { + let opts = RecordBatchOptions::new().with_row_count(Some(row_indices.len())); + return Poll::Ready(Some(Ok(RecordBatch::try_new_with_options( + Arc::new(Schema::empty()), + vec![], + &opts, + ) + .unwrap()))); + } + // Assemble the output columns using the row indices. // NOTE(aduffy): this re-decodes the fields from the filter schema, which is unnecessary. let mut columns = Vec::new(); - for field_idx in this.output_projection { - let encoded = this.vortex_array.field(*field_idx).unwrap(); + for (output_idx, src_idx) in this.output_projection.iter().enumerate() { + let encoded = this.vortex_array.field(*src_idx).expect("field access"); let decoded = take(&encoded, &row_indices) - .unwrap() + .expect("take") .into_canonical() - .unwrap() + .expect("into_canonical") .into_arrow(); + let data_type = this.output_schema.field(output_idx).data_type(); - columns.push(decoded); + columns.push(cast(&decoded, data_type).expect("cast")); } - // Send back a single record batch of the decoded data - let output_batch = RecordBatch::try_new(this.output_schema.clone(), columns).unwrap(); + // Send back a single record batch of the decoded data. + let output_batch = RecordBatch::try_new(this.output_schema.clone(), columns) + .expect("RecordBatch::try_new"); Poll::Ready(Some(Ok(output_batch))) }