Skip to content

Commit

Permalink
Include stats in IPC messages (#302)
Browse files Browse the repository at this point in the history
- Modifies the flatbuffer array type to include an ArrayStats table
- Array Table contains optional fields corresponding to each currently
available stats type
  - Current implementation populates all stats
- Implementation of the Statistics trait for ArrayView
- Implementation does no allocations or computation, only references
values that exist in the underlying flatbuffer
- For demonstrative purposes, I've [written (and removed) an
implementation](de4077c)
that allocates only if someone calls set(stat, value) to populate
additional, possibly missing, stats. I've removed this because we don't
currently have a use for it, but it's easy enough to do without any
unsafe shenanigans.
- Callers can specify which stats should be included with a serialized
IPC array when constructing a ViewContext. By default, all stats are
included.
- Tests demonstrating the presence of correct stats after a round-trip
through IPC for primitive and chunked arrays


~I've included a mechanism to configure all of the statistics by default
here because the overhead they add to the flatbuffer message is
relatively small, given that the arrays themselves are sufficiently
large. I considered adding a mechanism to check the length of the arrays
[here](https://github.com/spiraldb/vortex/pull/302/files#diff-b7cc44a4bd1e1c769cb029b5ecaa98f080fdb7aa48b79566a9c8bb1306b84149R212)
to choose a subset of stats based on the size of the array (probably
just drop the two frequency arrays, because they're much larger than
everything else), but decided against it for now. I don't think we
expect to frequently see arrays small enough that these stats would add
a relatively significant amount of wire overhead~

---------

Co-authored-by: Nicholas Gates <[email protected]>
  • Loading branch information
jdcasale and gatesn authored May 8, 2024
1 parent c09d2d6 commit 59d4db0
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 13 deletions.
17 changes: 17 additions & 0 deletions vortex-array/flatbuffers/array.fbs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
include "vortex-scalar/flatbuffers/scalar.fbs";

namespace vortex.array;

enum Version: uint8 {
Expand All @@ -9,7 +11,22 @@ table Array {
has_buffer: bool;
encoding: uint16;
metadata: [ubyte];
stats: ArrayStats;
children: [Array];
}

// Statistics that may accompany an Array. One field per stat kind.
// Scalar fields declared with `= null` are FlatBuffers optional scalars, so an
// unset value is distinguishable from false/0.
table ArrayStats {
// Minimum and maximum, encoded with the ScalarValue type from scalar.fbs.
min: vortex.scalar.ScalarValue;
max: vortex.scalar.ScalarValue;
// Boolean flags; absent when not recorded.
is_sorted: bool = null;
is_strict_sorted: bool = null;
is_constant: bool = null;
// Counts; absent when not recorded.
run_count: uint64 = null;
true_count: uint64 = null;
null_count: uint64 = null;
// Frequency vectors of uint64 values.
bit_width_freq: [uint64];
trailing_zero_freq: [uint64];
}


root_type Array;
5 changes: 3 additions & 2 deletions vortex-array/src/array/chunked/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ impl ArrayStatisticsCompute for ChunkedArray<'_> {
s.compute(stat);
s.to_set()
})
.fold(StatsSet::new(), |mut acc, x| {
.reduce(|mut acc, x| {
acc.merge(&x);
acc
}))
})
.unwrap_or_else(StatsSet::new))
}
}
4 changes: 2 additions & 2 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ use crate::validity::ArrayValidity;
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};

pub mod flatbuffers {
pub use gen_array::vortex::*;
pub use generated::vortex::array::*;

#[allow(unused_imports)]
#[allow(dead_code)]
#[allow(non_camel_case_types)]
#[allow(clippy::all)]
mod gen_array {
mod generated {
include!(concat!(env!("OUT_DIR"), "/flatbuffers/array.rs"));
}

Expand Down
49 changes: 49 additions & 0 deletions vortex-array/src/stats/flatbuffers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use itertools::Itertools;
use vortex_flatbuffers::WriteFlatBuffer;

use crate::stats::{Stat, Statistics};

/// Writes the statistics that can be read from `self` into an `ArrayStats` flatbuffer table.
///
/// Any stat whose lookup fails (or returns nothing) is left unset in the resulting table.
impl WriteFlatBuffer for &dyn Statistics {
    type Target<'t> = crate::flatbuffers::ArrayStats<'t>;

    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        // Vectors and nested values are written to the builder first, in this order,
        // before the ArrayStats table itself is created.
        // NOTE(review): `get_as::<Vec<u64>>` already yields an owned Vec, so the extra
        // copy below looks redundant — kept to preserve the original behaviour.
        let trailing_zero_freq = self
            .get_as::<Vec<u64>>(Stat::TrailingZeroFreq)
            .ok()
            .map(|freqs| {
                let owned = freqs.iter().copied().collect_vec();
                fbb.create_vector(owned.as_slice())
            });

