diff --git a/Cargo.lock b/Cargo.lock index 8d9d7ee121..5ec29ccf59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3995,6 +3995,7 @@ dependencies = [ name = "vortex-array" version = "0.1.0" dependencies = [ + "arrow-arith", "arrow-array", "arrow-buffer", "arrow-cast", diff --git a/Cargo.toml b/Cargo.toml index cab5c4f21f..34b8c5cf1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ ahash = "0.8.11" allocator-api2 = "0.2.16" arrayref = "0.3.7" arrow = { version = "52.0.0", features = ["pyarrow"] } +arrow-arith = "52.0.0" arrow-array = "52.0.0" arrow-buffer = "52.0.0" arrow-cast = "52.0.0" diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 5eb4e078e2..bcd5222438 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -25,6 +25,7 @@ arrow-cast = { workspace = true } arrow-select = { workspace = true } arrow-schema = { workspace = true } arrow-ord = { workspace = true } +arrow-arith = { workspace = true } bytes = "1" enum-iterator = { workspace = true } flatbuffers = { workspace = true } diff --git a/vortex-array/src/array/bool/mod.rs b/vortex-array/src/array/bool/mod.rs index 8635977a39..e82f67fcd0 100644 --- a/vortex-array/src/array/bool/mod.rs +++ b/vortex-array/src/array/bool/mod.rs @@ -72,10 +72,6 @@ impl BoolArray { let buffer = BooleanBuffer::from(bools); Self::try_new(buffer, validity).unwrap() } - - pub fn true_count(&self) -> usize { - self.statistics().compute_true_count().unwrap() - } } impl ArrayTrait for BoolArray {} diff --git a/vortex-array/src/compute/mod.rs b/vortex-array/src/compute/mod.rs index 53c0b2871a..08018973ce 100644 --- a/vortex-array/src/compute/mod.rs +++ b/vortex-array/src/compute/mod.rs @@ -7,6 +7,7 @@ //! implementations of these operators, else we will decode, and perform the equivalent operator //! from Arrow. +use arrow_array::cast::AsArray; pub use compare::{compare, CompareFn}; pub use filter::{filter, FilterFn}; pub use filter_indices::{filter_indices, FilterIndicesFn}; @@ -17,6 +18,10 @@ use unary::cast::CastFn; use unary::fill_forward::FillForwardFn; use unary::scalar_at::ScalarAtFn; use unary::scalar_subtract::SubtractScalarFn; +use vortex_error::VortexResult; + +use crate::arrow::FromArrowArray; +use crate::{Array, ArrayData, IntoArray, IntoArrayVariant, IntoCanonical}; mod compare; mod filter; @@ -99,3 +104,27 @@ pub trait ArrayCompute { None } } + +pub fn and(lhs: &Array, rhs: &Array) -> VortexResult { + let lhs = lhs.clone().into_bool()?.into_canonical()?.into_arrow(); + let lhs_bool = lhs.as_boolean(); + let rhs = rhs.clone().into_bool()?.into_canonical()?.into_arrow(); + let rhs_bool = rhs.as_boolean(); + + let data = + ArrayData::from_arrow(&arrow_arith::boolean::and(lhs_bool, rhs_bool)?, true).into_array(); + + Ok(data) +} + +pub fn or(lhs: &Array, rhs: &Array) -> VortexResult { + let lhs = lhs.clone().into_bool()?.into_canonical()?.into_arrow(); + let lhs_bool = lhs.as_boolean(); + let rhs = rhs.clone().into_bool()?.into_canonical()?.into_arrow(); + let rhs_bool = rhs.as_boolean(); + + let data = + ArrayData::from_arrow(&arrow_arith::boolean::or(lhs_bool, rhs_bool)?, true).into_array(); + + Ok(data) +} diff --git a/vortex-datafusion/src/eval.rs b/vortex-datafusion/src/eval.rs index 70973cf129..228f4c5268 100644 --- a/vortex-datafusion/src/eval.rs +++ b/vortex-datafusion/src/eval.rs @@ -1,8 +1,8 @@ use datafusion_expr::{Expr, Operator as DFOperator}; use vortex::{ - array::{bool::BoolArray, constant::ConstantArray}, - compute::compare, - Array, IntoArray, IntoArrayVariant, + array::constant::ConstantArray, + compute::{and, compare, or}, + Array, IntoArray, }; use vortex_error::{vortex_bail, vortex_err, VortexResult}; use vortex_expr::Operator; @@ -14,77 +14,21 @@ pub struct ExpressionEvaluator; impl ExpressionEvaluator { pub fn eval(array: Array, expr: &Expr) -> VortexResult { debug_assert!(can_be_pushed_down(expr)); - let original_len = array.len(); + match expr { Expr::BinaryExpr(expr) => { + let lhs = ExpressionEvaluator::eval(array.clone(), expr.left.as_ref())?; + let rhs = ExpressionEvaluator::eval(array, expr.right.as_ref())?; // TODO(adamg): turn and/or into more general compute functions match expr.op { - DFOperator::And => { - let lhs = ExpressionEvaluator::eval(array.clone(), expr.left.as_ref())?; - let lhs = lhs.into_bool()?; - - if lhs.true_count() == 0 { - return Ok(ConstantArray::new(false, original_len).into_array()); - } - - let rhs = ExpressionEvaluator::eval(array, expr.right.as_ref())?; - let rhs = rhs.into_bool()?; - - if rhs.true_count() == 0 { - return Ok(ConstantArray::new(false, original_len).into_array()); - } - - let buffer = &lhs.boolean_buffer() & &rhs.boolean_buffer(); - Ok(BoolArray::from(buffer).into_array()) - } - DFOperator::Or => { - let lhs = ExpressionEvaluator::eval(array.clone(), expr.left.as_ref())?; - let lhs = lhs.into_bool()?; - - if lhs.true_count() == original_len { - return Ok(ConstantArray::new(true, original_len).into_array()); - } - - let rhs = ExpressionEvaluator::eval(array, expr.right.as_ref())?; - let rhs = rhs.into_bool()?; - - if lhs.true_count() == original_len { - return Ok(ConstantArray::new(true, original_len).into_array()); - } - - let buffer = &lhs.boolean_buffer() | &rhs.boolean_buffer(); - Ok(BoolArray::from(buffer).into_array()) - } - DFOperator::Eq => { - let lhs = ExpressionEvaluator::eval(array.clone(), expr.left.as_ref())?; - let rhs = ExpressionEvaluator::eval(array, expr.right.as_ref())?; - compare(&lhs, &rhs, Operator::Eq) - } - DFOperator::Gt => { - let lhs = ExpressionEvaluator::eval(array.clone(), expr.left.as_ref())?; - let rhs = ExpressionEvaluator::eval(array, expr.right.as_ref())?; - compare(&lhs, &rhs, Operator::Gt) - } - DFOperator::GtEq => { - let lhs = ExpressionEvaluator::eval(array.clone(), expr.left.as_ref())?; - let rhs = ExpressionEvaluator::eval(array, expr.right.as_ref())?; - compare(&lhs, &rhs, Operator::Gte) - } - DFOperator::Lt => { - let lhs = ExpressionEvaluator::eval(array.clone(), expr.left.as_ref())?; - let rhs = ExpressionEvaluator::eval(array, expr.right.as_ref())?; - compare(&lhs, &rhs, Operator::Lt) - } - DFOperator::LtEq => { - let lhs = ExpressionEvaluator::eval(array.clone(), expr.left.as_ref())?; - let rhs = ExpressionEvaluator::eval(array, expr.right.as_ref())?; - compare(&lhs, &rhs, Operator::Lte) - } - DFOperator::NotEq => { - let lhs = ExpressionEvaluator::eval(array.clone(), expr.left.as_ref())?; - let rhs = ExpressionEvaluator::eval(array, expr.right.as_ref())?; - compare(&lhs, &rhs, Operator::NotEq) - } + DFOperator::And => and(&lhs, &rhs), + DFOperator::Or => or(&lhs, &rhs), + DFOperator::Eq => compare(&lhs, &rhs, Operator::Eq), + DFOperator::Gt => compare(&lhs, &rhs, Operator::Gt), + DFOperator::GtEq => compare(&lhs, &rhs, Operator::Gte), + DFOperator::Lt => compare(&lhs, &rhs, Operator::Lt), + DFOperator::LtEq => compare(&lhs, &rhs, Operator::Lte), + DFOperator::NotEq => compare(&lhs, &rhs, Operator::NotEq), _ => vortex_bail!("{} is an unsupported operator", expr.op), } }