diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index aff0ad0745..99a8bfc2c5 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -1,6 +1,6 @@ use arrow_array::RecordBatchReader; use itertools::Itertools; -use log::info; +use log::{info, warn}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::ProjectionMask; use std::collections::HashSet; @@ -24,7 +24,7 @@ use vortex::dtype::DType; use vortex::formatter::display_tree; use vortex_alp::ALPEncoding; use vortex_dict::DictEncoding; -use vortex_fastlanes::{BitPackedEncoding, FoREncoding}; +use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; use vortex_ree::REEEncoding; use vortex_roaring::RoaringBoolEncoding; @@ -46,8 +46,7 @@ pub fn enumerate_arrays() -> Vec<&'static dyn Encoding> { &DictEncoding, &BitPackedEncoding, &FoREncoding, - // &DeltaEncoding, - // &FFoREncoding, + &DeltaEncoding, &REEEncoding, &RoaringBoolEncoding, // &RoaringIntEncoding, @@ -117,7 +116,7 @@ pub fn compress_taxi_data() -> ArrayRef { let dtype: DType = schema.clone().try_into().unwrap(); let compressed = ChunkedArray::new(chunks.clone(), dtype).boxed(); - info!("Compressed array {}", display_tree(compressed.as_ref())); + warn!("Compressed array {}\n", display_tree(compressed.as_ref())); let mut field_bytes = vec![0; schema.fields().len()]; for chunk in chunks { @@ -156,10 +155,10 @@ mod test { .unwrap(); } - #[ignore] + // #[ignore] #[test] fn compression_ratio() { - setup_logger(LevelFilter::Warn); + setup_logger(LevelFilter::Debug); _ = compress_taxi_data(); } } diff --git a/vortex-array/src/ptype.rs b/vortex-array/src/ptype.rs index 815720fad7..71f79d36e8 100644 --- a/vortex-array/src/ptype.rs +++ b/vortex-array/src/ptype.rs @@ -163,6 +163,26 @@ impl PType { pub fn bit_width(&self) -> usize { self.byte_width() * 8 } + + pub fn to_signed(self) -> PType { + match self { + PType::U8 => PType::I8, + PType::U16 => PType::I16, + PType::U32 => PType::I32, + PType::U64 => PType::I64, + _ => self, + } + } + + pub fn to_unsigned(self) -> PType { + match self { + PType::I8 => PType::U8, + PType::I16 => PType::U16, + PType::I32 => PType::U32, + PType::I64 => PType::U64, + _ => self, + } + } } impl TryFrom<&DType> for PType { diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index 82e051b837..209680037e 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -134,7 +134,7 @@ fn bitpack_primitive(array: &[T], bit_width: usize) }); // Pad the last chunk with zeros to a full 1024 elements. - let last_chunk_size = array.len() % 1024; + let last_chunk_size = array.len() - ((num_chunks - 1) * 1024); let mut last_chunk: [T; 1024] = [T::default(); 1024]; last_chunk[..last_chunk_size].copy_from_slice(&array[array.len() - last_chunk_size..]); TryBitPack::try_bitpack_into(&last_chunk, bit_width, &mut output).unwrap(); diff --git a/vortex-fastlanes/src/delta/compress.rs b/vortex-fastlanes/src/delta/compress.rs index 2795e88ee5..61345a51eb 100644 --- a/vortex-fastlanes/src/delta/compress.rs +++ b/vortex-fastlanes/src/delta/compress.rs @@ -1,5 +1,4 @@ use arrayref::array_ref; -use log::debug; use std::mem::size_of; use fastlanez_sys::{transpose, Delta}; @@ -7,9 +6,11 @@ use vortex::array::downcast::DowncastArrayBuiltin; use vortex::array::primitive::PrimitiveArray; use vortex::array::{Array, ArrayRef}; use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use vortex::compute::fill::fill_forward; use vortex::error::VortexResult; use vortex::match_each_signed_integer_ptype; use vortex::ptype::NativePType; +use vortex::stats::Stat; use crate::{DeltaArray, DeltaEncoding}; @@ -21,18 +22,34 @@ impl EncodingCompression for DeltaEncoding { ) -> Option<&dyn EncodingCompression> { // Only support primitive arrays let Some(parray) = array.maybe_primitive() else { - debug!("Skipping Delta: not primitive"); return None; }; // Only supports signed ints if !parray.ptype().is_signed_int() { - debug!("Skipping Delta: not int"); return None; } - debug!("Compressing with Delta"); + if parray + .stats() + .get_or_compute_cast::(&Stat::Min) + .unwrap_or(0) + != 0 + { + return None; + } + Some(self) + // + // // For now, only consider delta on sorted arrays + // if parray + // .stats() + // .get_or_compute_as::(&Stat::IsSorted) + // .unwrap_or(false) + // { + // return Some(self); + // } + // None } fn compress( @@ -46,14 +63,19 @@ impl EncodingCompression for DeltaEncoding { let validity = parray .validity() - .map(|v| ctx.compress(v.as_ref(), like_delta.and_then(|d| d.validity()))) + .map(|v| { + ctx.auxiliary("validity") + .compress(v.as_ref(), like_delta.and_then(|d| d.validity())) + }) .transpose()?; + // Fill forward nulls + let filled = fill_forward(array)?; let delta_encoded = match_each_signed_integer_ptype!(parray.ptype(), |$T| { - PrimitiveArray::from(delta_primitive(parray.buffer().typed_data::<$T>())) + PrimitiveArray::from(delta_primitive(filled.as_primitive().typed_data::<$T>())) }); - let encoded = ctx.next_level().compress( + let encoded = ctx.named("deltas").compress( delta_encoded.as_ref(), like_delta.map(|d| d.encoded().as_ref()), )?; @@ -85,11 +107,17 @@ where Delta::delta(&transposed, &mut base, &mut output); }); - // Pad the last chunk with zeros to a full 1024 elements. - let last_chunk_size = array.len() % 1024; - let mut last_chunk: [T; 1024] = [T::default(); 1024]; - last_chunk[..last_chunk_size].copy_from_slice(&array[array.len() - last_chunk_size..]); - Delta::delta(&last_chunk, &mut base, &mut output); + // To avoid padding, the remainder is encoded with scalar logic. + let mut base_scalar = base[base.len() - 1]; + let last_chunk_size = array.len() - ((num_chunks - 1) * 1024); + for i in array.len() - last_chunk_size..array.len() { + let next = array[i]; + output.push(next - base_scalar); + base_scalar = next; + } + // let mut last_chunk: [T; 1024] = [T::default(); 1024]; + // last_chunk[..last_chunk_size].copy_from_slice(&array[array.len() - last_chunk_size..]); + // Delta::delta(&last_chunk, &mut base, &mut output); output } diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs index 968f393d43..44048fd959 100644 --- a/vortex-fastlanes/src/for/compress.rs +++ b/vortex-fastlanes/src/for/compress.rs @@ -1,4 +1,5 @@ use itertools::Itertools; +use num_traits::CheckedSub; use vortex::array::downcast::DowncastArrayBuiltin; use vortex::array::primitive::PrimitiveArray; @@ -6,13 +7,14 @@ use vortex::array::{Array, ArrayRef}; use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; use vortex::error::VortexResult; use vortex::match_each_integer_ptype; +use vortex::ptype::NativePType; use vortex::stats::Stat; use crate::{FoRArray, FoREncoding}; impl EncodingCompression for FoREncoding { fn cost(&self) -> u8 { - 0 + 1 } fn can_compress( @@ -28,10 +30,26 @@ impl EncodingCompression for FoREncoding { return None; } - // Nothing for us to do if the min is already zero - if parray.stats().get_or_compute_cast::(&Stat::Min)? == 0 { - return None; - } + match_each_integer_ptype!(parray.ptype(), |$T| { + let min = parray + .stats() + .get_or_compute_as::<$T>(&Stat::Min) + .unwrap_or(<$T>::default()); + + // Nothing for us to do if the min is already zero + if min == 0 { + return None; + } + + // Check for overflow + let max = parray + .stats() + .get_or_compute_as::<$T>(&Stat::Max) + .unwrap_or(<$T>::default()); + if max.checked_sub(min).is_none() { + return None; + } + }); Some(self) } @@ -45,15 +63,7 @@ impl EncodingCompression for FoREncoding { let parray = array.as_primitive(); let child = match_each_integer_ptype!(parray.ptype(), |$T| { - let min = parray.stats().get_or_compute_as::<$T>(&Stat::Min).unwrap_or(<$T>::default()); - - // TODO(ngates): check for overflow - let values = parray.buffer().typed_data::<$T>().iter().map(|v| v - min) - // TODO(ngates): cast to unsigned - // .map(|v| v as parray.ptype().to_unsigned()::T) - .collect_vec(); - - PrimitiveArray::from(values) + compress_primitive::<$T>(parray) }); // TODO(ngates): remove FoR as a potential encoding from the ctx @@ -67,6 +77,36 @@ impl EncodingCompression for FoREncoding { Ok(FoRArray::try_new(compressed_child, reference)?.boxed()) } } +fn compress_primitive(parray: &PrimitiveArray) -> PrimitiveArray { + let min = parray + .stats() + .get_or_compute_as::(&Stat::Min) + .unwrap_or(::default()); + let max = parray + .stats() + .get_or_compute_as::(&Stat::Max) + .unwrap_or(::default()); + + let _buffer = parray.typed_data::(); + if max.checked_sub(&min).is_none() { + // Delta would cause overflow + return parray.clone(); + } + + // TODO(ngates): check for overflow + let values = parray + .typed_data::() + .iter() + .map(|&v| { + v.checked_sub(&min) + .unwrap_or_else(|| panic!("Underflow when compressing FoR")) + }) + // TODO(ngates): cast to unsigned + // .map(|v| v as parray.ptype().to_unsigned()::T) + .collect_vec(); + + PrimitiveArray::from(values) +} #[cfg(test)] mod test {