        let bit_width_freq = self
            .get_as::<Vec<u64>>(Stat::BitWidthFreq)
            .ok()
            .map(|freqs| {
                let owned = freqs.iter().copied().collect_vec();
                fbb.create_vector(owned.as_slice())
            });

        let min = self
            .get(Stat::Min)
            .map(|scalar| scalar.value().write_flatbuffer(fbb));

        let max = self
            .get(Stat::Max)
            .map(|scalar| scalar.value().write_flatbuffer(fbb));

        // Plain scalar stats: a failed lookup simply becomes `None`.
        let is_sorted = self.get_as::<bool>(Stat::IsSorted).ok();
        let is_strict_sorted = self.get_as::<bool>(Stat::IsStrictSorted).ok();
        let is_constant = self.get_as::<bool>(Stat::IsConstant).ok();
        let run_count = self.get_as_cast::<u64>(Stat::RunCount).ok();
        let true_count = self.get_as_cast::<u64>(Stat::TrueCount).ok();
        let null_count = self.get_as_cast::<u64>(Stat::NullCount).ok();

        let args = crate::flatbuffers::ArrayStatsArgs {
            min,
            max,
            is_sorted,
            is_strict_sorted,
            is_constant,
            run_count,
            true_count,
            null_count,
            bit_width_freq,
            trailing_zero_freq,
        };

        crate::flatbuffers::ArrayStats::create(fbb, &args)
    }
}
11 changes: 11 additions & 0 deletions vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use vortex_dtype::{DType, NativePType};
use vortex_error::{vortex_err, VortexError, VortexResult};
use vortex_scalar::Scalar;

pub mod flatbuffers;
mod statsset;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Sequence)]
Expand Down Expand Up @@ -93,6 +94,16 @@ impl dyn Statistics + '_ {
.and_then(|s| U::try_from(&s))
}

/// Fetches `stat`, casts it to the non-nullable primitive dtype of `U`, then converts it to `U`.
///
/// # Errors
///
/// Returns a compute error when the statistic is missing, and propagates any error from
/// the cast or from the final `TryFrom` conversion.
pub fn get_as_cast<U: NativePType + for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
    &self,
    stat: Stat,
) -> VortexResult<U> {
    let found = self
        .get(stat)
        .ok_or_else(|| vortex_err!(ComputeError: "statistic {} missing", stat))?;
    let target = DType::Primitive(U::PTYPE, NonNullable);
    let casted = found.cast(&target)?;
    U::try_from(&casted)
}

pub fn compute_as<U: for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
&self,
stat: Stat,
Expand Down
89 changes: 83 additions & 6 deletions vortex-array/src/view.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::fmt::{Debug, Formatter};

use enum_iterator::all;
use itertools::Itertools;
use log::warn;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_dtype::{DType, Nullability};
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult};
use vortex_scalar::{PValue, Scalar, ScalarValue};

use crate::encoding::{EncodingId, EncodingRef};
use crate::flatbuffers::array as fb;
use crate::stats::{EmptyStatistics, Statistics};
use crate::flatbuffers as fb;
use crate::stats::{Stat, Statistics, StatsSet};
use crate::Context;
use crate::{Array, IntoArray, ToArray};

Expand Down Expand Up @@ -53,7 +56,6 @@ impl<'v> ArrayView<'v> {
Self::cumulative_nbuffers(array)
)
}

let view = Self {
encoding,
dtype,
Expand Down Expand Up @@ -136,8 +138,83 @@ impl<'v> ArrayView<'v> {
}

pub fn statistics(&self) -> &dyn Statistics {
// TODO(ngates): store statistics in FlatBuffers
&EmptyStatistics
self
}
}

