Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include stats in IPC messages #302

Merged
merged 20 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions vortex-array/flatbuffers/array.fbs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
include "vortex-scalar/flatbuffers/scalar.fbs";

namespace vortex.array;

enum Version: uint8 {
Expand All @@ -9,7 +11,22 @@ table Array {
has_buffer: bool;
encoding: uint16;
metadata: [ubyte];
stats: ArrayStats;
children: [Array];
}

// Statistics that can be attached to an `Array` (see the `stats` field of that table).
// Table and vector fields may be omitted and the scalar fields are declared `= null`
// (optional scalars), so each individual statistic can be absent.
table ArrayStats {
// Minimum and maximum statistics, encoded as scalar values.
min: vortex.scalar.ScalarValue;
max: vortex.scalar.ScalarValue;
// Boolean statistics; `null` means the statistic is not recorded.
is_sorted: bool = null;
is_strict_sorted: bool = null;
is_constant: bool = null;
// Count statistics; `null` means the statistic is not recorded.
run_count: uint64 = null;
true_count: uint64 = null;
null_count: uint64 = null;
// Frequency vectors of unsigned 64-bit counts.
// NOTE(review): presumably histograms indexed by bit width / number of trailing zeros;
// confirm against the code that computes these statistics.
bit_width_freq: [uint64];
trailing_zero_freq: [uint64];
}


root_type Array;
5 changes: 3 additions & 2 deletions vortex-array/src/array/chunked/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ impl ArrayStatisticsCompute for ChunkedArray<'_> {
s.compute(stat);
s.to_set()
})
.fold(StatsSet::new(), |mut acc, x| {
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
.reduce(|mut acc, x| {
acc.merge(&x);
acc
}))
})
.unwrap_or_else(StatsSet::new))
}
}
4 changes: 2 additions & 2 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ use crate::validity::ArrayValidity;
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};

pub mod flatbuffers {
pub use gen_array::vortex::*;
pub use generated::vortex::array::*;

#[allow(unused_imports)]
#[allow(dead_code)]
#[allow(non_camel_case_types)]
#[allow(clippy::all)]
mod gen_array {
mod generated {
include!(concat!(env!("OUT_DIR"), "/flatbuffers/array.rs"));
}

Expand Down
49 changes: 49 additions & 0 deletions vortex-array/src/stats/flatbuffers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use itertools::Itertools;
use vortex_flatbuffers::WriteFlatBuffer;

use crate::stats::{Stat, Statistics};

/// Serializes a statistics collection into the FlatBuffers `ArrayStats` table.
impl WriteFlatBuffer for &dyn Statistics {
    type Target<'t> = crate::flatbuffers::ArrayStats<'t>;

    /// Writes the statistics that can be looked up into `fbb` and returns the offset of the
    /// finished `ArrayStats` table.
    ///
    /// A statistic whose lookup fails (or returns nothing) is left unset in the table.
    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        // Vectors and nested values are written first; the table itself is created last.
        let trailing_zero_freq = match self.get_as::<Vec<u64>>(Stat::TrailingZeroFreq) {
            Ok(counts) => {
                let counts = counts.iter().copied().collect_vec();
                Some(fbb.create_vector(counts.as_slice()))
            }
            Err(_) => None,
        };

        let bit_width_freq = match self.get_as::<Vec<u64>>(Stat::BitWidthFreq) {
            Ok(counts) => {
                let counts = counts.iter().copied().collect_vec();
                Some(fbb.create_vector(counts.as_slice()))
            }
            Err(_) => None,
        };

        let min = self
            .get(Stat::Min)
            .map(|scalar| scalar.value().write_flatbuffer(fbb));

        let max = self
            .get(Stat::Max)
            .map(|scalar| scalar.value().write_flatbuffer(fbb));

