Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: File row splits #1709

Open
wants to merge 24 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bench-vortex/benches/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult<Vec<Ar
);

let mut batches = vec![];
let mut stream = builder.build().await?;
let mut stream = builder.build().await?.into_stream();
while let Some(batch) = stream.next().await {
batches.push(batch?.into_arrow()?);
}
Expand Down
2 changes: 2 additions & 0 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub async fn open_vortex(path: &Path) -> VortexResult<ArrayData> {
.with_io_dispatcher(DISPATCHER.clone())
.build()
.await?
.into_stream()
.read_all()
.await
}
Expand Down Expand Up @@ -127,6 +128,7 @@ async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
.with_indices(ArrayData::from(indices.to_vec()))
.build()
.await?
.into_stream()
.read_all()
.await
// For equivalence.... we decompress to make sure we're not cheating too much.
Expand Down
6 changes: 3 additions & 3 deletions pyvortex/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use vortex::dtype::DType;
use vortex::error::VortexResult;
use vortex::file::{
read_initial_bytes, LayoutContext, LayoutDeserializer, Projection, RowFilter,
VortexFileArrayStream, VortexReadBuilder, VortexRecordBatchReader,
VortexReadArrayStream, VortexReadBuilder, VortexRecordBatchReader,
};
use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt};
use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
Expand All @@ -27,7 +27,7 @@ pub async fn layout_stream_from_reader<T: VortexReadAt + Unpin>(
projection: Projection,
row_filter: Option<RowFilter>,
indices: Option<ArrayData>,
) -> VortexResult<VortexFileArrayStream<T>> {
) -> VortexResult<VortexReadArrayStream<T>> {
let mut builder = VortexReadBuilder::new(
reader,
LayoutDeserializer::new(
Expand All @@ -45,7 +45,7 @@ pub async fn layout_stream_from_reader<T: VortexReadAt + Unpin>(
builder = builder.with_indices(indices);
}

builder.build().await
Ok(builder.build().await?.into_stream())
}

pub async fn read_array_from_reader<T: VortexReadAt + Unpin + 'static>(
Expand Down
1 change: 1 addition & 0 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl FileOpener for VortexFileOpener {
builder
.build()
.await?
.into_stream()
.map_ok(RecordBatch::try_from)
.map(|r| r.and_then(|inner| inner))
.map_err(|e| e.into()),
Expand Down
8 changes: 3 additions & 5 deletions vortex-dtype/src/ptype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,7 @@ macro_rules! match_each_integer_ptype {
PType::U16 => __with__! { u16 },
PType::U32 => __with__! { u32 },
PType::U64 => __with__! { u64 },
PType::F16 => panic!("Unsupported ptype f16"),
PType::F32 => panic!("Unsupported ptype f32"),
PType::F64 => panic!("Unsupported ptype f64"),
other => panic!("Unsupported ptype {other}")
}
})
}
Expand All @@ -206,7 +204,7 @@ macro_rules! match_each_unsigned_integer_ptype {
PType::U16 => __with__! { u16 },
PType::U32 => __with__! { u32 },
PType::U64 => __with__! { u64 },
_ => panic!("Unsupported ptype {}", $self),
other => panic!("Unsupported ptype {other}"),
}
})
}
Expand All @@ -222,7 +220,7 @@ macro_rules! match_each_float_ptype {
PType::F16 => __with__! { f16 },
PType::F32 => __with__! { f32 },
PType::F64 => __with__! { f64 },
_ => panic!("Unsupported ptype {}", $self),
other => panic!("Unsupported ptype {other}"),
}
})
}
Expand Down
8 changes: 4 additions & 4 deletions vortex-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@
//!
//! # Reading
//!
//! Layout reading is implemented by [`VortexFileArrayStream`]. The [`VortexFileArrayStream`] should
//! Layout reading is implemented by [`VortexReadArrayStream`]. The [`VortexReadArrayStream`] should
//! be constructed by a [`VortexReadBuilder`], which first uses an [InitialRead] to read the footer
//! (schema, layout, postscript, version, and magic bytes). In most cases, these entire footer can
//! be read by a single read of the suffix of the file.
//!
//! A [`VortexFileArrayStream`] internally contains a [`LayoutMessageCache`] which is shared by its
//! A [`VortexReadArrayStream`] internally contains a [`LayoutMessageCache`] which is shared by its
//! layout reader and the layout reader's descendants. The cache permits the reading system to
//! "read" the bytes of a layout multiple times without triggering reads to the underlying storage.
//! For example, the [`VortexFileArrayStream`] reads an array, evaluates the row filter, and then
//! For example, the [`VortexReadArrayStream`] reads an array, evaluates the row filter, and then
//! reads the array again with the filter mask.
//!
//! A [`LayoutReader`] then assembles one or more Vortex arrays by reading the serialized data and
Expand All @@ -64,7 +64,7 @@
//! # Apache Arrow
//!
//! If you ultimately seek Arrow arrays, [`VortexRecordBatchReader`] converts a
//! [`VortexFileArrayStream`] into a [`RecordBatchReader`](arrow_array::RecordBatchReader).
//! [`VortexReadArrayStream`] into a [`RecordBatchReader`](arrow_array::RecordBatchReader).

