Skip to content

Commit

Permalink
FoR compressor handles nullable arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 committed Aug 13, 2024
1 parent e7e97a5 commit ff57c60
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 43 deletions.
80 changes: 60 additions & 20 deletions encodings/fastlanes/src/for/compress.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
use itertools::Itertools;
use num_traits::{PrimInt, WrappingAdd, WrappingSub};
use vortex::array::{ConstantArray, PrimitiveArray};
use vortex::array::{ConstantArray, PrimitiveArray, SparseArray};
use vortex::stats::{trailing_zeros, ArrayStatistics, Stat};
use vortex::validity::LogicalValidity;
use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
use vortex_dtype::{match_each_integer_ptype, NativePType};
use vortex_error::{vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::FoRArray;

pub fn for_compress(array: &PrimitiveArray) -> VortexResult<(Array, Scalar, u8)> {
pub fn for_compress(array: &Array) -> VortexResult<Array> {
let (child, for_params) = for_compress_parts(&array.clone().into_primitive()?)?;
for_params
.map(|(shift, min)| FoRArray::try_new(child.clone(), shift, min).map(|a| a.into_array()))
.unwrap_or(Ok(child))
}

pub fn for_compress_parts(array: &PrimitiveArray) -> VortexResult<(Array, Option<(Scalar, u8)>)> {
let shift = trailing_zeros(array.array());
let min = array
.statistics()
Expand All @@ -18,16 +26,52 @@ pub fn for_compress(array: &PrimitiveArray) -> VortexResult<(Array, Scalar, u8)>

Ok(match_each_integer_ptype!(array.ptype(), |$T| {
if shift == <$T>::PTYPE.bit_width() as u8 {
(ConstantArray::new(
Scalar::zero::<$T>(array.dtype().nullability())
.reinterpret_cast(array.ptype().to_unsigned()),
array.len(),
)
.into_array(), min, shift)
match array.validity().to_logical(array.len()) {
LogicalValidity::AllValid(_) => (
ConstantArray::new(
Scalar::zero::<$T>(array.dtype().nullability()),
array.len(),
)
.into_array(),
None,
),
LogicalValidity::AllInvalid(_) => (
ConstantArray::new(Scalar::null(array.dtype().clone()), array.len()).into_array(),
None,
),
LogicalValidity::Array(a) => {
let valid_indices = PrimitiveArray::from(
a.into_bool()?
.boolean_buffer()
.set_indices()
.map(|i| i as u64)
.collect::<Vec<_>>(),
)
.into_array();
let valid_len = valid_indices.len();
(
SparseArray::try_new(
valid_indices,
ConstantArray::new(
Scalar::zero::<$T>(array.dtype().nullability()),
valid_len,
)
.into_array(),
array.len(),
Scalar::null(array.dtype().clone()),
)?
.into_array(),
None,
)
}
}
} else {
(compress_primitive::<$T>(&array, shift, $T::try_from(&min)?)
.reinterpret_cast(array.ptype().to_unsigned())
.into_array(), min, shift)
(
compress_primitive::<$T>(&array, shift, $T::try_from(&min)?)
.reinterpret_cast(array.ptype().to_unsigned())
.into_array(),
Some((min, shift)),
)
}
}))
}
Expand Down Expand Up @@ -101,20 +145,17 @@ mod test {
fn test_compress() {
// Create a range offset by a million
let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec());
let compressed = FoRArray::try_from(for_compress(array.array()).unwrap()).unwrap();

let (_, reference, _) = for_compress(&array).unwrap();
assert_eq!(u32::try_from(reference).unwrap(), 1_000_000u32);
assert_eq!(u32::try_from(compressed.reference()).unwrap(), 1_000_000u32);
}

#[test]
fn test_decompress() {
// Create a range offset by a million
let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec());
let (compressed, reference, shift) = for_compress(&array).unwrap();
let decompressed = FoRArray::try_new(compressed, reference, shift)
.unwrap()
.into_primitive()
.unwrap();
let compressed = for_compress(array.array()).unwrap();
let decompressed = compressed.into_primitive().unwrap();
assert_eq!(
decompressed.maybe_null_slice::<u32>(),
array.maybe_null_slice::<u32>()
Expand All @@ -124,8 +165,7 @@ mod test {
#[test]
fn test_overflow() {
let array = PrimitiveArray::from((i8::MIN..=i8::MAX).collect_vec());
let (compressed, reference, shift) = for_compress(&array).unwrap();
let compressed = FoRArray::try_new(compressed, reference, shift).unwrap();
let compressed = FoRArray::try_from(for_compress(array.array()).unwrap()).unwrap();
assert_eq!(i8::MIN, i8::try_from(compressed.reference()).unwrap());

let encoded = compressed.encoded().into_primitive().unwrap();
Expand Down
21 changes: 9 additions & 12 deletions encodings/fastlanes/src/for/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,33 +96,30 @@ mod test {
use vortex::array::PrimitiveArray;
use vortex::compute::unary::scalar_at;
use vortex::compute::{search_sorted, SearchResult, SearchSortedSide};
use vortex::IntoArray;

use crate::{for_compress, FoRArray};
use crate::for_compress;

#[test]
fn for_scalar_at() {
let (child, min, shift) = for_compress(&PrimitiveArray::from(vec![11, 15, 19])).unwrap();
let forarr = FoRArray::try_new(child, min, shift).unwrap().into_array();
assert_eq!(scalar_at(&forarr, 0).unwrap(), 11.into());
assert_eq!(scalar_at(&forarr, 1).unwrap(), 15.into());
assert_eq!(scalar_at(&forarr, 2).unwrap(), 19.into());
let for_arr = for_compress(PrimitiveArray::from(vec![1100, 1500, 1900]).array()).unwrap();
assert_eq!(scalar_at(&for_arr, 0).unwrap(), 1100.into());
assert_eq!(scalar_at(&for_arr, 1).unwrap(), 1500.into());
assert_eq!(scalar_at(&for_arr, 2).unwrap(), 1900.into());
}

#[test]
fn for_search() {
let (child, min, shift) = for_compress(&PrimitiveArray::from(vec![11, 15, 19])).unwrap();
let forarr = FoRArray::try_new(child, min, shift).unwrap().into_array();
let for_arr = for_compress(PrimitiveArray::from(vec![1100, 1500, 1900]).array()).unwrap();
assert_eq!(
search_sorted(&forarr, 15, SearchSortedSide::Left).unwrap(),
search_sorted(&for_arr, 1500, SearchSortedSide::Left).unwrap(),
SearchResult::Found(1)
);
assert_eq!(
search_sorted(&forarr, 20, SearchSortedSide::Left).unwrap(),
search_sorted(&for_arr, 2000, SearchSortedSide::Left).unwrap(),
SearchResult::NotFound(3)
);
assert_eq!(
search_sorted(&forarr, 10, SearchSortedSide::Left).unwrap(),
search_sorted(&for_arr, 1000, SearchSortedSide::Left).unwrap(),
SearchResult::NotFound(0)
);
}
Expand Down
36 changes: 25 additions & 11 deletions vortex-sampling-compressor/src/compressors/for.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use vortex::array::PrimitiveArray;
use vortex::encoding::EncodingRef;
use vortex::stats::{trailing_zeros, ArrayStatistics};
use vortex::validity::ArrayValidity;
use vortex::{Array, ArrayDef, IntoArray};
use vortex::{Array, ArrayDef, IntoArray, IntoArrayVariant};
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexResult;
use vortex_fastlanes::{for_compress, FoR, FoRArray, FoREncoding};
use vortex_fastlanes::{for_compress_parts, FoR, FoRArray, FoREncoding};

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;
Expand Down Expand Up @@ -52,16 +52,30 @@ impl EncodingCompressor for FoRCompressor {
like: Option<CompressionTree<'a>>,
ctx: SamplingCompressor<'a>,
) -> VortexResult<CompressedArray<'a>> {
let (child, min, shift) = for_compress(&PrimitiveArray::try_from(array)?)?;
let (child, for_params) = for_compress_parts(&array.clone().into_primitive()?)?;

let compressed_child = ctx
.named("for")
.excluding(self)
.compress(&child, like.as_ref().and_then(|l| l.child(0)))?;
Ok(CompressedArray::new(
FoRArray::try_new(compressed_child.array, min, shift).map(|a| a.into_array())?,
Some(CompressionTree::new(self, vec![compressed_child.path])),
))
for_params
.map(|(min, shift)| {
let compressed_child = ctx
.named("for")
.excluding(self)
.compress(&child, like.as_ref().and_then(|l| l.child(0)))?;
Ok(CompressedArray::new(
FoRArray::try_new(compressed_child.array, min, shift)
.map(|a| a.into_array())?,
Some(CompressionTree::new(self, vec![compressed_child.path])),
))
})
.unwrap_or_else(|| {
let compressed_child = ctx
.named("for")
.excluding(self)
.compress(&child, like.as_ref().and_then(|l| l.child(0)))?;
Ok(CompressedArray::new(
compressed_child.array,
Some(CompressionTree::new(self, vec![compressed_child.path])),
))
})
}

fn used_encodings(&self) -> HashSet<EncodingRef> {
Expand Down

0 comments on commit ff57c60

Please sign in to comment.