        // Scalar statistics are stored inline in the table; a failed lookup becomes `None`.
        let is_sorted = self.get_as::<bool>(Stat::IsSorted).ok();
        let is_strict_sorted = self.get_as::<bool>(Stat::IsStrictSorted).ok();
        let is_constant = self.get_as::<bool>(Stat::IsConstant).ok();
        let run_count = self.get_as_cast::<u64>(Stat::RunCount).ok();
        let true_count = self.get_as_cast::<u64>(Stat::TrueCount).ok();
        let null_count = self.get_as_cast::<u64>(Stat::NullCount).ok();

        let args = crate::flatbuffers::ArrayStatsArgs {
            min,
            max,
            is_sorted,
            is_strict_sorted,
            is_constant,
            run_count,
            true_count,
            null_count,
            bit_width_freq,
            trailing_zero_freq,
        };

        crate::flatbuffers::ArrayStats::create(fbb, &args)
    }
}
11 changes: 11 additions & 0 deletions vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use vortex_dtype::{DType, NativePType};
use vortex_error::{vortex_err, VortexError, VortexResult};
use vortex_scalar::Scalar;

pub mod flatbuffers;
mod statsset;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Sequence)]
Expand Down Expand Up @@ -93,6 +94,16 @@ impl dyn Statistics + '_ {
.and_then(|s| U::try_from(&s))
}

/// Looks up `stat`, casts it to the non-nullable primitive type of `U`, and converts the
/// result into `U`.
///
/// # Errors
///
/// Returns a `ComputeError` when the statistic is missing, and forwards the error when the
/// cast or the final conversion fails.
pub fn get_as_cast<U: NativePType + for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
    &self,
    stat: Stat,
) -> VortexResult<U> {
    let scalar = self
        .get(stat)
        .ok_or_else(|| vortex_err!(ComputeError: "statistic {} missing", stat))?;
    let target = DType::Primitive(U::PTYPE, NonNullable);
    let casted = scalar.cast(&target)?;
    U::try_from(&casted)
}

pub fn compute_as<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
&self,
stat: Stat,
Expand Down
89 changes: 83 additions & 6 deletions vortex-array/src/view.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::fmt::{Debug, Formatter};

use enum_iterator::all;
use itertools::Itertools;
use log::warn;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_dtype::{DType, Nullability};
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult};
use vortex_scalar::{PValue, Scalar, ScalarValue};

use crate::encoding::{EncodingId, EncodingRef};
use crate::flatbuffers::array as fb;
use crate::stats::{EmptyStatistics, Statistics};
use crate::flatbuffers as fb;
use crate::stats::{Stat, Statistics, StatsSet};
use crate::Context;
use crate::{Array, IntoArray, ToArray};

Expand Down Expand Up @@ -53,7 +56,6 @@ impl<'v> ArrayView<'v> {
Self::cumulative_nbuffers(array)
)
}

let view = Self {
encoding,
dtype,
Expand Down Expand Up @@ -136,8 +138,83 @@ impl<'v> ArrayView<'v> {
}

pub fn statistics(&self) -> &dyn Statistics {
// TODO(ngates): store statistics in FlatBuffers
&EmptyStatistics
self
}
}

