From 0095d79b640f5acfa4021dd2560750e4d5a45f45 Mon Sep 17 00:00:00 2001 From: Josh Casale Date: Thu, 4 Apr 2024 15:02:56 +0100 Subject: [PATCH] validity() for SparseArray, scalar_at respects patches, flatten respects fill_value --- .gitignore | 1 + vortex-alp/src/compress.rs | 2 + vortex-array/src/array/sparse/compress.rs | 2 + vortex-array/src/array/sparse/compute.rs | 31 +++++++-- vortex-array/src/array/sparse/mod.rs | 70 +++++++++++++++++---- vortex-array/src/array/sparse/serde.rs | 15 ++++- vortex-fastlanes/src/bitpacking/compress.rs | 3 +- vortex-fastlanes/src/bitpacking/compute.rs | 15 ++++- 8 files changed, 115 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index d41539df1a..2e32a1b26f 100644 --- a/.gitignore +++ b/.gitignore @@ -189,3 +189,4 @@ benchmarks/.out # Scratch *.txt +*.vortex diff --git a/vortex-alp/src/compress.rs b/vortex-alp/src/compress.rs index 70d146a6c8..d92fc62a0f 100644 --- a/vortex-alp/src/compress.rs +++ b/vortex-alp/src/compress.rs @@ -7,6 +7,7 @@ use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; use vortex::compute::flatten::flatten_primitive; use vortex::compute::patch::patch; use vortex::ptype::{NativePType, PType}; +use vortex::scalar::Scalar; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use crate::alp::ALPFloat; @@ -99,6 +100,7 @@ where PrimitiveArray::from(exc_pos).into_array(), PrimitiveArray::from(exc).into_array(), len, + Scalar::null(values.dtype()), ) .into_array() }), diff --git a/vortex-array/src/array/sparse/compress.rs b/vortex-array/src/array/sparse/compress.rs index 9d0a62e008..9150ffa042 100644 --- a/vortex-array/src/array/sparse/compress.rs +++ b/vortex-array/src/array/sparse/compress.rs @@ -4,6 +4,7 @@ use crate::array::downcast::DowncastArrayBuiltin; use crate::array::sparse::{SparseArray, SparseEncoding}; use crate::array::{Array, ArrayRef}; use crate::compress::{CompressConfig, CompressCtx, EncodingCompression}; +use crate::scalar::{NullScalar, Scalar}; impl EncodingCompression for SparseEncoding { fn cost(&self) -> u8 { @@ -32,6 +33,7 @@ impl EncodingCompression for SparseEncoding { ctx.named("values") .compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?, sparse_array.len(), + Scalar::Null(NullScalar::new()), ) .into_array()) } diff --git a/vortex-array/src/array/sparse/compute.rs b/vortex-array/src/array/sparse/compute.rs index 837ecb1d08..36b7f5ce2b 100644 --- a/vortex-array/src/array/sparse/compute.rs +++ b/vortex-array/src/array/sparse/compute.rs @@ -30,6 +30,16 @@ impl ArrayCompute for SparseArray { impl AsContiguousFn for SparseArray { fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult { + let fill_types = arrays + .iter() + .map(|a| a.as_sparse().clone().fill_value) + .dedup() + .collect_vec(); + assert_eq!( + 1, + fill_types.len(), + "Cannot concatenate SparseArrays with differing fill values" + ); Ok(SparseArray::new( as_contiguous( &arrays @@ -46,11 +56,13 @@ impl AsContiguousFn for SparseArray { .collect_vec(), )?, arrays.iter().map(|a| a.len()).sum(), + fill_types.first().unwrap().clone(), ) .into_array()) } } +#[allow(unreachable_code)] impl FlattenFn for SparseArray { fn flatten(&self) -> VortexResult { // Resolve our indices into a vector of usize applying the offset @@ -58,11 +70,16 @@ impl FlattenFn for SparseArray { let mut validity = BooleanBufferBuilder::new(self.len()); validity.append_n(self.len(), false); - let values = flatten(self.values())?; + let null_fill = self.fill_value.is_null(); if let FlattenedArray::Primitive(parray) = values { match_each_native_ptype!(parray.ptype(), |$P| { - let mut values = vec![$P::default(); self.len()]; + let mut values = if null_fill { + vec![$P::default(); self.len()] + } else { + let p_fill_value: $P = self.fill_value.clone().try_into()?; + vec![p_fill_value; self.len()] + }; let mut offset = 0; for v in parray.typed_data::<$P>() { @@ -73,11 +90,15 @@ impl FlattenFn for SparseArray { } let validity = validity.finish(); - - Ok(FlattenedArray::Primitive(PrimitiveArray::from_nullable( + if null_fill { + Ok(FlattenedArray::Primitive(PrimitiveArray::from_nullable( values, Some(validity.into()), - ))) + ))) + } else { + Ok(FlattenedArray::Primitive(PrimitiveArray::from(values))) + } + }) } else { Err(vortex_err!( diff --git a/vortex-array/src/array/sparse/mod.rs b/vortex-array/src/array/sparse/mod.rs index fecbccc166..14281550a4 100644 --- a/vortex-array/src/array/sparse/mod.rs +++ b/vortex-array/src/array/sparse/mod.rs @@ -5,6 +5,7 @@ use linkme::distributed_slice; use vortex_error::{vortex_bail, VortexResult}; use vortex_schema::DType; +use crate::array::constant::ConstantArray; use crate::array::validity::Validity; use crate::array::{check_slice_bounds, Array, ArrayRef}; use crate::compress::EncodingCompression; @@ -15,6 +16,7 @@ use crate::compute::ArrayCompute; use crate::encoding::{Encoding, EncodingId, EncodingRef, ENCODINGS}; use crate::formatter::{ArrayDisplay, ArrayFormatter}; use crate::ptype::PType; +use crate::scalar::{BoolScalar, Scalar}; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stats, StatsCompute, StatsSet}; use crate::{impl_array, ArrayWalker}; @@ -31,15 +33,21 @@ pub struct SparseArray { indices_offset: usize, len: usize, stats: Arc>, + fill_value: Scalar, } impl SparseArray { - pub fn new(indices: ArrayRef, values: ArrayRef, len: usize) -> Self { - Self::try_new(indices, values, len).unwrap() + pub fn new(indices: ArrayRef, values: ArrayRef, len: usize, fill_value: Scalar) -> Self { + Self::try_new(indices, values, len, fill_value).unwrap() } - pub fn try_new(indices: ArrayRef, values: ArrayRef, len: usize) -> VortexResult { - Self::new_with_offset(indices, values, len, 0) + pub fn try_new( + indices: ArrayRef, + values: ArrayRef, + len: usize, + fill_value: Scalar, + ) -> VortexResult { + Self::new_with_offset(indices, values, len, 0, fill_value) } pub(crate) fn new_with_offset( @@ -47,6 +55,7 @@ impl SparseArray { values: ArrayRef, len: usize, indices_offset: usize, + fill_value: Scalar, ) -> VortexResult { if !matches!(indices.dtype(), &DType::IDX) { vortex_bail!("Cannot use {} as indices", indices.dtype()); @@ -58,6 +67,7 @@ impl SparseArray { indices_offset, len, stats: Arc::new(RwLock::new(StatsSet::new())), + fill_value, }) } @@ -123,6 +133,7 @@ impl Array for SparseArray { values: self.values.slice(index_start_index, index_end_index)?, len: stop - start, stats: Arc::new(RwLock::new(StatsSet::new())), + fill_value: self.fill_value.clone(), } .into_array()) } @@ -142,7 +153,16 @@ impl Array for SparseArray { } fn validity(&self) -> Option { - todo!() + let validity = SparseArray { + indices: self.indices.clone(), + values: ConstantArray::new(Scalar::Bool(BoolScalar::non_nullable(true)), self.len) + .into_array(), + indices_offset: self.indices_offset, + len: self.len, + stats: self.stats.clone(), + fill_value: Scalar::Bool(BoolScalar::non_nullable(false)), + }; + Some(Validity::Array(validity.into_array())) } fn walk(&self, walker: &mut dyn ArrayWalker) -> VortexResult<()> { @@ -189,19 +209,31 @@ impl Encoding for SparseEncoding { mod test { use itertools::Itertools; use vortex_error::VortexError; + use vortex_schema::Nullability::Nullable; + use vortex_schema::Signedness::Signed; + use vortex_schema::{DType, IntWidth}; use crate::array::sparse::SparseArray; use crate::array::Array; use crate::array::IntoArray; use crate::compute::flatten::flatten_primitive; use crate::compute::scalar_at::scalar_at; + use crate::scalar::Scalar; - fn sparse_array() -> SparseArray { + fn nullable_fill() -> Scalar { + Scalar::null(&DType::Int(IntWidth::_32, Signed, Nullable)) + } + fn non_nullable_fill() -> Scalar { + Scalar::from(42i32) + } + + fn sparse_array(fill_value: Scalar) -> SparseArray { // merged array: [null, null, 100, null, null, 200, null, null, 300, null] SparseArray::new( vec![2u64, 5, 8].into_array(), vec![100i32, 200, 300].into_array(), 10, + fill_value, ) } @@ -216,7 +248,7 @@ mod test { #[test] pub fn iter() { assert_sparse_array( - &sparse_array(), + &sparse_array(nullable_fill()), &[ None, None, @@ -234,15 +266,27 @@ mod test { #[test] pub fn iter_sliced() { + let p_fill_val = Some(non_nullable_fill().try_into().unwrap()); + assert_sparse_array( + sparse_array(non_nullable_fill()) + .slice(2, 7) + .unwrap() + .as_ref(), + &[Some(100), p_fill_val, p_fill_val, Some(200), p_fill_val], + ); + } + + #[test] + pub fn iter_sliced_nullable() { assert_sparse_array( - sparse_array().slice(2, 7).unwrap().as_ref(), + sparse_array(nullable_fill()).slice(2, 7).unwrap().as_ref(), &[Some(100), None, None, Some(200), None], ); } #[test] pub fn iter_sliced_twice() { - let sliced_once = sparse_array().slice(1, 8).unwrap(); + let sliced_once = sparse_array(nullable_fill()).slice(1, 8).unwrap(); assert_sparse_array( sliced_once.as_ref(), &[None, Some(100), None, None, Some(200), None, None], @@ -256,10 +300,10 @@ mod test { #[test] pub fn test_scalar_at() { assert_eq!( - usize::try_from(scalar_at(&sparse_array(), 2).unwrap()).unwrap(), + usize::try_from(scalar_at(&sparse_array(nullable_fill()), 2).unwrap()).unwrap(), 100 ); - let error = scalar_at(&sparse_array(), 10).err().unwrap(); + let error = scalar_at(&sparse_array(nullable_fill()), 10).err().unwrap(); let VortexError::OutOfBounds(i, start, stop, _) = error else { unreachable!() }; @@ -270,7 +314,7 @@ mod test { #[test] pub fn scalar_at_sliced() { - let sliced = sparse_array().slice(2, 7).unwrap(); + let sliced = sparse_array(nullable_fill()).slice(2, 7).unwrap(); assert_eq!( usize::try_from(scalar_at(sliced.as_ref(), 0).unwrap()).unwrap(), 100 @@ -286,7 +330,7 @@ mod test { #[test] pub fn scalar_at_sliced_twice() { - let sliced_once = sparse_array().slice(1, 8).unwrap(); + let sliced_once = sparse_array(nullable_fill()).slice(1, 8).unwrap(); assert_eq!( usize::try_from(scalar_at(sliced_once.as_ref(), 1).unwrap()).unwrap(), 100 diff --git a/vortex-array/src/array/sparse/serde.rs b/vortex-array/src/array/sparse/serde.rs index 34930dc6ff..9809464326 100644 --- a/vortex-array/src/array/sparse/serde.rs +++ b/vortex-array/src/array/sparse/serde.rs @@ -6,6 +6,7 @@ use vortex_schema::DType; use crate::array::sparse::{SparseArray, SparseEncoding}; use crate::array::{Array, ArrayRef}; +use crate::scalar::{NullScalar, Scalar}; use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx}; impl ArraySerde for SparseArray { @@ -34,9 +35,15 @@ impl EncodingSerde for SparseEncoding { let offset = ctx.read_usize()?; let indices = ctx.with_schema(&DType::IDX).read()?; let values = ctx.read()?; - Ok(SparseArray::new_with_offset(indices, values, len, offset) - .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))? - .into_array()) + Ok(SparseArray::new_with_offset( + indices, + values, + len, + offset, + Scalar::Null(NullScalar::new()), + ) + .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))? + .into_array()) } } @@ -47,6 +54,7 @@ mod test { use crate::array::sparse::SparseArray; use crate::array::Array; use crate::array::IntoArray; + use crate::scalar::{NullScalar, Scalar}; use crate::serde::test::roundtrip_array; #[test] @@ -55,6 +63,7 @@ mod test { vec![7u64, 37, 71, 97].into_array(), PrimitiveArray::from_iter(vec![Some(0), None, Some(2), Some(42)]).into_array(), 100, + Scalar::Null(NullScalar::new()), ); let read_arr = roundtrip_array(&arr).unwrap(); diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index 78c8c317bf..17327db8c9 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -12,6 +12,7 @@ use vortex::compute::patch::patch; use vortex::match_each_integer_ptype; use vortex::ptype::PType::{I16, I32, I64, I8, U16, U32, U64, U8}; use vortex::ptype::{NativePType, PType}; +use vortex::scalar::NullScalar; use vortex::scalar::{ListScalarVec, Scalar}; use vortex::stats::Stat; use vortex_error::{vortex_bail, vortex_err, VortexResult}; @@ -158,7 +159,7 @@ fn bitpack_patches( values.push(*v); } } - SparseArray::new(indices.into_array(), values.into_array(), parray.len()).into_array() + SparseArray::new(indices.into_array(), values.into_array(), parray.len(), Scalar::Null(NullScalar::new())).into_array() }) } diff --git a/vortex-fastlanes/src/bitpacking/compute.rs b/vortex-fastlanes/src/bitpacking/compute.rs index d1f3d11d6b..11f54fdb84 100644 --- a/vortex-fastlanes/src/bitpacking/compute.rs +++ b/vortex-fastlanes/src/bitpacking/compute.rs @@ -1,6 +1,7 @@ use itertools::Itertools; +use vortex::array::downcast::DowncastArrayBuiltin; use vortex::array::primitive::PrimitiveArray; -use vortex::array::{Array, ArrayRef}; +use vortex::array::{Array, ArrayRef, ArrayValidity}; use vortex::compute::as_contiguous::as_contiguous; use vortex::compute::flatten::{flatten_primitive, FlattenFn, FlattenedArray}; use vortex::compute::scalar_at::ScalarAtFn; @@ -45,7 +46,17 @@ impl ScalarAtFn for BitPackedArray { return Ok(Scalar::from(0 as $P)); }) } - unpack_single(self, index) + + match self.patches.clone() { + None => unpack_single(self, index), + Some(patch) => { + if patch.is_valid(index) { + ScalarAtFn::scalar_at(patch.as_sparse(), index) + } else { + unpack_single(self, index) + } + } + } } }