Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include stats in IPC messages #302

Merged
merged 20 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions vortex-array/flatbuffers/array.fbs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
include "vortex-scalar/flatbuffers/scalar.fbs";

namespace vortex.array;

enum Version: uint8 {
Expand All @@ -9,7 +11,22 @@ table Array {
has_buffer: bool;
encoding: uint16;
metadata: [ubyte];
stats: ArrayStats;
children: [Array];
}

// Statistics stored alongside an Array in the flatbuffer message.
// Every field is optional: `min`/`max` and the two vectors may simply be
// omitted, and the scalar fields are declared with `= null` so that an absent
// value can be told apart from `false` / `0`.
table ArrayStats {
// Smallest and largest value, encoded as a vortex.scalar.ScalarValue.
min: vortex.scalar.ScalarValue;
max: vortex.scalar.ScalarValue;
// Optional boolean flags (`= null` => field may be absent).
is_sorted: bool = null;
is_strict_sorted: bool = null;
is_constant: bool = null;
// Optional counters (`= null` => field may be absent).
run_count: uint64 = null;
true_count: uint64 = null;
null_count: uint64 = null;
// Frequency vectors of uint64 entries; omitted entirely when not available.
bit_width_freq: [uint64];
trailing_zero_freq: [uint64];
}


root_type Array;
5 changes: 3 additions & 2 deletions vortex-array/src/array/chunked/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ impl ArrayStatisticsCompute for ChunkedArray<'_> {
s.compute(stat);
s.to_set()
})
.fold(StatsSet::new(), |mut acc, x| {
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
.reduce(|mut acc, x| {
acc.merge(&x);
acc
}))
})
.unwrap_or_else(StatsSet::new))
}
}
4 changes: 2 additions & 2 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ use crate::validity::ArrayValidity;
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};

pub mod flatbuffers {
pub use gen_array::vortex::*;
pub use generated::vortex::array::*;

#[allow(unused_imports)]
#[allow(dead_code)]
#[allow(non_camel_case_types)]
#[allow(clippy::all)]
mod gen_array {
mod generated {
include!(concat!(env!("OUT_DIR"), "/flatbuffers/array.rs"));
}

Expand Down
89 changes: 83 additions & 6 deletions vortex-array/src/view.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::fmt::{Debug, Formatter};

use enum_iterator::all;
use itertools::Itertools;
use log::warn;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_dtype::{DType, Nullability};
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult};
use vortex_scalar::{PValue, Scalar, ScalarValue};

use crate::encoding::{EncodingId, EncodingRef};
use crate::flatbuffers::array as fb;
use crate::stats::{EmptyStatistics, Statistics};
use crate::flatbuffers as fb;
use crate::stats::{Stat, Statistics, StatsSet};
use crate::Context;
use crate::{Array, IntoArray, ToArray};

Expand Down Expand Up @@ -53,7 +56,6 @@ impl<'v> ArrayView<'v> {
Self::cumulative_nbuffers(array)
)
}

let view = Self {
encoding,
dtype,
Expand Down Expand Up @@ -136,8 +138,83 @@ impl<'v> ArrayView<'v> {
}

pub fn statistics(&self) -> &dyn Statistics {
// TODO(ngates): store statistics in FlatBuffers
&EmptyStatistics
self
}
}

