From 98561a33a5eedf7b2de4b2b4cff5640e1f915a9f Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 8 Aug 2024 17:15:44 +0100 Subject: [PATCH] Only deserialize the required dtypes by projection from the footer (#569) --- vortex-serde/src/layouts/reader/builder.rs | 10 ++++- vortex-serde/src/layouts/reader/footer.rs | 43 +++++++++++++++++++++- vortex-serde/src/layouts/reader/layouts.rs | 15 ++++++-- 3 files changed, 60 insertions(+), 8 deletions(-) diff --git a/vortex-serde/src/layouts/reader/builder.rs b/vortex-serde/src/layouts/reader/builder.rs index e1a7af5a42..f12f173dec 100644 --- a/vortex-serde/src/layouts/reader/builder.rs +++ b/vortex-serde/src/layouts/reader/builder.rs @@ -75,6 +75,11 @@ impl VortexLayoutReaderBuilder { let projection = self.projection.unwrap_or_default(); let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE); + let projected_dtype = match &projection { + Projection::All => footer.dtype()?, + Projection::Partial(projection) => footer.projected_dtype(projection)?, + }; + let scan = Scan { projection, indices: self.indices, @@ -83,11 +88,12 @@ impl VortexLayoutReaderBuilder { }; let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default())); - let layouts_cache = RelativeLayoutCache::new(message_cache.clone(), footer.dtype()?); + let layouts_cache = + RelativeLayoutCache::new(message_cache.clone(), projected_dtype.clone()); let layout = footer.layout(scan.clone(), layouts_cache)?; - VortexLayoutBatchStream::try_new(self.reader, layout, message_cache, footer.dtype()?, scan) + VortexLayoutBatchStream::try_new(self.reader, layout, message_cache, projected_dtype, scan) } async fn len(&self) -> usize { diff --git a/vortex-serde/src/layouts/reader/footer.rs b/vortex-serde/src/layouts/reader/footer.rs index 1c0cf26b43..8998209ea0 100644 --- a/vortex-serde/src/layouts/reader/footer.rs +++ b/vortex-serde/src/layouts/reader/footer.rs @@ -1,7 +1,10 @@ +use std::sync::Arc; + use bytes::Bytes; use flatbuffers::root; -use vortex_dtype::DType; -use vortex_error::VortexResult; +use vortex_dtype::{DType, StructDType}; +use vortex_error::{vortex_err, VortexResult}; +use vortex_flatbuffers::dtype::Struct_; use vortex_flatbuffers::ReadFlatBuffer; use crate::layouts::reader::context::LayoutDeserializer; @@ -54,4 +57,40 @@ impl Footer { .0, ) } + + pub fn projected_dtype(&self, projection: &[usize]) -> VortexResult { + let start_offset = self.leftovers_schema_offset(); + let end_offset = self.leftovers_footer_offset(); + let dtype_bytes = &self.leftovers[start_offset..end_offset]; + + let fb_schema = root::(dtype_bytes)?; + let fb_dtype = fb_schema + .dtype() + .ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?; + + let fb_struct = fb_dtype + .type__as_struct_() + .expect("The top-level type should be a struct"); + let nullability = fb_struct.nullable().into(); + + let (names, dtypes): (Vec>, Vec) = projection + .iter() + .map(|idx| Self::read_field(fb_struct, *idx)) + .collect::>>()? + .into_iter() + .unzip(); + + Ok(DType::Struct( + StructDType::new(names.into(), dtypes), + nullability, + )) + } + + fn read_field(fb_struct: Struct_, idx: usize) -> VortexResult<(Arc, DType)> { + let name = fb_struct.names().unwrap().get(idx); + let fb_dtype = fb_struct.dtypes().unwrap().get(idx); + let dtype = DType::try_from(fb_dtype)?; + + Ok((name.into(), dtype)) + } } diff --git a/vortex-serde/src/layouts/reader/layouts.rs b/vortex-serde/src/layouts/reader/layouts.rs index 80554ed9b6..468b8fca0b 100644 --- a/vortex-serde/src/layouts/reader/layouts.rs +++ b/vortex-serde/src/layouts/reader/layouts.rs @@ -156,10 +156,10 @@ impl ColumnLayout { &self, idx: usize, children: Vector>, - st_dtype: &Arc<[DType]>, + dtype: DType, ) -> VortexResult> { let layout = children.get(idx); - let dtype = st_dtype[idx].clone(); + // TODO: Figure out complex nested schema projections let mut child_scan = self.scan.clone(); child_scan.projection = Projection::All; @@ -185,11 +185,18 @@ impl Layout for ColumnLayout { let column_layouts = match self.scan.projection { Projection::All => (0..fb_children.len()) - .map(|idx| self.read_child(idx, fb_children, s.dtypes())) + .map(|idx| self.read_child(idx, fb_children, s.dtypes()[idx].clone())) .collect::>>()?, Projection::Partial(ref v) => v .iter() - .map(|&idx| self.read_child(idx, fb_children, s.dtypes())) + .enumerate() + .map(|(position, &projection_idx)| { + self.read_child( + projection_idx, + fb_children, + s.dtypes()[position].clone(), + ) + }) .collect::>>()?, };