Skip to content

Commit

Permalink
Store present stats as a bitset in metadata of chunked layout and rem…
Browse files Browse the repository at this point in the history
…ove inline dtype layout (#1555)
  • Loading branch information
robert3005 authored Dec 4, 2024
1 parent a017de9 commit b5127b8
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 257 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ humansize = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
num-traits = { workspace = true }
num_enum = { workspace = true }
paste = { workspace = true }
pin-project = { workspace = true }
rand = { workspace = true }
Expand Down
59 changes: 56 additions & 3 deletions vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;

use enum_iterator::Sequence;
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, MutableBuffer};
use enum_iterator::{cardinality, Sequence};
use enum_map::Enum;
use itertools::Itertools;
use num_enum::{IntoPrimitive, TryFromPrimitive};
pub use statsset::*;
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::{DType, NativePType};
use vortex_dtype::{DType, NativePType, PType};
use vortex_error::{vortex_err, vortex_panic, VortexError, VortexResult};
use vortex_scalar::Scalar;

Expand All @@ -21,7 +24,10 @@ mod statsset;
/// Statistics that are used for pruning files (i.e., we want to ensure they are computed when compressing/writing).
pub const PRUNING_STATS: &[Stat] = &[Stat::Min, Stat::Max, Stat::TrueCount, Stat::NullCount];

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Sequence, Enum)]
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Hash, Sequence, Enum, IntoPrimitive, TryFromPrimitive,
)]
#[repr(u8)]
pub enum Stat {
/// Frequency of each bit width (nulls are treated as 0)
BitWidthFreq,
Expand Down Expand Up @@ -69,6 +75,53 @@ impl Stat {
pub fn has_same_dtype_as_array(&self) -> bool {
    // Only the minimum and maximum statistics answer `true` here; every other stat is `false`.
    *self == Stat::Min || *self == Stat::Max
}

pub fn dtype(&self, data_type: &DType) -> DType {
match self {
Stat::BitWidthFreq => DType::List(
Arc::new(DType::Primitive(PType::U64, NonNullable)),
NonNullable,
),
Stat::TrailingZeroFreq => DType::List(
Arc::new(DType::Primitive(PType::U64, NonNullable)),
NonNullable,
),
Stat::IsConstant => DType::Bool(NonNullable),
Stat::IsSorted => DType::Bool(NonNullable),
Stat::IsStrictSorted => DType::Bool(NonNullable),
Stat::Max => data_type.clone(),
Stat::Min => data_type.clone(),
Stat::RunCount => DType::Primitive(PType::U64, NonNullable),
Stat::TrueCount => DType::Primitive(PType::U64, NonNullable),
Stat::NullCount => DType::Primitive(PType::U64, NonNullable),
Stat::UncompressedSizeInBytes => DType::Primitive(PType::U64, NonNullable),
}
}
}

/// Encodes `stats` as a bitset and returns its backing bytes.
///
/// The bitset has one bit per [`Stat`] variant (rounded up to whole bytes); for every entry in
/// `stats` the bit at the variant's `u8` value is set.
pub fn as_stat_bitset_bytes(stats: &[Stat]) -> Vec<u8> {
    let bit_len = cardinality::<Stat>();
    let zeroed = MutableBuffer::from_len_zeroed(bit_len.div_ceil(8));
    let mut builder = BooleanBufferBuilder::new_from_buffer(zeroed, bit_len);

    stats
        .iter()
        .for_each(|&stat| builder.set_bit(usize::from(u8::from(stat)), true));

    let packed = builder.finish().into_inner();
    // `into_vec` hands the buffer back on failure; fall back to copying its bytes.
    match packed.into_vec() {
        Ok(bytes) => bytes,
        Err(buffer) => buffer.to_vec(),
    }
}

/// Decodes a stat bitset (as produced by `as_stat_bitset_bytes`) into the list of stats whose
/// bits are set.
///
/// Bit positions that do not correspond to a known [`Stat`] are skipped rather than reported as
/// errors; these are stats written by a newer version of the library.
pub fn stats_from_bitset_bytes(bytes: &[u8]) -> Vec<Stat> {
    BooleanBuffer::new(Buffer::from(bytes), 0, bytes.len() * 8)
        .set_indices()
        .filter_map(|bit| {
            // A checked conversion is required here: `bit as u8` would wrap indices >= 256 back
            // into the valid range and report stats that were never set.
            let code = u8::try_from(bit).ok()?;
            // Filter out indices failing conversion, these are stats written by newer version
            // of library
            Stat::try_from(code).ok()
        })
        .collect::<Vec<_>>()
}

impl Display for Stat {
Expand Down
3 changes: 0 additions & 3 deletions vortex-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,6 @@ mod forever_constant {
pub const CHUNKED_LAYOUT_ID: LayoutId = LayoutId(2);
/// The layout ID for a column layout
pub const COLUMNAR_LAYOUT_ID: LayoutId = LayoutId(3);
/// The layout ID for an inline schema layout
pub const INLINE_SCHEMA_LAYOUT_ID: LayoutId = LayoutId(4);

#[cfg(test)]
mod test {
Expand All @@ -187,7 +185,6 @@ mod forever_constant {
assert_eq!(FLAT_LAYOUT_ID, LayoutId(1));
assert_eq!(CHUNKED_LAYOUT_ID, LayoutId(2));
assert_eq!(COLUMNAR_LAYOUT_ID, LayoutId(3));
assert_eq!(INLINE_SCHEMA_LAYOUT_ID, LayoutId(4));
}
}
}
Expand Down
15 changes: 5 additions & 10 deletions vortex-file/src/read/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use vortex_array::Context;
use vortex_error::{vortex_err, VortexResult};
use vortex_flatbuffers::footer as fb;

use crate::layouts::{ChunkedLayout, ColumnarLayout, FlatLayout, InlineDTypeLayout};
use crate::layouts::{ChunkedLayout, ColumnarLayout, FlatLayout};
use crate::{LayoutReader, RelativeLayoutCache, Scan};

#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -52,15 +52,10 @@ impl LayoutContext {
impl Default for LayoutContext {
fn default() -> Self {
Self::new(
[
&ColumnarLayout as LayoutRef,
&ChunkedLayout,
&InlineDTypeLayout,
&FlatLayout,
]
.into_iter()
.map(|l| (l.id(), l))
.collect(),
[&ColumnarLayout as LayoutRef, &ChunkedLayout, &FlatLayout]
.into_iter()
.map(|l| (l.id(), l))
.collect(),
)
}
}
Expand Down
55 changes: 36 additions & 19 deletions vortex-file/src/read/layouts/chunked.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
use std::collections::BTreeSet;
use std::sync::{OnceLock, RwLock};
use std::sync::{Arc, OnceLock, RwLock};

use bytes::Bytes;
use itertools::Itertools;
use vortex_array::aliases::hash_map::HashMap;
use vortex_array::array::ChunkedArray;
use vortex_array::compute::{scalar_at, take, TakeOptions};
use vortex_array::stats::{ArrayStatistics as _, Stat};
use vortex_array::stats::{stats_from_bitset_bytes, ArrayStatistics as _, Stat};
use vortex_array::{ArrayDType, ArrayData, IntoArrayData};
use vortex_dtype::{DType, Nullability, StructDType};
use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect as _, VortexResult};
use vortex_expr::Select;
use vortex_flatbuffers::footer;

use crate::layouts::RangedLayoutReader;
use crate::pruning::PruningPredicate;
use crate::read::cache::RelativeLayoutCache;
use crate::read::mask::RowMask;
use crate::{
BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, MessageLocator,
MetadataRead, PruningRead, Scan, CHUNKED_LAYOUT_ID,
BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, LazyDType,
MessageLocator, MetadataRead, PruningRead, Scan, CHUNKED_LAYOUT_ID,
};

#[derive(Default, Debug)]
Expand Down Expand Up @@ -76,38 +78,44 @@ impl ChunkedLayoutBuilder {
}

fn metadata_layout(&self) -> VortexResult<Option<Box<dyn LayoutReader>>> {
self.has_metadata()
.then(|| {
self.flatbuffer()
.metadata()
.map(|m| {
let set_stats = stats_from_bitset_bytes(m.bytes());
let metadata_fb = self
.flatbuffer()
.children()
.ok_or_else(|| vortex_err!("must have metadata"))?
.ok_or_else(|| vortex_err!("Must have children if layout has metadata"))?
.get(0);
self.layout_builder.read_layout(
self.fb_bytes.clone(),
metadata_fb._tab.loc(),
// TODO(robert): Create stats projection
Scan::empty(),
self.message_cache.unknown_dtype(METADATA_LAYOUT_PART_ID),
Scan::new(Some(Arc::new(Select::include(
set_stats.iter().map(|s| s.to_string().into()).collect(),
)))),
self.message_cache.relative(
METADATA_LAYOUT_PART_ID,
Arc::new(LazyDType::from_dtype(stats_table_dtype(
&set_stats,
self.message_cache.dtype().value()?,
))),
),
)
})
.transpose()
}

/// Returns `true` when the layout flatbuffer carries a metadata field whose first byte is
/// non-zero.
///
/// A missing metadata field and an empty metadata field are both treated as "no metadata";
/// the first byte is read with `first()` so an empty field cannot cause an out-of-bounds panic.
fn has_metadata(&self) -> bool {
    self.flatbuffer()
        .metadata()
        .and_then(|b| b.bytes().first().copied())
        .is_some_and(|flag| flag != 0)
}

fn children(&self) -> impl Iterator<Item = (usize, footer::Layout)> {
self.flatbuffer()
.children()
.unwrap_or_default()
.iter()
.enumerate()
.skip(if self.has_metadata() { 1 } else { 0 })
.skip(if self.flatbuffer().metadata().is_some() {
1
} else {
0
})
}

fn children_ranges(&self) -> Vec<(usize, usize)> {
Expand Down Expand Up @@ -146,6 +154,15 @@ impl ChunkedLayoutBuilder {
}
}

/// Builds the non-nullable struct dtype with one field per entry of `stats`.
///
/// Each field is named after the stat's `Display` output and typed as the nullable form of
/// `Stat::dtype` evaluated against `dtype`.
fn stats_table_dtype(stats: &[Stat], dtype: &DType) -> DType {
    let field_dtypes = stats
        .iter()
        .map(|stat| stat.dtype(dtype).as_nullable())
        .collect();
    let field_names = stats.iter().map(|stat| stat.to_string().into()).collect();

    DType::Struct(
        StructDType::new(field_names, field_dtypes),
        Nullability::NonNullable,
    )
}

#[derive(Debug, Default, Clone)]
enum ChildRead {
#[default]
Expand Down Expand Up @@ -457,7 +474,7 @@ mod tests {
let written = writer.into_inner();

let mut fb = FlatBufferBuilder::new();
let chunked_layout = write::LayoutSpec::chunked(flat_layouts.into(), len as u64, false);
let chunked_layout = write::LayoutSpec::chunked(flat_layouts.into(), len as u64, None);
let flat_buf = chunked_layout.write_flatbuffer(&mut fb);
fb.finish_minimal(flat_buf);
let fb_bytes = Bytes::copy_from_slice(fb.finished_data());
Expand Down
Loading

0 comments on commit b5127b8

Please sign in to comment.