Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include stats in IPC messages #302

Merged
merged 20 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/fastlanez
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
17 changes: 17 additions & 0 deletions vortex-array/flatbuffers/array.fbs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
include "vortex-scalar/flatbuffers/scalar.fbs";

namespace vortex.array;

enum Version: uint8 {
Expand All @@ -9,7 +11,22 @@ table Array {
has_buffer: bool;
encoding: uint16;
metadata: [ubyte];
stats: ArrayStats;
children: [Array];
}

table ArrayStats {
min: vortex.scalar.Scalar;
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
max: vortex.scalar.Scalar;
is_sorted: bool = null;
is_strict_sorted: bool = null;
is_constant: bool = null;
run_count: uint64 = null;
true_count: uint64 = null;
null_count: uint64 = null;
bit_width_freq: [uint64];
trailing_zero_freq: [uint64];
}


root_type Array;
4 changes: 2 additions & 2 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ use crate::validity::ArrayValidity;
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};

pub mod flatbuffers {
pub use gen_array::vortex::*;
pub use generated::vortex::array::*;

#[allow(unused_imports)]
#[allow(dead_code)]
#[allow(non_camel_case_types)]
#[allow(clippy::all)]
mod gen_array {
mod generated {
include!(concat!(env!("OUT_DIR"), "/flatbuffers/array.rs"));
}

Expand Down
2 changes: 2 additions & 0 deletions vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ pub trait Statistics {
/// Computes the value of the stat if it's not present
fn compute(&self, stat: Stat) -> Option<Scalar>;

/// Applies the given function to the statistic if it's present
fn with_stat_value<'a>(
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
&self,
stat: Stat,
f: &'a mut dyn FnMut(&Scalar) -> VortexResult<()>,
) -> VortexResult<()>;

/// Applies the given function to the statistic, computing it if it's not already present
fn with_computed_stat_value<'a>(
&self,
stat: Stat,
Expand Down
149 changes: 140 additions & 9 deletions vortex-array/src/view.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use std::fmt::{Debug, Formatter};

use enum_iterator::all;
use itertools::Itertools;
use log::info;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_dtype::{DType, Nullability};
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult};
use vortex_flatbuffers::ReadFlatBuffer;
use vortex_scalar::ListScalar;
use vortex_scalar::Scalar;
use vortex_scalar::Scalar::List;

use crate::encoding::{EncodingId, EncodingRef};
use crate::flatbuffers::array as fb;
use crate::stats::{EmptyStatistics, Statistics};
use crate::flatbuffers as fb;
use crate::stats::{Stat, Statistics, StatsSet};
use crate::Context;
use crate::{Array, IntoArray, ToArray};

Expand Down Expand Up @@ -53,7 +59,6 @@ impl<'v> ArrayView<'v> {
Self::cumulative_nbuffers(array)
)
}

let view = Self {
encoding,
dtype,
Expand Down Expand Up @@ -136,8 +141,109 @@ impl<'v> ArrayView<'v> {
}

pub fn statistics(&self) -> &dyn Statistics {
// TODO(ngates): store statistics in FlatBuffers
&EmptyStatistics
self
}
}

impl Statistics for ArrayView<'_> {
fn get(&self, stat: Stat) -> Option<Scalar> {
match stat {
Stat::Max => {
let max = self.array.stats()?.max();
max.as_ref()
.map(Scalar::read_flatbuffer)
.and_then(Result::ok)
}
Stat::Min => {
let min = self.array.stats()?.min();
min.as_ref()
.map(Scalar::read_flatbuffer)
.and_then(Result::ok)
}
Stat::IsConstant => self.array.stats()?.is_constant().map(bool::into),
Stat::IsSorted => self.array.stats()?.is_sorted().map(bool::into),
Stat::IsStrictSorted => self.array.stats()?.is_strict_sorted().map(bool::into),
Stat::RunCount => self.array.stats()?.run_count().map(u64::into),
Stat::TrueCount => self.array.stats()?.true_count().map(u64::into),
Stat::NullCount => self.array.stats()?.null_count().map(u64::into),
Stat::BitWidthFreq => self
.array
.stats()?
.bit_width_freq()
.map(|v| v.iter().map(u64::into).collect_vec())
.map(|v| {
List(ListScalar::new(
DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable),
Some(v),
))
}),
Stat::TrailingZeroFreq => self
.array
.stats()?
.trailing_zero_freq()
.map(|v| v.iter().map(u64::into).collect_vec())
.map(|v| {
List(ListScalar::new(
DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable),
Some(v),
))
}),
}
}

