Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS committed Aug 8, 2024
1 parent c33819a commit 4cf5ab9
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 24 deletions.
8 changes: 4 additions & 4 deletions vortex-serde/src/layouts/reader/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl BatchReader {
}

pub fn read(&mut self) -> VortexResult<Option<ReadResult>> {
let mut rr1 = Vec::new();
let mut messages = Vec::new();
for (i, child_array) in self
.arrays
.iter_mut()
Expand All @@ -35,7 +35,7 @@ impl BatchReader {
match self.children[i].read()? {
Some(rr) => match rr {
ReadResult::GetMsgs(r1) => {
rr1.extend(r1);
messages.extend(r1);
}
ReadResult::Batch(a) => *child_array = Some(a),
},
Expand All @@ -49,7 +49,7 @@ impl BatchReader {
}
}

if rr1.is_empty() {
if messages.is_empty() {
let child_arrays = mem::replace(&mut self.arrays, vec![None; self.children.len()])
.into_iter()
.map(|a| a.unwrap());
Expand All @@ -58,7 +58,7 @@ impl BatchReader {
.into_array(),
)));
} else {
Ok(Some(ReadResult::GetMsgs(rr1)))
Ok(Some(ReadResult::GetMsgs(messages)))
}
}
}
22 changes: 14 additions & 8 deletions vortex-serde/src/layouts/reader/layouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use vortex::Context;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use super::projections::Projection;
use crate::flatbuffers::footer as fb;
use crate::layouts::reader::batch::BatchReader;
use crate::layouts::reader::buffered::BufferedReader;
Expand Down Expand Up @@ -129,7 +130,6 @@ impl ColumnLayout {
fb_bytes: Bytes,
fb_loc: usize,
scan: Scan,

layout_serde: LayoutDeserializer,
message_cache: RelativeLayoutCache,
) -> Self {
Expand Down Expand Up @@ -166,13 +166,19 @@ impl Layout for ColumnLayout {
.into_iter()
.enumerate()
.zip(s.dtypes().iter().cloned())
.map(|((idx, child), dtype)| {
self.layout_serde.read_layout(
self.fb_bytes.clone(),
child._tab.loc(),
self.scan.clone(),
self.message_cache.relative(idx as u16, dtype),
)
.filter_map(|((idx, child), dtype)| {
self.scan.projection.contains_idx(idx).then(|| {
// TODO: This is needed to support more complex nested layouts
let mut child_scan = self.scan.clone();
child_scan.projection = Projection::All;

self.layout_serde.read_layout(
self.fb_bytes.clone(),
child._tab.loc(),
child_scan,
self.message_cache.relative(idx as u16, dtype),
)
})
})
.collect::<VortexResult<Vec<_>>>()?;

Expand Down
7 changes: 7 additions & 0 deletions vortex-serde/src/layouts/reader/projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ impl Projection {
/// Creates a [`Projection::Partial`] from the supplied indices.
///
/// The indices are copied out of the borrowed slice into an owned `Vec`,
/// so the caller keeps ownership of whatever it passed in.
pub fn new(indices: impl AsRef<[usize]>) -> Self {
    let selected = indices.as_ref().to_vec();
    Self::Partial(selected)
}

pub fn contains_idx(&self, idx: usize) -> bool {
match self {
Projection::All => true,
Projection::Partial(idxs) => idxs.contains(&idx),
}
}
}

impl From<Vec<usize>> for Projection {
Expand Down
15 changes: 3 additions & 12 deletions vortex-serde/src/layouts/reader/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use bytes::{Bytes, BytesMut};
use futures::Stream;
use futures_util::future::BoxFuture;
use futures_util::{stream, FutureExt, StreamExt, TryStreamExt};
use vortex::array::StructArray;
use vortex::compute::unary::subtract_scalar;
use vortex::compute::{filter, filter_indices, search_sorted, slice, take, SearchSortedSide};
use vortex::{Array, IntoArray, IntoArrayVariant};
Expand All @@ -16,7 +15,6 @@ use vortex_error::{VortexError, VortexResult};
use vortex_scalar::Scalar;

use crate::io::VortexReadAt;
use crate::layouts::reader::projections::Projection;
use crate::layouts::reader::schema::Schema;
use crate::layouts::reader::{Layout, LayoutMessageCache, MessageId, ReadResult, Scan};
use crate::writer::ByteRange;
Expand Down Expand Up @@ -94,10 +92,10 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexLayoutBatchStrea
StreamingState::Init => {
if let Some(read) = self.layout.read()? {
match read {
ReadResult::GetMsgs(r1) => {
ReadResult::GetMsgs(messages) => {
let reader =
mem::take(&mut self.reader).expect("Invalid state transition");
let read_future = read_ranges(reader, r1).boxed();
let read_future = read_ranges(reader, messages).boxed();
self.state = StreamingState::Reading(read_future);
}
ReadResult::Batch(a) => self.state = StreamingState::Decoding(a),
Expand All @@ -117,15 +115,8 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexLayoutBatchStrea
batch = filter(&batch, &mask)?;
}

let projected = match &self.scan.projection {
Projection::All => batch,
Projection::Partial(indices) => StructArray::try_from(batch.clone())?
.project(indices.as_ref())?
.into_array(),
};

self.state = StreamingState::Init;
return Poll::Ready(Some(Ok(projected)));
return Poll::Ready(Some(Ok(batch)));
}
StreamingState::Reading(f) => match ready!(f.poll_unpin(cx)) {
Ok((read, buffers)) => {
Expand Down

0 comments on commit 4cf5ab9

Please sign in to comment.