Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include stats in IPC messages #302

Merged
merged 20 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/fastlanez
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
17 changes: 17 additions & 0 deletions vortex-array/flatbuffers/array.fbs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
include "vortex-scalar/flatbuffers/scalar.fbs";

namespace vortex.array;

enum Version: uint8 {
Expand All @@ -9,7 +11,22 @@ table Array {
has_buffer: bool;
encoding: uint16;
metadata: [ubyte];
stats: ArrayStats;
children: [Array];
}

table ArrayStats {
min: vortex.scalar.Scalar;
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
max: vortex.scalar.Scalar;
is_sorted: vortex.scalar.Scalar;
is_strict_sorted: vortex.scalar.Scalar;
is_constant: vortex.scalar.Scalar;
run_count: vortex.scalar.Scalar;
true_count: vortex.scalar.Scalar;
null_count: vortex.scalar.Scalar;
bit_width_freq: [vortex.scalar.Scalar];
trailing_zero_freq: [vortex.scalar.Scalar];
}


root_type Array;
14 changes: 13 additions & 1 deletion vortex-array/src/array/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::array::primitive::PrimitiveArray;
use crate::compute::scalar_at::scalar_at;
use crate::compute::scalar_subtract::{subtract_scalar, SubtractScalarFn};
use crate::compute::search_sorted::{search_sorted, SearchSortedSide};
use crate::stats::ArrayStatistics;
use crate::validity::Validity::NonNullable;
use crate::validity::{ArrayValidity, LogicalValidity};
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
Expand Down Expand Up @@ -46,7 +47,18 @@ impl ChunkedArray<'_> {
let mut children = vec![chunk_ends.into_array_data()];
children.extend(chunks.iter().map(|a| a.to_array_data()));

Self::try_from_parts(dtype, ChunkedMetadata, children.into(), StatsSet::new())
// NB: this reports whatever stats we can correctly deduce for the top-level logical array
// If we'd prefer not to report stats for non-primitive arrays we can remove this
let merged_stats = chunks
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
.iter()
.map(|chunk| chunk.statistics().to_set())
.reduce(|mut a, b| {
a.merge(&b);
a
})
.unwrap_or_else(StatsSet::new);

Self::try_from_parts(dtype, ChunkedMetadata, children.into(), merged_stats)
}

#[inline]
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ use crate::validity::ArrayValidity;
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};

pub mod flatbuffers {
pub use gen_array::vortex::*;
pub use generated::vortex::array::*;

#[allow(unused_imports)]
#[allow(dead_code)]
#[allow(non_camel_case_types)]
#[allow(clippy::all)]
mod gen_array {
mod generated {
include!(concat!(env!("OUT_DIR"), "/flatbuffers/array.rs"));
}

Expand Down
2 changes: 2 additions & 0 deletions vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ pub trait Statistics {
/// Computes the value of the stat if it's not present
fn compute(&self, stat: Stat) -> Option<Scalar>;

/// Applies the given function to the statistic if it's present
fn with_stat_value<'a>(
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
&self,
stat: Stat,
f: &'a mut dyn FnMut(&Scalar) -> VortexResult<()>,
) -> VortexResult<()>;

/// Applies the given function to the statistic, computing it if it's not already present
fn with_computed_stat_value<'a>(
&self,
stat: Stat,
Expand Down
207 changes: 201 additions & 6 deletions vortex-array/src/view.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use std::fmt::{Debug, Formatter};

use enum_iterator::all;
use itertools::Itertools;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_dtype::flatbuffers::PType;
use vortex_dtype::half::f16;
use vortex_dtype::{DType, Nullability};
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult};
use vortex_scalar::flatbuffers::Primitive;
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
use vortex_scalar::Scalar::List;
use vortex_scalar::{ListScalar, Scalar};

use crate::encoding::{EncodingId, EncodingRef};
use crate::flatbuffers::array as fb;
use crate::stats::{EmptyStatistics, Statistics};
use crate::flatbuffers as fb;
use crate::stats::{Stat, Statistics, StatsSet};
use crate::Context;
use crate::{Array, IntoArray, ToArray};

Expand Down Expand Up @@ -53,7 +59,6 @@ impl<'v> ArrayView<'v> {
Self::cumulative_nbuffers(array)
)
}

let view = Self {
encoding,
dtype,
Expand Down Expand Up @@ -136,8 +141,198 @@ impl<'v> ArrayView<'v> {
}

pub fn statistics(&self) -> &dyn Statistics {
// TODO(ngates): store statistics in FlatBuffers
&EmptyStatistics
self
}
}

impl Statistics for ArrayView<'_> {
fn get(&self, stat: Stat) -> Option<Scalar> {
// fb fetch is just a pointer dereference, so we check that first
let from_fb = get_from_flatbuffer_array(self.array, stat);
if from_fb.is_some() {
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
return from_fb;
}
None
}

/// NB: part of the contract for to_set is that it does not do any expensive computation.
/// In other implementations, this means returning the underlying stats map, but for the flatbuffer
/// implemetation, we have 'precalculated' stats in the flatbuffer itself, so we need to
/// alllocate a stats map and populate it with those fields.
fn to_set(&self) -> StatsSet {
let mut result = StatsSet::new();
for stat in all::<Stat>() {
if let Some(value) = self.get(stat) {
result.set(stat, value)
}
}
result
}

/// We want to avoid any sort of allocation on instantiation of the ArrayView, so we
/// do not allocate a stats_set to cache values.
fn set(&self, _stat: Stat, _value: Scalar) {
unimplemented!()
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
}

