diff --git a/vortex-array/src/view.rs b/vortex-array/src/view.rs index 0252a3bc35..5f33e0b3a0 100644 --- a/vortex-array/src/view.rs +++ b/vortex-array/src/view.rs @@ -126,7 +126,8 @@ impl ArrayView { .iter() .take(idx) .map(|child| Self::cumulative_nbuffers(child)) - .sum(); + .sum::() + + self.has_buffer() as usize; let buffer_count = Self::cumulative_nbuffers(child); Some(Self { diff --git a/vortex-serde/src/stream_writer/mod.rs b/vortex-serde/src/stream_writer/mod.rs index 382ab1a73a..00def6d808 100644 --- a/vortex-serde/src/stream_writer/mod.rs +++ b/vortex-serde/src/stream_writer/mod.rs @@ -11,6 +11,9 @@ use vortex_error::VortexResult; use crate::io::VortexWrite; use crate::MessageWriter; +#[cfg(test)] +mod tests; + pub struct StreamArrayWriter { msgs: MessageWriter, diff --git a/vortex-serde/src/stream_writer/tests.rs b/vortex-serde/src/stream_writer/tests.rs new file mode 100644 index 0000000000..a7632e5702 --- /dev/null +++ b/vortex-serde/src/stream_writer/tests.rs @@ -0,0 +1,36 @@ +use std::io::Cursor; +use std::sync::Arc; + +use arrow_array::cast::AsArray; +use arrow_array::types::Int32Type; +use arrow_array::PrimitiveArray; +use vortex::arrow::FromArrowArray; +use vortex::stream::ArrayStreamExt; +use vortex::{Array, Context, IntoCanonical}; + +use crate::stream_reader::StreamArrayReader; +use crate::stream_writer::StreamArrayWriter; + +#[tokio::test] +async fn broken_data() { + let arrow_arr: PrimitiveArray = [Some(1), Some(2), Some(3), None].iter().collect(); + let vortex_arr = Array::from_arrow(&arrow_arr, true); + let written = StreamArrayWriter::new(Vec::new()) + .write_array(vortex_arr) + .await + .unwrap() + .into_inner(); + let reader = StreamArrayReader::try_new(Cursor::new(written), Arc::new(Context::default())) + .await + .unwrap(); + let arr = reader + .load_dtype() + .await + .unwrap() + .into_array_stream() + .collect_chunked() + .await + .unwrap(); + let round_tripped = arr.into_canonical().unwrap().into_arrow(); + assert_eq!(&arrow_arr, round_tripped.as_primitive::()); +}