Skip to content

Commit

Permalink
projection work and basic test
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS committed Jul 29, 2024
1 parent 8349398 commit 370a9d0
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 20 deletions.
11 changes: 11 additions & 0 deletions vortex-serde/src/file/layouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use flatbuffers::{FlatBufferBuilder, WIPOffset};
use vortex_error::{VortexError, VortexResult};
use vortex_flatbuffers::WriteFlatBuffer;

use super::reader::projections::Projection;
use crate::flatbuffers::footer as fb;
use crate::writer::ByteRange;

Expand Down Expand Up @@ -189,6 +190,16 @@ impl StructLayout {
children: child_ranges,
}
}

pub(crate) fn project(&self, projection: &Projection) -> StructLayout {
let mut new_childern = VecDeque::with_capacity(projection.indices().len());

for &idx in projection.indices() {
new_childern.push_back(self.children[idx].clone());
}

StructLayout::new(new_childern)
}
}

impl TryFrom<fb::NestedLayout<'_>> for Layout {
Expand Down
96 changes: 76 additions & 20 deletions vortex-serde/src/file/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use projections::Projection;
use schema::Schema;
use vortex::array::struct_::StructArray;
use vortex::{Array, IntoArray};
use vortex_dtype::DType;
use vortex_dtype::{DType, StructDType};
use vortex_error::{vortex_bail, VortexError, VortexResult};

use super::layouts::Layout;
use super::layouts::{Layout, StructLayout};
use crate::file::file_writer::MAGIC_BYTES;
use crate::file::footer::Footer;
use crate::io::VortexReadAt;
Expand Down Expand Up @@ -52,13 +52,26 @@ impl<R: VortexReadAt> VortexStreamBuilder<R> {
pub async fn build(mut self) -> VortexResult<VortexStream<R>> {
let footer = self.read_footer().await?;

let layout = footer.layout()?;
let dtype = footer.dtype()?;
// TODO(adamg): We probably want to unify everything that is going on here into a single type and implementation
let mut layout = if let Layout::Struct(s) = footer.layout()? {
s
} else {
vortex_bail!("Top level layout must be a 'StructLayout'");
};
let mut dtype = if let DType::Struct(s, _) = footer.dtype()? {
s
} else {
vortex_bail!("Top level dtype must be a 'StructDType'");
};

if let Some(projection) = self.projection.as_ref() {
layout = layout.project(projection);
dtype = dtype.project(projection.indices())?;
}

Ok(VortexStream {
layout,
dtype,
projection: self.projection,
reader: Some(self.reader),
state: StreamingState::default(),
context: Default::default(),
Expand Down Expand Up @@ -120,22 +133,16 @@ impl<R: VortexReadAt> VortexStreamBuilder<R> {
}

pub struct VortexStream<R> {
layout: Layout,
dtype: DType,
layout: StructLayout,
dtype: StructDType,
reader: Option<R>,
projection: Option<Projection>,
state: StreamingState<R>,
context: Arc<vortex::Context>,
}

impl<R> VortexStream<R> {
pub fn schema(&self) -> VortexResult<Schema> {
let struct_schema = self.dtype.as_struct().cloned().unwrap();
let dtype = match self.projection.as_ref() {
Some(projection) => struct_schema.project(projection.indices())?,
None => struct_schema,
};
Ok(Schema(dtype))
Ok(Schema(self.dtype.clone()))
}
}

Expand Down Expand Up @@ -173,10 +180,8 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexStream<R> {
match &mut self.state {
StreamingState::Init => {
let mut layouts = Vec::default();
let struct_types = self.dtype.as_struct().unwrap().clone();
let top_level = self.layout.as_struct_mut().unwrap();

for c_layout in top_level.children.iter_mut() {
for c_layout in self.layout.children.iter_mut() {
let layout = c_layout.as_chunked_mut().unwrap();

if layout.children.len() == 1 {
Expand All @@ -186,8 +191,8 @@ impl<R: VortexReadAt + Unpin + Send + 'static> Stream for VortexStream<R> {
}
}

let names = struct_types.names().iter();
let types = struct_types.dtypes().iter().cloned();
let names = self.dtype.names().iter();
let types = self.dtype.dtypes().iter().cloned();

let layouts = layouts
.into_iter()
Expand Down Expand Up @@ -263,7 +268,7 @@ mod tests {
use vortex::array::struct_::StructArray;
use vortex::array::varbin::VarBinArray;
use vortex::validity::Validity;
use vortex::IntoArray;
use vortex::{ArrayDType, IntoArray, IntoArrayVariant};

use super::*;
use crate::file::file_writer::FileWriter;
Expand Down Expand Up @@ -309,4 +314,55 @@ mod tests {

assert_eq!(cnt, 2);
}

#[tokio::test]
async fn test_read_projection() {
let strings = ChunkedArray::from_iter([
VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(),
VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(),
])
.into_array();

let numbers = ChunkedArray::from_iter([
PrimitiveArray::from(vec![1u32, 2, 3, 4]).into_array(),
PrimitiveArray::from(vec![5u32, 6, 7, 8]).into_array(),
])
.into_array();

let st = StructArray::try_new(
["strings".into(), "numbers".into()].into(),
vec![strings, numbers],
8,
Validity::NonNullable,
)
.unwrap();
let buf = Vec::new();
let mut writer = FileWriter::new(buf);
writer = writer.write_array_columns(st.into_array()).await.unwrap();
let written = writer.finalize().await.unwrap();

let mut builder = VortexStreamBuilder::new(written);
let layout = builder.read_footer().await.unwrap().layout().unwrap();
dbg!(layout);

let mut stream = builder
.with_projection(Projection::new([0]))
.build()
.await
.unwrap();
let mut cnt = 0;

while let Some(array) = stream.next().await {
let array = array.unwrap();
assert_eq!(array.len(), 4);

let array = array.into_struct().unwrap();
let struct_dtype = array.dtype().as_struct().unwrap();
assert_eq!(struct_dtype.dtypes().len(), 1);
assert_eq!(struct_dtype.names()[0].as_ref(), "strings");
cnt += 1;
}

assert_eq!(cnt, 2);
}
}
1 change: 1 addition & 0 deletions vortex-serde/src/file/reader/projections.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#[derive(Clone)]
pub struct Projection {
indices: Vec<usize>,
}
Expand Down

0 comments on commit 370a9d0

Please sign in to comment.