diff --git a/Cargo.lock b/Cargo.lock
index 5431681566..3fd8d68809 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3848,11 +3848,15 @@ dependencies = [
  "datafusion-physical-expr",
  "datafusion-physical-plan",
  "futures",
+ "itertools 0.13.0",
+ "lazy_static",
  "pin-project",
  "tokio",
  "vortex-array",
  "vortex-dtype",
  "vortex-error",
+ "vortex-expr",
+ "vortex-scalar",
 ]
 
 [[package]]
diff --git a/vortex-array/src/array/bool/compute/compare.rs b/vortex-array/src/array/bool/compute/compare.rs
index d333c9cc33..a63e432cae 100644
--- a/vortex-array/src/array/bool/compute/compare.rs
+++ b/vortex-array/src/array/bool/compute/compare.rs
@@ -8,6 +8,7 @@ use crate::compute::compare::CompareFn;
 use crate::{Array, ArrayTrait, IntoArray, IntoArrayVariant};
 
 impl CompareFn for BoolArray {
+    // TODO(aduffy): replace these with Arrow compute kernels.
     fn compare(&self, other: &Array, op: Operator) -> VortexResult<Array> {
         let flattened = other.clone().into_bool()?;
         let lhs = self.boolean_buffer();
diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs
index 0292398166..41dfa5a9b2 100644
--- a/vortex-array/src/array/constant/mod.rs
+++ b/vortex-array/src/array/constant/mod.rs
@@ -7,6 +7,7 @@ use crate::impl_encoding;
 use crate::stats::Stat;
 use crate::validity::{ArrayValidity, LogicalValidity};
 use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
+
 mod compute;
 mod flatten;
 mod stats;
@@ -25,6 +26,8 @@ impl ConstantArray {
         Scalar: From<S>,
     {
         let scalar: Scalar = scalar.into();
+        // TODO(aduffy): add stats for bools, ideally there should be a
+        // StatsSet::constant(Scalar) constructor that does this for us, like StatsSet::nulls.
         let stats = StatsSet::from(HashMap::from([
             (Stat::Max, scalar.clone()),
             (Stat::Min, scalar.clone()),
diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs
index 86a74c5092..7ffa0ceda2 100644
--- a/vortex-array/src/array/struct_/mod.rs
+++ b/vortex-array/src/array/struct_/mod.rs
@@ -1,6 +1,6 @@
 use serde::{Deserialize, Serialize};
 use vortex_dtype::{FieldName, FieldNames, Nullability, StructDType};
-use vortex_error::{vortex_bail, vortex_err};
+use vortex_error::vortex_bail;
 
 use crate::stats::ArrayStatisticsCompute;
 use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata};
@@ -27,6 +27,15 @@ impl StructArray {
         self.array().child(idx, dtype)
     }
 
+    pub fn field_by_name(&self, name: &str) -> Option<Array> {
+        let field_idx = self
+            .names()
+            .iter()
+            .position(|field_name| field_name.as_ref() == name);
+
+        field_idx.and_then(|field_idx| self.field(field_idx))
+    }
+
     pub fn names(&self) -> &FieldNames {
         let DType::Struct(st, _) = self.dtype() else {
             unreachable!()
@@ -126,7 +135,7 @@ impl StructArray {
         for column_idx in projection {
             children.push(
                 self.field(*column_idx)
-                    .ok_or_else(|| vortex_err!(InvalidArgument: "column index out of bounds"))?,
+                    .expect("column must not exceed bounds"),
             );
             names.push(self.names()[*column_idx].clone());
         }
diff --git a/vortex-array/src/stats/statsset.rs b/vortex-array/src/stats/statsset.rs
index cc5dec36d5..02c32b78ca 100644
--- a/vortex-array/src/stats/statsset.rs
+++ b/vortex-array/src/stats/statsset.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
 use enum_iterator::all;
 use itertools::Itertools;
+
 use vortex_dtype::DType;
 use vortex_error::VortexError;
 use vortex_scalar::Scalar;
@@ -27,6 +28,36 @@ impl StatsSet {
         }
     }
 
+    // pub fn constant(len: usize, scalar: &Scalar) -> Self {
+    //     let mut stats = HashMap::from([
+    //         (Stat::Max, scalar.clone()),
+    //         (Stat::Min, scalar.clone()),
+    //         (Stat::IsConstant, true.into()),
+    //         (Stat::IsSorted, true.into()),
+    //         (Stat::RunCount, 1.into()),
+    //     ]);
+    //
+    //     match scalar.dtype() {
+    //         DType::Bool(_) => {
+    //             // TODO: this should be `len` when the constant is true.
+    //             stats.insert(Stat::TrueCount, 0.into());
+    //         }
+    //         DType::Primitive(ptype, _) => {
+    //             stats.insert(
+    //                 Stat::BitWidthFreq,
+    //                 vec![0; ptype.byte_width() * 8 + 1].into(),
+    //             );
+    //             stats.insert(
+    //                 Stat::TrailingZeroFreq,
+    //                 vec![ptype.byte_width() * 8; ptype.byte_width() * 8 + 1].into(),
+    //             );
+    //         }
+    //         _ => {}
+    //     }
+    //
+    //     Self::from(stats)
+    // }
+
     /// Specialized constructor for the case where the StatsSet represents
     /// an array consisting entirely of [null](vortex_dtype::DType::Null) values.
     pub fn nulls(len: usize, dtype: &DType) -> Self {
diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml
index 72c4caa5b0..f013abbf5c 100644
--- a/vortex-datafusion/Cargo.toml
+++ b/vortex-datafusion/Cargo.toml
@@ -13,7 +13,9 @@ rust-version.workspace = true
 [dependencies]
 vortex-array = { path = "../vortex-array" }
 vortex-dtype = { path = "../vortex-dtype" }
 vortex-error = { path = "../vortex-error" }
+vortex-expr = { path = "../vortex-expr" }
+vortex-scalar = { path = "../vortex-scalar" }
 
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
@@ -26,10 +28,13 @@ datafusion-execution = { workspace = true }
 datafusion-physical-expr = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 futures = { workspace = true }
+itertools = { workspace = true }
+lazy_static = { workspace = true }
 pin-project = { workspace = true }
 
 [dev-dependencies]
 tokio = { workspace = true, features = ["test-util"] }
 
-[lints]
-workspace = true
+# TODO(aduffy): re-enable
+#[lints]
+#workspace = true
diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs
index 6361d36869..00adf82759 100644
--- a/vortex-datafusion/src/lib.rs
+++ b/vortex-datafusion/src/lib.rs
@@ -1,7 +1,8 @@
 //! Connectors to enable DataFusion to read Vortex data.
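+//!
+//! Illustrative usage sketch (assumes an in-memory Vortex `Array` with a struct dtype;
+//! error handling elided):
+//!
+//! ```ignore
+//! use datafusion::prelude::SessionContext;
+//! use vortex_datafusion::SessionContextExt;
+//!
+//! async fn example(array: vortex::Array) -> datafusion_common::Result<()> {
+//!     let ctx = SessionContext::new();
+//!     // Registers the array as a table and returns a DataFrame over it.
+//!     let df = ctx.read_vortex(array)?;
+//!     df.show().await?;
+//!     Ok(())
+//! }
+//! ```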
 
 use std::any::Any;
-use std::fmt::Formatter;
+use std::collections::HashSet;
+use std::fmt::{Debug, Formatter};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -16,28 +17,28 @@ use datafusion::execution::context::SessionState;
 use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
 use datafusion::optimizer::simplify_expressions::ExprSimplifier;
 use datafusion::prelude::SessionContext;
-use datafusion_common::{
-    exec_datafusion_err, DataFusionError, Result as DFResult, ScalarValue, ToDFSchema,
-};
-use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
+use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult, ToDFSchema};
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::simplify::SimplifyContext;
-use datafusion_expr::{BinaryExpr, Expr, Operator, TableProviderFilterPushDown, TableType};
+use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType};
 use datafusion_physical_expr::EquivalenceProperties;
 use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
 };
 use futures::{Stream, StreamExt};
+use itertools::Itertools;
 use pin_project::pin_project;
+use vortex::array::bool::BoolArray;
 use vortex::array::chunked::ChunkedArray;
-use vortex::array::struct_::StructArray;
-use vortex::{Array, ArrayDType, IntoArray, IntoCanonical};
+use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical};
 use vortex_dtype::DType;
 use vortex_error::{vortex_bail, VortexResult};
 
 use crate::datatype::infer_schema;
 
 mod datatype;
+mod plans;
 
 pub trait SessionContextExt {
     fn read_vortex(&self, array: Array) -> DFResult<DataFrame>;
 }
@@ -106,16 +107,28 @@ impl TableProvider for VortexInMemoryTableProvider {
         filters: &[Expr],
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        let filter_expr = if filters.is_empty() {
+        fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec<usize> {
+            let referenced_columns: HashSet<String> =
+                exprs.iter().flat_map(get_column_references).collect();
+
+            let projection: Vec<usize> = referenced_columns
+                .iter()
+                .map(|col_name| schema.column_with_name(col_name).unwrap().0)
+                .sorted()
+                .collect();
+
+            projection
+        }
+
+        let filter_exprs: Option<Vec<Expr>> = if filters.is_empty() {
             None
         } else {
-            Some(make_simplified_conjunction(
-                filters,
-                self.schema_ref.clone(),
-            )?)
+            Some(filters.iter().cloned().collect())
         };
 
-        println!("simplified filter: {filter_expr:?}");
+        let filter_projection = filter_exprs
+            .clone()
+            .map(|exprs| get_filter_projection(exprs.as_slice(), self.schema_ref.clone()));
 
         let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) {
             Partitioning::RoundRobinBatch(chunked_array.nchunks())
@@ -129,56 +142,164 @@ impl TableProvider for VortexInMemoryTableProvider {
             ExecutionMode::Bounded,
         );
 
-        Ok(Arc::new(VortexMemoryExec {
-            array: self.array.clone(),
-            projection: projection.cloned(),
-            filter_expr,
-            plan_properties,
-        }))
+        match (filter_exprs, filter_projection) {
+            // If there is a filter expression, we execute in two phases: first we evaluate
+            // the filter over its input columns to produce a set of row indices, then we
+            // `take` the remaining struct columns using the calculated indices.
+            (Some(filter_exprs), Some(filter_projection)) => Ok(make_filter_then_take_plan(
+                self.schema_ref.clone(),
+                filter_exprs,
+                filter_projection,
+                self.array.clone(),
+                projection.clone(),
+                plan_properties,
+            )),
+
+            // If no filters were pushed down, we materialize the entire StructArray into a
+            // RecordBatch and let DataFusion process the entire query.
+            _ => Ok(Arc::new(VortexScanExec {
+                array: self.array.clone(),
+                filter_exprs: None,
+                filter_projection: None,
+                scan_projection: projection.cloned(),
+                plan_properties,
+            })),
+        }
     }
 
     fn supports_filters_pushdown(
         &self,
         filters: &[&Expr],
     ) -> DFResult<Vec<TableProviderFilterPushDown>> {
-        // TODO(aduffy): add support for filter pushdown
-        Ok(filters
+        // Collect the set of column names in our schema; pushed-down filters may only
+        // reference these columns.
+        let schema_columns: HashSet<String> = self
+            .schema_ref
+            .fields
+            .iter()
+            .map(|field| field.name().clone())
+            .collect();
+
+        filters
             .iter()
             .map(|expr| {
-                match expr {
-                    // Several expressions can be pushed down.
-                    Expr::BinaryExpr(_)
-                    | Expr::IsNotNull(_)
-                    | Expr::IsNull(_)
-                    | Expr::IsTrue(_)
-                    | Expr::IsFalse(_)
-                    | Expr::IsNotTrue(_)
-                    | Expr::IsNotFalse(_)
-                    | Expr::Cast(_) => TableProviderFilterPushDown::Exact,
-
-                    // All other expressions should be handled outside of the TableProvider
-                    // via the normal DataFusion operator chain.
-                    _ => TableProviderFilterPushDown::Unsupported,
+                if can_be_pushed_down(*expr, &schema_columns)? {
+                    Ok(TableProviderFilterPushDown::Exact)
+                } else {
+                    Ok(TableProviderFilterPushDown::Unsupported)
                 }
-
-                TableProviderFilterPushDown::Exact
             })
-            .collect())
+            .try_collect()
     }
 }
 
-struct ValidationVisitor {}
-
-impl ValidationVisitor {
-}
-
+/// Construct an operator plan that executes in two stages.
+///
+/// The first plan stage only materializes the columns related to the provided set of filter
+/// expressions. It evaluates the filters into a row selection.
+///
+/// The second stage receives the row selection above and dispatches a `take` on the remaining
+/// columns.
+fn make_filter_then_take_plan(
+    _schema: SchemaRef,
+    _filter_exprs: Vec<Expr>,
+    _filter_projection: Vec<usize>,
+    _array: Array,
+    _output_projection: Option<&Vec<usize>>,
+    _plan_properties: PlanProperties,
+) -> Arc<dyn ExecutionPlan> {
+    // Create a struct array necessary to run the filter operations.
+
+    todo!()
+}
+
+/// Check if the given expression tree can be pushed down into the scan.
+fn can_be_pushed_down(expr: &Expr, schema_columns: &HashSet<String>) -> DFResult<bool> {
+    // If the filter references a column not known to our schema, we reject the filter for
+    // pushdown.
+    // TODO(aduffy): is this necessary? Under what conditions would this happen?
+    let column_refs = get_column_references(expr);
+    if !column_refs.is_subset(schema_columns) {
+        return Ok(false);
+    }
+
+    fn is_supported(expr: &Expr) -> bool {
+        match expr {
+            Expr::BinaryExpr(binary_expr) => {
+                // Both the left and right sides must be column expressions, scalars, or casts.
+
+                match binary_expr.op {
+                    // Initially, we will only support pushdown for basic boolean operators.
+                    Operator::Eq
+                    | Operator::NotEq
+                    | Operator::Lt
+                    | Operator::LtEq
+                    | Operator::Gt
+                    | Operator::GtEq => true,
+
+                    // TODO(aduffy): add support for LIKE
+                    // TODO(aduffy): add support for basic mathematical ops +-*/
+                    // TODO(aduffy): add support for conjunctions, assuming all of the
+                    //  left and right are valid expressions.
+                    _ => false,
+                }
+            }
+            Expr::IsNotNull(_)
+            | Expr::IsNull(_)
+            | Expr::IsTrue(_)
+            | Expr::IsFalse(_)
+            | Expr::IsNotTrue(_)
+            | Expr::IsNotFalse(_)
+            // TODO(aduffy): ensure that cast can be pushed down.
+            | Expr::Cast(_) => true,
+            _ => false,
+        }
+    }
+
+    // Visitor that traverses the expression tree and tracks if any unsupported expressions were
+    // encountered.
+    struct IsSupportedVisitor {
+        supported_expressions_only: bool,
+    }
 
-impl TreeNodeVisitor for ValidationVisitor {
-    type Node = Expr;
-
-    fn f_down(&mut self, node: &Self::Node) -> DFResult<TreeNodeRecursion> {
-    }
-}
+    impl TreeNodeVisitor<'_> for IsSupportedVisitor {
+        type Node = Expr;
+
+        fn f_down(&mut self, node: &Self::Node) -> DFResult<TreeNodeRecursion> {
+            if !is_supported(node) {
+                self.supported_expressions_only = false;
+                return Ok(TreeNodeRecursion::Stop);
+            }
+
+            Ok(TreeNodeRecursion::Continue)
+        }
+    }
+
+    let mut visitor = IsSupportedVisitor {
+        supported_expressions_only: true,
+    };
+
+    // Traverse the tree.
+    // At the end of the traversal, the internal state of `visitor` will indicate if any
+    // unsupported expressions were encountered.
+    expr.visit(&mut visitor)?;
+
+    Ok(visitor.supported_expressions_only)
+}
+
+/// Extract the columns of our table that are referenced by the expression.
+fn get_column_references(expr: &Expr) -> HashSet<String> {
+    let mut references = HashSet::new();
+
+    expr.apply(|node| match node {
+        Expr::Column(col) => {
+            references.insert(col.name.clone());
+
+            Ok(TreeNodeRecursion::Continue)
+        }
+        _ => Ok(TreeNodeRecursion::Continue),
+    })
+    // The closure above is infallible, so the traversal cannot fail.
+    .unwrap();
+
+    references
+}
 
 /// A mask determining the rows in an Array that should be treated as valid for query processing.
@@ -188,29 +309,46 @@ pub(crate) struct RowSelection {
     selection: NullBuffer,
 }
 
+impl RowSelection {
+    /// Construct a new RowSelection with all elements initialized to selected (true).
+    pub(crate) fn new_selected(len: usize) -> Self {
+        Self {
+            selection: NullBuffer::new_valid(len),
+        }
+    }
+
+    /// Construct a new RowSelection with all elements initialized to unselected (false).
+    pub(crate) fn new_unselected(len: usize) -> Self {
+        Self {
+            selection: NullBuffer::new_null(len),
+        }
+    }
+}
+
+impl RowSelection {
+    /// Refine the selection with the boolean match results of a filter, keeping only rows that
+    /// are both currently selected and set in `matches`.
+    pub(crate) fn refine(&mut self, matches: &BoolArray) -> &mut Self {
+        let matches = matches.boolean_buffer();
+
+        // Fast path: if nothing matched, clear the selection entirely.
+        if matches.count_set_bits() == 0 {
+            self.selection = NullBuffer::new_null(self.selection.len());
+            return self;
+        }
+
+        // Intersect the existing selection with the new matches. (Sketch of the intended
+        // semantics; this method is not exercised by any operator yet.)
+        self.selection = NullBuffer::new(self.selection.inner() & &matches);
+        self
+    }
+}
+
-/// Convert a set of expressions that must all match into a single AND expression.
-///
-/// # Returns
-///
-/// If conversion is successful, the result will be a
-/// [binary expression node][datafusion_expr::Expr::BinaryExpr] containing the conjunction.
-///
-/// Note that the set of operators must be provided here instead.
-///
-/// # Simplification
-///
-/// Simplification will occur as part of this process, so constant folding and similar optimizations
-/// will be applied before returning the final expression.
-fn make_simplified_conjunction(filters: &[Expr], schema: SchemaRef) -> DFResult<Expr> {
-    let init = Box::new(Expr::Literal(ScalarValue::Boolean(Some(true))));
-    let conjunction = filters.iter().fold(init, |conj, item| {
-        Box::new(Expr::BinaryExpr(BinaryExpr::new(
-            conj,
-            Operator::And,
-            Box::new(item.clone()),
-        )))
-    });
-
+/// Simplify an expression against the table schema, applying constant folding and similar
+/// optimizations before returning the final expression.
+fn make_simplified(expr: &Expr, schema: SchemaRef) -> DFResult<Expr> {
     let schema = schema.to_dfschema_ref()?;
 
     // simplify the expression.
@@ -218,72 +356,72 @@ fn make_simplified_conjunction(filters: &[Expr], schema: SchemaRef) -> DFResult<
     let props = ExecutionProps::new();
     let context = SimplifyContext::new(&props).with_schema(schema);
     let simplifier = ExprSimplifier::new(context);
 
-    simplifier.simplify(*conjunction)
+    simplifier.simplify(expr.clone())
 }
 
 /// Physical plan node for scans against an in-memory, possibly chunked Vortex Array.
 #[derive(Debug, Clone)]
-struct VortexMemoryExec {
+struct VortexScanExec {
     array: Array,
-    filter_expr: Option<Expr>,
-    projection: Option<Vec<usize>>,
+    filter_exprs: Option<Vec<Expr>>,
+    filter_projection: Option<Vec<usize>>,
+    scan_projection: Option<Vec<usize>>,
     plan_properties: PlanProperties,
 }
 
-impl DisplayAs for VortexMemoryExec {
+impl DisplayAs for VortexScanExec {
     fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
         write!(f, "{:?}", self)
     }
 }
 
-impl VortexMemoryExec {
-    /// Read a single array chunk from the source as a RecordBatch.
-    ///
-    /// `array` must be a [`StructArray`] or flatten into one. Passing a different Array variant
-    /// may cause a panic.
-    fn execute_single_chunk(
-        array: Array,
-        projection: &Option<Vec<usize>>,
-        _context: Arc<TaskContext>,
-    ) -> DFResult<SendableRecordBatchStream> {
-        let data = array
-            .into_canonical()
-            .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?
-            .into_array();
-
-        // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef.
-        let struct_array = StructArray::try_from(data).expect("array must be StructArray");
-
-        let field_order = if let Some(projection) = projection {
-            projection.clone()
-        } else {
-            (0..struct_array.names().len()).collect()
-        };
-
-        let projected_struct =
-            struct_array
-                .project(field_order.as_slice())
-                .map_err(|vortex_err| {
-                    exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}")
-                })?;
-        let batch = RecordBatch::from(
-            projected_struct
-                .into_canonical()
-                .expect("struct arrays must flatten")
-                .into_arrow()
-                .as_any()
-                .downcast_ref::<ArrowStructArray>()
-                .expect("vortex StructArray must convert to arrow StructArray"),
-        );
-        Ok(Box::pin(VortexRecordBatchStream {
-            schema_ref: batch.schema(),
-            inner: futures::stream::iter(vec![batch]),
-        }))
-    }
+/// Read a single array chunk from the source as a RecordBatch.
+///
+/// # Errors
+/// This function will return an Error if `array` is not struct-typed. It will also return an
+/// error if the projection references columns that do not exist in the array.
+fn execute_unfiltered(
+    array: &Array,
+    projection: &Option<Vec<usize>>,
+) -> DFResult<SendableRecordBatchStream> {
+    // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef.
+    let struct_array = array
+        .clone()
+        .into_struct()
+        .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?;
+
+    let field_order = if let Some(projection) = projection {
+        projection.clone()
+    } else {
+        (0..struct_array.names().len()).collect()
+    };
+
+    let projected_struct = struct_array
+        .project(field_order.as_slice())
+        .map_err(|vortex_err| {
+            exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}")
+        })?;
+    let batch = RecordBatch::from(
+        projected_struct
+            .into_canonical()
+            .expect("struct arrays must canonicalize")
+            .into_arrow()
+            .as_any()
+            .downcast_ref::<ArrowStructArray>()
+            .expect("vortex StructArray must convert to arrow StructArray"),
+    );
+    Ok(Box::pin(VortexRecordBatchStream {
+        schema_ref: batch.schema(),
+        inner: futures::stream::iter(vec![batch]),
+    }))
 }
 
+// TODO(aduffy): row selector stream, i.e. emit a stream of RowSelections so that filter
+//  results can be passed to downstream operators as they are produced.
+
 #[pin_project]
-struct VortexRecordBatchStream {
+pub(crate) struct VortexRecordBatchStream {
     schema_ref: SchemaRef,
 
     #[pin]
@@ -315,7 +453,7 @@ where
     }
 }
 
-impl ExecutionPlan for VortexMemoryExec {
+impl ExecutionPlan for VortexScanExec {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -339,7 +477,7 @@ impl ExecutionPlan for VortexMemoryExec {
     fn execute(
         &self,
         partition: usize,
-        context: Arc<TaskContext>,
+        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
         let chunk = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) {
             chunked_array
@@ -349,7 +487,7 @@ impl ExecutionPlan for VortexMemoryExec {
             self.array.clone()
         };
 
-        Self::execute_single_chunk(chunk, &self.projection, context)
+        execute_unfiltered(&chunk, &self.scan_projection)
     }
 }
diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs
new file mode 100644
index 0000000000..739465a6f0
--- /dev/null
+++ b/vortex-datafusion/src/plans.rs
@@ -0,0 +1,205 @@
+//! Physical operators needed to implement scanning of Vortex arrays with pushdown.
+
+use std::any::Any;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use arrow_array::RecordBatch;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use datafusion_common::Result as DFResult;
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::Expr;
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion_physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
+};
+use lazy_static::lazy_static;
+use vortex::array::struct_::StructArray;
+
+/// Physical plan operator that applies a set of [filters][Expr] against the input, producing a
+/// row mask that downstream operators can use to `take` the remaining columns from the
+/// corresponding struct array chunks.
+pub(crate) struct RowSelectorExec {
+    filter_exprs: Vec<Expr>,
+    filter_projection: Vec<usize>,
+
+    // Cached PlanProperties object, returned by `properties()`.
+    cached_plan_props: PlanProperties,
+
+    // A Vortex struct array that contains all columns necessary for executing the filter
+    // expressions.
+    filter_struct: StructArray,
+}
+
+lazy_static! {
+    static ref ROW_SELECTOR_SCHEMA_REF: SchemaRef = Arc::new(Schema::new(vec![Field::new(
+        "row_idx",
+        DataType::UInt64,
+        false
+    )]));
+}
+
+impl RowSelectorExec {
+    pub(crate) fn new(
+        filter_exprs: &Vec<Expr>,
+        filter_projection: &Vec<usize>,
+        filter_struct: &StructArray,
+    ) -> Self {
+        let cached_plan_props = PlanProperties::new(
+            EquivalenceProperties::new(ROW_SELECTOR_SCHEMA_REF.clone()),
+            Partitioning::RoundRobinBatch(1),
+            ExecutionMode::Bounded,
+        );
+
+        Self {
+            filter_exprs: filter_exprs.clone(),
+            filter_projection: filter_projection.clone(),
+            filter_struct: filter_struct.clone(),
+            cached_plan_props,
+        }
+    }
+}
+
+impl Debug for RowSelectorExec {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RowSelectorExec").finish()
+    }
+}
+
+impl DisplayAs for RowSelectorExec {
+    fn fmt_as(
+        &self,
+        _display_format_type: DisplayFormatType,
+        f: &mut Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl ExecutionPlan for RowSelectorExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cached_plan_props
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        // No children
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        panic!("with_new_children not supported for RowSelectorExec")
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        assert_eq!(
+            partition, 0,
+            "RowSelectorExec only supports a single partition"
+        );
+
+        todo!("need to implement this")
+    }
+}
+
+/// Physical operator that receives a stream of row indices from a child operator, and uses
+/// those indices to perform a `take` operation on the backing Vortex array.
+pub(crate) struct TakeRowsExec {
+    plan_properties: PlanProperties,
+
+    // Column indices to project out of the backing array.
+    projection: Vec<usize>,
+
+    // Input plan, a stream of indices on which we perform a take against the original dataset.
+    input: Arc<dyn ExecutionPlan>,
+
+    output_schema: SchemaRef,
+
+    // A record batch holding the fields that were relevant to executing the upstream filter
+    // expression. These fields have already been decoded, so we hold them separately and
+    // "paste" them together with the fields we decode from `table` below.
+    filter_struct: RecordBatch,
+
+    // The original Vortex array holding the fields we have not decoded yet.
+    table: StructArray,
+}
+
+impl TakeRowsExec {
+    pub(crate) fn new(
+        schema_ref: SchemaRef,
+        projection: &Vec<usize>,
+        row_indices: Arc<dyn ExecutionPlan>,
+        output_schema: SchemaRef,
+        table: StructArray,
+    ) -> Self {
+        let plan_properties = PlanProperties::new(
+            EquivalenceProperties::new(schema_ref.clone()),
+            Partitioning::RoundRobinBatch(1),
+            ExecutionMode::Bounded,
+        );
+
+        Self {
+            plan_properties,
+            projection: projection.clone(),
+            input: row_indices,
+            output_schema: output_schema.clone(),
+            filter_struct: RecordBatch::new_empty(output_schema.clone()),
+            table,
+        }
+    }
+}
+
+impl Debug for TakeRowsExec {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TakeRowsExec").finish()
+    }
+}
+
+impl DisplayAs for TakeRowsExec {
+    fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl ExecutionPlan for TakeRowsExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        panic!("unsupported with_new_children for {:?}", &self)
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        assert_eq!(
+            partition, 0,
+            "TakeRowsExec only supports a single partition"
+        );
+
+        todo!()
+    }
+}
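Reviewer note: `make_filter_then_take_plan` is intentionally left as `todo!()` in this change. A minimal sketch of how it could wire the two operators above together, assuming `RowSelectorExec`/`TakeRowsExec` are imported from `plans` and using `StructArray::try_from`/`project` as elsewhere in this diff (the output-schema and projection handling here are guesses, not part of the PR):

```rust
// Sketch only: one possible two-stage wiring, under the assumptions stated above.
fn make_filter_then_take_plan(
    schema: SchemaRef,
    filter_exprs: Vec<Expr>,
    filter_projection: Vec<usize>,
    array: Array,
    output_projection: Option<&Vec<usize>>,
    _plan_properties: PlanProperties,
) -> Arc<dyn ExecutionPlan> {
    // Stage 1: evaluate the filter expressions over only the columns they reference,
    // producing a stream of matching row indices.
    let struct_array = StructArray::try_from(array).expect("array must be a StructArray");
    let filter_struct = struct_array
        .project(filter_projection.as_slice())
        .expect("projecting filter columns must succeed");
    let row_selector: Arc<dyn ExecutionPlan> = Arc::new(RowSelectorExec::new(
        &filter_exprs,
        &filter_projection,
        &filter_struct,
    ));

    // Stage 2: `take` the remaining columns of the original array using the row indices
    // emitted by stage 1.
    let output_projection: Vec<usize> = output_projection
        .cloned()
        .unwrap_or_else(|| (0..struct_array.names().len()).collect());
    Arc::new(TakeRowsExec::new(
        schema.clone(),
        &output_projection,
        row_selector,
        schema,
        struct_array,
    ))
}
```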