diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index b853df19dc..f7dca30f17 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -39,6 +39,8 @@ jobs:
         run: cargo fmt --all --check
       - name: Rust Lint - Clippy
         run: cargo clippy --all-features --all-targets
+      - name: Docs
+        run: cargo doc --no-deps
       - name: Rust Test
         run: cargo test --workspace --all-features
diff --git a/Cargo.lock b/Cargo.lock
index 535df48fcc..4ad105a453 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3730,9 +3730,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"

 [[package]]
 name = "tokio"
-version = "1.38.0"
+version = "1.38.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a"
+checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df"
 dependencies = [
  "backtrace",
  "bytes",
diff --git a/bench-vortex/benches/tpch_benchmark.rs b/bench-vortex/benches/tpch_benchmark.rs
index d49e76504a..c08f76d8d8 100644
--- a/bench-vortex/benches/tpch_benchmark.rs
+++ b/bench-vortex/benches/tpch_benchmark.rs
@@ -1,5 +1,5 @@
 use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
-use bench_vortex::tpch::{load_datasets, Format};
+use bench_vortex::tpch::{load_datasets, tpch_queries, Format};
 use criterion::{criterion_group, criterion_main, Criterion};
 use tokio::runtime::Builder;
@@ -9,11 +9,11 @@ fn benchmark(c: &mut Criterion) {
     // Run TPC-H data gen.
     let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

-    let vortex_no_pushdown_ctx = runtime
+    let vortex_pushdown_disabled_ctx = runtime
         .block_on(load_datasets(
             &data_dir,
             Format::Vortex {
-                disable_pushdown: false,
+                disable_pushdown: true,
             },
         ))
         .unwrap();
@@ -21,30 +21,24 @@ fn benchmark(c: &mut Criterion) {
         .block_on(load_datasets(
             &data_dir,
             Format::Vortex {
-                disable_pushdown: true,
+                disable_pushdown: false,
             },
         ))
         .unwrap();
-    let csv_ctx = runtime
-        .block_on(load_datasets(&data_dir, Format::Csv))
-        .unwrap();
     let arrow_ctx = runtime
         .block_on(load_datasets(&data_dir, Format::Arrow))
         .unwrap();
+    let parquet_ctx = runtime
+        .block_on(load_datasets(&data_dir, Format::Parquet))
+        .unwrap();

-    for q in 1..=22 {
-        if q == 15 {
-            // DataFusion does not support query 15 since it has multiple SQL statements.
-        }
-
-        let query = bench_vortex::tpch::tpch_query(q);
-
+    for (q, query) in tpch_queries() {
         let mut group = c.benchmark_group(format!("tpch_q{q}"));
         group.sample_size(10);

-        group.bench_function("vortex-pushdown", |b| {
+        group.bench_function("vortex-pushdown-disabled", |b| {
             b.to_async(&runtime).iter(|| async {
-                vortex_ctx
+                vortex_pushdown_disabled_ctx
                     .sql(&query)
                     .await
                     .unwrap()
@@ -54,9 +48,9 @@ fn benchmark(c: &mut Criterion) {
             })
         });

-        group.bench_function("vortex-nopushdown", |b| {
+        group.bench_function("vortex-pushdown-enabled", |b| {
             b.to_async(&runtime).iter(|| async {
-                vortex_no_pushdown_ctx
+                vortex_ctx
                     .sql(&query)
                     .await
                     .unwrap()
@@ -66,11 +60,6 @@ fn benchmark(c: &mut Criterion) {
             })
         });

-        group.bench_function("csv", |b| {
-            b.to_async(&runtime)
-                .iter(|| async { csv_ctx.sql(&query).await.unwrap().collect().await.unwrap() })
-        });
-
         group.bench_function("arrow", |b| {
             b.to_async(&runtime).iter(|| async {
                 arrow_ctx
@@ -82,6 +71,18 @@ fn benchmark(c: &mut Criterion) {
                     .unwrap()
             })
         });
+
+        group.bench_function("parquet", |b| {
+            b.to_async(&runtime).iter(|| async {
+                parquet_ctx
+                    .sql(&query)
+                    .await
+                    .unwrap()
+                    .collect()
+                    .await
+                    .unwrap()
+            })
+        });
     }
 }
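Review note: the benchmark relies on Criterion's async support. A minimal self-contained sketch of the `to_async` pattern used above — assuming Criterion's `async_tokio` feature is enabled, with `run_query` as a hypothetical stand-in for `ctx.sql(&query)...collect()`:

```rust
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::runtime::Builder;

// Hypothetical stand-in for `ctx.sql(&query).await.unwrap().collect().await`.
async fn run_query() -> u64 {
    1 + 1
}

fn benchmark(c: &mut Criterion) {
    let runtime = Builder::new_current_thread().enable_all().build().unwrap();
    let mut group = c.benchmark_group("tpch_q1");
    // TPC-H queries are slow, so keep Criterion's sample count low.
    group.sample_size(10);
    group.bench_function("example", |b| {
        // `to_async` polls the benchmarked future to completion on the runtime.
        b.to_async(&runtime).iter(|| async { run_query().await })
    });
    group.finish();
}

criterion_group!(benches, benchmark);
criterion_main!(benches);
```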
diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs
index e250a100cc..e21e255e9d 100644
--- a/bench-vortex/src/bin/tpch_benchmark.rs
+++ b/bench-vortex/src/bin/tpch_benchmark.rs
@@ -1,7 +1,8 @@
+use std::sync;
 use std::time::SystemTime;

 use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
-use bench_vortex::tpch::{load_datasets, tpch_query, Format};
+use bench_vortex::tpch::{load_datasets, tpch_queries, Format};
 use futures::future::join_all;
 use indicatif::ProgressBar;
 use itertools::Itertools;
@@ -18,7 +19,7 @@ async fn main() {
     // The formats to run against (vs the baseline)
     let formats = [
         Format::Arrow,
-        Format::Csv,
+        Format::Parquet,
         Format::Vortex {
             disable_pushdown: false,
         },
@@ -40,57 +41,106 @@ async fn main() {

     // Set up a results table
     let mut table = Table::new();
-    let mut cells = vec![Cell::new("Query")];
-    cells.extend(formats.iter().map(|f| Cell::new(&format!("{:?}", f))));
-    table.add_row(Row::new(cells));
+    {
+        let mut cells = vec![Cell::new("Query")];
+        cells.extend(formats.iter().map(|f| Cell::new(&format!("{:?}", f))));
+        table.add_row(Row::new(cells));
+    }

     // Set up a progress bar
-    let progress = ProgressBar::new(22 * formats.len() as u64);
-
-    for i in 1..=22 {
-        // Skip query 15 as it is not supported by DataFusion
-        if i == 15 {
-            continue;
-        }
+    let progress = ProgressBar::new(21 * formats.len() as u64);

-        let query = tpch_query(i);
-        let mut cells = Vec::with_capacity(formats.len());
-        cells.push(Cell::new(&format!("Q{}", i)));
+    // Each query runs on its own task and sends its finished Row back over a channel.
+    let (rows_tx, rows_rx) = sync::mpsc::channel();
+    for (q, query) in tpch_queries() {
+        let _ctxs = ctxs.clone();
+        let _tx = rows_tx.clone();
+        let _progress = progress.clone();
+        rayon::spawn_fifo(move || {
+            let mut cells = Vec::with_capacity(formats.len());
+            cells.push(Cell::new(&format!("Q{}", q)));

-        let mut elapsed_us = Vec::new();
-        for (ctx, format) in ctxs.iter().zip(formats.iter()) {
-            let start = SystemTime::now();
-            ctx.sql(&query)
-                .await
-                .map_err(|e| println!("Failed to run {} {:?}: {}", i, format, e))
-                .unwrap()
-                .collect()
-                .await
-                .map_err(|e| println!("Failed to collect {} {:?}: {}", i, format, e))
+            let mut elapsed_us = Vec::new();
+            let rt = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
                 .unwrap();
-            let elapsed = start.elapsed().unwrap();
-            elapsed_us.push(elapsed);
-            progress.inc(1);
-        }
+            for (ctx, format) in _ctxs.iter().zip(formats.iter()) {
+                for _ in 0..3 {
+                    // warmup
+                    rt.block_on(async {
+                        ctx.sql(&query)
+                            .await
+                            .map_err(|e| println!("Failed to run {} {:?}: {}", q, format, e))
+                            .unwrap()
+                            .collect()
+                            .await
+                            .map_err(|e| println!("Failed to collect {} {:?}: {}", q, format, e))
+                            .unwrap();
+                    })
+                }
+                let mut measure = Vec::new();
+                for _ in 0..10 {
+                    let start = SystemTime::now();
+                    rt.block_on(async {
+                        ctx.sql(&query)
+                            .await
+                            .map_err(|e| println!("Failed to run {} {:?}: {}", q, format, e))
+                            .unwrap()
+                            .collect()
+                            .await
+                            .map_err(|e| println!("Failed to collect {} {:?}: {}", q, format, e))
+                            .unwrap();
+                    });
+                    let elapsed = start.elapsed().unwrap();
+                    measure.push(elapsed);
+                }
+                let fastest = measure.iter().cloned().min().unwrap();
+                elapsed_us.push(fastest);

-        let baseline = elapsed_us.first().unwrap();
-        // yellow: 10% slower than baseline
-        let yellow = baseline.as_micros() + (baseline.as_micros() / 10);
-        // red: 50% slower than baseline
-        let red = baseline.as_micros() + (baseline.as_micros() / 50);
-        cells.push(Cell::new(&format!("{} us", baseline.as_micros())).style_spec("b"));
-        for measure in elapsed_us.iter().skip(1) {
-            let style_spec = if measure.as_micros() > red {
-                "bBr"
-            } else if measure.as_micros() > yellow {
-                "bFdBy"
-            } else {
-                "bFdBG"
-            };
-            cells.push(Cell::new(&format!("{} us", measure.as_micros())).style_spec(style_spec));
-        }
-        table.add_row(Row::new(cells));
+                _progress.inc(1);
+            }
+
+            let baseline = elapsed_us.first().unwrap();
+            // yellow: 10% slower than baseline
+            let yellow = baseline.as_micros() + (baseline.as_micros() / 10);
+            // red: 50% slower than baseline
+            let red = baseline.as_micros() + (baseline.as_micros() / 2);
+            cells.push(Cell::new(&format!("{} us", baseline.as_micros())).style_spec("b"));
+            for measure in elapsed_us.iter().skip(1) {
+                let style_spec = if measure.as_micros() > red {
+                    "bBr"
+                } else if measure.as_micros() > yellow {
+                    "bFdBy"
+                } else {
+                    "bFdBG"
+                };
+                cells.push(
+                    Cell::new(&format!(
+                        "{} us ({:.2})",
+                        measure.as_micros(),
+                        measure.as_micros() as f64 / baseline.as_micros() as f64
+                    ))
+                    .style_spec(style_spec),
+                );
+            }
+
+            _tx.send((q, Row::new(cells))).unwrap();
+        });
     }
-    progress.clone().finish();
+
+    // Drop the parent sender so the channel closes once every task has finished.
+    drop(rows_tx);
+
+    let mut rows = vec![];
+    while let Ok((idx, row)) = rows_rx.recv() {
+        rows.push((idx, row));
+    }
+    rows.sort_by(|(idx0, _), (idx1, _)| idx0.cmp(idx1));
+    for (_, row) in rows {
+        table.add_row(row);
+    }
+
+    progress.finish();
     table.printstd();
 }
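The restructured driver fans queries out to worker tasks and reassembles ordered output at the end. A std-only sketch of that shape, with `std::thread::spawn` standing in for `rayon::spawn_fifo`:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    for q in 1..=4 {
        let tx = tx.clone();
        thread::spawn(move || {
            // Stand-in for running the TPC-H query and formatting a table Row.
            tx.send((q, format!("results for Q{q}"))).unwrap();
        });
    }
    // Drop the parent sender so `rx` sees the channel close once workers finish.
    drop(tx);

    // Workers complete in arbitrary order; sort by query number afterwards.
    let mut rows: Vec<_> = rx.iter().collect();
    rows.sort_by_key(|(q, _)| *q);
    for (q, row) in rows {
        println!("Q{q}: {row}");
    }
}
```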
diff --git a/bench-vortex/src/tpch/mod.rs b/bench-vortex/src/tpch/mod.rs
index 2ad1ba3691..59c8914794 100644
--- a/bench-vortex/src/tpch/mod.rs
+++ b/bench-vortex/src/tpch/mod.rs
@@ -4,13 +4,17 @@ use std::sync::Arc;

 use arrow_array::StructArray;
 use arrow_schema::Schema;
+use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::datasource::MemTable;
-use datafusion::prelude::{CsvReadOptions, SessionContext};
+use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
+use futures::executor::block_on;
 use vortex::array::chunked::ChunkedArray;
 use vortex::arrow::FromArrowArray;
 use vortex::{Array, ArrayDType, ArrayData, IntoArray};
 use vortex_datafusion::{SessionContextExt, VortexMemTableOptions};

+use crate::idempotent;
+
 pub mod dbgen;
 pub mod schema;

@@ -18,6 +22,7 @@ pub mod schema;
 pub enum Format {
     Csv,
     Arrow,
+    Parquet,
     Vortex { disable_pushdown: bool },
 }

@@ -43,6 +48,9 @@ pub async fn load_datasets<P: AsRef<Path>>(
         match format {
             Format::Csv => register_csv(&context, stringify!($name), &$name, $schema).await,
             Format::Arrow => register_arrow(&context, stringify!($name), &$name, $schema).await,
+            Format::Parquet => {
+                register_parquet(&context, stringify!($name), &$name, $schema).await
+            }
             Format::Vortex {
                 disable_pushdown, ..
             } => {
@@ -118,6 +126,46 @@ async fn register_arrow(
     Ok(())
 }

+async fn register_parquet(
+    session: &SessionContext,
+    name: &str,
+    file: &Path,
+    schema: &Schema,
+) -> anyhow::Result<()> {
+    // Idempotent conversion from TPCH CSV to Parquet.
+    let pq_file = idempotent(
+        &file.with_extension("").with_extension("parquet"),
+        |pq_file| {
+            let df = block_on(
+                session.read_csv(
+                    file.to_str().unwrap(),
+                    CsvReadOptions::default()
+                        .delimiter(b'|')
+                        .has_header(false)
+                        .file_extension("tbl")
+                        .schema(schema),
+                ),
+            )
+            .unwrap();
+
+            block_on(df.write_parquet(
+                pq_file.as_os_str().to_str().unwrap(),
+                DataFrameWriteOptions::default(),
+                None,
+            ))
+        },
+    )
+    .unwrap();
+
+    Ok(session
+        .register_parquet(
+            name,
+            pq_file.as_os_str().to_str().unwrap(),
+            ParquetReadOptions::default(),
+        )
+        .await?)
+}
+
 async fn register_vortex(
     session: &SessionContext,
     name: &str,
@@ -158,6 +206,15 @@ async fn register_vortex(
     Ok(())
 }

+pub fn tpch_queries() -> impl Iterator<Item = (usize, String)> {
+    (1..=22)
+        .filter(|q| {
+            // Query 15 has multiple SQL statements, so it doesn't yet run in DataFusion.
+            *q != 15
+        })
+        .map(|q| (q, tpch_query(q)))
+}
+
 pub fn tpch_query(query_idx: usize) -> String {
     let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"))
         .join("tpch")
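`register_parquet` leans on the crate's `idempotent` helper to convert each `.tbl` file only once. A simplified sketch of the idea (this signature is an assumption for illustration, not `bench-vortex`'s actual helper):

```rust
use std::path::{Path, PathBuf};

// Run `create` only if `path` does not exist yet, then hand back the path.
fn idempotent<E, F>(path: &Path, create: F) -> Result<PathBuf, E>
where
    F: FnOnce(&Path) -> Result<(), E>,
{
    if !path.exists() {
        create(path)?;
    }
    Ok(path.to_path_buf())
}

fn main() {
    let target = std::env::temp_dir().join("lineitem.parquet");
    // The first call performs the (expensive) conversion; later calls skip it.
    let path = idempotent(&target, |p| std::fs::write(p, b"stub")).unwrap();
    assert!(path.exists());
}
```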
diff --git a/encodings/byte_bool/src/compute/mod.rs b/encodings/byte_bool/src/compute/mod.rs
index 550c151c4c..977609fb51 100644
--- a/encodings/byte_bool/src/compute/mod.rs
+++ b/encodings/byte_bool/src/compute/mod.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
 use arrow_buffer::BooleanBuffer;
 use num_traits::AsPrimitive;
 use vortex::validity::Validity;
-use vortex::ToArrayData;
+use vortex::Array;
 use vortex::{
     compute::{
         compare::CompareFn, slice::SliceFn, take::TakeFn, unary::fill_forward::FillForwardFn,
@@ -15,7 +15,7 @@ use vortex::{
     validity::ArrayValidity,
     ArrayDType, ArrayData, IntoArray,
 };
-use vortex::{Array, IntoCanonical};
+use vortex::{IntoArrayVariant, ToArrayData};
 use vortex_dtype::{match_each_integer_ptype, Nullability};
 use vortex_error::{vortex_bail, VortexResult};
 use vortex_expr::Operator;
@@ -130,7 +130,7 @@ impl TakeFn for ByteBoolArray {

 impl CompareFn for ByteBoolArray {
     fn compare(&self, other: &Array, op: Operator) -> VortexResult<Array> {
-        let canonical = other.clone().into_canonical()?.into_bool()?;
+        let canonical = other.clone().into_bool()?;
         let lhs = BooleanBuffer::from(self.maybe_null_slice());
         let rhs = canonical.boolean_buffer();

diff --git a/encodings/byte_bool/src/stats.rs b/encodings/byte_bool/src/stats.rs
index d5d8ff81b5..925de0563a 100644
--- a/encodings/byte_bool/src/stats.rs
+++ b/encodings/byte_bool/src/stats.rs
@@ -1,6 +1,6 @@
 use vortex::{
     stats::{ArrayStatisticsCompute, Stat, StatsSet},
-    AsArray, IntoCanonical,
+    AsArray, IntoArrayVariant,
 };
 use vortex_error::VortexResult;

@@ -13,7 +13,7 @@ impl ArrayStatisticsCompute for ByteBoolArray {
     }

     // TODO(adamgs): This is slightly wasteful and could be optimized in the future
-    let bools = self.as_array_ref().clone().into_canonical()?.into_bool()?;
+    let bools = self.as_array_ref().clone().into_bool()?;
     bools.compute_statistics(stat)
 }
 }
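The mechanical change repeated across the remaining files swaps the two-step `into_canonical()?.into_bool()?` for a single `into_bool()?`. A minimal sketch of how an `IntoArrayVariant`-style trait can be layered over `IntoCanonical` with a blanket impl — simplified stand-in types, not vortex's actual definitions:

```rust
// Simplified stand-ins for vortex's canonical encodings.
#[allow(dead_code)]
struct BoolArray;
#[allow(dead_code)]
struct PrimitiveArray;

#[allow(dead_code)]
enum Canonical {
    Bool(BoolArray),
    Primitive(PrimitiveArray),
}

type VortexResult<T> = Result<T, String>;

impl Canonical {
    fn into_bool(self) -> VortexResult<BoolArray> {
        match self {
            Canonical::Bool(b) => Ok(b),
            _ => Err("not a bool array".to_string()),
        }
    }
}

trait IntoCanonical {
    fn into_canonical(self) -> VortexResult<Canonical>;
}

// The convenience layer: one call instead of `into_canonical()?.into_bool()?`.
trait IntoArrayVariant: IntoCanonical + Sized {
    fn into_bool(self) -> VortexResult<BoolArray> {
        self.into_canonical()?.into_bool()
    }
}

impl<T: IntoCanonical> IntoArrayVariant for T {}

struct MyArray;

impl IntoCanonical for MyArray {
    fn into_canonical(self) -> VortexResult<Canonical> {
        Ok(Canonical::Bool(BoolArray))
    }
}

fn main() {
    assert!(MyArray.into_bool().is_ok());
}
```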
diff --git a/encodings/datetime-parts/src/compress.rs b/encodings/datetime-parts/src/compress.rs
index 163cd52de4..c275100062 100644
--- a/encodings/datetime-parts/src/compress.rs
+++ b/encodings/datetime-parts/src/compress.rs
@@ -1,14 +1,12 @@
 use vortex::array::datetime::{LocalDateTimeArray, TimeUnit};
 use vortex::array::primitive::PrimitiveArray;
 use vortex::compute::unary::cast::try_cast;
-use vortex::{Array, IntoArray, IntoCanonical};
+use vortex::{Array, IntoArray, IntoArrayVariant};
 use vortex_dtype::PType;
 use vortex_error::VortexResult;

 pub fn compress_localdatetime(array: LocalDateTimeArray) -> VortexResult<(Array, Array, Array)> {
-    let timestamps = try_cast(&array.timestamps(), PType::I64.into())?
-        .into_canonical()?
-        .into_primitive()?;
+    let timestamps = try_cast(&array.timestamps(), PType::I64.into())?.into_primitive()?;

     let divisor = match array.time_unit() {
         TimeUnit::Ns => 1_000_000_000,
diff --git a/encodings/dict/src/compute.rs b/encodings/dict/src/compute.rs
index 2cb8de7c63..867ba87227 100644
--- a/encodings/dict/src/compute.rs
+++ b/encodings/dict/src/compute.rs
@@ -50,7 +50,7 @@ impl SliceFn for DictArray {
 mod test {
     use vortex::array::primitive::PrimitiveArray;
     use vortex::array::varbin::VarBinArray;
-    use vortex::{IntoArray, IntoCanonical, ToArray};
+    use vortex::{IntoArray, IntoArrayVariant, ToArray};
     use vortex_dtype::{DType, Nullability};

     use crate::{dict_encode_typed_primitive, dict_encode_varbin, DictArray};
@@ -67,12 +67,7 @@
         ]);
         let (codes, values) = dict_encode_typed_primitive::<i32>(&reference);
         let dict = DictArray::try_new(codes.into_array(), values.into_array()).unwrap();
-        let flattened_dict = dict
-            .to_array()
-            .into_canonical()
-            .unwrap()
-            .into_primitive()
-            .unwrap();
+        let flattened_dict = dict.to_array().into_primitive().unwrap();
         assert_eq!(flattened_dict.buffer(), reference.buffer());
     }

@@ -84,43 +79,14 @@
         );
         let (codes, values) = dict_encode_varbin(&reference);
         let dict = DictArray::try_new(codes.into_array(), values.into_array()).unwrap();
-        let flattened_dict = dict
-            .to_array()
-            .into_canonical()
-            .unwrap()
-            .into_varbin()
-            .unwrap();
+        let flattened_dict = dict.to_array().into_varbin().unwrap();
         assert_eq!(
-            flattened_dict
-                .offsets()
-                .into_canonical()
-                .unwrap()
-                .into_primitive()
-                .unwrap()
-                .buffer(),
-            reference
-                .offsets()
-                .into_canonical()
-                .unwrap()
-                .into_primitive()
-                .unwrap()
-                .buffer()
+            flattened_dict.offsets().into_primitive().unwrap().buffer(),
+            reference.offsets().into_primitive().unwrap().buffer()
         );
         assert_eq!(
-            flattened_dict
-                .bytes()
-                .into_canonical()
-                .unwrap()
-                .into_primitive()
-                .unwrap()
-                .buffer(),
-            reference
-                .bytes()
-                .into_canonical()
-                .unwrap()
-                .into_primitive()
-                .unwrap()
-                .buffer()
+            flattened_dict.bytes().into_primitive().unwrap().buffer(),
+            reference.bytes().into_primitive().unwrap().buffer()
         );
     }
 }
diff --git a/encodings/dict/src/dict.rs b/encodings/dict/src/dict.rs
index bce7a116bb..6b9d304910 100644
--- a/encodings/dict/src/dict.rs
+++ b/encodings/dict/src/dict.rs
@@ -5,7 +5,7 @@ use vortex::compute::take::take;
 use vortex::compute::unary::scalar_at::scalar_at;
 use vortex::validity::{ArrayValidity, LogicalValidity};
 use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
-use vortex::{impl_encoding, ArrayDType, Canonical, IntoCanonical};
+use vortex::{impl_encoding, ArrayDType, Canonical, IntoArrayVariant, IntoCanonical};
 use vortex_dtype::match_each_integer_ptype;
 use vortex_error::vortex_bail;

@@ -69,12 +69,7 @@ impl ArrayValidity for DictArray {

     fn logical_validity(&self) -> LogicalValidity {
         if self.dtype().is_nullable() {
-            let primitive_codes = self
-                .codes()
-                .into_canonical()
-                .unwrap()
-                .into_primitive()
-                .unwrap();
+            let primitive_codes = self.codes().into_primitive().unwrap();
             match_each_integer_ptype!(primitive_codes.ptype(), |$P| {
                 ArrayAccessor::<$P>::with_iterator(&primitive_codes, |iter| {
                     LogicalValidity::Array(
diff --git a/encodings/dict/src/lib.rs b/encodings/dict/src/lib.rs
index 26bd4b80ca..e2f9228cbf 100644
--- a/encodings/dict/src/lib.rs
+++ b/encodings/dict/src/lib.rs
@@ -1,7 +1,7 @@
 //! Implementation of Dictionary encoding.
 //!
 //! Expose a [DictArray] which is zero-copy equivalent to Arrow's
-//! [arrow_array::array::DictionaryArray] type.
+//! [DictionaryArray](https://docs.rs/arrow/latest/arrow/array/struct.DictionaryArray.html).

 pub use compress::*;
 pub use dict::*;
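For context on what these dictionary tests flatten: decoding a dictionary encoding materializes `values[codes[i]]` for every code. A stand-alone sketch (simplified; no validity handling):

```rust
// Decode a dictionary encoding: `values` holds each distinct value once,
// `codes` index into it, and the output is the fully materialized array.
fn decode_dict<T: Copy>(values: &[T], codes: &[usize]) -> Vec<T> {
    codes.iter().map(|&c| values[c]).collect()
}

fn main() {
    let values = ["foo", "bar"];
    let codes = [0, 1, 1, 0, 0];
    assert_eq!(
        decode_dict(&values, &codes),
        vec!["foo", "bar", "bar", "foo", "foo"]
    );
}
```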
diff --git a/encodings/fastlanes/src/bitpacking/mod.rs b/encodings/fastlanes/src/bitpacking/mod.rs
index c1cea11d38..a0129bdb93 100644
--- a/encodings/fastlanes/src/bitpacking/mod.rs
+++ b/encodings/fastlanes/src/bitpacking/mod.rs
@@ -204,7 +204,7 @@ impl ArrayTrait for BitPackedArray {
 #[cfg(test)]
 mod test {
     use vortex::array::primitive::PrimitiveArray;
-    use vortex::{IntoArray, IntoCanonical};
+    use vortex::{IntoArray, IntoArrayVariant};

     use crate::BitPackedArray;

@@ -216,8 +216,6 @@
         let expected = &[1, 0, 1, 0, 1, 0, u64::MAX];
         let results = packed
             .into_array()
-            .into_canonical()
-            .unwrap()
             .into_primitive()
             .unwrap()
             .maybe_null_slice::<u64>()
diff --git a/encodings/runend/src/default/runend.rs b/encodings/runend/src/default/runend.rs
index 9bfaa2dab1..5cd25fa4a1 100644
--- a/encodings/runend/src/default/runend.rs
+++ b/encodings/runend/src/default/runend.rs
@@ -130,7 +130,7 @@ mod test {
     use vortex::compute::slice::slice;
     use vortex::compute::unary::scalar_at::scalar_at;
     use vortex::validity::Validity;
-    use vortex::{ArrayDType, IntoArray, IntoCanonical};
+    use vortex::{ArrayDType, IntoArray, IntoArrayVariant};
     use vortex_dtype::{DType, Nullability, PType};

     use crate::default::RunEndArray;
@@ -179,11 +179,7 @@
         assert_eq!(arr.len(), 5);

         assert_eq!(
-            arr.into_canonical()
-                .unwrap()
-                .into_primitive()
-                .unwrap()
-                .maybe_null_slice::<i32>(),
+            arr.into_primitive().unwrap().maybe_null_slice::<i32>(),
             vec![2, 2, 3, 3, 3]
         );
     }
@@ -198,11 +194,7 @@
             .unwrap();

         assert_eq!(
-            arr.into_canonical()
-                .unwrap()
-                .into_primitive()
-                .unwrap()
-                .maybe_null_slice::<i32>(),
+            arr.into_primitive().unwrap().maybe_null_slice::<i32>(),
             vec![1, 1, 2, 2, 2, 3, 3, 3, 3, 3]
         );
     }
diff --git a/vortex-array/src/array/chunked/compute/take.rs b/vortex-array/src/array/chunked/compute/take.rs
index e0e7533df9..1cdf5aabd3 100644
--- a/vortex-array/src/array/chunked/compute/take.rs
+++ b/vortex-array/src/array/chunked/compute/take.rs
@@ -115,7 +115,7 @@ fn take_strict_sorted(chunked: &ChunkedArray, indices: &Array) -> VortexResult<
-        let res_primitive = res.into_canonical().unwrap().into_primitive().unwrap();
+        let res_primitive = res.into_primitive().unwrap();
         assert_eq!(res_primitive.maybe_null_slice::<i32>(), &[1, 1, 1, 2]);
diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs
index d56b24e49c..e60b67805c 100644
--- a/vortex-array/src/array/chunked/mod.rs
+++ b/vortex-array/src/array/chunked/mod.rs
@@ -167,7 +167,7 @@ mod test {
     use crate::array::chunked::ChunkedArray;
     use crate::compute::slice::slice;
     use crate::compute::unary::scalar_subtract::subtract_scalar;
-    use crate::{Array, IntoArray, IntoArrayVariant, IntoCanonical, ToArray};
+    use crate::{Array, IntoArray, IntoArrayVariant, ToArray};

     fn chunked_array() -> ChunkedArray {
         ChunkedArray::try_new(
@@ -231,8 +231,6 @@
         let results = chunks_out
             .next()
             .unwrap()
-            .into_canonical()
-            .unwrap()
             .into_primitive()
             .unwrap()
             .maybe_null_slice::<u64>()
@@ -241,8 +239,6 @@
         let results = chunks_out
             .next()
             .unwrap()
-            .into_canonical()
-            .unwrap()
             .into_primitive()
             .unwrap()
             .maybe_null_slice::<u64>()
@@ -251,8 +247,6 @@
         let results = chunks_out
             .next()
             .unwrap()
-            .into_canonical()
-            .unwrap()
             .into_primitive()
             .unwrap()
             .maybe_null_slice::<u64>()
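After the refactor, the test idiom for flattening an array and inspecting its contents is one call shorter. A sketch using the vortex APIs as they appear in this diff (array contents are illustrative):

```rust
use vortex::{IntoArray, IntoArrayVariant};
use vortex_error::VortexResult;

fn main() -> VortexResult<()> {
    let array = vec![1u64, 2, 3].into_array();
    // One call canonicalizes to the primitive encoding...
    let primitive = array.into_primitive()?;
    // ...and the typed slice is then available for plain asserts.
    assert_eq!(primitive.maybe_null_slice::<u64>(), &[1, 2, 3]);
    Ok(())
}
```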
diff --git a/vortex-array/src/array/primitive/compute/filter_indices.rs b/vortex-array/src/array/primitive/compute/filter_indices.rs
index b6f2c91552..910a195344 100644
--- a/vortex-array/src/array/primitive/compute/filter_indices.rs
+++ b/vortex-array/src/array/primitive/compute/filter_indices.rs
@@ -73,7 +73,7 @@ mod test {

     use super::*;
     use crate::validity::Validity;
-    use crate::IntoCanonical;
+    use crate::IntoArrayVariant;

     fn apply_conjunctive_filter(arr: &PrimitiveArray, conj: Conjunction) -> VortexResult<Array> {
         arr.filter_indices(&Disjunction::from_iter([conj]))
@@ -107,8 +107,6 @@
         let field = FieldPath::root();
         let filtered_primitive =
             apply_conjunctive_filter(&arr, Conjunction::from(field.lt(lit(5u32))))
-                .unwrap()
-                .into_canonical()
                 .unwrap()
                 .into_bool()
                 .unwrap();
@@ -117,8 +115,6 @@
         let filtered_primitive =
             apply_conjunctive_filter(&arr, Conjunction::from(field.gt(lit(5u32))))
-                .unwrap()
-                .into_canonical()
                 .unwrap()
                 .into_bool()
                 .unwrap();
@@ -127,8 +123,6 @@
         let filtered_primitive =
             apply_conjunctive_filter(&arr, Conjunction::from(field.equal(lit(5u32))))
-                .unwrap()
-                .into_canonical()
                 .unwrap()
                 .into_bool()
                 .unwrap();
@@ -137,8 +131,6 @@
         let filtered_primitive =
             apply_conjunctive_filter(&arr, Conjunction::from(field.gte(lit(5u32))))
-                .unwrap()
-                .into_canonical()
                 .unwrap()
                 .into_bool()
                 .unwrap();
@@ -147,8 +139,6 @@
         let filtered_primitive =
             apply_conjunctive_filter(&arr, Conjunction::from(field.lte(lit(5u32))))
-                .unwrap()
-                .into_canonical()
                 .unwrap()
                 .into_bool()
                 .unwrap();
@@ -166,8 +156,6 @@
             Conjunction::from_iter([field.lt(lit(5u32)), field.gt(lit(2u32))]),
         )
         .unwrap()
-        .into_canonical()
-        .unwrap()
         .into_bool()
         .unwrap();
         let filtered = to_int_indices(filtered_primitive);
@@ -184,8 +172,6 @@
             Conjunction::from_iter([field.lt(lit(5u32)), field.gt(lit(5u32))]),
         )
         .unwrap()
-        .into_canonical()
-        .unwrap()
         .into_bool()
         .unwrap();
         let filtered = to_int_indices(filtered_primitive);
@@ -202,13 +188,7 @@
         let c2 = Conjunction::from(field.gt(lit(5u32)));
         let disj = Disjunction::from_iter([c1, c2]);

-        let filtered_primitive = arr
-            .filter_indices(&disj)
-            .unwrap()
-            .into_canonical()
-            .unwrap()
-            .into_bool()
-            .unwrap();
+        let filtered_primitive = arr.filter_indices(&disj).unwrap().into_bool().unwrap();
         let filtered = to_int_indices(filtered_primitive);
         assert_eq!(filtered, [0u64, 1, 2, 3, 5, 6, 7, 8, 9])
     }
diff --git a/vortex-array/src/array/primitive/compute/subtract_scalar.rs b/vortex-array/src/array/primitive/compute/subtract_scalar.rs
index 871353699d..2509122df6 100644
--- a/vortex-array/src/array/primitive/compute/subtract_scalar.rs
+++ b/vortex-array/src/array/primitive/compute/subtract_scalar.rs
@@ -84,14 +84,12 @@ mod test {

     use crate::array::primitive::PrimitiveArray;
     use crate::compute::unary::scalar_subtract::subtract_scalar;
-    use crate::{IntoArray, IntoCanonical};
+    use crate::{IntoArray, IntoArrayVariant};

     #[test]
     fn test_scalar_subtract_unsigned() {
         let values = vec![1u16, 2, 3].into_array();
         let results = subtract_scalar(&values, &1u16.into())
-            .unwrap()
-            .into_canonical()
             .unwrap()
             .into_primitive()
             .unwrap()
@@ -104,8 +102,6 @@
     fn test_scalar_subtract_signed() {
         let values = vec![1i64, 2, 3].into_array();
         let results = subtract_scalar(&values, &(-1i64).into())
-            .unwrap()
-            .into_canonical()
             .unwrap()
             .into_primitive()
             .unwrap()
@@ -119,8 +115,6 @@
         let values = PrimitiveArray::from_nullable_vec(vec![Some(1u16), Some(2), None, Some(3)])
             .into_array();
         let flattened = subtract_scalar(&values, &Some(1u16).into())
-            .unwrap()
-            .into_canonical()
             .unwrap()
             .into_primitive()
             .unwrap();
@@ -143,8 +137,6 @@
         let values = vec![1.0f64, 2.0, 3.0].into_array();
         let to_subtract = -1f64;
         let results = subtract_scalar(&values, &to_subtract.into())
-            .unwrap()
-            .into_canonical()
             .unwrap()
             .into_primitive()
             .unwrap()
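These tests turn the boolean filter mask into integer indices through a `to_int_indices` helper. A plain-Rust sketch of that conversion (hypothetical helper, mirroring what the tests assert):

```rust
// Collect the positions of set bits in a boolean mask as u64 indices.
fn mask_to_indices(mask: &[bool]) -> Vec<u64> {
    mask.iter()
        .enumerate()
        .filter_map(|(i, &set)| set.then_some(i as u64))
        .collect()
}

fn main() {
    // A `lt(5)` filter over [1, 2, 3, 4, 5, 6] keeps the first four positions.
    let mask = [true, true, true, true, false, false];
    assert_eq!(mask_to_indices(&mask), vec![0, 1, 2, 3]);
}
```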
diff --git a/vortex-array/src/array/sparse/flatten.rs b/vortex-array/src/array/sparse/flatten.rs
index eff5645068..2ecde0a5b9 100644
--- a/vortex-array/src/array/sparse/flatten.rs
+++ b/vortex-array/src/array/sparse/flatten.rs
@@ -19,11 +19,7 @@ impl IntoCanonical for SparseArray {
         validity.append_n(self.len(), false);

         if matches!(self.dtype(), DType::Bool(_)) {
-            let values = self
-                .values()
-                .into_canonical()?
-                .into_bool()?
-                .boolean_buffer();
+            let values = self.values().into_bool()?.boolean_buffer();
             canonicalize_sparse_bools(values, &indices, self.len(), self.fill_value(), validity)
         } else {
             let values = self.values().into_primitive()?;
diff --git a/vortex-array/src/array/sparse/mod.rs b/vortex-array/src/array/sparse/mod.rs
index 62aa6e2852..c1b1effe2f 100644
--- a/vortex-array/src/array/sparse/mod.rs
+++ b/vortex-array/src/array/sparse/mod.rs
@@ -9,7 +9,7 @@ use crate::compute::unary::scalar_at::scalar_at;
 use crate::stats::ArrayStatisticsCompute;
 use crate::validity::{ArrayValidity, LogicalValidity};
 use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
-use crate::{impl_encoding, ArrayDType, IntoCanonical};
+use crate::{impl_encoding, ArrayDType, IntoArrayVariant};

 mod compute;
 mod flatten;
@@ -111,12 +111,7 @@ impl SparseArray {

     /// Return indices as a vector of usize with the indices_offset applied.
     pub fn resolved_indices(&self) -> Vec<usize> {
-        let flat_indices = self
-            .indices()
-            .into_canonical()
-            .unwrap()
-            .into_primitive()
-            .unwrap();
+        let flat_indices = self.indices().into_primitive().unwrap();
         match_each_integer_ptype!(flat_indices.ptype(), |$P| {
             flat_indices
                 .maybe_null_slice::<$P>()
@@ -197,7 +192,7 @@ mod test {
     use crate::compute::slice::slice;
     use crate::compute::unary::cast::try_cast;
     use crate::compute::unary::scalar_at::scalar_at;
-    use crate::{Array, IntoArray, IntoCanonical};
+    use crate::{Array, IntoArray, IntoArrayVariant};

     fn nullable_fill() -> Scalar {
         Scalar::null(DType::Primitive(PType::I32, Nullable))
@@ -220,12 +215,7 @@ fn assert_sparse_array(sparse: &Array, values: &[Option<i32>]) {
         let sparse_arrow = ArrayAccessor::<i32>::with_iterator(
-            &sparse
-                .clone()
-                .into_canonical()
-                .unwrap()
-                .into_primitive()
-                .unwrap(),
+            &sparse.clone().into_primitive().unwrap(),
             |iter| iter.map(|v| v.cloned()).collect_vec(),
         )
         .unwrap();
diff --git a/vortex-array/src/array/varbinview/accessor.rs b/vortex-array/src/array/varbinview/accessor.rs
index 8740ac0b48..9b1477f001 100644
--- a/vortex-array/src/array/varbinview/accessor.rs
+++ b/vortex-array/src/array/varbinview/accessor.rs
@@ -4,7 +4,7 @@ use crate::accessor::ArrayAccessor;
 use crate::array::primitive::PrimitiveArray;
 use crate::array::varbinview::VarBinViewArray;
 use crate::validity::ArrayValidity;
-use crate::{Canonical, IntoCanonical};
+use crate::IntoArrayVariant;

 impl ArrayAccessor<[u8]> for VarBinViewArray {
     fn with_iterator<F: for<'a> FnOnce(&mut dyn Iterator<Item = Option<&'a [u8]>>) -> R, R>(
         &self,
         f: F,
     ) -> VortexResult<R> {
         let views = self.view_slice();
         let bytes: Vec<PrimitiveArray> = (0..self.metadata().data_lens.len())
-            .map(|i| {
-                self.bytes(i)
-                    .into_canonical()
-                    .and_then(Canonical::into_primitive)
-            })
+            .map(|i| self.bytes(i).into_primitive())
             .collect::<VortexResult<Vec<_>>>()?;

         let validity = self.logical_validity().to_null_buffer()?;
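The `ArrayAccessor` pattern above hands decoded values to a caller-supplied closure, so the temporary flattened buffers only live for the duration of the call. A self-contained sketch of the same shape (types simplified; not vortex's actual definitions):

```rust
struct Accessor {
    bytes: Vec<Vec<u8>>,
    validity: Vec<bool>,
}

impl Accessor {
    // The higher-ranked bound lets the closure borrow items only while it runs.
    fn with_iterator<F, R>(&self, f: F) -> R
    where
        F: for<'a> FnOnce(&mut dyn Iterator<Item = Option<&'a [u8]>>) -> R,
    {
        let mut iter = self
            .bytes
            .iter()
            .zip(self.validity.iter())
            .map(|(b, &valid)| valid.then_some(b.as_slice()));
        f(&mut iter)
    }
}

fn main() {
    let acc = Accessor {
        bytes: vec![b"hi".to_vec(), Vec::new()],
        validity: vec![true, false],
    };
    let lens = acc.with_iterator(|iter| iter.map(|v| v.map(|s| s.len())).collect::<Vec<_>>());
    assert_eq!(lens, vec![Some(2), None]);
}
```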
diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs
index f5e08f83c0..07c96c3d39 100644
--- a/vortex-array/src/array/varbinview/mod.rs
+++ b/vortex-array/src/array/varbinview/mod.rs
@@ -18,7 +18,7 @@ use crate::compute::slice::slice;
 use crate::validity::Validity;
 use crate::validity::{ArrayValidity, LogicalValidity, ValidityMetadata};
 use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
-use crate::{impl_encoding, ArrayDType, ArrayData, Canonical, IntoCanonical};
+use crate::{impl_encoding, ArrayDType, ArrayData, Canonical, IntoArrayVariant, IntoCanonical};

 mod accessor;
 mod builder;
@@ -220,7 +220,6 @@ impl VarBinViewArray {
                 view._ref.offset as usize,
                 (view._ref.size + view._ref.offset) as usize,
             )?
-            .into_canonical()?
             .into_primitive()?;
             Ok(data_buf.maybe_null_slice::<u8>().to_vec())
         } else {
@@ -248,8 +247,6 @@ fn as_arrow(var_bin_view: VarBinViewArray) -> ArrayRef {
     // Views should be buffer of u8
     let views = var_bin_view
         .views()
-        .into_canonical()
-        .expect("into_canonical")
         .into_primitive()
         .expect("views must be primitive");
     assert_eq!(views.ptype(), PType::U8);
@@ -259,12 +256,7 @@
         .expect("null buffer");

     let data = (0..var_bin_view.metadata().data_lens.len())
-        .map(|i| {
-            var_bin_view
-                .bytes(i)
-                .into_canonical()
-                .and_then(Canonical::into_primitive)
-        })
+        .map(|i| var_bin_view.bytes(i).into_primitive())
         .collect::<VortexResult<Vec<_>>>()
         .expect("bytes arrays must be primitive");
     if !data.is_empty() {
diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs
index abb9cb1a28..b3b1f00c36 100644
--- a/vortex-array/src/canonical.rs
+++ b/vortex-array/src/canonical.rs
@@ -216,13 +216,11 @@ fn varbin_to_arrow(varbin_array: VarBinArray) -> ArrayRef {
         // FIXME(ngates): do not copy offsets again
         PType::U64 => try_cast(&offsets.to_array(), PType::I64.into())
             .expect("cast to i64")
-            .into_canonical()
-            .and_then(Canonical::into_primitive)
+            .into_primitive()
             .expect("flatten_primitive"),
         _ => try_cast(&offsets.to_array(), PType::I32.into())
             .expect("cast to i32")
-            .into_canonical()
-            .and_then(Canonical::into_primitive)
+            .into_primitive()
             .expect("flatten_primitive"),
     };
     let nulls = varbin_array
@@ -232,8 +230,7 @@

     let data = varbin_array
         .bytes()
-        .into_canonical()
-        .and_then(Canonical::into_primitive)
+        .into_primitive()
         .expect("flatten_primitive");
     assert_eq!(data.ptype(), PType::U8);
     let data = data.buffer();
@@ -285,8 +282,7 @@ fn local_date_time_to_arrow(local_date_time_array: LocalDateTimeArray) -> ArrayR
     // A LocalDateTime maps to an Arrow Timestamp array with no timezone.
     let timestamps = try_cast(&local_date_time_array.timestamps(), PType::I64.into())
         .expect("timestamps must cast to i64")
-        .into_canonical()
-        .and_then(Canonical::into_primitive)
+        .into_primitive()
         .expect("must be i64 array");
     let validity = timestamps
         .logical_validity()
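Background for the casts in `varbin_to_arrow`: Arrow binary arrays index their data buffer with signed offsets (i32 for `BinaryArray`, i64 for the large variant), while Vortex offsets may be stored unsigned — hence the `try_cast` before flattening. An illustrative widening step in plain Rust:

```rust
// Widen u64 offsets to the i64 offsets Arrow's LargeBinaryArray expects,
// failing if any value cannot be represented.
fn to_i64_offsets(offsets: &[u64]) -> Option<Vec<i64>> {
    offsets.iter().map(|&o| i64::try_from(o).ok()).collect()
}

fn main() {
    assert_eq!(to_i64_offsets(&[0, 3, 5]), Some(vec![0, 3, 5]));
    assert_eq!(to_i64_offsets(&[u64::MAX]), None); // would overflow i64
}
```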
diff --git a/vortex-array/src/stream/take_rows.rs b/vortex-array/src/stream/take_rows.rs
index 8057c91454..0cab47119f 100644
--- a/vortex-array/src/stream/take_rows.rs
+++ b/vortex-array/src/stream/take_rows.rs
@@ -13,8 +13,8 @@ use crate::compute::take::take;
 use crate::compute::unary::scalar_subtract::subtract_scalar;
 use crate::stats::{ArrayStatistics, Stat};
 use crate::stream::ArrayStream;
-use crate::{Array, ArrayDType};
-use crate::{IntoArray, IntoCanonical};
+use crate::IntoArray;
+use crate::{Array, ArrayDType, IntoArrayVariant};

 #[pin_project]
 pub struct TakeRows {
@@ -91,9 +91,7 @@ impl Stream for TakeRows {

         // TODO(ngates): this is probably too heavy to run on the event loop. We should spawn
         // onto a worker pool.
-        let indices_for_batch = slice(this.indices, left, right)?
-            .into_canonical()?
-            .into_primitive()?;
+        let indices_for_batch = slice(this.indices, left, right)?.into_primitive()?;
         let shifted_arr = match_each_integer_ptype!(indices_for_batch.ptype(), |$T| {
             subtract_scalar(&indices_for_batch.into_array(), &Scalar::from(curr_offset as $T))?
         });
diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs
index 638632972a..ae50395e8c 100644
--- a/vortex-array/src/validity.rs
+++ b/vortex-array/src/validity.rs
@@ -8,7 +8,7 @@ use crate::compute::slice::slice;
 use crate::compute::take::take;
 use crate::compute::unary::scalar_at::scalar_at;
 use crate::stats::ArrayStatistics;
-use crate::{Array, Canonical, IntoArray, IntoArrayVariant, IntoCanonical};
+use crate::{Array, IntoArray, IntoArrayVariant};

 pub trait ArrayValidity {
     fn is_valid(&self, index: usize) -> bool;
@@ -144,18 +144,8 @@ impl PartialEq for Validity {
             (Self::AllValid, Self::AllValid) => true,
             (Self::AllInvalid, Self::AllInvalid) => true,
             (Self::Array(a), Self::Array(b)) => {
-                a.clone()
-                    .into_canonical()
-                    .unwrap()
-                    .into_bool()
-                    .unwrap()
-                    .boolean_buffer()
-                    == b.clone()
-                        .into_canonical()
-                        .unwrap()
-                        .into_bool()
-                        .unwrap()
-                        .boolean_buffer()
+                a.clone().into_bool().unwrap().boolean_buffer()
+                    == b.clone().into_bool().unwrap().boolean_buffer()
             }
             _ => false,
         }
@@ -212,8 +202,7 @@ impl FromIterator<LogicalValidity> for Validity {
             LogicalValidity::AllValid(count) => BooleanBuffer::new_set(count),
             LogicalValidity::AllInvalid(count) => BooleanBuffer::new_unset(count),
             LogicalValidity::Array(array) => array
-                .into_canonical()
-                .and_then(Canonical::into_bool)
+                .into_bool()
                 .expect("validity must flatten to BoolArray")
                 .boolean_buffer(),
         };
diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs
index 9a6aeb0007..49ea4e404a 100644
--- a/vortex-datafusion/src/lib.rs
+++ b/vortex-datafusion/src/lib.rs
@@ -446,6 +446,10 @@ impl Stream for VortexRecordBatchStream {

         Poll::Ready(Some(Ok(batch)))
     }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.num_chunks, Some(self.num_chunks))
+    }
 }

 impl RecordBatchStream for VortexRecordBatchStream {
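On the `size_hint` addition: a stream that knows how many record batches it will produce can report exact bounds, letting consumers pre-allocate. A minimal sketch with simplified types (this version counts *remaining* items, whereas the PR reports the total chunk count; the `Stream` trait comes from the `futures` crate):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::Stream;

struct Chunks {
    remaining: usize,
}

impl Stream for Chunks {
    type Item = usize;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<usize>> {
        if self.remaining == 0 {
            return Poll::Ready(None);
        }
        self.remaining -= 1;
        Poll::Ready(Some(self.remaining))
    }

    // Exact lower and upper bounds: we know precisely how many items are left.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining, Some(self.remaining))
    }
}
```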
diff --git a/vortex-ipc/src/chunked_reader/take_rows.rs b/vortex-ipc/src/chunked_reader/take_rows.rs
index 562e515899..3d885c2bef 100644
--- a/vortex-ipc/src/chunked_reader/take_rows.rs
+++ b/vortex-ipc/src/chunked_reader/take_rows.rs
@@ -13,7 +13,7 @@ use vortex::compute::unary::cast::try_cast;
 use vortex::compute::unary::scalar_subtract::subtract_scalar;
 use vortex::stats::ArrayStatistics;
 use vortex::stream::{ArrayStream, ArrayStreamExt};
-use vortex::{Array, ArrayDType, IntoArray, IntoCanonical};
+use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
 use vortex_dtype::PType;
 use vortex_error::{vortex_bail, VortexResult};
 use vortex_scalar::Scalar;
@@ -66,12 +66,8 @@ impl ChunkedArrayReader {
                 .collect_vec(),
         )
         .into_array();
-        let start_rows = take(&self.row_offsets, &start_chunks)?
-            .into_canonical()?
-            .into_primitive()?;
-        let start_bytes = take(&self.byte_offsets, &start_chunks)?
-            .into_canonical()?
-            .into_primitive()?;
+        let start_rows = take(&self.row_offsets, &start_chunks)?.into_primitive()?;
+        let start_bytes = take(&self.byte_offsets, &start_chunks)?.into_primitive()?;

         let stop_chunks = PrimitiveArray::from(
             coalesced_chunks
@@ -80,12 +76,8 @@
                 .collect_vec(),
         )
         .into_array();
-        let stop_rows = take(&self.row_offsets, &stop_chunks)?
-            .into_canonical()?
-            .into_primitive()?;
-        let stop_bytes = take(&self.byte_offsets, &stop_chunks)?
-            .into_canonical()?
-            .into_primitive()?;
+        let stop_rows = take(&self.row_offsets, &stop_chunks)?.into_primitive()?;
+        let stop_bytes = take(&self.byte_offsets, &stop_chunks)?.into_primitive()?;

         // For each chunk-range, read the data as an ArrayStream and call take on it.
         let chunks = stream::iter(0..coalesced_chunks.len())
@@ -162,13 +154,9 @@
 fn find_chunks(row_offsets: &Array, indices: &Array) -> VortexResult<Vec<ChunkIndices>> {
     // TODO(ngates): lots of optimizations to be had here, potentially lots of push-down.
     //  For now, we just flatten everything into primitive arrays and iterate.
-    let row_offsets = try_cast(row_offsets, PType::U64.into())?
-        .into_canonical()?
-        .into_primitive()?;
+    let row_offsets = try_cast(row_offsets, PType::U64.into())?.into_primitive()?;
     let _rows = format!("{:?}", row_offsets.maybe_null_slice::<u64>());
-    let indices = try_cast(indices, PType::U64.into())?
-        .into_canonical()?
-        .into_primitive()?;
+    let indices = try_cast(indices, PType::U64.into())?.into_primitive()?;
     let _indices = format!("{:?}", indices.maybe_null_slice::<u64>());

     if let (Some(last_idx), Some(num_rows)) = (