Skip to content

Commit

Permalink
Better iterators for VarBin/VarBinView that don't always copy (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 authored Apr 12, 2024
1 parent f9a3510 commit e7bd331
Show file tree
Hide file tree
Showing 19 changed files with 544 additions and 323 deletions.
5 changes: 3 additions & 2 deletions vortex-array/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,15 @@ impl FromIterator<Option<bool>> for BoolArray {
if validity.is_empty() {
BoolArray::from(values)
} else {
BoolArray::new(BooleanBuffer::from(values), Some(Validity::from(validity)))
BoolArray::new(BooleanBuffer::from(values), Some(validity.into()))
}
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::array::bool::BoolArray;
use crate::array::Array;
use crate::compute::scalar_at::scalar_at;

#[test]
Expand Down
31 changes: 3 additions & 28 deletions vortex-array/src/array/varbin/accessor.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,13 @@
use num_traits::AsPrimitive;

use crate::accessor::ArrayAccessor;
use crate::array::downcast::DowncastArrayBuiltin;
use crate::array::varbin::VarBinArray;
use crate::array::Array;
use crate::compute::flatten::flatten_primitive;
use crate::compute::scalar_at::scalar_at;
use crate::match_each_native_ptype;
use crate::validity::ArrayValidity;

fn offset_at(array: &dyn Array, index: usize) -> usize {
if let Some(parray) = array.maybe_primitive() {
match_each_native_ptype!(parray.ptype(), |$P| {
parray.typed_data::<$P>()[index].as_()
})
} else {
scalar_at(array, index).and_then(|s| s.try_into()).unwrap()
}
}

impl<'a> ArrayAccessor<'a, &'a [u8]> for VarBinArray {
fn value(&'a self, index: usize) -> Option<&'a [u8]> {
if self.is_valid(index) {
let start = offset_at(self.offsets(), index);
let end = offset_at(self.offsets(), index + 1);
let start = self.offset_at(index);
let end = self.offset_at(index + 1);
Some(&self.bytes().as_primitive().buffer()[start..end])
} else {
None
Expand All @@ -34,16 +18,7 @@ impl<'a> ArrayAccessor<'a, &'a [u8]> for VarBinArray {
impl ArrayAccessor<'_, Vec<u8>> for VarBinArray {
fn value(&self, index: usize) -> Option<Vec<u8>> {
if self.is_valid(index) {
let start = offset_at(self.offsets(), index);
let end = offset_at(self.offsets(), index + 1);

let slice_bytes = self.bytes().slice(start, end).unwrap();
Some(
flatten_primitive(&slice_bytes)
.unwrap()
.typed_data::<u8>()
.to_vec(),
)
Some(self.bytes_at(index).unwrap())
} else {
None
}
Expand Down
50 changes: 30 additions & 20 deletions vortex-array/src/array/varbin/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::mem;

use arrow_buffer::NullBufferBuilder;
use num_traits::PrimInt;
use vortex_schema::DType;

use crate::array::primitive::PrimitiveArray;
Expand All @@ -8,13 +9,13 @@ use crate::array::Array;
use crate::ptype::NativePType;
use crate::validity::Validity;

pub struct VarBinBuilder<O: NativePType + PrimInt> {
pub struct VarBinBuilder<O: NativePType> {
offsets: Vec<O>,
data: Vec<u8>,
validity: NullBufferBuilder,
}

impl<O: NativePType + PrimInt> VarBinBuilder<O> {
impl<O: NativePType> VarBinBuilder<O> {
pub fn with_capacity(len: usize) -> Self {
let mut offsets = Vec::with_capacity(len + 1);
offsets.push(O::zero());
Expand All @@ -25,27 +26,33 @@ impl<O: NativePType + PrimInt> VarBinBuilder<O> {
}
}

#[inline]
pub fn push(&mut self, value: Option<&[u8]>) {
match value {
Some(v) => {
self.offsets
.push(O::from(self.data.len() + v.len()).unwrap());
self.data.extend_from_slice(v);
self.validity.append_non_null();
}
None => {
self.offsets.push(self.offsets[self.offsets.len() - 1]);
self.validity.append_null();
}
Some(v) => self.push_value(v),
None => self.push_null(),
}
}

pub fn finish(self, dtype: DType) -> VarBinArray {
let offsets = PrimitiveArray::from(self.offsets);
let data = PrimitiveArray::from(self.data);
#[inline]
pub fn push_value(&mut self, value: &[u8]) {
self.offsets
.push(O::from(self.data.len() + value.len()).unwrap());
self.data.extend_from_slice(value);
self.validity.append_non_null();
}

#[inline]
pub fn push_null(&mut self) {
self.offsets.push(self.offsets[self.offsets.len() - 1]);
self.validity.append_null();
}

pub fn finish(&mut self, dtype: DType) -> VarBinArray {
let offsets = PrimitiveArray::from(mem::take(&mut self.offsets));
let data = PrimitiveArray::from(mem::take(&mut self.data));

// TODO(ngates): create our own ValidityBuilder that doesn't need mut or clone on finish.
let nulls = self.validity.finish_cloned();
let nulls = self.validity.finish();

let validity = if dtype.is_nullable() {
Some(
Expand All @@ -70,7 +77,7 @@ mod test {
use crate::array::varbin::builder::VarBinBuilder;
use crate::array::Array;
use crate::compute::scalar_at::scalar_at;
use crate::scalar::Scalar;
use crate::scalar::Utf8Scalar;

#[test]
fn test_builder() {
Expand All @@ -82,7 +89,10 @@ mod test {

assert_eq!(array.len(), 3);
assert_eq!(array.nullability(), Nullable);
assert_eq!(scalar_at(&array, 0).unwrap(), Scalar::from("hello"));
assert_eq!(
scalar_at(&array, 0).unwrap(),
Utf8Scalar::nullable("hello".to_owned()).into()
);
assert!(scalar_at(&array, 1).unwrap().is_null());
}
}
21 changes: 6 additions & 15 deletions vortex-array/src/array/varbin/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use vortex_schema::DType;

use crate::array::downcast::DowncastArrayBuiltin;
use crate::array::primitive::PrimitiveArray;
use crate::array::varbin::VarBinArray;
use crate::array::varbin::{varbin_scalar, VarBinArray};
use crate::array::{Array, ArrayRef};
use crate::arrow::wrappers::{as_nulls, as_offset_buffer};
use crate::compute::as_arrow::AsArrowArray;
Expand All @@ -20,9 +20,8 @@ use crate::compute::scalar_at::ScalarAtFn;
use crate::compute::take::TakeFn;
use crate::compute::ArrayCompute;
use crate::ptype::PType;
use crate::scalar::{BinaryScalar, Scalar, Utf8Scalar};
use crate::validity::Validity;
use crate::validity::{ArrayValidity, OwnedValidity};
use crate::scalar::Scalar;
use crate::validity::{ArrayValidity, OwnedValidity, Validity};
use crate::view::ToOwnedView;

mod take;
Expand Down Expand Up @@ -154,18 +153,10 @@ impl FlattenFn for VarBinArray {
impl ScalarAtFn for VarBinArray {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
if self.is_valid(index) {
self.bytes_at(index).map(|bytes| {
if matches!(self.dtype, DType::Utf8(_)) {
unsafe { String::from_utf8_unchecked(bytes) }.into()
} else {
bytes.into()
}
})
// FIXME(ngates): there's something weird about this.
} else if matches!(self.dtype, DType::Utf8(_)) {
Ok(Utf8Scalar::none().into())
self.bytes_at(index)
.map(|bytes| varbin_scalar(bytes, self.dtype()))
} else {
Ok(BinaryScalar::none().into())
Ok(Scalar::null(self.dtype()))
}
}
}
5 changes: 2 additions & 3 deletions vortex-array/src/array/varbin/compute/take.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use num_traits::PrimInt;
use vortex_error::VortexResult;
use vortex_schema::DType;

Expand Down Expand Up @@ -37,7 +36,7 @@ impl TakeFn for VarBinArray {
}
}

fn take<I: NativePType + PrimInt, O: NativePType + PrimInt>(
fn take<I: NativePType, O: NativePType>(
dtype: DType,
offsets: &[O],
data: &[u8],
Expand All @@ -58,7 +57,7 @@ fn take<I: NativePType + PrimInt, O: NativePType + PrimInt>(
builder.finish(dtype)
}

fn take_nullable<I: NativePType + PrimInt, O: NativePType + PrimInt>(
fn take_nullable<I: NativePType, O: NativePType>(
dtype: DType,
offsets: &[O],
data: &[u8],
Expand Down
Loading

0 comments on commit e7bd331

Please sign in to comment.