diff --git a/vortex-array/src/array/primitive/mod.rs b/vortex-array/src/array/primitive/mod.rs index 5ddaa608dd..cff70808ce 100644 --- a/vortex-array/src/array/primitive/mod.rs +++ b/vortex-array/src/array/primitive/mod.rs @@ -141,6 +141,11 @@ impl PrimitiveArray { &self.buffer } + #[inline] + pub fn into_buffer(self) -> Buffer { + self.buffer + } + pub fn scalar_buffer(&self) -> ScalarBuffer { ScalarBuffer::from(self.buffer().clone()) } diff --git a/vortex-array/src/array/primitive/stats.rs b/vortex-array/src/array/primitive/stats.rs index 271b163aae..ecc85713fc 100644 --- a/vortex-array/src/array/primitive/stats.rs +++ b/vortex-array/src/array/primitive/stats.rs @@ -18,7 +18,7 @@ impl StatsCompute for PrimitiveArray { match_each_native_ptype!(self.ptype(), |$P| { match self.logical_validity() { Validity::Valid(_) => self.typed_data::<$P>().compute(stat), - Validity::Invalid(_) => all_null_stats::<$P>(), + Validity::Invalid(v) => all_null_stats::<$P>(v), Validity::Array(a) => { NullableValues(self.typed_data::<$P>(), flatten_bool(&a)?.buffer()).compute(stat) } @@ -38,15 +38,15 @@ impl StatsCompute for &[T] { } } -fn all_null_stats() -> VortexResult { +fn all_null_stats(len: usize) -> VortexResult { Ok(StatsSet::from(HashMap::from([ (Stat::Min, Option::::None.into()), (Stat::Max, Option::::None.into()), (Stat::IsConstant, true.into()), (Stat::IsSorted, true.into()), - (Stat::IsStrictSorted, true.into()), + (Stat::IsStrictSorted, (len < 2).into()), (Stat::RunCount, 1.into()), - (Stat::NullCount, 1.into()), + (Stat::NullCount, len.into()), ( Stat::BitWidthFreq, ListScalarVec(vec![0; size_of::() * 8 + 1]).into(), diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index cded81dc60..99f89d9c41 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -2,6 +2,7 @@ use std::sync::{Arc, RwLock}; use linkme::distributed_slice; use num_traits::AsPrimitive; +pub use stats::compute_stats; pub use stats::VarBinAccumulator; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_schema::{DType, IntWidth, Nullability, Signedness}; @@ -173,7 +174,10 @@ impl VarBinArray { let start = self.offset_at(index); let end = self.offset_at(index + 1); let sliced = slice(self.bytes(), start, end)?; - Ok(flatten_primitive(sliced.as_ref())?.buffer().to_vec()) + Ok(flatten_primitive(sliced.as_ref())? + .into_buffer() + .into_vec() + .unwrap_or_else(|buf| buf.to_vec())) } } diff --git a/vortex-array/src/array/varbin/stats.rs b/vortex-array/src/array/varbin/stats.rs index 2919187b25..e92e61f5c5 100644 --- a/vortex-array/src/array/varbin/stats.rs +++ b/vortex-array/src/array/varbin/stats.rs @@ -7,6 +7,7 @@ use vortex_schema::DType; use crate::array::varbin::{varbin_scalar, VarBinArray}; use crate::array::Array; +use crate::scalar::Scalar; use crate::stats::{Stat, StatsCompute, StatsSet}; impl StatsCompute for VarBinArray { @@ -15,22 +16,52 @@ impl StatsCompute for VarBinArray { return Ok(StatsSet::new()); } - let mut acc = VarBinAccumulator::new(); - self.iter_primitive() - .map(|prim_iter| { - for next_val in prim_iter { - acc.nullable_next(next_val.map(Cow::from)); - } - }) + Ok(self + .iter_primitive() + .map(|prim_iter| compute_stats(&mut prim_iter.map(|s| s.map(Cow::from)), self.dtype())) .unwrap_or_else(|_| { - for next_val in self.iter() { - acc.nullable_next(next_val.map(Cow::from)); - } - }); - Ok(acc.finish(self.len(), self.dtype())) + compute_stats(&mut self.iter().map(|s| s.map(Cow::from)), self.dtype()) + })) } } +pub fn compute_stats( + iter: &mut dyn Iterator>>, + dtype: &DType, +) -> StatsSet { + let mut leading_nulls: usize = 0; + let mut first_value: Option> = None; + for v in &mut *iter { + if v.is_none() { + leading_nulls += 1; + } else { + first_value = v; + break; + } + } + + if let Some(first_non_null) = first_value { + let mut acc = VarBinAccumulator::new(first_non_null); + iter.for_each(|n| acc.nullable_next(n)); + acc.n_nulls(leading_nulls); + acc.finish(dtype) + } else { + all_null_stats(leading_nulls, dtype) + } +} + +fn all_null_stats(len: usize, dtype: &DType) -> StatsSet { + StatsSet::from(HashMap::from([ + (Stat::Min, Scalar::null(dtype)), + (Stat::Max, Scalar::null(dtype)), + (Stat::IsConstant, true.into()), + (Stat::IsSorted, true.into()), + (Stat::IsStrictSorted, (len < 2).into()), + (Stat::RunCount, 1.into()), + (Stat::NullCount, len.into()), + ])) +} + #[derive(Debug, Default)] pub struct VarBinAccumulator<'a> { min: Cow<'a, [u8]>, @@ -44,15 +75,15 @@ pub struct VarBinAccumulator<'a> { } impl<'a> VarBinAccumulator<'a> { - pub fn new() -> Self { + pub fn new(value: Cow<'a, [u8]>) -> Self { Self { - min: Cow::from(&[0xFF]), - max: Cow::from(&[0x00]), + min: value.clone(), + max: value.clone(), is_constant: true, is_sorted: true, is_strict_sorted: true, - last_value: Cow::from(&[0x00]), - runs: 0, + last_value: value, + runs: 1, null_count: 0, } } @@ -64,6 +95,10 @@ impl<'a> VarBinAccumulator<'a> { } } + pub fn n_nulls(&mut self, null_count: usize) { + self.null_count += null_count; + } + pub fn next(&mut self, val: Cow<'a, [u8]>) { if val < self.min { self.min.clone_from(&val); @@ -84,19 +119,16 @@ impl<'a> VarBinAccumulator<'a> { self.runs += 1; } - pub fn finish(&self, len: usize, dtype: &DType) -> StatsSet { - let mut stats = StatsSet::from(HashMap::from([ + pub fn finish(&self, dtype: &DType) -> StatsSet { + StatsSet::from(HashMap::from([ + (Stat::Min, varbin_scalar(self.min.to_vec(), dtype)), + (Stat::Max, varbin_scalar(self.max.to_vec(), dtype)), (Stat::RunCount, self.runs.into()), (Stat::IsSorted, self.is_sorted.into()), (Stat::IsStrictSorted, self.is_strict_sorted.into()), (Stat::IsConstant, self.is_constant.into()), (Stat::NullCount, self.null_count.into()), - ])); - if self.null_count < len { - stats.set(Stat::Min, varbin_scalar(self.min.to_vec(), dtype)); - stats.set(Stat::Max, varbin_scalar(self.max.to_vec(), dtype)); - } - stats + ])) } } @@ -106,6 +138,7 @@ mod test { use crate::array::varbin::VarBinArray; use crate::array::Array; + use crate::scalar::Utf8Scalar; use crate::stats::Stat; fn array(dtype: DType) -> VarBinArray { @@ -206,7 +239,13 @@ mod test { vec![Option::<&str>::None, None, None], DType::Utf8(Nullability::Nullable), ); - assert!(array.stats().get_or_compute(&Stat::Min).is_none()); - assert!(array.stats().get_or_compute(&Stat::Max).is_none()); + assert_eq!( + array.stats().get_or_compute(&Stat::Min).unwrap(), + Utf8Scalar::none().into() + ); + assert_eq!( + array.stats().get_or_compute(&Stat::Max).unwrap(), + Utf8Scalar::none().into() + ); } } diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index 6ba88aab49..30394fea1a 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -221,13 +221,15 @@ impl VarBinViewArray { let view = self.view_at(index); unsafe { if view.inlined.size > 12 { - let arrow_data_buffer = flatten_primitive(&slice( + let data_buf = flatten_primitive(&slice( self.data.get(view._ref.buffer_index as usize).unwrap(), view._ref.offset as usize, (view._ref.size + view._ref.offset) as usize, )?)?; - // TODO(ngates): can we avoid returning a copy? - Ok(arrow_data_buffer.typed_data::().to_vec()) + Ok(data_buf + .into_buffer() + .into_vec() + .unwrap_or_else(|buf| buf.to_vec())) } else { Ok(view.inlined.data[..view.inlined.size as usize].to_vec()) } diff --git a/vortex-array/src/array/varbinview/stats.rs b/vortex-array/src/array/varbinview/stats.rs index eedae29049..b01ac64ded 100644 --- a/vortex-array/src/array/varbinview/stats.rs +++ b/vortex-array/src/array/varbinview/stats.rs @@ -2,7 +2,7 @@ use std::borrow::Cow; use vortex_error::VortexResult; -use crate::array::varbin::VarBinAccumulator; +use crate::array::varbin::compute_stats; use crate::array::varbinview::VarBinViewArray; use crate::array::Array; use crate::stats::{Stat, StatsCompute, StatsSet}; @@ -13,18 +13,11 @@ impl StatsCompute for VarBinViewArray { return Ok(StatsSet::new()); } - let mut acc = VarBinAccumulator::new(); - self.iter_primitive() - .map(|prim_iter| { - for next_val in prim_iter { - acc.nullable_next(next_val.map(Cow::from)); - } - }) + Ok(self + .iter_primitive() + .map(|prim_iter| compute_stats(&mut prim_iter.map(|s| s.map(Cow::from)), self.dtype())) .unwrap_or_else(|_| { - for next_val in self.iter() { - acc.nullable_next(next_val.map(Cow::from)); - } - }); - Ok(acc.finish(self.len(), self.dtype())) + compute_stats(&mut self.iter().map(|s| s.map(Cow::from)), self.dtype()) + })) } }