Skip to content

Commit

Permalink
Array2: VarBin (#234)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Apr 15, 2024
1 parent 4e85688 commit 9992e94
Show file tree
Hide file tree
Showing 27 changed files with 1,017 additions and 90 deletions.
10 changes: 10 additions & 0 deletions vortex-array2/src/accessor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use vortex_error::VortexResult;

/// Scoped, iterator-based access to the elements of an array.
pub trait ArrayAccessor {
    /// Element type handed to the callback, borrowed for the lifetime `'a`.
    type Item<'a>;

    /// Invokes `f` with a mutable reference to an iterator of `Self::Item`
    /// values and returns whatever `f` produced, wrapped in a `VortexResult`.
    fn with_iterator<F, R>(&self, f: F) -> VortexResult<R>
    where
        F: for<'a> FnOnce(&mut dyn Iterator<Item = Self::Item<'a>>) -> R;
}
2 changes: 1 addition & 1 deletion vortex-array2/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl BoolArray<'_> {
length: buffer.len(),
},
vec![Buffer::Owned(buffer.into_inner())].into(),
validity.to_array_data().into_iter().collect_vec().into(),
validity.into_array_data().into_iter().collect_vec().into(),
HashMap::default(),
)
}
Expand Down
10 changes: 6 additions & 4 deletions vortex-array2/src/array/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ impl ChunkedArray<'_> {
self.array().child(idx + 1, self.array().dtype())
}

/// Iterates over the chunks of this array in index order.
///
/// # Panics
/// Panics while iterating if `self.chunk(idx)` cannot be unwrapped for some
/// `idx` below `nchunks()`.
pub fn chunks(&self) -> impl Iterator<Item = Array> {
    let count = self.nchunks();
    (0..count).map(|idx| self.chunk(idx).unwrap())
}

/// Number of chunks, computed as one less than the length of `chunk_ends()`.
pub fn nchunks(&self) -> usize {
    let ends = self.chunk_ends();
    ends.len() - 1
}
Expand Down Expand Up @@ -104,6 +100,12 @@ impl ChunkedArray<'_> {
}
}

