Skip to content

Commit

Permalink
Pass typed flat buffers into layout builders (#1563)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Dec 5, 2024
1 parent a1ecb12 commit 5b3b5d7
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 135 deletions.
9 changes: 4 additions & 5 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use vortex_array::arrow::infer_schema;
use vortex_array::Context;
use vortex_file::metadata::fetch_metadata;
use vortex_file::{
read_initial_bytes, read_layout_from_initial, LayoutContext, LayoutDeserializer,
LayoutMessageCache, RelativeLayoutCache, Scan, VORTEX_FILE_EXTENSION,
read_initial_bytes, LayoutContext, LayoutDeserializer, LayoutMessageCache, RelativeLayoutCache,
Scan, VORTEX_FILE_EXTENSION,
};
use vortex_io::{IoDispatcher, ObjectStoreReadAt};

Expand Down Expand Up @@ -106,9 +106,8 @@ impl FileFormat for VortexFormat {
initial_read.lazy_dtype().into(),
);

let root_layout = read_layout_from_initial(
&initial_read,
&layout_deserializer,
let root_layout = layout_deserializer.read_layout(
initial_read.fb_layout(),
Scan::empty(),
relative_message_cache,
)?;
Expand Down
4 changes: 2 additions & 2 deletions vortex-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@
//! example, the VortexFileArrayStream reads an array, evaluates the row filter, and then reads the
//! array again with the filter mask.
//!
//! [`read_layout_from_initial`] produces a [LayoutReader] which assembles one or more Vortex arrays
//! by reading the serialized data and metadata.
//! A [LayoutReader] then assembles one or more Vortex arrays by reading the serialized data
//! and metadata.
//!
//! # Apache Arrow
//!
Expand Down
16 changes: 1 addition & 15 deletions vortex-file/src/read/builder/initial_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_flatbuffers::{footer, message};
use vortex_io::VortexReadAt;

use crate::{
LayoutDeserializer, LayoutReader, LazyDType, RelativeLayoutCache, Scan, EOF_SIZE,
INITIAL_READ_SIZE, MAGIC_BYTES, VERSION,
};
use crate::{LazyDType, EOF_SIZE, INITIAL_READ_SIZE, MAGIC_BYTES, VERSION};

#[derive(Debug)]
pub struct InitialRead {
Expand Down Expand Up @@ -56,17 +53,6 @@ impl InitialRead {
}
}

pub fn read_layout_from_initial(
initial_read: &InitialRead,
layout_serde: &LayoutDeserializer,
scan: Scan,
message_cache: RelativeLayoutCache,
) -> VortexResult<Box<dyn LayoutReader>> {
let layout_bytes = initial_read.buf.slice(initial_read.fb_layout_byte_range());
let fb_loc = initial_read.fb_layout()._tab.loc();
layout_serde.read_layout(layout_bytes, fb_loc, scan, message_cache)
}

pub async fn read_initial_bytes<R: VortexReadAt>(
read: &R,
file_size: u64,
Expand Down
12 changes: 5 additions & 7 deletions vortex-file/src/read/builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::{Arc, RwLock};

use initial_read::{read_initial_bytes, read_layout_from_initial};
use initial_read::read_initial_bytes;
use vortex_array::{ArrayDType, ArrayData};
use vortex_error::VortexResult;
use vortex_expr::Select;
Expand Down Expand Up @@ -129,9 +129,8 @@ impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {
};

let message_cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
let layout_reader = read_layout_from_initial(
&initial_read,
&self.layout_serde,
let layout_reader = self.layout_serde.read_layout(
initial_read.fb_layout(),
Scan::new(match self.projection {
Projection::All => None,
Projection::Flat(p) => Some(Arc::new(Select::include(p))),
Expand All @@ -142,9 +141,8 @@ impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {
let filter_reader = self
.row_filter
.map(|row_filter| {
read_layout_from_initial(
&initial_read,
&self.layout_serde,
self.layout_serde.read_layout(
initial_read.fb_layout(),
Scan::new(Some(Arc::new(row_filter))),
RelativeLayoutCache::new(message_cache.clone(), lazy_dtype),
)
Expand Down
15 changes: 4 additions & 11 deletions vortex-file/src/read/context.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

use bytes::Bytes;
use vortex_array::aliases::hash_map::HashMap;
use vortex_array::Context;
use vortex_error::{vortex_err, VortexResult};
Expand All @@ -24,8 +23,7 @@ pub trait Layout: Debug + Send + Sync {

fn reader(
&self,
fb_bytes: Bytes,
fb_loc: usize,
layout: fb::Layout,
scan: Scan,
layout_serde: LayoutDeserializer,
message_cache: RelativeLayoutCache,
Expand Down Expand Up @@ -73,20 +71,15 @@ impl LayoutDeserializer {

pub fn read_layout(
&self,
fb_bytes: Bytes,
fb_loc: usize,
layout: fb::Layout,
scan: Scan,
message_cache: RelativeLayoutCache,
) -> VortexResult<Box<dyn LayoutReader>> {
let fb_layout = unsafe {
let tab = flatbuffers::Table::new(&fb_bytes, fb_loc);
fb::Layout::init_from_table(tab)
};
let layout_id = LayoutId(fb_layout.encoding());
let layout_id = LayoutId(layout.encoding());
self.layout_ctx
.lookup_layout(&layout_id)
.ok_or_else(|| vortex_err!("Unknown layout definition {layout_id}"))?
.reader(fb_bytes, fb_loc, scan, self.clone(), message_cache)
.reader(layout, scan, self.clone(), message_cache)
}

pub(crate) fn ctx(&self) -> Arc<Context> {
Expand Down
53 changes: 18 additions & 35 deletions vortex-file/src/read/layouts/chunked.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::BTreeSet;
use std::sync::{Arc, OnceLock, RwLock};

use bytes::Bytes;
use itertools::Itertools;
use vortex_array::aliases::hash_map::HashMap;
use vortex_array::array::ChunkedArray;
Expand All @@ -11,7 +10,7 @@ use vortex_array::{ArrayDType, ArrayData, IntoArrayData};
use vortex_dtype::{DType, Nullability, StructDType};
use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect as _, VortexResult};
use vortex_expr::Select;
use vortex_flatbuffers::footer;
use vortex_flatbuffers::footer as fb;

use crate::layouts::RangedLayoutReader;
use crate::pruning::PruningPredicate;
Expand All @@ -36,16 +35,14 @@ impl Layout for ChunkedLayout {

fn reader(
&self,
fb_bytes: Bytes,
fb_loc: usize,
layout: fb::Layout,
scan: Scan,
layout_builder: LayoutDeserializer,
message_cache: RelativeLayoutCache,
) -> VortexResult<Box<dyn LayoutReader>> {
Ok(Box::new(
ChunkedLayoutBuilder {
fb_bytes,
fb_loc,
layout,
scan,
layout_builder,
message_cache,
Expand All @@ -61,35 +58,26 @@ const METADATA_LAYOUT_PART_ID: LayoutPartId = 0;
///
/// First child in the list is the metadata table
/// Subsequent children are consecutive chunks of this layout
struct ChunkedLayoutBuilder {
fb_bytes: Bytes,
fb_loc: usize,
struct ChunkedLayoutBuilder<'a> {
layout: fb::Layout<'a>,
scan: Scan,
layout_builder: LayoutDeserializer,
message_cache: RelativeLayoutCache,
}

impl ChunkedLayoutBuilder {
fn flatbuffer(&self) -> footer::Layout {
unsafe {
let tab = flatbuffers::Table::new(&self.fb_bytes, self.fb_loc);
footer::Layout::init_from_table(tab)
}
}

impl ChunkedLayoutBuilder<'_> {
fn metadata_layout(&self) -> VortexResult<Option<Box<dyn LayoutReader>>> {
self.flatbuffer()
self.layout
.metadata()
.map(|m| {
let set_stats = stats_from_bitset_bytes(m.bytes());
let metadata_fb = self
.flatbuffer()
.layout
.children()
.ok_or_else(|| vortex_err!("Must have children if layout has metadata"))?
.get(0);
self.layout_builder.read_layout(
self.fb_bytes.clone(),
metadata_fb._tab.loc(),
metadata_fb,
Scan::new(Some(Arc::new(Select::include(
set_stats.iter().map(|s| s.to_string().into()).collect(),
)))),
Expand All @@ -105,13 +93,13 @@ impl ChunkedLayoutBuilder {
.transpose()
}

fn children(&self) -> impl Iterator<Item = (usize, footer::Layout)> {
self.flatbuffer()
fn children(&self) -> impl Iterator<Item = (usize, fb::Layout)> {
self.layout
.children()
.unwrap_or_default()
.iter()
.enumerate()
.skip(if self.flatbuffer().metadata().is_some() {
.skip(if self.layout.metadata().is_some() {
1
} else {
0
Expand All @@ -134,8 +122,7 @@ impl ChunkedLayoutBuilder {
.zip_eq(self.children_ranges())
.map(|((i, c), (begin, end))| {
let layout = self.layout_builder.read_layout(
self.fb_bytes.clone(),
c._tab.loc(),
c,
self.scan.clone(),
self.message_cache
.relative(i as u16, self.message_cache.dtype().clone()),
Expand Down Expand Up @@ -420,7 +407,7 @@ mod tests {

use arrow_buffer::BooleanBufferBuilder;
use bytes::Bytes;
use flatbuffers::{root_unchecked, FlatBufferBuilder};
use flatbuffers::{root, FlatBufferBuilder};
use futures_util::TryStreamExt;
use vortex_array::array::{BoolArray, ChunkedArray, PrimitiveArray};
use vortex_array::{ArrayDType, ArrayLen, IntoArrayData, IntoArrayVariant};
Expand Down Expand Up @@ -474,30 +461,26 @@ mod tests {
let written = writer.into_inner();

let mut fb = FlatBufferBuilder::new();
// FIXME(ngates): impl From<LayoutSpec> for fb::Layout
let chunked_layout = write::LayoutSpec::chunked(flat_layouts.into(), len as u64, None);
let flat_buf = chunked_layout.write_flatbuffer(&mut fb);
fb.finish_minimal(flat_buf);
let fb_bytes = Bytes::copy_from_slice(fb.finished_data());

let fb_loc = (unsafe { root_unchecked::<footer::Layout>(&fb_bytes) })
._tab
.loc();
let layout = root::<footer::Layout>(&fb_bytes).unwrap();

let dtype = Arc::new(LazyDType::from_dtype(PType::I32.into()));
let layout_builder = LayoutDeserializer::default();
(
ChunkedLayoutBuilder {
fb_bytes: fb_bytes.clone(),
fb_loc,
layout,
scan,
layout_builder: layout_builder.clone(),
message_cache: RelativeLayoutCache::new(cache.clone(), dtype.clone()),
}
.build()
.unwrap(),
ChunkedLayoutBuilder {
fb_bytes,
fb_loc,
layout,
scan: Scan::new(None),
layout_builder,
message_cache: RelativeLayoutCache::new(cache, dtype),
Expand Down
Loading

0 comments on commit 5b3b5d7

Please sign in to comment.