mod read;
mod write;
Expand Down
13 changes: 7 additions & 6 deletions vortex-file/src/read/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ pub(crate) trait ReadMasked {

/// Read an array with a [`RowMask`].
pub(crate) struct ReadArray {
layout: Box<dyn LayoutReader>,
layout: Arc<dyn LayoutReader>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So LayoutReaders are no longer stateful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like I overlooked some layers of of state/locks in the readers, that will take some thinking and probably additional refactoring.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok after a more careful reading and a small test, as far as I can tell while the LayoutReader implementations are stateful (with interior mutability), they can read independently from each other as the LayoutBufferedReader is the part that actually controls the overall flow and each stream initializes a new one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can be multiple concurrent reads being tracked in the layout reader but are they reusable?

I thought something encoded assumptions that splits were accessed monotonically but maybe that was in the BufferedReader

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought so too, but that does seem to be in the BufferedLayoutReader. The LayoutReader implementation read through poll_read which takes a RowMask to read try and read a specific range on each call, and the BufferedReader tracks that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to access the layoutreaders out of order so we can read multiple at the same time irrespective of their children.

}

impl ReadArray {
pub(crate) fn new(layout: Box<dyn LayoutReader>) -> Self {
pub(crate) fn new(layout: Arc<dyn LayoutReader>) -> Self {
Self { layout }
}
}
Expand All @@ -51,7 +51,8 @@ enum RowMaskState<V> {
}

pub struct BufferedLayoutReader<R, S, V, RM> {
values: S,
/// Stream of row masks to read
read_masks: S,
row_mask_reader: RM,
in_flight: Option<BoxFuture<'static, io::Result<Vec<Message>>>>,
queued: VecDeque<RowMaskState<V>>,
Expand All @@ -69,12 +70,12 @@ where
pub fn new(
read: R,
dispatcher: Arc<IoDispatcher>,
values: S,
read_masks: S,
row_mask_reader: RM,
cache: Arc<RwLock<LayoutMessageCache>>,
) -> Self {
Self {
values,
read_masks,
row_mask_reader,
in_flight: None,
queued: VecDeque::new(),
Expand Down Expand Up @@ -126,7 +127,7 @@ where

let mut exhausted = false;
while read_more_count < NUM_TO_COALESCE {
match self.values.poll_next_unpin(cx) {
match self.read_masks.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(next_mask))) => {
if let Some(read_result) = self.row_mask_reader.read_masked(&next_mask)? {
match read_result {
Expand Down
30 changes: 16 additions & 14 deletions vortex-file/src/read/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use vortex_error::VortexResult;
use vortex_expr::Select;
use vortex_io::{IoDispatcher, VortexReadAt};

use super::handle::VortexReadHandle;
use super::InitialRead;
use crate::read::cache::{LayoutMessageCache, RelativeLayoutCache};
use crate::read::context::LayoutDeserializer;
use crate::read::filtering::RowFilter;
use crate::read::projection::Projection;
use crate::read::stream::VortexFileArrayStream;
use crate::read::{RowMask, Scan};

pub(crate) mod initial_read;
Expand Down Expand Up @@ -122,12 +122,20 @@ impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {
self
}

pub async fn build(self) -> VortexResult<VortexFileArrayStream<R>> {
pub async fn make_initial_read(&mut self) -> VortexResult<InitialRead> {
if let Some(initial_read) = self.initial_read.as_ref() {
return Ok(initial_read.clone());
}

let new_read = read_initial_bytes(&self.read_at, self.file_size().await?).await?;
self.initial_read = Some(new_read.clone());

Ok(new_read)
}

pub async fn build(mut self) -> VortexResult<VortexReadHandle<R>> {
// we do a large enough initial read to get footer, layout, and schema
let initial_read = match self.initial_read {
Some(r) => r,
None => read_initial_bytes(&self.read_at, self.file_size().await?).await?,
};
let initial_read = self.make_initial_read().await?;

let layout = initial_read.fb_layout();

Expand Down Expand Up @@ -163,19 +171,13 @@ impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {
let row_mask = self
.row_mask
.as_ref()
.map(|row_mask| {
if row_mask.dtype().is_int() {
RowMask::from_index_array(row_mask, 0, row_count as usize)
} else {
RowMask::from_mask_array(row_mask, 0, row_count as usize)
}
})
.map(|row_mask| RowMask::from_array(row_mask, 0, row_count as usize))
.transpose()?;

// Default: fallback to single-threaded tokio dispatcher.
let io_dispatcher = self.io_dispatcher.unwrap_or_default();

VortexFileArrayStream::try_new(
VortexReadHandle::try_new(
self.read_at,
layout_reader,
filter_reader,
Expand Down
4 changes: 2 additions & 2 deletions vortex-file/src/read/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub trait Layout: Debug + Send + Sync {
scan: Scan,
layout_serde: LayoutDeserializer,
message_cache: RelativeLayoutCache,
) -> VortexResult<Box<dyn LayoutReader>>;
) -> VortexResult<Arc<dyn LayoutReader>>;
}

pub type LayoutRef = &'static dyn Layout;
Expand Down Expand Up @@ -74,7 +74,7 @@ impl LayoutDeserializer {
layout: fb::Layout,
scan: Scan,
message_cache: RelativeLayoutCache,
) -> VortexResult<Box<dyn LayoutReader>> {
) -> VortexResult<Arc<dyn LayoutReader>> {
let layout_id = LayoutId(layout.encoding());
self.layout_ctx
.lookup_layout(&layout_id)
Expand Down
125 changes: 125 additions & 0 deletions vortex-file/src/read/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use std::collections::BTreeSet;
use std::sync::{Arc, RwLock};

use futures::stream;
use itertools::Itertools;
use vortex_dtype::DType;
use vortex_error::{VortexResult, VortexUnwrap as _};
use vortex_io::{IoDispatcher, VortexReadAt};

use super::splits::SplitsAccumulator;
use super::{LayoutMessageCache, LayoutReader, LazyDType, RowMask, VortexReadArrayStream};
use crate::read::buffered::{BufferedLayoutReader, ReadArray};
use crate::read::splits::ReadRowMask;

#[derive(Clone)]
pub struct VortexReadHandle<R> {
input: R,
dtype: Arc<LazyDType>,
row_count: u64,
splits: Vec<(usize, usize)>,
layout_reader: Arc<dyn LayoutReader>,
filter_reader: Option<Arc<dyn LayoutReader>>,
messages_cache: Arc<RwLock<LayoutMessageCache>>,
row_mask: Option<RowMask>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it necessary to provide a handle-level row-mask? This sort of feels like a parameter you pass when you build a scan from the handle

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flow I imagine is something like:

  1. Read some external index to create initial RowMask
  2. Pass that row to the the builder to create a handle.
  3. Reuse the initial mask to create concurrent reads on subranges of the file

io_dispatcher: Arc<IoDispatcher>,
}

impl<R: VortexReadAt + Unpin> VortexReadHandle<R> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn try_new(
input: R,
layout_reader: Arc<dyn LayoutReader>,
filter_reader: Option<Arc<dyn LayoutReader>>,
messages_cache: Arc<RwLock<LayoutMessageCache>>,
dtype: Arc<LazyDType>,
row_count: u64,
row_mask: Option<RowMask>,
io_dispatcher: Arc<IoDispatcher>,
) -> VortexResult<Self> {
let mut reader_splits = BTreeSet::new();
layout_reader.add_splits(0, &mut reader_splits)?;
if let Some(ref fr) = filter_reader {
fr.add_splits(0, &mut reader_splits)?;
}

reader_splits.insert(row_count as usize);

let splits = reader_splits.into_iter().tuple_windows().collect();

Ok(Self {
input,
dtype,
row_count,
splits,
layout_reader,
filter_reader,
messages_cache,
row_mask,
io_dispatcher,
})
}

/// Returns the type of the file's top-level array.
pub fn dtype(&self) -> &DType {
// FIXME(ngates): why is this allowed to unwrap?
self.dtype.value().vortex_unwrap()
}

/// Returns the total row count of the Vortex file, before any filtering.
pub fn row_count(&self) -> u64 {
self.row_count
}

/// Returns a set of row splits in the file, that can be used to inform on how to split it horizontally.
pub fn splits(&self) -> &[(usize, usize)] {
&self.splits
}

/// Create a stream over all data from the handle
pub fn into_stream(self) -> VortexReadArrayStream<R> {
let splits_vec = Vec::from_iter(self.splits().iter().copied());
let split_accumulator = SplitsAccumulator::new(splits_vec.into_iter(), self.row_mask);

let splits_stream = stream::iter(split_accumulator);

// Set up a stream of RowMask that result from applying a filter expression over the file.
let mask_iterator = if let Some(fr) = &self.filter_reader {
Box::new(BufferedLayoutReader::new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we pull the splits_stream out of this thing and just stream::iter(self.splits().iter()).map(|s| self.read_range(s).boxed()).buffered(64) or whatever?

Or is that not possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly worried about using buffered everywhere

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Buffered is a bit too naive. You want more control over coalescing

self.input.clone(),
self.io_dispatcher.clone(),
splits_stream,
ReadRowMask::new(fr.clone()),
self.messages_cache.clone(),
)) as _
} else {
Box::new(splits_stream) as _
};

// Set up a stream of result ArrayData that result from applying the filter and projection
// expressions over the file.
let array_reader = BufferedLayoutReader::new(
self.input,
self.io_dispatcher,
mask_iterator,
ReadArray::new(self.layout_reader),
self.messages_cache,
);

VortexReadArrayStream::new(self.dtype, self.row_count, array_reader)
}

/// Create a stream over a specific row range from the handle
pub fn stream_range(
mut self,
begin: usize,
end: usize,
) -> VortexResult<VortexReadArrayStream<R>> {
self.row_mask = match self.row_mask {
Some(mask) => Some(mask.and_rowmask(RowMask::new_valid_between(begin, end))?),
None => Some(RowMask::new_valid_between(begin, end)),
};

Ok(self.into_stream())
}
}
Loading
Loading