Skip to content

Commit

Permalink
ignore
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 committed Jul 29, 2024
1 parent 476fc48 commit d6daeb8
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
1 change: 1 addition & 0 deletions vortex-serde/src/file/layouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ impl StructLayout {
}
}

#[allow(dead_code)]
pub(crate) fn project(&self, projection: &Projection) -> StructLayout {
let mut new_children = VecDeque::with_capacity(projection.indices().len());

Expand Down
28 changes: 17 additions & 11 deletions vortex-serde/src/file/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct VortexBatchReaderBuilder<R> {
reader: R,
projection: Option<Projection>,
len: Option<u64>,
mask: Option<Array>,
take_indices: Option<Array>,
row_filter: Option<RowFilter>,
}

Expand All @@ -41,7 +41,7 @@ impl<R: VortexReadAt> VortexBatchReaderBuilder<R> {
projection: None,
row_filter: None,
len: None,
mask: None,
take_indices: None,
}
}

Expand All @@ -55,13 +55,13 @@ impl<R: VortexReadAt> VortexBatchReaderBuilder<R> {
self
}

pub fn with_mask(mut self, array: Array) -> Self {
pub fn with_take_indices(mut self, array: Array) -> Self {
// TODO(#441): Allow providing boolean masks
assert!(
array.dtype().is_int(),
"Mask arrays have to be integer arrays"
);
self.mask = Some(array);
self.take_indices = Some(array);
self
}

Expand All @@ -74,25 +74,23 @@ impl<R: VortexReadAt> VortexBatchReaderBuilder<R> {
let footer = self.read_footer().await?;

// TODO(adamg): We probably want to unify everything that is going on here into a single type and implementation
let mut layout = if let Layout::Struct(s) = footer.layout()? {
let layout = if let Layout::Struct(s) = footer.layout()? {
s
} else {
vortex_bail!("Top level layout must be a 'StructLayout'");
};
let mut dtype = if let DType::Struct(s, _) = footer.dtype()? {
let dtype = if let DType::Struct(s, _) = footer.dtype()? {
s
} else {
vortex_bail!("Top level dtype must be a 'StructDType'");
};

if let Some(projection) = self.projection.as_ref() {
layout = layout.project(projection);
dtype = dtype.project(projection.indices())?;
}

Ok(VortexBatchStream {
layout,
dtype,
projection: self.projection,
take_indices: self.take_indices,
row_filter: self.row_filter,
reader: Some(self.reader),
state: StreamingState::default(),
context: Default::default(),
Expand Down Expand Up @@ -153,9 +151,14 @@ impl<R: VortexReadAt> VortexBatchReaderBuilder<R> {
}
}

#[allow(dead_code)]
pub struct VortexBatchStream<R> {
layout: StructLayout,
dtype: StructDType,
// TODO(robert): Have identity projection
projection: Option<Projection>,
take_indices: Option<Array>,
row_filter: Option<RowFilter>,
reader: Option<R>,
state: StreamingState<R>,
context: Arc<vortex::Context>,
Expand Down Expand Up @@ -278,6 +281,7 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexBatchStream<R> {
.collect::<VortexResult<Vec<_>>>()?;

let s = StructArray::from_fields(arr.as_ref());
// take -> filter -> project
self.state = StreamingState::Init;
return Poll::Ready(Some(Ok(s.into_array())));
}
Expand All @@ -302,6 +306,7 @@ mod tests {
use crate::file::file_writer::FileWriter;

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_read_simple() {
let strings = ChunkedArray::from_iter([
VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(),
Expand Down Expand Up @@ -344,6 +349,7 @@ mod tests {
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_read_projection() {
let strings = ChunkedArray::from_iter([
VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(),
Expand Down
2 changes: 2 additions & 0 deletions vortex-serde/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ mod test {
}

#[test]
#[cfg_attr(miri, ignore)]
fn test_empty_index() -> VortexResult<()> {
let data = PrimitiveArray::from((0i32..3_000_000).collect_vec());
let buffer = write_ipc(data);
Expand All @@ -101,6 +102,7 @@ mod test {
}

#[test]
#[cfg_attr(miri, ignore)]
fn test_write_read_chunked() -> VortexResult<()> {
let indices = PrimitiveArray::from(vec![
10u32, 11, 12, 13, 100_000, 2_999_999, 2_999_999, 3_000_000,
Expand Down

0 comments on commit d6daeb8

Please sign in to comment.