Skip to content

Commit

Permalink
RowIndicesExec receives a Vortex array, not a RecordBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
a10y committed Jun 26, 2024
1 parent fcc964d commit f2f6c26
Showing 1 changed file with 49 additions and 43 deletions.
92 changes: 49 additions & 43 deletions vortex-datafusion/src/plans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::compute::cast;
use datafusion_common::{DFSchema, Result as DFResult};
use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion_expr::Expr;
Expand All @@ -25,7 +24,7 @@ use pin_project::pin_project;
use vortex::array::struct_::StructArray;
use vortex::arrow::FromArrowArray;
use vortex::compute::take::take;
use vortex::{ArrayDType, ArrayData, IntoArray, IntoCanonical};
use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical};

use crate::datatype::infer_schema;
use crate::expr::{make_conjunction, simplify_expr};
Expand Down Expand Up @@ -120,18 +119,15 @@ impl ExecutionPlan for RowSelectorExec {
let stream_schema = Arc::new(infer_schema(self.filter_struct.dtype()));

let filter_struct = self.filter_struct.clone();
let inner = Box::pin(async move {
let arrow_array = filter_struct.into_canonical().unwrap().into_arrow();
Ok(RecordBatch::from(arrow_array.as_struct()))
});
let one_shot = Box::pin(async move { filter_struct.into_array() });

let conjunction_expr = simplify_expr(
&make_conjunction(&self.filter_exprs)?,
stream_schema.clone(),
)?;

Ok(Box::pin(RowIndicesStream {
inner,
one_shot,
polled_inner: false,
conjunction_expr,
schema_ref: stream_schema,
Expand All @@ -144,8 +140,9 @@ impl ExecutionPlan for RowSelectorExec {
#[pin_project::pin_project]
pub(crate) struct RowIndicesStream<F> {
/// The inner future that returns the filter columns as a vortex `Array`.
/// This future should only be polled once.
#[pin]
inner: F,
one_shot: F,

polled_inner: bool,

Expand All @@ -156,14 +153,14 @@ pub(crate) struct RowIndicesStream<F> {

impl<F> Stream for RowIndicesStream<F>
where
F: Future<Output = DFResult<RecordBatch>>,
F: Future<Output = Array>,
{
type Item = DFResult<RecordBatch>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

// If we have already polled the one-shot future with the filter records, indicate
// If we have already polled the one-shot future of filter records, indicate
// that the stream has finished.
if *this.polled_inner {
return Poll::Ready(None);
Expand All @@ -172,13 +169,20 @@ where
// Get the unfiltered record batch.
// Since this is a one-shot, we only want to poll the inner future once, to create the
// initial batch for us to process.
//
// We want to avoid ever calling it again.
let record_batch = ready!(this.inner.poll(cx))?;
let vortex_struct = ready!(this.one_shot.poll(cx));
*this.polled_inner = true;

// Using a local SessionContext, generate a physical plan to execute the conjunction query
// against the filter columns.
// Immediately convert to Arrow RecordBatch for processing.
// TODO(aduffy): attempt to pushdown the filter to Vortex without decoding.
let record_batch = RecordBatch::from(
vortex_struct
.into_canonical()
.unwrap()
.into_arrow()
.as_struct(),
);

// Generate a physical plan to execute the conjunction query against the filter columns.
//
// The result of a conjunction expression is a BooleanArray containing `true` for rows
// where the conjunction was satisfied, and `false` otherwise.
Expand Down Expand Up @@ -207,7 +211,7 @@ where

impl<F> RecordBatchStream for RowIndicesStream<F>
where
F: Future<Output = DFResult<RecordBatch>>,
F: Future<Output = Array>,
{
fn schema(&self) -> SchemaRef {
self.schema_ref.clone()
Expand Down Expand Up @@ -238,14 +242,13 @@ impl TakeRowsExec {
row_indices: Arc<dyn ExecutionPlan>,
table: &StructArray,
) -> Self {
let output_schema = Arc::new(schema_ref.project(projection).unwrap());
let plan_properties = PlanProperties::new(
EquivalenceProperties::new(schema_ref.clone()),
EquivalenceProperties::new(output_schema.clone()),
Partitioning::RoundRobinBatch(1),
ExecutionMode::Bounded,
);

let output_schema = Arc::new(schema_ref.project(projection).unwrap());

Self {
plan_properties,
projection: projection.to_owned(),
Expand Down Expand Up @@ -373,24 +376,18 @@ where
.unwrap())));
}

// Assemble the output columns using the row indices.
// NOTE(aduffy): this re-decodes the fields from the filter schema, which is unnecessary.
let mut columns = Vec::new();
for (output_idx, src_idx) in this.output_projection.iter().enumerate() {
let encoded = this.vortex_array.field(*src_idx).expect("field access");
let decoded = take(&encoded, &row_indices)
.expect("take")
.into_canonical()
.expect("into_canonical")
.into_arrow();
let data_type = this.output_schema.field(output_idx).data_type();

columns.push(cast(&decoded, data_type).expect("cast"));
}
// TODO(aduffy): this re-decodes the fields from the filter schema, which is wasteful.
// We should find a way to avoid decoding the filter columns and only decode the other
// columns, then stitch the StructArray back together from those.
let projected_for_output = this.vortex_array.project(this.output_projection).unwrap();
let decoded = take(&projected_for_output.into_array(), &row_indices)
.expect("take")
.into_canonical()
.expect("into_canonical")
.into_arrow();

// Send back a single record batch of the decoded data.
let output_batch = RecordBatch::try_new(this.output_schema.clone(), columns)
.expect("RecordBatch::try_new");
let output_batch = RecordBatch::from(decoded.as_struct());

Poll::Ready(Some(Ok(output_batch)))
}
Expand All @@ -409,10 +406,16 @@ where
mod test {
use std::sync::Arc;

use arrow_array::{BooleanArray, RecordBatch, UInt64Array};
use arrow_array::{RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion_expr::{and, col, lit};
use itertools::Itertools;
use vortex::array::bool::BoolArray;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::struct_::StructArray;
use vortex::validity::Validity;
use vortex::IntoArray;
use vortex_dtype::FieldName;

use crate::plans::{RowIndicesStream, ROW_SELECTOR_SCHEMA_REF};

Expand All @@ -424,20 +427,23 @@ mod test {
]));

let _schema = schema.clone();
let inner = Box::pin(async move {
Ok(RecordBatch::try_new(
_schema,
let one_shot = Box::pin(async move {
StructArray::try_new(
Arc::new([FieldName::from("a"), FieldName::from("b")]),
vec![
Arc::new(UInt64Array::from(vec![0u64, 1, 2])),
Arc::new(BooleanArray::from(vec![false, false, true])),
PrimitiveArray::from(vec![0u64, 1, 2]).into_array(),
BoolArray::from(vec![false, false, true]).into_array(),
],
3,
Validity::NonNullable,
)
.unwrap())
.unwrap()
.into_array()
});

let _schema = schema.clone();
let filtering_stream = RowIndicesStream {
inner,
one_shot,
polled_inner: false,
conjunction_expr: and((col("a") % lit(2u64)).eq(lit(0u64)), col("b").is_true()),
schema_ref: _schema,
Expand Down

0 comments on commit f2f6c26

Please sign in to comment.