From e5d82a82f07c2f726b5ef88628ba82ffd4effd14 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Jun 2024 13:14:25 -0400 Subject: [PATCH] fix bug in varbin_to_arrow --- bench-vortex/benches/datafusion_benchmark.rs | 5 +- bench-vortex/src/bin/debuggable_bench.rs | 100 ++++++++++++++++++ vortex-array/src/array/bool/accessors.rs | 2 - vortex-array/src/array/varbin/compute/take.rs | 2 +- vortex-array/src/canonical.rs | 3 +- vortex-array/src/stats/statsset.rs | 30 ------ 6 files changed, 104 insertions(+), 38 deletions(-) create mode 100644 bench-vortex/src/bin/debuggable_bench.rs diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs index 3e683d8544..73c3ced0e0 100644 --- a/bench-vortex/benches/datafusion_benchmark.rs +++ b/bench-vortex/benches/datafusion_benchmark.rs @@ -8,9 +8,8 @@ use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Crit use datafusion::common::Result as DFResult; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::execution::memory_pool::human_readable_size; -use datafusion::functions_aggregate::expr_fn::sum; use datafusion::logical_expr::lit; -use datafusion::prelude::{col, DataFrame, SessionContext}; +use datafusion::prelude::{col, count_distinct, DataFrame, SessionContext}; use lazy_static::lazy_static; use vortex::compress::Compressor; use vortex::encoding::EncodingRef; @@ -87,7 +86,7 @@ fn filter_agg_query(df: DataFrame) -> DFResult { // SELECT SUM(scores) FROM table WHERE scores >= 3000 AND scores <= 4000 df.filter(col("scores").gt_eq(lit(3_000)))? .filter(col("scores").lt_eq(lit(4_000)))? 
- .aggregate(vec![], vec![sum(col("scores"))]) + .aggregate(vec![], vec![count_distinct(col("names"))]) } fn measure_provider( diff --git a/bench-vortex/src/bin/debuggable_bench.rs b/bench-vortex/src/bin/debuggable_bench.rs new file mode 100644 index 0000000000..6c8a197019 --- /dev/null +++ b/bench-vortex/src/bin/debuggable_bench.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use arrow_array::builder::{StringBuilder, UInt32Builder}; +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema}; +use datafusion::common::Result as DFResult; +use datafusion::execution::memory_pool::human_readable_size; +use datafusion::functions_aggregate::expr_fn::sum; +use datafusion::prelude::{col, lit, DataFrame, SessionContext}; +use lazy_static::lazy_static; +use vortex::compress::Compressor; +use vortex::encoding::EncodingRef; +use vortex::{Array, Context, IntoArray, ToArrayData}; +use vortex_datafusion::SessionContextExt; +use vortex_dict::DictEncoding; +use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; + +lazy_static! { + pub static ref CTX: Context = Context::default().with_encodings([ + &BitPackedEncoding as EncodingRef, + &DictEncoding, + &FoREncoding, + &DeltaEncoding, + ]); +} + +fn toy_dataset_arrow() -> RecordBatch { + // 640,000 rows of string and numeric data. + // 80,000 values of each of the eight names, cycling every eight rows. 
+ + let names = [ + "Alexander", + "Anastasia", + "Archibald", + "Bartholomew", + "Benjamin", + "Christopher", + "Elizabeth", + "Gabriella", + ]; + + let mut col1 = StringBuilder::with_capacity(640_000, 64_000_000); + let mut col2 = UInt32Builder::with_capacity(640_000); + for i in 0..640_000 { + col1.append_value(names[i % 8]); + col2.append_value(u32::try_from(i).unwrap()); + } + + let col1 = col1.finish(); + let col2 = col2.finish(); + + RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("names", DataType::Utf8, false), + Field::new("scores", DataType::UInt32, false), + ])), + vec![Arc::new(col1), Arc::new(col2)], + ) + .unwrap() +} + +fn toy_dataset_vortex(compress: bool) -> Array { + let uncompressed = toy_dataset_arrow().to_array_data().into_array(); + + if !compress { + return uncompressed; + } + + println!( + "uncompressed size: {:?}", + human_readable_size(uncompressed.nbytes()) + ); + let compressor = Compressor::new(&CTX); + let compressed = compressor.compress(&uncompressed, None).unwrap(); + println!( + "vortex compressed size: {:?}", + human_readable_size(compressed.nbytes()) + ); + compressed +} + +fn filter_agg_query(df: DataFrame) -> DFResult { + // SELECT names, SUM(scores) FROM table WHERE scores >= 3000 AND scores <= 4000 GROUP BY names + df.filter(col("scores").gt_eq(lit(3_000)))? + .filter(col("scores").lt_eq(lit(4_000)))? 
+ .aggregate(vec![col("names")], vec![sum(col("scores"))]) +} + +#[tokio::main] +async fn main() -> DFResult<()> { + let context = SessionContext::new(); + + let vortex_array = toy_dataset_vortex(true); + + let df = context.read_vortex(vortex_array)?; + let result = filter_agg_query(df)?.collect().await?; + println!("result: {:?}", result); + + Ok(()) +} diff --git a/vortex-array/src/array/bool/accessors.rs b/vortex-array/src/array/bool/accessors.rs index e35b25f8bf..6c464f1cff 100644 --- a/vortex-array/src/array/bool/accessors.rs +++ b/vortex-array/src/array/bool/accessors.rs @@ -1,4 +1,3 @@ -use itertools::Itertools; use vortex_error::VortexResult; use crate::accessor::ArrayAccessor; @@ -22,7 +21,6 @@ impl ArrayAccessor for BoolArray { Validity::AllInvalid => Ok(f(&mut (0..self.len()).map(|_| None))), Validity::Array(valid) => { let valids = valid.into_bool()?.boolean_buffer(); - println!("nulls: {:?}", valids.iter().collect_vec()); let mut iter = valids.iter().zip(bools.iter()).map(|(is_valid, value)| { if is_valid { Some(if value { &TRUE } else { &FALSE }) diff --git a/vortex-array/src/array/varbin/compute/take.rs b/vortex-array/src/array/varbin/compute/take.rs index 16bdf8fb21..2ce2b576f1 100644 --- a/vortex-array/src/array/varbin/compute/take.rs +++ b/vortex-array/src/array/varbin/compute/take.rs @@ -49,7 +49,7 @@ fn take( return Ok(take_nullable(dtype, offsets, data, indices, v)); } - let mut builder = VarBinBuilder::::with_capacity(indices.len()); + let mut builder = VarBinBuilder::::with_capacity(indices.len()); for &idx in indices { let idx = idx.to_usize().unwrap(); let start = offsets[idx].to_usize().unwrap(); diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 6fbe21b251..2d737243fb 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -209,8 +209,7 @@ fn struct_to_arrow(struct_array: StructArray) -> ArrayRef { fn varbin_to_arrow(varbin_array: VarBinArray) -> ArrayRef { let offsets = varbin_array 
.offsets() - .into_canonical() - .and_then(Canonical::into_primitive) + .into_primitive() .expect("flatten_primitive"); let offsets = match offsets.ptype() { PType::I32 | PType::I64 => offsets, diff --git a/vortex-array/src/stats/statsset.rs b/vortex-array/src/stats/statsset.rs index a2f9e85748..cc5dec36d5 100644 --- a/vortex-array/src/stats/statsset.rs +++ b/vortex-array/src/stats/statsset.rs @@ -27,36 +27,6 @@ impl StatsSet { } } - // pub fn constant(len: usize, scalar: &Scalar) -> Self { - // let mut stats = HashMap::from([ - // (Stat::Max, scalar.clone()), - // (Stat::Min, scalar.clone()), - // (Stat::IsConstant, true.into()), - // (Stat::IsSorted, true.into()), - // (Stat::RunCount, 1.into()), - // ]); - // - // match scalar.dtype() { - // DType::Bool(_) => { - // stats.insert(Stat::TrueCount, 0.into()); - // } - // DType::Primitive(ptype, _) => { - // ptype.byte_width(); - // stats.insert( - // Stat::BitWidthFreq, - // vec![0; ptype.byte_width() * 8 + 1].into(), - // ); - // stats.insert( - // Stat::TrailingZeroFreq, - // vec![ptype.byte_width() * 8; ptype.byte_width() * 8 + 1].into(), - // ); - // } - // _ => {} - // } - // - // Self::from(stats) - // } - /// Specialized constructor for the case where the StatsSet represents /// an array consisting entirely of [null](vortex_dtype::DType::Null) values. pub fn nulls(len: usize, dtype: &DType) -> Self {