Basic predicate pushdown support for Datafusion (#472)
Enables basic support for predicate pushdown over in-memory vortex
arrays for `eq` operations under fairly limited conditions.
AdamGS authored Jul 17, 2024
1 parent 5f72446 commit fde193f
Showing 17 changed files with 203 additions and 161 deletions.
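The "fairly limited conditions" come down to filters built from column-to-literal equalities combined with AND/OR, which is what the new ExpressionEvaluator in vortex-datafusion/src/eval.rs (shown below) accepts; other operators are rejected with a vortex_bail! error. A minimal sketch of a qualifying filter, built with datafusion_expr's expression builders; the column names are illustrative and not taken from this commit:

use datafusion_expr::{col, lit, Expr};

/// A filter shape the new pushdown path can evaluate: column-to-literal
/// equality, optionally combined with AND/OR.
fn pushdown_friendly_filter() -> Expr {
    col("l_returnflag")
        .eq(lit("R"))
        .and(col("l_linenumber").eq(lit(1_i64)))
}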
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -42,6 +42,7 @@ arrow-cast = "52.0.0"
 arrow-csv = "52.0.0"
 arrow-data = "52.0.0"
 arrow-ipc = "52.0.0"
+arrow-ord = "52.0.0"
 arrow-schema = "52.0.0"
 arrow-select = "52.0.0"
 async-trait = "0.1"
30 changes: 10 additions & 20 deletions bench-vortex/src/bin/tpch_benchmark.rs
@@ -3,9 +3,8 @@ use std::time::SystemTime;
 
 use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
 use bench_vortex::tpch::{load_datasets, tpch_queries, Format};
-use futures::future::join_all;
+use futures::future::try_join_all;
 use indicatif::ProgressBar;
-use itertools::Itertools;
 use prettytable::{Cell, Row, Table};
 
 #[tokio::main(flavor = "multi_thread", worker_threads = 8)]
@@ -23,21 +22,12 @@ async fn main() {
         Format::Vortex {
             disable_pushdown: false,
         },
-        Format::Vortex {
-            disable_pushdown: true,
-        },
     ];
 
     // Load datasets
-    let ctxs = join_all(
-        formats
-            .iter()
-            .map(|format| load_datasets(&data_dir, *format)),
-    )
-    .await
-    .into_iter()
-    .map(|r| r.unwrap())
-    .collect_vec();
+    let ctxs = try_join_all(formats.map(|format| load_datasets(&data_dir, format)))
+        .await
+        .unwrap();
 
     // Set up a results table
     let mut table = Table::new();
@@ -53,9 +43,9 @@ async fn main() {
     // Send back a channel with the results of Row.
     let (rows_tx, rows_rx) = sync::mpsc::channel();
     for (q, query) in tpch_queries() {
-        let _ctxs = ctxs.clone();
-        let _tx = rows_tx.clone();
-        let _progress = progress.clone();
+        let ctxs = ctxs.clone();
+        let tx = rows_tx.clone();
+        let progress = progress.clone();
         rayon::spawn_fifo(move || {
             let mut cells = Vec::with_capacity(formats.len());
             cells.push(Cell::new(&format!("Q{}", q)));
@@ -65,7 +55,7 @@
                 .enable_all()
                 .build()
                 .unwrap();
-            for (ctx, format) in _ctxs.iter().zip(formats.iter()) {
+            for (ctx, format) in ctxs.iter().zip(formats.iter()) {
                 for _ in 0..3 {
                     // warmup
                     rt.block_on(async {
@@ -98,7 +88,7 @@
                let fastest = measure.iter().cloned().min().unwrap();
                elapsed_us.push(fastest);
 
-                _progress.inc(1);
+                progress.inc(1);
            }
 
            let baseline = elapsed_us.first().unwrap();
@@ -125,7 +115,7 @@
                );
            }
 
-            _tx.send((q, Row::new(cells))).unwrap();
+            tx.send((q, Row::new(cells))).unwrap();
        });
    }
 
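The dataset-loading change above swaps futures::future::join_all plus per-item unwrap() for try_join_all, which resolves to a single Result and short-circuits on the first failed load. A small self-contained illustration of the pattern (not code from the repository):

use futures::future::try_join_all;

async fn load_all(names: Vec<String>) -> Result<Vec<String>, String> {
    // try_join_all drives every future and yields Err as soon as one fails,
    // so a single `?`/`unwrap()` on the collected Result replaces the old
    // `.map(|r| r.unwrap())` over each element.
    try_join_all(names.into_iter().map(|name| async move {
        if name.is_empty() {
            Err("empty dataset name".to_string())
        } else {
            Ok(format!("loaded {name}"))
        }
    }))
    .await
}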
4 changes: 2 additions & 2 deletions encodings/dict/src/compress.rs
@@ -111,7 +111,7 @@ where
    let mut lookup_dict: HashMap<u64, (), ()> = HashMap::with_hasher(());
    let mut codes: Vec<u64> = Vec::with_capacity(lower);
    let mut bytes: Vec<u8> = Vec::new();
-    let mut offsets: Vec<u64> = Vec::new();
+    let mut offsets: Vec<u32> = Vec::new();
    offsets.push(0);
 
    if dtype.is_nullable() {
@@ -133,7 +133,7 @@
        RawEntryMut::Vacant(vac) => {
            let next_code = offsets.len() as u64 - 1;
            bytes.extend_from_slice(byte_ref);
-            offsets.push(bytes.len() as u64);
+            offsets.push(bytes.len() as u32);
            vac.insert_with_hasher(value_hash, next_code, (), |idx| {
                hasher.hash_one(lookup_bytes(
                    offsets.as_slice(),
1 change: 1 addition & 0 deletions vortex-array/Cargo.toml
@@ -24,6 +24,7 @@ arrow-buffer = { workspace = true }
 arrow-cast = { workspace = true }
 arrow-select = { workspace = true }
 arrow-schema = { workspace = true }
+arrow-ord = { workspace = true }
 enum-iterator = { workspace = true }
 flatbuffers = { workspace = true }
 flexbuffers = { workspace = true }
17 changes: 15 additions & 2 deletions vortex-array/src/array/constant/canonical.rs
@@ -1,10 +1,13 @@
-use vortex_dtype::{match_each_native_ptype, Nullability, PType};
+use std::iter;
+
+use vortex_dtype::{match_each_native_ptype, DType, Nullability, PType};
 use vortex_error::{vortex_bail, VortexResult};
-use vortex_scalar::BoolScalar;
+use vortex_scalar::{BoolScalar, Utf8Scalar};
 
 use crate::array::bool::BoolArray;
 use crate::array::constant::ConstantArray;
 use crate::array::primitive::PrimitiveArray;
+use crate::array::varbin::VarBinArray;
 use crate::validity::Validity;
 use crate::ArrayDType;
 use crate::{Canonical, IntoCanonical};
@@ -26,6 +29,16 @@ impl IntoCanonical for ConstantArray {
            )));
        }
 
+        if let Ok(s) = Utf8Scalar::try_from(self.scalar()) {
+            let const_value = s.value().unwrap();
+            let bytes = const_value.as_bytes();
+
+            return Ok(Canonical::VarBin(VarBinArray::from_iter(
+                iter::repeat(Some(bytes)).take(self.len()),
+                DType::Utf8(validity.nullability()),
+            )));
+        }
+
        if let Ok(ptype) = PType::try_from(self.scalar().dtype()) {
            return match_each_native_ptype!(ptype, |$P| {
                Ok(Canonical::Primitive(PrimitiveArray::from_vec::<$P>(
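With the new Utf8 branch, a constant string array canonicalizes into a VarBinArray holding the value repeated self.len() times, where previously there was no Utf8 handling here. A hedged sketch of that behavior, assuming Scalar converts from &str and that Canonical and IntoCanonical are re-exported at the crate root as they are used inside vortex-array:

use vortex::array::constant::ConstantArray;
use vortex::{Canonical, IntoCanonical};
use vortex_error::VortexResult;

fn constant_utf8_canonical() -> VortexResult<()> {
    // A length-3 constant Utf8 array ("R", "R", "R").
    let constant = ConstantArray::new("R", 3);
    // With this commit the canonical form should be VarBin rather than an error.
    match constant.into_canonical()? {
        Canonical::VarBin(_) => Ok(()),
        _ => panic!("expected a VarBin canonical form"),
    }
}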
4 changes: 2 additions & 2 deletions vortex-array/src/array/constant/mod.rs
@@ -24,9 +24,9 @@ pub struct ConstantMetadata {
 impl ConstantArray {
     pub fn new<S>(scalar: S, length: usize) -> Self
     where
-        Scalar: From<S>,
+        S: Into<Scalar>,
     {
-        let scalar: Scalar = scalar.into();
+        let scalar = scalar.into();
        // TODO(aduffy): add stats for bools, ideally there should be a
        // StatsSet::constant(Scalar) constructor that does this for us, like StatsSet::nulls.
        let stats = StatsSet::from(HashMap::from([
4 changes: 1 addition & 3 deletions vortex-array/src/array/varbin/compute/mod.rs
@@ -3,9 +3,7 @@ use vortex_scalar::Scalar;
 
 use crate::array::varbin::{varbin_scalar, VarBinArray};
 use crate::compute::unary::scalar_at::ScalarAtFn;
-use crate::compute::ArrayCompute;
-use crate::compute::SliceFn;
-use crate::compute::TakeFn;
+use crate::compute::{ArrayCompute, SliceFn, TakeFn};
 use crate::validity::ArrayValidity;
 use crate::ArrayDType;
 
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/compute/take.rs
@@ -66,7 +66,7 @@ fn take_nullable<I: NativePType, O: NativePType>(
    indices: &[I],
    null_buffer: NullBuffer,
 ) -> VarBinArray {
-    let mut builder = VarBinBuilder::<I>::with_capacity(indices.len());
+    let mut builder = VarBinBuilder::<O>::with_capacity(indices.len());
    for &idx in indices {
        let idx = idx.to_usize().unwrap();
        if null_buffer.is_valid(idx) {
2 changes: 1 addition & 1 deletion vortex-array/src/array/varbin/mod.rs
@@ -132,7 +132,7 @@ impl VarBinArray {
        dtype: DType,
    ) -> Self {
        let iter = iter.into_iter();
-        let mut builder = VarBinBuilder::<u64>::with_capacity(iter.size_hint().0);
+        let mut builder = VarBinBuilder::<u32>::with_capacity(iter.size_hint().0);
        for v in iter {
            builder.push(v.as_ref().map(|o| o.as_ref()));
        }
3 changes: 1 addition & 2 deletions vortex-array/src/array/varbinview/compute.rs
@@ -4,8 +4,7 @@ use vortex_scalar::Scalar;
 use crate::array::varbin::varbin_scalar;
 use crate::array::varbinview::{VarBinViewArray, VIEW_SIZE};
 use crate::compute::unary::scalar_at::ScalarAtFn;
-use crate::compute::ArrayCompute;
-use crate::compute::{slice, SliceFn};
+use crate::compute::{slice, ArrayCompute, SliceFn};
 use crate::validity::ArrayValidity;
 use crate::{Array, ArrayDType, IntoArray, IntoArrayData};
 
40 changes: 21 additions & 19 deletions vortex-array/src/compute/compare.rs
@@ -1,30 +1,32 @@
-use vortex_dtype::DType;
-use vortex_error::{vortex_err, VortexResult};
+use arrow_ord::cmp;
+use vortex_error::VortexResult;
 use vortex_expr::Operator;
 
-use crate::{Array, ArrayDType, IntoArrayVariant};
+use crate::{arrow::FromArrowArray, Array, ArrayData, IntoArray, IntoCanonical};
 
 pub trait CompareFn {
-    fn compare(&self, array: &Array, predicate: Operator) -> VortexResult<Array>;
+    fn compare(&self, array: &Array, operator: Operator) -> VortexResult<Array>;
 }
 
 pub fn compare(left: &Array, right: &Array, operator: Operator) -> VortexResult<Array> {
-    if let Some(matching_indices) =
-        left.with_dyn(|lhs| lhs.compare().map(|rhs| rhs.compare(right, operator)))
+    if let Some(selection) =
+        left.with_dyn(|lhs| lhs.compare().map(|lhs| lhs.compare(right, operator)))
     {
-        return matching_indices;
+        return selection;
    }
 
-    // if compare is not implemented for the given array type, but the array has a numeric
-    // DType, we can flatten the array and apply filter to the flattened primitive array
-    match left.dtype() {
-        DType::Primitive(..) => {
-            let flat = left.clone().into_primitive()?;
-            flat.compare(right, operator)
-        }
-        _ => Err(vortex_err!(
-            NotImplemented: "compare",
-            left.encoding().id()
-        )),
-    }
+    // Fallback to arrow on canonical types
+    let lhs = left.clone().into_canonical()?.into_arrow();
+    let rhs = right.clone().into_canonical()?.into_arrow();
+
+    let array = match operator {
+        Operator::Eq => cmp::eq(&lhs.as_ref(), &rhs.as_ref())?,
+        Operator::NotEq => cmp::neq(&lhs.as_ref(), &rhs.as_ref())?,
+        Operator::Gt => cmp::gt(&lhs.as_ref(), &rhs.as_ref())?,
+        Operator::Gte => cmp::gt_eq(&lhs.as_ref(), &rhs.as_ref())?,
+        Operator::Lt => cmp::lt(&lhs.as_ref(), &rhs.as_ref())?,
+        Operator::Lte => cmp::lt_eq(&lhs.as_ref(), &rhs.as_ref())?,
+    };
+
+    Ok(ArrayData::from_arrow(&array, true).into_array())
 }
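The rewritten compare falls back to Arrow's arrow_ord comparison kernels over the canonicalized arrays instead of erroring for encodings without a specialized CompareFn, and it now covers all six operators rather than only the primitive-DType path. A hedged usage sketch reusing APIs visible in this commit (ConstantArray::new, compare, IntoArrayVariant) and assuming Scalar converts from i64:

use vortex::array::constant::ConstantArray;
use vortex::compute::compare;
use vortex::{IntoArray, IntoArrayVariant};
use vortex_error::VortexResult;
use vortex_expr::Operator;

fn eq_via_compare() -> VortexResult<()> {
    // If neither side provides its own CompareFn, compare() canonicalizes
    // both arrays, runs arrow_ord::cmp::eq, and wraps the resulting Arrow
    // BooleanArray back into a vortex Array.
    let lhs = ConstantArray::new(42_i64, 4).into_array();
    let rhs = ConstantArray::new(42_i64, 4).into_array();
    let mask = compare(&lhs, &rhs, Operator::Eq)?.into_bool()?;
    assert!(mask.boolean_buffer().iter().all(|b| b));
    Ok(())
}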
2 changes: 1 addition & 1 deletion vortex-datafusion/Cargo.toml
@@ -15,7 +15,7 @@ vortex-array = { path = "../vortex-array" }
 vortex-dtype = { path = "../vortex-dtype" }
 vortex-expr = { path = "../vortex-expr" }
 vortex-error = { path = "../vortex-error" }
-vortex-scalar = { path = "../vortex-scalar" }
+vortex-scalar = { path = "../vortex-scalar", features = ["datafusion"] }
 
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
53 changes: 53 additions & 0 deletions vortex-datafusion/src/eval.rs
@@ -0,0 +1,53 @@
+use datafusion_expr::{Expr, Operator as DFOperator};
+use vortex::{
+    array::{bool::BoolArray, constant::ConstantArray},
+    compute::compare,
+    Array, IntoArray, IntoArrayVariant,
+};
+use vortex_error::{vortex_bail, vortex_err, VortexResult};
+use vortex_expr::Operator;
+
+pub struct ExpressionEvaluator;
+
+impl ExpressionEvaluator {
+    pub fn eval(array: Array, expr: &Expr) -> VortexResult<Array> {
+        match expr {
+            Expr::BinaryExpr(expr) => {
+                let lhs = expr.left.as_ref();
+                let rhs = expr.right.as_ref();
+
+                // TODO(adamg): turn and/or into more general compute functions
+                match expr.op {
+                    DFOperator::And => {
+                        let lhs = ExpressionEvaluator::eval(array.clone(), lhs)?.into_bool()?;
+                        let rhs = ExpressionEvaluator::eval(array, rhs)?.into_bool()?;
+                        let buffer = &lhs.boolean_buffer() & &rhs.boolean_buffer();
+                        Ok(BoolArray::from(buffer).into_array())
+                    }
+                    DFOperator::Or => {
+                        let lhs = ExpressionEvaluator::eval(array.clone(), lhs)?.into_bool()?;
+                        let rhs = ExpressionEvaluator::eval(array.clone(), rhs)?.into_bool()?;
+                        let buffer = &lhs.boolean_buffer() | &rhs.boolean_buffer();
+                        Ok(BoolArray::from(buffer).into_array())
+                    }
+                    DFOperator::Eq => {
+                        let lhs = ExpressionEvaluator::eval(array.clone(), lhs)?;
+                        let rhs = ExpressionEvaluator::eval(array.clone(), rhs)?;
+                        compare(&lhs, &rhs, Operator::Eq)
+                    }
+                    _ => vortex_bail!("{} is an unsupported operator", expr.op),
+                }
+            }
+            Expr::Column(col) => {
+                // TODO(adamg): Use variant trait once its merged
+                let array = array.clone().into_struct()?;
+                let name = col.name();
+                array
+                    .field_by_name(name)
+                    .ok_or(vortex_err!("Missing field {name} in struct"))
+            }
+            Expr::Literal(lit) => Ok(ConstantArray::new(lit.clone(), array.len()).into_array()),
+            _ => unreachable!(),
+        }
+    }
+}
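The And/Or arms above come down to bitwise operations on the children's Arrow BooleanBuffers. A minimal sketch of just that combination step (arrow-buffer only; both masks are assumed to have equal length, as they do when produced over the same input array):

use arrow_buffer::BooleanBuffer;

// Mirrors `&lhs.boolean_buffer() & &rhs.boolean_buffer()` in the And arm;
// the Or arm uses `|` the same way.
fn and_masks(lhs: &BooleanBuffer, rhs: &BooleanBuffer) -> BooleanBuffer {
    lhs & rhs
}

fn or_masks(lhs: &BooleanBuffer, rhs: &BooleanBuffer) -> BooleanBuffer {
    lhs | rhs
}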