Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Validity for SparseArray, make scalar_at for bitpacked array respect patches #194

Merged
merged 13 commits into from
Apr 4, 2024
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,4 @@ benchmarks/.out

# Scratch
*.txt
*.vortex
2 changes: 2 additions & 0 deletions vortex-alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression};
use vortex::compute::flatten::flatten_primitive;
use vortex::compute::patch::patch;
use vortex::ptype::{NativePType, PType};
use vortex::scalar::Scalar;
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::alp::ALPFloat;
Expand Down Expand Up @@ -99,6 +100,7 @@ where
PrimitiveArray::from(exc_pos).into_array(),
PrimitiveArray::from(exc).into_array(),
len,
Scalar::null(&values.dtype().as_nullable()),
)
.into_array()
}),
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/array/sparse/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl EncodingCompression for SparseEncoding {
ctx.named("values")
.compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?,
sparse_array.len(),
sparse_array.fill_value.clone(),
)
.into_array())
}
Expand Down
93 changes: 60 additions & 33 deletions vortex-array/src/array/sparse/compute.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use arrow_buffer::BooleanBufferBuilder;
use itertools::Itertools;
use vortex_error::{vortex_err, VortexResult};
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::array::downcast::DowncastArrayBuiltin;
use crate::array::primitive::PrimitiveArray;
Expand All @@ -12,6 +12,7 @@ use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
use crate::compute::search_sorted::{search_sorted, SearchSortedSide};
use crate::compute::ArrayCompute;
use crate::match_each_native_ptype;
use crate::ptype::NativePType;
use crate::scalar::Scalar;

