diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs index f8bf193776..7f421032a1 100644 --- a/bench-vortex/src/bin/tpch_benchmark.rs +++ b/bench-vortex/src/bin/tpch_benchmark.rs @@ -15,15 +15,35 @@ use prettytable::{Cell, Row, Table}; struct Args { #[arg(short, long, value_delimiter = ',')] queries: Option>, + #[arg(short, long)] + threads: Option, } -#[tokio::main(flavor = "multi_thread", worker_threads = 8)] -async fn main() { +fn main() { + let args = Args::parse(); + + let runtime = match args.threads { + Some(0) => panic!("Can't use 0 threads for runtime"), + Some(1) => tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(), + Some(n) => tokio::runtime::Builder::new_multi_thread() + .worker_threads(n) + .enable_all() + .build(), + None => tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build(), + } + .expect("Failed building the Runtime"); + + runtime.block_on(bench_main(args.queries)); +} + +async fn bench_main(queries: Option>) { // uncomment the below to enable trace logging of datafusion execution // setup_logger(LevelFilter::Trace); - let args = Args::parse(); - // Run TPC-H data gen. let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap(); @@ -55,7 +75,7 @@ async fn main() { table.add_row(Row::new(cells)); } - let query_count = args.queries.as_ref().map_or(21, |c| c.len()); + let query_count = queries.as_ref().map_or(21, |c| c.len()); // Setup a progress bar let progress = ProgressBar::new((query_count * formats.len()) as u64); @@ -63,7 +83,7 @@ async fn main() { // Send back a channel with the results of Row. let (rows_tx, rows_rx) = sync::mpsc::channel(); for (q, query) in tpch_queries() { - if let Some(queries) = args.queries.as_ref() { + if let Some(queries) = queries.as_ref() { if !queries.contains(&q) { continue; } diff --git a/vortex-array/src/array/bool/compute/boolean.rs b/vortex-array/src/array/bool/compute/boolean.rs index 9cdce98425..091fa72c52 100644 --- a/vortex-array/src/array/bool/compute/boolean.rs +++ b/vortex-array/src/array/bool/compute/boolean.rs @@ -3,7 +3,7 @@ use arrow_array::cast::AsArray as _; use vortex_error::VortexResult; use crate::array::BoolArray; -use crate::arrow::FromArrowArray; +use crate::arrow::FromArrowArray as _; use crate::compute::{AndFn, OrFn}; use crate::{Array, IntoCanonical}; diff --git a/vortex-array/src/array/bool/mod.rs b/vortex-array/src/array/bool/mod.rs index 36b308a9de..26065bb8b7 100644 --- a/vortex-array/src/array/bool/mod.rs +++ b/vortex-array/src/array/bool/mod.rs @@ -159,6 +159,7 @@ mod tests { use crate::array::BoolArray; use crate::compute::unary::scalar_at; + use crate::validity::Validity; use crate::variants::BoolArrayTrait; use crate::IntoArray; @@ -169,6 +170,20 @@ mod tests { assert!(scalar); } + #[test] + fn test_all_some_iter() { + let arr = BoolArray::from_iter([Some(true), Some(false)]); + + assert!(matches!(arr.validity(), Validity::AllValid)); + + let arr = arr.into_array(); + + let scalar = bool::try_from(&scalar_at(&arr, 0).unwrap()).unwrap(); + assert!(scalar); + let scalar = bool::try_from(&scalar_at(&arr, 1).unwrap()).unwrap(); + assert!(!scalar); + } + #[test] fn test_bool_from_iter() { let arr = diff --git a/vortex-array/src/compute/filter.rs b/vortex-array/src/compute/filter.rs index c7be6a5b58..3ec0178a73 100644 --- a/vortex-array/src/compute/filter.rs +++ b/vortex-array/src/compute/filter.rs @@ -24,7 +24,7 @@ pub fn filter(array: &Array, predicate: &Array) -> VortexResult { if predicate.dtype() != &DType::Bool(Nullability::NonNullable) { vortex_bail!( "predicate must be non-nullable bool, has dtype {}", - predicate.dtype() + predicate.dtype(), ); } if predicate.len() != array.len() { diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 74c9f24f49..d48f23ff80 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -161,6 +161,41 @@ impl Array { futures_util::stream::once(ready(Ok(self))), ) } + + #[inline] + pub fn with_dyn(&self, mut f: F) -> R + where + F: FnMut(&dyn ArrayTrait) -> R, + { + let mut result = None; + + self.encoding() + .with_dyn(self, &mut |array| { + // Sanity check that the encoding implements the correct array trait + debug_assert!( + match array.dtype() { + DType::Null => array.as_null_array().is_some(), + DType::Bool(_) => array.as_bool_array().is_some(), + DType::Primitive(..) => array.as_primitive_array().is_some(), + DType::Utf8(_) => array.as_utf8_array().is_some(), + DType::Binary(_) => array.as_binary_array().is_some(), + DType::Struct(..) => array.as_struct_array().is_some(), + DType::List(..) => array.as_list_array().is_some(), + DType::Extension(..) => array.as_extension_array().is_some(), + }, + "Encoding {} does not implement the variant trait for {}", + self.encoding().id(), + array.dtype() + ); + + result = Some(f(array)); + Ok(()) + }) + .unwrap(); + + // Now we unwrap the optional, which we know to be populated by the closure. + result.unwrap() + } } /// A depth-first pre-order iterator over a ArrayData. @@ -243,43 +278,6 @@ impl ArrayVisitor for NBytesVisitor { } } -impl Array { - #[inline] - pub fn with_dyn(&self, mut f: F) -> R - where - F: FnMut(&dyn ArrayTrait) -> R, - { - let mut result = None; - - self.encoding() - .with_dyn(self, &mut |array| { - // Sanity check that the encoding implements the correct array trait - debug_assert!( - match array.dtype() { - DType::Null => array.as_null_array().is_some(), - DType::Bool(_) => array.as_bool_array().is_some(), - DType::Primitive(..) => array.as_primitive_array().is_some(), - DType::Utf8(_) => array.as_utf8_array().is_some(), - DType::Binary(_) => array.as_binary_array().is_some(), - DType::Struct(..) => array.as_struct_array().is_some(), - DType::List(..) => array.as_list_array().is_some(), - DType::Extension(..) => array.as_extension_array().is_some(), - }, - "Encoding {} does not implement the variant trait for {}", - self.encoding().id(), - array.dtype() - ); - - result = Some(f(array)); - Ok(()) - }) - .unwrap(); - - // Now we unwrap the optional, which we know to be populated by the closure. - result.unwrap() - } -} - impl Display for Array { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let prefix = match self { diff --git a/vortex-datafusion/src/eval.rs b/vortex-datafusion/src/eval.rs index 49034f92b2..cd7ccbdd74 100644 --- a/vortex-datafusion/src/eval.rs +++ b/vortex-datafusion/src/eval.rs @@ -6,6 +6,7 @@ use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_expr::Operator; use crate::can_be_pushed_down; +use crate::scalar::dfvalue_to_scalar; pub struct ExpressionEvaluator; @@ -36,7 +37,10 @@ impl ExpressionEvaluator { .and_then(|a| a.field_by_name(name)) .ok_or(vortex_err!("Missing field {name} in struct array")) }), - Expr::Literal(lit) => Ok(ConstantArray::new(lit.clone(), array.len()).into_array()), + Expr::Literal(lit) => { + let lit = dfvalue_to_scalar(lit.clone()); + Ok(ConstantArray::new(lit, array.len()).into_array()) + } _ => unreachable!(), } } diff --git a/vortex-datafusion/src/expr.rs b/vortex-datafusion/src/expr.rs index 62e348498d..342d22b0fd 100644 --- a/vortex-datafusion/src/expr.rs +++ b/vortex-datafusion/src/expr.rs @@ -1,9 +1,23 @@ -use arrow_schema::SchemaRef; +#![allow(dead_code)] + +use std::sync::Arc; + +use arrow_schema::{Schema, SchemaRef}; use datafusion::optimizer::simplify_expressions::ExprSimplifier; use datafusion_common::{Result as DFResult, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::simplify::SimplifyContext; -use datafusion_expr::{and, lit, Expr}; +use datafusion_expr::{and, lit, Expr, Operator as DFOperator}; +use datafusion_physical_expr::PhysicalExpr; +use vortex::array::{ConstantArray, StructArray}; +use vortex::compute::compare; +use vortex::variants::StructArrayTrait; +use vortex::{Array, IntoArray}; +use vortex_error::{vortex_bail, vortex_err, VortexResult}; +use vortex_expr::Operator; +use vortex_scalar::Scalar; + +use crate::scalar::dfvalue_to_scalar; /// Convert a set of expressions into a single AND expression. /// @@ -32,6 +46,128 @@ pub(crate) fn simplify_expr(expr: &Expr, schema: SchemaRef) -> DFResult { simplifier.simplify(expr.clone()) } +pub trait VortexPhysicalExpr: Send + Sync { + fn evaluate(&self, array: &Array) -> VortexResult; +} + +pub struct NoOp; + +pub struct BinaryExpr { + left: Arc, + right: Arc, + operator: DFOperator, +} + +pub struct Column { + name: String, + index: usize, +} + +impl VortexPhysicalExpr for Column { + fn evaluate(&self, array: &Array) -> VortexResult { + let s = StructArray::try_from(array)?; + + let column = s.field_by_name(&self.name).ok_or(vortex_err!( + "Array doesn't contain child array of name {}", + self.name + ))?; + + Ok(column) + } +} + +pub struct Literal { + scalar_value: Scalar, +} + +impl VortexPhysicalExpr for Literal { + fn evaluate(&self, array: &Array) -> VortexResult { + Ok(ConstantArray::new(self.scalar_value.clone(), array.len()).into_array()) + } +} + +impl VortexPhysicalExpr for BinaryExpr { + fn evaluate(&self, array: &Array) -> VortexResult { + let lhs = self.left.evaluate(array)?; + let rhs = self.right.evaluate(array)?; + + let array = match self.operator { + DFOperator::Eq => compare(&lhs, &rhs, Operator::Eq)?, + DFOperator::NotEq => compare(&lhs, &rhs, Operator::NotEq)?, + DFOperator::Lt => compare(&lhs, &rhs, Operator::Lt)?, + DFOperator::LtEq => compare(&lhs, &rhs, Operator::Lte)?, + DFOperator::Gt => compare(&lhs, &rhs, Operator::Gt)?, + DFOperator::GtEq => compare(&lhs, &rhs, Operator::Gte)?, + DFOperator::And => vortex::compute::and(&lhs, &rhs)?, + DFOperator::Or => vortex::compute::or(&lhs, &rhs)?, + _ => vortex_bail!("{} is not a supported DF operator in Vortex", self.operator), + }; + + Ok(array) + } +} + +impl VortexPhysicalExpr for NoOp { + fn evaluate(&self, _array: &Array) -> VortexResult { + vortex_bail!("NoOp::evaluate() should not be called") + } +} + +pub fn convert_expr_to_vortex( + physical_expr: Arc, + input_schema: &Schema, +) -> VortexResult> { + if physical_expr.data_type(input_schema).unwrap().is_temporal() { + vortex_bail!("Doesn't support evaluating operations over temporal values"); + } + if let Some(binary_expr) = physical_expr + .as_any() + .downcast_ref::() + { + let left = convert_expr_to_vortex(binary_expr.left().clone(), input_schema)?; + let right = convert_expr_to_vortex(binary_expr.right().clone(), input_schema)?; + let operator = *binary_expr.op(); + + return Ok(Arc::new(BinaryExpr { + left, + right, + operator, + }) as _); + } + + if let Some(col_expr) = physical_expr + .as_any() + .downcast_ref::() + { + let expr = Column { + name: col_expr.name().to_owned(), + index: col_expr.index(), + }; + + return Ok(Arc::new(expr) as _); + } + + if let Some(lit) = physical_expr + .as_any() + .downcast_ref::() + { + let value = dfvalue_to_scalar(lit.value().clone()); + return Ok(Arc::new(Literal { + scalar_value: value, + }) as _); + } + + if physical_expr + .as_any() + .downcast_ref::() + .is_some() + { + return Ok(Arc::new(NoOp)); + } + + vortex_bail!("Couldn't convert DataFusion physical expression to a vortex expression") +} + #[cfg(test)] mod test { use std::sync::Arc; diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index a72d1bbeac..8c75b126c6 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -27,12 +27,13 @@ use vortex::array::ChunkedArray; use vortex::{Array, ArrayDType, IntoArrayVariant}; use vortex_error::vortex_err; +pub mod expr; pub mod memory; pub mod persistent; +pub mod scalar; mod datatype; mod eval; -mod expr; mod plans; const SUPPORTED_BINARY_OPS: &[Operator] = &[ diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 792cb68459..43ff1a7a48 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -91,12 +91,16 @@ impl ExecutionPlan for VortexExec { let object_store = context .runtime_env() .object_store(&self.file_scan_config.object_store_url)?; + + let arrow_schema = self.file_scan_config.file_schema.clone(); + let opener = VortexFileOpener { ctx: self.ctx.clone(), object_store, projection: self.file_scan_config.projection.clone(), batch_size: None, predicate: self.predicate.clone(), + arrow_schema, }; let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index d3e275c42c..79fd0768eb 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -1,23 +1,31 @@ use std::sync::Arc; -use arrow_array::RecordBatch; +use arrow_array::{Array as _, BooleanArray, RecordBatch}; +use arrow_schema::SchemaRef; +use datafusion::arrow::buffer::{buffer_bin_and_not, BooleanBuffer}; use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; use datafusion_common::Result as DFResult; use datafusion_physical_expr::PhysicalExpr; use futures::{FutureExt as _, TryStreamExt}; use object_store::ObjectStore; -use vortex::Context; +use vortex::array::BoolArray; +use vortex::arrow::FromArrowArray; +use vortex::{Array, Context, IntoArrayVariant as _}; +use vortex_error::VortexResult; use vortex_serde::io::ObjectStoreReadAt; use vortex_serde::layouts::reader::builder::VortexLayoutReaderBuilder; use vortex_serde::layouts::reader::context::{LayoutContext, LayoutDeserializer}; use vortex_serde::layouts::reader::projections::Projection; +use crate::expr::convert_expr_to_vortex; + pub struct VortexFileOpener { pub ctx: Arc, pub object_store: Arc, pub batch_size: Option, pub projection: Option>, pub predicate: Option>, + pub arrow_schema: SchemaRef, } impl FileOpener for VortexFileOpener { @@ -34,9 +42,11 @@ impl FileOpener for VortexFileOpener { builder = builder.with_batch_size(batch_size); } - if let Some(_predicate) = self.predicate.as_ref() { - log::warn!("Missing logic to turn a physical expression into a RowFilter"); - } + let predicate = self + .predicate + .clone() + .map(|predicate| convert_expr_to_vortex(predicate, self.arrow_schema.as_ref())) + .transpose()?; if let Some(projection) = self.projection.as_ref() { builder = builder.with_projection(Projection::new(projection)) @@ -44,9 +54,50 @@ impl FileOpener for VortexFileOpener { Ok(async move { let reader = builder.build().await?; - let stream = reader.map_ok(RecordBatch::from).map_err(|e| e.into()); + + let stream = reader + .and_then(move |array| { + let predicate = predicate.clone(); + async move { + let array = if let Some(predicate) = predicate.as_ref() { + let predicate_result = predicate.evaluate(&array)?; + + let filter_array = null_as_false(&predicate_result.into_bool()?)?; + vortex::compute::filter(&array, &filter_array)? + } else { + array + }; + + VortexResult::Ok(RecordBatch::from(array)) + } + }) + .map_err(|e| e.into()); Ok(Box::pin(stream) as _) } .boxed()) } } + +/// Mask all null values of a Arrow boolean array to false +fn null_as_false(array: &BoolArray) -> VortexResult { + let array = BooleanArray::from(array.boolean_buffer()); + + let boolean_array = match array.nulls() { + None => array, + Some(nulls) => { + let inner_bool_buffer = array.values(); + let buff = buffer_bin_and_not( + inner_bool_buffer.inner(), + inner_bool_buffer.offset(), + nulls.buffer(), + nulls.offset(), + inner_bool_buffer.len(), + ); + let bool_buffer = + BooleanBuffer::new(buff, inner_bool_buffer.offset(), inner_bool_buffer.len()); + BooleanArray::from(bool_buffer) + } + }; + + Ok(Array::from_arrow(&boolean_array, false)) +} diff --git a/vortex-datafusion/src/scalar.rs b/vortex-datafusion/src/scalar.rs new file mode 100644 index 0000000000..856a5699a1 --- /dev/null +++ b/vortex-datafusion/src/scalar.rs @@ -0,0 +1,38 @@ +use datafusion_common::ScalarValue; +use vortex::array::make_temporal_ext_dtype; +use vortex_dtype::{DType, Nullability}; +use vortex_scalar::{PValue, Scalar}; + +pub fn dfvalue_to_scalar(value: ScalarValue) -> Scalar { + match value { + ScalarValue::Null => Some(Scalar::null(DType::Null)), + ScalarValue::Boolean(b) => b.map(Scalar::from), + ScalarValue::Float16(f) => f.map(Scalar::from), + ScalarValue::Float32(f) => f.map(Scalar::from), + ScalarValue::Float64(f) => f.map(Scalar::from), + ScalarValue::Int8(i) => i.map(Scalar::from), + ScalarValue::Int16(i) => i.map(Scalar::from), + ScalarValue::Int32(i) => i.map(Scalar::from), + ScalarValue::Int64(i) => i.map(Scalar::from), + ScalarValue::UInt8(i) => i.map(Scalar::from), + ScalarValue::UInt16(i) => i.map(Scalar::from), + ScalarValue::UInt32(i) => i.map(Scalar::from), + ScalarValue::UInt64(i) => i.map(Scalar::from), + ScalarValue::Utf8(s) => s.as_ref().map(|s| Scalar::from(s.as_str())), + ScalarValue::Utf8View(s) => s.as_ref().map(|s| Scalar::from(s.as_str())), + ScalarValue::LargeUtf8(s) => s.as_ref().map(|s| Scalar::from(s.as_str())), + ScalarValue::Binary(b) => b.as_ref().map(|b| Scalar::from(b.clone())), + ScalarValue::BinaryView(b) => b.as_ref().map(|b| Scalar::from(b.clone())), + ScalarValue::LargeBinary(b) => b.as_ref().map(|b| Scalar::from(b.clone())), + ScalarValue::FixedSizeBinary(_, b) => b.map(|b| Scalar::from(b.clone())), + ScalarValue::Date32(v) => v.map(|i| { + let ext_dtype = make_temporal_ext_dtype(&value.data_type()); + Scalar::new( + DType::Extension(ext_dtype, Nullability::Nullable), + vortex_scalar::ScalarValue::Primitive(PValue::I32(i)), + ) + }), + _ => unimplemented!("Can't convert {value:?} value to a Vortex scalar"), + } + .unwrap_or(Scalar::null(DType::Null)) +}