Skip to content

Commit

Permalink
chore: test for repeated columns in a projection (#1691)
Browse files Browse the repository at this point in the history
Resolves #712

---------

Co-authored-by: Andrew Duffy <[email protected]>
  • Loading branch information
danking and a10y authored Dec 17, 2024
1 parent 49bd8d6 commit a421d2e
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 3 deletions.
74 changes: 73 additions & 1 deletion vortex-file/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use vortex_array::{ArrayDType, ArrayData, ArrayLen, IntoArrayData, IntoArrayVari
use vortex_buffer::Buffer;
use vortex_dtype::field::Field;
use vortex_dtype::{DType, Nullability, PType, StructDType};
use vortex_error::vortex_panic;
use vortex_error::{vortex_panic, VortexResult};
use vortex_expr::{BinaryExpr, Column, Literal, Operator};
use vortex_io::VortexReadAt;

use crate::builder::initial_read::read_initial_bytes;
use crate::write::VortexFileWriter;
Expand Down Expand Up @@ -1003,3 +1004,74 @@ async fn test_pruning_with_or() {
]
);
}

/// Writes a struct with a single `strings` column, then reads it back while projecting that
/// same column twice (once by index, once by field name) and checks that both reads yield a
/// struct holding two identical `strings` columns.
#[tokio::test]
async fn test_repeated_projection() {
    // Builds a reader over `w` restricted to `projection` and returns the result of `read_all`.
    async fn read_all<W: VortexReadAt + Unpin>(
        w: W,
        projection: Projection,
    ) -> VortexResult<ArrayData> {
        VortexReadBuilder::new(w, LayoutDeserializer::default())
            .with_projection(projection)
            .build()
            .await?
            .read_all()
            .await
    }

    // Materializes every element of `array` as a scalar so two arrays can be compared
    // element by element.
    let scalars_of = |array: &ArrayData| {
        (0..array.len())
            .map(|row| scalar_at(array, row).unwrap())
            .collect_vec()
    };

    // Two identical chunks of string data.
    let chunk = || VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array();
    let strings = ChunkedArray::from_iter([chunk(), chunk()]).into_array();

    // The array that gets written: one column named "strings".
    let single_column_array = StructArray::from_fields(&[("strings", strings.clone())])
        .unwrap()
        .into_array();

    // What a repeated projection should produce: the same column, twice.
    let expected = StructArray::from_fields(&[("strings", strings.clone()), ("strings", strings)])
        .unwrap()
        .into_array();

    let written = VortexFileWriter::new(Vec::new())
        .write_array_columns(single_column_array)
        .await
        .unwrap()
        .finalize()
        .await
        .unwrap();

    // Case 1: repeat the column by index.
    let by_index = Projection::new([0, 0]);
    let actual = read_all(Buffer::from(written.clone()), by_index)
        .await
        .unwrap();
    assert_eq!(scalars_of(&actual), scalars_of(&expected));

    // Case 2: repeat the column by field name.
    let repeated_fields = ["strings", "strings"]
        .iter()
        .map(|name| Field::from(name.to_string()))
        .collect_vec();
    let by_name = Projection::Flat(repeated_fields);
    let actual = read_all(Buffer::from(written.clone()), by_name)
        .await
        .unwrap();
    assert_eq!(scalars_of(&actual), scalars_of(&expected));
}
6 changes: 6 additions & 0 deletions vortex-io/src/aligned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ where

/// Append the bytes of `slice` to the end of this mutable buffer.
///
/// # Panics
///
/// Panics with "extend_from_slice cannot reallocate" when `slice` is longer than the space
/// left between the current length and `self.capacity`.
pub fn extend_from_slice(&mut self, slice: &[u8]) {
    // Refuse any append that would not fit in the space still available.
    let spare = self.capacity - self.len();
    assert!(
        spare >= slice.len(),
        "extend_from_slice cannot reallocate"
    );

    // `buf` carries padding internally, so the new bytes land after the padded region.
    self.buf.extend_from_slice(slice)
}
Expand Down
8 changes: 6 additions & 2 deletions vortex-io/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ use std::future::{self, Future};
use std::io;
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use vortex_buffer::Buffer;
use vortex_error::{vortex_err, VortexUnwrap};

use crate::aligned::AlignedBytesMut;
use crate::ALIGNMENT;

/// A trait for types that support asynchronous reads.
///
/// References to the type must be safe to [share across threads][Send], but spawned
Expand Down Expand Up @@ -73,7 +76,8 @@ impl VortexReadAt for Buffer {
vortex_err!("unexpected eof"),
)))
} else {
let mut buffer = BytesMut::with_capacity(len.try_into().vortex_unwrap());
let mut buffer =
AlignedBytesMut::<ALIGNMENT>::with_capacity(len.try_into().vortex_unwrap());
unsafe {
buffer.set_len(len.try_into().vortex_unwrap());
}
Expand Down

0 comments on commit a421d2e

Please sign in to comment.