/// Statistics for a view are read straight out of the stats table of its flatbuffer.
impl Statistics for ArrayView<'_> {
    /// Returns the stored value for `stat`, or `None` when the flatbuffer has no stats
    /// table or the requested field is not present in it.
    fn get(&self, stat: Stat) -> Option<Scalar> {
        // Every stat lives in the same table, so bail out once if it is absent.
        let fb_stats = self.array.stats()?;
        match stat {
            Stat::Max => fb_stats
                .max()
                .and_then(|raw| ScalarValue::try_from(raw).ok())
                .map(|value| Scalar::new(self.dtype.clone(), value)),
            Stat::Min => fb_stats
                .min()
                .and_then(|raw| ScalarValue::try_from(raw).ok())
                .map(|value| Scalar::new(self.dtype.clone(), value)),
            Stat::IsConstant => fb_stats.is_constant().map(bool::into),
            Stat::IsSorted => fb_stats.is_sorted().map(bool::into),
            Stat::IsStrictSorted => fb_stats.is_strict_sorted().map(bool::into),
            Stat::RunCount => fb_stats.run_count().map(u64::into),
            Stat::TrueCount => fb_stats.true_count().map(u64::into),
            Stat::NullCount => fb_stats.null_count().map(u64::into),
            Stat::BitWidthFreq => fb_stats.bit_width_freq().map(|freqs| {
                let values = freqs
                    .iter()
                    .map(|count| ScalarValue::Primitive(PValue::U64(count)))
                    .collect_vec();
                Scalar::list(
                    DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable),
                    values,
                )
            }),
            Stat::TrailingZeroFreq => fb_stats
                .trailing_zero_freq()
                .map(|freqs| freqs.iter().collect_vec().into()),
        }
    }

    /// NB: part of the contract for `to_set` is that it does not do any expensive computation.
    /// Other implementations can hand back their underlying stats map; here the stats are
    /// "precalculated" fields of the flatbuffer, so a fresh stats map is allocated and
    /// populated with whichever of those fields are present.
    fn to_set(&self) -> StatsSet {
        let mut collected = StatsSet::new();
        all::<Stat>()
            .filter_map(|stat| self.get(stat).map(|value| (stat, value)))
            .for_each(|(stat, value)| collected.set(stat, value));
        collected
    }

    /// Views do not allocate a stats set to cache values (to keep instantiation
    /// allocation-free), so writes are dropped with a warning.
    fn set(&self, _stat: Stat, _value: Scalar) {
        warn!("Cannot write stats to a view")
    }

    /// Returns the stored stat if present; otherwise converts the view to an array,
    /// asks it to compute the statistic, and returns the computed value (if any).
    fn compute(&self, stat: Stat) -> Option<Scalar> {
        match self.get(stat) {
            Some(stored) => Some(stored),
            None => {
                let computed = self
                    .to_array()
                    .with_dyn(|a| a.compute_statistics(stat))
                    .ok()?;
                computed.get(stat).cloned()
            }
        }
    }
}

Expand Down
8 changes: 7 additions & 1 deletion vortex-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ pub mod flatbuffers {

mod deps {
pub mod array {
pub use vortex::flatbuffers::array;
pub use vortex::flatbuffers as array;
}

pub mod dtype {
pub use vortex_dtype::flatbuffers as dtype;
}

pub mod scalar {
#[allow(unused_imports)]
pub use vortex_scalar::flatbuffers as scalar;
}
}
}

Expand Down
9 changes: 8 additions & 1 deletion vortex-ipc/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use itertools::Itertools;
use vortex::flatbuffers::array as fba;
use vortex::flatbuffers as fba;
use vortex::{ArrayData, Context, ViewContext};
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexError};
Expand All @@ -17,11 +17,15 @@ pub(crate) enum IPCMessage<'a> {
}

pub(crate) struct IPCContext<'a>(pub &'a ViewContext);

pub(crate) struct IPCSchema<'a>(pub &'a DType);

pub(crate) struct IPCChunk<'a>(pub &'a ViewContext, pub &'a ArrayData);

pub(crate) struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData);

impl FlatBufferRoot for IPCMessage<'_> {}

impl WriteFlatBuffer for IPCMessage<'_> {
type Target<'a> = fb::Message<'a>;

Expand Down Expand Up @@ -186,13 +190,16 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
.collect_vec();
let children = Some(fbb.create_vector(&children));

let stats = Some(self.1.statistics().write_flatbuffer(fbb));

fba::Array::create(
fbb,
&fba::ArrayArgs {
version: Default::default(),
has_buffer: column_data.buffer().is_some(),
encoding,
metadata,
stats,
children,
},
)
Expand Down
Loading

0 comments on commit 59d4db0

Please sign in to comment.