Skip to content

Commit

Permalink
FoR compressor handles nullable arrays (#617)
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 authored Aug 13, 2024
1 parent 7267db4 commit 18a788e
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 42 deletions.
63 changes: 43 additions & 20 deletions encodings/fastlanes/src/for/compress.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use itertools::Itertools;
use num_traits::{PrimInt, WrappingAdd, WrappingSub};
use vortex::array::{ConstantArray, PrimitiveArray};
use vortex::array::{ConstantArray, PrimitiveArray, SparseArray};
use vortex::stats::{trailing_zeros, ArrayStatistics, Stat};
use vortex::validity::LogicalValidity;
use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
use vortex_dtype::{match_each_integer_ptype, NativePType};
use vortex_error::{vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::FoRArray;

pub fn for_compress(array: &PrimitiveArray) -> VortexResult<(Array, Scalar, u8)> {
pub fn for_compress(array: &PrimitiveArray) -> VortexResult<Array> {
let shift = trailing_zeros(array.array());
let min = array
.statistics()
Expand All @@ -18,16 +19,42 @@ pub fn for_compress(array: &PrimitiveArray) -> VortexResult<(Array, Scalar, u8)>

Ok(match_each_integer_ptype!(array.ptype(), |$T| {
if shift == <$T>::PTYPE.bit_width() as u8 {
(ConstantArray::new(
Scalar::zero::<$T>(array.dtype().nullability())
.reinterpret_cast(array.ptype().to_unsigned()),
array.len(),
)
.into_array(), min, shift)
match array.validity().to_logical(array.len()) {
LogicalValidity::AllValid(l) => {
ConstantArray::new(Scalar::zero::<i32>(array.dtype().nullability()), l).into_array()
},
LogicalValidity::AllInvalid(l) => {
ConstantArray::new(Scalar::null(array.dtype().clone()), l).into_array()
}
LogicalValidity::Array(a) => {
let valid_indices = PrimitiveArray::from(
a.into_bool()?
.boolean_buffer()
.set_indices()
.map(|i| i as u64)
.collect::<Vec<_>>(),
)
.into_array();
let valid_len = valid_indices.len();
SparseArray::try_new(
valid_indices,
ConstantArray::new(Scalar::zero::<$T>(array.dtype().nullability()), valid_len)
.into_array(),
array.len(),
Scalar::null(array.dtype().clone()),
)?
.into_array()
}
}
} else {
(compress_primitive::<$T>(&array, shift, $T::try_from(&min)?)
.reinterpret_cast(array.ptype().to_unsigned())
.into_array(), min, shift)
FoRArray::try_new(
compress_primitive::<$T>(&array, shift, $T::try_from(&min)?)
.reinterpret_cast(array.ptype().to_unsigned())
.into_array(),
min,
shift,
)?
.into_array()
}
}))
}
Expand Down Expand Up @@ -101,20 +128,17 @@ mod test {
fn test_compress() {
// Create a range offset by a million
let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec());
let compressed = FoRArray::try_from(for_compress(&array).unwrap()).unwrap();

let (_, reference, _) = for_compress(&array).unwrap();
assert_eq!(u32::try_from(reference).unwrap(), 1_000_000u32);
assert_eq!(u32::try_from(compressed.reference()).unwrap(), 1_000_000u32);
}

#[test]
fn test_decompress() {
// Create a range offset by a million
let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec());
let (compressed, reference, shift) = for_compress(&array).unwrap();
let decompressed = FoRArray::try_new(compressed, reference, shift)
.unwrap()
.into_primitive()
.unwrap();
let compressed = for_compress(&array).unwrap();
let decompressed = compressed.into_primitive().unwrap();
assert_eq!(
decompressed.maybe_null_slice::<u32>(),
array.maybe_null_slice::<u32>()
Expand All @@ -124,8 +148,7 @@ mod test {
#[test]
fn test_overflow() {
let array = PrimitiveArray::from((i8::MIN..=i8::MAX).collect_vec());
let (compressed, reference, shift) = for_compress(&array).unwrap();
let compressed = FoRArray::try_new(compressed, reference, shift).unwrap();
let compressed = FoRArray::try_from(for_compress(&array).unwrap()).unwrap();
assert_eq!(i8::MIN, i8::try_from(compressed.reference()).unwrap());

let encoded = compressed.encoded().into_primitive().unwrap();
Expand Down
21 changes: 9 additions & 12 deletions encodings/fastlanes/src/for/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,33 +96,30 @@ mod test {
use vortex::array::PrimitiveArray;
use vortex::compute::unary::scalar_at;
use vortex::compute::{search_sorted, SearchResult, SearchSortedSide};
use vortex::IntoArray;

use crate::{for_compress, FoRArray};
use crate::for_compress;

#[test]
fn for_scalar_at() {
let (child, min, shift) = for_compress(&PrimitiveArray::from(vec![11, 15, 19])).unwrap();
let forarr = FoRArray::try_new(child, min, shift).unwrap().into_array();
assert_eq!(scalar_at(&forarr, 0).unwrap(), 11.into());
assert_eq!(scalar_at(&forarr, 1).unwrap(), 15.into());
assert_eq!(scalar_at(&forarr, 2).unwrap(), 19.into());
let for_arr = for_compress(&PrimitiveArray::from(vec![1100, 1500, 1900])).unwrap();
assert_eq!(scalar_at(&for_arr, 0).unwrap(), 1100.into());
assert_eq!(scalar_at(&for_arr, 1).unwrap(), 1500.into());
assert_eq!(scalar_at(&for_arr, 2).unwrap(), 1900.into());
}

#[test]
fn for_search() {
let (child, min, shift) = for_compress(&PrimitiveArray::from(vec![11, 15, 19])).unwrap();
let forarr = FoRArray::try_new(child, min, shift).unwrap().into_array();
let for_arr = for_compress(&PrimitiveArray::from(vec![1100, 1500, 1900])).unwrap();
assert_eq!(
search_sorted(&forarr, 15, SearchSortedSide::Left).unwrap(),
search_sorted(&for_arr, 1500, SearchSortedSide::Left).unwrap(),
SearchResult::Found(1)
);
assert_eq!(
search_sorted(&forarr, 20, SearchSortedSide::Left).unwrap(),
search_sorted(&for_arr, 2000, SearchSortedSide::Left).unwrap(),
SearchResult::NotFound(3)
);
assert_eq!(
search_sorted(&forarr, 10, SearchSortedSide::Left).unwrap(),
search_sorted(&for_arr, 1000, SearchSortedSide::Left).unwrap(),
SearchResult::NotFound(0)
);
}
Expand Down
39 changes: 29 additions & 10 deletions vortex-sampling-compressor/src/compressors/for.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use vortex::array::PrimitiveArray;
use vortex::encoding::EncodingRef;
use vortex::stats::{trailing_zeros, ArrayStatistics};
use vortex::validity::ArrayValidity;
use vortex::{Array, ArrayDef, IntoArray};
use vortex::{Array, ArrayDef, IntoArray, IntoArrayVariant};
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexResult;
use vortex_fastlanes::{for_compress, FoR, FoRArray, FoREncoding};
Expand Down Expand Up @@ -52,16 +52,35 @@ impl EncodingCompressor for FoRCompressor {
like: Option<CompressionTree<'a>>,
ctx: SamplingCompressor<'a>,
) -> VortexResult<CompressedArray<'a>> {
let (child, min, shift) = for_compress(&PrimitiveArray::try_from(array)?)?;
let for_compressed = for_compress(&array.clone().into_primitive()?)?;

let compressed_child = ctx
.named("for")
.excluding(self)
.compress(&child, like.as_ref().and_then(|l| l.child(0)))?;
Ok(CompressedArray::new(
FoRArray::try_new(compressed_child.array, min, shift).map(|a| a.into_array())?,
Some(CompressionTree::new(self, vec![compressed_child.path])),
))
match FoRArray::try_from(for_compressed.clone()) {
Ok(for_array) => {
let compressed_child = ctx
.named("for")
.excluding(self)
.compress(&for_array.encoded(), like.as_ref().and_then(|l| l.child(0)))?;
Ok(CompressedArray::new(
FoRArray::try_new(
compressed_child.array,
for_array.reference().clone(),
for_array.shift(),
)
.map(|a| a.into_array())?,
Some(CompressionTree::new(self, vec![compressed_child.path])),
))
}
Err(_) => {
let compressed_child = ctx
.named("for")
.excluding(self)
.compress(&for_compressed, like.as_ref())?;
Ok(CompressedArray::new(
compressed_child.array,
Some(CompressionTree::new(self, vec![compressed_child.path])),
))
}
}
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
Expand Down

0 comments on commit 18a788e

Please sign in to comment.