Skip to content

Commit

Permalink
validity() for SparseArray; scalar_at respects patches; flatten respects fill_value
Browse files Browse the repository at this point in the history
  • Loading branch information
jdcasale committed Apr 4, 2024
1 parent 85f79ad commit 4a2bebf
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 23 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,4 @@ benchmarks/.out

# Scratch
*.txt
*.vortex
2 changes: 2 additions & 0 deletions vortex-alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use vortex::compress::{CompressConfig, CompressCtx, EncodingCompression};
use vortex::compute::flatten::flatten_primitive;
use vortex::compute::patch::patch;
use vortex::ptype::{NativePType, PType};
use vortex::scalar::Scalar;
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::alp::ALPFloat;
Expand Down Expand Up @@ -99,6 +100,7 @@ where
PrimitiveArray::from(exc_pos).into_array(),
PrimitiveArray::from(exc).into_array(),
len,
Scalar::null(values.dtype()),
)
.into_array()
}),
Expand Down
2 changes: 2 additions & 0 deletions vortex-array/src/array/sparse/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::array::downcast::DowncastArrayBuiltin;
use crate::array::sparse::{SparseArray, SparseEncoding};
use crate::array::{Array, ArrayRef};
use crate::compress::{CompressConfig, CompressCtx, EncodingCompression};
use crate::scalar::{NullScalar, Scalar};

