diff --git a/vortex-serde/src/file/reader/batch.rs b/vortex-serde/src/file/reader/batch.rs index 5c21a613ae..186bb0c0ca 100644 --- a/vortex-serde/src/file/reader/batch.rs +++ b/vortex-serde/src/file/reader/batch.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use vortex::array::struct_::StructArray; -use vortex::{Array, IntoArray}; +use vortex::{Array, Context, IntoArray}; use vortex_dtype::DType; use vortex_error::VortexResult; @@ -28,10 +28,7 @@ impl BatchReader { schema, readers: column_info .map(|(name, dtype, layouts)| { - ( - name.clone(), - ColumnReader::new(name.clone(), dtype.clone(), layouts), - ) + (name.clone(), ColumnReader::new(dtype.clone(), layouts)) }) .collect(), } @@ -41,11 +38,7 @@ impl BatchReader { self.readers.values().all(|c| c.is_empty()) } - pub async fn load( - &mut self, - batch_size: usize, - context: Arc, - ) -> VortexResult<()> { + pub async fn load(&mut self, batch_size: usize, context: Arc) -> VortexResult<()> { for column_reader in self.readers.values_mut() { column_reader .load(&mut self.reader, batch_size, context.clone()) diff --git a/vortex-serde/src/file/reader/column.rs b/vortex-serde/src/file/reader/column.rs index 50ef466df4..d0b282516b 100644 --- a/vortex-serde/src/file/reader/column.rs +++ b/vortex-serde/src/file/reader/column.rs @@ -1,11 +1,10 @@ -use std::cmp::Ordering; use std::collections::VecDeque; use std::sync::Arc; use bytes::{Bytes, BytesMut}; use vortex::array::chunked::ChunkedArray; use vortex::compute::slice; -use vortex::{Array, IntoArray}; +use vortex::{Array, Context, IntoArray}; use vortex_dtype::DType; use vortex_error::{VortexError, VortexResult}; @@ -14,17 +13,14 @@ use crate::io::VortexReadAt; use crate::{ArrayBufferReader, ReadResult}; pub(super) struct ColumnReader { - #[allow(dead_code)] - name: Arc, dtype: DType, layouts: VecDeque, arrays: VecDeque, } impl ColumnReader { - pub fn new(name: Arc, dtype: DType, layouts: VecDeque) -> Self { + pub fn new(dtype: DType, layouts: VecDeque) -> Self { Self { - name, dtype, layouts, arrays: Default::default(), @@ -43,13 +39,9 @@ impl ColumnReader { &mut self, reader: &mut R, batch_size: usize, - context: Arc, + context: Arc, ) -> VortexResult<()> { - loop { - if self.buffered_row_count() >= batch_size { - return Ok(()); - } - + while self.buffered_row_count() < batch_size { if let Some(layout) = self.layouts.pop_front() { let byte_range = layout.as_flat().unwrap().range; let mut buffer = BytesMut::with_capacity(byte_range.len()); @@ -74,13 +66,15 @@ impl ColumnReader { self.arrays.push_back(array); } else { - break Ok(()); + break; } } + + Ok(()) } pub fn read_rows(&mut self, mut rows_needed: usize) -> VortexResult> { - if self.buffered_row_count() == 0 && self.layouts.is_empty() { + if self.is_empty() { return Ok(None); } @@ -90,26 +84,21 @@ impl ColumnReader { let mut result = Vec::default(); - loop { - if rows_needed == 0 { - break; - } - + while rows_needed != 0 { match self.arrays.pop_front() { None => break, - Some(array) => match array.len().cmp(&rows_needed) { - Ordering::Greater => { + Some(array) => { + if array.len() > rows_needed { let taken = slice(&array, 0, rows_needed)?; let leftover = slice(&array, rows_needed, array.len())?; self.arrays.push_front(leftover); rows_needed -= taken.len(); result.push(taken); - } - Ordering::Equal | Ordering::Less => { + } else { rows_needed -= array.len(); result.push(array); } - }, + } } }