From c153bd34337167dc6ad3b01e3e6f778ac5a16b6e Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 1 Aug 2024 14:06:57 +0100 Subject: [PATCH] Add identity projection to the file reader (#532) --- vortex-serde/src/file/layouts.rs | 15 +++++++---- vortex-serde/src/file/reader/mod.rs | 29 ++++++++++----------- vortex-serde/src/file/reader/projections.rs | 18 +++++-------- vortex-serde/src/file/reader/schema.rs | 5 +++- 4 files changed, 35 insertions(+), 32 deletions(-) diff --git a/vortex-serde/src/file/layouts.rs b/vortex-serde/src/file/layouts.rs index 3f79ea20d8..1350a31430 100644 --- a/vortex-serde/src/file/layouts.rs +++ b/vortex-serde/src/file/layouts.rs @@ -193,13 +193,18 @@ impl StructLayout { #[allow(dead_code)] pub(crate) fn project(&self, projection: &Projection) -> StructLayout { - let mut new_children = VecDeque::with_capacity(projection.indices().len()); + match projection { + Projection::All => self.clone(), + Projection::Partial(indices) => { + let mut new_children = VecDeque::with_capacity(indices.len()); - for &idx in projection.indices() { - new_children.push_back(self.children[idx].clone()); - } + for &idx in indices.iter() { + new_children.push_back(self.children[idx].clone()); + } - StructLayout::new(new_children) + StructLayout::new(new_children) + } + } } } diff --git a/vortex-serde/src/file/reader/mod.rs b/vortex-serde/src/file/reader/mod.rs index 3bae2d0b68..f6a4905bdf 100644 --- a/vortex-serde/src/file/reader/mod.rs +++ b/vortex-serde/src/file/reader/mod.rs @@ -30,6 +30,7 @@ pub mod projections; pub mod schema; const DEFAULT_BATCH_SIZE: usize = 65536; +const DEFAULT_PROJECTION: Projection = Projection::All; pub struct VortexBatchReaderBuilder { reader: R, @@ -102,6 +103,7 @@ impl VortexBatchReaderBuilder { }; let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE); + let projection = self.projection.unwrap_or(DEFAULT_PROJECTION); VortexBatchStream::try_new( self.reader, @@ -109,7 +111,7 @@ impl VortexBatchReaderBuilder { dtype, self.row_filter.unwrap_or_default(), batch_size, - self.projection, + projection, self.take_indices, ) } @@ -170,8 +172,7 @@ impl VortexBatchReaderBuilder { pub struct VortexBatchStream { dtype: StructDType, - // TODO(robert): Have identity projection - projection: Option, + projection: Projection, take_indices: Option, row_filter: RowFilter, batch_reader: Option>, @@ -190,7 +191,7 @@ impl VortexBatchStream { dtype: StructDType, row_filter: RowFilter, batch_size: usize, - projection: Option, + projection: Projection, take_indices: Option, ) -> VortexResult { let schema = Schema(dtype.clone()); @@ -297,17 +298,15 @@ impl Stream for VortexBatchStream { } batch = filter(&batch, ¤t_predicate)?; - let projected = self - .projection - .as_ref() - .map(|p| { - StructArray::try_from(batch.clone()) - .unwrap() - .project(p.indices()) - .unwrap() - .into_array() - }) - .unwrap_or(batch); + + let projected = match &self.projection { + Projection::All => batch, + Projection::Partial(indices) => StructArray::try_from(batch.clone()) + .unwrap() + .project(indices.as_ref()) + .unwrap() + .into_array(), + }; return Poll::Ready(Some(Ok(projected))); } diff --git a/vortex-serde/src/file/reader/projections.rs b/vortex-serde/src/file/reader/projections.rs index 5c009ba1e1..30f5a41b8b 100644 --- a/vortex-serde/src/file/reader/projections.rs +++ b/vortex-serde/src/file/reader/projections.rs @@ -1,22 +1,18 @@ -#[derive(Clone)] -pub struct Projection { - indices: Vec, +#[derive(Clone, Default)] +pub enum Projection { + #[default] + All, + Partial(Vec), } impl Projection { pub fn new(indices: impl AsRef<[usize]>) -> Self { - Projection { - indices: Vec::from(indices.as_ref()), - } - } - - pub fn indices(&self) -> &[usize] { - self.indices.as_ref() + Self::Partial(Vec::from(indices.as_ref())) } } impl From> for Projection { fn from(indices: Vec) -> Self { - Self { indices } + Self::Partial(indices) } } diff --git a/vortex-serde/src/file/reader/schema.rs b/vortex-serde/src/file/reader/schema.rs index 0a7326d5b9..4e87797666 100644 --- a/vortex-serde/src/file/reader/schema.rs +++ b/vortex-serde/src/file/reader/schema.rs @@ -16,6 +16,9 @@ impl Schema { } pub fn project(&self, projection: Projection) -> VortexResult { - self.0.project(projection.indices()).map(Self) + match projection { + Projection::All => Ok(self.clone()), + Projection::Partial(indicies) => self.0.project(indicies.as_ref()).map(Self), + } } }