Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primitive stats #66

Merged
merged 7 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions vortex-fastlanes/src/bitpacking/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,14 @@ impl EncodingCompression for BitPackedEncoding {
}

// Check that the min > zero
if parray
.stats()
.get_or_compute_cast::<i64>(&Stat::Min)
.unwrap()
< 0
{
if parray.stats().get_or_compute_cast::<i64>(&Stat::Min)? < 0 {
debug!("Skipping BitPacking: min is zero");
return None;
}

let bit_width_freq = parray
.stats()
.get_or_compute_as::<ListScalarVec<usize>>(&Stat::BitWidthFreq)
.unwrap()
.get_or_compute_as::<ListScalarVec<usize>>(&Stat::BitWidthFreq)?
.0;
let bit_width = best_bit_width(parray.ptype(), &bit_width_freq);

Expand Down
7 changes: 1 addition & 6 deletions vortex-fastlanes/src/for/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@ impl EncodingCompression for FoREncoding {
}

// Nothing for us to do if the min is already zero.
if parray
.stats()
.get_or_compute_cast::<i64>(&Stat::Min)
.unwrap()
== 0
{
if parray.stats().get_or_compute_cast::<i64>(&Stat::Min)? == 0 {
debug!("Skipping BitPacking: min is zero");
return None;
}
Expand Down
270 changes: 152 additions & 118 deletions vortex/src/array/primitive/stats.rs
Original file line number Diff line number Diff line change
@@ -1,150 +1,175 @@
use arrow::buffer::BooleanBuffer;
use std::collections::HashMap;
use std::marker::PhantomData;

use arrow::datatypes::ArrowNativeType;
use half::f16;
use num_traits::{NumCast, PrimInt};
use std::mem::size_of;

use crate::array::primitive::PrimitiveArray;
use crate::compute::cast::cast_bool;
use crate::error::VortexResult;
use crate::ptype::match_each_native_ptype;
use crate::scalar::{ListScalarVec, ScalarRef};
use crate::match_each_native_ptype;
use crate::ptype::NativePType;
use crate::scalar::{ListScalarVec, NullableScalar, PScalar, Scalar};
use crate::stats::{Stat, StatsCompute, StatsSet};

impl StatsCompute for PrimitiveArray {
    /// Compute statistics for a primitive array, dispatching on its runtime ptype.
    ///
    /// Arrays without a validity buffer are treated as all-valid and delegate
    /// straight to the `&[T]` implementation; nullable arrays pair the values
    /// with a boolean validity buffer via `NullableValues`.
    // NOTE: removed a leftover pre-refactor line (`WrappedPrimitive::<$P>::new(self)
    // .compute(stat)`) that shadowed the new match-based body.
    fn compute(&self, stat: &Stat) -> VortexResult<StatsSet> {
        match_each_native_ptype!(self.ptype(), |$P| {
            match self.validity() {
                None => self.typed_data::<$P>().compute(stat),
                Some(validity_array) => {
                    // Validity may itself be an encoded boolean array; flatten it first.
                    let validity = cast_bool(validity_array)?;
                    NullableValues(self.typed_data::<$P>(), validity.buffer()).compute(stat)
                }
            }
        })
    }
}

struct WrappedPrimitive<'a, P>(&'a PrimitiveArray, PhantomData<P>);

impl<'a, P> WrappedPrimitive<'a, P> {
pub fn new(array: &'a PrimitiveArray) -> Self {
Self(array, PhantomData)
impl<T: NativePType> StatsCompute for &[T] {
    /// Compute all statistics for a non-nullable slice of primitive values.
    /// An empty slice produces an empty stats set.
    fn compute(&self, _stat: &Stat) -> VortexResult<StatsSet> {
        // Seed the accumulator with the first element, then feed it the rest.
        let Some((first, rest)) = self.split_first() else {
            return Ok(StatsSet::default());
        };
        let mut acc = StatsAccumulator::new(*first);
        for value in rest {
            acc.next(*value);
        }
        Ok(acc.into_set())
    }
}

macro_rules! integer_stats {
($T:ty) => {
impl StatsCompute for WrappedPrimitive<'_, $T> {
fn compute(&self, _stat: &Stat) -> VortexResult<StatsSet> {
integer_stats::<$T>(self.0)
}
/// A primitive slice paired with its validity bitmap: `.0[i]` is meaningful only
/// where `.1` is set. Used to compute stats over nullable primitive arrays.
struct NullableValues<'a, T: NativePType>(&'a [T], &'a BooleanBuffer);

impl<'a, T: NativePType> StatsCompute for NullableValues<'a, T> {
fn compute(&self, _stat: &Stat) -> VortexResult<StatsSet> {
let values = self.0;
if values.is_empty() {
return Ok(StatsSet::default());
}

let first_non_null = self
.1
.iter()
.enumerate()
.skip_while(|(_, valid)| !*valid)
.map(|(idx, _)| values[idx])
.next();

if first_non_null.is_none() {
return Ok(StatsSet::from(HashMap::from([
(Stat::Min, NullableScalar::None(T::PTYPE.into()).boxed()),
(Stat::Max, NullableScalar::None(T::PTYPE.into()).boxed()),
(Stat::IsConstant, true.into()),
(Stat::IsSorted, true.into()),
(Stat::IsStrictSorted, true.into()),
(Stat::RunCount, 1.into()),
(Stat::NullCount, 1.into()),
(
Stat::BitWidthFreq,
ListScalarVec(vec![0; size_of::<T>() * 8 + 1]).into(),
),
])));
}
};

let mut stats = StatsAccumulator::new(first_non_null.unwrap());
values
.iter()
.zip(self.1.iter())
.skip(1)
.map(|(next, valid)| valid.then_some(*next))
.for_each(|next| stats.nullable_next(next));
Ok(stats.into_set())
}
Comment on lines +47 to +79
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a way to make this nicer but can leave for later

}

integer_stats!(i8);
integer_stats!(i16);
integer_stats!(i32);
integer_stats!(i64);
integer_stats!(u8);
integer_stats!(u16);
integer_stats!(u32);
integer_stats!(u64);

macro_rules! float_stats {
($T:ty) => {
impl StatsCompute for WrappedPrimitive<'_, $T> {
fn compute(&self, _stat: &Stat) -> VortexResult<StatsSet> {
float_stats::<$T>(self.0)
}
/// Number of bits needed to represent a value: for integers, the position of the
/// highest set bit (so `0` has bit width 0); floats always report their full width.
trait BitWidth {
fn bit_width(self) -> usize;
}

impl<T: NativePType + Into<PScalar>> BitWidth for T {
fn bit_width(self) -> usize {
let bit_width = size_of::<T>() * 8;
let scalar: PScalar = self.into();
match scalar {
PScalar::U8(i) => bit_width - i.leading_zeros() as usize,
PScalar::U16(i) => bit_width - i.leading_zeros() as usize,
PScalar::U32(i) => bit_width - i.leading_zeros() as usize,
PScalar::U64(i) => bit_width - i.leading_zeros() as usize,
PScalar::I8(i) => bit_width - i.leading_zeros() as usize,
PScalar::I16(i) => bit_width - i.leading_zeros() as usize,
PScalar::I32(i) => bit_width - i.leading_zeros() as usize,
PScalar::I64(i) => bit_width - i.leading_zeros() as usize,
PScalar::F16(_) => bit_width,
PScalar::F32(_) => bit_width,
PScalar::F64(_) => bit_width,
}
};
}
}

float_stats!(f16);
float_stats!(f32);
float_stats!(f64);

fn integer_stats<T: ArrowNativeType + NumCast + PrimInt>(
array: &PrimitiveArray,
) -> VortexResult<StatsSet>
where
ScalarRef: From<T>,
{
let typed_buf: &[T] = array.buffer().typed_data();
// TODO(ngates): bail out on empty stats

let bitwidth = std::mem::size_of::<T>() * 8;
let mut bit_widths: Vec<u64> = vec![0; bitwidth + 1];
bit_widths[bitwidth - typed_buf[0].leading_zeros() as usize] += 1;

let mut is_sorted = true;
let mut is_strict_sorted = true;
let mut min = typed_buf[0];
let mut max = typed_buf[0];
let mut last_val = typed_buf[0];
let mut run_count: usize = 0;

for v in &typed_buf[1..] {
bit_widths[bitwidth - v.leading_zeros() as usize] += 1;
if last_val == *v {
is_strict_sorted = false;
} else {
if *v < last_val {
is_sorted = false;
/// Single-pass accumulator for primitive-array statistics.
/// Seeded with the first value, then fed each subsequent value in order.
struct StatsAccumulator<T: NativePType> {
prev: T, // previous value seen; drives sortedness and run-count tracking
min: T,
max: T,
is_sorted: bool,
is_strict_sorted: bool, // no equal adjacent values seen (combined with is_sorted on output)
run_count: usize, // number of runs of equal adjacent values
bit_widths: Vec<usize>, // histogram indexed by bit width, 0..=bits_of::<T>
}

impl<T: NativePType> StatsAccumulator<T> {
/// Create an accumulator seeded with the first (non-null) value of the array.
/// The seed counts as one run and is recorded in the bit-width histogram.
fn new(first_value: T) -> Self {
    // Histogram has one bucket per possible bit width, including 0.
    let mut bit_widths = vec![0usize; size_of::<T>() * 8 + 1];
    bit_widths[first_value.bit_width()] += 1;
    Self {
        prev: first_value,
        min: first_value,
        max: first_value,
        is_sorted: true,
        is_strict_sorted: true,
        run_count: 1,
        bit_widths,
    }
}

pub fn nullable_next(&mut self, next: Option<T>) {
match next {
Some(n) => self.next(n),
None => {
self.bit_widths[0] += 1;
}
run_count += 1;
}
if *v < min {
min = *v;
} else if *v > max {
max = *v;
}
last_val = *v;
}
run_count += 1;

Ok(StatsSet::from(HashMap::from([
(Stat::Min, min.into()),
(Stat::Max, max.into()),
(Stat::IsConstant, (min == max).into()),
(Stat::BitWidthFreq, ListScalarVec(bit_widths).into()),
(Stat::IsSorted, is_sorted.into()),
(Stat::IsStrictSorted, (is_sorted && is_strict_sorted).into()),
(Stat::RunCount, run_count.into()),
])))
}

fn float_stats<T: ArrowNativeType + NumCast>(array: &PrimitiveArray) -> VortexResult<StatsSet>
where
ScalarRef: From<T>,
{
let typed_buf: &[T] = array.buffer().typed_data();
// TODO: bail out on empty stats

let mut min = typed_buf[0];
let mut max = typed_buf[0];
let mut last_val: T = typed_buf[0];
let mut is_sorted = true;
let mut run_count: usize = 0;
for v in &typed_buf[1..] {
if last_val != *v {
run_count += 1;
if *v < last_val {
is_sorted = false;
pub fn next(&mut self, next: T) {
self.bit_widths[next.bit_width()] += 1;

if self.prev == next {
self.is_strict_sorted = false;
} else {
if next < self.prev {
self.is_sorted = false;
}
self.run_count += 1;
}
if *v < min {
min = *v;
} else if *v > max {
max = *v;
if next < self.min {
self.min = next;
} else if next > self.max {
self.max = next;
}
last_val = *v;
self.prev = next;
}

/// Consume the accumulator and materialize the final `StatsSet`.
pub fn into_set(self) -> StatsSet {
StatsSet::from(HashMap::from([
(Stat::Min, self.min.into()),
(Stat::Max, self.max.into()),
// Constant iff every value equaled the seed (min == max).
(Stat::IsConstant, (self.min == self.max).into()),
(Stat::BitWidthFreq, ListScalarVec(self.bit_widths).into()),
(Stat::IsSorted, self.is_sorted.into()),
(
Stat::IsStrictSorted,
// Strict sortedness additionally requires no duplicate adjacent values.
(self.is_sorted && self.is_strict_sorted).into(),
),
(Stat::RunCount, self.run_count.into()),
]))
}
run_count += 1;

Ok(StatsSet::from(HashMap::from([
(Stat::Min, min.into()),
(Stat::Max, max.into()),
(Stat::IsConstant, (min == max).into()),
(Stat::IsSorted, is_sorted.into()),
(Stat::RunCount, run_count.into()),
])))
}

#[cfg(test)]
Expand Down Expand Up @@ -188,10 +213,19 @@ mod test {

#[test]
// Min/max over a non-nullable u8 array.
// NOTE: removed the leftover pre-change construction line
// (`PrimitiveArray::from_vec::<u8>(...)`) that shadowed the new one.
fn stats_u8() {
    let arr = PrimitiveArray::from_vec(vec![1u8, 2, 3, 4, 5]);
    let min: u8 = arr.stats().get_or_compute_as(&Stat::Min).unwrap();
    let max: u8 = arr.stats().get_or_compute_as(&Stat::Max).unwrap();
    assert_eq!(min, 1);
    assert_eq!(max, 5);
}

#[test]
// Min/max over a nullable array: nulls are excluded from the extrema.
fn nullable_stats_u8() {
let arr = PrimitiveArray::from_iter(vec![None, Some(1i32), None, Some(2)]);
let min: Option<i32> = arr.stats().get_or_compute_as(&Stat::Min);
let max: Option<i32> = arr.stats().get_or_compute_as(&Stat::Max);
assert_eq!(min, Some(1));
assert_eq!(max, Some(2));
}
}
5 changes: 4 additions & 1 deletion vortex/src/ptype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use num_traits::NumCast;

use crate::dtype::{DType, FloatWidth, IntWidth, Signedness};
use crate::error::{VortexError, VortexResult};
use crate::scalar::ScalarRef;
use crate::scalar::{PScalar, ScalarRef};

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Hash)]
pub enum PType {
Expand All @@ -31,11 +31,14 @@ pub trait NativePType:
+ Debug
+ Display
+ PartialEq
+ PartialOrd
+ Default
+ ArrowNativeType
+ RefUnwindSafe
+ NumCast
+ Into<ScalarRef>
+ TryFrom<ScalarRef, Error = VortexError>
+ Into<PScalar>
{
const PTYPE: PType;
}
Expand Down
12 changes: 0 additions & 12 deletions vortex/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,6 @@ pub trait AsBytes {
fn as_bytes(&self) -> &[u8];
}

impl<T: NativePType> From<Option<T>> for Box<dyn Scalar>
where
Box<dyn Scalar>: From<T>,
{
fn from(value: Option<T>) -> Self {
match value {
Some(value) => value.into(),
None => Box::new(NullableScalar::None(DType::from(T::PTYPE))),
}
}
}

impl<T: NativePType> AsBytes for [T] {
#[inline]
fn as_bytes(&self) -> &[u8] {
Expand Down
Loading