Skip to content

Commit

Permalink
Primitive stats (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Mar 5, 2024
1 parent 5cf6e33 commit db705e1
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 151 deletions.
10 changes: 2 additions & 8 deletions vortex-fastlanes/src/bitpacking/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,14 @@ impl EncodingCompression for BitPackedEncoding {
}

// Check that the min > zero
if parray
.stats()
.get_or_compute_cast::<i64>(&Stat::Min)
.unwrap()
< 0
{
if parray.stats().get_or_compute_cast::<i64>(&Stat::Min)? < 0 {
debug!("Skipping BitPacking: min is zero");
return None;
}

let bit_width_freq = parray
.stats()
.get_or_compute_as::<ListScalarVec<usize>>(&Stat::BitWidthFreq)
.unwrap()
.get_or_compute_as::<ListScalarVec<usize>>(&Stat::BitWidthFreq)?
.0;
let bit_width = best_bit_width(parray.ptype(), &bit_width_freq);

Expand Down
7 changes: 1 addition & 6 deletions vortex-fastlanes/src/for/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@ impl EncodingCompression for FoREncoding {
}

// Nothing for us to do if the min is already zero.
if parray
.stats()
.get_or_compute_cast::<i64>(&Stat::Min)
.unwrap()
== 0
{
if parray.stats().get_or_compute_cast::<i64>(&Stat::Min)? == 0 {
debug!("Skipping BitPacking: min is zero");
return None;
}
Expand Down
270 changes: 152 additions & 118 deletions vortex/src/array/primitive/stats.rs
Original file line number Diff line number Diff line change
@@ -1,150 +1,175 @@
use arrow::buffer::BooleanBuffer;
use std::collections::HashMap;
use std::marker::PhantomData;

use arrow::datatypes::ArrowNativeType;
use half::f16;
use num_traits::{NumCast, PrimInt};
use std::mem::size_of;

use crate::array::primitive::PrimitiveArray;
use crate::compute::cast::cast_bool;
use crate::error::VortexResult;
use crate::ptype::match_each_native_ptype;
use crate::scalar::{ListScalarVec, ScalarRef};
use crate::match_each_native_ptype;
use crate::ptype::NativePType;
use crate::scalar::{ListScalarVec, NullableScalar, PScalar, Scalar};
use crate::stats::{Stat, StatsCompute, StatsSet};

impl StatsCompute for PrimitiveArray {
fn compute(&self, stat: &Stat) -> VortexResult<StatsSet> {
match_each_native_ptype!(self.ptype(), |$P| {
WrappedPrimitive::<$P>::new(self).compute(stat)
match self.validity() {
None => self.typed_data::<$P>().compute(stat),
Some(validity_array) => {
let validity = cast_bool(validity_array)?;
NullableValues(self.typed_data::<$P>(), validity.buffer()).compute(stat)
}
}
})
}
}

struct WrappedPrimitive<'a, P>(&'a PrimitiveArray, PhantomData<P>);

impl<'a, P> WrappedPrimitive<'a, P> {
pub fn new(array: &'a PrimitiveArray) -> Self {
Self(array, PhantomData)
impl<T: NativePType> StatsCompute for &[T] {
fn compute(&self, _stat: &Stat) -> VortexResult<StatsSet> {
if self.is_empty() {
return Ok(StatsSet::default());
}
let mut stats = StatsAccumulator::new(self[0]);
self.iter().skip(1).for_each(|next| stats.next(*next));
Ok(stats.into_set())
}
}

macro_rules! integer_stats {
($T:ty) => {
impl StatsCompute for WrappedPrimitive<'_, $T> {
fn compute(&self, _stat: &Stat) -> VortexResult<StatsSet> {
integer_stats::<$T>(self.0)
}
struct NullableValues<'a, T: NativePType>(&'a [T], &'a BooleanBuffer);

impl<'a, T: NativePType> StatsCompute for NullableValues<'a, T> {
fn compute(&self, _stat: &Stat) -> VortexResult<StatsSet> {
let values = self.0;
if values.is_empty() {
return Ok(StatsSet::default());
}

let first_non_null = self
.1
.iter()
.enumerate()
.skip_while(|(_, valid)| !*valid)
.map(|(idx, _)| values[idx])
.next();

if first_non_null.is_none() {
return Ok(StatsSet::from(HashMap::from([
(Stat::Min, NullableScalar::None(T::PTYPE.into()).boxed()),
(Stat::Max, NullableScalar::None(T::PTYPE.into()).boxed()),
(Stat::IsConstant, true.into()),
(Stat::IsSorted, true.into()),
(Stat::IsStrictSorted, true.into()),
(Stat::RunCount, 1.into()),
(Stat::NullCount, 1.into()),
(
Stat::BitWidthFreq,
ListScalarVec(vec![0; size_of::<T>() * 8 + 1]).into(),
),
])));
}
};

let mut stats = StatsAccumulator::new(first_non_null.unwrap());
values
.iter()
.zip(self.1.iter())
.skip(1)
.map(|(next, valid)| valid.then_some(*next))
.for_each(|next| stats.nullable_next(next));
Ok(stats.into_set())
}
}

integer_stats!(i8);
integer_stats!(i16);
integer_stats!(i32);
integer_stats!(i64);
integer_stats!(u8);
integer_stats!(u16);
integer_stats!(u32);
integer_stats!(u64);

macro_rules! float_stats {
($T:ty) => {
impl StatsCompute for WrappedPrimitive<'_, $T> {
fn compute(&self, _stat: &Stat) -> VortexResult<StatsSet> {
float_stats::<$T>(self.0)
}
trait BitWidth {
fn bit_width(self) -> usize;
}

impl<T: NativePType + Into<PScalar>> BitWidth for T {
fn bit_width(self) -> usize {
let bit_width = size_of::<T>() * 8;
let scalar: PScalar = self.into();
match scalar {
PScalar::U8(i) => bit_width - i.leading_zeros() as usize,
PScalar::U16(i) => bit_width - i.leading_zeros() as usize,
PScalar::U32(i) => bit_width - i.leading_zeros() as usize,
PScalar::U64(i) => bit_width - i.leading_zeros() as usize,
PScalar::I8(i) => bit_width - i.leading_zeros() as usize,
PScalar::I16(i) => bit_width - i.leading_zeros() as usize,
PScalar::I32(i) => bit_width - i.leading_zeros() as usize,
PScalar::I64(i) => bit_width - i.leading_zeros() as usize,
PScalar::F16(_) => bit_width,
PScalar::F32(_) => bit_width,
PScalar::F64(_) => bit_width,
}
};
}
}

float_stats!(f16);
float_stats!(f32);
float_stats!(f64);

fn integer_stats<T: ArrowNativeType + NumCast + PrimInt>(
array: &PrimitiveArray,
) -> VortexResult<StatsSet>
where
ScalarRef: From<T>,
{
let typed_buf: &[T] = array.buffer().typed_data();
// TODO(ngates): bail out on empty stats

let bitwidth = std::mem::size_of::<T>() * 8;
let mut bit_widths: Vec<u64> = vec![0; bitwidth + 1];
bit_widths[bitwidth - typed_buf[0].leading_zeros() as usize] += 1;

let mut is_sorted = true;
let mut is_strict_sorted = true;
let mut min = typed_buf[0];
let mut max = typed_buf[0];
let mut last_val = typed_buf[0];
let mut run_count: usize = 0;

for v in &typed_buf[1..] {
bit_widths[bitwidth - v.leading_zeros() as usize] += 1;
if last_val == *v {
is_strict_sorted = false;
} else {
if *v < last_val {
is_sorted = false;
struct StatsAccumulator<T: NativePType> {
prev: T,
min: T,
max: T,
is_sorted: bool,
is_strict_sorted: bool,
run_count: usize,
bit_widths: Vec<usize>,
}

impl<T: NativePType> StatsAccumulator<T> {
fn new(first_value: T) -> Self {
let mut stats = Self {
prev: first_value,
min: first_value,
max: first_value,
is_sorted: true,
is_strict_sorted: true,
run_count: 1,
bit_widths: vec![0; size_of::<T>() * 8 + 1],
};
stats.bit_widths[first_value.bit_width()] += 1;
stats
}

pub fn nullable_next(&mut self, next: Option<T>) {
match next {
Some(n) => self.next(n),
None => {
self.bit_widths[0] += 1;
}
run_count += 1;
}
if *v < min {
min = *v;
} else if *v > max {
max = *v;
}
last_val = *v;
}
run_count += 1;

Ok(StatsSet::from(HashMap::from([
(Stat::Min, min.into()),
(Stat::Max, max.into()),
(Stat::IsConstant, (min == max).into()),
(Stat::BitWidthFreq, ListScalarVec(bit_widths).into()),
(Stat::IsSorted, is_sorted.into()),
(Stat::IsStrictSorted, (is_sorted && is_strict_sorted).into()),
(Stat::RunCount, run_count.into()),
])))
}

fn float_stats<T: ArrowNativeType + NumCast>(array: &PrimitiveArray) -> VortexResult<StatsSet>
where
ScalarRef: From<T>,
{
let typed_buf: &[T] = array.buffer().typed_data();
// TODO: bail out on empty stats

let mut min = typed_buf[0];
let mut max = typed_buf[0];
let mut last_val: T = typed_buf[0];
let mut is_sorted = true;
let mut run_count: usize = 0;
for v in &typed_buf[1..] {
if last_val != *v {
run_count += 1;
if *v < last_val {
is_sorted = false;
pub fn next(&mut self, next: T) {
self.bit_widths[next.bit_width()] += 1;

if self.prev == next {
self.is_strict_sorted = false;
} else {
if next < self.prev {
self.is_sorted = false;
}
self.run_count += 1;
}
if *v < min {
min = *v;
} else if *v > max {
max = *v;
if next < self.min {
self.min = next;
} else if next > self.max {
self.max = next;
}
last_val = *v;
self.prev = next;
}

pub fn into_set(self) -> StatsSet {
StatsSet::from(HashMap::from([
(Stat::Min, self.min.into()),
(Stat::Max, self.max.into()),
(Stat::IsConstant, (self.min == self.max).into()),
(Stat::BitWidthFreq, ListScalarVec(self.bit_widths).into()),
(Stat::IsSorted, self.is_sorted.into()),
(
Stat::IsStrictSorted,
(self.is_sorted && self.is_strict_sorted).into(),
),
(Stat::RunCount, self.run_count.into()),
]))
}
run_count += 1;

Ok(StatsSet::from(HashMap::from([
(Stat::Min, min.into()),
(Stat::Max, max.into()),
(Stat::IsConstant, (min == max).into()),
(Stat::IsSorted, is_sorted.into()),
(Stat::RunCount, run_count.into()),
])))
}

#[cfg(test)]
Expand Down Expand Up @@ -188,10 +213,19 @@ mod test {

#[test]
fn stats_u8() {
let arr = PrimitiveArray::from_vec::<u8>(vec![1, 2, 3, 4, 5]);
let arr = PrimitiveArray::from_vec(vec![1u8, 2, 3, 4, 5]);
let min: u8 = arr.stats().get_or_compute_as(&Stat::Min).unwrap();
let max: u8 = arr.stats().get_or_compute_as(&Stat::Max).unwrap();
assert_eq!(min, 1);
assert_eq!(max, 5);
}

#[test]
fn nullable_stats_u8() {
let arr = PrimitiveArray::from_iter(vec![None, Some(1i32), None, Some(2)]);
let min: Option<i32> = arr.stats().get_or_compute_as(&Stat::Min);
let max: Option<i32> = arr.stats().get_or_compute_as(&Stat::Max);
assert_eq!(min, Some(1));
assert_eq!(max, Some(2));
}
}
5 changes: 4 additions & 1 deletion vortex/src/ptype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use num_traits::NumCast;

use crate::dtype::{DType, FloatWidth, IntWidth, Signedness};
use crate::error::{VortexError, VortexResult};
use crate::scalar::ScalarRef;
use crate::scalar::{PScalar, ScalarRef};

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Hash)]
pub enum PType {
Expand All @@ -31,11 +31,14 @@ pub trait NativePType:
+ Debug
+ Display
+ PartialEq
+ PartialOrd
+ Default
+ ArrowNativeType
+ RefUnwindSafe
+ NumCast
+ Into<ScalarRef>
+ TryFrom<ScalarRef, Error = VortexError>
+ Into<PScalar>
{
const PTYPE: PType;
}
Expand Down
12 changes: 0 additions & 12 deletions vortex/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,6 @@ pub trait AsBytes {
fn as_bytes(&self) -> &[u8];
}

impl<T: NativePType> From<Option<T>> for Box<dyn Scalar>
where
Box<dyn Scalar>: From<T>,
{
fn from(value: Option<T>) -> Self {
match value {
Some(value) => value.into(),
None => Box::new(NullableScalar::None(DType::from(T::PTYPE))),
}
}
}

impl<T: NativePType> AsBytes for [T] {
#[inline]
fn as_bytes(&self) -> &[u8] {
Expand Down
Loading

0 comments on commit db705e1

Please sign in to comment.