Push filter schema manipulation into layout reader and reuse ipc message writer in file writer (#651)

This is done in preparation for pruning: we need to be able to mutate the filters and read columns based on them.
robert3005 authored Aug 21, 2024
1 parent 9d652f4 commit 47e8c56
Showing 58 changed files with 646 additions and 937 deletions.
bench-vortex/src/data_downloads.rs (4 changes: 2 additions & 2 deletions)
@@ -15,7 +15,7 @@ use vortex::{Array, IntoArray};
use vortex_dtype::DType;
use vortex_error::{VortexError, VortexResult};
use vortex_serde::io::TokioAdapter;
-use vortex_serde::writer::ArrayWriter;
+use vortex_serde::stream_writer::StreamArrayWriter;

use crate::idempotent;
use crate::reader::BATCH_SIZE;
@@ -57,7 +57,7 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> PathBuf
.unwrap()
.block_on(async move {
let write = tokio::fs::File::create(path).await.unwrap();
-ArrayWriter::new(TokioAdapter(write))
+StreamArrayWriter::new(TokioAdapter(write))
.write_array(array)
.await
.unwrap();
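
Note: for orientation, a minimal sketch of the renamed writer in use, composed only from calls that appear in this diff; the output path is a placeholder and the snippet assumes an async context with `array` already built.

    use vortex_serde::io::TokioAdapter;
    use vortex_serde::stream_writer::StreamArrayWriter;

    // ArrayWriter is now StreamArrayWriter; the wiring is otherwise unchanged.
    let write = tokio::fs::File::create("example.vortex").await.unwrap();
    StreamArrayWriter::new(TokioAdapter(write))
        .write_array(array)
        .await
        .unwrap();
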
bench-vortex/src/reader.rs (20 changes: 11 additions & 9 deletions)
@@ -33,8 +33,9 @@ use vortex_error::{vortex_err, VortexResult};
use vortex_sampling_compressor::SamplingCompressor;
use vortex_serde::chunked_reader::ChunkedArrayReader;
use vortex_serde::io::{ObjectStoreExt, TokioAdapter, VortexReadAt, VortexWrite};
-use vortex_serde::writer::ArrayWriter;
-use vortex_serde::MessageReader;
+use vortex_serde::stream_reader::StreamArrayReader;
+use vortex_serde::stream_writer::StreamArrayWriter;
+use vortex_serde::DTypeReader;

use crate::{COMPRESSORS, CTX};

@@ -49,10 +50,12 @@ pub struct VortexFooter {

pub async fn open_vortex(path: &Path) -> VortexResult<Array> {
let file = tokio::fs::File::open(path).await.unwrap();
-let mut msgs = MessageReader::try_new(TokioAdapter(file)).await.unwrap();
-msgs.array_stream_from_messages(CTX.clone())
-.await
-.unwrap()
+let reader = StreamArrayReader::try_new(TokioAdapter(file), CTX.clone())
+.await?
+.load_dtype()
+.await?;
+reader
+.into_array_stream()
.collect_chunked()
.await
.map(|a| a.into_array())
@@ -64,7 +67,7 @@ pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
) -> VortexResult<()> {
let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;

-let written = ArrayWriter::new(write)
+let written = StreamArrayWriter::new(write)
.write_array_stream(chunked.array_stream())
.await?;

@@ -146,8 +149,7 @@ pub async fn read_vortex_footer_format<R: VortexReadAt>(
buf.reserve(header_len - buf.len());
unsafe { buf.set_len(header_len) }
buf = reader.read_at_into(footer.dtype_range.start, buf).await?;
-let mut header_reader = MessageReader::try_new(buf).await?;
-let dtype = header_reader.read_dtype().await?;
+let dtype = DTypeReader::new(buf).await?.read_dtype().await?;

ChunkedArrayReader::try_new(
reader,
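
Note: a sketch of the new read path assembled from the calls above. It assumes an async function returning VortexResult, the crate's `CTX`, and a `path` placeholder; the DType is loaded first, then the remaining messages are drained as an array stream.

    use vortex_serde::io::TokioAdapter;
    use vortex_serde::stream_reader::StreamArrayReader;

    // StreamArrayReader replaces MessageReader: load the DType up front,
    // then collect the stream of chunks into a single array.
    let file = tokio::fs::File::open(path).await.unwrap();
    let reader = StreamArrayReader::try_new(TokioAdapter(file), CTX.clone())
        .await?
        .load_dtype()
        .await?;
    let array = reader
        .into_array_stream()
        .collect_chunked()
        .await?
        .into_array();
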
bench-vortex/src/tpch/mod.rs (2 changes: 1 addition & 1 deletion)
@@ -22,7 +22,7 @@ use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
use vortex_datafusion::SessionContextExt;
use vortex_dtype::DType;
use vortex_sampling_compressor::SamplingCompressor;
-use vortex_serde::layouts::writer::LayoutWriter;
+use vortex_serde::layouts::LayoutWriter;

use crate::idempotent_async;

vortex-array/src/array/chunked/compute/take.rs (16 changes: 7 additions & 9 deletions)
@@ -4,11 +4,10 @@ use vortex_error::{vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::array::chunked::ChunkedArray;
-use crate::array::primitive::PrimitiveArray;
use crate::compute::unary::{scalar_at, subtract_scalar, try_cast};
use crate::compute::{search_sorted, slice, take, SearchSortedSide, TakeFn};
use crate::stats::ArrayStatistics;
-use crate::{Array, ArrayDType, IntoArray, ToArray};
+use crate::{Array, ArrayDType, IntoArray, IntoArrayVariant, ToArray};

impl TakeFn for ChunkedArray {
fn take(&self, indices: &Array) -> VortexResult<Array> {
@@ -25,8 +24,7 @@ impl TakeFn for ChunkedArray {
return take_strict_sorted(self, indices);
}

-// FIXME(ngates): this is wrong, need to canonicalise
-let indices = PrimitiveArray::try_from(try_cast(indices, PType::U64.into())?)?;
+let indices = try_cast(indices, PType::U64.into())?.into_primitive()?;

// While the chunk idx remains the same, accumulate a list of chunk indices.
let mut chunks = Vec::new();
@@ -79,8 +77,8 @@ fn take_strict_sorted(chunked: &ChunkedArray, indices: &Array) -> VortexResult<Array>
let (chunk_idx, _idx_in_chunk) = chunked.find_chunk_idx(idx);

// Find the end of this chunk, and locate that position in the indices array.
-let chunk_begin = usize::try_from(&scalar_at(&chunked.chunk_ends(), chunk_idx)?)?;
-let chunk_end = usize::try_from(&scalar_at(&chunked.chunk_ends(), chunk_idx + 1)?)?;
+let chunk_begin = usize::try_from(&scalar_at(&chunked.chunk_offsets(), chunk_idx)?)?;
+let chunk_end = usize::try_from(&scalar_at(&chunked.chunk_offsets(), chunk_idx + 1)?)?;
let chunk_end_pos = search_sorted(indices, chunk_end, SearchSortedSide::Left)?.to_index();

// Now we can say the slice of indices belonging to this chunk is [pos, chunk_end_pos)
@@ -110,13 +108,13 @@ fn take_strict_sorted(chunked: &ChunkedArray, indices: &Array) -> VortexResult<Array>

// Now we can take the chunks
let chunks = indices_by_chunk
-.iter()
+.into_iter()
.enumerate()
-.filter_map(|(chunk_idx, indices)| indices.as_ref().map(|i| (chunk_idx, i)))
+.filter_map(|(chunk_idx, indices)| indices.map(|i| (chunk_idx, i)))
.map(|(chunk_idx, chunk_indices)| {
take(
&chunked.chunk(chunk_idx).expect("chunk not found"),
-chunk_indices,
+&chunk_indices,
)
})
.try_collect()?;
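
Note: the index partitioning that take_strict_sorted performs can be summarized in plain Rust, independent of the vortex API. A sketch under the assumption that `indices` is sorted and `offsets` comes from chunk_offsets():

    /// Split sorted global indices into per-chunk lists, rebased to each
    /// chunk's start, mirroring the loop in take_strict_sorted.
    fn partition_indices(offsets: &[u64], indices: &[u64]) -> Vec<Vec<u64>> {
        let nchunks = offsets.len() - 1;
        let mut by_chunk = vec![Vec::new(); nchunks];
        let mut pos = 0;
        for chunk in 0..nchunks {
            let (begin, end) = (offsets[chunk], offsets[chunk + 1]);
            // Indices are sorted, so the first index >= end (search_sorted with
            // SearchSortedSide::Left) closes this chunk's slice.
            let chunk_end_pos = pos + indices[pos..].partition_point(|&i| i < end);
            by_chunk[chunk].extend(indices[pos..chunk_end_pos].iter().map(|&i| i - begin));
            pos = chunk_end_pos;
        }
        by_chunk
    }
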
vortex-array/src/array/chunked/mod.rs (21 changes: 11 additions & 10 deletions)
@@ -42,7 +42,7 @@ impl ChunkedArray {
}
}

-let chunk_ends = [0u64]
+let chunk_offsets = [0u64]
.into_iter()
.chain(chunks.iter().map(|c| c.len() as u64))
.scan(0, |acc, c| {
@@ -51,12 +51,13 @@
})
.collect_vec();

-let num_chunks = chunk_ends.len() - 1;
-let length = *chunk_ends.last().unwrap_or_else(|| {
+let num_chunks = chunk_offsets.len() - 1;
+let length = *chunk_offsets.last().unwrap_or_else(|| {
unreachable!("Chunk ends is guaranteed to have at least one element")
}) as usize;

-let mut children = vec![PrimitiveArray::from_vec(chunk_ends, NonNullable).into_array()];
+let mut children = Vec::with_capacity(chunks.len() + 1);
+children.push(PrimitiveArray::from_vec(chunk_offsets, NonNullable).into_array());
children.extend(chunks);

Self::try_from_parts(
@@ -70,8 +71,8 @@

#[inline]
pub fn chunk(&self, idx: usize) -> Option<Array> {
-let chunk_start = usize::try_from(&scalar_at(&self.chunk_ends(), idx).ok()?).ok()?;
-let chunk_end = usize::try_from(&scalar_at(&self.chunk_ends(), idx + 1).ok()?).ok()?;
+let chunk_start = usize::try_from(&scalar_at(&self.chunk_offsets(), idx).ok()?).ok()?;
+let chunk_end = usize::try_from(&scalar_at(&self.chunk_offsets(), idx + 1).ok()?).ok()?;

// Offset the index since chunk_ends is child 0.
self.array()
@@ -83,7 +84,7 @@
}

#[inline]
-pub fn chunk_ends(&self) -> Array {
+pub fn chunk_offsets(&self) -> Array {
self.array()
.child(0, &Self::ENDS_DTYPE, self.nchunks() + 1)
.expect("missing chunk ends")
@@ -92,7 +93,7 @@
pub fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
assert!(index <= self.len(), "Index out of bounds of the array");

-let search_result = search_sorted(&self.chunk_ends(), index, SearchSortedSide::Left)
+let search_result = search_sorted(&self.chunk_offsets(), index, SearchSortedSide::Left)
.unwrap_or_else(|err| {
panic!("Search sorted failed in find_chunk_idx: {}", err);
});
@@ -106,7 +107,7 @@
}
SearchResult::NotFound(i) => i - 1,
};
-let chunk_start = &scalar_at(&self.chunk_ends(), index_chunk)
+let chunk_start = &scalar_at(&self.chunk_offsets(), index_chunk)
.and_then(|s| usize::try_from(&s))
.unwrap_or_else(|err| {
panic!("Failed to find chunk start in find_chunk_idx: {}", err);
@@ -154,7 +155,7 @@ impl FromIterator<Array> for ChunkedArray {

impl AcceptArrayVisitor for ChunkedArray {
fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
-visitor.visit_child("chunk_ends", &self.chunk_ends())?;
+visitor.visit_child("chunk_ends", &self.chunk_offsets())?;
for (idx, chunk) in self.chunks().enumerate() {
visitor.visit_child(format!("[{}]", idx).as_str(), &chunk)?;
}
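
Note: to make the renamed accessor concrete: chunk_offsets() always holds nchunks + 1 values, a leading zero plus a running sum of chunk lengths, so chunk i covers offsets[i]..offsets[i+1]. A standalone sketch of the construction and lookup in plain Rust, not the vortex API:

    /// Build offsets from chunk lengths, mirroring the scan in try_new:
    /// chunk_offsets(&[3, 4]) == [0, 3, 7].
    fn chunk_offsets(chunk_lens: &[u64]) -> Vec<u64> {
        std::iter::once(0u64)
            .chain(chunk_lens.iter().copied())
            .scan(0u64, |acc, len| {
                *acc += len;
                Some(*acc)
            })
            .collect()
    }

    /// Locate (chunk_idx, idx_in_chunk) for a global index, mirroring
    /// find_chunk_idx: find_chunk_idx(&[0, 3, 7], 5) == (1, 2).
    fn find_chunk_idx(offsets: &[u64], index: u64) -> (usize, u64) {
        assert!(index < *offsets.last().unwrap(), "Index out of bounds");
        // The first offset strictly greater than index, minus one, owns it.
        let chunk = offsets.partition_point(|&off| off <= index) - 1;
        (chunk, index - offsets[chunk])
    }
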
vortex-array/src/array/struct_/mod.rs (25 changes: 19 additions & 6 deletions)
@@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
+use vortex_dtype::field::Field;
use vortex_dtype::{DType, FieldName, FieldNames, Nullability, StructDType};
use vortex_error::{vortex_bail, vortex_err, VortexResult};

@@ -96,16 +97,25 @@ impl StructArray {
/// # Panics
/// This function will panic an error if the projection references columns not within the
/// schema boundaries.
-pub fn project(&self, projection: &[usize]) -> VortexResult<Self> {
+pub fn project(&self, projection: &[Field]) -> VortexResult<Self> {
let mut children = Vec::with_capacity(projection.len());
let mut names = Vec::with_capacity(projection.len());

-for &column_idx in projection {
+for field in projection.iter() {
+let idx = match field {
+Field::Name(n) => self
+.names()
+.iter()
+.position(|name| name.as_ref() == n)
+.ok_or_else(|| vortex_err!("Unknown field {n}"))?,
+Field::Index(i) => *i,
+};
+
+names.push(self.names()[idx].clone());
children.push(
-self.field(column_idx)
-.ok_or(vortex_err!(OutOfBounds: column_idx, 0, self.dtypes().len()))?,
+self.field(idx)
+.ok_or_else(|| vortex_err!(OutOfBounds: idx, 0, self.dtypes().len()))?,
);
-names.push(self.names()[column_idx].clone());
}

StructArray::try_new(
@@ -166,6 +176,7 @@ impl ArrayStatisticsCompute for StructArray {}

#[cfg(test)]
mod test {
+use vortex_dtype::field::Field;
use vortex_dtype::{DType, FieldName, FieldNames, Nullability};

use crate::array::primitive::PrimitiveArray;
@@ -193,7 +204,9 @@ mod test {
)
.unwrap();

-let struct_b = struct_a.project(&[2usize, 0]).unwrap();
+let struct_b = struct_a
+.project(&[Field::from(2usize), Field::from(0)])
+.unwrap();
assert_eq!(
struct_b.names().as_ref(),
[FieldName::from("zs"), FieldName::from("xs")],
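
Note: with Field in place, project() accepts names as well as positions. A hedged usage sketch continuing the test above; it assumes Field::Name wraps an owned String (the match in project() compares it against each FieldName), so the exact constructor may differ:

    // Project column "zs" by name and column 0 ("xs") by index.
    let struct_b = struct_a
        .project(&[Field::Name("zs".to_string()), Field::from(0usize)])
        .unwrap();
    assert_eq!(
        struct_b.names().as_ref(),
        [FieldName::from("zs"), FieldName::from("xs")],
    );
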
vortex-array/src/compute/compare.rs (2 changes: 1 addition & 1 deletion)
@@ -29,7 +29,7 @@ impl Display for Operator {
Operator::Lt => "<",
Operator::Lte => "<=",
};
-write!(f, "{display}")
+Display::fmt(display, f)
}
}

vortex-datafusion/examples/table_provider.rs (2 changes: 1 addition & 1 deletion)
@@ -14,7 +14,7 @@ use vortex::validity::Validity;
use vortex::{Context, IntoArray};
use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
use vortex_datafusion::persistent::provider::VortexFileTableProvider;
-use vortex_serde::layouts::writer::LayoutWriter;
+use vortex_serde::layouts::LayoutWriter;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
vortex-datafusion/src/lib.rs (24 changes: 14 additions & 10 deletions)
@@ -22,6 +22,7 @@ use persistent::config::VortexTableOptions;
use persistent::provider::VortexFileTableProvider;
use vortex::array::ChunkedArray;
use vortex::{Array, ArrayDType, IntoArrayVariant};
+use vortex_dtype::field::Field;
use vortex_error::vortex_err;

pub mod memory;
@@ -190,9 +191,8 @@ impl Debug for VortexScanExec {
}

impl DisplayAs for VortexScanExec {
-#[allow(clippy::use_debug)]
fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-write!(f, "{:?}", self)
+Debug::fmt(self, f)
}
}

@@ -203,7 +203,7 @@
num_chunks: usize,
chunks: ChunkedArray,

projection: Vec<usize>,
projection: Vec<Field>,
}

impl Stream for VortexRecordBatchStream {
@@ -227,12 +227,11 @@
.into_struct()
.map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?;

-let projected_struct =
-struct_array
-.project(this.projection.as_slice())
-.map_err(|vortex_err| {
-exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}")
-})?;
+let projected_struct = struct_array
+.project(&this.projection)
+.map_err(|vortex_err| {
+exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}")
+})?;

Poll::Ready(Some(Ok(projected_struct.into())))
}
Expand Down Expand Up @@ -284,7 +283,12 @@ impl ExecutionPlan for VortexScanExec {
idx: 0,
num_chunks: self.array.nchunks(),
chunks: self.array.clone(),
-projection: self.scan_projection.clone(),
+projection: self
+.scan_projection
+.iter()
+.copied()
+.map(Field::from)
+.collect(),
}))
}
}
vortex-datafusion/src/memory.rs (21 changes: 6 additions & 15 deletions)
@@ -9,12 +9,13 @@ use datafusion::prelude::*;
use datafusion_common::{Result as DFResult, ToDFSchema};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{TableProviderFilterPushDown, TableType};
-use datafusion_physical_expr::{create_physical_expr, EquivalenceProperties, PhysicalExpr};
+use datafusion_physical_expr::{create_physical_expr, EquivalenceProperties};
use datafusion_physical_plan::{ExecutionMode, ExecutionPlan, Partitioning, PlanProperties};
use itertools::Itertools;
use vortex::array::ChunkedArray;
use vortex::{Array, ArrayDType as _};
-use vortex_expr::datafusion::extract_columns_from_expr;
+use vortex_expr::datafusion::convert_expr_to_vortex;
+use vortex_expr::VortexExpr;

use crate::datatype::infer_schema;
use crate::plans::{RowSelectorExec, TakeRowsExec};
@@ -95,16 +96,11 @@ impl TableProvider for VortexMemTable {
let df_schema = self.schema_ref.clone().to_dfschema()?;

let filter_expr = create_physical_expr(&expr, &df_schema, state.execution_props())?;

-let filter_projection =
-extract_columns_from_expr(Some(&filter_expr), self.schema_ref.clone())?
-.into_iter()
-.collect();
+let filter_expr = convert_expr_to_vortex(filter_expr)?;

make_filter_then_take_plan(
self.schema_ref.clone(),
filter_expr,
-filter_projection,
self.array.clone(),
output_projection.clone(),
state,
Expand Down Expand Up @@ -192,17 +188,12 @@ impl VortexMemTableOptions {
/// columns.
fn make_filter_then_take_plan(
schema: SchemaRef,
-filter_expr: Arc<dyn PhysicalExpr>,
-filter_projection: Vec<usize>,
+filter_expr: Arc<dyn VortexExpr>,
chunked_array: ChunkedArray,
output_projection: Vec<usize>,
_session_state: &dyn Session,
) -> DFResult<Arc<dyn ExecutionPlan>> {
-let row_selector_op = Arc::new(RowSelectorExec::try_new(
-filter_expr,
-filter_projection,
-&chunked_array,
-)?);
+let row_selector_op = Arc::new(RowSelectorExec::try_new(filter_expr, &chunked_array)?);

Ok(Arc::new(TakeRowsExec::new(
schema.clone(),
(Diff truncated: the remaining changed files are not shown.)
