From 9992e943ae1ed2f885633805005d1ba89b92a5b8 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Mon, 15 Apr 2024 09:39:44 +0100 Subject: [PATCH] Array2: VarBin (#234) --- vortex-array2/src/accessor.rs | 10 + vortex-array2/src/array/bool/mod.rs | 2 +- vortex-array2/src/array/chunked/mod.rs | 10 +- vortex-array2/src/array/mod.rs | 1 + .../src/array/primitive/compute/mod.rs | 6 + .../src/array/primitive/compute/slice.rs | 19 ++ vortex-array2/src/array/primitive/mod.rs | 14 +- vortex-array2/src/array/varbin/accessor.rs | 49 ++++ vortex-array2/src/array/varbin/array.rs | 30 ++ vortex-array2/src/array/varbin/builder.rs | 94 ++++++ vortex-array2/src/array/varbin/compute/mod.rs | 144 +++++++++ .../src/array/varbin/compute/slice.rs | 17 ++ .../src/array/varbin/compute/take.rs | 80 +++++ vortex-array2/src/array/varbin/flatten.rs | 13 + vortex-array2/src/array/varbin/mod.rs | 277 ++++++++++++++++++ vortex-array2/src/array/varbin/stats.rs | 152 ++++++++++ vortex-array2/src/arrow/mod.rs | 1 + vortex-array2/src/arrow/wrappers.rs | 13 + vortex-array2/src/compute/mod.rs | 6 + vortex-array2/src/compute/slice.rs | 31 ++ vortex-array2/src/data.rs | 28 +- vortex-array2/src/flatten.rs | 3 + vortex-array2/src/lib.rs | 22 +- vortex-array2/src/typed.rs | 14 +- vortex-array2/src/validity.rs | 40 ++- vortex-array2/src/view.rs | 29 +- vortex-schema/src/dtype.rs | 2 + 27 files changed, 1017 insertions(+), 90 deletions(-) create mode 100644 vortex-array2/src/accessor.rs create mode 100644 vortex-array2/src/array/primitive/compute/slice.rs create mode 100644 vortex-array2/src/array/varbin/accessor.rs create mode 100644 vortex-array2/src/array/varbin/array.rs create mode 100644 vortex-array2/src/array/varbin/builder.rs create mode 100644 vortex-array2/src/array/varbin/compute/mod.rs create mode 100644 vortex-array2/src/array/varbin/compute/slice.rs create mode 100644 vortex-array2/src/array/varbin/compute/take.rs create mode 100644 vortex-array2/src/array/varbin/flatten.rs create mode 100644 vortex-array2/src/array/varbin/mod.rs create mode 100644 vortex-array2/src/array/varbin/stats.rs create mode 100644 vortex-array2/src/arrow/wrappers.rs create mode 100644 vortex-array2/src/compute/slice.rs diff --git a/vortex-array2/src/accessor.rs b/vortex-array2/src/accessor.rs new file mode 100644 index 0000000000..6688dec15f --- /dev/null +++ b/vortex-array2/src/accessor.rs @@ -0,0 +1,10 @@ +use vortex_error::VortexResult; + +pub trait ArrayAccessor { + type Item<'a>; + + fn with_iterator FnOnce(&mut dyn Iterator>) -> R, R>( + &self, + f: F, + ) -> VortexResult; +} diff --git a/vortex-array2/src/array/bool/mod.rs b/vortex-array2/src/array/bool/mod.rs index 45d1d5165d..cc8ad0f829 100644 --- a/vortex-array2/src/array/bool/mod.rs +++ b/vortex-array2/src/array/bool/mod.rs @@ -48,7 +48,7 @@ impl BoolArray<'_> { length: buffer.len(), }, vec![Buffer::Owned(buffer.into_inner())].into(), - validity.to_array_data().into_iter().collect_vec().into(), + validity.into_array_data().into_iter().collect_vec().into(), HashMap::default(), ) } diff --git a/vortex-array2/src/array/chunked/mod.rs b/vortex-array2/src/array/chunked/mod.rs index 3a2eccbd59..efbc4df021 100644 --- a/vortex-array2/src/array/chunked/mod.rs +++ b/vortex-array2/src/array/chunked/mod.rs @@ -69,10 +69,6 @@ impl ChunkedArray<'_> { self.array().child(idx + 1, self.array().dtype()) } - pub fn chunks(&self) -> impl Iterator { - (0..self.nchunks()).map(|c| self.chunk(c).unwrap()) - } - pub fn nchunks(&self) -> usize { self.chunk_ends().len() - 1 } @@ -104,6 +100,12 @@ impl ChunkedArray<'_> { } } +impl<'a> ChunkedArray<'a> { + pub fn chunks(&'a self) -> impl Iterator> { + (0..self.nchunks()).map(|c| self.chunk(c).unwrap()) + } +} + impl FromIterator for OwnedChunkedArray { fn from_iter>(iter: T) -> Self { let chunks: Vec = iter.into_iter().collect(); diff --git a/vortex-array2/src/array/mod.rs b/vortex-array2/src/array/mod.rs index 2a4a5f0410..7b57e9f484 100644 --- a/vortex-array2/src/array/mod.rs +++ b/vortex-array2/src/array/mod.rs @@ -3,3 +3,4 @@ pub mod chunked; pub mod constant; pub mod primitive; pub mod r#struct; +pub mod varbin; diff --git a/vortex-array2/src/array/primitive/compute/mod.rs b/vortex-array2/src/array/primitive/compute/mod.rs index 17667643fa..180cc99601 100644 --- a/vortex-array2/src/array/primitive/compute/mod.rs +++ b/vortex-array2/src/array/primitive/compute/mod.rs @@ -5,6 +5,7 @@ use crate::compute::cast::CastFn; use crate::compute::fill::FillForwardFn; use crate::compute::scalar_at::ScalarAtFn; use crate::compute::search_sorted::SearchSortedFn; +use crate::compute::slice::SliceFn; use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; @@ -14,6 +15,7 @@ mod cast; mod fill; mod scalar_at; mod search_sorted; +mod slice; mod take; impl ArrayCompute for PrimitiveArray<'_> { @@ -41,6 +43,10 @@ impl ArrayCompute for PrimitiveArray<'_> { Some(self) } + fn slice(&self) -> Option<&dyn SliceFn> { + Some(self) + } + fn take(&self) -> Option<&dyn TakeFn> { Some(self) } diff --git a/vortex-array2/src/array/primitive/compute/slice.rs b/vortex-array2/src/array/primitive/compute/slice.rs new file mode 100644 index 0000000000..de392700de --- /dev/null +++ b/vortex-array2/src/array/primitive/compute/slice.rs @@ -0,0 +1,19 @@ +use vortex::match_each_native_ptype; +use vortex_error::VortexResult; + +use crate::array::primitive::PrimitiveArray; +use crate::compute::slice::SliceFn; +use crate::IntoArray; +use crate::OwnedArray; + +impl SliceFn for PrimitiveArray<'_> { + fn slice(&self, start: usize, stop: usize) -> VortexResult { + match_each_native_ptype!(self.ptype(), |$T| { + Ok(PrimitiveArray::try_new( + self.scalar_buffer::<$T>().slice(start, stop - start), + self.validity().slice(start, stop)?, + )? + .into_array()) + }) + } +} diff --git a/vortex-array2/src/array/primitive/mod.rs b/vortex-array2/src/array/primitive/mod.rs index fc7e86da5a..cf055714ef 100644 --- a/vortex-array2/src/array/primitive/mod.rs +++ b/vortex-array2/src/array/primitive/mod.rs @@ -24,10 +24,6 @@ pub struct PrimitiveMetadata { } impl PrimitiveArray<'_> { - pub fn buffer(&self) -> &Buffer { - self.array().buffer(0).expect("missing buffer") - } - pub fn validity(&self) -> Validity { self.metadata() .validity @@ -39,6 +35,14 @@ impl PrimitiveArray<'_> { self.dtype().try_into().unwrap() } + pub fn buffer(&self) -> &Buffer { + self.array().buffer(0).expect("missing buffer") + } + + pub fn scalar_buffer(&self) -> ScalarBuffer { + ScalarBuffer::new(self.buffer().clone().into(), 0, self.len()) + } + pub fn typed_data(&self) -> &[T] { self.buffer().typed_data::() } @@ -55,7 +59,7 @@ impl PrimitiveArray<'_> { validity: validity.to_metadata(buffer.len())?, }, vec![Buffer::Owned(buffer.into_inner())].into(), - validity.to_array_data().into_iter().collect_vec().into(), + validity.into_array_data().into_iter().collect_vec().into(), HashMap::default(), ) } diff --git a/vortex-array2/src/array/varbin/accessor.rs b/vortex-array2/src/array/varbin/accessor.rs new file mode 100644 index 0000000000..17c62cd64a --- /dev/null +++ b/vortex-array2/src/array/varbin/accessor.rs @@ -0,0 +1,49 @@ +use vortex::match_each_integer_ptype; +use vortex_error::VortexResult; + +use crate::accessor::ArrayAccessor; +use crate::array::varbin::VarBinArray; +use crate::validity::ArrayValidity; + +impl ArrayAccessor for VarBinArray<'_> { + type Item<'a> = Option<&'a [u8]>; + + fn with_iterator FnOnce(&mut dyn Iterator>) -> R, R>( + &self, + f: F, + ) -> VortexResult { + // TODO(ngates): what happens if bytes is much larger than sliced_bytes? + let primitive = self.bytes().flatten_primitive()?; + let offsets = self.offsets().flatten_primitive()?; + let validity = self.logical_validity().to_null_buffer()?; + + match_each_integer_ptype!(offsets.ptype(), |$T| { + let offsets = offsets.typed_data::<$T>(); + let bytes = primitive.typed_data::(); + + match validity { + None => { + let mut iter = offsets + .iter() + .zip(offsets.iter().skip(1)) + .map(|(start, end)| Some(&bytes[*start as usize..*end as usize])); + Ok(f(&mut iter)) + } + Some(validity) => { + let mut iter = offsets + .iter() + .zip(offsets.iter().skip(1)) + .zip(validity.iter()) + .map(|((start, end), valid)| { + if valid { + Some(&bytes[*start as usize..*end as usize]) + } else { + None + } + }); + Ok(f(&mut iter)) + } + } + }) + } +} diff --git a/vortex-array2/src/array/varbin/array.rs b/vortex-array2/src/array/varbin/array.rs new file mode 100644 index 0000000000..42ef307ac7 --- /dev/null +++ b/vortex-array2/src/array/varbin/array.rs @@ -0,0 +1,30 @@ +use vortex_error::VortexResult; + +use crate::array::varbin::VarBinArray; +use crate::validity::{ArrayValidity, LogicalValidity}; +use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; +use crate::ArrayTrait; + +impl ArrayValidity for VarBinArray<'_> { + fn is_valid(&self, index: usize) -> bool { + self.validity().is_valid(index) + } + + fn logical_validity(&self) -> LogicalValidity { + self.validity().to_logical(self.len()) + } +} + +impl AcceptArrayVisitor for VarBinArray<'_> { + fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { + visitor.visit_child("offsets", &self.offsets())?; + visitor.visit_child("offsets", &self.bytes())?; + visitor.visit_validity(&self.validity()) + } +} + +impl ArrayTrait for VarBinArray<'_> { + fn len(&self) -> usize { + self.offsets().len() - 1 + } +} diff --git a/vortex-array2/src/array/varbin/builder.rs b/vortex-array2/src/array/varbin/builder.rs new file mode 100644 index 0000000000..205919c580 --- /dev/null +++ b/vortex-array2/src/array/varbin/builder.rs @@ -0,0 +1,94 @@ +use std::mem; + +use arrow_buffer::NullBufferBuilder; +use vortex::ptype::NativePType; +use vortex_schema::DType; + +use crate::array::primitive::PrimitiveArray; +use crate::array::varbin::{OwnedVarBinArray, VarBinArray}; +use crate::validity::Validity; +use crate::IntoArray; + +pub struct VarBinBuilder { + offsets: Vec, + data: Vec, + validity: NullBufferBuilder, +} + +impl VarBinBuilder { + pub fn with_capacity(len: usize) -> Self { + let mut offsets = Vec::with_capacity(len + 1); + offsets.push(O::zero()); + Self { + offsets, + data: Vec::new(), + validity: NullBufferBuilder::new(len), + } + } + + #[inline] + pub fn push(&mut self, value: Option<&[u8]>) { + match value { + Some(v) => self.push_value(v), + None => self.push_null(), + } + } + + #[inline] + pub fn push_value(&mut self, value: &[u8]) { + self.offsets + .push(O::from(self.data.len() + value.len()).unwrap()); + self.data.extend_from_slice(value); + self.validity.append_non_null(); + } + + #[inline] + pub fn push_null(&mut self) { + self.offsets.push(self.offsets[self.offsets.len() - 1]); + self.validity.append_null(); + } + + pub fn finish(&mut self, dtype: DType) -> OwnedVarBinArray { + let offsets = PrimitiveArray::from(mem::take(&mut self.offsets)); + let data = PrimitiveArray::from(mem::take(&mut self.data)); + + let nulls = self.validity.finish(); + + let validity = if dtype.is_nullable() { + nulls.map(Validity::from).unwrap_or(Validity::AllValid) + } else { + assert!(nulls.is_none(), "dtype and validity mismatch"); + Validity::NonNullable + }; + + VarBinArray::new(offsets.into_array(), data.into_array(), dtype, validity) + } +} + +#[cfg(test)] +mod test { + use vortex::scalar::Utf8Scalar; + use vortex_schema::DType; + use vortex_schema::Nullability::Nullable; + + use crate::array::varbin::builder::VarBinBuilder; + use crate::compute::scalar_at::scalar_at; + use crate::IntoArray; + + #[test] + fn test_builder() { + let mut builder = VarBinBuilder::::with_capacity(0); + builder.push(Some(b"hello")); + builder.push(None); + builder.push(Some(b"world")); + let array = builder.finish(DType::Utf8(Nullable)).into_array(); + + assert_eq!(array.len(), 3); + assert_eq!(array.dtype().nullability(), Nullable); + assert_eq!( + scalar_at(&array, 0).unwrap(), + Utf8Scalar::nullable("hello".to_owned()).into() + ); + assert!(scalar_at(&array, 1).unwrap().is_null()); + } +} diff --git a/vortex-array2/src/array/varbin/compute/mod.rs b/vortex-array2/src/array/varbin/compute/mod.rs new file mode 100644 index 0000000000..c28a1ac5ce --- /dev/null +++ b/vortex-array2/src/array/varbin/compute/mod.rs @@ -0,0 +1,144 @@ +use std::sync::Arc; + +use arrow_array::{ + ArrayRef as ArrowArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray, StringArray, +}; +use itertools::Itertools; +use vortex::ptype::PType; +use vortex::scalar::Scalar; +use vortex_error::{vortex_bail, VortexResult}; +use vortex_schema::DType; + +use crate::array::primitive::PrimitiveArray; +use crate::array::varbin::{varbin_scalar, VarBinArray}; +use crate::arrow::wrappers::as_offset_buffer; +use crate::compute::as_arrow::AsArrowArray; +use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; +use crate::compute::cast::cast; +use crate::compute::scalar_at::ScalarAtFn; +use crate::compute::slice::SliceFn; +use crate::compute::take::TakeFn; +use crate::compute::ArrayCompute; +use crate::validity::{ArrayValidity, Validity}; +use crate::{Array, IntoArray, OwnedArray, ToArray}; + +mod slice; +mod take; + +impl ArrayCompute for VarBinArray<'_> { + fn as_arrow(&self) -> Option<&dyn AsArrowArray> { + Some(self) + } + + fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { + Some(self) + } + + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + Some(self) + } + + fn slice(&self) -> Option<&dyn SliceFn> { + Some(self) + } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } +} + +impl AsContiguousFn for VarBinArray<'_> { + fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { + let bytes_chunks: Vec = arrays + .iter() + .map(|a| VarBinArray::try_from(a).unwrap().sliced_bytes()) + .try_collect()?; + let bytes = as_contiguous(&bytes_chunks)?; + + let validity = if self.dtype().is_nullable() { + Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity()))) + } else { + Validity::NonNullable + }; + + let mut offsets = Vec::new(); + offsets.push(0); + for a in arrays.iter().map(|a| VarBinArray::try_from(a).unwrap()) { + let first_offset: u64 = a.first_offset()?; + let offsets_array = cast(&a.offsets(), PType::U64.into())?.flatten_primitive()?; + let shift = offsets.last().copied().unwrap_or(0); + offsets.extend( + offsets_array + .typed_data::() + .iter() + .skip(1) // Ignore the zero offset for each array + .map(|o| o + shift - first_offset), + ); + } + + let offsets_array = PrimitiveArray::from(offsets).into_array(); + + Ok(VarBinArray::new(offsets_array, bytes, self.dtype().clone(), validity).into_array()) + } +} + +impl AsArrowArray for VarBinArray<'_> { + fn as_arrow(&self) -> VortexResult { + // Ensure the offsets are either i32 or i64 + let offsets = self.offsets().flatten_primitive()?; + let offsets = match offsets.ptype() { + PType::I32 | PType::I64 => offsets, + // Unless it's u64, everything else can be converted into an i32. + // FIXME(ngates): do not copy offsets again + PType::U64 => cast(&offsets.to_array(), PType::I64.into())?.flatten_primitive()?, + _ => cast(&offsets.to_array(), PType::I32.into())?.flatten_primitive()?, + }; + let nulls = self.logical_validity().to_null_buffer()?; + + let data = self.bytes().flatten_primitive()?; + assert_eq!(data.ptype(), PType::U8); + let data = data.buffer(); + + // Switch on Arrow DType. + Ok(match self.dtype() { + DType::Binary(_) => match offsets.ptype() { + PType::I32 => Arc::new(BinaryArray::new( + as_offset_buffer::(offsets), + data.into(), + nulls, + )), + PType::I64 => Arc::new(LargeBinaryArray::new( + as_offset_buffer::(offsets), + data.into(), + nulls, + )), + _ => panic!("Invalid offsets type"), + }, + DType::Utf8(_) => match offsets.ptype() { + PType::I32 => Arc::new(StringArray::new( + as_offset_buffer::(offsets), + data.into(), + nulls, + )), + PType::I64 => Arc::new(LargeStringArray::new( + as_offset_buffer::(offsets), + data.into(), + nulls, + )), + _ => panic!("Invalid offsets type"), + }, + _ => vortex_bail!(MismatchedTypes: "utf8 or binary", self.dtype()), + }) + } +} + +impl ScalarAtFn for VarBinArray<'_> { + fn scalar_at(&self, index: usize) -> VortexResult { + if self.is_valid(index) { + self.bytes_at(index) + .map(|bytes| varbin_scalar(bytes, self.dtype())) + } else { + Ok(Scalar::null(self.dtype())) + } + } +} diff --git a/vortex-array2/src/array/varbin/compute/slice.rs b/vortex-array2/src/array/varbin/compute/slice.rs new file mode 100644 index 0000000000..8aecb1ce6d --- /dev/null +++ b/vortex-array2/src/array/varbin/compute/slice.rs @@ -0,0 +1,17 @@ +use vortex_error::VortexResult; + +use crate::array::varbin::VarBinArray; +use crate::compute::slice::{slice, SliceFn}; +use crate::{IntoArray, OwnedArray}; + +impl SliceFn for VarBinArray<'_> { + fn slice(&self, start: usize, stop: usize) -> VortexResult { + Ok(VarBinArray::new( + slice(&self.offsets(), start, stop + 1)?, + self.bytes().clone(), + self.dtype().clone(), + self.validity().slice(start, stop)?, + ) + .into_array()) + } +} diff --git a/vortex-array2/src/array/varbin/compute/take.rs b/vortex-array2/src/array/varbin/compute/take.rs new file mode 100644 index 0000000000..18d2b17901 --- /dev/null +++ b/vortex-array2/src/array/varbin/compute/take.rs @@ -0,0 +1,80 @@ +use arrow_buffer::NullBuffer; +use vortex::match_each_integer_ptype; +use vortex::ptype::NativePType; +use vortex_error::VortexResult; +use vortex_schema::DType; + +use crate::array::varbin::builder::VarBinBuilder; +use crate::array::varbin::{OwnedVarBinArray, VarBinArray}; +use crate::compute::take::TakeFn; +use crate::validity::Validity; +use crate::IntoArray; +use crate::{Array, OwnedArray}; + +impl TakeFn for VarBinArray<'_> { + fn take(&self, indices: &Array) -> VortexResult { + // TODO(ngates): support i64 indices. + assert!( + indices.len() < i32::MAX as usize, + "indices.len() must be less than i32::MAX" + ); + + let offsets = self.offsets().flatten_primitive()?; + let data = self.bytes().flatten_primitive()?; + let indices = indices.clone().flatten_primitive()?; + match_each_integer_ptype!(offsets.ptype(), |$O| { + match_each_integer_ptype!(indices.ptype(), |$I| { + Ok(take( + self.dtype().clone(), + offsets.typed_data::<$O>(), + data.typed_data::(), + indices.typed_data::<$I>(), + self.validity(), + )?.into_array()) + }) + }) + } +} + +fn take( + dtype: DType, + offsets: &[O], + data: &[u8], + indices: &[I], + validity: Validity, +) -> VortexResult { + let logical_validity = validity.to_logical(offsets.len() - 1); + if let Some(v) = logical_validity.to_null_buffer()? { + return Ok(take_nullable(dtype, offsets, data, indices, v)); + } + + let mut builder = VarBinBuilder::::with_capacity(indices.len()); + for &idx in indices { + let idx = idx.to_usize().unwrap(); + let start = offsets[idx].to_usize().unwrap(); + let stop = offsets[idx + 1].to_usize().unwrap(); + builder.push(Some(&data[start..stop])); + } + Ok(builder.finish(dtype)) +} + +fn take_nullable( + dtype: DType, + offsets: &[O], + data: &[u8], + indices: &[I], + null_buffer: NullBuffer, +) -> OwnedVarBinArray { + let mut builder = VarBinBuilder::::with_capacity(indices.len()); + for &idx in indices { + let idx = idx.to_usize().unwrap(); + if null_buffer.is_valid(idx) { + let start = offsets[idx].to_usize().unwrap(); + let stop = offsets[idx + 1].to_usize().unwrap(); + builder.push(Some(&data[start..stop])); + } else { + builder.push(None); + } + } + builder.finish(dtype) +} diff --git a/vortex-array2/src/array/varbin/flatten.rs b/vortex-array2/src/array/varbin/flatten.rs new file mode 100644 index 0000000000..569689b322 --- /dev/null +++ b/vortex-array2/src/array/varbin/flatten.rs @@ -0,0 +1,13 @@ +use vortex_error::VortexResult; + +use crate::array::varbin::VarBinArray; +use crate::{ArrayFlatten, Flattened}; + +impl ArrayFlatten for VarBinArray<'_> { + fn flatten<'a>(self) -> VortexResult> + where + Self: 'a, + { + Ok(Flattened::VarBin(self)) + } +} diff --git a/vortex-array2/src/array/varbin/mod.rs b/vortex-array2/src/array/varbin/mod.rs new file mode 100644 index 0000000000..18240197b4 --- /dev/null +++ b/vortex-array2/src/array/varbin/mod.rs @@ -0,0 +1,277 @@ +use std::collections::HashMap; + +use num_traits::AsPrimitive; +use serde::{Deserialize, Serialize}; +use vortex::match_each_native_ptype; +use vortex::ptype::NativePType; +use vortex::scalar::{BinaryScalar, Scalar, Utf8Scalar}; +use vortex_error::{vortex_bail, VortexResult}; +use vortex_schema::{DType, IntWidth, Nullability, Signedness}; + +use crate::array::primitive::PrimitiveArray; +use crate::array::varbin::builder::VarBinBuilder; +use crate::compute::scalar_at::scalar_at; +use crate::compute::slice::slice; +use crate::validity::{Validity, ValidityMetadata}; +use crate::{impl_encoding, OwnedArray, ToArrayData}; + +mod accessor; +mod array; +pub mod builder; +mod compute; +mod flatten; +mod stats; + +impl_encoding!("vortex.varbin", VarBin); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VarBinMetadata { + validity: ValidityMetadata, + offsets_dtype: DType, +} + +impl VarBinArray<'_> { + pub fn new(offsets: Array, bytes: Array, dtype: DType, validity: Validity) -> Self { + Self::try_new(offsets, bytes, dtype, validity).unwrap() + } + + pub fn try_new( + offsets: Array, + bytes: Array, + dtype: DType, + validity: Validity, + ) -> VortexResult { + if !matches!(offsets.dtype(), DType::Int(_, _, Nullability::NonNullable)) { + vortex_bail!(MismatchedTypes: "non nullable int", offsets.dtype()); + } + if !matches!( + bytes.dtype(), + DType::Int(IntWidth::_8, Signedness::Unsigned, Nullability::NonNullable) + ) { + vortex_bail!(MismatchedTypes: "u8", bytes.dtype()); + } + if !matches!(dtype, DType::Binary(_) | DType::Utf8(_)) { + vortex_bail!(MismatchedTypes: "utf8 or binary", dtype); + } + if dtype.is_nullable() == (validity == Validity::NonNullable) { + vortex_bail!("incorrect validity {:?}", validity); + } + + let metadata = VarBinMetadata { + validity: validity.to_metadata(offsets.len() - 1)?, + offsets_dtype: offsets.dtype().clone(), + }; + + let mut children = Vec::with_capacity(3); + children.push(offsets.to_array_data()); + children.push(bytes.to_array_data()); + if let Some(a) = validity.into_array_data() { + children.push(a) + } + + Self::try_from_parts( + dtype, + metadata, + vec![].into(), + children.into(), + HashMap::default(), + ) + } + + #[inline] + pub fn offsets(&self) -> Array { + self.array() + .child(0, &self.metadata().offsets_dtype) + .expect("missing offsets") + } + + pub fn first_offset(&self) -> VortexResult { + scalar_at(&self.offsets(), 0)? + .cast(&DType::from(T::PTYPE))? + .try_into() + } + + #[inline] + pub fn bytes(&self) -> Array { + self.array().child(1, &DType::BYTES).expect("missing bytes") + } + + pub fn validity(&self) -> Validity { + self.metadata() + .validity + .to_validity(self.array().child(2, &Validity::DTYPE)) + } + + pub fn sliced_bytes(&self) -> VortexResult { + let first_offset: usize = scalar_at(&self.offsets(), 0)?.try_into()?; + let last_offset: usize = + scalar_at(&self.offsets(), self.offsets().len() - 1)?.try_into()?; + slice(&self.bytes(), first_offset, last_offset) + } + + pub fn from_vec>(vec: Vec, dtype: DType) -> Self { + let size: usize = vec.iter().map(|v| v.as_ref().len()).sum(); + if size < u32::MAX as usize { + Self::from_vec_sized::(vec, dtype) + } else { + Self::from_vec_sized::(vec, dtype) + } + } + + fn from_vec_sized(vec: Vec, dtype: DType) -> Self + where + K: NativePType, + T: AsRef<[u8]>, + { + let mut builder = VarBinBuilder::::with_capacity(vec.len()); + for v in vec { + builder.push_value(v.as_ref()); + } + builder.finish(dtype) + } + + pub fn from_iter, I: IntoIterator>>( + iter: I, + dtype: DType, + ) -> Self { + let iter = iter.into_iter(); + let mut builder = VarBinBuilder::::with_capacity(iter.size_hint().0); + for v in iter { + builder.push(v.as_ref().map(|o| o.as_ref())); + } + builder.finish(dtype) + } + + pub(self) fn offset_at(&self, index: usize) -> usize { + PrimitiveArray::try_from(self.offsets()) + .ok() + .map(|p| { + match_each_native_ptype!(p.ptype(), |$P| { + p.typed_data::<$P>()[index].as_() + }) + }) + .unwrap_or_else(|| { + scalar_at(&self.offsets(), index) + .unwrap() + .try_into() + .unwrap() + }) + } + + pub fn bytes_at(&self, index: usize) -> VortexResult> { + let start = self.offset_at(index); + let end = self.offset_at(index + 1); + let sliced = slice(&self.bytes(), start, end)?; + Ok(sliced.flatten_primitive()?.buffer().as_slice().to_vec()) + } +} + +impl From> for VarBinArray<'_> { + fn from(value: Vec<&[u8]>) -> Self { + VarBinArray::from_vec(value, DType::Binary(Nullability::NonNullable)) + } +} + +impl From>> for VarBinArray<'_> { + fn from(value: Vec>) -> Self { + VarBinArray::from_vec(value, DType::Binary(Nullability::NonNullable)) + } +} + +impl From> for VarBinArray<'_> { + fn from(value: Vec) -> Self { + VarBinArray::from_vec(value, DType::Utf8(Nullability::NonNullable)) + } +} + +impl From> for VarBinArray<'_> { + fn from(value: Vec<&str>) -> Self { + VarBinArray::from_vec(value, DType::Utf8(Nullability::NonNullable)) + } +} + +impl<'a> FromIterator> for VarBinArray<'_> { + fn from_iter>>(iter: T) -> Self { + VarBinArray::from_iter(iter, DType::Binary(Nullability::Nullable)) + } +} + +impl FromIterator>> for VarBinArray<'_> { + fn from_iter>>>(iter: T) -> Self { + VarBinArray::from_iter(iter, DType::Binary(Nullability::Nullable)) + } +} + +impl FromIterator> for VarBinArray<'_> { + fn from_iter>>(iter: T) -> Self { + VarBinArray::from_iter(iter, DType::Utf8(Nullability::Nullable)) + } +} + +impl<'a> FromIterator> for VarBinArray<'_> { + fn from_iter>>(iter: T) -> Self { + VarBinArray::from_iter(iter, DType::Utf8(Nullability::Nullable)) + } +} + +pub fn varbin_scalar(value: Vec, dtype: &DType) -> Scalar { + if matches!(dtype, DType::Utf8(_)) { + let str = unsafe { String::from_utf8_unchecked(value) }; + Utf8Scalar::try_new(Some(str), dtype.nullability()) + .unwrap() + .into() + } else { + BinaryScalar::try_new(Some(value), dtype.nullability()) + .unwrap() + .into() + } +} + +#[cfg(test)] +mod test { + use vortex_schema::{DType, Nullability}; + + use crate::array::primitive::PrimitiveArray; + use crate::array::varbin::VarBinArray; + use crate::compute::scalar_at::scalar_at; + use crate::compute::slice::slice; + use crate::validity::Validity; + use crate::{IntoArray, OwnedArray}; + + fn binary_array() -> OwnedArray { + let values = PrimitiveArray::from( + "hello worldhello world this is a long string" + .as_bytes() + .to_vec(), + ); + let offsets = PrimitiveArray::from(vec![0, 11, 44]); + + VarBinArray::new( + offsets.into_array(), + values.into_array(), + DType::Utf8(Nullability::NonNullable), + Validity::NonNullable, + ) + .into_array() + } + + #[test] + pub fn test_scalar_at() { + let binary_arr = binary_array(); + assert_eq!(binary_arr.len(), 2); + assert_eq!(scalar_at(&binary_arr, 0).unwrap(), "hello world".into()); + assert_eq!( + scalar_at(&binary_arr, 1).unwrap(), + "hello world this is a long string".into() + ) + } + + #[test] + pub fn slice_array() { + let binary_arr = slice(&binary_array(), 1, 2).unwrap(); + assert_eq!( + scalar_at(&binary_arr, 0).unwrap(), + "hello world this is a long string".into() + ); + } +} diff --git a/vortex-array2/src/array/varbin/stats.rs b/vortex-array2/src/array/varbin/stats.rs new file mode 100644 index 0000000000..1cd0f6697b --- /dev/null +++ b/vortex-array2/src/array/varbin/stats.rs @@ -0,0 +1,152 @@ +use std::cmp::Ordering; +use std::collections::HashMap; + +use vortex::scalar::Scalar; +use vortex_error::VortexResult; +use vortex_schema::DType; + +use crate::accessor::ArrayAccessor; +use crate::array::varbin::{varbin_scalar, VarBinArray}; +use crate::stats::{ArrayStatisticsCompute, Stat}; + +impl ArrayStatisticsCompute for VarBinArray<'_> { + fn compute_statistics(&self, _stat: Stat) -> VortexResult> { + self.with_iterator(|iter| { + let mut acc = VarBinAccumulator::default(); + for next_val in iter { + acc.nullable_next(next_val) + } + acc.finish(self.dtype()) + }) + } +} + +pub struct VarBinAccumulator<'a> { + min: &'a [u8], + max: &'a [u8], + is_constant: bool, + is_sorted: bool, + is_strict_sorted: bool, + last_value: &'a [u8], + null_count: usize, + runs: usize, +} + +impl Default for VarBinAccumulator<'_> { + fn default() -> Self { + Self { + min: &[0xFF], + max: &[0x00], + is_constant: true, + is_sorted: true, + is_strict_sorted: true, + last_value: &[0x00], + runs: 0, + null_count: 0, + } + } +} + +impl<'a> VarBinAccumulator<'a> { + pub fn nullable_next(&mut self, val: Option<&'a [u8]>) { + match val { + None => self.null_count += 1, + Some(v) => self.next(v), + } + } + + pub fn next(&mut self, val: &'a [u8]) { + if val < self.min { + self.min.clone_from(&val); + } else if val > self.max { + self.max.clone_from(&val); + } + + match val.cmp(self.last_value) { + Ordering::Less => self.is_sorted = false, + Ordering::Equal => { + self.is_strict_sorted = false; + return; + } + Ordering::Greater => {} + } + self.is_constant = false; + self.last_value = val; + self.runs += 1; + } + + pub fn finish(&self, dtype: &DType) -> HashMap { + HashMap::from([ + (Stat::Min, varbin_scalar(self.min.to_vec(), dtype)), + (Stat::Max, varbin_scalar(self.max.to_vec(), dtype)), + (Stat::RunCount, self.runs.into()), + (Stat::IsSorted, self.is_sorted.into()), + (Stat::IsStrictSorted, self.is_strict_sorted.into()), + (Stat::IsConstant, self.is_constant.into()), + (Stat::NullCount, self.null_count.into()), + ]) + } +} + +#[cfg(test)] +mod test { + use vortex_schema::{DType, Nullability}; + + use crate::array::varbin::{OwnedVarBinArray, VarBinArray}; + use crate::stats::{ArrayStatistics, Stat}; + + fn array(dtype: DType) -> OwnedVarBinArray { + VarBinArray::from_vec( + vec!["hello world", "hello world this is a long string"], + dtype, + ) + } + + #[test] + fn utf8_stats() { + let arr = array(DType::Utf8(Nullability::NonNullable)); + assert_eq!( + arr.statistics().compute_as::(Stat::Min).unwrap(), + String::from("hello world") + ); + assert_eq!( + arr.statistics().compute_as::(Stat::Max).unwrap(), + String::from("hello world this is a long string") + ); + assert_eq!( + arr.statistics() + .compute_as::(Stat::RunCount) + .unwrap(), + 2 + ); + assert!(!arr + .statistics() + .compute_as::(Stat::IsConstant) + .unwrap()); + assert!(arr.statistics().compute_as::(Stat::IsSorted).unwrap()); + } + + #[test] + fn binary_stats() { + let arr = array(DType::Binary(Nullability::NonNullable)); + assert_eq!( + arr.statistics().compute_as::>(Stat::Min).unwrap(), + "hello world".as_bytes().to_vec() + ); + assert_eq!( + arr.statistics().compute_as::>(Stat::Max).unwrap(), + "hello world this is a long string".as_bytes().to_vec() + ); + assert_eq!( + arr.statistics() + .compute_as::(Stat::RunCount) + .unwrap(), + 2 + ); + assert!(!arr + .statistics() + .compute_as::(Stat::IsConstant) + .unwrap()); + assert!(arr.statistics().compute_as::(Stat::IsSorted).unwrap()); + } +} diff --git a/vortex-array2/src/arrow/mod.rs b/vortex-array2/src/arrow/mod.rs index 20e0143414..119a42e810 100644 --- a/vortex-array2/src/arrow/mod.rs +++ b/vortex-array2/src/arrow/mod.rs @@ -1,2 +1,3 @@ mod array; mod recordbatch; +pub mod wrappers; diff --git a/vortex-array2/src/arrow/wrappers.rs b/vortex-array2/src/arrow/wrappers.rs new file mode 100644 index 0000000000..3e7e8ae421 --- /dev/null +++ b/vortex-array2/src/arrow/wrappers.rs @@ -0,0 +1,13 @@ +use arrow_buffer::{Buffer as ArrowBuffer, OffsetBuffer, ScalarBuffer}; +use vortex::ptype::NativePType; + +use crate::array::primitive::PrimitiveArray; + +pub fn as_scalar_buffer(array: PrimitiveArray<'_>) -> ScalarBuffer { + assert_eq!(array.ptype(), T::PTYPE); + ScalarBuffer::from(ArrowBuffer::from(array.buffer())) +} + +pub fn as_offset_buffer(array: PrimitiveArray<'_>) -> OffsetBuffer { + OffsetBuffer::new(as_scalar_buffer(array)) +} diff --git a/vortex-array2/src/compute/mod.rs b/vortex-array2/src/compute/mod.rs index c46dda21d3..5ba54a0bda 100644 --- a/vortex-array2/src/compute/mod.rs +++ b/vortex-array2/src/compute/mod.rs @@ -5,6 +5,7 @@ use fill::FillForwardFn; use patch::PatchFn; use scalar_at::ScalarAtFn; use search_sorted::SearchSortedFn; +use slice::SliceFn; use take::TakeFn; pub mod as_arrow; @@ -14,6 +15,7 @@ pub mod fill; pub mod patch; pub mod scalar_at; pub mod search_sorted; +pub mod slice; pub mod take; pub trait ArrayCompute { @@ -45,6 +47,10 @@ pub trait ArrayCompute { None } + fn slice(&self) -> Option<&dyn SliceFn> { + None + } + fn take(&self) -> Option<&dyn TakeFn> { None } diff --git a/vortex-array2/src/compute/slice.rs b/vortex-array2/src/compute/slice.rs new file mode 100644 index 0000000000..e8358e8b23 --- /dev/null +++ b/vortex-array2/src/compute/slice.rs @@ -0,0 +1,31 @@ +use vortex_error::{vortex_bail, vortex_err, VortexResult}; + +use crate::{Array, OwnedArray}; + +/// Limit array to start..stop range +pub trait SliceFn { + fn slice(&self, start: usize, stop: usize) -> VortexResult; +} + +pub fn slice(array: &Array, start: usize, stop: usize) -> VortexResult { + check_slice_bounds(array, start, stop)?; + + array.with_dyn(|c| { + c.slice().map(|t| t.slice(start, stop)).unwrap_or_else(|| { + Err(vortex_err!( + NotImplemented: "slice", + array.encoding().id().name() + )) + }) + }) +} + +fn check_slice_bounds(array: &Array, start: usize, stop: usize) -> VortexResult<()> { + if start > array.len() { + vortex_bail!(OutOfBounds: start, 0, array.len()); + } + if stop > array.len() { + vortex_bail!(OutOfBounds: stop, 0, array.len()); + } + Ok(()) +} diff --git a/vortex-array2/src/data.rs b/vortex-array2/src/data.rs index 677fdbae69..8393d92ff1 100644 --- a/vortex-array2/src/data.rs +++ b/vortex-array2/src/data.rs @@ -9,7 +9,7 @@ use crate::buffer::{Buffer, OwnedBuffer}; use crate::encoding::EncodingRef; use crate::stats::Stat; use crate::stats::Statistics; -use crate::{Array, ArrayMetadata, ArrayParts, IntoArray, ToArray}; +use crate::{Array, ArrayMetadata, IntoArray, ToArray}; #[derive(Clone, Debug)] pub struct ArrayData { @@ -77,6 +77,10 @@ impl ArrayData { &self.children } + pub fn statistics(&self) -> &dyn Statistics { + self + } + pub fn depth_first_traversal(&self) -> ArrayDataIterator { ArrayDataIterator { stack: vec![self] } } @@ -131,28 +135,6 @@ impl IntoArray<'static> for ArrayData { } } -impl ArrayParts for ArrayData { - fn dtype(&self) -> &DType { - &self.dtype - } - - fn buffer(&self, idx: usize) -> Option<&Buffer> { - self.buffers().get(idx) - } - - fn child(&self, idx: usize, dtype: &DType) -> Option { - self.child(idx, dtype).map(move |a| a.to_array()) - } - - fn nchildren(&self) -> usize { - self.children.len() - } - - fn statistics<'a>(&'a self) -> &'a (dyn Statistics + 'a) { - self - } -} - impl Statistics for ArrayData { fn compute(&self, stat: Stat) -> Option { let mut locked = self.stats_map.write().unwrap(); diff --git a/vortex-array2/src/flatten.rs b/vortex-array2/src/flatten.rs index 341c53a394..5e1241d874 100644 --- a/vortex-array2/src/flatten.rs +++ b/vortex-array2/src/flatten.rs @@ -4,6 +4,7 @@ use crate::array::bool::BoolArray; use crate::array::chunked::ChunkedArray; use crate::array::primitive::PrimitiveArray; use crate::array::r#struct::StructArray; +use crate::array::varbin::VarBinArray; use crate::encoding::ArrayEncoding; use crate::{Array, IntoArray}; @@ -13,6 +14,7 @@ pub enum Flattened<'a> { Chunked(ChunkedArray<'a>), Primitive(PrimitiveArray<'a>), Struct(StructArray<'a>), + VarBin(VarBinArray<'a>), } pub trait ArrayFlatten { @@ -42,6 +44,7 @@ impl<'a> IntoArray<'a> for Flattened<'a> { Flattened::Primitive(a) => a.into_array(), Flattened::Struct(a) => a.into_array(), Flattened::Chunked(a) => a.into_array(), + Flattened::VarBin(a) => a.into_array(), } } } diff --git a/vortex-array2/src/lib.rs b/vortex-array2/src/lib.rs index 9c8b1283d0..70f6615495 100644 --- a/vortex-array2/src/lib.rs +++ b/vortex-array2/src/lib.rs @@ -1,5 +1,6 @@ extern crate core; +mod accessor; pub mod array; mod arrow; pub mod buffer; @@ -33,7 +34,7 @@ use vortex_schema::DType; use crate::buffer::Buffer; use crate::compute::ArrayCompute; use crate::encoding::{ArrayEncodingRef, EncodingRef}; -use crate::stats::{ArrayStatistics, ArrayStatisticsCompute, Statistics}; +use crate::stats::{ArrayStatistics, ArrayStatisticsCompute}; use crate::validity::ArrayValidity; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; @@ -81,9 +82,9 @@ impl Array<'_> { pub fn buffer(&self, idx: usize) -> Option<&Buffer> { match self { - Array::Data(d) => d.buffer(idx), - Array::DataRef(d) => d.buffer(idx), - Array::View(v) => v.buffer(idx), + Array::Data(d) => d.buffers().get(idx), + Array::DataRef(d) => d.buffers().get(idx), + Array::View(v) => v.buffers().get(idx), } } } @@ -118,19 +119,6 @@ pub trait ToStatic { fn to_static(&self) -> Self::Static; } -pub trait ArrayParts { - fn dtype(&self) -> &DType; - fn buffer(&self, idx: usize) -> Option<&Buffer>; - fn child<'a>(&'a self, idx: usize, dtype: &'a DType) -> Option; - fn nchildren(&self) -> usize; - fn statistics<'a>(&'a self) -> &'a (dyn Statistics + 'a); -} - -// TODO(ngates): I think we should separate the parts and metadata lifetimes. -pub trait TryFromArrayParts<'v, M: ArrayMetadata>: Sized + 'v { - fn try_from_parts(parts: &'v dyn ArrayParts, metadata: &'v M) -> VortexResult; -} - /// Collects together the behaviour of an array. pub trait ArrayTrait: ArrayEncodingRef diff --git a/vortex-array2/src/typed.rs b/vortex-array2/src/typed.rs index 3ab142de9c..46dc133160 100644 --- a/vortex-array2/src/typed.rs +++ b/vortex-array2/src/typed.rs @@ -10,8 +10,8 @@ use crate::encoding::{ArrayEncodingRef, EncodingRef}; use crate::stats::{ArrayStatistics, Stat, Statistics}; use crate::visitor::ArrayVisitor; use crate::{ - Array, ArrayDType, ArrayData, ArrayDef, ArrayParts, IntoArray, IntoArrayData, ToArray, - ToArrayData, ToStatic, TryDeserializeArrayMetadata, + Array, ArrayDType, ArrayData, ArrayDef, IntoArray, IntoArrayData, ToArray, ToArrayData, + ToStatic, TryDeserializeArrayMetadata, }; #[derive(Debug)] @@ -39,10 +39,6 @@ impl TypedArray<'_, D> { Ok(Self { array, metadata }) } - pub fn array(&self) -> &Array { - &self.array - } - pub fn len(&self) -> usize { self.array.with_dyn(|a| a.len()) } @@ -60,6 +56,12 @@ impl TypedArray<'_, D> { } } +impl<'a, 'b, D: ArrayDef> TypedArray<'b, D> { + pub fn array(&'a self) -> &'a Array<'b> { + &self.array + } +} + impl Clone for TypedArray<'_, D> { fn clone(&self) -> Self { Self { diff --git a/vortex-array2/src/validity.rs b/vortex-array2/src/validity.rs index 433ab80056..9588c541b6 100644 --- a/vortex-array2/src/validity.rs +++ b/vortex-array2/src/validity.rs @@ -5,8 +5,9 @@ use vortex_schema::{DType, Nullability}; use crate::array::bool::BoolArray; use crate::compute::scalar_at::scalar_at; +use crate::compute::slice::slice; use crate::compute::take::take; -use crate::{Array, ArrayData, IntoArray, ToArray, ToArrayData}; +use crate::{Array, ArrayData, IntoArray, IntoArrayData, ToArray, ToArrayData}; pub trait ArrayValidity { fn is_valid(&self, index: usize) -> bool; @@ -27,12 +28,16 @@ impl ValidityMetadata { ValidityMetadata::NonNullable => Validity::NonNullable, ValidityMetadata::AllValid => Validity::AllValid, ValidityMetadata::AllInvalid => Validity::AllInvalid, - // TODO(ngates): should we return a result for this? - ValidityMetadata::Array => Validity::Array(array.unwrap()), + ValidityMetadata::Array => match array { + None => panic!("Missing validity array"), + Some(a) => Validity::Array(a), + }, } } } +pub type OwnedValidity = Validity<'static>; + #[derive(Clone, Debug)] pub enum Validity<'v> { NonNullable, @@ -44,9 +49,9 @@ pub enum Validity<'v> { impl<'v> Validity<'v> { pub const DTYPE: DType = DType::Bool(Nullability::NonNullable); - pub fn to_array_data(self) -> Option { + pub fn into_array_data(self) -> Option { match self { - Validity::Array(a) => Some(a.to_array_data()), + Validity::Array(a) => Some(a.into_array_data()), _ => None, } } @@ -93,6 +98,13 @@ impl<'v> Validity<'v> { } } + pub fn slice(&self, start: usize, stop: usize) -> VortexResult { + match self { + Validity::Array(a) => Ok(Validity::Array(slice(a, start, stop)?)), + _ => Ok(self.clone()), + } + } + pub fn take(&self, indices: &Array) -> VortexResult { match self { Validity::NonNullable => Ok(Validity::NonNullable), @@ -112,7 +124,7 @@ impl<'v> Validity<'v> { } } - pub fn to_static(&self) -> Validity<'static> { + pub fn to_static(&self) -> OwnedValidity { match self { Validity::NonNullable => Validity::NonNullable, Validity::AllValid => Validity::AllValid, @@ -137,7 +149,7 @@ impl PartialEq for Validity<'_> { } } -impl From> for Validity<'static> { +impl From> for OwnedValidity { fn from(bools: Vec) -> Self { if bools.iter().all(|b| *b) { Validity::AllValid @@ -149,7 +161,7 @@ impl From> for Validity<'static> { } } -impl From for Validity<'static> { +impl From for OwnedValidity { fn from(value: BooleanBuffer) -> Self { if value.count_set_bits() == value.len() { Validity::AllValid @@ -161,19 +173,25 @@ impl From for Validity<'static> { } } -impl<'a> FromIterator> for Validity<'static> { +impl From for OwnedValidity { + fn from(value: NullBuffer) -> Self { + value.into_inner().into() + } +} + +impl<'a> FromIterator> for OwnedValidity { fn from_iter>>(_iter: T) -> Self { todo!() } } -impl FromIterator for Validity<'static> { +impl FromIterator for OwnedValidity { fn from_iter>(_iter: T) -> Self { todo!() } } -impl<'a, E> FromIterator<&'a Option> for Validity<'static> { +impl<'a, E> FromIterator<&'a Option> for OwnedValidity { fn from_iter>>(iter: T) -> Self { let bools: Vec = iter.into_iter().map(|option| option.is_some()).collect(); Validity::from(bools) diff --git a/vortex-array2/src/view.rs b/vortex-array2/src/view.rs index 57dbd18c5b..44fa3af2a2 100644 --- a/vortex-array2/src/view.rs +++ b/vortex-array2/src/view.rs @@ -7,8 +7,8 @@ use vortex_schema::DType; use crate::buffer::Buffer; use crate::encoding::EncodingRef; use crate::stats::{EmptyStatistics, Statistics}; +use crate::SerdeContext; use crate::{Array, IntoArray, ToArray}; -use crate::{ArrayParts, SerdeContext}; #[derive(Clone)] pub struct ArrayView<'v> { @@ -134,6 +134,11 @@ impl<'v> ArrayView<'v> { // This is only true for the immediate current node? self.buffers[0..self.nbuffers()].as_ref() } + + pub fn statistics(&self) -> &dyn Statistics { + // TODO(ngates): store statistics in FlatBuffers + &EmptyStatistics + } } impl ToArray for ArrayView<'_> { @@ -147,25 +152,3 @@ impl<'v> IntoArray<'v> for ArrayView<'v> { Array::View(self) } } - -impl ArrayParts for ArrayView<'_> { - fn dtype(&self) -> &DType { - self.dtype - } - - fn buffer(&self, idx: usize) -> Option<&Buffer> { - self.buffers().get(idx) - } - - fn child<'a>(&'a self, idx: usize, dtype: &'a DType) -> Option { - self.child(idx, dtype).map(|a| a.into_array()) - } - - fn nchildren(&self) -> usize { - self.array.children().map(|c| c.len()).unwrap_or_default() - } - - fn statistics<'a>(&'a self) -> &'a (dyn Statistics + 'a) { - &EmptyStatistics - } -} diff --git a/vortex-schema/src/dtype.rs b/vortex-schema/src/dtype.rs index 7c046bb816..b3773150e8 100644 --- a/vortex-schema/src/dtype.rs +++ b/vortex-schema/src/dtype.rs @@ -136,6 +136,8 @@ pub enum DType { } impl DType { + pub const BYTES: DType = Int(IntWidth::_8, Signedness::Unsigned, Nullability::NonNullable); + /// The default DType for indices pub const IDX: DType = Int( IntWidth::_64,