Skip to content

Commit

Permalink
final touches
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS committed Jul 17, 2024
1 parent 2d9f2ac commit 3f947cf
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 16 deletions.
3 changes: 1 addition & 2 deletions vortex-array/src/array/varbin/compute/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use arrow_array::Array as _;
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use vortex_error::VortexResult;
use vortex_expr::Operator;
Expand Down Expand Up @@ -62,7 +61,7 @@ impl CompareFn for VarBinArray {
Operator::Lte => lt_eq(&lhs.as_ref(), &rhs.as_ref())?,
};

let data = ArrayData::from_arrow(&r, r.null_count() > 0);
let data = ArrayData::from_arrow(&r, true);
Ok(data.into_array())
}
}
1 change: 1 addition & 0 deletions vortex-datafusion/src/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl ExpressionEvaluator {
}
}
Expr::Column(col) => {
// TODO(adamg): Use variant trait once its merged
let array = array.clone().into_struct()?;
let name = col.name();
array
Expand Down
16 changes: 2 additions & 14 deletions vortex-datafusion/src/plans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,14 @@ impl ExecutionPlan for RowSelectorExec {
.unwrap(),
);

let conjunction_expr = simplify_expr(
&make_conjunction(&self.filter_exprs)?,
filter_schema.clone(),
)?;
let conjunction_expr =
simplify_expr(&make_conjunction(&self.filter_exprs)?, filter_schema)?;

Ok(Box::pin(RowIndicesStream {
chunked_array: self.chunked_array.clone(),
chunk_idx: 0,
filter_projection: self.filter_projection.clone(),
conjunction_expr,
filter_schema,
}))
}
}
Expand All @@ -155,7 +152,6 @@ pub(crate) struct RowIndicesStream {
chunk_idx: usize,
conjunction_expr: Expr,
filter_projection: Vec<usize>,
filter_schema: SchemaRef,
}

impl Stream for RowIndicesStream {
Expand Down Expand Up @@ -405,7 +401,6 @@ mod test {
use std::sync::Arc;

use arrow_array::{RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion_expr::{and, col, lit};
use itertools::Itertools;
use vortex::array::bool::BoolArray;
Expand All @@ -420,11 +415,6 @@ mod test {

#[tokio::test]
async fn test_filtering_stream() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::UInt64, false),
Field::new("b", DataType::Boolean, false),
]));

let chunk = StructArray::try_new(
Arc::new([FieldName::from("a"), FieldName::from("b")]),
vec![
Expand All @@ -441,13 +431,11 @@ mod test {
let chunked_array =
ChunkedArray::try_new(vec![chunk.clone(), chunk.clone()], dtype).unwrap();

let filter_schema = schema.clone();
let filtering_stream = RowIndicesStream {
chunked_array: chunked_array.clone(),
chunk_idx: 0,
conjunction_expr: and((col("a")).eq(lit(2u64)), col("b").eq(lit(true))),
filter_projection: vec![0, 1],
filter_schema,
};

let rows: Vec<RecordBatch> = futures::executor::block_on_stream(filtering_stream)
Expand Down

0 comments on commit 3f947cf

Please sign in to comment.