Skip to content

Commit

Permalink
Only deserialize the required dtypes by projection from the footer (#569
Browse files Browse the repository at this point in the history
)
  • Loading branch information
AdamGS authored Aug 8, 2024
1 parent 2e6e6ba commit 98561a3
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 8 deletions.
10 changes: 8 additions & 2 deletions vortex-serde/src/layouts/reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ impl<R: VortexReadAt> VortexLayoutReaderBuilder<R> {
let projection = self.projection.unwrap_or_default();
let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

let projected_dtype = match &projection {
Projection::All => footer.dtype()?,
Projection::Partial(projection) => footer.projected_dtype(projection)?,
};

let scan = Scan {
projection,
indices: self.indices,
Expand All @@ -83,11 +88,12 @@ impl<R: VortexReadAt> VortexLayoutReaderBuilder<R> {
};

let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
let layouts_cache = RelativeLayoutCache::new(message_cache.clone(), footer.dtype()?);
let layouts_cache =
RelativeLayoutCache::new(message_cache.clone(), projected_dtype.clone());

let layout = footer.layout(scan.clone(), layouts_cache)?;

VortexLayoutBatchStream::try_new(self.reader, layout, message_cache, footer.dtype()?, scan)
VortexLayoutBatchStream::try_new(self.reader, layout, message_cache, projected_dtype, scan)
}

async fn len(&self) -> usize {
Expand Down
43 changes: 41 additions & 2 deletions vortex-serde/src/layouts/reader/footer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::sync::Arc;

use bytes::Bytes;
use flatbuffers::root;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_dtype::{DType, StructDType};
use vortex_error::{vortex_err, VortexResult};
use vortex_flatbuffers::dtype::Struct_;
use vortex_flatbuffers::ReadFlatBuffer;

use crate::layouts::reader::context::LayoutDeserializer;
Expand Down Expand Up @@ -54,4 +57,40 @@ impl Footer {
.0,
)
}

pub fn projected_dtype(&self, projection: &[usize]) -> VortexResult<DType> {
let start_offset = self.leftovers_schema_offset();
let end_offset = self.leftovers_footer_offset();
let dtype_bytes = &self.leftovers[start_offset..end_offset];

let fb_schema = root::<vortex_flatbuffers::message::Schema>(dtype_bytes)?;
let fb_dtype = fb_schema
.dtype()
.ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?;

let fb_struct = fb_dtype
.type__as_struct_()
.expect("The top-level type should be a struct");
let nullability = fb_struct.nullable().into();

let (names, dtypes): (Vec<Arc<str>>, Vec<DType>) = projection
.iter()
.map(|idx| Self::read_field(fb_struct, *idx))
.collect::<VortexResult<Vec<_>>>()?
.into_iter()
.unzip();

Ok(DType::Struct(
StructDType::new(names.into(), dtypes),
nullability,
))
}

fn read_field(fb_struct: Struct_, idx: usize) -> VortexResult<(Arc<str>, DType)> {
let name = fb_struct.names().unwrap().get(idx);
let fb_dtype = fb_struct.dtypes().unwrap().get(idx);
let dtype = DType::try_from(fb_dtype)?;

Ok((name.into(), dtype))
}
}
15 changes: 11 additions & 4 deletions vortex-serde/src/layouts/reader/layouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ impl ColumnLayout {
&self,
idx: usize,
children: Vector<ForwardsUOffset<fb::Layout>>,
st_dtype: &Arc<[DType]>,
dtype: DType,
) -> VortexResult<Box<dyn Layout>> {
let layout = children.get(idx);
let dtype = st_dtype[idx].clone();

// TODO: Figure out complex nested schema projections
let mut child_scan = self.scan.clone();
child_scan.projection = Projection::All;
Expand All @@ -185,11 +185,18 @@ impl Layout for ColumnLayout {

let column_layouts = match self.scan.projection {
Projection::All => (0..fb_children.len())
.map(|idx| self.read_child(idx, fb_children, s.dtypes()))
.map(|idx| self.read_child(idx, fb_children, s.dtypes()[idx].clone()))
.collect::<VortexResult<Vec<_>>>()?,
Projection::Partial(ref v) => v
.iter()
.map(|&idx| self.read_child(idx, fb_children, s.dtypes()))
.enumerate()
.map(|(position, &projection_idx)| {
self.read_child(
projection_idx,
fb_children,
s.dtypes()[position].clone(),
)
})
.collect::<VortexResult<Vec<_>>>()?,
};

Expand Down

0 comments on commit 98561a3

Please sign in to comment.