/// Statistics read from the `ArrayStats` table stored in the view's FlatBuffer.
impl Statistics for ArrayView<'_> {
    /// Reads `stat` from the serialized stats table.
    ///
    /// Returns `None` when the array has no stats table, when the requested field is unset,
    /// or (for min/max) when the stored value cannot be converted to a `ScalarValue`.
    fn get(&self, stat: Stat) -> Option<Scalar> {
        // Every statistic lives in the same table, so bail out once if it is absent.
        let stats = self.array.stats()?;
        match stat {
            Stat::Max => {
                let value = ScalarValue::try_from(stats.max()?).ok()?;
                Some(Scalar::new(self.dtype.clone(), value))
            }
            Stat::Min => {
                let value = ScalarValue::try_from(stats.min()?).ok()?;
                Some(Scalar::new(self.dtype.clone(), value))
            }
            Stat::IsConstant => stats.is_constant().map(Into::into),
            Stat::IsSorted => stats.is_sorted().map(Into::into),
            Stat::IsStrictSorted => stats.is_strict_sorted().map(Into::into),
            Stat::RunCount => stats.run_count().map(Into::into),
            Stat::TrueCount => stats.true_count().map(Into::into),
            Stat::NullCount => stats.null_count().map(Into::into),
            Stat::BitWidthFreq => stats.bit_width_freq().map(|freqs| {
                let elements = freqs
                    .iter()
                    .map(|count| ScalarValue::Primitive(PValue::U64(count)))
                    .collect_vec();
                Scalar::list(
                    DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable),
                    elements,
                )
            }),
            Stat::TrailingZeroFreq => stats
                .trailing_zero_freq()
                .map(|freqs| freqs.iter().collect_vec().into()),
        }
    }

    /// NB: part of the contract for `to_set` is that it does not do any expensive computation.
    /// In other implementations, this means returning the underlying stats map, but for the
    /// FlatBuffer implementation, we have 'precalculated' stats in the FlatBuffer itself, so we
    /// need to allocate a stats map and populate it with those fields.
    fn to_set(&self) -> StatsSet {
        let mut collected = StatsSet::new();
        all::<Stat>()
            .filter_map(|stat| self.get(stat).map(|value| (stat, value)))
            .for_each(|(stat, value)| collected.set(stat, value));
        collected
    }

    /// We want to avoid any sort of allocation on instantiation of the `ArrayView`, so we
    /// do not allocate a stats set to cache values; the write is dropped with a warning.
    fn set(&self, _stat: Stat, _value: Scalar) {
        warn!("Cannot write stats to a view");
    }

    /// Returns the serialized value for `stat` when present; otherwise converts the view to an
    /// array, computes the statistic there, and returns the freshly computed value (if any).
    fn compute(&self, stat: Stat) -> Option<Scalar> {
        self.get(stat).or_else(|| {
            self.to_array()
                .with_dyn(|a| a.compute_statistics(stat))
                .ok()?
                .get(stat)
                .cloned()
        })
    }
}

Expand Down
8 changes: 7 additions & 1 deletion vortex-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ pub mod flatbuffers {

mod deps {
pub mod array {
pub use vortex::flatbuffers::array;
pub use vortex::flatbuffers as array;
}

pub mod dtype {
pub use vortex_dtype::flatbuffers as dtype;
}

pub mod scalar {
#[allow(unused_imports)]
pub use vortex_scalar::flatbuffers as scalar;
}
}
}

Expand Down
9 changes: 8 additions & 1 deletion vortex-ipc/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use itertools::Itertools;
use vortex::flatbuffers::array as fba;
use vortex::flatbuffers as fba;
use vortex::{ArrayData, Context, ViewContext};
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexError};
Expand All @@ -17,11 +17,15 @@ pub(crate) enum IPCMessage<'a> {
}

pub(crate) struct IPCContext<'a>(pub &'a ViewContext);

pub(crate) struct IPCSchema<'a>(pub &'a DType);

pub(crate) struct IPCChunk<'a>(pub &'a ViewContext, pub &'a ArrayData);

pub(crate) struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData);

impl FlatBufferRoot for IPCMessage<'_> {}

impl WriteFlatBuffer for IPCMessage<'_> {
type Target<'a> = fb::Message<'a>;

Expand Down Expand Up @@ -186,13 +190,16 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
.collect_vec();
let children = Some(fbb.create_vector(&children));

let stats = Some(self.1.statistics().write_flatbuffer(fbb));

fba::Array::create(
fbb,
&fba::ArrayArgs {
version: Default::default(),
has_buffer: column_data.buffer().is_some(),
encoding,
metadata,
stats,
children,
},
)
Expand Down
Loading