/// `Statistics` implementation backed directly by the `stats` table of the
/// flatbuffer that this view wraps. Values are decoded on every call; nothing
/// is cached on the view.
impl Statistics for ArrayView<'_> {
/// Looks up `stat` in the flatbuffer `stats` table.
///
/// Returns `None` when the array has no `stats` table (the `?` on
/// `self.array.stats()`), when the individual field is absent, or — for
/// `Min`/`Max` — when the stored value cannot be converted into a
/// `ScalarValue` (the conversion error is dropped by `.ok()`).
fn get(&self, stat: Stat) -> Option<Scalar> {
match stat {
Stat::Max => {
let max = self.array.stats()?.max();
// The flatbuffer only stores the raw value; pair it with this view's dtype.
max.and_then(|v| ScalarValue::try_from(v).ok())
.map(|v| Scalar::new(self.dtype.clone(), v))
}
Stat::Min => {
let min = self.array.stats()?.min();
// Same decoding as `Stat::Max` above.
min.and_then(|v| ScalarValue::try_from(v).ok())
.map(|v| Scalar::new(self.dtype.clone(), v))
}
Stat::IsConstant => self.array.stats()?.is_constant().map(bool::into),
Stat::IsSorted => self.array.stats()?.is_sorted().map(bool::into),
Stat::IsStrictSorted => self.array.stats()?.is_strict_sorted().map(bool::into),
Stat::RunCount => self.array.stats()?.run_count().map(u64::into),
Stat::TrueCount => self.array.stats()?.true_count().map(u64::into),
Stat::NullCount => self.array.stats()?.null_count().map(u64::into),
// Built explicitly as a list scalar of non-nullable u64 elements.
Stat::BitWidthFreq => self
.array
.stats()?
.bit_width_freq()
.map(|v| {
v.iter()
.map(|v| ScalarValue::Primitive(PValue::U64(v)))
.collect_vec()
})
.map(|v| {
Scalar::list(
DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable),
v,
)
}),
// NOTE(review): converted through `Into` on the collected Vec rather than the
// explicit `Scalar::list` used for `BitWidthFreq` — confirm both yield the same
// scalar dtype.
Stat::TrailingZeroFreq => self
.array
.stats()?
.trailing_zero_freq()
.map(|v| v.iter().collect_vec())
.map(|v| v.into()),
}
}

/// NB: part of the contract for to_set is that it does not do any expensive computation.
/// In other implementations, this means returning the underlying stats map, but for the flatbuffer
/// implementation, we have 'precalculated' stats in the flatbuffer itself, so we need to
/// allocate a stats map and populate it with those fields.
///
/// Only stats for which `get` returns `Some` are inserted.
fn to_set(&self) -> StatsSet {
let mut result = StatsSet::new();
for stat in all::<Stat>() {
if let Some(value) = self.get(stat) {
result.set(stat, value)
}
}
result
}

/// We want to avoid any sort of allocation on instantiation of the ArrayView, so we
/// do not allocate a stats_set to cache values.
///
/// Consequently this is a no-op apart from logging a warning; both arguments are ignored.
fn set(&self, _stat: Stat, _value: Scalar) {
warn!("Cannot write stats to a view")
}

/// Returns the stored value for `stat` if the flatbuffer has one; otherwise
/// converts the view to an array, asks it to compute the statistic, and
/// then looks the value up again via `get`.
///
/// Returns `None` if the computation fails (the error is dropped by `.ok()?`).
///
/// NOTE(review): the result of `compute_statistics` is discarded and `set`
/// on a view does nothing, so the trailing `get` appears to see only what
/// the flatbuffer already held — confirm whether the freshly computed
/// value should be returned directly instead.
fn compute(&self, stat: Stat) -> Option<Scalar> {
if let Some(s) = self.get(stat) {
return Some(s);
}

self.to_array()
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
.with_dyn(|a| a.compute_statistics(stat))
.ok()?;

self.get(stat)
}
}

Expand Down
8 changes: 7 additions & 1 deletion vortex-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ pub mod flatbuffers {

mod deps {
pub mod array {
pub use vortex::flatbuffers::array;
pub use vortex::flatbuffers as array;
}

pub mod dtype {
pub use vortex_dtype::flatbuffers as dtype;
}

pub mod scalar {
#[allow(unused_imports)]
pub use vortex_scalar::flatbuffers as scalar;
}
}
}

Expand Down
61 changes: 57 additions & 4 deletions vortex-ipc/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use flatbuffers::{FlatBufferBuilder, WIPOffset};
use itertools::Itertools;
use vortex::flatbuffers::array as fba;
use vortex::flatbuffers as vfb;
use vortex::stats::Stat;
use vortex::{ArrayData, Context, ViewContext};
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexError};
Expand All @@ -17,11 +18,15 @@ pub(crate) enum IPCMessage<'a> {
}

/// Newtype around a borrowed `ViewContext`.
pub(crate) struct IPCContext<'a>(pub &'a ViewContext);

/// Newtype around a borrowed `DType`.
pub(crate) struct IPCSchema<'a>(pub &'a DType);

