Skip to content

Commit

Permalink
Sparse Arrays (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Mar 4, 2024
1 parent c8bd527 commit 36c6b72
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 95 deletions.
14 changes: 8 additions & 6 deletions codecz/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,24 @@

extern crate alloc;

use crate::{AlignedVec, ALIGNED_ALLOCATOR};
use alloc::sync::Arc;
use arrow_buffer::{BooleanBuffer, Buffer};
use core::ptr::NonNull;

pub fn into_u32_vec(bb: &BooleanBuffer, cardinality: usize) -> AlignedVec<u32> {
let mut vec: AlignedVec<u32> = AlignedVec::with_capacity_in(cardinality, ALIGNED_ALLOCATOR);
use arrow_buffer::{BooleanBuffer, Buffer};

use crate::{AlignedVec, ALIGNED_ALLOCATOR};

pub fn into_u64_vec(bb: &BooleanBuffer, cardinality: usize) -> AlignedVec<u64> {
let mut vec: AlignedVec<u64> = AlignedVec::with_capacity_in(cardinality, ALIGNED_ALLOCATOR);
if cardinality > 0 {
for idx in bb.set_indices() {
vec.push(idx as u32);
vec.push(idx as u64);
}
}
vec
}

pub fn gather_patches<T: Copy + Sized>(data: &[T], indices: &[u32]) -> AlignedVec<T> {
pub fn gather_patches<T: Copy + Sized>(data: &[T], indices: &[u64]) -> AlignedVec<T> {
let mut vec: AlignedVec<T> = AlignedVec::with_capacity_in(indices.len(), ALIGNED_ALLOCATOR);
for idx in indices {
vec.push(data[*idx as usize]);
Expand Down
2 changes: 1 addition & 1 deletion vortex-alp/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ where
let patches = if num_exceptions == 0 {
None
} else {
let patch_indices = codecz::utils::into_u32_vec(&exceptions_idx, num_exceptions);
let patch_indices = codecz::utils::into_u64_vec(&exceptions_idx, num_exceptions);
let patch_values = codecz::utils::gather_patches(
values.buffer().typed_data::<T>(),
patch_indices.as_slice(),
Expand Down
4 changes: 2 additions & 2 deletions vortex-ffor/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use log::debug;

use crate::downcast::DowncastFFOR;
use codecz::ffor;
use codecz::ffor::{FforEncoded, SupportsFFoR};
use vortex::array::downcast::DowncastArrayBuiltin;
Expand All @@ -27,6 +26,7 @@ use vortex::ptype::NativePType;
use vortex::scalar::{ListScalarVec, NullableScalar, Scalar};
use vortex::stats::Stat;

use crate::downcast::DowncastFFOR;
use crate::ffor::{FFORArray, FFoREncoding};

impl EncodingCompression for FFoREncoding {
Expand Down Expand Up @@ -145,7 +145,7 @@ where
} else {
let (patch_values, patch_indices) =
ffor::collect_exceptions(values, num_bits, min_val, num_exceptions).unwrap();
let patch_indices = codecz::utils::into_u32_vec(&patch_indices, num_exceptions);
let patch_indices = codecz::utils::into_u64_vec(&patch_indices, num_exceptions);
Some(
SparseArray::new(
PrimitiveArray::from_vec_in(patch_indices).boxed(),
Expand Down
155 changes: 72 additions & 83 deletions vortex/src/array/sparse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,27 @@ use std::any::Any;
use std::iter;
use std::sync::{Arc, RwLock};

use arrow::array::{
Array as ArrowArray, PrimitiveArray as ArrowPrimitiveArray, StructArray as ArrowStructArray,
};
use arrow::datatypes::{Field, Fields};
use num_traits::AsPrimitive;
use arrow::array::AsArray;
use arrow::array::BooleanBufferBuilder;
use arrow::array::{ArrayRef as ArrowArrayRef, PrimitiveArray as ArrowPrimitiveArray};
use arrow::buffer::{NullBuffer, ScalarBuffer};
use arrow::datatypes::UInt64Type;
use linkme::distributed_slice;

use crate::array::ENCODINGS;
use crate::array::{
check_index_bounds, check_slice_bounds, Array, ArrayRef, ArrowIterator, Encoding, EncodingId,
EncodingRef,
};
use crate::array::{ArrowArrayRef, ENCODINGS};
use crate::arrow::CombineChunks;
use crate::compress::EncodingCompression;
use crate::compute::search_sorted::{search_sorted_usize, SearchSortedSide};
use crate::dtype::{DType, Nullability, Signedness};
use crate::dtype::DType;
use crate::error::{VortexError, VortexResult};
use crate::formatter::{ArrayDisplay, ArrayFormatter};
use crate::match_arrow_numeric_type;
use crate::scalar::{NullableScalar, Scalar};
use crate::serde::{ArraySerde, EncodingSerde};
use crate::stats::{Stats, StatsSet};
use arrow::array::AsArray;
use itertools::Itertools;
use linkme::distributed_slice;

mod compress;
mod serde;
Expand All @@ -52,7 +49,6 @@ pub struct SparseArray {
// Offset value for patch indices as a result of slicing
indices_offset: usize,
len: usize,
dtype: DType,
stats: Arc<RwLock<StatsSet>>,
}

Expand All @@ -71,26 +67,14 @@ impl SparseArray {
len: usize,
indices_offset: usize,
) -> VortexResult<Self> {
if !matches!(
indices.dtype(),
DType::Int(_, Signedness::Unsigned, Nullability::NonNullable)
) {
if !matches!(indices.dtype(), &DType::IDX) {
return Err(VortexError::InvalidDType(indices.dtype().clone()));
}

let dtype = DType::Struct(
vec![
Arc::new("indices".to_string()),
Arc::new("values".to_string()),
],
vec![indices.dtype().clone(), values.dtype().clone()],
);

Ok(Self {
indices,
values,
indices_offset,
dtype,
len,
stats: Arc::new(RwLock::new(StatsSet::new())),
})
Expand Down Expand Up @@ -140,7 +124,7 @@ impl Array for SparseArray {

#[inline]
fn dtype(&self) -> &DType {
&self.dtype
self.values().dtype()
}

#[inline]
Expand Down Expand Up @@ -175,37 +159,37 @@ impl Array for SparseArray {
}

fn iter_arrow(&self) -> Box<ArrowIterator> {
// TODO(robert): Use compute dispatch to perform subtract
let indices_array = match_arrow_numeric_type!(self.indices().dtype(), |$E| {
let indices: Vec<<$E as ArrowPrimitiveType>::Native> = self
.indices()
.iter_arrow()
.flat_map(|c| {
let ends = c.as_primitive::<$E>()
.values()
.iter()
.map(|v| *v - AsPrimitive::<<$E as ArrowPrimitiveType>::Native>::as_(self.indices_offset))
.collect::<Vec<_>>();
ends.into_iter()
})
.collect();
Arc::new(ArrowPrimitiveArray::<$E>::from(indices)) as ArrowArrayRef
// Resolve our indices into a vector of usize applying the offset
let mut indices = Vec::with_capacity(self.len());
self.indices().iter_arrow().for_each(|c| {
indices.extend(
c.as_primitive::<UInt64Type>()
.values()
.into_iter()
.map(|v| (*v as usize) - self.indices_offset),
)
});

let array: ArrowArrayRef = match_arrow_numeric_type!(self.values().dtype(), |$E| {
let mut validity = BooleanBufferBuilder::new(self.len());
validity.append_n(self.len(), false);
let mut values = vec![<$E as ArrowPrimitiveType>::Native::default(); self.len()];
let mut offset = 0;
for values_array in self.values().iter_arrow() {
for v in values_array.as_primitive::<$E>().values() {
let idx = indices[offset];
values[idx] = *v;
validity.set_bit(idx, true);
offset += 1;
}
}
Arc::new(ArrowPrimitiveArray::<$E>::new(
ScalarBuffer::from(values),
Some(NullBuffer::from(validity.finish())),
))
});

let DType::Struct(names, children) = self.dtype() else {
unreachable!("DType should have been a struct")
};
let fields: Fields = names
.iter()
.zip_eq(children)
.map(|(name, dtype)| Field::new(name.as_str(), dtype.into(), dtype.is_nullable()))
.map(Arc::new)
.collect();
Box::new(iter::once(Arc::new(ArrowStructArray::new(
fields,
vec![indices_array, self.values.iter_arrow().combine_chunks()],
None,
)) as Arc<dyn ArrowArray>))
Box::new(iter::once(array))
}

fn slice(&self, start: usize, stop: usize) -> VortexResult<ArrayRef> {
Expand All @@ -219,7 +203,6 @@ impl Array for SparseArray {
indices_offset: self.indices_offset + start,
indices: self.indices.slice(index_start_index, index_end_index)?,
values: self.values.slice(index_start_index, index_end_index)?,
dtype: self.dtype.clone(),
len: stop - start,
stats: Arc::new(RwLock::new(StatsSet::new())),
}
Expand Down Expand Up @@ -281,62 +264,68 @@ impl Encoding for SparseEncoding {

#[cfg(test)]
mod test {
use arrow::array::AsArray;
use arrow::datatypes::Int32Type;
use itertools::Itertools;

use crate::array::sparse::SparseArray;
use crate::array::Array;
use crate::error::VortexError;
use arrow::array::AsArray;
use arrow::datatypes::{Int32Type, UInt32Type};

fn sparse_array() -> SparseArray {
// merged array: [null, null, 100, null, null, 200, null, null, 300, null]
SparseArray::new(vec![2u32, 5, 8].into(), vec![100, 200, 300].into(), 10)
SparseArray::new(vec![2u64, 5, 8].into(), vec![100i32, 200, 300].into(), 10)
}

fn assert_sparse_array(sparse: &dyn Array, values: (&[u32], &[i32])) {
let sparse_arrow = sparse.as_ref().iter_arrow().next().unwrap();
assert_eq!(
*sparse_arrow
.as_struct()
.column_by_name("indices")
.unwrap()
.as_primitive::<UInt32Type>()
.values(),
values.0
);
assert_eq!(
*sparse_arrow
.as_struct()
.column_by_name("values")
.unwrap()
.as_primitive::<Int32Type>()
.values(),
values.1
);
fn assert_sparse_array(sparse: &dyn Array, values: &[Option<i32>]) {
let sparse_arrow = sparse
.as_ref()
.iter_arrow()
.next()
.unwrap()
.as_primitive::<Int32Type>()
.into_iter()
.collect_vec();
assert_eq!(sparse_arrow, values);
}

#[test]
pub fn iter() {
assert_sparse_array(
sparse_array().as_ref(),
(&[2u32, 5, 8], &[100i32, 200, 300]),
&[
None,
None,
Some(100),
None,
None,
Some(200),
None,
None,
Some(300),
None,
],
);
}

#[test]
pub fn iter_sliced() {
assert_sparse_array(
sparse_array().slice(2, 7).unwrap().as_ref(),
(&[0u32, 3], &[100i32, 200]),
&[Some(100), None, None, Some(200), None],
);
}

#[test]
pub fn iter_sliced_twice() {
let sliced_once = sparse_array().slice(1, 8).unwrap();
assert_sparse_array(sliced_once.as_ref(), (&[1u32, 4], &[100i32, 200]));
assert_sparse_array(
sliced_once.as_ref(),
&[None, Some(100), None, None, Some(200), None, None],
);
assert_sparse_array(
sliced_once.slice(1, 6).unwrap().as_ref(),
(&[0u32, 3], &[100i32, 200]),
&[Some(100), None, None, Some(200), None],
);
}

Expand Down
7 changes: 4 additions & 3 deletions vortex/src/array/sparse/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::io::ErrorKind;

use crate::array::sparse::{SparseArray, SparseEncoding};
use crate::array::{Array, ArrayRef};
use crate::dtype::DType;
use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};

impl ArraySerde for SparseArray {
Expand All @@ -33,8 +34,8 @@ impl EncodingSerde for SparseEncoding {
fn read(&self, ctx: &mut ReadCtx) -> io::Result<ArrayRef> {
let len = ctx.read_usize()?;
let offset = ctx.read_usize()?;
let indices = ctx.subfield(0).read()?;
let values = ctx.subfield(1).read()?;
let indices = ctx.with_schema(&DType::IDX).read()?;
let values = ctx.read()?;
Ok(SparseArray::new_with_offset(indices, values, len, offset)
.map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?
.boxed())
Expand All @@ -52,7 +53,7 @@ mod test {
#[test]
fn roundtrip() {
let arr = SparseArray::new(
PrimitiveArray::from_vec(vec![7u8, 37, 71, 97]).boxed(),
PrimitiveArray::from_vec(vec![7u64, 37, 71, 97]).boxed(),
PrimitiveArray::from_iter(vec![Some(0), None, Some(2), Some(42)]).boxed(),
100,
);
Expand Down
7 changes: 7 additions & 0 deletions vortex/src/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@ pub enum DType {
}

impl DType {
/// The default DType for indices
pub const IDX: DType = Int(
IntWidth::_64,
Signedness::Unsigned,
Nullability::NonNullable,
);

pub fn is_nullable(&self) -> bool {
use Nullability::*;

Expand Down

0 comments on commit 36c6b72

Please sign in to comment.