fn compute(&self, stat: Stat) -> Option<Scalar> {
if let Some(s) = self.get(stat) {
return Some(s);
}

let calculated = self
.to_array()
.with_dyn(|a| a.compute_statistics(stat))
.ok()?;

calculated.into_iter().for_each(|(k, v)| self.set(k, v));
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
self.get(stat)
}

fn with_stat_value<'a>(
&self,
stat: Stat,
f: &'a mut dyn FnMut(&Scalar) -> VortexResult<()>,
) -> VortexResult<()> {
if let Some(existing) = self.get(stat) {
return f(&existing);
}
vortex_bail!(ComputeError: "statistic {} missing", stat);
}

fn with_computed_stat_value<'a>(
&self,
stat: Stat,
f: &'a mut dyn FnMut(&Scalar) -> VortexResult<()>,
) -> VortexResult<()> {
self.compute(stat)
.map(|s| f(&s))
.unwrap_or_else(|| vortex_bail!(ComputeError: "statistic {} missing", stat))
}
}

fn get_from_flatbuffer_array(array: fb::Array<'_>, stat: Stat) -> Option<Scalar> {
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
match stat {
Stat::IsConstant => {
let is_constant = array.stats()?.is_constant();
is_constant
.and_then(|v| v.type__as_bool())
.map(|v| v.value().into())
}
Stat::IsSorted => array
.stats()?
.is_sorted()
.and_then(|v| v.type__as_bool())
.map(|v| v.value().into()),
Stat::IsStrictSorted => array
.stats()?
.is_strict_sorted()
.and_then(|v| v.type__as_bool())
.map(|v| v.value().into()),
Stat::Max => {
let max = array.stats()?.max();
max.and_then(|v| v.type__as_primitive())
.and_then(primitive_to_scalar)
}
Stat::Min => {
let min = array.stats()?.min();
min.and_then(|v| v.type__as_primitive())
.and_then(primitive_to_scalar)
}
Stat::RunCount => {
let rc = array.stats()?.run_count();
rc.and_then(|v| v.type__as_primitive())
.and_then(primitive_to_scalar)
}
Stat::TrueCount => {
let tc = array.stats()?.true_count();
tc.and_then(|v| v.type__as_primitive())
.and_then(primitive_to_scalar)
}
Stat::NullCount => {
let nc = array.stats()?.null_count();
nc.and_then(|v| v.type__as_primitive())
.and_then(primitive_to_scalar)
}
Stat::BitWidthFreq => array
.stats()?
.bit_width_freq()
.map(|v| {
v.iter()
.flat_map(|v| {
primitive_to_scalar(
v.type__as_primitive()
.expect("Should only ever produce primitives"),
)
})
.collect_vec()
})
.map(|v| {
List(ListScalar::new(
DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable),
Some(v),
))
}),
Stat::TrailingZeroFreq => array
.stats()?
.trailing_zero_freq()
.map(|v| {
v.iter()
.flat_map(|v| {
primitive_to_scalar(
v.type__as_primitive()
.expect("Should only ever produce primitives"),
)
})
.collect_vec()
})
.map(|v| {
List(ListScalar::new(
DType::Primitive(vortex_dtype::PType::U64, Nullability::NonNullable),
Some(v),
))
}),
}
}

// TODO(@jcasale): move this to serde and make serde crate public?
fn primitive_to_scalar(v: Primitive) -> Option<Scalar> {
jdcasale marked this conversation as resolved.
Show resolved Hide resolved
let err_msg = "failed to deserialize invalid primitive scalar";
match v.ptype() {
PType::U8 => v
.bytes()
.map(|bytes| u8::from_le_bytes(bytes.bytes().try_into().expect(err_msg)).into()),
PType::U16 => v
.bytes()
.map(|bytes| u16::from_le_bytes(bytes.bytes().try_into().expect(err_msg)).into()),
PType::U32 => v
.bytes()
.map(|bytes| u32::from_le_bytes(bytes.bytes().try_into().expect(err_msg)).into()),
PType::U64 => v
.bytes()
.map(|bytes| u64::from_le_bytes(bytes.bytes().try_into().expect(err_msg)).into()),
PType::I8 => v
.bytes()
.map(|bytes| i8::from_le_bytes(bytes.bytes().try_into().expect(err_msg)).into()),
PType::I16 => v
.bytes()
.map(|bytes| i16::from_le_bytes(bytes.bytes().try_into().expect(err_msg)).into()),
PType::I32 => v
.bytes()
.map(|bytes| i32::from_le_bytes(bytes.bytes().try_into().expect(err_msg)).into()),
PType::I64 => v
.bytes()
.map(|bytes| i64::from_le_bytes(bytes.bytes().try_into().expect(err_msg)).into()),
PType::F16 => v
.bytes()
.map(|bytes| f16::from_le_bytes(bytes.bytes().try_into().expect(err_msg)).into()),
PType::F32 => v
.bytes()
.map(|bytes| f32::from_le_bytes(bytes.bytes().try_into().expect(err_msg)).into()),
PType::F64 => v
.bytes()
.map(|bytes| f64::from_le_bytes(bytes.bytes().try_into().expect(err_msg)).into()),
_ => unreachable!(),
}
}

Expand Down
8 changes: 7 additions & 1 deletion vortex-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ pub mod flatbuffers {

mod deps {
pub mod array {
pub use vortex::flatbuffers::array;
pub use vortex::flatbuffers as array;
}

pub mod dtype {
pub use vortex_dtype::flatbuffers as dtype;
}

pub mod scalar {
#[allow(unused_imports)]
pub use vortex_scalar::flatbuffers as scalar;
}
}
}

Expand Down
Loading