Skip to content

Commit

Permalink
CR notes
Browse files (browse the repository at this point in the history)
  • Loading branch information
AdamGS committed Aug 1, 2024
1 parent bda8ea0 commit aeda6ce
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 34 deletions.
13 changes: 3 additions & 10 deletions vortex-serde/src/file/reader/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use vortex::array::struct_::StructArray;
use vortex::{Array, IntoArray};
use vortex::{Array, Context, IntoArray};
use vortex_dtype::DType;
use vortex_error::VortexResult;

Expand All @@ -28,10 +28,7 @@ impl<R: VortexReadAt> BatchReader<R> {
schema,
readers: column_info
.map(|(name, dtype, layouts)| {
(
name.clone(),
ColumnReader::new(name.clone(), dtype.clone(), layouts),
)
(name.clone(), ColumnReader::new(dtype.clone(), layouts))
})
.collect(),
}
Expand All @@ -41,11 +38,7 @@ impl<R: VortexReadAt> BatchReader<R> {
self.readers.values().all(|c| c.is_empty())
}

pub async fn load(
&mut self,
batch_size: usize,
context: Arc<vortex::Context>,
) -> VortexResult<()> {
pub async fn load(&mut self, batch_size: usize, context: Arc<Context>) -> VortexResult<()> {
for column_reader in self.readers.values_mut() {
column_reader
.load(&mut self.reader, batch_size, context.clone())
Expand Down
37 changes: 13 additions & 24 deletions vortex-serde/src/file/reader/column.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use vortex::array::chunked::ChunkedArray;
use vortex::compute::slice;
use vortex::{Array, IntoArray};
use vortex::{Array, Context, IntoArray};
use vortex_dtype::DType;
use vortex_error::{VortexError, VortexResult};

Expand All @@ -14,17 +13,14 @@ use crate::io::VortexReadAt;
use crate::{ArrayBufferReader, ReadResult};

pub(super) struct ColumnReader {
#[allow(dead_code)]
name: Arc<str>,
dtype: DType,
layouts: VecDeque<Layout>,
arrays: VecDeque<Array>,
}

impl ColumnReader {
pub fn new(name: Arc<str>, dtype: DType, layouts: VecDeque<Layout>) -> Self {
pub fn new(dtype: DType, layouts: VecDeque<Layout>) -> Self {
Self {
name,
dtype,
layouts,
arrays: Default::default(),
Expand All @@ -43,13 +39,9 @@ impl ColumnReader {
&mut self,
reader: &mut R,
batch_size: usize,
context: Arc<vortex::Context>,
context: Arc<Context>,
) -> VortexResult<()> {
loop {
if self.buffered_row_count() >= batch_size {
return Ok(());
}

while self.buffered_row_count() < batch_size {
if let Some(layout) = self.layouts.pop_front() {
let byte_range = layout.as_flat().unwrap().range;
let mut buffer = BytesMut::with_capacity(byte_range.len());
Expand All @@ -74,13 +66,15 @@ impl ColumnReader {

self.arrays.push_back(array);
} else {
break Ok(());
break;
}
}

Ok(())
}

pub fn read_rows(&mut self, mut rows_needed: usize) -> VortexResult<Option<Array>> {
if self.buffered_row_count() == 0 && self.layouts.is_empty() {
if self.is_empty() {
return Ok(None);
}

Expand All @@ -90,26 +84,21 @@ impl ColumnReader {

let mut result = Vec::default();

loop {
if rows_needed == 0 {
break;
}

while rows_needed != 0 {
match self.arrays.pop_front() {
None => break,
Some(array) => match array.len().cmp(&rows_needed) {
Ordering::Greater => {
Some(array) => {
if array.len() > rows_needed {
let taken = slice(&array, 0, rows_needed)?;
let leftover = slice(&array, rows_needed, array.len())?;
self.arrays.push_front(leftover);
rows_needed -= taken.len();
result.push(taken);
}
Ordering::Equal | Ordering::Less => {
} else {
rows_needed -= array.len();
result.push(array);
}
},
}
}
}

Expand Down

0 comments on commit aeda6ce

Please sign in to comment.