impl EncodingCompression for SparseEncoding {
fn cost(&self) -> u8 {
Expand Down Expand Up @@ -32,6 +33,7 @@ impl EncodingCompression for SparseEncoding {
ctx.named("values")
.compress(sparse_array.values(), sparse_like.map(|sa| sa.values()))?,
sparse_array.len(),
Scalar::Null(NullScalar::new()),
)
.into_array())
}
Expand Down
31 changes: 26 additions & 5 deletions vortex-array/src/array/sparse/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ impl ArrayCompute for SparseArray {

impl AsContiguousFn for SparseArray {
fn as_contiguous(&self, arrays: &[ArrayRef]) -> VortexResult<ArrayRef> {
let fill_types = arrays
.iter()
.map(|a| a.as_sparse().clone().fill_value)
.dedup()
.collect_vec();
assert_eq!(
1,
fill_types.len(),
"Cannot concatenate SparseArrays with differing fill values"
);
Ok(SparseArray::new(
as_contiguous(
&arrays
Expand All @@ -46,23 +56,30 @@ impl AsContiguousFn for SparseArray {
.collect_vec(),
)?,
arrays.iter().map(|a| a.len()).sum(),
fill_types.first().unwrap().clone(),
)
.into_array())
}
}

#[allow(unreachable_code)]
impl FlattenFn for SparseArray {
fn flatten(&self) -> VortexResult<FlattenedArray> {
// Resolve our indices into a vector of usize applying the offset
let indices = self.resolved_indices();

let mut validity = BooleanBufferBuilder::new(self.len());
validity.append_n(self.len(), false);

let values = flatten(self.values())?;
let null_fill = self.fill_value.is_null();
if let FlattenedArray::Primitive(parray) = values {
match_each_native_ptype!(parray.ptype(), |$P| {
let mut values = vec![$P::default(); self.len()];
let mut values = if null_fill {
vec![$P::default(); self.len()]
} else {
let p_fill_value: $P = self.fill_value.clone().try_into()?;
vec![p_fill_value; self.len()]
};
let mut offset = 0;

for v in parray.typed_data::<$P>() {
Expand All @@ -73,11 +90,15 @@ impl FlattenFn for SparseArray {
}

let validity = validity.finish();

Ok(FlattenedArray::Primitive(PrimitiveArray::from_nullable(
if null_fill {
Ok(FlattenedArray::Primitive(PrimitiveArray::from_nullable(
values,
Some(validity.into()),
)))
)))
} else {
Ok(FlattenedArray::Primitive(PrimitiveArray::from(values)))
}

})
} else {
Err(vortex_err!(
Expand Down
70 changes: 57 additions & 13 deletions vortex-array/src/array/sparse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use linkme::distributed_slice;
use vortex_error::{vortex_bail, VortexResult};
use vortex_schema::DType;

use crate::array::constant::ConstantArray;
use crate::array::validity::Validity;
use crate::array::{check_slice_bounds, Array, ArrayRef};
use crate::compress::EncodingCompression;
Expand All @@ -15,6 +16,7 @@ use crate::compute::ArrayCompute;
use crate::encoding::{Encoding, EncodingId, EncodingRef, ENCODINGS};
use crate::formatter::{ArrayDisplay, ArrayFormatter};
use crate::ptype::PType;
use crate::scalar::{BoolScalar, Scalar};
use crate::serde::{ArraySerde, EncodingSerde};
use crate::stats::{Stats, StatsCompute, StatsSet};
use crate::{impl_array, ArrayWalker};
Expand All @@ -31,22 +33,29 @@ pub struct SparseArray {
indices_offset: usize,
len: usize,
stats: Arc<RwLock<StatsSet>>,
fill_value: Scalar,
}

impl SparseArray {
pub fn new(indices: ArrayRef, values: ArrayRef, len: usize) -> Self {
Self::try_new(indices, values, len).unwrap()
/// Construct a new `SparseArray`, panicking if the arguments are invalid.
///
/// `fill_value` is the scalar read back for every position not listed in
/// `indices`. Prefer [`Self::try_new`] when the caller can propagate the
/// error instead of panicking.
pub fn new(indices: ArrayRef, values: ArrayRef, len: usize, fill_value: Scalar) -> Self {
    // expect() instead of a bare unwrap() so the panic names its origin.
    Self::try_new(indices, values, len, fill_value)
        .expect("SparseArray::new: invalid indices/values/len/fill_value")
}

pub fn try_new(indices: ArrayRef, values: ArrayRef, len: usize) -> VortexResult<Self> {
Self::new_with_offset(indices, values, len, 0)
/// Fallibly construct a `SparseArray` whose indices carry no offset.
///
/// Errors are propagated from [`Self::new_with_offset`] (e.g. when `indices`
/// does not have the index dtype).
pub fn try_new(
    indices: ArrayRef,
    values: ArrayRef,
    len: usize,
    fill_value: Scalar,
) -> VortexResult<Self> {
    // A freshly constructed (unsliced) array always starts at offset zero.
    let indices_offset = 0;
    Self::new_with_offset(indices, values, len, indices_offset, fill_value)
}

pub(crate) fn new_with_offset(
indices: ArrayRef,
values: ArrayRef,
len: usize,
indices_offset: usize,
fill_value: Scalar,
) -> VortexResult<Self> {
if !matches!(indices.dtype(), &DType::IDX) {
vortex_bail!("Cannot use {} as indices", indices.dtype());
Expand All @@ -58,6 +67,7 @@ impl SparseArray {
indices_offset,
len,
stats: Arc::new(RwLock::new(StatsSet::new())),
fill_value,
})
}

Expand Down Expand Up @@ -111,7 +121,16 @@ impl Array for SparseArray {
}

fn validity(&self) -> Option<Validity> {
todo!()
let validity = SparseArray {
indices: self.indices.clone(),
values: ConstantArray::new(Scalar::Bool(BoolScalar::non_nullable(true)), self.len)
.into_array(),
indices_offset: self.indices_offset,
len: self.len,
stats: self.stats.clone(),
fill_value: Scalar::Bool(BoolScalar::non_nullable(false)),
};
Some(Validity::Array(validity.into_array()))
}

fn slice(&self, start: usize, stop: usize) -> VortexResult<ArrayRef> {
Expand All @@ -127,6 +146,7 @@ impl Array for SparseArray {
values: self.values.slice(index_start_index, index_end_index)?,
len: stop - start,
stats: Arc::new(RwLock::new(StatsSet::new())),
fill_value: self.fill_value.clone(),
}
.into_array())
}
Expand Down Expand Up @@ -196,19 +216,31 @@ impl Encoding for SparseEncoding {
mod test {
use itertools::Itertools;
use vortex_error::VortexError;
use vortex_schema::Nullability::Nullable;
use vortex_schema::Signedness::Signed;
use vortex_schema::{DType, IntWidth};

use crate::array::sparse::SparseArray;
use crate::array::Array;
use crate::array::IntoArray;
use crate::compute::flatten::flatten_primitive;
use crate::compute::scalar_at::scalar_at;
use crate::scalar::Scalar;

fn sparse_array() -> SparseArray {
/// A null scalar typed as a nullable 32-bit signed integer.
fn nullable_fill() -> Scalar {
    let dtype = DType::Int(IntWidth::_32, Signed, Nullable);
    Scalar::null(&dtype)
}
fn non_nullable_fill() -> Scalar {
Scalar::from(42i32)
}

/// Build the shared 10-element test fixture.
///
/// Logical contents: [fill, fill, 100, fill, fill, 200, fill, fill, 300, fill],
/// where `fill` reads back as the supplied `fill_value`.
fn sparse_array(fill_value: Scalar) -> SparseArray {
    let indices = vec![2u64, 5, 8].into_array();
    let values = vec![100i32, 200, 300].into_array();
    SparseArray::new(indices, values, 10, fill_value)
}

Expand All @@ -223,7 +255,7 @@ mod test {
#[test]
pub fn iter() {
assert_sparse_array(
&sparse_array(),
&sparse_array(nullable_fill()),
&[
None,
None,
Expand All @@ -241,15 +273,27 @@ mod test {

#[test]
pub fn iter_sliced() {
let p_fill_val = Some(non_nullable_fill().try_into().unwrap());
assert_sparse_array(
sparse_array(non_nullable_fill())
.slice(2, 7)
.unwrap()
.as_ref(),
&[Some(100), p_fill_val, p_fill_val, Some(200), p_fill_val],
);
}

#[test]
pub fn iter_sliced_nullable() {
assert_sparse_array(
sparse_array().slice(2, 7).unwrap().as_ref(),
sparse_array(nullable_fill()).slice(2, 7).unwrap().as_ref(),
&[Some(100), None, None, Some(200), None],
);
}

#[test]
pub fn iter_sliced_twice() {
let sliced_once = sparse_array().slice(1, 8).unwrap();
let sliced_once = sparse_array(nullable_fill()).slice(1, 8).unwrap();
assert_sparse_array(
sliced_once.as_ref(),
&[None, Some(100), None, None, Some(200), None, None],
Expand All @@ -263,10 +307,10 @@ mod test {
#[test]
pub fn test_scalar_at() {
assert_eq!(
usize::try_from(scalar_at(&sparse_array(), 2).unwrap()).unwrap(),
usize::try_from(scalar_at(&sparse_array(nullable_fill()), 2).unwrap()).unwrap(),
100
);
let error = scalar_at(&sparse_array(), 10).err().unwrap();
let error = scalar_at(&sparse_array(nullable_fill()), 10).err().unwrap();
let VortexError::OutOfBounds(i, start, stop, _) = error else {
unreachable!()
};
Expand All @@ -277,7 +321,7 @@ mod test {

#[test]
pub fn scalar_at_sliced() {
let sliced = sparse_array().slice(2, 7).unwrap();
let sliced = sparse_array(nullable_fill()).slice(2, 7).unwrap();
assert_eq!(
usize::try_from(scalar_at(sliced.as_ref(), 0).unwrap()).unwrap(),
100
Expand All @@ -293,7 +337,7 @@ mod test {

#[test]
pub fn scalar_at_sliced_twice() {
let sliced_once = sparse_array().slice(1, 8).unwrap();
let sliced_once = sparse_array(nullable_fill()).slice(1, 8).unwrap();
assert_eq!(
usize::try_from(scalar_at(sliced_once.as_ref(), 1).unwrap()).unwrap(),
100
Expand Down
15 changes: 12 additions & 3 deletions vortex-array/src/array/sparse/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use vortex_schema::DType;

use crate::array::sparse::{SparseArray, SparseEncoding};
use crate::array::{Array, ArrayRef};
use crate::scalar::{NullScalar, Scalar};
use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};

impl ArraySerde for SparseArray {
Expand Down Expand Up @@ -34,9 +35,15 @@ impl EncodingSerde for SparseEncoding {
let offset = ctx.read_usize()?;
let indices = ctx.with_schema(&DType::IDX).read()?;
let values = ctx.read()?;
Ok(SparseArray::new_with_offset(indices, values, len, offset)
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?
.into_array())
Ok(SparseArray::new_with_offset(
indices,
values,
len,
offset,
Scalar::Null(NullScalar::new()),
)
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?
.into_array())
}
}

Expand All @@ -47,6 +54,7 @@ mod test {
use crate::array::sparse::SparseArray;
use crate::array::Array;
use crate::array::IntoArray;
use crate::scalar::{NullScalar, Scalar};
use crate::serde::test::roundtrip_array;

#[test]
Expand All @@ -55,6 +63,7 @@ mod test {
vec![7u64, 37, 71, 97].into_array(),
PrimitiveArray::from_iter(vec![Some(0), None, Some(2), Some(42)]).into_array(),
100,
Scalar::Null(NullScalar::new()),
);

let read_arr = roundtrip_array(&arr).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion vortex-fastlanes/src/bitpacking/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use vortex::compute::patch::patch;
use vortex::match_each_integer_ptype;
use vortex::ptype::PType::{I16, I32, I64, I8, U16, U32, U64, U8};
use vortex::ptype::{NativePType, PType};
use vortex::scalar::NullScalar;
use vortex::scalar::{ListScalarVec, Scalar};
use vortex::stats::Stat;
use vortex_error::{vortex_bail, vortex_err, VortexResult};
Expand Down Expand Up @@ -158,7 +159,7 @@ fn bitpack_patches(
values.push(*v);
}
}
SparseArray::new(indices.into_array(), values.into_array(), parray.len()).into_array()
SparseArray::new(indices.into_array(), values.into_array(), parray.len(), Scalar::Null(NullScalar::new())).into_array()
})
}

Expand Down
13 changes: 12 additions & 1 deletion vortex-fastlanes/src/bitpacking/compute.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use itertools::Itertools;
use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::{Array, ArrayRef};
use vortex::compute::as_contiguous::as_contiguous;
Expand Down Expand Up @@ -45,7 +46,17 @@ impl ScalarAtFn for BitPackedArray {
return Ok(Scalar::from(0 as $P));
})
}
unpack_single(self, index)

match self.patches.clone() {
None => unpack_single(self, index),
Some(patch) => {
if patch.is_valid(index) {
ScalarAtFn::scalar_at(patch.as_sparse(), index)
} else {
unpack_single(self, index)
}
}
}
}
}

Expand Down

0 comments on commit 4a2bebf

Please sign in to comment.