From 9fb33276fa13ca47f5ee141f53b47b6a49bfe32d Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 8 Aug 2024 13:35:44 +0100 Subject: [PATCH] Push column projections down to the file IO layer (#568) Part of #567 --------- Co-authored-by: Robert Kruszewski --- vortex-serde/src/layouts/reader/batch.rs | 10 ++--- vortex-serde/src/layouts/reader/layouts.rs | 50 ++++++++++++++-------- vortex-serde/src/layouts/reader/stream.rs | 15 ++----- 3 files changed, 41 insertions(+), 34 deletions(-) diff --git a/vortex-serde/src/layouts/reader/batch.rs b/vortex-serde/src/layouts/reader/batch.rs index e0e59dfa5e..f64671b5c3 100644 --- a/vortex-serde/src/layouts/reader/batch.rs +++ b/vortex-serde/src/layouts/reader/batch.rs @@ -25,7 +25,7 @@ impl BatchReader { } pub fn read(&mut self) -> VortexResult> { - let mut rr1 = Vec::new(); + let mut messages = Vec::new(); for (i, child_array) in self .arrays .iter_mut() @@ -34,8 +34,8 @@ impl BatchReader { { match self.children[i].read()? { Some(rr) => match rr { - ReadResult::GetMsgs(r1) => { - rr1.extend(r1); + ReadResult::GetMsgs(message) => { + messages.extend(message); } ReadResult::Batch(a) => *child_array = Some(a), }, @@ -49,7 +49,7 @@ impl BatchReader { } } - if rr1.is_empty() { + if messages.is_empty() { let child_arrays = mem::replace(&mut self.arrays, vec![None; self.children.len()]) .into_iter() .map(|a| a.unwrap()); @@ -58,7 +58,7 @@ impl BatchReader { .into_array(), ))); } else { - Ok(Some(ReadResult::GetMsgs(rr1))) + Ok(Some(ReadResult::GetMsgs(messages))) } } } diff --git a/vortex-serde/src/layouts/reader/layouts.rs b/vortex-serde/src/layouts/reader/layouts.rs index 344f89b1b7..80554ed9b6 100644 --- a/vortex-serde/src/layouts/reader/layouts.rs +++ b/vortex-serde/src/layouts/reader/layouts.rs @@ -2,11 +2,13 @@ use std::collections::VecDeque; use std::sync::Arc; use bytes::Bytes; +use flatbuffers::{ForwardsUOffset, Vector}; use vortex::Context; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_flatbuffers::footer as fb; +use super::projections::Projection; use crate::layouts::reader::batch::BatchReader; use crate::layouts::reader::buffered::BufferedReader; use crate::layouts::reader::context::{LayoutDeserializer, LayoutId, LayoutSpec}; @@ -129,7 +131,6 @@ impl ColumnLayout { fb_bytes: Bytes, fb_loc: usize, scan: Scan, - layout_serde: LayoutDeserializer, message_cache: RelativeLayoutCache, ) -> Self { @@ -150,6 +151,26 @@ impl ColumnLayout { }; fb_layout.layout_as_nested_layout().expect("must be nested") } + + fn read_child( + &self, + idx: usize, + children: Vector>, + st_dtype: &Arc<[DType]>, + ) -> VortexResult> { + let layout = children.get(idx); + let dtype = st_dtype[idx].clone(); + // TODO: Figure out complex nested schema projections + let mut child_scan = self.scan.clone(); + child_scan.projection = Projection::All; + + self.layout_serde.read_layout( + self.fb_bytes.clone(), + layout._tab.loc(), + child_scan, + self.message_cache.relative(idx as u16, dtype), + ) + } } impl Layout for ColumnLayout { @@ -162,24 +183,19 @@ impl Layout for ColumnLayout { let fb_children = self.flatbuffer().children().expect("must have children"); - let columns = fb_children - .into_iter() - .enumerate() - .zip(s.dtypes().iter().cloned()) - .map(|((idx, child), dtype)| { - self.layout_serde.read_layout( - self.fb_bytes.clone(), - child._tab.loc(), - self.scan.clone(), - self.message_cache.relative(idx as u16, dtype), - ) - }) - .collect::>>()?; + let column_layouts = match self.scan.projection { + Projection::All => (0..fb_children.len()) + .map(|idx| self.read_child(idx, fb_children, s.dtypes())) + .collect::>>()?, + Projection::Partial(ref v) => v + .iter() + .map(|&idx| self.read_child(idx, fb_children, s.dtypes())) + .collect::>>()?, + }; - let mut reader = BatchReader::new(s.names().clone(), columns); - let rr = reader.read()?; + let reader = BatchReader::new(s.names().clone(), column_layouts); self.state = ColumnLayoutState::ReadColumns(reader); - Ok(rr) + self.read() } ColumnLayoutState::ReadColumns(br) => br.read(), } diff --git a/vortex-serde/src/layouts/reader/stream.rs b/vortex-serde/src/layouts/reader/stream.rs index 4f507d6c2a..dd0531a4d7 100644 --- a/vortex-serde/src/layouts/reader/stream.rs +++ b/vortex-serde/src/layouts/reader/stream.rs @@ -7,7 +7,6 @@ use bytes::{Bytes, BytesMut}; use futures::Stream; use futures_util::future::BoxFuture; use futures_util::{stream, FutureExt, StreamExt, TryStreamExt}; -use vortex::array::StructArray; use vortex::compute::unary::subtract_scalar; use vortex::compute::{filter, filter_indices, search_sorted, slice, take, SearchSortedSide}; use vortex::{Array, IntoArray, IntoArrayVariant}; @@ -16,7 +15,6 @@ use vortex_error::{VortexError, VortexResult}; use vortex_scalar::Scalar; use crate::io::VortexReadAt; -use crate::layouts::reader::projections::Projection; use crate::layouts::reader::schema::Schema; use crate::layouts::reader::{Layout, LayoutMessageCache, MessageId, ReadResult, Scan}; use crate::writer::ByteRange; @@ -94,10 +92,10 @@ impl Stream for VortexLayoutBatchStrea StreamingState::Init => { if let Some(read) = self.layout.read()? { match read { - ReadResult::GetMsgs(r1) => { + ReadResult::GetMsgs(messages) => { let reader = mem::take(&mut self.reader).expect("Invalid state transition"); - let read_future = read_ranges(reader, r1).boxed(); + let read_future = read_ranges(reader, messages).boxed(); self.state = StreamingState::Reading(read_future); } ReadResult::Batch(a) => self.state = StreamingState::Decoding(a), @@ -117,15 +115,8 @@ impl Stream for VortexLayoutBatchStrea batch = filter(&batch, &mask)?; } - let projected = match &self.scan.projection { - Projection::All => batch, - Projection::Partial(indices) => StructArray::try_from(batch.clone())? - .project(indices.as_ref())? - .into_array(), - }; - self.state = StreamingState::Init; - return Poll::Ready(Some(Ok(projected))); + return Poll::Ready(Some(Ok(batch))); } StreamingState::Reading(f) => match ready!(f.poll_unpin(cx)) { Ok((read, buffers)) => {