FoR compressor handles nullable arrays #617

Merged · 4 commits · Aug 13, 2024
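In brief (read off the diff below): `for_compress` previously returned the encoded child together with the reference and shift as a `(Array, Scalar, u8)` tuple, leaving callers to assemble the `FoRArray`. It now returns a finished `Array` and handles nullable inputs in the degenerate all-zero case: an all-valid array becomes a `ConstantArray` of zeros, an all-invalid array a `ConstantArray` of nulls, and a mixed-validity array a `SparseArray` carrying zeros at the valid indices over a null fill value.

For background, here is a minimal standalone sketch of Frame-of-Reference encoding itself. It assumes the textbook formulation — encoded = (value - reference) >> shift, with the minimum as the reference and the shared trailing-zero count as the shift; the `for_encode`/`for_decode` names are illustrative, not vortex's API:

```rust
/// Illustrative FoR round-trip over u32 (hypothetical helpers, not vortex's API).
fn for_encode(values: &[u32]) -> (Vec<u32>, u32, u8) {
    let reference = values.iter().copied().min().unwrap_or(0);
    // The OR of all values has a set bit wherever any value does, so its
    // trailing-zero count is the largest shift every value can afford.
    // Cap at 31: an all-zero input would otherwise yield a shift of 32,
    // and shifting a u32 by its full bit width is an error in Rust.
    let shift = values
        .iter()
        .fold(0u32, |acc, v| acc | v)
        .trailing_zeros()
        .min(31) as u8;
    let encoded = values
        .iter()
        .map(|v| v.wrapping_sub(reference) >> shift)
        .collect();
    (encoded, reference, shift)
}

fn for_decode(encoded: &[u32], reference: u32, shift: u8) -> Vec<u32> {
    encoded
        .iter()
        .map(|e| (e << shift).wrapping_add(reference))
        .collect()
}

fn main() {
    // 1_000_000, 1_000_004, ... all share two trailing zero bits,
    // so shift = 2 and the encoded deltas are 0, 1, 2, ...
    let values: Vec<u32> = (0..8u32).map(|v| v * 4 + 1_000_000).collect();
    let (encoded, reference, shift) = for_encode(&values);
    assert_eq!(reference, 1_000_000);
    assert_eq!(for_decode(&encoded, reference, shift), values);
}
```

The `shift == bit_width` special case in the diff appears to correspond to the all-zeros input here, where there is nothing left to encode beyond validity and length.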
63 changes: 43 additions & 20 deletions encodings/fastlanes/src/for/compress.rs
@@ -1,15 +1,16 @@
 use itertools::Itertools;
 use num_traits::{PrimInt, WrappingAdd, WrappingSub};
-use vortex::array::{ConstantArray, PrimitiveArray};
+use vortex::array::{ConstantArray, PrimitiveArray, SparseArray};
 use vortex::stats::{trailing_zeros, ArrayStatistics, Stat};
+use vortex::validity::LogicalValidity;
 use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
 use vortex_dtype::{match_each_integer_ptype, NativePType};
 use vortex_error::{vortex_err, VortexResult};
 use vortex_scalar::Scalar;
 
 use crate::FoRArray;
 
-pub fn for_compress(array: &PrimitiveArray) -> VortexResult<(Array, Scalar, u8)> {
+pub fn for_compress(array: &PrimitiveArray) -> VortexResult<Array> {
     let shift = trailing_zeros(array.array());
     let min = array
         .statistics()
@@ -18,16 +19,42 @@ pub fn for_compress(array: &PrimitiveArray) -> VortexResult<(Array, Scalar, u8)>

     Ok(match_each_integer_ptype!(array.ptype(), |$T| {
         if shift == <$T>::PTYPE.bit_width() as u8 {
-            (ConstantArray::new(
-                Scalar::zero::<$T>(array.dtype().nullability())
-                    .reinterpret_cast(array.ptype().to_unsigned()),
-                array.len(),
-            )
-            .into_array(), min, shift)
+            match array.validity().to_logical(array.len()) {
+                LogicalValidity::AllValid(l) => {
+                    ConstantArray::new(Scalar::zero::<i32>(array.dtype().nullability()), l).into_array()
+                },
+                LogicalValidity::AllInvalid(l) => {
+                    ConstantArray::new(Scalar::null(array.dtype().clone()), l).into_array()
+                }
+                LogicalValidity::Array(a) => {
+                    let valid_indices = PrimitiveArray::from(
+                        a.into_bool()?
+                            .boolean_buffer()
+                            .set_indices()
+                            .map(|i| i as u64)
+                            .collect::<Vec<_>>(),
+                    )
+                    .into_array();
+                    let valid_len = valid_indices.len();
+                    SparseArray::try_new(
+                        valid_indices,
+                        ConstantArray::new(Scalar::zero::<$T>(array.dtype().nullability()), valid_len)
+                            .into_array(),
+                        array.len(),
+                        Scalar::null(array.dtype().clone()),
+                    )?
+                    .into_array()
+                }
+            }
         } else {
-            (compress_primitive::<$T>(&array, shift, $T::try_from(&min)?)
-                .reinterpret_cast(array.ptype().to_unsigned())
-                .into_array(), min, shift)
+            FoRArray::try_new(
+                compress_primitive::<$T>(&array, shift, $T::try_from(&min)?)
+                    .reinterpret_cast(array.ptype().to_unsigned())
+                    .into_array(),
+                min,
+                shift,
+            )?
+            .into_array()
         }
     }))
 }
@@ -101,20 +128,17 @@ mod test {
     fn test_compress() {
         // Create a range offset by a million
         let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec());
+        let compressed = FoRArray::try_from(for_compress(&array).unwrap()).unwrap();
 
-        let (_, reference, _) = for_compress(&array).unwrap();
-        assert_eq!(u32::try_from(reference).unwrap(), 1_000_000u32);
+        assert_eq!(u32::try_from(compressed.reference()).unwrap(), 1_000_000u32);
     }
 
     #[test]
     fn test_decompress() {
         // Create a range offset by a million
         let array = PrimitiveArray::from((0u32..10_000).map(|v| v + 1_000_000).collect_vec());
-        let (compressed, reference, shift) = for_compress(&array).unwrap();
-        let decompressed = FoRArray::try_new(compressed, reference, shift)
-            .unwrap()
-            .into_primitive()
-            .unwrap();
+        let compressed = for_compress(&array).unwrap();
+        let decompressed = compressed.into_primitive().unwrap();
         assert_eq!(
             decompressed.maybe_null_slice::<u32>(),
             array.maybe_null_slice::<u32>()
@@ -124,8 +148,7 @@
     #[test]
     fn test_overflow() {
         let array = PrimitiveArray::from((i8::MIN..=i8::MAX).collect_vec());
-        let (compressed, reference, shift) = for_compress(&array).unwrap();
-        let compressed = FoRArray::try_new(compressed, reference, shift).unwrap();
+        let compressed = FoRArray::try_from(for_compress(&array).unwrap()).unwrap();
         assert_eq!(i8::MIN, i8::try_from(compressed.reference()).unwrap());
 
         let encoded = compressed.encoded().into_primitive().unwrap();
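For reviewers, the call-site change in miniature — a sketch assembled from the updated tests (the `example` wrapper is illustrative): `for_compress` now returns a plain `Array`, and callers that still need the reference and shift downcast with `FoRArray::try_from`.

```rust
use vortex::array::PrimitiveArray;
use vortex_error::VortexResult;
use vortex_fastlanes::{for_compress, FoRArray};

fn example() -> VortexResult<()> {
    // Old: let (child, min, shift) = for_compress(&array)?;
    // New: one Array out; downcast only when the FoR parts are needed.
    let array = PrimitiveArray::from(vec![1100u32, 1500, 1900]);
    let compressed = for_compress(&array)?;
    if let Ok(for_array) = FoRArray::try_from(compressed) {
        let _reference = for_array.reference(); // the minimum, as a Scalar
        let _shift = for_array.shift();
    }
    Ok(())
}
```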
21 changes: 9 additions & 12 deletions encodings/fastlanes/src/for/compute.rs
@@ -96,33 +96,30 @@ mod test {
     use vortex::array::PrimitiveArray;
     use vortex::compute::unary::scalar_at;
     use vortex::compute::{search_sorted, SearchResult, SearchSortedSide};
-    use vortex::IntoArray;
 
-    use crate::{for_compress, FoRArray};
+    use crate::for_compress;
 
     #[test]
     fn for_scalar_at() {
-        let (child, min, shift) = for_compress(&PrimitiveArray::from(vec![11, 15, 19])).unwrap();
-        let forarr = FoRArray::try_new(child, min, shift).unwrap().into_array();
-        assert_eq!(scalar_at(&forarr, 0).unwrap(), 11.into());
-        assert_eq!(scalar_at(&forarr, 1).unwrap(), 15.into());
-        assert_eq!(scalar_at(&forarr, 2).unwrap(), 19.into());
+        let for_arr = for_compress(&PrimitiveArray::from(vec![1100, 1500, 1900])).unwrap();
+        assert_eq!(scalar_at(&for_arr, 0).unwrap(), 1100.into());
+        assert_eq!(scalar_at(&for_arr, 1).unwrap(), 1500.into());
+        assert_eq!(scalar_at(&for_arr, 2).unwrap(), 1900.into());
     }
 
     #[test]
     fn for_search() {
-        let (child, min, shift) = for_compress(&PrimitiveArray::from(vec![11, 15, 19])).unwrap();
-        let forarr = FoRArray::try_new(child, min, shift).unwrap().into_array();
+        let for_arr = for_compress(&PrimitiveArray::from(vec![1100, 1500, 1900])).unwrap();
         assert_eq!(
-            search_sorted(&forarr, 15, SearchSortedSide::Left).unwrap(),
+            search_sorted(&for_arr, 1500, SearchSortedSide::Left).unwrap(),
             SearchResult::Found(1)
         );
         assert_eq!(
-            search_sorted(&forarr, 20, SearchSortedSide::Left).unwrap(),
+            search_sorted(&for_arr, 2000, SearchSortedSide::Left).unwrap(),
            SearchResult::NotFound(3)
        );
        assert_eq!(
-            search_sorted(&forarr, 10, SearchSortedSide::Left).unwrap(),
+            search_sorted(&for_arr, 1000, SearchSortedSide::Left).unwrap(),
            SearchResult::NotFound(0)
        );
    }
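Reviewer note: with `for_compress` returning an `Array` directly, these compute tests no longer construct the `FoRArray` by hand; `scalar_at` and `search_sorted` dispatch on whatever encoding `for_compress` produced.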
39 changes: 29 additions & 10 deletions vortex-sampling-compressor/src/compressors/for.rs
@@ -4,7 +4,7 @@ use vortex::array::PrimitiveArray;
 use vortex::encoding::EncodingRef;
 use vortex::stats::{trailing_zeros, ArrayStatistics};
 use vortex::validity::ArrayValidity;
-use vortex::{Array, ArrayDef, IntoArray};
+use vortex::{Array, ArrayDef, IntoArray, IntoArrayVariant};
 use vortex_dtype::match_each_integer_ptype;
 use vortex_error::VortexResult;
 use vortex_fastlanes::{for_compress, FoR, FoRArray, FoREncoding};
@@ -52,16 +52,35 @@ impl EncodingCompressor for FoRCompressor {
         like: Option<CompressionTree<'a>>,
         ctx: SamplingCompressor<'a>,
     ) -> VortexResult<CompressedArray<'a>> {
-        let (child, min, shift) = for_compress(&PrimitiveArray::try_from(array)?)?;
+        let for_compressed = for_compress(&array.clone().into_primitive()?)?;
 
-        let compressed_child = ctx
-            .named("for")
-            .excluding(self)
-            .compress(&child, like.as_ref().and_then(|l| l.child(0)))?;
-        Ok(CompressedArray::new(
-            FoRArray::try_new(compressed_child.array, min, shift).map(|a| a.into_array())?,
-            Some(CompressionTree::new(self, vec![compressed_child.path])),
-        ))
+        match FoRArray::try_from(for_compressed.clone()) {
+            Ok(for_array) => {
+                let compressed_child = ctx
+                    .named("for")
+                    .excluding(self)
+                    .compress(&for_array.encoded(), like.as_ref().and_then(|l| l.child(0)))?;
+                Ok(CompressedArray::new(
+                    FoRArray::try_new(
+                        compressed_child.array,
+                        for_array.reference().clone(),
+                        for_array.shift(),
+                    )
+                    .map(|a| a.into_array())?,
+                    Some(CompressionTree::new(self, vec![compressed_child.path])),
+                ))
+            }
+            Err(_) => {
+                let compressed_child = ctx
+                    .named("for")
+                    .excluding(self)
+                    .compress(&for_compressed, like.as_ref())?;
+                Ok(CompressedArray::new(
+                    compressed_child.array,
+                    Some(CompressionTree::new(self, vec![compressed_child.path])),
+                ))
+            }
+        }
     }
 
     fn used_encodings(&self) -> HashSet<EncodingRef> {
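Design note on the compressor change above: since `for_compress` can now legitimately return a non-FoR array (the all-zero-value paths produce a `ConstantArray` or `SparseArray`), `FoRCompressor::compress` matches on `FoRArray::try_from`. On success it recursively compresses the encoded child and rebuilds the `FoRArray` around the result; otherwise it hands the array straight back to the sampling compressor, still excluding itself (presumably to avoid re-entering this compressor).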