impl ArrayCompute for SparseArray {
Expand All @@ -30,6 +31,14 @@ impl ArrayCompute for SparseArray {

impl AsContiguousFn for SparseArray {
fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult<ArrayRef> {
let all_fill_types_are_equal = arrays
.iter()
.map(|a| a.as_sparse().fill_value())
.all_equal();
if !all_fill_types_are_equal {
vortex_bail!("Cannot concatenate SparseArrays with differing fill values");
}

Ok(SparseArray::new(
as_contiguous(
&arrays
Expand All @@ -46,6 +55,7 @@ impl AsContiguousFn for SparseArray {
.collect_vec(),
)?,
arrays.iter().map(|a| a.len()).sum(),
self.fill_value().clone(),
)
.into_array())
}
Expand All @@ -58,26 +68,17 @@ impl FlattenFn for SparseArray {

let mut validity = BooleanBufferBuilder::new(self.len());
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
validity.append_n(self.len(), false);

let values = flatten(self.values())?;
if let FlattenedArray::Primitive(parray) = values {
let null_fill = self.fill_value().is_null();
if let FlattenedArray::Primitive(ref parray) = values {
match_each_native_ptype!(parray.ptype(), |$P| {
let mut values = vec![$P::default(); self.len()];
let mut offset = 0;

for v in parray.typed_data::<$P>() {
let idx = indices[offset];
values[idx] = *v;
validity.set_bit(idx, true);
offset += 1;
}

let validity = validity.finish();

Ok(FlattenedArray::Primitive(PrimitiveArray::from_nullable(
values,
Some(validity.into()),
)))
flatten_primitive::<$P>(
self,
parray,
indices,
null_fill,
validity
)
})
} else {
Err(vortex_err!(
Expand All @@ -86,26 +87,52 @@ impl FlattenFn for SparseArray {
}
}
}
fn flatten_primitive<T: NativePType>(
sparse_array: &SparseArray,
parray: &PrimitiveArray,
indices: Vec<usize>,
null_fill: bool,
mut validity: BooleanBufferBuilder,
) -> VortexResult<FlattenedArray> {
let fill_value = if null_fill {
T::default()
} else {
sparse_array.fill_value.clone().try_into()?
};
let mut values = vec![fill_value; sparse_array.len()];

for (offset, v) in parray.typed_data::<T>().iter().enumerate() {
let idx = indices[offset];
values[idx] = *v;
validity.set_bit(idx, true);
}

let validity = validity.finish();
if null_fill {
Ok(FlattenedArray::Primitive(PrimitiveArray::from_nullable(
values,
Some(validity.into()),
)))
} else {
Ok(FlattenedArray::Primitive(PrimitiveArray::from(values)))
}
}

impl ScalarAtFn for SparseArray {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
// Check whether `true_patch_index` exists in the patch index array
// First, get the index of the patch index array that is the first index
// greater than or equal to the true index
let true_patch_index = index + self.indices_offset;
search_sorted(self.indices(), true_patch_index, SearchSortedSide::Left).and_then(|idx| {
// If the value at this index is equal to the true index, then it exists in the patch index array,
// and we should return the value at the corresponding index in the patch values array
scalar_at(self.indices(), idx)
.or_else(|_| Ok(Scalar::null(self.values().dtype())))
.and_then(usize::try_from)
.and_then(|patch_index| {
if patch_index == true_patch_index {
scalar_at(self.values(), idx)
} else {
Ok(Scalar::null(self.values().dtype()))
}
})
})
let idx = search_sorted(self.indices(), true_patch_index, SearchSortedSide::Left)?;

// If the value at this index is equal to the true index, then it exists in the patch index array,
// and we should return the value at the corresponding index in the patch values array
let patch_index: usize = scalar_at(self.indices(), idx)?.try_into()?;
if patch_index == true_patch_index {
scalar_at(self.values(), idx)?.cast(self.dtype())
} else {
Ok(self.fill_value().clone())
}
}
}
77 changes: 63 additions & 14 deletions vortex-array/src/array/sparse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use linkme::distributed_slice;
use vortex_error::{vortex_bail, VortexResult};
use vortex_schema::DType;

use crate::array::constant::ConstantArray;
use crate::array::validity::Validity;
use crate::array::{check_slice_bounds, Array, ArrayRef};
use crate::compress::EncodingCompression;
Expand All @@ -15,6 +16,7 @@ use crate::compute::ArrayCompute;
use crate::encoding::{Encoding, EncodingId, EncodingRef, ENCODINGS};
use crate::formatter::{ArrayDisplay, ArrayFormatter};
use crate::ptype::PType;
use crate::scalar::{BoolScalar, Scalar};
use crate::serde::{ArraySerde, EncodingSerde};
use crate::stats::{Stats, StatsCompute, StatsSet};
use crate::{impl_array, ArrayWalker};
Expand All @@ -31,22 +33,29 @@ pub struct SparseArray {
indices_offset: usize,
len: usize,
stats: Arc<RwLock<StatsSet>>,
fill_value: Scalar,
}

impl SparseArray {
pub fn new(indices: ArrayRef, values: ArrayRef, len: usize) -> Self {
Self::try_new(indices, values, len).unwrap()
pub fn new(indices: ArrayRef, values: ArrayRef, len: usize, fill_value: Scalar) -> Self {
Self::try_new(indices, values, len, fill_value).unwrap()
}

pub fn try_new(indices: ArrayRef, values: ArrayRef, len: usize) -> VortexResult<Self> {
Self::new_with_offset(indices, values, len, 0)
pub fn try_new(
indices: ArrayRef,
values: ArrayRef,
len: usize,
fill_value: Scalar,
) -> VortexResult<Self> {
Self::try_new_with_offset(indices, values, len, 0, fill_value)
}

pub(crate) fn new_with_offset(
pub(crate) fn try_new_with_offset(
indices: ArrayRef,
values: ArrayRef,
len: usize,
indices_offset: usize,
fill_value: Scalar,
) -> VortexResult<Self> {
if !matches!(indices.dtype(), &DType::IDX) {
vortex_bail!("Cannot use {} as indices", indices.dtype());
Expand All @@ -58,6 +67,7 @@ impl SparseArray {
indices_offset,
len,
stats: Arc::new(RwLock::new(StatsSet::new())),
fill_value,
})
}

Expand All @@ -76,6 +86,11 @@ impl SparseArray {
&self.indices
}

#[inline]
fn fill_value(&self) -> &Scalar {
&self.fill_value
}

/// Return indices as a vector of usize with the indices_offset applied.
pub fn resolved_indices(&self) -> Vec<usize> {
flatten_primitive(cast(self.indices(), PType::U64.into()).unwrap().as_ref())
Expand Down Expand Up @@ -111,7 +126,16 @@ impl Array for SparseArray {
}

fn validity(&self) -> Option<Validity> {
todo!()
let validity = SparseArray {
indices: self.indices.clone(),
values: ConstantArray::new(Scalar::Bool(BoolScalar::non_nullable(true)), self.len)
.into_array(),
indices_offset: self.indices_offset,
len: self.len,
stats: self.stats.clone(),
fill_value: Scalar::Bool(BoolScalar::non_nullable(false)),
};
Some(Validity::Array(validity.into_array()))
}

fn slice(&self, start: usize, stop: usize) -> VortexResult<ArrayRef> {
Expand All @@ -127,6 +151,7 @@ impl Array for SparseArray {
values: self.values.slice(index_start_index, index_end_index)?,
len: stop - start,
stats: Arc::new(RwLock::new(StatsSet::new())),
fill_value: self.fill_value.clone(),
}
.into_array())
}
Expand Down Expand Up @@ -196,19 +221,31 @@ impl Encoding for SparseEncoding {
mod test {
use itertools::Itertools;
use vortex_error::VortexError;
use vortex_schema::Nullability::Nullable;
use vortex_schema::Signedness::Signed;
use vortex_schema::{DType, IntWidth};

use crate::array::sparse::SparseArray;
use crate::array::Array;
use crate::array::IntoArray;
use crate::compute::flatten::flatten_primitive;
use crate::compute::scalar_at::scalar_at;
use crate::scalar::Scalar;

fn sparse_array() -> SparseArray {
fn nullable_fill() -> Scalar {
Scalar::null(&DType::Int(IntWidth::_32, Signed, Nullable))
}
fn non_nullable_fill() -> Scalar {
Scalar::from(42i32)
}

fn sparse_array(fill_value: Scalar) -> SparseArray {
// merged array: [null, null, 100, null, null, 200, null, null, 300, null]
SparseArray::new(
vec![2u64, 5, 8].into_array(),
vec![100i32, 200, 300].into_array(),
10,
fill_value,
)
}

Expand All @@ -223,7 +260,7 @@ mod test {
#[test]
pub fn iter() {
assert_sparse_array(
&sparse_array(),
&sparse_array(nullable_fill()),
&[
None,
None,
Expand All @@ -241,15 +278,27 @@ mod test {

#[test]
pub fn iter_sliced() {
let p_fill_val = Some(non_nullable_fill().try_into().unwrap());
assert_sparse_array(
sparse_array(non_nullable_fill())
.slice(2, 7)
.unwrap()
.as_ref(),
&[Some(100), p_fill_val, p_fill_val, Some(200), p_fill_val],
);
}

#[test]
pub fn iter_sliced_nullable() {
assert_sparse_array(
sparse_array().slice(2, 7).unwrap().as_ref(),
sparse_array(nullable_fill()).slice(2, 7).unwrap().as_ref(),
&[Some(100), None, None, Some(200), None],
);
}

#[test]
pub fn iter_sliced_twice() {
let sliced_once = sparse_array().slice(1, 8).unwrap();
let sliced_once = sparse_array(nullable_fill()).slice(1, 8).unwrap();
assert_sparse_array(
sliced_once.as_ref(),
&[None, Some(100), None, None, Some(200), None, None],
Expand All @@ -263,10 +312,10 @@ mod test {
#[test]
pub fn test_scalar_at() {
assert_eq!(
usize::try_from(scalar_at(&sparse_array(), 2).unwrap()).unwrap(),
usize::try_from(scalar_at(&sparse_array(nullable_fill()), 2).unwrap()).unwrap(),
100
);
let error = scalar_at(&sparse_array(), 10).err().unwrap();
let error = scalar_at(&sparse_array(nullable_fill()), 10).err().unwrap();
let VortexError::OutOfBounds(i, start, stop, _) = error else {
unreachable!()
};
Expand All @@ -277,7 +326,7 @@ mod test {

#[test]
pub fn scalar_at_sliced() {
let sliced = sparse_array().slice(2, 7).unwrap();
let sliced = sparse_array(nullable_fill()).slice(2, 7).unwrap();
assert_eq!(
usize::try_from(scalar_at(sliced.as_ref(), 0).unwrap()).unwrap(),
100
Expand All @@ -293,7 +342,7 @@ mod test {

#[test]
pub fn scalar_at_sliced_twice() {
let sliced_once = sparse_array().slice(1, 8).unwrap();
let sliced_once = sparse_array(nullable_fill()).slice(1, 8).unwrap();
assert_eq!(
usize::try_from(scalar_at(sliced_once.as_ref(), 1).unwrap()).unwrap(),
100
Expand Down
Loading