
Commit

fix bug in varbin_to_arrow
a10y committed Jun 26, 2024
1 parent f2f6c26 commit e5d82a8
Showing 6 changed files with 104 additions and 38 deletions.
5 changes: 2 additions & 3 deletions bench-vortex/benches/datafusion_benchmark.rs
@@ -8,9 +8,8 @@ use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Crit
 use datafusion::common::Result as DFResult;
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::execution::memory_pool::human_readable_size;
-use datafusion::functions_aggregate::expr_fn::sum;
 use datafusion::logical_expr::lit;
-use datafusion::prelude::{col, DataFrame, SessionContext};
+use datafusion::prelude::{col, count_distinct, DataFrame, SessionContext};
 use lazy_static::lazy_static;
 use vortex::compress::Compressor;
 use vortex::encoding::EncodingRef;
@@ -87,7 +86,7 @@ fn filter_agg_query(df: DataFrame) -> DFResult<DataFrame> {
     // SELECT SUM(scores) FROM table WHERE scores >= 3000 AND scores <= 4000
     df.filter(col("scores").gt_eq(lit(3_000)))?
         .filter(col("scores").lt_eq(lit(4_000)))?
-        .aggregate(vec![], vec![sum(col("scores"))])
+        .aggregate(vec![], vec![count_distinct(col("names"))])
 }

 fn measure_provider<M: Measurement>(
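Note: the aggregate swap also explains the import change above — count_distinct is re-exported from datafusion::prelude, so the dedicated datafusion::functions_aggregate::expr_fn::sum import can be dropped from this benchmark.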
100 changes: 100 additions & 0 deletions bench-vortex/src/bin/debuggable_bench.rs
@@ -0,0 +1,100 @@
use std::sync::Arc;

use arrow_array::builder::{StringBuilder, UInt32Builder};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use datafusion::common::Result as DFResult;
use datafusion::execution::memory_pool::human_readable_size;
use datafusion::functions_aggregate::expr_fn::sum;
use datafusion::prelude::{col, lit, DataFrame, SessionContext};
use lazy_static::lazy_static;
use vortex::compress::Compressor;
use vortex::encoding::EncodingRef;
use vortex::{Array, Context, IntoArray, ToArrayData};
use vortex_datafusion::SessionContextExt;
use vortex_dict::DictEncoding;
use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding};

lazy_static! {
    pub static ref CTX: Context = Context::default().with_encodings([
        &BitPackedEncoding as EncodingRef,
        &DictEncoding,
        &FoREncoding,
        &DeltaEncoding,
    ]);
}

fn toy_dataset_arrow() -> RecordBatch {
    // 640,000 rows of string and numeric data.
    // The eight names cycle every eight rows; scores count up from 0.

    let names = [
        "Alexander",
        "Anastasia",
        "Archibald",
        "Bartholomew",
        "Benjamin",
        "Christopher",
        "Elizabeth",
        "Gabriella",
    ];

    let mut col1 = StringBuilder::with_capacity(640_000, 64_000_000);
    let mut col2 = UInt32Builder::with_capacity(640_000);
    for i in 0..640_000 {
        col1.append_value(names[i % 8]);
        col2.append_value(u32::try_from(i).unwrap());
    }

    let col1 = col1.finish();
    let col2 = col2.finish();

    RecordBatch::try_new(
        Arc::new(Schema::new(vec![
            Field::new("names", DataType::Utf8, false),
            Field::new("scores", DataType::UInt32, false),
        ])),
        vec![Arc::new(col1), Arc::new(col2)],
    )
    .unwrap()
}

fn toy_dataset_vortex(compress: bool) -> Array {
    let uncompressed = toy_dataset_arrow().to_array_data().into_array();

    if !compress {
        return uncompressed;
    }

    println!(
        "uncompressed size: {:?}",
        human_readable_size(uncompressed.nbytes())
    );
    let compressor = Compressor::new(&CTX);
    let compressed = compressor.compress(&uncompressed, None).unwrap();
    println!(
        "vortex compressed size: {:?}",
        human_readable_size(compressed.nbytes())
    );
    compressed
}

fn filter_agg_query(df: DataFrame) -> DFResult<DataFrame> {
    // SELECT names, SUM(scores) FROM table WHERE scores >= 3000 AND scores <= 4000 GROUP BY names
    df.filter(col("scores").gt_eq(lit(3_000)))?
        .filter(col("scores").lt_eq(lit(4_000)))?
        .aggregate(vec![col("names")], vec![sum(col("scores"))])
}

#[tokio::main]
async fn main() -> DFResult<()> {
    let context = SessionContext::new();

    let vortex_array = toy_dataset_vortex(true);

    let df = context.read_vortex(vortex_array)?;
    let result = filter_agg_query(df)?.collect().await?;
    println!("result: {:?}", result);

    Ok(())
}
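As a sanity check on the printed result: the filter keeps scores in [3000, 4000], i.e. 1001 rows, and since the names cycle mod 8 (with both 3000 and 4000 divisible by 8), the group for "Alexander" should sum 126 scores and each of the other seven names 125.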
2 changes: 0 additions & 2 deletions vortex-array/src/array/bool/accessors.rs
@@ -1,4 +1,3 @@
-use itertools::Itertools;
 use vortex_error::VortexResult;

 use crate::accessor::ArrayAccessor;
@@ -22,7 +21,6 @@ impl ArrayAccessor<bool> for BoolArray {
             Validity::AllInvalid => Ok(f(&mut (0..self.len()).map(|_| None))),
             Validity::Array(valid) => {
                 let valids = valid.into_bool()?.boolean_buffer();
-                println!("nulls: {:?}", valids.iter().collect_vec());
                 let mut iter = valids.iter().zip(bools.iter()).map(|(is_valid, value)| {
                     if is_valid {
                         Some(if value { &TRUE } else { &FALSE })
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/compute/take.rs
@@ -49,7 +49,7 @@ fn take<I: NativePType, O: NativePType>(
         return Ok(take_nullable(dtype, offsets, data, indices, v));
     }

-    let mut builder = VarBinBuilder::<I>::with_capacity(indices.len());
+    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
     for &idx in indices {
         let idx = idx.to_usize().unwrap();
         let start = offsets[idx].to_usize().unwrap();
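This one-character change is the companion bug: take is generic over the input offset type I and the output offset type O, and the rebuilt offsets must use O. A minimal, self-contained sketch of the principle (plain Rust, not the Vortex API — take_varbin and its signature are invented for illustration):

fn take_varbin<I, O>(offsets: &[I], data: &[u8], indices: &[usize]) -> (Vec<O>, Vec<u8>)
where
    I: Copy + TryInto<usize>,
    <I as TryInto<usize>>::Error: std::fmt::Debug,
    O: TryFrom<usize>,
    <O as TryFrom<usize>>::Error: std::fmt::Debug,
{
    let mut out_offsets = vec![O::try_from(0).unwrap()];
    let mut out_data = Vec::new();
    for &idx in indices {
        let start: usize = offsets[idx].try_into().unwrap();
        let stop: usize = offsets[idx + 1].try_into().unwrap();
        out_data.extend_from_slice(&data[start..stop]);
        // Building these as `I` instead would produce a result whose offset
        // width disagrees with its declared output type.
        out_offsets.push(O::try_from(out_data.len()).unwrap());
    }
    (out_offsets, out_data)
}

fn main() {
    // "ab", "cde", "f" with i64 input offsets, taken into i32 output offsets.
    let offsets: Vec<i64> = vec![0, 2, 5, 6];
    let data = b"abcdef".to_vec();
    let (offs, bytes): (Vec<i32>, Vec<u8>) = take_varbin(&offsets, &data, &[2, 0]);
    assert_eq!(offs, vec![0, 1, 3]);
    assert_eq!(bytes, b"fab".to_vec());
}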
3 changes: 1 addition & 2 deletions vortex-array/src/canonical.rs
@@ -209,8 +209,7 @@ fn struct_to_arrow(struct_array: StructArray) -> ArrayRef {
 fn varbin_to_arrow(varbin_array: VarBinArray) -> ArrayRef {
     let offsets = varbin_array
         .offsets()
-        .into_canonical()
-        .and_then(Canonical::into_primitive)
+        .into_primitive()
         .expect("flatten_primitive");
     let offsets = match offsets.ptype() {
         PType::I32 | PType::I64 => offsets,
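For context on the PType::I32 | PType::I64 match that follows: Arrow's Utf8/Binary arrays require i32 offsets and LargeUtf8/LargeBinary require i64, so varbin offsets of any other width must be cast before the buffers reach Arrow. A sketch of that constraint against arrow-rs (the helper functions are hypothetical, not from this repo):

use std::sync::Arc;

use arrow_array::{Array, ArrayRef, LargeStringArray, StringArray};
use arrow_buffer::{Buffer, OffsetBuffer, ScalarBuffer};

// Utf8 demands i32 offsets; anything narrower (e.g. u16 offsets from a
// compressed varbin array) must be widened first.
fn utf8_from_parts(offsets: Vec<i32>, data: Vec<u8>) -> ArrayRef {
    Arc::new(StringArray::new(
        OffsetBuffer::new(ScalarBuffer::from(offsets)),
        Buffer::from_vec(data),
        None, // no validity buffer in this sketch
    ))
}

// LargeUtf8 is the i64-offset counterpart.
fn large_utf8_from_parts(offsets: Vec<i64>, data: Vec<u8>) -> ArrayRef {
    Arc::new(LargeStringArray::new(
        OffsetBuffer::new(ScalarBuffer::from(offsets)),
        Buffer::from_vec(data),
        None,
    ))
}

fn main() {
    let arr = utf8_from_parts(vec![0, 2, 5], b"abcde".to_vec());
    assert_eq!(arr.len(), 2); // "ab", "cde"
    let large = large_utf8_from_parts(vec![0, 2, 5], b"abcde".to_vec());
    assert_eq!(large.len(), 2);
}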
30 changes: 0 additions & 30 deletions vortex-array/src/stats/statsset.rs
@@ -27,36 +27,6 @@ impl StatsSet {
         }
     }

-    // pub fn constant(len: usize, scalar: &Scalar) -> Self {
-    //     let mut stats = HashMap::from([
-    //         (Stat::Max, scalar.clone()),
-    //         (Stat::Min, scalar.clone()),
-    //         (Stat::IsConstant, true.into()),
-    //         (Stat::IsSorted, true.into()),
-    //         (Stat::RunCount, 1.into()),
-    //     ]);
-    //
-    //     match scalar.dtype() {
-    //         DType::Bool(_) => {
-    //             stats.insert(Stat::TrueCount, 0.into());
-    //         }
-    //         DType::Primitive(ptype, _) => {
-    //             ptype.byte_width();
-    //             stats.insert(
-    //                 Stat::BitWidthFreq,
-    //                 vec![0; ptype.byte_width() * 8 + 1].into(),
-    //             );
-    //             stats.insert(
-    //                 Stat::TrailingZeroFreq,
-    //                 vec![ptype.byte_width() * 8; ptype.byte_width() * 8 + 1].into(),
-    //             );
-    //         }
-    //         _ => {}
-    //     }
-    //
-    //     Self::from(stats)
-    // }
-
     /// Specialized constructor for the case where the StatsSet represents
     /// an array consisting entirely of [null](vortex_dtype::DType::Null) values.
     pub fn nulls(len: usize, dtype: &DType) -> Self {
