Skip to content

Commit

Permalink
Vortex physical expressions support for on-disk data (#581)
Browse files Browse the repository at this point in the history
Adds initial support for vortex-specific physical expression evaluation,
mostly for filtering. Falls back to arrow/datafusion if there's any
issue.
  • Loading branch information
AdamGS authored Aug 12, 2024
1 parent 1b22286 commit 92b921e
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 55 deletions.
32 changes: 26 additions & 6 deletions bench-vortex/src/bin/tpch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,35 @@ use prettytable::{Cell, Row, Table};
// Command-line arguments for the benchmark binary.
struct Args {
// Query numbers to run, accepted as a comma-delimited list; when absent no filtering by
// query number is applied.
#[arg(short, long, value_delimiter = ',')]
queries: Option<Vec<usize>>,
// Number of worker threads for the runtime built in `main`; when absent the multi-thread
// builder is used without an explicit worker count.
#[arg(short, long)]
threads: Option<usize>,
}

#[tokio::main(flavor = "multi_thread", worker_threads = 8)]
async fn main() {
/// Parses the CLI arguments, builds a tokio runtime sized by `--threads`, and drives
/// `bench_main` to completion on it.
///
/// # Panics
///
/// Panics when `--threads 0` is requested or when the runtime cannot be built.
fn main() {
    let args = Args::parse();

    // Pick the runtime flavor first; the shared configuration is applied once below.
    let mut builder = match args.threads {
        Some(0) => panic!("Can't use 0 threads for runtime"),
        Some(1) => tokio::runtime::Builder::new_current_thread(),
        Some(n) => {
            let mut multi = tokio::runtime::Builder::new_multi_thread();
            multi.worker_threads(n);
            multi
        }
        None => tokio::runtime::Builder::new_multi_thread(),
    };

    let runtime = builder
        .enable_all()
        .build()
        .expect("Failed building the Runtime");

    runtime.block_on(bench_main(args.queries));
}

async fn bench_main(queries: Option<Vec<usize>>) {
// uncomment the below to enable trace logging of datafusion execution
// setup_logger(LevelFilter::Trace);

let args = Args::parse();

// Run TPC-H data gen.
let data_dir = DBGen::new(DBGenOptions::default()).generate().unwrap();

Expand Down Expand Up @@ -55,15 +75,15 @@ async fn main() {
table.add_row(Row::new(cells));
}

let query_count = args.queries.as_ref().map_or(21, |c| c.len());
let query_count = queries.as_ref().map_or(21, |c| c.len());

// Setup a progress bar
let progress = ProgressBar::new((query_count * formats.len()) as u64);

// Send back a channel with the results of Row.
let (rows_tx, rows_rx) = sync::mpsc::channel();
for (q, query) in tpch_queries() {
if let Some(queries) = args.queries.as_ref() {
if let Some(queries) = queries.as_ref() {
if !queries.contains(&q) {
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/array/bool/compute/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use arrow_array::cast::AsArray as _;
use vortex_error::VortexResult;

use crate::array::BoolArray;
use crate::arrow::FromArrowArray;
use crate::arrow::FromArrowArray as _;
use crate::compute::{AndFn, OrFn};
use crate::{Array, IntoCanonical};

Expand Down
15 changes: 15 additions & 0 deletions vortex-array/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ mod tests {

use crate::array::BoolArray;
use crate::compute::unary::scalar_at;
use crate::validity::Validity;
use crate::variants::BoolArrayTrait;
use crate::IntoArray;

Expand All @@ -169,6 +170,20 @@ mod tests {
assert!(scalar);
}

#[test]
fn test_all_some_iter() {
    // Built only from `Some` values; the test expects the validity to be `AllValid`.
    let bools = BoolArray::from_iter([Some(true), Some(false)]);
    assert!(matches!(bools.validity(), Validity::AllValid));

    let array = bools.into_array();

    // Element 0 was `Some(true)`.
    let first = bool::try_from(&scalar_at(&array, 0).unwrap()).unwrap();
    assert!(first);

    // Element 1 was `Some(false)`.
    let second = bool::try_from(&scalar_at(&array, 1).unwrap()).unwrap();
    assert!(!second);
}

#[test]
fn test_bool_from_iter() {
let arr =
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub fn filter(array: &Array, predicate: &Array) -> VortexResult<Array> {
if predicate.dtype() != &DType::Bool(Nullability::NonNullable) {
vortex_bail!(
"predicate must be non-nullable bool, has dtype {}",
predicate.dtype()
predicate.dtype(),
);
}
if predicate.len() != array.len() {
Expand Down
72 changes: 35 additions & 37 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,41 @@ impl Array {
futures_util::stream::once(ready(Ok(self))),
)
}

/// Invokes `f` with this array viewed as a `&dyn ArrayTrait` and returns `f`'s result.
///
/// The call is routed through this array's encoding. In debug builds it also asserts that the
/// trait object exposes the variant accessor matching its `DType`.
///
/// # Panics
///
/// Panics if the encoding's `with_dyn` returns an error, or if it returns successfully
/// without ever invoking the callback.
#[inline]
pub fn with_dyn<R, F>(&self, mut f: F) -> R
where
    F: FnMut(&dyn ArrayTrait) -> R,
{
    // The callback given to the encoding only reports `Ok(())`, so `f`'s return value is
    // carried out of the closure through this slot.
    let mut result = None;

    self.encoding()
        .with_dyn(self, &mut |array| {
            // Sanity check that the encoding implements the correct array trait
            debug_assert!(
                match array.dtype() {
                    DType::Null => array.as_null_array().is_some(),
                    DType::Bool(_) => array.as_bool_array().is_some(),
                    DType::Primitive(..) => array.as_primitive_array().is_some(),
                    DType::Utf8(_) => array.as_utf8_array().is_some(),
                    DType::Binary(_) => array.as_binary_array().is_some(),
                    DType::Struct(..) => array.as_struct_array().is_some(),
                    DType::List(..) => array.as_list_array().is_some(),
                    DType::Extension(..) => array.as_extension_array().is_some(),
                },
                "Encoding {} does not implement the variant trait for {}",
                self.encoding().id(),
                array.dtype()
            );

            result = Some(f(array));
            Ok(())
        })
        .expect("Array::with_dyn: the encoding returned an error");

    // The closure above is the only writer of `result`; it is populated whenever the
    // encoding invoked the callback.
    result.expect("Array::with_dyn: the encoding never invoked the callback")
}
}

/// A depth-first pre-order iterator over an ArrayData.
Expand Down Expand Up @@ -243,43 +278,6 @@ impl ArrayVisitor for NBytesVisitor {
}
}

impl Array {
    /// Invokes `f` with this array viewed as a `&dyn ArrayTrait` and returns `f`'s result.
    ///
    /// The call is routed through this array's encoding. In debug builds it also asserts that
    /// the trait object exposes the variant accessor matching its `DType`.
    ///
    /// # Panics
    ///
    /// Panics if the encoding's `with_dyn` returns an error, or if it returns successfully
    /// without ever invoking the callback.
    #[inline]
    pub fn with_dyn<R, F>(&self, mut f: F) -> R
    where
        F: FnMut(&dyn ArrayTrait) -> R,
    {
        // The callback given to the encoding only reports `Ok(())`, so `f`'s return value is
        // carried out of the closure through this slot.
        let mut result = None;

        self.encoding()
            .with_dyn(self, &mut |array| {
                // Sanity check that the encoding implements the correct array trait
                debug_assert!(
                    match array.dtype() {
                        DType::Null => array.as_null_array().is_some(),
                        DType::Bool(_) => array.as_bool_array().is_some(),
                        DType::Primitive(..) => array.as_primitive_array().is_some(),
                        DType::Utf8(_) => array.as_utf8_array().is_some(),
                        DType::Binary(_) => array.as_binary_array().is_some(),
                        DType::Struct(..) => array.as_struct_array().is_some(),
                        DType::List(..) => array.as_list_array().is_some(),
                        DType::Extension(..) => array.as_extension_array().is_some(),
                    },
                    "Encoding {} does not implement the variant trait for {}",
                    self.encoding().id(),
                    array.dtype()
                );

                result = Some(f(array));
                Ok(())
            })
            .expect("Array::with_dyn: the encoding returned an error");

        // The closure above is the only writer of `result`; it is populated whenever the
        // encoding invoked the callback.
        result.expect("Array::with_dyn: the encoding never invoked the callback")
    }
}

impl Display for Array {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let prefix = match self {
Expand Down
6 changes: 5 additions & 1 deletion vortex-datafusion/src/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_expr::Operator;

use crate::can_be_pushed_down;
use crate::scalar::dfvalue_to_scalar;

pub struct ExpressionEvaluator;

Expand Down Expand Up @@ -36,7 +37,10 @@ impl ExpressionEvaluator {
.and_then(|a| a.field_by_name(name))
.ok_or(vortex_err!("Missing field {name} in struct array"))
}),
Expr::Literal(lit) => Ok(ConstantArray::new(lit.clone(), array.len()).into_array()),
Expr::Literal(lit) => {
let lit = dfvalue_to_scalar(lit.clone());
Ok(ConstantArray::new(lit, array.len()).into_array())
}
_ => unreachable!(),
}
}
Expand Down
140 changes: 138 additions & 2 deletions vortex-datafusion/src/expr.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
use arrow_schema::SchemaRef;
#![allow(dead_code)]

use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion_common::{Result as DFResult, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{and, lit, Expr};
use datafusion_expr::{and, lit, Expr, Operator as DFOperator};
use datafusion_physical_expr::PhysicalExpr;
use vortex::array::{ConstantArray, StructArray};
use vortex::compute::compare;
use vortex::variants::StructArrayTrait;
use vortex::{Array, IntoArray};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_expr::Operator;
use vortex_scalar::Scalar;

use crate::scalar::dfvalue_to_scalar;

/// Convert a set of expressions into a single AND expression.
///
Expand Down Expand Up @@ -32,6 +46,128 @@ pub(crate) fn simplify_expr(expr: &Expr, schema: SchemaRef) -> DFResult<Expr> {
simplifier.simplify(expr.clone())
}

/// An expression that can be evaluated directly against a Vortex [`Array`].
///
/// Implementors must be `Send + Sync`.
pub trait VortexPhysicalExpr: Send + Sync {
/// Evaluates this expression against `array`, returning the resulting array or an error.
fn evaluate(&self, array: &Array) -> VortexResult<Array>;
}

/// Counterpart of DataFusion's `NoOp` physical expression; it carries no data.
pub struct NoOp;

/// A binary operation over two child Vortex expressions.
pub struct BinaryExpr {
// Expression producing the left-hand operand.
left: Arc<dyn VortexPhysicalExpr>,
// Expression producing the right-hand operand.
right: Arc<dyn VortexPhysicalExpr>,
// The DataFusion operator to apply; it is mapped to a Vortex operation at evaluation time.
operator: DFOperator,
}

/// A reference to a named column of the input array.
pub struct Column {
// Field name used to look the column up in the input struct array.
name: String,
// Column index copied from the DataFusion column expression.
// NOTE(review): not read by the code visible here — confirm whether it is still needed.
index: usize,
}

impl VortexPhysicalExpr for Column {
    /// Extracts the child field named `self.name` from `array`.
    ///
    /// # Errors
    ///
    /// Returns an error if `array` cannot be converted into a [`StructArray`], or if the
    /// struct has no field with this column's name.
    fn evaluate(&self, array: &Array) -> VortexResult<Array> {
        let s = StructArray::try_from(array)?;

        // Build the error lazily: `ok_or` would construct (and format) it on every call,
        // including the common case where the field exists.
        s.field_by_name(&self.name).ok_or_else(|| {
            vortex_err!(
                "Array doesn't contain child array of name {}",
                self.name
            )
        })
    }
}

/// A literal (constant) value expression.
pub struct Literal {
// The Vortex scalar that is repeated to the input's length when evaluated.
scalar_value: Scalar,
}

impl VortexPhysicalExpr for Literal {
    /// Builds a constant array holding this literal, with the same length as `array`.
    fn evaluate(&self, array: &Array) -> VortexResult<Array> {
        let value = self.scalar_value.clone();
        let constant = ConstantArray::new(value, array.len());
        Ok(constant.into_array())
    }
}

impl VortexPhysicalExpr for BinaryExpr {
    /// Evaluates both operands against `array`, then applies the operator to the results.
    ///
    /// Comparison operators go through `compare`; `And`/`Or` use the boolean compute
    /// functions. Any other operator is reported as an error.
    fn evaluate(&self, array: &Array) -> VortexResult<Array> {
        let left = self.left.evaluate(array)?;
        let right = self.right.evaluate(array)?;

        // Translate the DataFusion comparison into its Vortex counterpart; the boolean
        // connectives are not comparisons and return directly.
        let comparison = match self.operator {
            DFOperator::Eq => Operator::Eq,
            DFOperator::NotEq => Operator::NotEq,
            DFOperator::Lt => Operator::Lt,
            DFOperator::LtEq => Operator::Lte,
            DFOperator::Gt => Operator::Gt,
            DFOperator::GtEq => Operator::Gte,
            DFOperator::And => return vortex::compute::and(&left, &right),
            DFOperator::Or => return vortex::compute::or(&left, &right),
            _ => vortex_bail!("{} is not a supported DF operator in Vortex", self.operator),
        };

        compare(&left, &right, comparison)
    }
}

// `NoOp` is not meant to be evaluated: `evaluate` ignores its input and bails with an error.
impl VortexPhysicalExpr for NoOp {
fn evaluate(&self, _array: &Array) -> VortexResult<Array> {
vortex_bail!("NoOp::evaluate() should not be called")
}
}

pub fn convert_expr_to_vortex(
physical_expr: Arc<dyn PhysicalExpr>,
input_schema: &Schema,
) -> VortexResult<Arc<dyn VortexPhysicalExpr>> {
if physical_expr.data_type(input_schema).unwrap().is_temporal() {
vortex_bail!("Doesn't support evaluating operations over temporal values");
}
if let Some(binary_expr) = physical_expr
.as_any()
.downcast_ref::<datafusion_physical_expr::expressions::BinaryExpr>()
{
let left = convert_expr_to_vortex(binary_expr.left().clone(), input_schema)?;
let right = convert_expr_to_vortex(binary_expr.right().clone(), input_schema)?;
let operator = *binary_expr.op();

return Ok(Arc::new(BinaryExpr {
left,
right,
operator,
}) as _);
}

if let Some(col_expr) = physical_expr
.as_any()
.downcast_ref::<datafusion_physical_expr::expressions::Column>()
{
let expr = Column {
name: col_expr.name().to_owned(),
index: col_expr.index(),
};

return Ok(Arc::new(expr) as _);
}

if let Some(lit) = physical_expr
.as_any()
.downcast_ref::<datafusion_physical_expr::expressions::Literal>()
{
let value = dfvalue_to_scalar(lit.value().clone());
return Ok(Arc::new(Literal {
scalar_value: value,
}) as _);
}

if physical_expr
.as_any()
.downcast_ref::<datafusion_physical_expr::expressions::NoOp>()
.is_some()
{
return Ok(Arc::new(NoOp));
}

vortex_bail!("Couldn't convert DataFusion physical expression to a vortex expression")
}

#[cfg(test)]
mod test {
use std::sync::Arc;
Expand Down
3 changes: 2 additions & 1 deletion vortex-datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ use vortex::array::ChunkedArray;
use vortex::{Array, ArrayDType, IntoArrayVariant};
use vortex_error::vortex_err;

pub mod expr;
pub mod memory;
pub mod persistent;
pub mod scalar;

mod datatype;
mod eval;
mod expr;
mod plans;

const SUPPORTED_BINARY_OPS: &[Operator] = &[
Expand Down
4 changes: 4 additions & 0 deletions vortex-datafusion/src/persistent/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,16 @@ impl ExecutionPlan for VortexExec {
let object_store = context
.runtime_env()
.object_store(&self.file_scan_config.object_store_url)?;

let arrow_schema = self.file_scan_config.file_schema.clone();

let opener = VortexFileOpener {
ctx: self.ctx.clone(),
object_store,
projection: self.file_scan_config.projection.clone(),
batch_size: None,
predicate: self.predicate.clone(),
arrow_schema,
};
let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?;

Expand Down
Loading

0 comments on commit 92b921e

Please sign in to comment.