Skip to content

Commit

Permalink
Implement Validity for SparseArray, make scalar_at for bitpacked array respect patches (#194)

Browse files Browse the repository at this point in the history

before:
Issue: #193

#147 uncovered that we did not
respect patches in scalar_at calculations, causing a panic when REE ends
arrays were bitpacked with patches.


After:
* We have a validity implementation for BitPackedArray
* ScalarAtFn for BitPackedArray no longer ignores patches
* benches/compress no longer panics
* flatten respects fill_value

---------

Co-authored-by: Will Manning <[email protected]>
Co-authored-by: Robert Kruszewski <[email protected]>
  • Loading branch information
3 people authored Apr 4, 2024
1 parent 5013e56 commit 02d8180
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 68 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,4 @@ benchmarks/.out

# Scratch
*.txt
*.vortex
2 changes: 2 additions & 0 deletions vortex-alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression};
use vortex::compute::flatten::flatten_primitive;
use vortex::compute::patch::patch;
use vortex::ptype::{NativePType, PType};
use vortex::scalar::Scalar;
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::alp::ALPFloat;
Expand Down Expand Up @@ -99,6 +100,7 @@ where
PrimitiveArray::from(exc_pos).into_array(),
PrimitiveArray::from(exc).into_array(),
len,
Scalar::null(&values.dtype().as_nullable()),
)
.into_array()
}),
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/array/sparse/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl EncodingCompression for SparseEncoding {
ctx.named("values")
.compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?,
sparse_array.len(),
sparse_array.fill_value.clone(),
)
.into_array())
}
Expand Down
93 changes: 60 additions & 33 deletions vortex-array/src/array/sparse/compute.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use arrow_buffer::BooleanBufferBuilder;
use itertools::Itertools;
use vortex_error::{vortex_err, VortexResult};
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::array::downcast::DowncastArrayBuiltin;
use crate::array::primitive::PrimitiveArray;
Expand All @@ -12,6 +12,7 @@ use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
use crate::compute::search_sorted::{search_sorted, SearchSortedSide};
use crate::compute::ArrayCompute;
use crate::match_each_native_ptype;
use crate::ptype::NativePType;
use crate::scalar::Scalar;