/// Pairs a borrowed `ViewContext` (field 0) with the `ArrayData` (field 1) of a chunk;
/// implements `WriteFlatBuffer`.
pub(crate) struct IPCChunk<'a>(pub &'a ViewContext, pub &'a ArrayData);

/// Pairs a borrowed `ViewContext` (field 0) with an `ArrayData` (field 1);
/// its `WriteFlatBuffer` impl produces the flatbuffer `Array` table.
pub(crate) struct IPCArray<'a>(pub &'a ViewContext, pub &'a ArrayData);

impl FlatBufferRoot for IPCMessage<'_> {}

impl WriteFlatBuffer for IPCMessage<'_> {
type Target<'a> = fb::Message<'a>;

Expand Down Expand Up @@ -154,7 +159,7 @@ impl<'a> WriteFlatBuffer for IPCChunk<'a> {
}

impl<'a> WriteFlatBuffer for IPCArray<'a> {
type Target<'t> = fba::Array<'t>;
type Target<'t> = vfb::Array<'t>;

fn write_flatbuffer<'fb>(
&self,
Expand Down Expand Up @@ -186,15 +191,63 @@ impl<'a> WriteFlatBuffer for IPCArray<'a> {
.collect_vec();
let children = Some(fbb.create_vector(&children));

fba::Array::create(
let stats = collect_array_stats(fbb, self.1);

vfb::Array::create(
fbb,
&fba::ArrayArgs {
&vfb::ArrayArgs {
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
version: Default::default(),
has_buffer: column_data.buffer().is_some(),
encoding,
metadata,
stats: Some(stats),
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
children,
},
)
}
}

/// Builds the `ArrayStats` flatbuffer table for `array` and returns its offset.
///
/// Each statistic is read through `get` / `get_as` on `array.statistics()`.
/// A statistic that is missing, or whose `get_as` conversion fails, is
/// silently left unset in the table (errors are dropped with `.ok()`).
///
/// NOTE(review): this calls `get`/`get_as`, not `compute`, so presumably only
/// statistics that are already available end up in the message — confirm
/// against the `Statistics::get_as` contract.
fn collect_array_stats<'a>(
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
fbb: &'_ mut FlatBufferBuilder<'a>,
array: &'_ ArrayData,
) -> WIPOffset<vfb::ArrayStats<'a>> {
// Vectors and nested tables have to be written into the builder before the
// `ArrayStats` table that refers to them is created.
// NOTE(review): `iter().copied().collect_vec()` copies a `Vec<u64>` into
// another `Vec<u64>`; it looks redundant.
let trailing_zero_freq = array
.statistics()
.get_as::<Vec<u64>>(Stat::TrailingZeroFreq)
.ok()
.map(|v| v.iter().copied().collect_vec())
.map(|v| fbb.create_vector(v.as_slice()));

let bit_width_freq = array
.statistics()
.get_as::<Vec<u64>>(Stat::BitWidthFreq)
.ok()
.map(|v| v.iter().copied().collect_vec())
.map(|v| fbb.create_vector(v.as_slice()));

// Min/max are serialized as nested scalar values via their own `write_flatbuffer`.
let min = array
.statistics()
.get(Stat::Min)
.map(|min| min.value().write_flatbuffer(fbb));

let max = array
.statistics()
.get(Stat::Max)
.map(|max| max.value().write_flatbuffer(fbb));

// Scalar fields are passed as `Option`s; `None` leaves the field absent.
let stat_args = &vfb::ArrayStatsArgs {
min,
max,
is_sorted: array.statistics().get_as::<bool>(Stat::IsSorted).ok(),
is_strict_sorted: array.statistics().get_as::<bool>(Stat::IsStrictSorted).ok(),
is_constant: array.statistics().get_as::<bool>(Stat::IsConstant).ok(),
run_count: array.statistics().get_as::<u64>(Stat::RunCount).ok(),
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
true_count: array.statistics().get_as::<u64>(Stat::TrueCount).ok(),
null_count: array.statistics().get_as::<u64>(Stat::NullCount).ok(),
bit_width_freq,
trailing_zero_freq,
};

vfb::ArrayStats::create(fbb, stat_args)
}
Loading