Skip to content

Commit

Permalink
mask
Browse files: browse the repository at this point in the history
  • Loading branch information
robert3005 committed Jul 29, 2024
1 parent 4accbe3 commit 0f4a5bc
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
4 changes: 2 additions & 2 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
use vortex_sampling_compressor::SamplingCompressor;
use vortex_serde::chunked_reader::ChunkedArrayReader;
use vortex_serde::file::reader::VortexStreamBuilder;
use vortex_serde::file::reader::VortexBatchReaderBuilder;
use vortex_serde::io::{ObjectStoreExt, TokioAdapter, VortexReadAt, VortexWrite};
use vortex_serde::writer::ArrayWriter;
use vortex_serde::MessageReader;
Expand Down Expand Up @@ -180,7 +180,7 @@ pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<Arr
let indices_array = indices.to_vec().into_array();

let file = TokioAdapter(tokio::fs::File::open(path).await?);
let stream = VortexStreamBuilder::new(file)
let stream = VortexBatchReaderBuilder::new(file)
.with_length(len)
.build()
.await
Expand Down
32 changes: 22 additions & 10 deletions vortex-serde/src/file/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::{ready, FutureExt, Stream};
use projections::Projection;
use schema::Schema;
use vortex::array::struct_::StructArray;
use vortex::{Array, IntoArray};
use vortex::{Array, ArrayDType, IntoArray};
use vortex_dtype::{DType, StructDType};
use vortex_error::{vortex_bail, VortexError, VortexResult};

Expand All @@ -21,13 +21,14 @@ use crate::{ArrayBufferReader, ReadResult};
pub mod projections;
pub mod schema;

pub struct VortexStreamBuilder<R> {
/// Builder that collects read options and, through `build`, produces a
/// `VortexBatchStream` over the underlying reader.
pub struct VortexBatchReaderBuilder<R> {
// Underlying reader supplied to `new`; it is moved into the stream by `build`.
reader: R,
// Optional column projection; `None` until configured. When present, `build`
// projects the dtype with the projection's indices.
projection: Option<Projection>,
// Optional length; `None` until configured.
// NOTE(review): presumably the byte length of the file, set via `with_length` — confirm.
len: Option<u64>,
// Optional mask array set by `with_mask`, which only accepts integer-typed arrays.
// NOTE(review): how the mask is applied during reading is not visible here — confirm.
mask: Option<Array>,
}

impl<R: VortexReadAt> VortexStreamBuilder<R> {
impl<R: VortexReadAt> VortexBatchReaderBuilder<R> {
const FOOTER_READ_SIZE: usize = 8 * 1024 * 1024;
const FOOTER_TRAILER_SIZE: usize = 20;

Expand All @@ -36,6 +37,7 @@ impl<R: VortexReadAt> VortexStreamBuilder<R> {
reader,
projection: None,
len: None,
mask: None,
}
}

Expand All @@ -49,7 +51,17 @@ impl<R: VortexReadAt> VortexStreamBuilder<R> {
self
}

pub async fn build(mut self) -> VortexResult<VortexStream<R>> {
/// Stores `array` as this builder's mask and returns the builder for chaining.
///
/// # Panics
///
/// Panics if the dtype of `array` is not an integer type.
pub fn with_mask(mut self, array: Array) -> Self {
    // TODO(#441): Allow providing boolean masks
    // Only integer-typed masks are accepted for now; reject everything else up front.
    if !array.dtype().is_int() {
        panic!("Mask arrays have to be integer arrays");
    }
    self.mask = Some(array);
    self
}

pub async fn build(mut self) -> VortexResult<VortexBatchStream<R>> {
let footer = self.read_footer().await?;

// TODO(adamg): We probably want to unify everything that is going on here into a single type and implementation
Expand All @@ -69,7 +81,7 @@ impl<R: VortexReadAt> VortexStreamBuilder<R> {
dtype = dtype.project(projection.indices())?;
}

Ok(VortexStream {
Ok(VortexBatchStream {
layout,
dtype,
reader: Some(self.reader),
Expand Down Expand Up @@ -132,15 +144,15 @@ impl<R: VortexReadAt> VortexStreamBuilder<R> {
}
}

pub struct VortexStream<R> {
/// Stream type returned by `VortexBatchReaderBuilder::build`; its `Stream`
/// implementation yields `VortexResult<Array>` items.
pub struct VortexBatchStream<R> {
// Layout of the file's struct columns, populated by `build`.
layout: StructLayout,
// Struct dtype of the data; `build` applies the projection to it when one is set.
dtype: StructDType,
// The underlying reader; `build` initialises this to `Some(reader)`.
// NOTE(review): presumably an `Option` so the reader can be moved out while a
// read is in flight — confirm against `poll_next`.
reader: Option<R>,
// Current state of the streaming state machine.
state: StreamingState<R>,
// Shared `vortex::Context`.
// NOTE(review): presumably used when decoding arrays — confirm.
context: Arc<vortex::Context>,
}

impl<R> VortexStream<R> {
impl<R> VortexBatchStream<R> {
/// Returns a `Schema` wrapping a clone of this stream's struct dtype.
///
/// The visible implementation always returns `Ok`.
pub fn schema(&self) -> VortexResult<Schema> {
    let dtype = self.dtype.clone();
    Ok(Schema(dtype))
}
Expand Down Expand Up @@ -172,7 +184,7 @@ impl ColumnInfo {
}
}

impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexStream<R> {
impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexBatchStream<R> {
type Item = VortexResult<Array>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -299,7 +311,7 @@ mod tests {
writer = writer.write_array_columns(st.into_array()).await.unwrap();
let written = writer.finalize().await.unwrap();

let mut builder = VortexStreamBuilder::new(written);
let mut builder = VortexBatchReaderBuilder::new(written);
let layout = builder.read_footer().await.unwrap().layout().unwrap();
dbg!(layout);

Expand Down Expand Up @@ -341,7 +353,7 @@ mod tests {
writer = writer.write_array_columns(st.into_array()).await.unwrap();
let written = writer.finalize().await.unwrap();

let mut builder = VortexStreamBuilder::new(written);
let mut builder = VortexBatchReaderBuilder::new(written);
let layout = builder.read_footer().await.unwrap().layout().unwrap();
dbg!(layout);

Expand Down

0 comments on commit 0f4a5bc

Please sign in to comment.