Skip to content

Commit

Permalink
read
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 committed Jul 29, 2024
1 parent d6daeb8 commit c36f6c2
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
4 changes: 3 additions & 1 deletion vortex-serde/src/file/reader/filtering.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use vortex::Array;
use vortex_error::VortexResult;

use super::projections::Projection;

pub trait FilteringPredicate {
fn projection(&self) -> &Projection;
fn evaluate(&mut self, array: &Array) -> Array;
fn evaluate(&mut self, array: &Array) -> VortexResult<Array>;
}

#[derive(Default)]
pub struct RowFilter {
pub(crate) _filters: Vec<Box<dyn FilteringPredicate>>,
}
Expand Down
63 changes: 56 additions & 7 deletions vortex-serde/src/file/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ use futures::future::BoxFuture;
use futures::{ready, FutureExt, Stream};
use projections::Projection;
use schema::Schema;
use vortex::array::constant::ConstantArray;
use vortex::array::struct_::StructArray;
use vortex::{Array, ArrayDType, IntoArray};
use vortex_dtype::{DType, StructDType};
use vortex::compute::unary::subtract_scalar;
use vortex::compute::{and, filter, search_sorted, slice, take, SearchSortedSide};
use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
use vortex_dtype::{match_each_integer_ptype, DType, StructDType};
use vortex_error::{vortex_bail, VortexError, VortexResult};
use vortex_scalar::Scalar;

use super::layouts::{Layout, StructLayout};
use crate::file::file_writer::MAGIC_BYTES;
Expand Down Expand Up @@ -90,10 +94,11 @@ impl<R: VortexReadAt> VortexBatchReaderBuilder<R> {
dtype,
projection: self.projection,
take_indices: self.take_indices,
row_filter: self.row_filter,
row_filter: self.row_filter.unwrap_or_default(),
reader: Some(self.reader),
state: StreamingState::default(),
context: Default::default(),
current_offset: 0,
})
}

Expand Down Expand Up @@ -158,16 +163,36 @@ pub struct VortexBatchStream<R> {
// TODO(robert): Have identity projection
projection: Option<Projection>,
take_indices: Option<Array>,
row_filter: Option<RowFilter>,
row_filter: RowFilter,
reader: Option<R>,
state: StreamingState<R>,
context: Arc<vortex::Context>,
current_offset: usize,
}

impl<R> VortexBatchStream<R> {
pub fn schema(&self) -> VortexResult<Schema> {
Ok(Schema(self.dtype.clone()))
}

fn take_batch(&mut self, batch: &Array) -> VortexResult<Array> {
let curr_offset = self.current_offset;
let indices = self.take_indices.as_ref().expect("should be there");
let left =
search_sorted(indices, curr_offset, SearchSortedSide::Left)?.to_zero_offset_index();
let right = search_sorted(indices, curr_offset + batch.len(), SearchSortedSide::Left)?
.to_zero_offset_index();

self.current_offset += batch.len();
// TODO(ngates): this is probably too heavy to run on the event loop. We should spawn
// onto a worker pool.
let indices_for_batch = slice(indices, left, right)?.into_primitive()?;
let shifted_arr = match_each_integer_ptype!(indices_for_batch.ptype(), |$T| {
subtract_scalar(&indices_for_batch.into_array(), &Scalar::from(curr_offset as $T))?
});

take(batch, &shifted_arr)
}
}

type StreamStateFuture<R> = BoxFuture<'static, VortexResult<(Vec<(Arc<str>, BytesMut, DType)>, R)>>;
Expand Down Expand Up @@ -280,10 +305,34 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexBatchStream<R> {
})
.collect::<VortexResult<Vec<_>>>()?;

let s = StructArray::from_fields(arr.as_ref());
// take -> filter -> project
let mut s = StructArray::from_fields(arr.as_ref()).into_array();

s = if self.take_indices.is_some() {
self.take_batch(&s)?
} else {
s
};

let mut current_predicate = ConstantArray::new(true, s.len()).into_array();
for pred in self.row_filter._filters.iter_mut() {
let filter_bitmap = pred.evaluate(&s)?;
current_predicate = and(&current_predicate, &filter_bitmap)?;
}

s = filter(&s, &current_predicate)?;
let projected = self
.projection
.as_ref()
.map(|p| {
StructArray::try_from(s.clone())
.unwrap()
.project(p.indices())
.unwrap()
.into_array()
})
.unwrap_or(s);
self.state = StreamingState::Init;
return Poll::Ready(Some(Ok(s.into_array())));
return Poll::Ready(Some(Ok(projected)));
}
Err(e) => return Poll::Ready(Some(Err(e))),
},
Expand Down

0 comments on commit c36f6c2

Please sign in to comment.