impl ArrayCompute for SparseArray {
Expand All @@ -30,6 +31,14 @@ impl ArrayCompute for SparseArray {

impl AsContiguousFn for SparseArray {
fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult<ArrayRef> {
let all_fill_types_are_equal = arrays
.iter()
.map(|a| a.as_sparse().fill_value())
.all_equal();
if !all_fill_types_are_equal {
vortex_bail!("Cannot concatenate SparseArrays with differing fill values");
}

Ok(SparseArray::new(
as_contiguous(
&arrays
Expand All @@ -46,6 +55,7 @@ impl AsContiguousFn for SparseArray {
.collect_vec(),
)?,
arrays.iter().map(|a| a.len()).sum(),
self.fill_value().clone(),
)
.into_array())
}
Expand All @@ -58,26 +68,17 @@ impl FlattenFn for SparseArray {

let mut validity = BooleanBufferBuilder::new(self.len());
validity.append_n(self.len(), false);

let values = flatten(self.values())?;
if let FlattenedArray::Primitive(parray) = values {
let null_fill = self.fill_value().is_null();
if let FlattenedArray::Primitive(ref parray) = values {
match_each_native_ptype!(parray.ptype(), |$P| {
let mut values = vec![$P::default(); self.len()];
let mut offset = 0;

for v in parray.typed_data::<$P>() {
let idx = indices[offset];
values[idx] = *v;
validity.set_bit(idx, true);
offset += 1;
}

let validity = validity.finish();

Ok(FlattenedArray::Primitive(PrimitiveArray::from_nullable(
values,
Some(validity.into()),
)))
flatten_primitive::<$P>(
self,
parray,
indices,
null_fill,
validity
)
})
} else {
Err(vortex_err!(
Expand All @@ -86,26 +87,52 @@ impl FlattenFn for SparseArray {
}
}
}
/// Materialize a sparse primitive array as a dense vector of length
/// `sparse_array.len()`, scattering each patch value to its resolved index.
///
/// When `null_fill` is true the gaps stay at `T::default()` and are reported
/// through the validity bitmap; otherwise every gap is pre-filled with the
/// array's fill value and the resulting primitive array carries no validity.
fn flatten_primitive<T: NativePType>(
    sparse_array: &SparseArray,
    parray: &PrimitiveArray,
    indices: Vec<usize>,
    null_fill: bool,
    mut validity: BooleanBufferBuilder,
) -> VortexResult<FlattenedArray> {
    // Gap value: the type default for nullable output, otherwise the
    // declared fill scalar converted to T (can fail, hence `?`).
    let gap: T = if null_fill {
        T::default()
    } else {
        sparse_array.fill_value.clone().try_into()?
    };
    let mut dense = vec![gap; sparse_array.len()];

    // Scatter each patch value to its position, marking it valid as we go.
    for (slot, &v) in parray.typed_data::<T>().iter().enumerate() {
        let idx = indices[slot];
        dense[idx] = v;
        validity.set_bit(idx, true);
    }

    let validity = validity.finish();
    let flattened = if null_fill {
        PrimitiveArray::from_nullable(dense, Some(validity.into()))
    } else {
        // Non-null fill: every slot holds a real value, no validity needed.
        PrimitiveArray::from(dense)
    };
    Ok(FlattenedArray::Primitive(flattened))
}

impl ScalarAtFn for SparseArray {
    /// Return the scalar at `index`: the patch value when `index` appears in
    /// the (sorted) patch indices, otherwise this array's fill value.
    ///
    /// Note: the span previously contained the pre-refactor implementation
    /// interleaved before this one (a stale expression statement), which did
    /// not compile; only the current implementation is kept.
    fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
        // Translate the logical index into the underlying (offset) index space.
        let true_patch_index = index + self.indices_offset;

        // First position in the patch-index array >= the requested index.
        let idx = search_sorted(self.indices(), true_patch_index, SearchSortedSide::Left)?;

        // Past the last patch: nothing patched here, return the fill value.
        // (Without this guard, `scalar_at(self.indices(), idx)` would error
        // out-of-bounds instead of falling back to the fill value.)
        if idx >= self.indices().len() {
            return Ok(self.fill_value().clone());
        }

        // If that slot holds exactly the requested index, the value is patched;
        // cast so the result carries this array's dtype (incl. nullability).
        let patch_index: usize = scalar_at(self.indices(), idx)?.try_into()?;
        if patch_index == true_patch_index {
            scalar_at(self.values(), idx)?.cast(self.dtype())
        } else {
            Ok(self.fill_value().clone())
        }
    }
}
77 changes: 63 additions & 14 deletions vortex-array/src/array/sparse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use linkme::distributed_slice;
use vortex_error::{vortex_bail, VortexResult};
use vortex_schema::DType;

use crate::array::constant::ConstantArray;
use crate::array::validity::Validity;
use crate::array::{check_slice_bounds, Array, ArrayRef};
use crate::compress::EncodingCompression;
Expand All @@ -15,6 +16,7 @@ use crate::compute::ArrayCompute;
use crate::encoding::{Encoding, EncodingId, EncodingRef, ENCODINGS};
use crate::formatter::{ArrayDisplay, ArrayFormatter};
use crate::ptype::PType;
use crate::scalar::{BoolScalar, Scalar};
use crate::serde::{ArraySerde, EncodingSerde};
use crate::stats::{Stats, StatsCompute, StatsSet};
use crate::{impl_array, ArrayWalker};
Expand All @@ -31,22 +33,29 @@ pub struct SparseArray {
indices_offset: usize,
len: usize,
stats: Arc<RwLock<StatsSet>>,
fill_value: Scalar,
}

impl SparseArray {
pub fn new(indices: ArrayRef, values: ArrayRef, len: usize) -> Self {
Self::try_new(indices, values, len).unwrap()
pub fn new(indices: ArrayRef, values: ArrayRef, len: usize, fill_value: Scalar) -> Self {
Self::try_new(indices, values, len, fill_value).unwrap()
}

pub fn try_new(indices: ArrayRef, values: ArrayRef, len: usize) -> VortexResult<Self> {
Self::new_with_offset(indices, values, len, 0)
pub fn try_new(
indices: ArrayRef,
values: ArrayRef,
len: usize,
fill_value: Scalar,
) -> VortexResult<Self> {
Self::try_new_with_offset(indices, values, len, 0, fill_value)
}

pub(crate) fn new_with_offset(
pub(crate) fn try_new_with_offset(
indices: ArrayRef,
values: ArrayRef,
len: usize,
indices_offset: usize,
fill_value: Scalar,
) -> VortexResult<Self> {
if !matches!(indices.dtype(), &DType::IDX) {
vortex_bail!("Cannot use {} as indices", indices.dtype());
Expand All @@ -58,6 +67,7 @@ impl SparseArray {
indices_offset,
len,
stats: Arc::new(RwLock::new(StatsSet::new())),
fill_value,
})
}

Expand All @@ -76,6 +86,11 @@ impl SparseArray {
&self.indices
}

/// The scalar reported for every position not covered by `indices`.
#[inline]
fn fill_value(&self) -> &Scalar {
&self.fill_value
}

/// Return indices as a vector of usize with the indices_offset applied.
pub fn resolved_indices(&self) -> Vec<usize> {
flatten_primitive(cast(self.indices(), PType::U64.into()).unwrap().as_ref())
Expand Down Expand Up @@ -111,7 +126,16 @@ impl Array for SparseArray {
}

/// Compute this array's validity as a boolean SparseArray: `true` at every
/// patched index, with a non-nullable `false` fill everywhere else.
///
/// A stale `todo!()` (the removed pre-refactor body) preceded this logic and
/// made it unreachable; it has been deleted.
fn validity(&self) -> Option<Validity> {
    let validity = SparseArray {
        indices: self.indices.clone(),
        // Every index that carries a patch value is valid.
        values: ConstantArray::new(Scalar::Bool(BoolScalar::non_nullable(true)), self.len)
            .into_array(),
        indices_offset: self.indices_offset,
        len: self.len,
        // NOTE(review): this shares the stats handle with the data array —
        // confirm the stats are not meaningful for the validity view.
        stats: self.stats.clone(),
        // Positions without a patch are invalid.
        fill_value: Scalar::Bool(BoolScalar::non_nullable(false)),
    };
    Some(Validity::Array(validity.into_array()))
}

fn slice(&self, start: usize, stop: usize) -> VortexResult<ArrayRef> {
Expand All @@ -127,6 +151,7 @@ impl Array for SparseArray {
values: self.values.slice(index_start_index, index_end_index)?,
len: stop - start,
stats: Arc::new(RwLock::new(StatsSet::new())),
fill_value: self.fill_value.clone(),
}
.into_array())
}
Expand Down Expand Up @@ -196,19 +221,31 @@ impl Encoding for SparseEncoding {
mod test {
use itertools::Itertools;
use vortex_error::VortexError;
use vortex_schema::Nullability::Nullable;
use vortex_schema::Signedness::Signed;
use vortex_schema::{DType, IntWidth};

use crate::array::sparse::SparseArray;
use crate::array::Array;
use crate::array::IntoArray;
use crate::compute::flatten::flatten_primitive;
use crate::compute::scalar_at::scalar_at;
use crate::scalar::Scalar;

fn sparse_array() -> SparseArray {
// Null fill of a nullable i32 dtype: gaps flatten to None.
fn nullable_fill() -> Scalar {
Scalar::null(&DType::Int(IntWidth::_32, Signed, Nullable))
}
// Concrete i32 fill: gaps flatten to 42 instead of null.
fn non_nullable_fill() -> Scalar {
Scalar::from(42i32)
}

// Ten-element array with patches {2: 100, 5: 200, 8: 300}.
// merged array: [fill, fill, 100, fill, fill, 200, fill, fill, 300, fill]
// (each `fill` is null when `fill_value` is the nullable fill).
fn sparse_array(fill_value: Scalar) -> SparseArray {
SparseArray::new(
vec![2u64, 5, 8].into_array(),
vec![100i32, 200, 300].into_array(),
10,
fill_value,
)
}

Expand All @@ -223,7 +260,7 @@ mod test {
#[test]
pub fn iter() {
assert_sparse_array(
&sparse_array(),
&sparse_array(nullable_fill()),
&[
None,
None,
Expand All @@ -241,15 +278,27 @@ mod test {

#[test]
pub fn iter_sliced() {
// With a non-null fill, unpatched slots in a slice flatten to the fill
// value (42) rather than to None.
let p_fill_val = Some(non_nullable_fill().try_into().unwrap());
assert_sparse_array(
sparse_array(non_nullable_fill())
.slice(2, 7)
.unwrap()
.as_ref(),
&[Some(100), p_fill_val, p_fill_val, Some(200), p_fill_val],
);
}

#[test]
pub fn iter_sliced_nullable() {
assert_sparse_array(
sparse_array().slice(2, 7).unwrap().as_ref(),
sparse_array(nullable_fill()).slice(2, 7).unwrap().as_ref(),
&[Some(100), None, None, Some(200), None],
);
}

#[test]
pub fn iter_sliced_twice() {
let sliced_once = sparse_array().slice(1, 8).unwrap();
let sliced_once = sparse_array(nullable_fill()).slice(1, 8).unwrap();
assert_sparse_array(
sliced_once.as_ref(),
&[None, Some(100), None, None, Some(200), None, None],
Expand All @@ -263,10 +312,10 @@ mod test {
#[test]
pub fn test_scalar_at() {
assert_eq!(
usize::try_from(scalar_at(&sparse_array(), 2).unwrap()).unwrap(),
usize::try_from(scalar_at(&sparse_array(nullable_fill()), 2).unwrap()).unwrap(),
100
);
let error = scalar_at(&sparse_array(), 10).err().unwrap();
let error = scalar_at(&sparse_array(nullable_fill()), 10).err().unwrap();
let VortexError::OutOfBounds(i, start, stop, _) = error else {
unreachable!()
};
Expand All @@ -277,7 +326,7 @@ mod test {

#[test]
pub fn scalar_at_sliced() {
let sliced = sparse_array().slice(2, 7).unwrap();
let sliced = sparse_array(nullable_fill()).slice(2, 7).unwrap();
assert_eq!(
usize::try_from(scalar_at(sliced.as_ref(), 0).unwrap()).unwrap(),
100
Expand All @@ -293,7 +342,7 @@ mod test {

#[test]
pub fn scalar_at_sliced_twice() {
let sliced_once = sparse_array().slice(1, 8).unwrap();
let sliced_once = sparse_array(nullable_fill()).slice(1, 8).unwrap();
assert_eq!(
usize::try_from(scalar_at(sliced_once.as_ref(), 1).unwrap()).unwrap(),
100
Expand Down
Loading

0 comments on commit 02d8180

Please sign in to comment.