/// NB: part of the contract for to_set is that it does not do any expensive computation.
/// In other implementations, this means returning the underlying stats map, but for the flatbuffer
/// implemetation, we have 'precalculated' stats in the flatbuffer itself, so we need to
/// alllocate a stats map and populate it with those fields.
fn to_set(&self) -> StatsSet {
let mut result = StatsSet::new();
for stat in all::<Stat>() {
if let Some(value) = self.get(stat) {
result.set(stat, value)
}
}
result
}

/// We want to avoid any sort of allocation on instantiation of the ArrayView, so we
/// do not allocate a stats_set to cache values.
fn set(&self, _stat: Stat, _value: Scalar) {
info!("Cannot write stats to a view")
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
}

fn compute(&self, stat: Stat) -> Option<Scalar> {
if let Some(s) = self.get(stat) {
return Some(s);
}

let calculated = self
.to_array()
.with_dyn(|a| a.compute_statistics(stat))
.ok()?;

calculated.into_iter().for_each(|(k, v)| self.set(k, v));
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
self.get(stat)
}

fn with_stat_value<'a>(
&self,
stat: Stat,
f: &'a mut dyn FnMut(&Scalar) -> VortexResult<()>,
) -> VortexResult<()> {
if let Some(existing) = self.get(stat) {
return f(&existing);
}
vortex_bail!(ComputeError: "statistic {} missing", stat);
}

fn with_computed_stat_value<'a>(
&self,
stat: Stat,
f: &'a mut dyn FnMut(&Scalar) -> VortexResult<()>,
) -> VortexResult<()> {
self.compute(stat)
.map(|s| f(&s))
.unwrap_or_else(|| vortex_bail!(ComputeError: "statistic {} missing", stat))
}
}

Expand All @@ -156,11 +262,36 @@ impl<'v> IntoArray<'v> for ArrayView<'v> {
#[derive(Debug)]
pub struct ViewContext {
encodings: Vec<EncodingRef>,
stats: Vec<Stat>,
}

impl ViewContext {
pub fn new(encodings: Vec<EncodingRef>) -> Self {
Self { encodings }
pub fn new(encodings: Vec<EncodingRef>, stats: Vec<Stat>) -> Self {
Self { encodings, stats }
}

pub fn set_stats(&mut self, to_enable: &[Stat]) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this set_stats method so that we can default to including stats but allow a caller to smash over this with any combination of stats they want.

The default mechanism is not implemented because there is an open question as to what the right set of default encodings is. I think using default stats in the from() method below is reasonable behavior, given that it can be modified afterwards.

self.stats.clear();
self.stats.extend(to_enable)
}

pub fn stats(&self) -> &[Stat] {
self.stats.as_ref()
}

pub fn default_stats() -> Vec<Stat> {
vec![
Stat::Max,
Stat::Min,
Stat::IsSorted,
Stat::IsStrictSorted,
Stat::IsConstant,
Stat::BitWidthFreq,
Stat::TrailingZeroFreq,
Stat::NullCount,
Stat::RunCount,
Stat::TrueCount,
]
}

pub fn encodings(&self) -> &[EncodingRef] {
Expand All @@ -187,6 +318,6 @@ impl Default for ViewContext {

impl From<&Context> for ViewContext {
fn from(value: &Context) -> Self {
ViewContext::new(value.encodings().collect_vec())
ViewContext::new(value.encodings().collect_vec(), Self::default_stats())
}
}
8 changes: 7 additions & 1 deletion vortex-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ pub mod flatbuffers {

mod deps {
pub mod array {
pub use vortex::flatbuffers::array;
pub use vortex::flatbuffers as array;
}

pub mod dtype {
pub use vortex_dtype::flatbuffers as dtype;
}

pub mod scalar {
#[allow(unused_imports)]
pub use vortex_scalar::flatbuffers as scalar;
}
}
}

Expand Down
Loading
Loading