Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS committed Aug 8, 2024
1 parent 9fb3327 commit c7caa7f
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 15 deletions.
14 changes: 7 additions & 7 deletions bench-vortex/src/bin/tpch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ async fn main() {

// The formats to run against (vs the baseline)
let formats = [
Format::Arrow,
// Format::Arrow,
Format::Parquet,
Format::InMemoryVortex {
enable_pushdown: true,
},
// Format::InMemoryVortex {
// enable_pushdown: true,
// },
Format::OnDiskVortex {
enable_compression: true,
},
Format::OnDiskVortex {
enable_compression: false,
},
// Format::OnDiskVortex {
// enable_compression: false,
// },
];

// Load datasets
Expand Down
10 changes: 8 additions & 2 deletions vortex-serde/src/layouts/reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ impl<R: VortexReadAt> VortexLayoutReaderBuilder<R> {
let projection = self.projection.unwrap_or_default();
let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

let projected_dtype = match &projection {
Projection::All => footer.dtype()?,
Projection::Partial(projection) => footer.projected_dtype(projection)?,
};

let scan = Scan {
projection,
indices: self.indices,
Expand All @@ -83,11 +88,12 @@ impl<R: VortexReadAt> VortexLayoutReaderBuilder<R> {
};

let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
let layouts_cache = RelativeLayoutCache::new(message_cache.clone(), footer.dtype()?);
let layouts_cache =
RelativeLayoutCache::new(message_cache.clone(), projected_dtype.clone());

let layout = footer.layout(scan.clone(), layouts_cache)?;

VortexLayoutBatchStream::try_new(self.reader, layout, message_cache, footer.dtype()?, scan)
VortexLayoutBatchStream::try_new(self.reader, layout, message_cache, projected_dtype, scan)
}

async fn len(&self) -> usize {
Expand Down
41 changes: 39 additions & 2 deletions vortex-serde/src/layouts/reader/footer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::sync::Arc;

use bytes::Bytes;
use flatbuffers::root;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_dtype::{DType, StructDType};
use vortex_error::{vortex_err, VortexResult};
use vortex_flatbuffers::dtype::Struct_;
use vortex_flatbuffers::ReadFlatBuffer;

use crate::layouts::reader::context::LayoutDeserializer;
Expand Down Expand Up @@ -54,4 +57,38 @@ impl Footer {
.0,
)
}

/// Builds the struct `DType` containing only the fields selected by `projection`.
///
/// The schema flatbuffer is read from the `leftovers` bytes between
/// `leftovers_schema_offset()` and `leftovers_footer_offset()`. Fields appear in the
/// resulting struct in the order given by `projection`, and the struct keeps the
/// nullability recorded on the serialized struct.
///
/// # Errors
///
/// Returns an error if the schema bytes are not a valid flatbuffer, if the schema has
/// no dtype, if the top-level dtype is not a struct, or if reading any projected field
/// fails.
pub fn projected_dtype(&self, projection: &[usize]) -> VortexResult<DType> {
    let start_offset = self.leftovers_schema_offset();
    let end_offset = self.leftovers_footer_offset();
    let dtype_bytes = &self.leftovers[start_offset..end_offset];

    let fb_schema = root::<vortex_flatbuffers::message::Schema>(dtype_bytes)?;
    let fb_dtype = fb_schema
        .dtype()
        .ok_or_else(|| vortex_err!(InvalidSerde: "Schema missing DType"))?;

    // A column projection only makes sense over a struct; report anything else as an
    // error instead of panicking on data that comes from the file.
    let fb_struct = fb_dtype.type__as_struct_().ok_or_else(
        || vortex_err!(InvalidSerde: "Cannot project a schema whose DType is not a struct"),
    )?;
    let nullability = fb_struct.nullable().into();

    // Resolve every projected field first so the first failure short-circuits, then
    // split the (name, dtype) pairs into the two parallel collections StructDType needs.
    let (names, dtypes): (Vec<Arc<str>>, Vec<DType>) = projection
        .iter()
        .map(|idx| Self::read_field(fb_struct, *idx))
        .collect::<VortexResult<Vec<_>>>()?
        .into_iter()
        .unzip();

    Ok(DType::Struct(
        StructDType::new(names.into(), dtypes),
        nullability,
    ))
}

fn read_field(fb_struct: Struct_, idx: usize) -> VortexResult<(Arc<str>, DType)> {
let name = fb_struct.names().unwrap().get(idx);
let fb_dtype = fb_struct.dtypes().unwrap().get(idx);
let dtype = DType::try_from(fb_dtype)?;

Ok((name.into(), dtype))
}
}
19 changes: 15 additions & 4 deletions vortex-serde/src/layouts/reader/layouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ impl ColumnLayout {
&self,
idx: usize,
children: Vector<ForwardsUOffset<fb::Layout>>,
st_dtype: &Arc<[DType]>,
dtype: DType,
) -> VortexResult<Box<dyn Layout>> {
let layout = children.get(idx);
let dtype = st_dtype[idx].clone();

// TODO: Figure out complex nested schema projections
let mut child_scan = self.scan.clone();
child_scan.projection = Projection::All;
Expand All @@ -185,14 +185,25 @@ impl Layout for ColumnLayout {

let column_layouts = match self.scan.projection {
Projection::All => (0..fb_children.len())
.map(|idx| self.read_child(idx, fb_children, s.dtypes()))
.map(|idx| self.read_child(idx, fb_children, s.dtypes()[idx].clone()))
.collect::<VortexResult<Vec<_>>>()?,
Projection::Partial(ref v) => v
.iter()
.map(|&idx| self.read_child(idx, fb_children, s.dtypes()))
.enumerate()
.map(|(position, &projection_idx)| {
self.read_child(
projection_idx,
fb_children,
s.dtypes()[position].clone(),
)
})
.collect::<VortexResult<Vec<_>>>()?,
};

// let column_layouts = (0..s.dtypes().len())
// .map(|idx| self.read_child(idx, fb_children, s.dtypes()))
// .collect::<VortexResult<Vec<_>>>()?;

let reader = BatchReader::new(s.names().clone(), column_layouts);
self.state = ColumnLayoutState::ReadColumns(reader);
self.read()
Expand Down

0 comments on commit c7caa7f

Please sign in to comment.