diff --git a/vortex-array/src/array/bool/mod.rs b/vortex-array/src/array/bool/mod.rs index f555211e96..97b8ead8e5 100644 --- a/vortex-array/src/array/bool/mod.rs +++ b/vortex-array/src/array/bool/mod.rs @@ -203,14 +203,15 @@ impl FromIterator> for BoolArray { if validity.is_empty() { BoolArray::from(values) } else { - BoolArray::new(BooleanBuffer::from(values), Some(Validity::from(validity))) + BoolArray::new(BooleanBuffer::from(values), Some(validity.into())) } } } #[cfg(test)] mod test { - use super::*; + use crate::array::bool::BoolArray; + use crate::array::Array; use crate::compute::scalar_at::scalar_at; #[test] diff --git a/vortex-array/src/array/varbin/accessor.rs b/vortex-array/src/array/varbin/accessor.rs index f8cc88ab47..ff6fd1b055 100644 --- a/vortex-array/src/array/varbin/accessor.rs +++ b/vortex-array/src/array/varbin/accessor.rs @@ -1,29 +1,13 @@ -use num_traits::AsPrimitive; - use crate::accessor::ArrayAccessor; use crate::array::downcast::DowncastArrayBuiltin; use crate::array::varbin::VarBinArray; -use crate::array::Array; -use crate::compute::flatten::flatten_primitive; -use crate::compute::scalar_at::scalar_at; -use crate::match_each_native_ptype; use crate::validity::ArrayValidity; -fn offset_at(array: &dyn Array, index: usize) -> usize { - if let Some(parray) = array.maybe_primitive() { - match_each_native_ptype!(parray.ptype(), |$P| { - parray.typed_data::<$P>()[index].as_() - }) - } else { - scalar_at(array, index).and_then(|s| s.try_into()).unwrap() - } -} - impl<'a> ArrayAccessor<'a, &'a [u8]> for VarBinArray { fn value(&'a self, index: usize) -> Option<&'a [u8]> { if self.is_valid(index) { - let start = offset_at(self.offsets(), index); - let end = offset_at(self.offsets(), index + 1); + let start = self.offset_at(index); + let end = self.offset_at(index + 1); Some(&self.bytes().as_primitive().buffer()[start..end]) } else { None @@ -34,16 +18,7 @@ impl<'a> ArrayAccessor<'a, &'a [u8]> for VarBinArray { impl ArrayAccessor<'_, Vec> for VarBinArray { fn value(&self, index: usize) -> Option> { if self.is_valid(index) { - let start = offset_at(self.offsets(), index); - let end = offset_at(self.offsets(), index + 1); - - let slice_bytes = self.bytes().slice(start, end).unwrap(); - Some( - flatten_primitive(&slice_bytes) - .unwrap() - .typed_data::() - .to_vec(), - ) + Some(self.bytes_at(index).unwrap()) } else { None } diff --git a/vortex-array/src/array/varbin/builder.rs b/vortex-array/src/array/varbin/builder.rs index 72c7e34ee9..1604806978 100644 --- a/vortex-array/src/array/varbin/builder.rs +++ b/vortex-array/src/array/varbin/builder.rs @@ -1,5 +1,6 @@ +use std::mem; + use arrow_buffer::NullBufferBuilder; -use num_traits::PrimInt; use vortex_schema::DType; use crate::array::primitive::PrimitiveArray; @@ -8,13 +9,13 @@ use crate::array::Array; use crate::ptype::NativePType; use crate::validity::Validity; -pub struct VarBinBuilder { +pub struct VarBinBuilder { offsets: Vec, data: Vec, validity: NullBufferBuilder, } -impl VarBinBuilder { +impl VarBinBuilder { pub fn with_capacity(len: usize) -> Self { let mut offsets = Vec::with_capacity(len + 1); offsets.push(O::zero()); @@ -25,27 +26,33 @@ impl VarBinBuilder { } } + #[inline] pub fn push(&mut self, value: Option<&[u8]>) { match value { - Some(v) => { - self.offsets - .push(O::from(self.data.len() + v.len()).unwrap()); - self.data.extend_from_slice(v); - self.validity.append_non_null(); - } - None => { - self.offsets.push(self.offsets[self.offsets.len() - 1]); - self.validity.append_null(); - } + Some(v) => self.push_value(v), + None => self.push_null(), } } - pub fn finish(self, dtype: DType) -> VarBinArray { - let offsets = PrimitiveArray::from(self.offsets); - let data = PrimitiveArray::from(self.data); + #[inline] + pub fn push_value(&mut self, value: &[u8]) { + self.offsets + .push(O::from(self.data.len() + value.len()).unwrap()); + self.data.extend_from_slice(value); + self.validity.append_non_null(); + } + + #[inline] + pub fn push_null(&mut self) { + self.offsets.push(self.offsets[self.offsets.len() - 1]); + self.validity.append_null(); + } + + pub fn finish(&mut self, dtype: DType) -> VarBinArray { + let offsets = PrimitiveArray::from(mem::take(&mut self.offsets)); + let data = PrimitiveArray::from(mem::take(&mut self.data)); - // TODO(ngates): create our own ValidityBuilder that doesn't need mut or clone on finish. - let nulls = self.validity.finish_cloned(); + let nulls = self.validity.finish(); let validity = if dtype.is_nullable() { Some( @@ -70,7 +77,7 @@ mod test { use crate::array::varbin::builder::VarBinBuilder; use crate::array::Array; use crate::compute::scalar_at::scalar_at; - use crate::scalar::Scalar; + use crate::scalar::Utf8Scalar; #[test] fn test_builder() { @@ -82,7 +89,10 @@ mod test { assert_eq!(array.len(), 3); assert_eq!(array.nullability(), Nullable); - assert_eq!(scalar_at(&array, 0).unwrap(), Scalar::from("hello")); + assert_eq!( + scalar_at(&array, 0).unwrap(), + Utf8Scalar::nullable("hello".to_owned()).into() + ); assert!(scalar_at(&array, 1).unwrap().is_null()); } } diff --git a/vortex-array/src/array/varbin/compute/mod.rs b/vortex-array/src/array/varbin/compute/mod.rs index a7bc63f2e9..02fb0b5cf9 100644 --- a/vortex-array/src/array/varbin/compute/mod.rs +++ b/vortex-array/src/array/varbin/compute/mod.rs @@ -9,7 +9,7 @@ use vortex_schema::DType; use crate::array::downcast::DowncastArrayBuiltin; use crate::array::primitive::PrimitiveArray; -use crate::array::varbin::VarBinArray; +use crate::array::varbin::{varbin_scalar, VarBinArray}; use crate::array::{Array, ArrayRef}; use crate::arrow::wrappers::{as_nulls, as_offset_buffer}; use crate::compute::as_arrow::AsArrowArray; @@ -20,9 +20,8 @@ use crate::compute::scalar_at::ScalarAtFn; use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; use crate::ptype::PType; -use crate::scalar::{BinaryScalar, Scalar, Utf8Scalar}; -use crate::validity::Validity; -use crate::validity::{ArrayValidity, OwnedValidity}; +use crate::scalar::Scalar; +use crate::validity::{ArrayValidity, OwnedValidity, Validity}; use crate::view::ToOwnedView; mod take; @@ -154,18 +153,10 @@ impl FlattenFn for VarBinArray { impl ScalarAtFn for VarBinArray { fn scalar_at(&self, index: usize) -> VortexResult { if self.is_valid(index) { - self.bytes_at(index).map(|bytes| { - if matches!(self.dtype, DType::Utf8(_)) { - unsafe { String::from_utf8_unchecked(bytes) }.into() - } else { - bytes.into() - } - }) - // FIXME(ngates): there's something weird about this. - } else if matches!(self.dtype, DType::Utf8(_)) { - Ok(Utf8Scalar::none().into()) + self.bytes_at(index) + .map(|bytes| varbin_scalar(bytes, self.dtype())) } else { - Ok(BinaryScalar::none().into()) + Ok(Scalar::null(self.dtype())) } } } diff --git a/vortex-array/src/array/varbin/compute/take.rs b/vortex-array/src/array/varbin/compute/take.rs index be100ab1a0..5704a76e24 100644 --- a/vortex-array/src/array/varbin/compute/take.rs +++ b/vortex-array/src/array/varbin/compute/take.rs @@ -1,4 +1,3 @@ -use num_traits::PrimInt; use vortex_error::VortexResult; use vortex_schema::DType; @@ -37,7 +36,7 @@ impl TakeFn for VarBinArray { } } -fn take( +fn take( dtype: DType, offsets: &[O], data: &[u8], @@ -58,7 +57,7 @@ fn take( builder.finish(dtype) } -fn take_nullable( +fn take_nullable( dtype: DType, offsets: &[O], data: &[u8], diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index ea7e85c43b..90fdb92978 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -1,12 +1,13 @@ use std::sync::{Arc, RwLock}; use linkme::distributed_slice; -use num_traits::{FromPrimitive, Unsigned}; +use num_traits::AsPrimitive; +pub use stats::VarBinAccumulator; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_schema::{DType, IntWidth, Nullability, Signedness}; use crate::array::downcast::DowncastArrayBuiltin; -use crate::array::primitive::PrimitiveArray; +use crate::array::varbin::builder::VarBinBuilder; use crate::array::{check_slice_bounds, Array, ArrayRef}; use crate::compress::EncodingCompression; use crate::compute::flatten::flatten_primitive; @@ -15,7 +16,9 @@ use crate::compute::ArrayCompute; use crate::encoding::{Encoding, EncodingId, EncodingRef, ENCODINGS}; use crate::formatter::{ArrayDisplay, ArrayFormatter}; use crate::iterator::ArrayIter; +use crate::match_each_native_ptype; use crate::ptype::NativePType; +use crate::scalar::{BinaryScalar, Scalar, Utf8Scalar}; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stats, StatsSet}; use crate::validity::OwnedValidity; @@ -24,7 +27,7 @@ use crate::view::AsView; use crate::{impl_array, ArrayWalker}; mod accessor; -mod builder; +pub mod builder; mod compress; mod compute; mod serde; @@ -119,23 +122,14 @@ impl VarBinArray { fn from_vec_sized(vec: Vec, dtype: DType) -> Self where - K: NativePType + FromPrimitive + Unsigned, + K: NativePType, T: AsRef<[u8]>, { - let mut offsets: Vec = Vec::with_capacity(vec.len() + 1); - let mut values: Vec = Vec::new(); - offsets.push(K::zero()); + let mut builder = VarBinBuilder::::with_capacity(vec.len()); for v in vec { - values.extend_from_slice(v.as_ref()); - offsets.push(::from_usize(values.len()).unwrap()); + builder.push_value(v.as_ref()); } - - VarBinArray::new( - PrimitiveArray::from(offsets).into_array(), - PrimitiveArray::from(values).into_array(), - dtype, - None, - ) + builder.finish(dtype) } pub fn from_iter, I: IntoIterator>>( @@ -143,35 +137,11 @@ impl VarBinArray { dtype: DType, ) -> Self { let iter = iter.into_iter(); - let (lower, _) = iter.size_hint(); - - let mut validity: Vec = Vec::with_capacity(lower); - let mut offsets: Vec = Vec::with_capacity(lower + 1); - offsets.push(0); - let mut bytes: Vec = Vec::new(); - for i in iter { - if let Some(v) = i { - validity.push(true); - bytes.extend_from_slice(v.as_ref()); - offsets.push(bytes.len() as u64); - } else { - validity.push(false); - offsets.push(bytes.len() as u64); - } - } - - let offsets_ref = PrimitiveArray::from(offsets).into_array(); - let bytes_ref = PrimitiveArray::from(bytes).into_array(); - if validity.is_empty() { - VarBinArray::new(offsets_ref, bytes_ref, dtype, None) - } else { - VarBinArray::new( - offsets_ref, - bytes_ref, - dtype.as_nullable(), - Some(validity.into()), - ) + let mut builder = VarBinBuilder::::with_capacity(iter.size_hint().0); + for v in iter { + builder.push(v.as_ref().map(|o| o.as_ref())); } + builder.finish(dtype) } pub fn iter_primitive(&self) -> VortexResult> { @@ -185,13 +155,24 @@ impl VarBinArray { ArrayIter::new(self) } + pub(self) fn offset_at(&self, index: usize) -> usize { + if let Some(parray) = self.offsets().maybe_primitive() { + match_each_native_ptype!(parray.ptype(), |$P| { + parray.typed_data::<$P>()[index].as_() + }) + } else { + scalar_at(self.offsets(), index) + .unwrap() + .try_into() + .unwrap() + } + } + pub fn bytes_at(&self, index: usize) -> VortexResult> { - let start = scalar_at(self.offsets(), index)?.try_into()?; - let end = scalar_at(self.offsets(), index + 1)?.try_into()?; + let start = self.offset_at(index); + let end = self.offset_at(index + 1); let sliced = self.bytes().slice(start, end)?; - Ok(flatten_primitive(sliced.as_ref())? - .typed_data::() - .to_vec()) + Ok(flatten_primitive(sliced.as_ref())?.buffer().to_vec()) } } @@ -323,25 +304,38 @@ impl From> for VarBinArray { impl<'a> FromIterator> for VarBinArray { fn from_iter>>(iter: T) -> Self { - VarBinArray::from_iter(iter, DType::Binary(Nullability::NonNullable)) + VarBinArray::from_iter(iter, DType::Binary(Nullability::Nullable)) } } impl FromIterator>> for VarBinArray { fn from_iter>>>(iter: T) -> Self { - VarBinArray::from_iter(iter, DType::Binary(Nullability::NonNullable)) + VarBinArray::from_iter(iter, DType::Binary(Nullability::Nullable)) } } impl FromIterator> for VarBinArray { fn from_iter>>(iter: T) -> Self { - VarBinArray::from_iter(iter, DType::Utf8(Nullability::NonNullable)) + VarBinArray::from_iter(iter, DType::Utf8(Nullability::Nullable)) } } impl<'a> FromIterator> for VarBinArray { fn from_iter>>(iter: T) -> Self { - VarBinArray::from_iter(iter, DType::Utf8(Nullability::NonNullable)) + VarBinArray::from_iter(iter, DType::Utf8(Nullability::Nullable)) + } +} + +pub fn varbin_scalar(value: Vec, dtype: &DType) -> Scalar { + if matches!(dtype, DType::Utf8(_)) { + let str = unsafe { String::from_utf8_unchecked(value) }; + Utf8Scalar::try_new(Some(str), dtype.nullability()) + .unwrap() + .into() + } else { + BinaryScalar::try_new(Some(value), dtype.nullability()) + .unwrap() + .into() } } diff --git a/vortex-array/src/array/varbin/stats.rs b/vortex-array/src/array/varbin/stats.rs index 05d28ccbf1..fffe2a5523 100644 --- a/vortex-array/src/array/varbin/stats.rs +++ b/vortex-array/src/array/varbin/stats.rs @@ -1,80 +1,96 @@ +use std::borrow::Cow; use std::cmp::Ordering; use std::collections::HashMap; use vortex_error::VortexResult; use vortex_schema::DType; -use crate::array::varbin::VarBinArray; -use crate::array::varbinview::VarBinViewArray; +use crate::array::varbin::{varbin_scalar, VarBinArray}; use crate::array::Array; use crate::stats::{Stat, StatsCompute, StatsSet}; -pub trait BinaryArray { - fn bytes_at(&self, index: usize) -> VortexResult>; +impl StatsCompute for VarBinArray { + fn compute(&self, _stat: &Stat) -> VortexResult { + let mut acc = VarBinAccumulator::default(); + self.iter_primitive() + .map(|prim_iter| { + for next_val in prim_iter { + acc.nullable_next(next_val.map(Cow::from)); + } + }) + .unwrap_or_else(|_| { + for next_val in self.iter() { + acc.nullable_next(next_val.map(Cow::from)); + } + }); + Ok(acc.finish(self.dtype())) + } } -impl StatsCompute for T -where - T: BinaryArray + Array, -{ - fn compute(&self, _stat: &Stat) -> VortexResult { - let mut min = vec![0xFF]; - let mut max = vec![0x00]; - let mut is_constant = true; - let mut is_sorted = true; - let mut last_value = vec![0x00]; - let mut runs: usize = 0; - for i in 0..self.len() { - let next_val = self.bytes_at(i).unwrap(); - if next_val < min { - min.clone_from(&next_val); - } - if next_val > max { - max.clone_from(&next_val); - } - match next_val.cmp(&last_value) { - Ordering::Less => is_sorted = false, - Ordering::Equal => continue, - Ordering::Greater => {} - } - is_constant = false; - last_value = next_val; - runs += 1; - } +pub struct VarBinAccumulator<'a> { + min: Cow<'a, [u8]>, + max: Cow<'a, [u8]>, + is_constant: bool, + is_sorted: bool, + is_strict_sorted: bool, + last_value: Cow<'a, [u8]>, + null_count: usize, + runs: usize, +} - Ok(StatsSet::from(HashMap::from([ - ( - Stat::Min, - if matches!(self.dtype(), DType::Utf8(_)) { - unsafe { String::from_utf8_unchecked(min.to_vec()) }.into() - } else { - min.into() - }, - ), - ( - Stat::Max, - if matches!(self.dtype(), DType::Utf8(_)) { - unsafe { String::from_utf8_unchecked(max.to_vec()) }.into() - } else { - max.into() - }, - ), - (Stat::RunCount, runs.into()), - (Stat::IsSorted, is_sorted.into()), - (Stat::IsConstant, is_constant.into()), - ]))) +impl Default for VarBinAccumulator<'_> { + fn default() -> Self { + Self { + min: Cow::from(&[0xFF]), + max: Cow::from(&[0x00]), + is_constant: true, + is_sorted: true, + is_strict_sorted: true, + last_value: Cow::from(&[0x00]), + runs: 0, + null_count: 0, + } } } -impl BinaryArray for VarBinArray { - fn bytes_at(&self, index: usize) -> VortexResult> { - VarBinArray::bytes_at(self, index) +impl<'a> VarBinAccumulator<'a> { + pub fn nullable_next(&mut self, val: Option>) { + match val { + None => self.null_count += 1, + Some(v) => self.next(v), + } } -} -impl BinaryArray for VarBinViewArray { - fn bytes_at(&self, index: usize) -> VortexResult> { - VarBinViewArray::bytes_at(self, index) + pub fn next(&mut self, val: Cow<'a, [u8]>) { + if val < self.min { + self.min.clone_from(&val); + } else if val > self.max { + self.max.clone_from(&val); + } + + match val.cmp(&self.last_value) { + Ordering::Less => self.is_sorted = false, + Ordering::Equal => { + self.is_strict_sorted = false; + return; + } + Ordering::Greater => {} + } + self.is_constant = false; + self.last_value = val; + self.runs += 1; + } + + pub fn finish(&self, dtype: &DType) -> StatsSet { + StatsSet::from(HashMap::from([ + (Stat::Min, varbin_scalar(self.min.to_vec(), dtype)), + (Stat::Max, varbin_scalar(self.max.to_vec(), dtype)), + (Stat::RunCount, self.runs.into()), + (Stat::IsSorted, self.is_sorted.into()), + (Stat::IsStrictSorted, self.is_strict_sorted.into()), + (Stat::IsConstant, self.is_constant.into()), + (Stat::NullCount, self.null_count.into()), + ])) } } @@ -82,20 +98,15 @@ impl BinaryArray for VarBinViewArray { mod test { use vortex_schema::{DType, Nullability}; - use crate::array::primitive::PrimitiveArray; use crate::array::varbin::VarBinArray; use crate::array::Array; use crate::stats::Stat; fn array(dtype: DType) -> VarBinArray { - let values = PrimitiveArray::from( - "hello worldhello world this is a long string" - .as_bytes() - .to_vec(), - ); - let offsets = PrimitiveArray::from(vec![0, 11, 44]); - - VarBinArray::new(offsets.into_array(), values.into_array(), dtype, None) + VarBinArray::from_vec( + vec!["hello world", "hello world this is a long string"], + dtype, + ) } #[test] diff --git a/vortex-array/src/array/varbinview/accessor.rs b/vortex-array/src/array/varbinview/accessor.rs new file mode 100644 index 0000000000..0b8996e15f --- /dev/null +++ b/vortex-array/src/array/varbinview/accessor.rs @@ -0,0 +1,31 @@ +use crate::accessor::ArrayAccessor; +use crate::array::downcast::DowncastArrayBuiltin; +use crate::array::varbinview::VarBinViewArray; +use crate::validity::ArrayValidity; + +impl<'a> ArrayAccessor<'a, &'a [u8]> for VarBinViewArray { + fn value(&'a self, index: usize) -> Option<&'a [u8]> { + if self.is_valid(index) { + let view = &self.view_slice()[index]; + if view.is_inlined() { + Some(unsafe { &view.inlined.data }) + } else { + let offset = unsafe { view._ref.offset as usize }; + let buffer_idx = unsafe { view._ref.buffer_index as usize }; + Some(&self.data()[buffer_idx].as_primitive().buffer()[offset..offset + view.size()]) + } + } else { + None + } + } +} + +impl<'a> ArrayAccessor<'a, Vec> for VarBinViewArray { + fn value(&'a self, index: usize) -> Option> { + if self.is_valid(index) { + Some(self.bytes_at(index).unwrap()) + } else { + None + } + } +} diff --git a/vortex-array/src/array/varbinview/builder.rs b/vortex-array/src/array/varbinview/builder.rs new file mode 100644 index 0000000000..a71c9ef595 --- /dev/null +++ b/vortex-array/src/array/varbinview/builder.rs @@ -0,0 +1,111 @@ +use std::marker::PhantomData; +use std::mem; +use std::mem::ManuallyDrop; + +use arrow_buffer::NullBufferBuilder; +use vortex_schema::DType; + +use crate::array::primitive::PrimitiveArray; +use crate::array::varbinview::{BinaryView, Inlined, Ref, VarBinViewArray, VIEW_SIZE}; +use crate::array::{Array, ArrayRef, IntoArray}; +use crate::validity::Validity; + +pub struct VarBinViewBuilder> { + views: Vec, + nulls: NullBufferBuilder, + completed: Vec, + in_progress: Vec, + block_size: u32, + phantom: PhantomData, +} + +impl> VarBinViewBuilder { + pub fn with_capacity(capacity: usize) -> Self { + Self { + views: Vec::with_capacity(capacity), + nulls: NullBufferBuilder::new(capacity), + completed: Vec::new(), + in_progress: Vec::new(), + block_size: 16 * 1024, + phantom: Default::default(), + } + } + + #[inline] + pub fn push(&mut self, value: Option) { + match value { + None => self.push_null(), + Some(v) => self.push_value(v), + } + } + + #[inline] + pub fn push_value(&mut self, value: T) { + let vbytes = value.as_ref(); + if self.in_progress.len() + vbytes.len() > self.in_progress.capacity() { + let done = mem::replace( + &mut self.in_progress, + Vec::with_capacity(vbytes.len().max(self.block_size as usize)), + ); + if !done.is_empty() { + assert!(self.completed.len() < u32::MAX as usize); + self.completed.push(PrimitiveArray::from(done).into_array()); + } + } + + if vbytes.len() > BinaryView::MAX_INLINED_SIZE { + self.views.push(BinaryView { + _ref: Ref::new( + vbytes.len() as u32, + vbytes[0..4].try_into().unwrap(), + self.completed.len() as u32, + self.in_progress.len() as u32, + ), + }); + self.in_progress.extend_from_slice(vbytes); + } else { + self.views.push(BinaryView { + inlined: Inlined::new(vbytes), + }); + } + self.nulls.append_non_null(); + } + + #[inline] + pub fn push_null(&mut self) { + self.views.push(BinaryView { + inlined: Inlined::new("".as_bytes()), + }); + self.nulls.append_null(); + } + + pub fn finish(&mut self, dtype: DType) -> VarBinViewArray { + let mut completed = mem::take(&mut self.completed); + if !self.in_progress.is_empty() { + completed.push(PrimitiveArray::from(mem::take(&mut self.in_progress)).into_array()); + } + + let nulls = self.nulls.finish(); + let validity = if dtype.is_nullable() { + Some( + nulls + .map(Validity::from) + .unwrap_or_else(|| Validity::Valid(self.views.len())), + ) + } else { + assert!(nulls.is_none(), "dtype and validity mismatch"); + None + }; + + let views_u8: Vec = unsafe { + let mut views_clone = ManuallyDrop::new(mem::take(&mut self.views)); + Vec::from_raw_parts( + views_clone.as_mut_ptr() as _, + views_clone.len() * VIEW_SIZE, + views_clone.capacity() * VIEW_SIZE, + ) + }; + + VarBinViewArray::try_new(views_u8.into_array(), completed, dtype, validity).unwrap() + } +} diff --git a/vortex-array/src/array/varbinview/compute.rs b/vortex-array/src/array/varbinview/compute.rs index 9ec24417d3..c3cf2289c6 100644 --- a/vortex-array/src/array/varbinview/compute.rs +++ b/vortex-array/src/array/varbinview/compute.rs @@ -6,6 +6,7 @@ use itertools::Itertools; use vortex_error::{vortex_bail, VortexResult}; use vortex_schema::DType; +use crate::array::varbin::varbin_scalar; use crate::array::varbinview::VarBinViewArray; use crate::array::Array; use crate::arrow::wrappers::as_nulls; @@ -35,13 +36,8 @@ impl ArrayCompute for VarBinViewArray { impl ScalarAtFn for VarBinViewArray { fn scalar_at(&self, index: usize) -> VortexResult { if self.is_valid(index) { - self.bytes_at(index).map(|bytes| { - if matches!(self.dtype, DType::Utf8(_)) { - unsafe { String::from_utf8_unchecked(bytes) }.into() - } else { - bytes.into() - } - }) + self.bytes_at(index) + .map(|bytes| varbin_scalar(bytes, self.dtype())) } else { Ok(Scalar::null(self.dtype())) } @@ -56,12 +52,12 @@ impl FlattenFn for VarBinViewArray { .iter() .map(|d| flatten(d.as_ref()).unwrap().into_array()) .collect::>(); - Ok(FlattenedArray::VarBinView(VarBinViewArray::new( + Ok(FlattenedArray::VarBinView(VarBinViewArray::try_new( views, data, self.dtype.clone(), self.validity().to_owned_view(), - ))) + )?)) } } diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index 57e3acc241..a9beb6b0b1 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -1,15 +1,20 @@ -use std::mem; +use std::fmt::{Debug, Formatter}; use std::sync::{Arc, RwLock}; +use std::{mem, slice}; use linkme::distributed_slice; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_schema::{DType, IntWidth, Nullability, Signedness}; +use crate::array::downcast::DowncastArrayBuiltin; +use crate::array::primitive::PrimitiveEncoding; +use crate::array::varbinview::builder::VarBinViewBuilder; use crate::array::{check_slice_bounds, Array, ArrayRef}; use crate::compute::flatten::flatten_primitive; use crate::compute::ArrayCompute; use crate::encoding::{Encoding, EncodingId, EncodingRef, ENCODINGS}; use crate::formatter::{ArrayDisplay, ArrayFormatter}; +use crate::iterator::ArrayIter; use crate::serde::{ArraySerde, EncodingSerde}; use crate::stats::{Stats, StatsSet}; use crate::validity::OwnedValidity; @@ -17,34 +22,37 @@ use crate::validity::{Validity, ValidityView}; use crate::view::AsView; use crate::{impl_array, ArrayWalker}; +mod accessor; +mod builder; mod compute; mod serde; +mod stats; -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] #[repr(C, align(8))] struct Inlined { size: u32, - data: [u8; 12], + data: [u8; BinaryView::MAX_INLINED_SIZE], } impl Inlined { #[allow(dead_code)] - pub fn new(value: &str) -> Self { + pub fn new(value: &[u8]) -> Self { assert!( - value.len() < 13, + value.len() <= BinaryView::MAX_INLINED_SIZE, "Inlined strings must be shorter than 13 characters, {} given", value.len() ); let mut inlined = Inlined { size: value.len() as u32, - data: [0u8; 12], + data: [0u8; BinaryView::MAX_INLINED_SIZE], }; - inlined.data[..value.len()].copy_from_slice(value.as_bytes()); + inlined.data[..value.len()].copy_from_slice(value); inlined } } -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] #[repr(C, align(8))] struct Ref { size: u32, @@ -53,27 +61,50 @@ struct Ref { offset: u32, } +impl Ref { + pub fn new(size: u32, prefix: [u8; 4], buffer_index: u32, offset: u32) -> Self { + Self { + size, + prefix, + buffer_index, + offset, + } + } +} + #[derive(Clone, Copy)] #[repr(C, align(8))] -union BinaryView { +pub union BinaryView { inlined: Inlined, _ref: Ref, } impl BinaryView { + pub const MAX_INLINED_SIZE: usize = 12; + #[inline] - pub fn from_le_bytes(bytes: [u8; 16]) -> BinaryView { - unsafe { mem::transmute(bytes) } + pub fn size(&self) -> usize { + unsafe { self.inlined.size as usize } } - #[inline] - #[allow(dead_code)] - pub fn to_le_bytes(self) -> [u8; 16] { - unsafe { mem::transmute(self) } + pub fn is_inlined(&self) -> bool { + unsafe { self.inlined.size <= Self::MAX_INLINED_SIZE as u32 } + } +} + +impl Debug for BinaryView { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut s = f.debug_struct("BinaryView"); + if self.is_inlined() { + s.field("inline", unsafe { &self.inlined }); + } else { + s.field("ref", unsafe { &self._ref }); + } + s.finish() } } -pub const VIEW_SIZE: usize = std::mem::size_of::(); +pub const VIEW_SIZE: usize = mem::size_of::(); #[derive(Debug, Clone)] pub struct VarBinViewArray { @@ -85,15 +116,6 @@ pub struct VarBinViewArray { } impl VarBinViewArray { - pub fn new( - views: ArrayRef, - data: Vec, - dtype: DType, - validity: Option, - ) -> Self { - Self::try_new(views, data, dtype, validity).unwrap() - } - pub fn try_new( views: ArrayRef, data: Vec, @@ -135,22 +157,17 @@ impl VarBinViewArray { }) } - pub fn plain_size(&self) -> usize { - (0..self.views.len() / VIEW_SIZE).fold(0usize, |acc, i| { - let view = self.view_at(i); - unsafe { acc + view.inlined.size as usize } - }) + pub(self) fn view_slice(&self) -> &[BinaryView] { + unsafe { + slice::from_raw_parts( + self.views.as_primitive().typed_data::().as_ptr() as _, + self.views.len() / VIEW_SIZE, + ) + } } pub(self) fn view_at(&self, index: usize) -> BinaryView { - let view_vec = flatten_primitive( - self.views - .slice(index * VIEW_SIZE, (index + 1) * VIEW_SIZE) - .unwrap() - .as_ref(), - ) - .unwrap(); - BinaryView::from_le_bytes(view_vec.typed_data::().try_into().unwrap()) + self.view_slice()[index] } #[inline] @@ -163,19 +180,55 @@ impl VarBinViewArray { &self.data } + pub fn from_vec>(vec: Vec, dtype: DType) -> Self { + let mut builder = VarBinViewBuilder::with_capacity(vec.len()); + for v in vec { + builder.push_value(v) + } + builder.finish(dtype) + } + + pub fn from_iter, I: IntoIterator>>( + iter: I, + dtype: DType, + ) -> Self { + let iter = iter.into_iter(); + let mut builder = VarBinViewBuilder::with_capacity(iter.size_hint().0); + for v in iter { + builder.push(v) + } + builder.finish(dtype) + } + + pub fn iter_primitive(&self) -> VortexResult> { + if self + .data() + .iter() + .all(|b| b.encoding().id() == PrimitiveEncoding::ID) + { + Ok(ArrayIter::new(self)) + } else { + Err(vortex_err!("Bytes array was not a primitive array")) + } + } + + pub fn iter(&self) -> ArrayIter<'_, VarBinViewArray, Vec> { + ArrayIter::new(self) + } + pub fn bytes_at(&self, index: usize) -> VortexResult> { let view = self.view_at(index); unsafe { if view.inlined.size > 12 { let arrow_data_buffer = flatten_primitive( - self.data + &self + .data .get(view._ref.buffer_index as usize) .unwrap() .slice( view._ref.offset as usize, (view._ref.size + view._ref.offset) as usize, - )? - .as_ref(), + )?, )?; // TODO(ngates): can we avoid returning a copy? Ok(arrow_data_buffer.typed_data::().to_vec()) @@ -227,6 +280,10 @@ impl Array for VarBinViewArray { &VarBinViewEncoding } + fn nbytes(&self) -> usize { + self.views.nbytes() + self.data.iter().map(|arr| arr.nbytes()).sum::() + } + #[inline] fn with_compute_mut( &self, @@ -235,10 +292,6 @@ impl Array for VarBinViewArray { f(self) } - fn nbytes(&self) -> usize { - self.views.nbytes() + self.data.iter().map(|arr| arr.nbytes()).sum::() - } - fn serde(&self) -> Option<&dyn ArraySerde> { Some(self) } @@ -288,44 +341,68 @@ impl ArrayDisplay for VarBinViewArray { } } +impl From> for VarBinViewArray { + fn from(value: Vec<&[u8]>) -> Self { + VarBinViewArray::from_vec(value, DType::Binary(Nullability::NonNullable)) + } +} + +impl From>> for VarBinViewArray { + fn from(value: Vec>) -> Self { + VarBinViewArray::from_vec(value, DType::Binary(Nullability::NonNullable)) + } +} + +impl From> for VarBinViewArray { + fn from(value: Vec) -> Self { + VarBinViewArray::from_vec(value, DType::Utf8(Nullability::NonNullable)) + } +} + +impl From> for VarBinViewArray { + fn from(value: Vec<&str>) -> Self { + VarBinViewArray::from_vec(value, DType::Utf8(Nullability::NonNullable)) + } +} + +impl<'a> FromIterator> for VarBinViewArray { + fn from_iter>>(iter: T) -> Self { + VarBinViewArray::from_iter(iter, DType::Binary(Nullability::NonNullable)) + } +} + +impl FromIterator>> for VarBinViewArray { + fn from_iter>>>(iter: T) -> Self { + VarBinViewArray::from_iter(iter, DType::Binary(Nullability::NonNullable)) + } +} + +impl FromIterator> for VarBinViewArray { + fn from_iter>>(iter: T) -> Self { + VarBinViewArray::from_iter(iter, DType::Utf8(Nullability::NonNullable)) + } +} + +impl<'a> FromIterator> for VarBinViewArray { + fn from_iter>>(iter: T) -> Self { + VarBinViewArray::from_iter(iter, DType::Utf8(Nullability::NonNullable)) + } +} + #[cfg(test)] mod test { - use super::*; - use crate::array::primitive::PrimitiveArray; + use arrow_array::array::StringViewArray as ArrowStringViewArray; + + use crate::array::varbinview::VarBinViewArray; + use crate::array::Array; + use crate::compute::as_arrow::as_arrow; use crate::compute::scalar_at::scalar_at; use crate::scalar::Scalar; - fn binary_array() -> VarBinViewArray { - let values = PrimitiveArray::from("hello world this is a long string".as_bytes().to_vec()); - let view1 = BinaryView { - inlined: Inlined::new("hello world"), - }; - let view2 = BinaryView { - _ref: Ref { - size: 33, - prefix: "hell".as_bytes().try_into().unwrap(), - buffer_index: 0, - offset: 0, - }, - }; - let view_arr = PrimitiveArray::from( - vec![view1.to_le_bytes(), view2.to_le_bytes()] - .into_iter() - .flatten() - .collect::>(), - ); - - VarBinViewArray::new( - view_arr.into_array(), - vec![values.into_array()], - DType::Utf8(Nullability::NonNullable), - None, - ) - } - #[test] pub fn varbin_view() { - let binary_arr = binary_array(); + let binary_arr = + VarBinViewArray::from(vec!["hello world", "hello world this is a long string"]); assert_eq!(binary_arr.len(), 2); assert_eq!( scalar_at(&binary_arr, 0).unwrap(), @@ -339,10 +416,31 @@ mod test { #[test] pub fn slice() { - let binary_arr = binary_array().slice(1, 2).unwrap(); + let binary_arr = + VarBinViewArray::from(vec!["hello world", "hello world this is a long string"]) + .slice(1, 2) + .unwrap(); assert_eq!( scalar_at(&binary_arr, 0).unwrap(), Scalar::from("hello world this is a long string") ); } + + #[test] + pub fn iter() { + let binary_array = + VarBinViewArray::from(vec!["hello world", "hello world this is a long string"]); + assert_eq!( + as_arrow(&binary_array) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect::>(), + ArrowStringViewArray::from(vec!["hello world", "hello world this is a long string",]) + .iter() + .collect::>() + ); + } } diff --git a/vortex-array/src/array/varbinview/serde.rs b/vortex-array/src/array/varbinview/serde.rs index 10e28dd3bc..40996c45d6 100644 --- a/vortex-array/src/array/varbinview/serde.rs +++ b/vortex-array/src/array/varbinview/serde.rs @@ -30,51 +30,24 @@ impl EncodingSerde for VarBinViewEncoding { for _ in 0..num_data { data_bufs.push(ctx.bytes().read()?); } - Ok(VarBinViewArray::new(views, data_bufs, ctx.schema().clone(), validity).into_array()) + Ok( + VarBinViewArray::try_new(views, data_bufs, ctx.schema().clone(), validity) + .unwrap() + .into_array(), + ) } } #[cfg(test)] mod test { - use vortex_schema::{DType, Nullability}; - use crate::array::downcast::DowncastArrayBuiltin; - use crate::array::primitive::PrimitiveArray; - use crate::array::varbinview::{BinaryView, Inlined, Ref, VarBinViewArray}; - use crate::array::Array; + use crate::array::varbinview::VarBinViewArray; use crate::serde::test::roundtrip_array; - fn binary_array() -> VarBinViewArray { - let values = PrimitiveArray::from("hello world this is a long string".as_bytes().to_vec()); - let view1 = BinaryView { - inlined: Inlined::new("hello world"), - }; - let view2 = BinaryView { - _ref: Ref { - size: 33, - prefix: "hell".as_bytes().try_into().unwrap(), - buffer_index: 0, - offset: 0, - }, - }; - let view_arr = PrimitiveArray::from( - vec![view1.to_le_bytes(), view2.to_le_bytes()] - .into_iter() - .flatten() - .collect::>(), - ); - - VarBinViewArray::new( - view_arr.into_array(), - vec![values.into_array()], - DType::Utf8(Nullability::NonNullable), - None, - ) - } - #[test] fn roundtrip() { - let arr = binary_array(); + let arr = VarBinViewArray::from(vec!["hello world", "hello world this is a long string"]); + let read_arr = roundtrip_array(&arr).unwrap(); assert_eq!( diff --git a/vortex-array/src/array/varbinview/stats.rs b/vortex-array/src/array/varbinview/stats.rs new file mode 100644 index 0000000000..1cdd5366b9 --- /dev/null +++ b/vortex-array/src/array/varbinview/stats.rs @@ -0,0 +1,26 @@ +use std::borrow::Cow; + +use vortex_error::VortexResult; + +use crate::array::varbin::VarBinAccumulator; +use crate::array::varbinview::VarBinViewArray; +use crate::array::Array; +use crate::stats::{Stat, StatsCompute, StatsSet}; + +impl StatsCompute for VarBinViewArray { + fn compute(&self, _stat: &Stat) -> VortexResult { + let mut acc = VarBinAccumulator::default(); + self.iter_primitive() + .map(|prim_iter| { + for next_val in prim_iter { + acc.nullable_next(next_val.map(Cow::from)); + } + }) + .unwrap_or_else(|_| { + for next_val in self.iter() { + acc.nullable_next(next_val.map(Cow::from)); + } + }); + Ok(acc.finish(self.dtype())) + } +} diff --git a/vortex-array/src/encode.rs b/vortex-array/src/encode.rs index a848e26cbb..b099ca08ee 100644 --- a/vortex-array/src/encode.rs +++ b/vortex-array/src/encode.rs @@ -128,7 +128,7 @@ impl FromArrowArray<&GenericByteViewArray> for ArrayRef { _ => panic!("Invalid data type for ByteViewArray"), }; - VarBinViewArray::new( + VarBinViewArray::try_new( value.views().inner().clone().into_array(), value .data_buffers() @@ -138,6 +138,7 @@ impl FromArrowArray<&GenericByteViewArray> for ArrayRef { dtype, nulls(value.nulls(), nullable, value.len()), ) + .unwrap() .into_array() } } diff --git a/vortex-array/src/scalar/binary.rs b/vortex-array/src/scalar/binary.rs index dbd5d29260..04bd559699 100644 --- a/vortex-array/src/scalar/binary.rs +++ b/vortex-array/src/scalar/binary.rs @@ -33,6 +33,12 @@ impl From> for Scalar { } } +impl From<&[u8]> for Scalar { + fn from(value: &[u8]) -> Self { + BinaryScalar::some(value.to_vec()).into() + } +} + impl TryFrom for Vec { type Error = VortexError; @@ -40,8 +46,7 @@ impl TryFrom for Vec { let Scalar::Binary(b) = value else { vortex_bail!(MismatchedTypes: "binary", value.dtype()); }; - b.value() - .cloned() + b.into_value() .ok_or_else(|| vortex_err!("Can't extract present value from null scalar")) } } diff --git a/vortex-array/src/scalar/list.rs b/vortex-array/src/scalar/list.rs index 6cbbe75db1..24d12864e9 100644 --- a/vortex-array/src/scalar/list.rs +++ b/vortex-array/src/scalar/list.rs @@ -81,7 +81,7 @@ impl> TryFrom for ListScalarVec< vs.into_iter().map(|v| v.try_into()).try_collect()?, )) } else { - Err(vortex_err!("can't extract present value from null scalar",)) + Err(vortex_err!("can't extract present value from null scalar")) } } else { Err(vortex_err!(MismatchedTypes: "any list", value.dtype())) @@ -99,7 +99,7 @@ impl<'a, T: TryFrom<&'a Scalar, Error = VortexError>> TryFrom<&'a Scalar> for Li vs.iter().map(|v| v.try_into()).try_collect()?, )) } else { - Err(vortex_err!("can't extract present value from null scalar",)) + Err(vortex_err!("can't extract present value from null scalar")) } } else { Err(vortex_err!(MismatchedTypes: "any list", value.dtype())) diff --git a/vortex-array/src/serde/mod.rs b/vortex-array/src/serde/mod.rs index 4735713f67..598e8a1f3d 100644 --- a/vortex-array/src/serde/mod.rs +++ b/vortex-array/src/serde/mod.rs @@ -238,7 +238,7 @@ impl<'a> ReadCtx<'a> { { serde.read(self) } else { - Err(vortex_err!("Failed to recognize encoding ID",)) + Err(vortex_err!("Failed to recognize encoding ID")) } } } diff --git a/vortex-dict/src/compress.rs b/vortex-dict/src/compress.rs index d1f7332eae..7f911b0722 100644 --- a/vortex-dict/src/compress.rs +++ b/vortex-dict/src/compress.rs @@ -4,7 +4,6 @@ use ahash::RandomState; use hashbrown::hash_map::{Entry, RawEntryMut}; use hashbrown::HashMap; use num_traits::AsPrimitive; -use vortex::array::bool::BoolArray; use vortex::array::primitive::{PrimitiveArray, PrimitiveEncoding}; use vortex::array::varbin::{VarBinArray, VarBinEncoding}; use vortex::array::{Array, ArrayKind, ArrayRef}; @@ -13,7 +12,6 @@ use vortex::match_each_native_ptype; use vortex::ptype::NativePType; use vortex::scalar::AsBytes; use vortex::stats::Stat; -use vortex::validity::Validity; use vortex_error::VortexResult; use vortex_schema::DType; @@ -140,7 +138,7 @@ pub fn dict_encode_typed_primitive( validity.push(false); validity.extend(vec![true; values.len() - 1]); - Some(Validity::array(BoolArray::from(validity).into_array()).unwrap()) + Some(validity.into()) } else { None }; @@ -222,7 +220,7 @@ where validity.push(false); validity.extend(vec![true; offsets.len() - 2]); - Some(Validity::array(BoolArray::from(validity).into_array()).unwrap()) + Some(validity.into()) } else { None }; @@ -323,6 +321,7 @@ mod test { codes.buffer().typed_data::(), &[1, 0, 2, 1, 0, 3, 2, 0] ); + assert_eq!(String::from_utf8(values.bytes_at(0).unwrap()).unwrap(), ""); assert_eq!( values .iter_primitive() diff --git a/vortex-ree/src/compute.rs b/vortex-ree/src/compute.rs index 5f8cf7524b..c4922354db 100644 --- a/vortex-ree/src/compute.rs +++ b/vortex-ree/src/compute.rs @@ -45,7 +45,7 @@ impl FlattenFn for REEArray { ) .map(FlattenedArray::Primitive) } else { - Err(vortex_err!("Cannot yet flatten non-primitive REE array",)) + Err(vortex_err!("Cannot yet flatten non-primitive REE array")) } } }