From ff57c605604ba262cb1a0c58f420a43608718bbb Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 13 Aug 2024 17:02:27 +0100 Subject: [PATCH 1/4] FoR compressor handles nullable arrays --- encodings/fastlanes/src/for/compress.rs | 80 ++++++++++++++----- encodings/fastlanes/src/for/compute.rs | 21 +++-- .../src/compressors/for.rs | 36 ++++++--- 3 files changed, 94 insertions(+), 43 deletions(-) diff --git a/encodings/fastlanes/src/for/compress.rs b/encodings/fastlanes/src/for/compress.rs index 1b49e32cde..64346447f1 100644 --- a/encodings/fastlanes/src/for/compress.rs +++ b/encodings/fastlanes/src/for/compress.rs @@ -1,7 +1,8 @@ use itertools::Itertools; use num_traits::{PrimInt, WrappingAdd, WrappingSub}; -use vortex::array::{ConstantArray, PrimitiveArray}; +use vortex::array::{ConstantArray, PrimitiveArray, SparseArray}; use vortex::stats::{trailing_zeros, ArrayStatistics, Stat}; +use vortex::validity::LogicalValidity; use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; use vortex_dtype::{match_each_integer_ptype, NativePType}; use vortex_error::{vortex_err, VortexResult}; @@ -9,7 +10,14 @@ use vortex_scalar::Scalar; use crate::FoRArray; -pub fn for_compress(array: &PrimitiveArray) -> VortexResult<(Array, Scalar, u8)> { +pub fn for_compress(array: &Array) -> VortexResult { + let (child, for_params) = for_compress_parts(&array.clone().into_primitive()?)?; + for_params + .map(|(shift, min)| FoRArray::try_new(child.clone(), shift, min).map(|a| a.into_array())) + .unwrap_or(Ok(child)) +} + +pub fn for_compress_parts(array: &PrimitiveArray) -> VortexResult<(Array, Option<(Scalar, u8)>)> { let shift = trailing_zeros(array.array()); let min = array .statistics() @@ -18,16 +26,52 @@ pub fn for_compress(array: &PrimitiveArray) -> VortexResult<(Array, Scalar, u8)> Ok(match_each_integer_ptype!(array.ptype(), |$T| { if shift == <$T>::PTYPE.bit_width() as u8 { - (ConstantArray::new( - Scalar::zero::<$T>(array.dtype().nullability()) - .reinterpret_cast(array.ptype().to_unsigned()), - array.len(), - ) - .into_array(), min, shift) + match array.validity().to_logical(array.len()) { + LogicalValidity::AllValid(_) => ( + ConstantArray::new( + Scalar::zero::<$T>(array.dtype().nullability()), + array.len(), + ) + .into_array(), + None, + ), + LogicalValidity::AllInvalid(_) => ( + ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(), + None, + ), + LogicalValidity::Array(a) => { + let valid_indices = PrimitiveArray::from( + a.into_bool()? + .boolean_buffer() + .set_indices() + .map(|i| i as u64) + .collect::>(), + ) + .into_array(); + let valid_len = valid_indices.len(); + ( + SparseArray::try_new( + valid_indices, + ConstantArray::new( + Scalar::zero::<$T>(array.dtype().nullability()), + valid_len, + ) + .into_array(), + array.len(), + Scalar::null(array.dtype().clone()), + )? + .into_array(), + None, + ) + } + } } else { - (compress_primitive::<$T>(&array, shift, $T::try_from(&min)?) - .reinterpret_cast(array.ptype().to_unsigned()) - .into_array(), min, shift) + ( + compress_primitive::<$T>(&array, shift, $T::try_from(&min)?) + .reinterpret_cast(array.ptype().to_unsigned()) + .into_array(), + Some((min, shift)), + ) } })) } @@ -101,20 +145,17 @@ mod test { fn test_compress() { // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); + let compressed = FoRArray::try_from(for_compress(array.array()).unwrap()).unwrap(); - let (_, reference, _) = for_compress(&array).unwrap(); - assert_eq!(u32::try_from(reference).unwrap(), 1_000_000u32); + assert_eq!(u32::try_from(compressed.reference()).unwrap(), 1_000_000u32); } #[test] fn test_decompress() { // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); - let (compressed, reference, shift) = for_compress(&array).unwrap(); - let decompressed = FoRArray::try_new(compressed, reference, shift) - .unwrap() - .into_primitive() - .unwrap(); + let compressed = for_compress(array.array()).unwrap(); + let decompressed = compressed.into_primitive().unwrap(); assert_eq!( decompressed.maybe_null_slice::(), array.maybe_null_slice::() @@ -124,8 +165,7 @@ mod test { #[test] fn test_overflow() { let array = PrimitiveArray::from((i8::MIN..=i8::MAX).collect_vec()); - let (compressed, reference, shift) = for_compress(&array).unwrap(); - let compressed = FoRArray::try_new(compressed, reference, shift).unwrap(); + let compressed = FoRArray::try_from(for_compress(array.array()).unwrap()).unwrap(); assert_eq!(i8::MIN, i8::try_from(compressed.reference()).unwrap()); let encoded = compressed.encoded().into_primitive().unwrap(); diff --git a/encodings/fastlanes/src/for/compute.rs b/encodings/fastlanes/src/for/compute.rs index 6b146b3f50..3ca1a79973 100644 --- a/encodings/fastlanes/src/for/compute.rs +++ b/encodings/fastlanes/src/for/compute.rs @@ -96,33 +96,30 @@ mod test { use vortex::array::PrimitiveArray; use vortex::compute::unary::scalar_at; use vortex::compute::{search_sorted, SearchResult, SearchSortedSide}; - use vortex::IntoArray; - use crate::{for_compress, FoRArray}; + use crate::for_compress; #[test] fn for_scalar_at() { - let (child, min, shift) = for_compress(&PrimitiveArray::from(vec![11, 15, 19])).unwrap(); - let forarr = FoRArray::try_new(child, min, shift).unwrap().into_array(); - assert_eq!(scalar_at(&forarr, 0).unwrap(), 11.into()); - assert_eq!(scalar_at(&forarr, 1).unwrap(), 15.into()); - assert_eq!(scalar_at(&forarr, 2).unwrap(), 19.into()); + let for_arr = for_compress(PrimitiveArray::from(vec![1100, 1500, 1900]).array()).unwrap(); + assert_eq!(scalar_at(&for_arr, 0).unwrap(), 1100.into()); + assert_eq!(scalar_at(&for_arr, 1).unwrap(), 1500.into()); + assert_eq!(scalar_at(&for_arr, 2).unwrap(), 1900.into()); } #[test] fn for_search() { - let (child, min, shift) = for_compress(&PrimitiveArray::from(vec![11, 15, 19])).unwrap(); - let forarr = FoRArray::try_new(child, min, shift).unwrap().into_array(); + let for_arr = for_compress(PrimitiveArray::from(vec![1100, 1500, 1900]).array()).unwrap(); assert_eq!( - search_sorted(&forarr, 15, SearchSortedSide::Left).unwrap(), + search_sorted(&for_arr, 1500, SearchSortedSide::Left).unwrap(), SearchResult::Found(1) ); assert_eq!( - search_sorted(&forarr, 20, SearchSortedSide::Left).unwrap(), + search_sorted(&for_arr, 2000, SearchSortedSide::Left).unwrap(), SearchResult::NotFound(3) ); assert_eq!( - search_sorted(&forarr, 10, SearchSortedSide::Left).unwrap(), + search_sorted(&for_arr, 1000, SearchSortedSide::Left).unwrap(), SearchResult::NotFound(0) ); } diff --git a/vortex-sampling-compressor/src/compressors/for.rs b/vortex-sampling-compressor/src/compressors/for.rs index fff5353c54..553d21b014 100644 --- a/vortex-sampling-compressor/src/compressors/for.rs +++ b/vortex-sampling-compressor/src/compressors/for.rs @@ -4,10 +4,10 @@ use vortex::array::PrimitiveArray; use vortex::encoding::EncodingRef; use vortex::stats::{trailing_zeros, ArrayStatistics}; use vortex::validity::ArrayValidity; -use vortex::{Array, ArrayDef, IntoArray}; +use vortex::{Array, ArrayDef, IntoArray, IntoArrayVariant}; use vortex_dtype::match_each_integer_ptype; use vortex_error::VortexResult; -use vortex_fastlanes::{for_compress, FoR, FoRArray, FoREncoding}; +use vortex_fastlanes::{for_compress_parts, FoR, FoRArray, FoREncoding}; use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; use crate::SamplingCompressor; @@ -52,16 +52,30 @@ impl EncodingCompressor for FoRCompressor { like: Option>, ctx: SamplingCompressor<'a>, ) -> VortexResult> { - let (child, min, shift) = for_compress(&PrimitiveArray::try_from(array)?)?; + let (child, for_params) = for_compress_parts(&array.clone().into_primitive()?)?; - let compressed_child = ctx - .named("for") - .excluding(self) - .compress(&child, like.as_ref().and_then(|l| l.child(0)))?; - Ok(CompressedArray::new( - FoRArray::try_new(compressed_child.array, min, shift).map(|a| a.into_array())?, - Some(CompressionTree::new(self, vec![compressed_child.path])), - )) + for_params + .map(|(min, shift)| { + let compressed_child = ctx + .named("for") + .excluding(self) + .compress(&child, like.as_ref().and_then(|l| l.child(0)))?; + Ok(CompressedArray::new( + FoRArray::try_new(compressed_child.array, min, shift) + .map(|a| a.into_array())?, + Some(CompressionTree::new(self, vec![compressed_child.path])), + )) + }) + .unwrap_or_else(|| { + let compressed_child = ctx + .named("for") + .excluding(self) + .compress(&child, like.as_ref().and_then(|l| l.child(0)))?; + Ok(CompressedArray::new( + compressed_child.array, + Some(CompressionTree::new(self, vec![compressed_child.path])), + )) + }) } fn used_encodings(&self) -> HashSet { From 849bc60a26d43664524159e19e03ed5183aad80c Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 13 Aug 2024 17:14:06 +0100 Subject: [PATCH 2/4] nicer --- encodings/fastlanes/src/for/compress.rs | 63 +++++++------------ encodings/fastlanes/src/for/compute.rs | 4 +- .../src/compressors/for.rs | 22 ++++--- 3 files changed, 39 insertions(+), 50 deletions(-) diff --git a/encodings/fastlanes/src/for/compress.rs b/encodings/fastlanes/src/for/compress.rs index 64346447f1..b51b98c252 100644 --- a/encodings/fastlanes/src/for/compress.rs +++ b/encodings/fastlanes/src/for/compress.rs @@ -10,14 +10,7 @@ use vortex_scalar::Scalar; use crate::FoRArray; -pub fn for_compress(array: &Array) -> VortexResult { - let (child, for_params) = for_compress_parts(&array.clone().into_primitive()?)?; - for_params - .map(|(shift, min)| FoRArray::try_new(child.clone(), shift, min).map(|a| a.into_array())) - .unwrap_or(Ok(child)) -} - -pub fn for_compress_parts(array: &PrimitiveArray) -> VortexResult<(Array, Option<(Scalar, u8)>)> { +pub fn for_compress(array: &PrimitiveArray) -> VortexResult { let shift = trailing_zeros(array.array()); let min = array .statistics() @@ -27,18 +20,14 @@ pub fn for_compress_parts(array: &PrimitiveArray) -> VortexResult<(Array, Option Ok(match_each_integer_ptype!(array.ptype(), |$T| { if shift == <$T>::PTYPE.bit_width() as u8 { match array.validity().to_logical(array.len()) { - LogicalValidity::AllValid(_) => ( - ConstantArray::new( - Scalar::zero::<$T>(array.dtype().nullability()), - array.len(), - ) - .into_array(), - None, - ), - LogicalValidity::AllInvalid(_) => ( - ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(), - None, - ), + LogicalValidity::AllValid(_) => ConstantArray::new( + Scalar::zero::<$T>(array.dtype().nullability()), + array.len(), + ) + .into_array(), + LogicalValidity::AllInvalid(_) => { + ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array() + } LogicalValidity::Array(a) => { let valid_indices = PrimitiveArray::from( a.into_bool()? @@ -49,29 +38,25 @@ pub fn for_compress_parts(array: &PrimitiveArray) -> VortexResult<(Array, Option ) .into_array(); let valid_len = valid_indices.len(); - ( - SparseArray::try_new( - valid_indices, - ConstantArray::new( - Scalar::zero::<$T>(array.dtype().nullability()), - valid_len, - ) + SparseArray::try_new( + valid_indices, + ConstantArray::new(Scalar::zero::<$T>(array.dtype().nullability()), valid_len) .into_array(), - array.len(), - Scalar::null(array.dtype().clone()), - )? - .into_array(), - None, - ) + array.len(), + Scalar::null(array.dtype().clone()), + )? + .into_array() } } } else { - ( + FoRArray::try_new( compress_primitive::<$T>(&array, shift, $T::try_from(&min)?) .reinterpret_cast(array.ptype().to_unsigned()) .into_array(), - Some((min, shift)), - ) + min, + shift, + )? + .into_array() } })) } @@ -145,7 +130,7 @@ mod test { fn test_compress() { // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); - let compressed = FoRArray::try_from(for_compress(array.array()).unwrap()).unwrap(); + let compressed = FoRArray::try_from(for_compress(&array).unwrap()).unwrap(); assert_eq!(u32::try_from(compressed.reference()).unwrap(), 1_000_000u32); } @@ -154,7 +139,7 @@ mod test { fn test_decompress() { // Create a range offset by a million let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec()); - let compressed = for_compress(array.array()).unwrap(); + let compressed = for_compress(&array).unwrap(); let decompressed = compressed.into_primitive().unwrap(); assert_eq!( decompressed.maybe_null_slice::(), @@ -165,7 +150,7 @@ mod test { #[test] fn test_overflow() { let array = PrimitiveArray::from((i8::MIN..=i8::MAX).collect_vec()); - let compressed = FoRArray::try_from(for_compress(array.array()).unwrap()).unwrap(); + let compressed = FoRArray::try_from(for_compress(&array).unwrap()).unwrap(); assert_eq!(i8::MIN, i8::try_from(compressed.reference()).unwrap()); let encoded = compressed.encoded().into_primitive().unwrap(); diff --git a/encodings/fastlanes/src/for/compute.rs b/encodings/fastlanes/src/for/compute.rs index 3ca1a79973..383f951de2 100644 --- a/encodings/fastlanes/src/for/compute.rs +++ b/encodings/fastlanes/src/for/compute.rs @@ -101,7 +101,7 @@ mod test { #[test] fn for_scalar_at() { - let for_arr = for_compress(PrimitiveArray::from(vec![1100, 1500, 1900]).array()).unwrap(); + let for_arr = for_compress(&PrimitiveArray::from(vec![1100, 1500, 1900])).unwrap(); assert_eq!(scalar_at(&for_arr, 0).unwrap(), 1100.into()); assert_eq!(scalar_at(&for_arr, 1).unwrap(), 1500.into()); assert_eq!(scalar_at(&for_arr, 2).unwrap(), 1900.into()); @@ -109,7 +109,7 @@ mod test { #[test] fn for_search() { - let for_arr = for_compress(PrimitiveArray::from(vec![1100, 1500, 1900]).array()).unwrap(); + let for_arr = for_compress(&PrimitiveArray::from(vec![1100, 1500, 1900])).unwrap(); assert_eq!( search_sorted(&for_arr, 1500, SearchSortedSide::Left).unwrap(), SearchResult::Found(1) diff --git a/vortex-sampling-compressor/src/compressors/for.rs b/vortex-sampling-compressor/src/compressors/for.rs index 553d21b014..523e9d8ea0 100644 --- a/vortex-sampling-compressor/src/compressors/for.rs +++ b/vortex-sampling-compressor/src/compressors/for.rs @@ -7,7 +7,7 @@ use vortex::validity::ArrayValidity; use vortex::{Array, ArrayDef, IntoArray, IntoArrayVariant}; use vortex_dtype::match_each_integer_ptype; use vortex_error::VortexResult; -use vortex_fastlanes::{for_compress_parts, FoR, FoRArray, FoREncoding}; +use vortex_fastlanes::{for_compress, FoR, FoRArray, FoREncoding}; use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; use crate::SamplingCompressor; @@ -52,25 +52,29 @@ impl EncodingCompressor for FoRCompressor { like: Option>, ctx: SamplingCompressor<'a>, ) -> VortexResult> { - let (child, for_params) = for_compress_parts(&array.clone().into_primitive()?)?; + let for_compressed = for_compress(&array.clone().into_primitive()?)?; - for_params - .map(|(min, shift)| { + FoRArray::try_from(for_compressed.clone()) + .and_then(|for_array| { let compressed_child = ctx .named("for") .excluding(self) - .compress(&child, like.as_ref().and_then(|l| l.child(0)))?; + .compress(&for_array.encoded(), like.as_ref().and_then(|l| l.child(0)))?; Ok(CompressedArray::new( - FoRArray::try_new(compressed_child.array, min, shift) - .map(|a| a.into_array())?, + FoRArray::try_new( + compressed_child.array, + for_array.reference().clone(), + for_array.shift(), + ) + .map(|a| a.into_array())?, Some(CompressionTree::new(self, vec![compressed_child.path])), )) }) - .unwrap_or_else(|| { + .or_else(|_| { let compressed_child = ctx .named("for") .excluding(self) - .compress(&child, like.as_ref().and_then(|l| l.child(0)))?; + .compress(&for_compressed, like.as_ref())?; Ok(CompressedArray::new( compressed_child.array, Some(CompressionTree::new(self, vec![compressed_child.path])), From 04da759723e6decbbd1feca4509805d185cb4b68 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 13 Aug 2024 17:15:56 +0100 Subject: [PATCH 3/4] nicer --- vortex-sampling-compressor/src/compressors/for.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/vortex-sampling-compressor/src/compressors/for.rs b/vortex-sampling-compressor/src/compressors/for.rs index 523e9d8ea0..01fb62f5dc 100644 --- a/vortex-sampling-compressor/src/compressors/for.rs +++ b/vortex-sampling-compressor/src/compressors/for.rs @@ -54,8 +54,8 @@ impl EncodingCompressor for FoRCompressor { ) -> VortexResult> { let for_compressed = for_compress(&array.clone().into_primitive()?)?; - FoRArray::try_from(for_compressed.clone()) - .and_then(|for_array| { + match FoRArray::try_from(for_compressed.clone()) { + Ok(for_array) => { let compressed_child = ctx .named("for") .excluding(self) @@ -69,8 +69,8 @@ impl EncodingCompressor for FoRCompressor { .map(|a| a.into_array())?, Some(CompressionTree::new(self, vec![compressed_child.path])), )) - }) - .or_else(|_| { + } + Err(_) => { let compressed_child = ctx .named("for") .excluding(self) @@ -79,7 +79,8 @@ impl EncodingCompressor for FoRCompressor { compressed_child.array, Some(CompressionTree::new(self, vec![compressed_child.path])), )) - }) + } + } } fn used_encodings(&self) -> HashSet { From a3c42f8ad4393ba387ed5d3a601daffb5b342189 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 13 Aug 2024 17:19:26 +0100 Subject: [PATCH 4/4] nicer --- encodings/fastlanes/src/for/compress.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/encodings/fastlanes/src/for/compress.rs b/encodings/fastlanes/src/for/compress.rs index b51b98c252..0f1948a6d3 100644 --- a/encodings/fastlanes/src/for/compress.rs +++ b/encodings/fastlanes/src/for/compress.rs @@ -20,13 +20,11 @@ pub fn for_compress(array: &PrimitiveArray) -> VortexResult { Ok(match_each_integer_ptype!(array.ptype(), |$T| { if shift == <$T>::PTYPE.bit_width() as u8 { match array.validity().to_logical(array.len()) { - LogicalValidity::AllValid(_) => ConstantArray::new( - Scalar::zero::<$T>(array.dtype().nullability()), - array.len(), - ) - .into_array(), - LogicalValidity::AllInvalid(_) => { - ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array() + LogicalValidity::AllValid(l) => { + ConstantArray::new(Scalar::zero::(array.dtype().nullability()), l).into_array() + }, + LogicalValidity::AllInvalid(l) => { + ConstantArray::new(Scalar::null(array.dtype().clone()), l).into_array() } LogicalValidity::Array(a) => { let valid_indices = PrimitiveArray::from(