Fix binary stats for arrays containing null bytes and match stats behaviour between varbin and primitive arrays (#233)

Made the behaviour match the primitive stats as well, where min/max is null if the whole array is null.
robert3005 authored Apr 13, 2024
1 parent 3bffff9 commit 4e85688
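
The behaviour change described above can be sketched with the compute_stats helper this commit adds to vortex-array/src/array/varbin/stats.rs. The snippet below is a minimal illustration, not part of the commit: it is written as if inside the vortex-array crate and relies only on the compute_stats signature, the DType constructor used in the tests, and the stat values produced by the new all_null_stats helper.

use std::borrow::Cow;

use vortex_schema::{DType, Nullability};

use crate::array::varbin::compute_stats;

fn all_null_example() {
    // Three nulls and no values at all.
    let values: Vec<Option<&[u8]>> = vec![None, None, None];
    let stats = compute_stats(
        &mut values.into_iter().map(|v| v.map(Cow::from)),
        &DType::Utf8(Nullability::Nullable),
    );
    // The returned StatsSet now holds Min = null, Max = null, NullCount = 3,
    // IsConstant = true and IsStrictSorted = false (more than one element),
    // matching the primitive all_null_stats behaviour.
    let _ = stats;
}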
Showing 6 changed files with 91 additions and 48 deletions.
5 changes: 5 additions & 0 deletions vortex-array/src/array/primitive/mod.rs
@@ -141,6 +141,11 @@ impl PrimitiveArray {
&self.buffer
}

#[inline]
pub fn into_buffer(self) -> Buffer {
self.buffer
}

pub fn scalar_buffer<T: NativePType>(&self) -> ScalarBuffer<T> {
ScalarBuffer::from(self.buffer().clone())
}
8 changes: 4 additions & 4 deletions vortex-array/src/array/primitive/stats.rs
@@ -18,7 +18,7 @@ impl StatsCompute for PrimitiveArray {
match_each_native_ptype!(self.ptype(), |$P| {
match self.logical_validity() {
Validity::Valid(_) => self.typed_data::<$P>().compute(stat),
Validity::Invalid(_) => all_null_stats::<$P>(),
Validity::Invalid(v) => all_null_stats::<$P>(v),
Validity::Array(a) => {
NullableValues(self.typed_data::<$P>(), flatten_bool(&a)?.buffer()).compute(stat)
}
@@ -38,15 +38,15 @@ impl<T: NativePType> StatsCompute for &[T] {
}
}

fn all_null_stats<T: NativePType>() -> VortexResult<StatsSet> {
fn all_null_stats<T: NativePType>(len: usize) -> VortexResult<StatsSet> {
Ok(StatsSet::from(HashMap::from([
(Stat::Min, Option::<T>::None.into()),
(Stat::Max, Option::<T>::None.into()),
(Stat::IsConstant, true.into()),
(Stat::IsSorted, true.into()),
(Stat::IsStrictSorted, true.into()),
(Stat::IsStrictSorted, (len < 2).into()),
(Stat::RunCount, 1.into()),
(Stat::NullCount, 1.into()),
(Stat::NullCount, len.into()),
(
Stat::BitWidthFreq,
ListScalarVec(vec![0; size_of::<T>() * 8 + 1]).into(),
6 changes: 5 additions & 1 deletion vortex-array/src/array/varbin/mod.rs
@@ -2,6 +2,7 @@ use std::sync::{Arc, RwLock};

use linkme::distributed_slice;
use num_traits::AsPrimitive;
pub use stats::compute_stats;
pub use stats::VarBinAccumulator;
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_schema::{DType, IntWidth, Nullability, Signedness};
@@ -173,7 +174,10 @@ impl VarBinArray {
let start = self.offset_at(index);
let end = self.offset_at(index + 1);
let sliced = slice(self.bytes(), start, end)?;
Ok(flatten_primitive(sliced.as_ref())?.buffer().to_vec())
Ok(flatten_primitive(sliced.as_ref())?
.into_buffer()
.into_vec()
.unwrap_or_else(|buf| buf.to_vec()))
}
}

93 changes: 66 additions & 27 deletions vortex-array/src/array/varbin/stats.rs
@@ -7,6 +7,7 @@ use vortex_schema::DType;

use crate::array::varbin::{varbin_scalar, VarBinArray};
use crate::array::Array;
use crate::scalar::Scalar;
use crate::stats::{Stat, StatsCompute, StatsSet};

impl StatsCompute for VarBinArray {
@@ -15,22 +16,52 @@ impl StatsCompute for VarBinArray {
return Ok(StatsSet::new());
}

let mut acc = VarBinAccumulator::new();
self.iter_primitive()
.map(|prim_iter| {
for next_val in prim_iter {
acc.nullable_next(next_val.map(Cow::from));
}
})
Ok(self
.iter_primitive()
.map(|prim_iter| compute_stats(&mut prim_iter.map(|s| s.map(Cow::from)), self.dtype()))
.unwrap_or_else(|_| {
for next_val in self.iter() {
acc.nullable_next(next_val.map(Cow::from));
}
});
Ok(acc.finish(self.len(), self.dtype()))
compute_stats(&mut self.iter().map(|s| s.map(Cow::from)), self.dtype())
}))
}
}

pub fn compute_stats(
iter: &mut dyn Iterator<Item = Option<Cow<'_, [u8]>>>,
dtype: &DType,
) -> StatsSet {
let mut leading_nulls: usize = 0;
let mut first_value: Option<Cow<'_, [u8]>> = None;
for v in &mut *iter {
if v.is_none() {
leading_nulls += 1;
} else {
first_value = v;
break;
}
}

if let Some(first_non_null) = first_value {
let mut acc = VarBinAccumulator::new(first_non_null);
iter.for_each(|n| acc.nullable_next(n));
acc.n_nulls(leading_nulls);
acc.finish(dtype)
} else {
all_null_stats(leading_nulls, dtype)
}
}

fn all_null_stats(len: usize, dtype: &DType) -> StatsSet {
StatsSet::from(HashMap::from([
(Stat::Min, Scalar::null(dtype)),
(Stat::Max, Scalar::null(dtype)),
(Stat::IsConstant, true.into()),
(Stat::IsSorted, true.into()),
(Stat::IsStrictSorted, (len < 2).into()),
(Stat::RunCount, 1.into()),
(Stat::NullCount, len.into()),
]))
}

#[derive(Debug, Default)]
pub struct VarBinAccumulator<'a> {
min: Cow<'a, [u8]>,
@@ -44,15 +75,15 @@ pub struct VarBinAccumulator<'a> {
}

impl<'a> VarBinAccumulator<'a> {
pub fn new() -> Self {
pub fn new(value: Cow<'a, [u8]>) -> Self {
Self {
min: Cow::from(&[0xFF]),
max: Cow::from(&[0x00]),
min: value.clone(),
max: value.clone(),
is_constant: true,
is_sorted: true,
is_strict_sorted: true,
last_value: Cow::from(&[0x00]),
runs: 0,
last_value: value,
runs: 1,
null_count: 0,
}
}
@@ -64,6 +95,10 @@ impl<'a> VarBinAccumulator<'a> {
}
}

pub fn n_nulls(&mut self, null_count: usize) {
self.null_count += null_count;
}

pub fn next(&mut self, val: Cow<'a, [u8]>) {
if val < self.min {
self.min.clone_from(&val);
@@ -84,19 +119,16 @@ impl<'a> VarBinAccumulator<'a> {
self.runs += 1;
}

pub fn finish(&self, len: usize, dtype: &DType) -> StatsSet {
let mut stats = StatsSet::from(HashMap::from([
pub fn finish(&self, dtype: &DType) -> StatsSet {
StatsSet::from(HashMap::from([
(Stat::Min, varbin_scalar(self.min.to_vec(), dtype)),
(Stat::Max, varbin_scalar(self.max.to_vec(), dtype)),
(Stat::RunCount, self.runs.into()),
(Stat::IsSorted, self.is_sorted.into()),
(Stat::IsStrictSorted, self.is_strict_sorted.into()),
(Stat::IsConstant, self.is_constant.into()),
(Stat::NullCount, self.null_count.into()),
]));
if self.null_count < len {
stats.set(Stat::Min, varbin_scalar(self.min.to_vec(), dtype));
stats.set(Stat::Max, varbin_scalar(self.max.to_vec(), dtype));
}
stats
]))
}
}

@@ -106,6 +138,7 @@ mod test {

use crate::array::varbin::VarBinArray;
use crate::array::Array;
use crate::scalar::Utf8Scalar;
use crate::stats::Stat;

fn array(dtype: DType) -> VarBinArray {
@@ -206,7 +239,13 @@ mod test {
vec![Option::<&str>::None, None, None],
DType::Utf8(Nullability::Nullable),
);
assert!(array.stats().get_or_compute(&Stat::Min).is_none());
assert!(array.stats().get_or_compute(&Stat::Max).is_none());
assert_eq!(
array.stats().get_or_compute(&Stat::Min).unwrap(),
Utf8Scalar::none().into()
);
assert_eq!(
array.stats().get_or_compute(&Stat::Max).unwrap(),
Utf8Scalar::none().into()
);
}
}
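
As a usage note for the compute_stats helper introduced above: leading nulls are counted before the first non-null value, that value seeds VarBinAccumulator::new, and the remaining items flow through nullable_next. The sketch below is not part of the commit and is again written as if inside the vortex-array crate, using only names visible in this diff.

use std::borrow::Cow;

use vortex_schema::{DType, Nullability};

use crate::array::varbin::compute_stats;

fn mixed_example() {
    let values: Vec<Option<&[u8]>> = vec![
        None,
        Some("bar".as_bytes()),
        Some("foo".as_bytes()),
        None,
    ];
    let stats = compute_stats(
        &mut values.into_iter().map(|v| v.map(Cow::from)),
        &DType::Utf8(Nullability::Nullable),
    );
    // The leading null is folded in via n_nulls and the trailing null via
    // nullable_next, so NullCount = 2 while Min = "bar" and Max = "foo".
    let _ = stats;
}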
8 changes: 5 additions & 3 deletions vortex-array/src/array/varbinview/mod.rs
@@ -221,13 +221,15 @@ impl VarBinViewArray {
let view = self.view_at(index);
unsafe {
if view.inlined.size > 12 {
let arrow_data_buffer = flatten_primitive(&slice(
let data_buf = flatten_primitive(&slice(
self.data.get(view._ref.buffer_index as usize).unwrap(),
view._ref.offset as usize,
(view._ref.size + view._ref.offset) as usize,
)?)?;
// TODO(ngates): can we avoid returning a copy?
Ok(arrow_data_buffer.typed_data::<u8>().to_vec())
Ok(data_buf
.into_buffer()
.into_vec()
.unwrap_or_else(|buf| buf.to_vec()))
} else {
Ok(view.inlined.data[..view.inlined.size as usize].to_vec())
}
19 changes: 6 additions & 13 deletions vortex-array/src/array/varbinview/stats.rs
@@ -2,7 +2,7 @@ use std::borrow::Cow;

use vortex_error::VortexResult;

use crate::array::varbin::VarBinAccumulator;
use crate::array::varbin::compute_stats;
use crate::array::varbinview::VarBinViewArray;
use crate::array::Array;
use crate::stats::{Stat, StatsCompute, StatsSet};
@@ -13,18 +13,11 @@ impl StatsCompute for VarBinViewArray {
return Ok(StatsSet::new());
}

let mut acc = VarBinAccumulator::new();
self.iter_primitive()
.map(|prim_iter| {
for next_val in prim_iter {
acc.nullable_next(next_val.map(Cow::from));
}
})
Ok(self
.iter_primitive()
.map(|prim_iter| compute_stats(&mut prim_iter.map(|s| s.map(Cow::from)), self.dtype()))
.unwrap_or_else(|_| {
for next_val in self.iter() {
acc.nullable_next(next_val.map(Cow::from));
}
});
Ok(acc.finish(self.len(), self.dtype()))
compute_stats(&mut self.iter().map(|s| s.map(Cow::from)), self.dtype())
}))
}
}