impl<'a> ChunkedArray<'a> {
    /// Iterates over the chunks of this array in index order, yielding
    /// arrays tied to the lifetime `'a` of the borrow of `self`.
    ///
    /// # Panics
    /// Panics while iterating if `self.chunk(idx)` cannot be unwrapped for
    /// some `idx` below `nchunks()`.
    pub fn chunks(&'a self) -> impl Iterator<Item = Array<'a>> {
        let count = self.nchunks();
        (0..count).map(|idx| self.chunk(idx).unwrap())
    }
}

impl FromIterator<OwnedArray> for OwnedChunkedArray {
fn from_iter<T: IntoIterator<Item = OwnedArray>>(iter: T) -> Self {
let chunks: Vec<OwnedArray> = iter.into_iter().collect();
Expand Down
1 change: 1 addition & 0 deletions vortex-array2/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pub mod chunked;
pub mod constant;
pub mod primitive;
pub mod r#struct;
pub mod varbin;
6 changes: 6 additions & 0 deletions vortex-array2/src/array/primitive/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::compute::cast::CastFn;
use crate::compute::fill::FillForwardFn;
use crate::compute::scalar_at::ScalarAtFn;
use crate::compute::search_sorted::SearchSortedFn;
use crate::compute::slice::SliceFn;
use crate::compute::take::TakeFn;
use crate::compute::ArrayCompute;

Expand All @@ -14,6 +15,7 @@ mod cast;
mod fill;
mod scalar_at;
mod search_sorted;
mod slice;
mod take;

impl ArrayCompute for PrimitiveArray<'_> {
Expand Down Expand Up @@ -41,6 +43,10 @@ impl ArrayCompute for PrimitiveArray<'_> {
Some(self)
}

/// Advertises slicing support: returns `Some(self)`, viewing this
/// `PrimitiveArray` as a `&dyn SliceFn`.
fn slice(&self) -> Option<&dyn SliceFn> {
Some(self)
}

/// Advertises take support: returns `Some(self)`, viewing this
/// `PrimitiveArray` as a `&dyn TakeFn`.
fn take(&self) -> Option<&dyn TakeFn> {
Some(self)
}
Expand Down
19 changes: 19 additions & 0 deletions vortex-array2/src/array/primitive/compute/slice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use vortex::match_each_native_ptype;
use vortex_error::VortexResult;

use crate::array::primitive::PrimitiveArray;
use crate::compute::slice::SliceFn;
use crate::IntoArray;
use crate::OwnedArray;

impl SliceFn for PrimitiveArray<'_> {
    /// Builds a new primitive array covering the elements in `start..stop`.
    ///
    /// The values are taken from the typed scalar buffer at offset `start`
    /// with length `stop - start`; the validity is sliced over the same
    /// `start`/`stop` range. Errors from slicing the validity or from
    /// constructing the new array are propagated.
    fn slice(&self, start: usize, stop: usize) -> VortexResult<OwnedArray> {
        match_each_native_ptype!(self.ptype(), |$T| {
            let values = self.scalar_buffer::<$T>().slice(start, stop - start);
            let validity = self.validity().slice(start, stop)?;
            let sliced = PrimitiveArray::try_new(values, validity)?;
            Ok(sliced.into_array())
        })
    }
}
14 changes: 9 additions & 5 deletions vortex-array2/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ pub struct PrimitiveMetadata {
}

impl PrimitiveArray<'_> {
/// Returns the array's buffer at index 0.
///
/// # Panics
/// Panics with "missing buffer" if the underlying array has no buffer 0.
pub fn buffer(&self) -> &Buffer {
self.array().buffer(0).expect("missing buffer")
}

pub fn validity(&self) -> Validity {
self.metadata()
.validity
Expand All @@ -39,6 +35,14 @@ impl PrimitiveArray<'_> {
self.dtype().try_into().unwrap()
}

/// Returns the array's buffer at index 0.
///
/// # Panics
/// Panics with "missing buffer" if the underlying array has no buffer 0.
pub fn buffer(&self) -> &Buffer {
self.array().buffer(0).expect("missing buffer")
}

/// Wraps a clone of the underlying buffer in a `ScalarBuffer<T>` that starts
/// at offset 0 and spans `self.len()` elements.
pub fn scalar_buffer<T: NativePType>(&self) -> ScalarBuffer<T> {
    let raw = self.buffer().clone();
    ScalarBuffer::new(raw.into(), 0, self.len())
}

/// Views the underlying buffer as a slice of `T` by delegating to
/// `Buffer::typed_data`.
// NOTE(review): presumably `T` must match the array's ptype — confirm against `Buffer::typed_data`.
pub fn typed_data<T: NativePType>(&self) -> &[T] {
self.buffer().typed_data::<T>()
}
Expand All @@ -55,7 +59,7 @@ impl PrimitiveArray<'_> {
validity: validity.to_metadata(buffer.len())?,
},
vec![Buffer::Owned(buffer.into_inner())].into(),
validity.to_array_data().into_iter().collect_vec().into(),
validity.into_array_data().into_iter().collect_vec().into(),
HashMap::default(),
)
}
Expand Down
49 changes: 49 additions & 0 deletions vortex-array2/src/array/varbin/accessor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use vortex::match_each_integer_ptype;
use vortex_error::VortexResult;

use crate::accessor::ArrayAccessor;
use crate::array::varbin::VarBinArray;
use crate::validity::ArrayValidity;

