diff --git a/.gitignore b/.gitignore index d41539df1a..2e32a1b26f 100644 --- a/.gitignore +++ b/.gitignore @@ -189,3 +189,4 @@ benchmarks/.out # Scratch *.txt +*.vortex diff --git a/vortex-alp/src/compress.rs b/vortex-alp/src/compress.rs index 70d146a6c8..3a11fccc0f 100644 --- a/vortex-alp/src/compress.rs +++ b/vortex-alp/src/compress.rs @@ -7,6 +7,7 @@ use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; use vortex::compute::flatten::flatten_primitive; use vortex::compute::patch::patch; use vortex::ptype::{NativePType, PType}; +use vortex::scalar::Scalar; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use crate::alp::ALPFloat; @@ -99,6 +100,7 @@ where PrimitiveArray::from(exc_pos).into_array(), PrimitiveArray::from(exc).into_array(), len, + Scalar::null(&values.dtype().as_nullable()), ) .into_array() }), diff --git a/vortex-array/src/array/sparse/compress.rs b/vortex-array/src/array/sparse/compress.rs index 9d0a62e008..97dd58e806 100644 --- a/vortex-array/src/array/sparse/compress.rs +++ b/vortex-array/src/array/sparse/compress.rs @@ -32,6 +32,7 @@ impl EncodingCompression for SparseEncoding { ctx.named("values") .compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?, sparse_array.len(), + sparse_array.fill_value.clone(), ) .into_array()) } diff --git a/vortex-array/src/array/sparse/compute.rs b/vortex-array/src/array/sparse/compute.rs index 837ecb1d08..9e5a2e4ec0 100644 --- a/vortex-array/src/array/sparse/compute.rs +++ b/vortex-array/src/array/sparse/compute.rs @@ -1,6 +1,6 @@ use arrow_buffer::BooleanBufferBuilder; use itertools::Itertools; -use vortex_error::{vortex_err, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; use crate::array::downcast::DowncastArrayBuiltin; use crate::array::primitive::PrimitiveArray; @@ -12,6 +12,7 @@ use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::search_sorted::{search_sorted, SearchSortedSide}; use crate::compute::ArrayCompute; use crate::match_each_native_ptype; +use crate::ptype::NativePType; use crate::scalar::Scalar; impl ArrayCompute for SparseArray { @@ -30,6 +31,14 @@ impl ArrayCompute for SparseArray { impl AsContiguousFn for SparseArray { fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult { + let all_fill_types_are_equal = arrays + .iter() + .map(|a| a.as_sparse().fill_value()) + .all_equal(); + if !all_fill_types_are_equal { + vortex_bail!("Cannot concatenate SparseArrays with differing fill values"); + } + Ok(SparseArray::new( as_contiguous( &arrays @@ -46,6 +55,7 @@ impl AsContiguousFn for SparseArray { .collect_vec(), )?, arrays.iter().map(|a| a.len()).sum(), + self.fill_value().clone(), ) .into_array()) } @@ -58,26 +68,17 @@ impl FlattenFn for SparseArray { let mut validity = BooleanBufferBuilder::new(self.len()); validity.append_n(self.len(), false); - let values = flatten(self.values())?; - if let FlattenedArray::Primitive(parray) = values { + let null_fill = self.fill_value().is_null(); + if let FlattenedArray::Primitive(ref parray) = values { match_each_native_ptype!(parray.ptype(), |$P| { - let mut values = vec![$P::default(); self.len()]; - let mut offset = 0; - - for v in parray.typed_data::<$P>() { - let idx = indices[offset]; - values[idx] = *v; - validity.set_bit(idx, true); - offset += 1; - } - - let validity = validity.finish(); - - Ok(FlattenedArray::Primitive(PrimitiveArray::from_nullable( - values, - Some(validity.into()), - ))) + flatten_primitive::<$P>( + self, + parray, + indices, + null_fill, + validity + ) }) } else { Err(vortex_err!( @@ -86,6 +87,36 @@ impl FlattenFn for SparseArray { } } } +fn flatten_primitive( + sparse_array: &SparseArray, + parray: &PrimitiveArray, + indices: Vec, + null_fill: bool, + mut validity: BooleanBufferBuilder, +) -> VortexResult { + let fill_value = if null_fill { + T::default() + } else { + sparse_array.fill_value.clone().try_into()? + }; + let mut values = vec![fill_value; sparse_array.len()]; + + for (offset, v) in parray.typed_data::().iter().enumerate() { + let idx = indices[offset]; + values[idx] = *v; + validity.set_bit(idx, true); + } + + let validity = validity.finish(); + if null_fill { + Ok(FlattenedArray::Primitive(PrimitiveArray::from_nullable( + values, + Some(validity.into()), + ))) + } else { + Ok(FlattenedArray::Primitive(PrimitiveArray::from(values))) + } +} impl ScalarAtFn for SparseArray { fn scalar_at(&self, index: usize) -> VortexResult { @@ -93,19 +124,15 @@ impl ScalarAtFn for SparseArray { // First, get the index of the patch index array that is the first index // greater than or equal to the true index let true_patch_index = index + self.indices_offset; - search_sorted(self.indices(), true_patch_index, SearchSortedSide::Left).and_then(|idx| { - // If the value at this index is equal to the true index, then it exists in the patch index array, - // and we should return the value at the corresponding index in the patch values array - scalar_at(self.indices(), idx) - .or_else(|_| Ok(Scalar::null(self.values().dtype()))) - .and_then(usize::try_from) - .and_then(|patch_index| { - if patch_index == true_patch_index { - scalar_at(self.values(), idx) - } else { - Ok(Scalar::null(self.values().dtype())) - } - }) - }) + let idx = search_sorted(self.indices(), true_patch_index, SearchSortedSide::Left)?; + + // If the value at this index is equal to the true index, then it exists in the patch index array, + // and we should return the value at the corresponding index in the patch values array + let patch_index: usize = scalar_at(self.indices(), idx)?.try_into()?; + if patch_index == true_patch_index { + scalar_at(self.values(), idx)?.cast(self.dtype()) + } else { + Ok(self.fill_value().clone()) + } } } diff --git a/vortex-array/src/array/sparse/mod.rs b/vortex-array/src/array/sparse/mod.rs index 84a1bc9845..62d5a4309d 100644 --- a/vortex-array/src/array/sparse/mod.rs +++ b/vortex-array/src/array/sparse/mod.rs @@ -5,6 +5,7 @@ use linkme::distributed_slice; use vortex_error::{vortex_bail, VortexResult}; use vortex_schema::DType; +use crate::array::constant::ConstantArray; use crate::array::validity::Validity; use crate::array::{check_slice_bounds, Array, ArrayRef}; use crate::compress::EncodingCompression; @@ -15,6 +16,7 @@ use crate::compute::ArrayCompute; use crate::encoding::{Encoding, EncodingId, EncodingRef, ENCODINGS}; use crate::formatter::{ArrayDisplay, ArrayFormatter}; use crate::ptype::PType; +use crate::scalar::{BoolScalar, Scalar}; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stats, StatsCompute, StatsSet}; use crate::{impl_array, ArrayWalker}; @@ -31,22 +33,29 @@ pub struct SparseArray { indices_offset: usize, len: usize, stats: Arc>, + fill_value: Scalar, } impl SparseArray { - pub fn new(indices: ArrayRef, values: ArrayRef, len: usize) -> Self { - Self::try_new(indices, values, len).unwrap() + pub fn new(indices: ArrayRef, values: ArrayRef, len: usize, fill_value: Scalar) -> Self { + Self::try_new(indices, values, len, fill_value).unwrap() } - pub fn try_new(indices: ArrayRef, values: ArrayRef, len: usize) -> VortexResult { - Self::new_with_offset(indices, values, len, 0) + pub fn try_new( + indices: ArrayRef, + values: ArrayRef, + len: usize, + fill_value: Scalar, + ) -> VortexResult { + Self::try_new_with_offset(indices, values, len, 0, fill_value) } - pub(crate) fn new_with_offset( + pub(crate) fn try_new_with_offset( indices: ArrayRef, values: ArrayRef, len: usize, indices_offset: usize, + fill_value: Scalar, ) -> VortexResult { if !matches!(indices.dtype(), &DType::IDX) { vortex_bail!("Cannot use {} as indices", indices.dtype()); @@ -58,6 +67,7 @@ impl SparseArray { indices_offset, len, stats: Arc::new(RwLock::new(StatsSet::new())), + fill_value, }) } @@ -76,6 +86,11 @@ impl SparseArray { &self.indices } + #[inline] + fn fill_value(&self) -> &Scalar { + &self.fill_value + } + /// Return indices as a vector of usize with the indices_offset applied. pub fn resolved_indices(&self) -> Vec { flatten_primitive(cast(self.indices(), PType::U64.into()).unwrap().as_ref()) @@ -111,7 +126,16 @@ impl Array for SparseArray { } fn validity(&self) -> Option { - todo!() + let validity = SparseArray { + indices: self.indices.clone(), + values: ConstantArray::new(Scalar::Bool(BoolScalar::non_nullable(true)), self.len) + .into_array(), + indices_offset: self.indices_offset, + len: self.len, + stats: self.stats.clone(), + fill_value: Scalar::Bool(BoolScalar::non_nullable(false)), + }; + Some(Validity::Array(validity.into_array())) } fn slice(&self, start: usize, stop: usize) -> VortexResult { @@ -127,6 +151,7 @@ impl Array for SparseArray { values: self.values.slice(index_start_index, index_end_index)?, len: stop - start, stats: Arc::new(RwLock::new(StatsSet::new())), + fill_value: self.fill_value.clone(), } .into_array()) } @@ -196,19 +221,31 @@ impl Encoding for SparseEncoding { mod test { use itertools::Itertools; use vortex_error::VortexError; + use vortex_schema::Nullability::Nullable; + use vortex_schema::Signedness::Signed; + use vortex_schema::{DType, IntWidth}; use crate::array::sparse::SparseArray; use crate::array::Array; use crate::array::IntoArray; use crate::compute::flatten::flatten_primitive; use crate::compute::scalar_at::scalar_at; + use crate::scalar::Scalar; - fn sparse_array() -> SparseArray { + fn nullable_fill() -> Scalar { + Scalar::null(&DType::Int(IntWidth::_32, Signed, Nullable)) + } + fn non_nullable_fill() -> Scalar { + Scalar::from(42i32) + } + + fn sparse_array(fill_value: Scalar) -> SparseArray { // merged array: [null, null, 100, null, null, 200, null, null, 300, null] SparseArray::new( vec![2u64, 5, 8].into_array(), vec![100i32, 200, 300].into_array(), 10, + fill_value, ) } @@ -223,7 +260,7 @@ mod test { #[test] pub fn iter() { assert_sparse_array( - &sparse_array(), + &sparse_array(nullable_fill()), &[ None, None, @@ -241,15 +278,27 @@ mod test { #[test] pub fn iter_sliced() { + let p_fill_val = Some(non_nullable_fill().try_into().unwrap()); + assert_sparse_array( + sparse_array(non_nullable_fill()) + .slice(2, 7) + .unwrap() + .as_ref(), + &[Some(100), p_fill_val, p_fill_val, Some(200), p_fill_val], + ); + } + + #[test] + pub fn iter_sliced_nullable() { assert_sparse_array( - sparse_array().slice(2, 7).unwrap().as_ref(), + sparse_array(nullable_fill()).slice(2, 7).unwrap().as_ref(), &[Some(100), None, None, Some(200), None], ); } #[test] pub fn iter_sliced_twice() { - let sliced_once = sparse_array().slice(1, 8).unwrap(); + let sliced_once = sparse_array(nullable_fill()).slice(1, 8).unwrap(); assert_sparse_array( sliced_once.as_ref(), &[None, Some(100), None, None, Some(200), None, None], @@ -263,10 +312,10 @@ mod test { #[test] pub fn test_scalar_at() { assert_eq!( - usize::try_from(scalar_at(&sparse_array(), 2).unwrap()).unwrap(), + usize::try_from(scalar_at(&sparse_array(nullable_fill()), 2).unwrap()).unwrap(), 100 ); - let error = scalar_at(&sparse_array(), 10).err().unwrap(); + let error = scalar_at(&sparse_array(nullable_fill()), 10).err().unwrap(); let VortexError::OutOfBounds(i, start, stop, _) = error else { unreachable!() }; @@ -277,7 +326,7 @@ mod test { #[test] pub fn scalar_at_sliced() { - let sliced = sparse_array().slice(2, 7).unwrap(); + let sliced = sparse_array(nullable_fill()).slice(2, 7).unwrap(); assert_eq!( usize::try_from(scalar_at(sliced.as_ref(), 0).unwrap()).unwrap(), 100 @@ -293,7 +342,7 @@ mod test { #[test] pub fn scalar_at_sliced_twice() { - let sliced_once = sparse_array().slice(1, 8).unwrap(); + let sliced_once = sparse_array(nullable_fill()).slice(1, 8).unwrap(); assert_eq!( usize::try_from(scalar_at(sliced_once.as_ref(), 1).unwrap()).unwrap(), 100 diff --git a/vortex-array/src/array/sparse/serde.rs b/vortex-array/src/array/sparse/serde.rs index 34930dc6ff..125eb2ca7d 100644 --- a/vortex-array/src/array/sparse/serde.rs +++ b/vortex-array/src/array/sparse/serde.rs @@ -1,11 +1,9 @@ -use std::io; -use std::io::ErrorKind; - use vortex_error::VortexResult; use vortex_schema::DType; use crate::array::sparse::{SparseArray, SparseEncoding}; use crate::array::{Array, ArrayRef}; +use crate::scalar::Scalar; use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx}; impl ArraySerde for SparseArray { @@ -34,9 +32,17 @@ impl EncodingSerde for SparseEncoding { let offset = ctx.read_usize()?; let indices = ctx.with_schema(&DType::IDX).read()?; let values = ctx.read()?; - Ok(SparseArray::new_with_offset(indices, values, len, offset) - .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))? - .into_array()) + let fill_type = values.dtype().clone().as_nullable(); + SparseArray::try_new_with_offset( + indices, + values, + len, + offset, + // NB: We should deserialize the fill value from the source, but currently do not, + // so everything that goes through this read path is nullable + Scalar::null(&fill_type), + ) + .map(|a| a.into_array()) } } @@ -47,6 +53,7 @@ mod test { use crate::array::sparse::SparseArray; use crate::array::Array; use crate::array::IntoArray; + use crate::scalar::{NullScalar, Scalar}; use crate::serde::test::roundtrip_array; #[test] @@ -55,6 +62,7 @@ mod test { vec![7u64, 37, 71, 97].into_array(), PrimitiveArray::from_iter(vec![Some(0), None, Some(2), Some(42)]).into_array(), 100, + Scalar::Null(NullScalar::new()), ); let read_arr = roundtrip_array(&arr).unwrap(); diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index 78c8c317bf..fa2b24644a 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -158,7 +158,7 @@ fn bitpack_patches( values.push(*v); } } - SparseArray::new(indices.into_array(), values.into_array(), parray.len()).into_array() + SparseArray::new(indices.into_array(), values.into_array(), parray.len(), Scalar::null(&parray.dtype().as_nullable())).into_array() }) } @@ -267,12 +267,8 @@ pub(crate) fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResul }? }; - // Cast to signed if necessary - if ptype.is_signed_int() { - scalar.cast(&ptype.into()) - } else { - Ok(scalar) - } + // Cast to fix signedness and nullability + scalar.cast(array.dtype()) } /// # Safety diff --git a/vortex-fastlanes/src/bitpacking/compute.rs b/vortex-fastlanes/src/bitpacking/compute.rs index d1f3d11d6b..579480bc4e 100644 --- a/vortex-fastlanes/src/bitpacking/compute.rs +++ b/vortex-fastlanes/src/bitpacking/compute.rs @@ -3,7 +3,7 @@ use vortex::array::primitive::PrimitiveArray; use vortex::array::{Array, ArrayRef}; use vortex::compute::as_contiguous::as_contiguous; use vortex::compute::flatten::{flatten_primitive, FlattenFn, FlattenedArray}; -use vortex::compute::scalar_at::ScalarAtFn; +use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; use vortex::match_each_integer_ptype; @@ -39,13 +39,14 @@ impl ScalarAtFn for BitPackedArray { if index >= self.len() { return Err(vortex_err!(OutOfBounds:index, 0, self.len())); } - if self.bit_width() == 0 { - let ptype = self.dtype().try_into()?; - match_each_integer_ptype!(&ptype, |$P| { - return Ok(Scalar::from(0 as $P)); - }) + + if let Some(patches) = self.patches() { + // NB: All non-null values are considered patches + if self.bit_width == 0 || patches.is_valid(index) { + return scalar_at(patches, index)?.cast(self.dtype()); + } } - unpack_single(self, index) + unpack_single(self, index)?.cast(self.dtype()) } } @@ -83,13 +84,17 @@ impl TakeFn for BitPackedArray { mod test { use std::sync::Arc; + use itertools::Itertools; use vortex::array::downcast::DowncastArrayBuiltin; use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding}; use vortex::array::Array; - use vortex::compress::{CompressConfig, CompressCtx}; + use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression}; + use vortex::compute::scalar_at::scalar_at; use vortex::compute::take::take; use vortex::encoding::EncodingRef; + use vortex::scalar::Scalar; + use crate::downcast::DowncastFastlanes; use crate::BitPackedEncoding; #[test] @@ -105,4 +110,25 @@ mod test { let res_bytes = result.as_primitive().typed_data::(); assert_eq!(res_bytes, &[0, 62, 31, 33, 9, 18]); } + + #[test] + fn test_scalar_at() { + let cfg = CompressConfig::new().with_enabled([&BitPackedEncoding as EncodingRef]); + let ctx = CompressCtx::new(Arc::new(cfg)); + + let values = (0u32..257).collect_vec(); + let uncompressed = PrimitiveArray::from(values.clone()).into_array(); + let packed = BitPackedEncoding + .compress(&uncompressed, None, ctx) + .unwrap(); + let packed = packed.as_bitpacked(); + assert!(packed.patches().is_some()); + + let patches = packed.patches().unwrap().as_sparse(); + assert_eq!(patches.resolved_indices(), vec![256]); + + values.iter().enumerate().for_each(|(i, v)| { + assert_eq!(scalar_at(packed, i).unwrap(), Scalar::from(*v)); + }); + } } diff --git a/vortex-fastlanes/src/bitpacking/mod.rs b/vortex-fastlanes/src/bitpacking/mod.rs index f4d2b1faf3..68083bb9a5 100644 --- a/vortex-fastlanes/src/bitpacking/mod.rs +++ b/vortex-fastlanes/src/bitpacking/mod.rs @@ -19,6 +19,7 @@ mod compress; mod compute; mod serde; +/// NB: All non-null values in the patches array are considered patches #[derive(Debug, Clone)] pub struct BitPackedArray { encoded: ArrayRef,