From 8dd21a58c7ce51c966b78f581ac4f370db086124 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 31 Jul 2024 11:21:10 +0100 Subject: [PATCH] Support dynamic layouts with io batching --- .gitmodules | 3 - Cargo.lock | 26 +- .../src/array/primitive/compute/mod.rs | 1 + vortex-datafusion/examples/table_provider.rs | 4 +- vortex-datafusion/src/persistent/opener.rs | 15 +- vortex-serde/Cargo.toml | 2 + vortex-serde/src/file/mod.rs | 9 - vortex-serde/src/file/reader/batch.rs | 72 --- vortex-serde/src/file/reader/column.rs | 112 ---- vortex-serde/src/file/reader/filtering.rs | 20 - vortex-serde/src/file/reader/mod.rs | 340 ------------ vortex-serde/src/file/reader/schema.rs | 24 - vortex-serde/src/io/object_store.rs | 2 + vortex-serde/src/io/read.rs | 2 +- vortex-serde/src/{file => layout}/footer.rs | 20 +- vortex-serde/src/layout/mod.rs | 507 ++++++++++++++++++ vortex-serde/src/layout/reader/batch.rs | 73 +++ vortex-serde/src/layout/reader/buffered.rs | 105 ++++ vortex-serde/src/layout/reader/filtering.rs | 19 + vortex-serde/src/layout/reader/mod.rs | 318 +++++++++++ .../{file => layout}/reader/projections.rs | 2 +- vortex-serde/src/layout/reader/schema.rs | 22 + vortex-serde/src/{file => layout}/tests.rs | 66 +-- .../writer/layout_writer.rs} | 39 +- .../src/{file => layout/writer}/layouts.rs | 39 +- vortex-serde/src/layout/writer/mod.rs | 2 + vortex-serde/src/lib.rs | 2 +- vortex-serde/src/message_reader.rs | 14 +- vortex-serde/src/writer.rs | 8 +- 29 files changed, 1167 insertions(+), 701 deletions(-) delete mode 100644 .gitmodules delete mode 100644 vortex-serde/src/file/mod.rs delete mode 100644 vortex-serde/src/file/reader/batch.rs delete mode 100644 vortex-serde/src/file/reader/column.rs delete mode 100644 vortex-serde/src/file/reader/filtering.rs delete mode 100644 vortex-serde/src/file/reader/mod.rs delete mode 100644 vortex-serde/src/file/reader/schema.rs rename vortex-serde/src/{file => layout}/footer.rs (71%) create mode 100644 vortex-serde/src/layout/mod.rs create mode 100644 vortex-serde/src/layout/reader/batch.rs create mode 100644 vortex-serde/src/layout/reader/buffered.rs create mode 100644 vortex-serde/src/layout/reader/filtering.rs create mode 100644 vortex-serde/src/layout/reader/mod.rs rename vortex-serde/src/{file => layout}/reader/projections.rs (90%) create mode 100644 vortex-serde/src/layout/reader/schema.rs rename vortex-serde/src/{file => layout}/tests.rs (69%) rename vortex-serde/src/{file/file_writer.rs => layout/writer/layout_writer.rs} (90%) rename vortex-serde/src/{file => layout/writer}/layouts.rs (84%) create mode 100644 vortex-serde/src/layout/writer/mod.rs diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index bfbcfba415..0000000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "deps/fastlanez"] - path = deps/fastlanez - url = https://github.com/fulcrum-so/fastlanez.git diff --git a/Cargo.lock b/Cargo.lock index d5a6753a2b..341787c8ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -543,9 +543,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +checksum = "fca2be1d5c43812bae364ee3f30b3afcb7877cf59f4aeb94c66f313a41d2fac9" [[package]] name = "bzip2" @@ -3435,9 +3435,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.122" +version = "1.0.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" +checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609" dependencies = [ "itoa", "memchr", @@ -3447,9 +3447,9 @@ dependencies = [ [[package]] name = "serde_test" -version = "1.0.177" +version = "1.0.176" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f901ee573cab6b3060453d2d5f0bae4e6d628c23c0a962ff9b5f1d7c8d4f1ed" +checksum = "5a2f49ace1498612d14f7e0b8245519584db8299541dfe31a06374a828d620ab" dependencies = [ "serde", ] @@ -4383,6 +4383,7 @@ dependencies = [ name = "vortex-serde" version = "0.2.0" dependencies = [ + "ahash", "arrow", "arrow-array", "arrow-ipc", @@ -4407,6 +4408,7 @@ dependencies = [ "vortex-build", "vortex-dtype", "vortex-error", + "vortex-expr", "vortex-fastlanes", "vortex-flatbuffers", "vortex-sampling-compressor", @@ -4799,9 +4801,9 @@ dependencies = [ [[package]] name = "worker" -version = "0.3.2" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0a229b4d95f00cda5b229afe91b733714b79558c6b02830bb5c5bb1d32922f7" +checksum = "76faa841b0c43036d7a0c15e7ea6917ea8edd654c0255f641b6a91ee587fc35d" dependencies = [ "async-trait", "bytes", @@ -4845,9 +4847,9 @@ dependencies = [ [[package]] name = "worker-macros" -version = "0.3.2" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7ed257c9a7ebb524d03e109ac5c5ad0168e2ea79e647e0aaad90a9b0b90fb30" +checksum = "10720bd9a0f6681445769d7eaa30758cc92b51050943bc615ad70cbcbac45360" dependencies = [ "async-trait", "proc-macro2", @@ -4861,9 +4863,9 @@ dependencies = [ [[package]] name = "worker-sys" -version = "0.3.2" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac736bbe32f34a9299a1d094dbc06733a7e68844e224f34580574e5b1c432d46" +checksum = "a76ce0f37481733a38c20d6d696f3c365e86ab059a6dfd37dea15c3b500416a4" dependencies = [ "cfg-if", "js-sys", diff --git a/vortex-array/src/array/primitive/compute/mod.rs b/vortex-array/src/array/primitive/compute/mod.rs index 914ad12b55..e539bb1bbb 100644 --- a/vortex-array/src/array/primitive/compute/mod.rs +++ b/vortex-array/src/array/primitive/compute/mod.rs @@ -25,6 +25,7 @@ impl ArrayCompute for PrimitiveArray { fn fill_forward(&self) -> Option<&dyn FillForwardFn> { Some(self) } + fn filter_indices(&self) -> Option<&dyn FilterIndicesFn> { Some(self) } diff --git a/vortex-datafusion/examples/table_provider.rs b/vortex-datafusion/examples/table_provider.rs index 14ed9b4cc0..b40456e223 100644 --- a/vortex-datafusion/examples/table_provider.rs +++ b/vortex-datafusion/examples/table_provider.rs @@ -14,7 +14,7 @@ use vortex::validity::Validity; use vortex::IntoArray; use vortex_datafusion::persistent::config::{VortexFile, VortexTableConfig}; use vortex_datafusion::persistent::provider::VortexFileTableProvider; -use vortex_serde::file::file_writer::FileWriter; +use vortex_serde::layout::writer::layout_writer::LayoutWriter; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -48,7 +48,7 @@ async fn main() -> anyhow::Result<()> { .open(&filepath) .await?; - let writer = FileWriter::new(f); + let writer = LayoutWriter::new(f); let writer = writer.write_array_columns(st.into_array()).await?; writer.finalize().await?; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 1ce621e026..e193d94bb5 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -7,10 +7,11 @@ use datafusion_common::Result as DFResult; use datafusion_physical_expr::PhysicalExpr; use futures::{FutureExt as _, TryStreamExt}; use object_store::ObjectStore; -use vortex::IntoCanonical; -use vortex_serde::file::reader::projections::Projection; -use vortex_serde::file::reader::VortexBatchReaderBuilder; +use vortex::{Context, IntoCanonical}; use vortex_serde::io::ObjectStoreReadAt; +use vortex_serde::layout::reader::projections::Projection; +use vortex_serde::layout::reader::VortexLayoutReaderBuilder; +use vortex_serde::layout::{LayoutContext, LayoutReader}; pub struct VortexFileOpener { pub object_store: Arc, @@ -24,7 +25,13 @@ impl FileOpener for VortexFileOpener { let read_at = ObjectStoreReadAt::new(self.object_store.clone(), file_meta.location().clone()); - let mut builder = VortexBatchReaderBuilder::new(read_at); + let mut builder = VortexLayoutReaderBuilder::new( + read_at, + LayoutReader::new( + Arc::new(Context::default()), + Arc::new(LayoutContext::default()), + ), + ); if let Some(batch_size) = self.batch_size { builder = builder.with_batch_size(batch_size); diff --git a/vortex-serde/Cargo.toml b/vortex-serde/Cargo.toml index 36992ccdeb..a2b6711af4 100644 --- a/vortex-serde/Cargo.toml +++ b/vortex-serde/Cargo.toml @@ -12,6 +12,7 @@ edition = { workspace = true } rust-version = { workspace = true } [dependencies] +ahash = { workspace = true } bytes = { workspace = true } flatbuffers = { workspace = true } futures = { workspace = true } @@ -25,6 +26,7 @@ vortex-array = { workspace = true } vortex-buffer = { workspace = true } vortex-dtype = { workspace = true } vortex-error = { workspace = true, features = ["object_store"] } +vortex-expr = { workspace = true } vortex-flatbuffers = { workspace = true } vortex-scalar = { workspace = true } diff --git a/vortex-serde/src/file/mod.rs b/vortex-serde/src/file/mod.rs deleted file mode 100644 index 0e18c6cbcd..0000000000 --- a/vortex-serde/src/file/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -pub mod file_writer; -mod footer; -mod layouts; -pub mod reader; - -#[cfg(test)] -mod tests; - -pub const FULL_FOOTER_SIZE: usize = 20; diff --git a/vortex-serde/src/file/reader/batch.rs b/vortex-serde/src/file/reader/batch.rs deleted file mode 100644 index 1b0074c486..0000000000 --- a/vortex-serde/src/file/reader/batch.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::collections::VecDeque; -use std::sync::Arc; - -use vortex::array::StructArray; -use vortex::{Array, Context, IntoArray}; -use vortex_dtype::DType; -use vortex_error::VortexResult; - -use crate::file::layouts::Layout; -use crate::file::reader::column::ColumnReader; -use crate::io::VortexReadAt; - -pub(super) struct BatchReader { - readers: Vec<(Arc, ColumnReader)>, - reader: R, -} - -impl BatchReader { - pub fn new( - reader: R, - column_info: impl Iterator, DType, VecDeque)>, - ) -> Self { - Self { - reader, - readers: column_info - .map(|(name, dtype, layouts)| { - (name.clone(), ColumnReader::new(dtype.clone(), layouts)) - }) - .collect(), - } - } - - pub fn is_empty(&self) -> bool { - self.readers().all(|c| c.is_empty()) - } - - fn readers(&self) -> impl Iterator { - self.readers.iter().map(|(_, r)| r) - } - - pub async fn load(&mut self, batch_size: usize, context: Arc) -> VortexResult<()> { - for (_, column_reader) in self.readers.iter_mut() { - column_reader - .load(&mut self.reader, batch_size, context.clone()) - .await?; - } - - Ok(()) - } - - pub fn next(&mut self, batch_size: usize) -> Option> { - let mut final_columns = vec![]; - - for (col_name, column_reader) in self.readers.iter_mut() { - match column_reader.read_rows(batch_size) { - Ok(Some(array)) => final_columns.push((col_name.clone(), array)), - Ok(None) => { - debug_assert!( - final_columns.is_empty(), - "All columns should be empty together" - ); - return None; - } - Err(e) => return Some(Err(e)), - } - } - - Some(VortexResult::Ok( - StructArray::from_fields(final_columns.as_slice()).into_array(), - )) - } -} diff --git a/vortex-serde/src/file/reader/column.rs b/vortex-serde/src/file/reader/column.rs deleted file mode 100644 index f53d5ba5ca..0000000000 --- a/vortex-serde/src/file/reader/column.rs +++ /dev/null @@ -1,112 +0,0 @@ -use std::collections::VecDeque; -use std::sync::Arc; - -use bytes::{Bytes, BytesMut}; -use vortex::array::ChunkedArray; -use vortex::compute::slice; -use vortex::{Array, Context, IntoArray}; -use vortex_dtype::DType; -use vortex_error::{VortexError, VortexResult}; - -use crate::file::layouts::Layout; -use crate::io::VortexReadAt; -use crate::{ArrayBufferReader, ReadResult}; - -pub(super) struct ColumnReader { - dtype: DType, - layouts: VecDeque, - arrays: VecDeque, -} - -impl ColumnReader { - pub fn new(dtype: DType, layouts: VecDeque) -> Self { - Self { - dtype, - layouts, - arrays: Default::default(), - } - } - - pub fn is_empty(&self) -> bool { - self.layouts.is_empty() && self.arrays.is_empty() - } - - pub fn buffered_row_count(&self) -> usize { - self.arrays.iter().map(|arr| arr.len()).sum() - } - - pub async fn load( - &mut self, - reader: &mut R, - batch_size: usize, - context: Arc, - ) -> VortexResult<()> { - while self.buffered_row_count() < batch_size { - if let Some(layout) = self.layouts.pop_front() { - let byte_range = layout.as_flat().unwrap().range; - let mut buffer = BytesMut::with_capacity(byte_range.len()); - unsafe { buffer.set_len(byte_range.len()) }; - - let mut buff = reader - .read_at_into(byte_range.begin, buffer) - .await - .map_err(VortexError::from) - .unwrap() - .freeze(); - - let mut array_reader = ArrayBufferReader::new(); - let mut read_buf = Bytes::new(); - while let Some(ReadResult::ReadMore(u)) = array_reader.read(read_buf.clone())? { - read_buf = buff.split_to(u); - } - - let array = array_reader - .into_array(context.clone(), self.dtype.clone()) - .unwrap(); - - self.arrays.push_back(array); - } else { - break; - } - } - - Ok(()) - } - - pub fn read_rows(&mut self, mut rows_needed: usize) -> VortexResult> { - if self.is_empty() { - return Ok(None); - } - - if self.layouts.is_empty() { - rows_needed = usize::min(rows_needed, self.buffered_row_count()); - } - - let mut result = Vec::default(); - - while rows_needed != 0 { - match self.arrays.pop_front() { - None => break, - Some(array) => { - if array.len() > rows_needed { - let taken = slice(&array, 0, rows_needed)?; - let leftover = slice(&array, rows_needed, array.len())?; - self.arrays.push_front(leftover); - rows_needed -= taken.len(); - result.push(taken); - } else { - rows_needed -= array.len(); - result.push(array); - } - } - } - } - - match result.len() { - 0 | 1 => Ok(result.pop()), - _ => Ok(Some( - ChunkedArray::try_new(result, self.dtype.clone())?.into_array(), - )), - } - } -} diff --git a/vortex-serde/src/file/reader/filtering.rs b/vortex-serde/src/file/reader/filtering.rs deleted file mode 100644 index e3ddc3347f..0000000000 --- a/vortex-serde/src/file/reader/filtering.rs +++ /dev/null @@ -1,20 +0,0 @@ -use vortex::Array; -use vortex_error::VortexResult; - -use super::projections::Projection; - -pub trait FilteringPredicate: Send + Sync { - fn projection(&self) -> &Projection; - fn evaluate(&mut self, array: &Array) -> VortexResult; -} - -#[derive(Default)] -pub struct RowFilter { - pub(crate) _filters: Vec>, -} - -impl RowFilter { - pub fn new(filters: Vec>) -> Self { - Self { _filters: filters } - } -} diff --git a/vortex-serde/src/file/reader/mod.rs b/vortex-serde/src/file/reader/mod.rs deleted file mode 100644 index ece71e7973..0000000000 --- a/vortex-serde/src/file/reader/mod.rs +++ /dev/null @@ -1,340 +0,0 @@ -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use batch::BatchReader; -use bytes::BytesMut; -use filtering::RowFilter; -use futures::future::BoxFuture; -use futures::{ready, FutureExt, Stream}; -use projections::Projection; -use schema::Schema; -use vortex::array::{ConstantArray, StructArray}; -use vortex::compute::unary::subtract_scalar; -use vortex::compute::{and, filter, search_sorted, slice, take, SearchSortedSide}; -use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant}; -use vortex_dtype::{match_each_integer_ptype, DType, StructDType}; -use vortex_error::{vortex_bail, VortexResult}; -use vortex_scalar::Scalar; - -use super::layouts::{Layout, StructLayout}; -use crate::file::file_writer::MAGIC_BYTES; -use crate::file::footer::Footer; -use crate::io::VortexReadAt; - -mod batch; -mod column; -pub mod filtering; -pub mod projections; -pub mod schema; - -const DEFAULT_BATCH_SIZE: usize = 65536; -const DEFAULT_PROJECTION: Projection = Projection::All; - -pub struct VortexBatchReaderBuilder { - reader: R, - projection: Option, - len: Option, - take_indices: Option, - row_filter: Option, - batch_size: Option, -} - -impl VortexBatchReaderBuilder { - // Recommended read-size according to the AWS performance guide - const FOOTER_READ_SIZE: usize = 8 * 1024 * 1024; - const FOOTER_TRAILER_SIZE: usize = 20; - - pub fn new(reader: R) -> Self { - Self { - reader, - projection: None, - row_filter: None, - len: None, - take_indices: None, - batch_size: None, - } - } - - pub fn with_length(mut self, len: u64) -> Self { - self.len = Some(len); - self - } - - pub fn with_projection(mut self, projection: Projection) -> Self { - self.projection = Some(projection); - self - } - - pub fn with_take_indices(mut self, array: Array) -> Self { - // TODO(#441): Allow providing boolean masks - assert!( - array.dtype().is_int(), - "Mask arrays have to be integer arrays" - ); - self.take_indices = Some(array); - self - } - - pub fn with_row_filter(mut self, row_filter: RowFilter) -> Self { - self.row_filter = Some(row_filter); - self - } - - pub fn with_batch_size(mut self, batch_size: usize) -> Self { - self.batch_size = Some(batch_size); - self - } - - pub async fn build(mut self) -> VortexResult> { - let footer = self.read_footer().await?; - - // TODO(adamg): We probably want to unify everything that is going on here into a single type and implementation - let layout = if let Layout::Struct(s) = footer.layout()? { - s - } else { - vortex_bail!("Top level layout must be a 'StructLayout'"); - }; - let dtype = if let DType::Struct(s, _) = footer.dtype()? { - s - } else { - vortex_bail!("Top level dtype must be a 'StructDType'"); - }; - - let batch_size = self.batch_size.unwrap_or(DEFAULT_BATCH_SIZE); - let projection = self.projection.unwrap_or(DEFAULT_PROJECTION); - - VortexBatchStream::try_new( - self.reader, - layout, - dtype, - self.row_filter.unwrap_or_default(), - batch_size, - projection, - self.take_indices, - ) - } - - async fn len(&self) -> usize { - let len = match self.len { - Some(l) => l, - None => self.reader.size().await, - }; - - len as usize - } - - pub async fn read_footer(&mut self) -> VortexResult