From 3f947cf0a616f8afbcf0434b9accfa2d03986c6e Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 17 Jul 2024 15:50:02 +0100 Subject: [PATCH] final touches --- vortex-array/src/array/varbin/compute/mod.rs | 3 +-- vortex-datafusion/src/eval.rs | 1 + vortex-datafusion/src/plans.rs | 16 ++-------------- 3 files changed, 4 insertions(+), 16 deletions(-) diff --git a/vortex-array/src/array/varbin/compute/mod.rs b/vortex-array/src/array/varbin/compute/mod.rs index 3f51f3a115..2720fd2867 100644 --- a/vortex-array/src/array/varbin/compute/mod.rs +++ b/vortex-array/src/array/varbin/compute/mod.rs @@ -1,4 +1,3 @@ -use arrow_array::Array as _; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use vortex_error::VortexResult; use vortex_expr::Operator; @@ -62,7 +61,7 @@ impl CompareFn for VarBinArray { Operator::Lte => lt_eq(&lhs.as_ref(), &rhs.as_ref())?, }; - let data = ArrayData::from_arrow(&r, r.null_count() > 0); + let data = ArrayData::from_arrow(&r, true); Ok(data.into_array()) } } diff --git a/vortex-datafusion/src/eval.rs b/vortex-datafusion/src/eval.rs index ddaa946d3b..33a45b38d1 100644 --- a/vortex-datafusion/src/eval.rs +++ b/vortex-datafusion/src/eval.rs @@ -39,6 +39,7 @@ impl ExpressionEvaluator { } } Expr::Column(col) => { + // TODO(adamg): Use variant trait once its merged let array = array.clone().into_struct()?; let name = col.name(); array diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index 39c2d375a0..04890c516d 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -134,17 +134,14 @@ impl ExecutionPlan for RowSelectorExec { .unwrap(), ); - let conjunction_expr = simplify_expr( - &make_conjunction(&self.filter_exprs)?, - filter_schema.clone(), - )?; + let conjunction_expr = + simplify_expr(&make_conjunction(&self.filter_exprs)?, filter_schema)?; Ok(Box::pin(RowIndicesStream { chunked_array: self.chunked_array.clone(), chunk_idx: 0, filter_projection: self.filter_projection.clone(), conjunction_expr, - filter_schema, })) } } @@ -155,7 +152,6 @@ pub(crate) struct RowIndicesStream { chunk_idx: usize, conjunction_expr: Expr, filter_projection: Vec, - filter_schema: SchemaRef, } impl Stream for RowIndicesStream { @@ -405,7 +401,6 @@ mod test { use std::sync::Arc; use arrow_array::{RecordBatch, UInt64Array}; - use arrow_schema::{DataType, Field, Schema}; use datafusion_expr::{and, col, lit}; use itertools::Itertools; use vortex::array::bool::BoolArray; @@ -420,11 +415,6 @@ mod test { #[tokio::test] async fn test_filtering_stream() { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::UInt64, false), - Field::new("b", DataType::Boolean, false), - ])); - let chunk = StructArray::try_new( Arc::new([FieldName::from("a"), FieldName::from("b")]), vec![ @@ -441,13 +431,11 @@ mod test { let chunked_array = ChunkedArray::try_new(vec![chunk.clone(), chunk.clone()], dtype).unwrap(); - let filter_schema = schema.clone(); let filtering_stream = RowIndicesStream { chunked_array: chunked_array.clone(), chunk_idx: 0, conjunction_expr: and((col("a")).eq(lit(2u64)), col("b").eq(lit(true))), filter_projection: vec![0, 1], - filter_schema, }; let rows: Vec = futures::executor::block_on_stream(filtering_stream)