impl ArrayAccessor for VarBinArray<'_> {
    type Item<'a> = Option<&'a [u8]>;

    /// Runs `f` over an iterator of the array's values as optional byte slices.
    ///
    /// Bytes and offsets are flattened to primitive arrays first. Each value is
    /// the byte range between two consecutive offsets; when a null buffer is
    /// present, invalid positions yield `None` without touching the bytes.
    fn with_iterator<F: for<'a> FnOnce(&mut dyn Iterator<Item = Self::Item<'a>>) -> R, R>(
        &self,
        f: F,
    ) -> VortexResult<R> {
        // TODO(ngates): what happens if bytes is much larger than sliced_bytes?
        let flat_bytes = self.bytes().flatten_primitive()?;
        let flat_offsets = self.offsets().flatten_primitive()?;
        let null_buffer = self.logical_validity().to_null_buffer()?;

        match_each_integer_ptype!(flat_offsets.ptype(), |$T| {
            let bounds = flat_offsets.typed_data::<$T>();
            let data = flat_bytes.typed_data::<u8>();

            if let Some(valid_bits) = null_buffer {
                // Pair every (start, end) window with its validity bit.
                let mut values = bounds
                    .windows(2)
                    .zip(valid_bits.iter())
                    .map(|(pair, is_valid)| {
                        is_valid.then(|| &data[pair[0] as usize..pair[1] as usize])
                    });
                Ok(f(&mut values))
            } else {
                // No null buffer: every position is a value.
                let mut values = bounds
                    .windows(2)
                    .map(|pair| Some(&data[pair[0] as usize..pair[1] as usize]));
                Ok(f(&mut values))
            }
        })
    }
}
30 changes: 30 additions & 0 deletions vortex-array2/src/array/varbin/array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use vortex_error::VortexResult;

use crate::array::varbin::VarBinArray;
use crate::validity::{ArrayValidity, LogicalValidity};
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
use crate::ArrayTrait;

impl ArrayValidity for VarBinArray<'_> {
    /// Reports whether the element at `index` is valid, as decided by the
    /// array's validity.
    fn is_valid(&self, index: usize) -> bool {
        let validity = self.validity();
        validity.is_valid(index)
    }

    /// Converts the array's validity into its logical form for `self.len()`
    /// elements.
    fn logical_validity(&self) -> LogicalValidity {
        let validity = self.validity();
        let len = self.len();
        validity.to_logical(len)
    }
}

impl AcceptArrayVisitor for VarBinArray<'_> {
    /// Presents this array's children to `visitor`: the offsets child, then
    /// the bytes child, then the validity. Stops at the first visitor error.
    fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
        visitor.visit_child("offsets", &self.offsets())?;
        // The bytes child gets its own label; it was previously reported as
        // a second "offsets" child.
        visitor.visit_child("bytes", &self.bytes())?;
        visitor.visit_validity(&self.validity())
    }
}

impl ArrayTrait for VarBinArray<'_> {
    /// Number of elements: one less than the number of offsets.
    fn len(&self) -> usize {
        let offsets = self.offsets();
        offsets.len() - 1
    }
}
94 changes: 94 additions & 0 deletions vortex-array2/src/array/varbin/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::mem;

use arrow_buffer::NullBufferBuilder;
use vortex::ptype::NativePType;
use vortex_schema::DType;

use crate::array::primitive::PrimitiveArray;
use crate::array::varbin::{OwnedVarBinArray, VarBinArray};
use crate::validity::Validity;
use crate::IntoArray;

/// Incrementally builds a variable-length binary array whose offsets are
/// stored as the primitive type `O`.
pub struct VarBinBuilder<O: NativePType> {
// Offsets into `data`: seeded with a zero in `with_capacity`, then one
// entry appended per pushed element.
offsets: Vec<O>,
// Concatenated bytes of every non-null value pushed so far.
data: Vec<u8>,
// Records one validity bit per pushed element.
validity: NullBufferBuilder,
}

