diff --git a/encodings/fastlanes/src/for/compress.rs b/encodings/fastlanes/src/for/compress.rs index 1b49e32cde..64346447f1 100644 --- a/encodings/fastlanes/src/for/compress.rs +++ b/encodings/fastlanes/src/for/compress.rs @@ -1,7 +1,8 @@ use itertools::Itertools; use num_traits::{PrimInt, WrappingAdd, WrappingSub}; -use vortex::array::{ConstantArray, PrimitiveArray}; +use vortex::array::{ConstantArray, PrimitiveArray, SparseArray}; use vortex::stats::{trailing_zeros, ArrayStatistics, Stat}; +use vortex::validity::LogicalValidity; use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; use vortex_dtype::{match_each_integer_ptype, NativePType}; use vortex_error::{vortex_err, VortexResult}; @@ -9,7 +10,14 @@ use vortex_scalar::Scalar; use crate::FoRArray; -pub fn for_compress(array: &PrimitiveArray) -> VortexResult<(Array, Scalar, u8)> { +pub fn for_compress(array: &Array) -> VortexResult { + let (child, for_params) = for_compress_parts(&array.clone().into_primitive()?)?; + for_params + .map(|(shift, min)| FoRArray::try_new(child.clone(), shift, min).map(|a| a.into_array())) + .unwrap_or(Ok(child)) +} + +pub fn for_compress_parts(array: &PrimitiveArray) -> VortexResult<(Array, Option<(Scalar, u8)>)> { let shift = trailing_zeros(array.array()); let min = array .statistics() @@ -18,16 +26,52 @@ pub fn for_compress(array: &PrimitiveArray) -> VortexResult<(Array, Scalar, u8)> Ok(match_each_integer_ptype!(array.ptype(), |$T| { if shift == <$T>::PTYPE.bit_width() as u8 { - (ConstantArray::new( - Scalar::zero::<$T>(array.dtype().nullability()) - .reinterpret_cast(array.ptype().to_unsigned()), - array.len(), - ) - .into_array(), min, shift) + match array.validity().to_logical(array.len()) { + LogicalValidity::AllValid(_) => ( + ConstantArray::new( + Scalar::zero::<$T>(array.dtype().nullability()), + array.len(), + ) + .into_array(), + None, + ), + LogicalValidity::AllInvalid(_) => ( + ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(), + None, + ), + LogicalValidity::Array(a) => { + let valid_indices = PrimitiveArray::from( + a.into_bool()? + .boolean_buffer() + .set_indices() + .map(|i| i as u64) + .collect::>(), + ) + .into_array(); + let valid_len = valid_indices.len(); + ( + SparseArray::try_new( + valid_indices, + ConstantArray::new( + Scalar::zero::<$T>(array.dtype().nullability()), + valid_len, + ) + .into_array(), + array.len(), + Scalar::null(array.dtype().clone()), + )? + .into_array(), + None, + ) + } + } } else { - (compress_primitive::<$T>(&array, shift, $T::try_from(&min)?) - .reinterpret_cast(array.ptype().to_unsigned()) - .into_array(), min, shift) + ( + compress_primitive::<$T>(&array, shift, $T::try_from(&min)?) + .reinterpret_cast(array.ptype().to_unsigned()) + .into_array(), + Some((min, shift)), + ) } })) } @@ -101,20 +145,17 @@ mod test { fn test_compress() { // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); + let compressed = FoRArray::try_from(for_compress(array.array()).unwrap()).unwrap(); - let (_, reference, _) = for_compress(&array).unwrap(); - assert_eq!(u32::try_from(reference).unwrap(), 1_000_000u32); + assert_eq!(u32::try_from(compressed.reference()).unwrap(), 1_000_000u32); } #[test] fn test_decompress() { // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); - let (compressed, reference, shift) = for_compress(&array).unwrap(); - let decompressed = FoRArray::try_new(compressed, reference, shift) - .unwrap() - .into_primitive() - .unwrap(); + let compressed = for_compress(array.array()).unwrap(); + let decompressed = compressed.into_primitive().unwrap(); assert_eq!( decompressed.maybe_null_slice::(), array.maybe_null_slice::() @@ -124,8 +165,7 @@ mod test { #[test] fn test_overflow() { let array = PrimitiveArray::from((i8::MIN..=i8::MAX).collect_vec()); - let (compressed, reference, shift) = for_compress(&array).unwrap(); - let compressed = FoRArray::try_new(compressed, reference, shift).unwrap(); + let compressed = FoRArray::try_from(for_compress(array.array()).unwrap()).unwrap(); assert_eq!(i8::MIN, i8::try_from(compressed.reference()).unwrap()); let encoded = compressed.encoded().into_primitive().unwrap(); diff --git a/encodings/fastlanes/src/for/compute.rs b/encodings/fastlanes/src/for/compute.rs index 6b146b3f50..3ca1a79973 100644 --- a/encodings/fastlanes/src/for/compute.rs +++ b/encodings/fastlanes/src/for/compute.rs @@ -96,33 +96,30 @@ mod test { use vortex::array::PrimitiveArray; use vortex::compute::unary::scalar_at; use vortex::compute::{search_sorted, SearchResult, SearchSortedSide}; - use vortex::IntoArray; - use crate::{for_compress, FoRArray}; + use crate::for_compress; #[test] fn for_scalar_at() { - let (child, min, shift) = for_compress(&PrimitiveArray::from(vec![11, 15, 19])).unwrap(); - let forarr = FoRArray::try_new(child, min, shift).unwrap().into_array(); - assert_eq!(scalar_at(&forarr, 0).unwrap(), 11.into()); - assert_eq!(scalar_at(&forarr, 1).unwrap(), 15.into()); - assert_eq!(scalar_at(&forarr, 2).unwrap(), 19.into()); + let for_arr = for_compress(PrimitiveArray::from(vec![1100, 1500, 1900]).array()).unwrap(); + assert_eq!(scalar_at(&for_arr, 0).unwrap(), 1100.into()); + assert_eq!(scalar_at(&for_arr, 1).unwrap(), 1500.into()); + assert_eq!(scalar_at(&for_arr, 2).unwrap(), 1900.into()); } #[test] fn for_search() { - let (child, min, shift) = for_compress(&PrimitiveArray::from(vec![11, 15, 19])).unwrap(); - let forarr = FoRArray::try_new(child, min, shift).unwrap().into_array(); + let for_arr = for_compress(PrimitiveArray::from(vec![1100, 1500, 1900]).array()).unwrap(); assert_eq!( - search_sorted(&forarr, 15, SearchSortedSide::Left).unwrap(), + search_sorted(&for_arr, 1500, SearchSortedSide::Left).unwrap(), SearchResult::Found(1) ); assert_eq!( - search_sorted(&forarr, 20, SearchSortedSide::Left).unwrap(), + search_sorted(&for_arr, 2000, SearchSortedSide::Left).unwrap(), SearchResult::NotFound(3) ); assert_eq!( - search_sorted(&forarr, 10, SearchSortedSide::Left).unwrap(), + search_sorted(&for_arr, 1000, SearchSortedSide::Left).unwrap(), SearchResult::NotFound(0) ); } diff --git a/vortex-sampling-compressor/src/compressors/for.rs b/vortex-sampling-compressor/src/compressors/for.rs index fff5353c54..553d21b014 100644 --- a/vortex-sampling-compressor/src/compressors/for.rs +++ b/vortex-sampling-compressor/src/compressors/for.rs @@ -4,10 +4,10 @@ use vortex::array::PrimitiveArray; use vortex::encoding::EncodingRef; use vortex::stats::{trailing_zeros, ArrayStatistics}; use vortex::validity::ArrayValidity; -use vortex::{Array, ArrayDef, IntoArray}; +use vortex::{Array, ArrayDef, IntoArray, IntoArrayVariant}; use vortex_dtype::match_each_integer_ptype; use vortex_error::VortexResult; -use vortex_fastlanes::{for_compress, FoR, FoRArray, FoREncoding}; +use vortex_fastlanes::{for_compress_parts, FoR, FoRArray, FoREncoding}; use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; use crate::SamplingCompressor; @@ -52,16 +52,30 @@ impl EncodingCompressor for FoRCompressor { like: Option>, ctx: SamplingCompressor<'a>, ) -> VortexResult> { - let (child, min, shift) = for_compress(&PrimitiveArray::try_from(array)?)?; + let (child, for_params) = for_compress_parts(&array.clone().into_primitive()?)?; - let compressed_child = ctx - .named("for") - .excluding(self) - .compress(&child, like.as_ref().and_then(|l| l.child(0)))?; - Ok(CompressedArray::new( - FoRArray::try_new(compressed_child.array, min, shift).map(|a| a.into_array())?, - Some(CompressionTree::new(self, vec![compressed_child.path])), - )) + for_params + .map(|(min, shift)| { + let compressed_child = ctx + .named("for") + .excluding(self) + .compress(&child, like.as_ref().and_then(|l| l.child(0)))?; + Ok(CompressedArray::new( + FoRArray::try_new(compressed_child.array, min, shift) + .map(|a| a.into_array())?, + Some(CompressionTree::new(self, vec![compressed_child.path])), + )) + }) + .unwrap_or_else(|| { + let compressed_child = ctx + .named("for") + .excluding(self) + .compress(&child, like.as_ref().and_then(|l| l.child(0)))?; + Ok(CompressedArray::new( + compressed_child.array, + Some(CompressionTree::new(self, vec![compressed_child.path])), + )) + }) } fn used_encodings(&self) -> HashSet {