diff --git a/Cargo.lock b/Cargo.lock index 63a7496f2f..4d427e48b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4713,6 +4713,7 @@ dependencies = [ "itertools 0.13.0", "log", "num-traits", + "num_enum 0.7.3", "paste", "pin-project", "rand", diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 0abf6b198f..172f718d0c 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -42,6 +42,7 @@ humansize = { workspace = true } itertools = { workspace = true } log = { workspace = true } num-traits = { workspace = true } +num_enum = { workspace = true } paste = { workspace = true } pin-project = { workspace = true } rand = { workspace = true } diff --git a/vortex-array/src/stats/mod.rs b/vortex-array/src/stats/mod.rs index d91639f927..673fb81e44 100644 --- a/vortex-array/src/stats/mod.rs +++ b/vortex-array/src/stats/mod.rs @@ -2,13 +2,16 @@ use std::fmt::{Display, Formatter}; use std::hash::Hash; +use std::sync::Arc; -use enum_iterator::Sequence; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, MutableBuffer}; +use enum_iterator::{cardinality, Sequence}; use enum_map::Enum; use itertools::Itertools; +use num_enum::{IntoPrimitive, TryFromPrimitive}; pub use statsset::*; use vortex_dtype::Nullability::NonNullable; -use vortex_dtype::{DType, NativePType}; +use vortex_dtype::{DType, NativePType, PType}; use vortex_error::{vortex_err, vortex_panic, VortexError, VortexResult}; use vortex_scalar::Scalar; @@ -21,7 +24,10 @@ mod statsset; /// Statistics that are used for pruning files (i.e., we want to ensure they are computed when compressing/writing). pub const PRUNING_STATS: &[Stat] = &[Stat::Min, Stat::Max, Stat::TrueCount, Stat::NullCount]; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Sequence, Enum)] +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, Sequence, Enum, IntoPrimitive, TryFromPrimitive, +)] +#[repr(u8)] pub enum Stat { /// Frequency of each bit width (nulls are treated as 0) BitWidthFreq, @@ -69,6 +75,53 @@ impl Stat { pub fn has_same_dtype_as_array(&self) -> bool { matches!(self, Stat::Min | Stat::Max) } + + pub fn dtype(&self, data_type: &DType) -> DType { + match self { + Stat::BitWidthFreq => DType::List( + Arc::new(DType::Primitive(PType::U64, NonNullable)), + NonNullable, + ), + Stat::TrailingZeroFreq => DType::List( + Arc::new(DType::Primitive(PType::U64, NonNullable)), + NonNullable, + ), + Stat::IsConstant => DType::Bool(NonNullable), + Stat::IsSorted => DType::Bool(NonNullable), + Stat::IsStrictSorted => DType::Bool(NonNullable), + Stat::Max => data_type.clone(), + Stat::Min => data_type.clone(), + Stat::RunCount => DType::Primitive(PType::U64, NonNullable), + Stat::TrueCount => DType::Primitive(PType::U64, NonNullable), + Stat::NullCount => DType::Primitive(PType::U64, NonNullable), + Stat::UncompressedSizeInBytes => DType::Primitive(PType::U64, NonNullable), + } + } +} + +pub fn as_stat_bitset_bytes(stats: &[Stat]) -> Vec { + let stat_count = cardinality::(); + let mut stat_bitset = BooleanBufferBuilder::new_from_buffer( + MutableBuffer::from_len_zeroed(stat_count.div_ceil(8)), + stat_count, + ); + for stat in stats { + stat_bitset.set_bit(u8::from(*stat) as usize, true); + } + + stat_bitset + .finish() + .into_inner() + .into_vec() + .unwrap_or_else(|b| b.to_vec()) +} + +pub fn stats_from_bitset_bytes(bytes: &[u8]) -> Vec { + BooleanBuffer::new(Buffer::from(bytes), 0, bytes.len() * 8) + .set_indices() + // Filter out indices failing conversion, these are stats written by newer version of library + .filter_map(|i| Stat::try_from(i as u8).ok()) + .collect::>() } impl Display for Stat { diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index 481823fe34..923b9cb315 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -171,8 +171,6 @@ mod forever_constant { pub const CHUNKED_LAYOUT_ID: LayoutId = LayoutId(2); /// The layout ID for a column layout pub const COLUMNAR_LAYOUT_ID: LayoutId = LayoutId(3); - /// The layout ID for an inline schema layout - pub const INLINE_SCHEMA_LAYOUT_ID: LayoutId = LayoutId(4); #[cfg(test)] mod test { @@ -187,7 +185,6 @@ mod forever_constant { assert_eq!(FLAT_LAYOUT_ID, LayoutId(1)); assert_eq!(CHUNKED_LAYOUT_ID, LayoutId(2)); assert_eq!(COLUMNAR_LAYOUT_ID, LayoutId(3)); - assert_eq!(INLINE_SCHEMA_LAYOUT_ID, LayoutId(4)); } } } diff --git a/vortex-file/src/read/context.rs b/vortex-file/src/read/context.rs index e8c027c34f..75e269bd9b 100644 --- a/vortex-file/src/read/context.rs +++ b/vortex-file/src/read/context.rs @@ -7,7 +7,7 @@ use vortex_array::Context; use vortex_error::{vortex_err, VortexResult}; use vortex_flatbuffers::footer as fb; -use crate::layouts::{ChunkedLayout, ColumnarLayout, FlatLayout, InlineDTypeLayout}; +use crate::layouts::{ChunkedLayout, ColumnarLayout, FlatLayout}; use crate::{LayoutReader, RelativeLayoutCache, Scan}; #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] @@ -52,15 +52,10 @@ impl LayoutContext { impl Default for LayoutContext { fn default() -> Self { Self::new( - [ - &ColumnarLayout as LayoutRef, - &ChunkedLayout, - &InlineDTypeLayout, - &FlatLayout, - ] - .into_iter() - .map(|l| (l.id(), l)) - .collect(), + [&ColumnarLayout as LayoutRef, &ChunkedLayout, &FlatLayout] + .into_iter() + .map(|l| (l.id(), l)) + .collect(), ) } } diff --git a/vortex-file/src/read/layouts/chunked.rs b/vortex-file/src/read/layouts/chunked.rs index 92ff5e7589..515d515bc7 100644 --- a/vortex-file/src/read/layouts/chunked.rs +++ b/vortex-file/src/read/layouts/chunked.rs @@ -1,14 +1,16 @@ use std::collections::BTreeSet; -use std::sync::{OnceLock, RwLock}; +use std::sync::{Arc, OnceLock, RwLock}; use bytes::Bytes; use itertools::Itertools; use vortex_array::aliases::hash_map::HashMap; use vortex_array::array::ChunkedArray; use vortex_array::compute::{scalar_at, take, TakeOptions}; -use vortex_array::stats::{ArrayStatistics as _, Stat}; +use vortex_array::stats::{stats_from_bitset_bytes, ArrayStatistics as _, Stat}; use vortex_array::{ArrayDType, ArrayData, IntoArrayData}; +use vortex_dtype::{DType, Nullability, StructDType}; use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect as _, VortexResult}; +use vortex_expr::Select; use vortex_flatbuffers::footer; use crate::layouts::RangedLayoutReader; @@ -16,8 +18,8 @@ use crate::pruning::PruningPredicate; use crate::read::cache::RelativeLayoutCache; use crate::read::mask::RowMask; use crate::{ - BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, MessageLocator, - MetadataRead, PruningRead, Scan, CHUNKED_LAYOUT_ID, + BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, LazyDType, + MessageLocator, MetadataRead, PruningRead, Scan, CHUNKED_LAYOUT_ID, }; #[derive(Default, Debug)] @@ -76,38 +78,44 @@ impl ChunkedLayoutBuilder { } fn metadata_layout(&self) -> VortexResult>> { - self.has_metadata() - .then(|| { + self.flatbuffer() + .metadata() + .map(|m| { + let set_stats = stats_from_bitset_bytes(m.bytes()); let metadata_fb = self .flatbuffer() .children() - .ok_or_else(|| vortex_err!("must have metadata"))? + .ok_or_else(|| vortex_err!("Must have children if layout has metadata"))? .get(0); self.layout_builder.read_layout( self.fb_bytes.clone(), metadata_fb._tab.loc(), - // TODO(robert): Create stats projection - Scan::empty(), - self.message_cache.unknown_dtype(METADATA_LAYOUT_PART_ID), + Scan::new(Some(Arc::new(Select::include( + set_stats.iter().map(|s| s.to_string().into()).collect(), + )))), + self.message_cache.relative( + METADATA_LAYOUT_PART_ID, + Arc::new(LazyDType::from_dtype(stats_table_dtype( + &set_stats, + self.message_cache.dtype().value()?, + ))), + ), ) }) .transpose() } - fn has_metadata(&self) -> bool { - self.flatbuffer() - .metadata() - .map(|b| b.bytes()[0] != 0) - .unwrap_or(false) - } - fn children(&self) -> impl Iterator { self.flatbuffer() .children() .unwrap_or_default() .iter() .enumerate() - .skip(if self.has_metadata() { 1 } else { 0 }) + .skip(if self.flatbuffer().metadata().is_some() { + 1 + } else { + 0 + }) } fn children_ranges(&self) -> Vec<(usize, usize)> { @@ -146,6 +154,15 @@ impl ChunkedLayoutBuilder { } } +fn stats_table_dtype(stats: &[Stat], dtype: &DType) -> DType { + let dtypes = stats.iter().map(|s| s.dtype(dtype).as_nullable()).collect(); + + DType::Struct( + StructDType::new(stats.iter().map(|s| s.to_string().into()).collect(), dtypes), + Nullability::NonNullable, + ) +} + #[derive(Debug, Default, Clone)] enum ChildRead { #[default] @@ -457,7 +474,7 @@ mod tests { let written = writer.into_inner(); let mut fb = FlatBufferBuilder::new(); - let chunked_layout = write::LayoutSpec::chunked(flat_layouts.into(), len as u64, false); + let chunked_layout = write::LayoutSpec::chunked(flat_layouts.into(), len as u64, None); let flat_buf = chunked_layout.write_flatbuffer(&mut fb); fb.finish_minimal(flat_buf); let fb_bytes = Bytes::copy_from_slice(fb.finished_data()); diff --git a/vortex-file/src/read/layouts/inline_dtype.rs b/vortex-file/src/read/layouts/inline_dtype.rs deleted file mode 100644 index 4d52737986..0000000000 --- a/vortex-file/src/read/layouts/inline_dtype.rs +++ /dev/null @@ -1,147 +0,0 @@ -use std::collections::BTreeSet; -use std::sync::Arc; - -use bytes::Bytes; -use flatbuffers::root; -use once_cell::sync::OnceCell; -use vortex_error::{vortex_bail, VortexResult}; -use vortex_flatbuffers::{footer, message}; -use vortex_ipc::stream_writer::ByteRange; - -use crate::read::cache::{LazyDType, RelativeLayoutCache}; -use crate::read::mask::RowMask; -use crate::{ - BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, MessageLocator, - MetadataRead, PruningRead, Scan, INLINE_SCHEMA_LAYOUT_ID, -}; - -#[derive(Debug)] -pub struct InlineDTypeLayout; - -impl Layout for InlineDTypeLayout { - fn id(&self) -> LayoutId { - INLINE_SCHEMA_LAYOUT_ID - } - - fn reader( - &self, - fb_bytes: Bytes, - fb_loc: usize, - scan: Scan, - layout_reader: LayoutDeserializer, - message_cache: RelativeLayoutCache, - ) -> VortexResult> { - Ok(Box::new(InlineDTypeLayoutReader::new( - fb_bytes, - fb_loc, - scan, - layout_reader, - message_cache, - ))) - } -} - -/// Layout that contains its own DType. -#[derive(Debug)] -pub struct InlineDTypeLayoutReader { - fb_bytes: Bytes, - fb_loc: usize, - scan: Scan, - layout_builder: LayoutDeserializer, - message_cache: RelativeLayoutCache, - child_layout: OnceCell>, -} - -const INLINE_DTYPE_BUFFER_IDX: LayoutPartId = 0; -const INLINE_DTYPE_CHILD_IDX: LayoutPartId = 1; - -impl InlineDTypeLayoutReader { - pub fn new( - fb_bytes: Bytes, - fb_loc: usize, - scan: Scan, - layout_builder: LayoutDeserializer, - message_cache: RelativeLayoutCache, - ) -> Self { - Self { - fb_bytes, - fb_loc, - scan, - layout_builder, - message_cache, - child_layout: OnceCell::new(), - } - } - - fn flatbuffer(&self) -> footer::Layout { - unsafe { - let tab = flatbuffers::Table::new(&self.fb_bytes, self.fb_loc); - footer::Layout::init_from_table(tab) - } - } - - fn dtype_message(&self) -> VortexResult { - let buffers = self.flatbuffer().buffers().unwrap_or_default(); - if buffers.is_empty() { - vortex_bail!("Missing buffers for inline dtype layout") - } - let dtype_buf = buffers.get(0); - Ok(MessageLocator( - self.message_cache.absolute_id(&[INLINE_DTYPE_BUFFER_IDX]), - ByteRange::new(dtype_buf.begin(), dtype_buf.end()), - )) - } - - fn dtype(&self) -> VortexResult> { - if let Some(dt_bytes) = self.message_cache.get(&[INLINE_DTYPE_BUFFER_IDX]) { - root::(&dt_bytes)?; - Ok(Arc::new(unsafe { LazyDType::from_schema_bytes(dt_bytes) })) - } else { - Ok(Arc::new(LazyDType::unknown())) - } - } - - fn child_reader(&self) -> VortexResult> { - self.layout_builder.read_layout( - self.fb_bytes.clone(), - self.child_layout()?._tab.loc(), - self.scan.clone(), - self.message_cache - .relative(INLINE_DTYPE_CHILD_IDX, self.dtype()?), - ) - } - - fn child_layout(&self) -> VortexResult { - let children = self.flatbuffer().children().unwrap_or_default(); - if children.is_empty() { - vortex_bail!("Missing children for inline dtype layout") - } - Ok(children.get(0)) - } -} - -impl LayoutReader for InlineDTypeLayoutReader { - fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet) -> VortexResult<()> { - self.child_reader()?.add_splits(row_offset, splits) - } - - fn read_selection(&self, selector: &RowMask) -> VortexResult> { - if let Some(cr) = self.child_layout.get() { - cr.read_selection(selector) - } else { - if self.message_cache.get(&[INLINE_DTYPE_BUFFER_IDX]).is_some() { - self.child_layout.get_or_try_init(|| self.child_reader())?; - return self.read_selection(selector); - } - Ok(Some(BatchRead::ReadMore(vec![self.dtype_message()?]))) - } - } - - fn read_metadata(&self) -> VortexResult> { - Ok(None) - } - - fn can_prune(&self, _begin: usize, _end: usize) -> VortexResult { - Ok(PruningRead::Value(false)) - } -} diff --git a/vortex-file/src/read/layouts/mod.rs b/vortex-file/src/read/layouts/mod.rs index b1925b7901..338e7f914c 100644 --- a/vortex-file/src/read/layouts/mod.rs +++ b/vortex-file/src/read/layouts/mod.rs @@ -1,14 +1,12 @@ mod chunked; mod columnar; mod flat; -mod inline_dtype; #[cfg(test)] mod test_read; pub use chunked::ChunkedLayout; pub use columnar::ColumnarLayout; pub use flat::FlatLayout; -pub use inline_dtype::InlineDTypeLayout; use crate::LayoutReader; diff --git a/vortex-file/src/read/metadata.rs b/vortex-file/src/read/metadata.rs index b0eafd8f4b..4a6ed3762e 100644 --- a/vortex-file/src/read/metadata.rs +++ b/vortex-file/src/read/metadata.rs @@ -153,7 +153,7 @@ mod test { assert!(metadata_table.is_some()); let metadata_table = metadata_table.unwrap(); - assert!(metadata_table.len() == 2); + assert_eq!(metadata_table.len(), 2); assert!(metadata_table.iter().all(Option::is_some)); let name_metadata_table = metadata_table[0] diff --git a/vortex-file/src/write/layout.rs b/vortex-file/src/write/layout.rs index 7a62b00c64..5ed8cc085c 100644 --- a/vortex-file/src/write/layout.rs +++ b/vortex-file/src/write/layout.rs @@ -3,9 +3,7 @@ use flatbuffers::{FlatBufferBuilder, WIPOffset}; use vortex_flatbuffers::{footer as fb, WriteFlatBuffer}; use vortex_ipc::stream_writer::ByteRange; -use crate::{ - LayoutId, CHUNKED_LAYOUT_ID, COLUMNAR_LAYOUT_ID, FLAT_LAYOUT_ID, INLINE_SCHEMA_LAYOUT_ID, -}; +use crate::{LayoutId, CHUNKED_LAYOUT_ID, COLUMNAR_LAYOUT_ID, FLAT_LAYOUT_ID}; #[derive(Debug, Clone)] pub struct LayoutSpec { @@ -30,13 +28,13 @@ impl LayoutSpec { /// Create a chunked layout with children. /// /// has_metadata indicates whether first child is a layout containing metadata about other children. - pub fn chunked(children: Vec, row_count: u64, has_metadata: bool) -> Self { + pub fn chunked(children: Vec, row_count: u64, metadata: Option) -> Self { Self { id: CHUNKED_LAYOUT_ID, buffers: None, children: Some(children), row_count, - metadata: Some(Bytes::copy_from_slice(&[has_metadata as u8])), + metadata, } } @@ -49,20 +47,6 @@ impl LayoutSpec { metadata: None, } } - - pub fn inlined_schema( - children: Vec, - row_count: u64, - dtype_buffer: ByteRange, - ) -> Self { - Self { - id: INLINE_SCHEMA_LAYOUT_ID, - buffers: Some(vec![dtype_buffer]), - children: Some(children), - row_count, - metadata: None, - } - } } impl WriteFlatBuffer for LayoutSpec { diff --git a/vortex-file/src/write/stats_accumulator.rs b/vortex-file/src/write/stats_accumulator.rs index 40be43dc9d..d93bf893be 100644 --- a/vortex-file/src/write/stats_accumulator.rs +++ b/vortex-file/src/write/stats_accumulator.rs @@ -1,15 +1,12 @@ //! Metadata accumulators track the per-chunk-of-a-column metadata, layout locations, and row counts. -use std::sync::Arc; - use itertools::Itertools; use vortex_array::array::StructArray; use vortex_array::builders::{builder_with_capacity, ArrayBuilder, ArrayBuilderExt}; use vortex_array::stats::{ArrayStatistics as _, Stat}; use vortex_array::validity::{ArrayValidity, Validity}; use vortex_array::{ArrayData, IntoArrayData}; -use vortex_dtype::Nullability::{NonNullable, Nullable}; -use vortex_dtype::{DType, PType}; +use vortex_dtype::DType; use vortex_error::VortexResult; pub struct StatsAccumulator { @@ -19,31 +16,12 @@ pub struct StatsAccumulator { } impl StatsAccumulator { - pub fn new(dtype: &DType, stats: Vec) -> Self { + pub fn new(dtype: &DType, mut stats: Vec) -> Self { + // Sort stats by their ordinal so we can recreate their dtype from bitset + stats.sort_by_key(|s| u8::from(*s)); let builders = stats .iter() - .map(|s| { - let dtype = match s { - Stat::BitWidthFreq => DType::List( - Arc::new(DType::Primitive(PType::U64, NonNullable)), - Nullable, - ), - Stat::TrailingZeroFreq => DType::List( - Arc::new(DType::Primitive(PType::U64, NonNullable)), - Nullable, - ), - Stat::IsConstant => DType::Bool(Nullable), - Stat::IsSorted => DType::Bool(Nullable), - Stat::IsStrictSorted => DType::Bool(Nullable), - Stat::Max => dtype.as_nullable(), - Stat::Min => dtype.as_nullable(), - Stat::RunCount => DType::Primitive(PType::U64, Nullable), - Stat::TrueCount => DType::Primitive(PType::U64, Nullable), - Stat::NullCount => DType::Primitive(PType::U64, Nullable), - Stat::UncompressedSizeInBytes => DType::Primitive(PType::U64, Nullable), - }; - builder_with_capacity(&dtype, 1024) - }) + .map(|s| builder_with_capacity(&s.dtype(dtype).as_nullable(), 1024)) .collect(); Self { stats, @@ -64,20 +42,22 @@ impl StatsAccumulator { Ok(()) } - pub fn into_array(mut self) -> VortexResult> { - let mut names = vec![]; - let mut fields = vec![]; + pub fn into_array(mut self) -> VortexResult> { + let mut names = Vec::new(); + let mut fields = Vec::new(); + let mut stats = Vec::new(); for (stat, builder) in self.stats.iter().zip(self.builders.iter_mut()) { - let values = builder.finish().map_err(|e| { - e.with_context(format!("Failed to finish stat builder for {}", stat)) - })?; + let values = builder + .finish() + .map_err(|e| e.with_context(format!("Failed to finish stat builder for {stat}")))?; // We drop any all-null stats columns if values.logical_validity().null_count()? == values.len() { continue; } + stats.push(*stat); names.push(stat.to_string().into()); fields.push(values); } @@ -86,9 +66,12 @@ impl StatsAccumulator { return Ok(None); } - Ok(Some( + Ok(Some(StatArray( StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)? .into_array(), - )) + stats, + ))) } } + +pub struct StatArray(pub ArrayData, pub Vec); diff --git a/vortex-file/src/write/writer.rs b/vortex-file/src/write/writer.rs index 1228f5be50..715910f931 100644 --- a/vortex-file/src/write/writer.rs +++ b/vortex-file/src/write/writer.rs @@ -1,12 +1,13 @@ -use std::{io, mem}; +use std::{io, iter, mem}; +use bytes::Bytes; use flatbuffers::FlatBufferBuilder; use futures::TryStreamExt; use itertools::Itertools; use vortex_array::array::{ChunkedArray, StructArray}; -use vortex_array::stats::{ArrayStatistics, Stat}; +use vortex_array::stats::{as_stat_bitset_bytes, ArrayStatistics, Stat}; use vortex_array::stream::ArrayStream; -use vortex_array::{ArrayDType, ArrayData, ArrayLen}; +use vortex_array::{ArrayData, ArrayLen}; use vortex_buffer::io_buf::IoBuf; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexExpect as _, VortexResult}; @@ -17,7 +18,7 @@ use vortex_ipc::messages::IPCSchema; use vortex_ipc::stream_writer::ByteRange; use crate::write::postscript::Postscript; -use crate::write::stats_accumulator::StatsAccumulator; +use crate::write::stats_accumulator::{StatArray, StatsAccumulator}; use crate::{LayoutSpec, EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION}; const STATS_TO_WRITE: &[Stat] = &[ @@ -271,24 +272,19 @@ impl ColumnWriter { .map(|(range, len)| LayoutSpec::flat(range, len)) }); - if let Some(metadata_array) = self.metadata.into_array()? { + if let Some(StatArray(metadata_array, present_stats)) = self.metadata.into_array()? { let expected_n_data_chunks = metadata_array.len(); - let dtype_begin = msgs.tell(); - msgs.write_dtype_raw(metadata_array.dtype()).await?; - let dtype_end = msgs.tell(); + let stat_bitset = as_stat_bitset_bytes(&present_stats); + + let metadata_array_begin = msgs.tell(); msgs.write_batch(metadata_array).await?; let metadata_array_end = msgs.tell(); - let layouts = [LayoutSpec::inlined_schema( - vec![LayoutSpec::flat( - ByteRange::new(dtype_end, metadata_array_end), - expected_n_data_chunks as u64, - )], + let layouts = iter::once(LayoutSpec::flat( + ByteRange::new(metadata_array_begin, metadata_array_end), expected_n_data_chunks as u64, - ByteRange::new(dtype_begin, dtype_end), - )] - .into_iter() + )) .chain(data_chunks) .collect::>(); @@ -299,9 +295,14 @@ impl ColumnWriter { layouts.len() ); } - Ok(LayoutSpec::chunked(layouts, row_count, true)) + + Ok(LayoutSpec::chunked( + layouts, + row_count, + Some(Bytes::from(stat_bitset)), + )) } else { - Ok(LayoutSpec::chunked(data_chunks.collect(), row_count, false)) + Ok(LayoutSpec::chunked(data_chunks.collect(), row_count, None)) } } }