impl<O: NativePType> VarBinBuilder<O> {
    /// Creates a builder with room reserved for `len` elements.
    ///
    /// The offsets vector is seeded with a single zero so that element `i`
    /// always spans `offsets[i]..offsets[i + 1]`.
    pub fn with_capacity(len: usize) -> Self {
        let mut offsets = Vec::with_capacity(len + 1);
        offsets.push(O::zero());
        Self {
            offsets,
            data: Vec::new(),
            validity: NullBufferBuilder::new(len),
        }
    }

    /// Appends either a value (`Some`) or a null (`None`).
    #[inline]
    pub fn push(&mut self, value: Option<&[u8]>) {
        match value {
            Some(v) => self.push_value(v),
            None => self.push_null(),
        }
    }

    /// Appends a non-null value, copying its bytes into the data buffer.
    ///
    /// # Panics
    /// Panics if the new end offset cannot be represented in `O`.
    #[inline]
    pub fn push_value(&mut self, value: &[u8]) {
        let end = self.data.len() + value.len();
        self.offsets
            .push(O::from(end).expect("VarBin end offset does not fit in the offset type"));
        self.data.extend_from_slice(value);
        self.validity.append_non_null();
    }

    /// Appends a null: the previous offset is repeated so the element spans
    /// zero bytes, and the position is marked null.
    #[inline]
    pub fn push_null(&mut self) {
        let last = *self
            .offsets
            .last()
            .expect("offsets always holds at least the leading zero");
        self.offsets.push(last);
        self.validity.append_null();
    }

    /// Produces the array built so far and resets the builder for reuse.
    ///
    /// For a nullable `dtype` the recorded nulls become the validity (or
    /// `Validity::AllValid` when no null buffer was produced).
    ///
    /// # Panics
    /// Panics with "dtype and validity mismatch" if `dtype` is non-nullable
    /// but a null buffer was produced.
    pub fn finish(&mut self, dtype: DType) -> OwnedVarBinArray {
        // Leave a fresh leading zero behind: `mem::take` would leave
        // `offsets` empty, so a later `push_null` would index out of bounds
        // and a later `push_value` would emit offsets without the initial 0.
        let offsets = PrimitiveArray::from(mem::replace(&mut self.offsets, vec![O::zero()]));
        let data = PrimitiveArray::from(mem::take(&mut self.data));

        let nulls = self.validity.finish();

        let validity = if dtype.is_nullable() {
            nulls.map(Validity::from).unwrap_or(Validity::AllValid)
        } else {
            assert!(nulls.is_none(), "dtype and validity mismatch");
            Validity::NonNullable
        };

        VarBinArray::new(offsets.into_array(), data.into_array(), dtype, validity)
    }
}

#[cfg(test)]
mod test {
    use vortex::scalar::Utf8Scalar;
    use vortex_schema::DType;
    use vortex_schema::Nullability::Nullable;

    use crate::array::varbin::builder::VarBinBuilder;
    use crate::compute::scalar_at::scalar_at;
    use crate::IntoArray;

    /// Builds `["hello", null, "world"]` and checks length, nullability and
    /// every element, including the value that follows the null.
    #[test]
    fn test_builder() {
        let mut builder = VarBinBuilder::<i32>::with_capacity(0);
        builder.push(Some(b"hello"));
        builder.push(None);
        builder.push(Some(b"world"));
        let array = builder.finish(DType::Utf8(Nullable)).into_array();

        assert_eq!(array.len(), 3);
        assert_eq!(array.dtype().nullability(), Nullable);
        assert_eq!(
            scalar_at(&array, 0).unwrap(),
            Utf8Scalar::nullable("hello".to_owned()).into()
        );
        assert!(scalar_at(&array, 1).unwrap().is_null());
        // The value after the null confirms that the repeated offset written
        // by `push_null` does not shift later elements.
        assert_eq!(
            scalar_at(&array, 2).unwrap(),
            Utf8Scalar::nullable("world".to_owned()).into()
        );
    }
}
Loading

0 comments on commit 9992e94

Please sign in to comment.