diff --git a/Cargo.lock b/Cargo.lock index 5431681566..ce03c9eb3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -421,11 +421,13 @@ name = "bench-vortex" version = "0.1.0" dependencies = [ "arrow-array", + "arrow-schema", "arrow-select", "bytes", "bzip2", "criterion", "csv", + "datafusion", "enum-iterator", "flexbuffers", "futures", @@ -435,6 +437,7 @@ dependencies = [ "log", "mimalloc", "parquet", + "rand", "reqwest", "serde", "simplelog", @@ -443,6 +446,7 @@ dependencies = [ "vortex-alp", "vortex-array", "vortex-buffer", + "vortex-datafusion", "vortex-datetime-parts", "vortex-dict", "vortex-dtype", @@ -3848,11 +3852,15 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-plan", "futures", + "itertools 0.13.0", + "lazy_static", "pin-project", "tokio", "vortex-array", "vortex-dtype", "vortex-error", + "vortex-expr", + "vortex-scalar", ] [[package]] diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index cb222976d9..b314f0dcf4 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -16,10 +16,12 @@ workspace = true [dependencies] arrow-array = { workspace = true } +arrow-schema = { workspace = true } arrow-select = { workspace = true } bytes = { workspace = true } bzip2 = { workspace = true } csv = { workspace = true } +datafusion = { workspace = true } enum-iterator = { workspace = true } flexbuffers = { workspace = true } futures = { workspace = true } @@ -29,6 +31,7 @@ lazy_static = { workspace = true } log = { workspace = true } mimalloc = { workspace = true } parquet = { workspace = true, features = [] } +rand = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } simplelog = { workspace = true } @@ -37,6 +40,7 @@ uuid = { workspace = true, features = ["v4"] } vortex-alp = { path = "../encodings/alp" } vortex-array = { path = "../vortex-array" } vortex-buffer = { path = "../vortex-buffer" } +vortex-datafusion = { path = "../vortex-datafusion" } vortex-datetime-parts = { path = "../encodings/datetime-parts" } vortex-dict = { path = "../encodings/dict" } vortex-dtype = { path = "../vortex-dtype" } @@ -56,3 +60,7 @@ harness = false [[bench]] name = "random_access" harness = false + +[[bench]] +name = "datafusion_benchmark" +harness = false diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs new file mode 100644 index 0000000000..73c3ced0e0 --- /dev/null +++ b/bench-vortex/benches/datafusion_benchmark.rs @@ -0,0 +1,195 @@ +use std::sync::Arc; + +use arrow_array::builder::{StringBuilder, UInt32Builder}; +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema}; +use criterion::measurement::Measurement; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion}; +use datafusion::common::Result as DFResult; +use datafusion::datasource::{MemTable, TableProvider}; +use datafusion::execution::memory_pool::human_readable_size; +use datafusion::logical_expr::lit; +use datafusion::prelude::{col, count_distinct, DataFrame, SessionContext}; +use lazy_static::lazy_static; +use vortex::compress::Compressor; +use vortex::encoding::EncodingRef; +use vortex::{Array, Context, IntoArray, ToArrayData}; +use vortex_datafusion::{VortexMemTable, VortexMemTableOptions}; +use vortex_dict::DictEncoding; +use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; + +lazy_static! 
{ + pub static ref CTX: Context = Context::default().with_encodings([ + &BitPackedEncoding as EncodingRef, + &DictEncoding, + &FoREncoding, + &DeltaEncoding, + ]); +} + +fn toy_dataset_arrow() -> RecordBatch { + // 640,000 rows of string and numeric data. + // 80,000 values of the first string, second string, third string, etc. + + let names = [ + "Alexander", + "Anastasia", + "Archibald", + "Bartholomew", + "Benjamin", + "Christopher", + "Elizabeth", + "Gabriella", + ]; + + let mut col1 = StringBuilder::with_capacity(640_000, 64_000_000); + let mut col2 = UInt32Builder::with_capacity(640_000); + for i in 0..640_000 { + col1.append_value(names[i % 8]); + col2.append_value(u32::try_from(i).unwrap()); + } + + let col1 = col1.finish(); + let col2 = col2.finish(); + + RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("names", DataType::Utf8, false), + Field::new("scores", DataType::UInt32, false), + ])), + vec![Arc::new(col1), Arc::new(col2)], + ) + .unwrap() +} + +fn toy_dataset_vortex(compress: bool) -> Array { + let uncompressed = toy_dataset_arrow().to_array_data().into_array(); + + if !compress { + return uncompressed; + } + + println!( + "uncompressed size: {:?}", + human_readable_size(uncompressed.nbytes()) + ); + let compressor = Compressor::new(&CTX); + let compressed = compressor.compress(&uncompressed, None).unwrap(); + println!( + "vortex compressed size: {:?}", + human_readable_size(compressed.nbytes()) + ); + compressed +} + +fn filter_agg_query(df: DataFrame) -> DFResult { + // SELECT COUNT(DISTINCT names) FROM table WHERE scores >= 3000 AND scores <= 4000 + df.filter(col("scores").gt_eq(lit(3_000)))? + .filter(col("scores").lt_eq(lit(4_000)))? + .aggregate(vec![], vec![count_distinct(col("names"))]) +} + +fn measure_provider( + group: &mut BenchmarkGroup, + session: &SessionContext, + table: Arc, +) { + group.bench_function("planning", |b| { + b.to_async( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(), + ) + .iter(|| async { + // Force physical planner to execute on our TableProvider.
+ filter_agg_query(black_box(session).read_table(table.clone()).unwrap()) + .unwrap() + .create_physical_plan() + .await + .unwrap(); + }); + }); + + group.bench_function("exec", |b| { + b.to_async( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(), + ) + .iter(|| async { + // Force full query execution with .collect() + filter_agg_query(black_box(session).read_table(table.clone()).unwrap()) + .unwrap() + .collect() + .await + .unwrap(); + }); + }); +} + +fn bench_arrow(mut group: BenchmarkGroup, session: &SessionContext) { + let arrow_dataset = toy_dataset_arrow(); + let arrow_table = + Arc::new(MemTable::try_new(arrow_dataset.schema(), vec![vec![arrow_dataset]]).unwrap()); + + measure_provider(&mut group, session, arrow_table); +} + +fn bench_vortex( + mut group: BenchmarkGroup, + session: &SessionContext, + disable_pushdown: bool, + compress: bool, +) { + let vortex_dataset = toy_dataset_vortex(compress); + let vortex_table = Arc::new( + VortexMemTable::try_new( + vortex_dataset, + VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown), + ) + .unwrap(), + ); + + measure_provider(&mut group, session, vortex_table); +} + +fn bench_datafusion(c: &mut Criterion) { + bench_arrow(c.benchmark_group("arrow"), &SessionContext::new()); + + // compress=true, pushdown enabled + bench_vortex( + c.benchmark_group("vortex-pushdown-compressed"), + &SessionContext::new(), + false, + true, + ); + + // compress=false, pushdown enabled + bench_vortex( + c.benchmark_group("vortex-pushdown-uncompressed"), + &SessionContext::new(), + false, + false, + ); + + // compress=true, pushdown disabled + bench_vortex( + c.benchmark_group("vortex-nopushdown-compressed"), + &SessionContext::new(), + true, + true, + ); + + // compress=false, pushdown disabled + bench_vortex( + c.benchmark_group("vortex-nopushdown-uncompressed"), + &SessionContext::new(), + true, + false, + ); +} + +criterion_group!(benches, bench_datafusion); +criterion_main!(benches); diff --git a/pyvortex/test/test_array.py b/pyvortex/test/test_array.py index 119beb914e..0825f184e3 100644 --- a/pyvortex/test/test_array.py +++ b/pyvortex/test/test_array.py @@ -18,10 +18,9 @@ def test_varbin_array_round_trip(): def test_varbin_array_take(): a = vortex.encode(pa.array(["a", "b", "c", "d"])) - # TODO(ngates): ensure we correctly round-trip to a string and not large_string assert a.take(vortex.encode(pa.array([0, 2]))).to_pyarrow().combine_chunks() == pa.array( ["a", "c"], - type=pa.large_utf8(), + type=pa.utf8(), ) diff --git a/requirements-dev.lock b/requirements-dev.lock index 69b5191d9f..c5b17d839f 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -6,6 +6,7 @@ # features: [] # all-features: false # with-sources: false +# generate-hashes: false -e file:. -e file:pyvortex diff --git a/requirements.lock b/requirements.lock index bf905daba0..cb652616f8 100644 --- a/requirements.lock +++ b/requirements.lock @@ -6,6 +6,7 @@ # features: [] # all-features: false # with-sources: false +# generate-hashes: false -e file:. 
-e file:pyvortex diff --git a/vortex-array/src/array/bool/accessors.rs b/vortex-array/src/array/bool/accessors.rs index e35b25f8bf..6c464f1cff 100644 --- a/vortex-array/src/array/bool/accessors.rs +++ b/vortex-array/src/array/bool/accessors.rs @@ -1,4 +1,3 @@ -use itertools::Itertools; use vortex_error::VortexResult; use crate::accessor::ArrayAccessor; @@ -22,7 +21,6 @@ impl ArrayAccessor for BoolArray { Validity::AllInvalid => Ok(f(&mut (0..self.len()).map(|_| None))), Validity::Array(valid) => { let valids = valid.into_bool()?.boolean_buffer(); - println!("nulls: {:?}", valids.iter().collect_vec()); let mut iter = valids.iter().zip(bools.iter()).map(|(is_valid, value)| { if is_valid { Some(if value { &TRUE } else { &FALSE }) diff --git a/vortex-array/src/array/bool/compute/compare.rs b/vortex-array/src/array/bool/compute/compare.rs index d333c9cc33..a63e432cae 100644 --- a/vortex-array/src/array/bool/compute/compare.rs +++ b/vortex-array/src/array/bool/compute/compare.rs @@ -8,6 +8,7 @@ use crate::compute::compare::CompareFn; use crate::{Array, ArrayTrait, IntoArray, IntoArrayVariant}; impl CompareFn for BoolArray { + // TODO(aduffy): replace these with Arrow compute kernels. fn compare(&self, other: &Array, op: Operator) -> VortexResult { let flattened = other.clone().into_bool()?; let lhs = self.boolean_buffer(); diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs index a51f2574be..d275b64e37 100644 --- a/vortex-array/src/array/constant/mod.rs +++ b/vortex-array/src/array/constant/mod.rs @@ -25,6 +25,8 @@ impl ConstantArray { Scalar: From, { let scalar: Scalar = scalar.into(); + // TODO(aduffy): add stats for bools, ideally there should be a + // StatsSet::constant(Scalar) constructor that does this for us, like StatsSet::nulls. let stats = StatsSet::from(HashMap::from([ (Stat::Max, scalar.clone()), (Stat::Min, scalar.clone()), diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index 86a74c5092..0a2be3af00 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use vortex_dtype::{FieldName, FieldNames, Nullability, StructDType}; -use vortex_error::{vortex_bail, vortex_err}; +use vortex_error::vortex_bail; use crate::stats::ArrayStatisticsCompute; use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata}; @@ -27,6 +27,15 @@ impl StructArray { self.array().child(idx, dtype) } + pub fn field_by_name(&self, name: &str) -> Option { + let field_idx = self + .names() + .iter() + .position(|field_name| field_name.as_ref() == name); + + field_idx.and_then(|field_idx| self.field(field_idx)) + } + pub fn names(&self) -> &FieldNames { let DType::Struct(st, _) = self.dtype() else { unreachable!() @@ -111,6 +120,8 @@ impl StructArray { } impl StructArray { + // TODO(aduffy): Add equivalent function to support field masks for nested column access. + /// Return a new StructArray with the given projection applied. /// /// Projection does not copy data arrays. Projection is defined by an ordinal array slice @@ -118,15 +129,17 @@ impl StructArray { /// perform column re-ordering, deletion, or duplication at a logical level, without any data /// copying. /// - /// This function will return an error if the projection includes invalid column IDs. 
- pub fn project(self, projection: &[usize]) -> VortexResult { + /// # Panics + /// This function will panic if the projection references columns not within the + /// schema boundaries. + pub fn project(&self, projection: &[usize]) -> VortexResult { let mut children = Vec::with_capacity(projection.len()); let mut names = Vec::with_capacity(projection.len()); for column_idx in projection { children.push( self.field(*column_idx) - .ok_or_else(|| vortex_err!(InvalidArgument: "column index out of bounds"))?, + .expect("column must not exceed bounds"), ); names.push(self.names()[*column_idx].clone()); } diff --git a/vortex-array/src/array/varbin/compute/take.rs b/vortex-array/src/array/varbin/compute/take.rs index 16bdf8fb21..2ce2b576f1 100644 --- a/vortex-array/src/array/varbin/compute/take.rs +++ b/vortex-array/src/array/varbin/compute/take.rs @@ -49,7 +49,7 @@ fn take( return Ok(take_nullable(dtype, offsets, data, indices, v)); } - let mut builder = VarBinBuilder::::with_capacity(indices.len()); + let mut builder = VarBinBuilder::::with_capacity(indices.len()); for &idx in indices { let idx = idx.to_usize().unwrap(); let start = offsets[idx].to_usize().unwrap(); diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 6fbe21b251..2d737243fb 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -209,8 +209,7 @@ fn struct_to_arrow(struct_array: StructArray) -> ArrayRef { fn varbin_to_arrow(varbin_array: VarBinArray) -> ArrayRef { let offsets = varbin_array .offsets() - .into_canonical() - .and_then(Canonical::into_primitive) + .into_primitive() .expect("flatten_primitive"); let offsets = match offsets.ptype() { PType::I32 | PType::I64 => offsets, diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 72c4caa5b0..615171e3f4 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -13,7 +13,9 @@ rust-version.workspace = true [dependencies] vortex-array = { path = "../vortex-array" } vortex-dtype = { path = "../vortex-dtype" } +vortex-expr = { path = "../vortex-expr" } vortex-error = { path = "../vortex-error" } +vortex-scalar = { path = "../vortex-scalar" } arrow-array = { workspace = true } arrow-schema = { workspace = true } @@ -26,6 +28,8 @@ datafusion-execution = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } futures = { workspace = true } +itertools = { workspace = true } +lazy_static = { workspace = true } pin-project = { workspace = true } [dev-dependencies] diff --git a/vortex-datafusion/src/datatype.rs b/vortex-datafusion/src/datatype.rs index e53fb27f89..c4a0296b72 100644 --- a/vortex-datafusion/src/datatype.rs +++ b/vortex-datafusion/src/datatype.rs @@ -13,6 +13,14 @@ use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaBuilder}; use vortex_dtype::{DType, Nullability, PType}; +/// Convert a Vortex [struct DType][DType] to an Arrow [Schema]. +/// +/// To avoid ambiguity, Vortex types are mapped to the [DataType] of their nearest +/// Arrow-compatible encoding. +/// +/// # Panics +/// +/// This function will panic if the provided `dtype` is not a StructDType, or if the struct DType +/// has top-level nullability.
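A hedged illustration of the mapping (the field types come from the toy dataset in bench-vortex/benches/datafusion_benchmark.rs; the snippet is editorial, not part of the patch):

// A non-nullable struct DType with fields `names: utf8` and `scores: u32` is expected to
// map to the following Arrow schema:
// Schema::new(vec![
//     Field::new("names", DataType::Utf8, false),
//     Field::new("scores", DataType::UInt32, false),
// ])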
pub(crate) fn infer_schema(dtype: &DType) -> Schema { let DType::Struct(struct_dtype, nullable) = dtype else { panic!("only DType::Struct can be converted to arrow schema"); diff --git a/vortex-datafusion/src/expr.rs b/vortex-datafusion/src/expr.rs new file mode 100644 index 0000000000..62e348498d --- /dev/null +++ b/vortex-datafusion/src/expr.rs @@ -0,0 +1,58 @@ +use arrow_schema::SchemaRef; +use datafusion::optimizer::simplify_expressions::ExprSimplifier; +use datafusion_common::{Result as DFResult, ToDFSchema}; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::simplify::SimplifyContext; +use datafusion_expr::{and, lit, Expr}; + +/// Convert a set of expressions into a single AND expression. +/// +/// # Returns +/// +/// If conversion is successful, the result will be a +/// [binary expression node][datafusion_expr::Expr::BinaryExpr] containing the conjunction. +pub(crate) fn make_conjunction(exprs: impl AsRef<[Expr]>) -> DFResult { + Ok(exprs + .as_ref() + .iter() + .fold(lit(true), |conj, elem| and(conj, elem.clone()))) +} + +/// Simplify an expression using DataFusion's builtin analysis passes. +/// +/// This encapsulates common optimizations like constant folding and eliminating redundant +/// expressions, e.g. `value AND true`. +pub(crate) fn simplify_expr(expr: &Expr, schema: SchemaRef) -> DFResult { + let schema = schema.to_dfschema_ref()?; + + let props = ExecutionProps::new(); + let context = SimplifyContext::new(&props).with_schema(schema); + let simplifier = ExprSimplifier::new(context); + + simplifier.simplify(expr.clone()) +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_schema::{DataType, Field, Schema}; + use datafusion_expr::{col, lit}; + + use super::*; + + #[test] + fn test_conjunction_simplify() { + let schema = Arc::new(Schema::new(vec![ + Field::new("int_col", DataType::Int32, false), + Field::new("bool_col", DataType::Boolean, false), + ])); + + let exprs = vec![col("int_col").gt_eq(lit(4)), col("bool_col").is_true()]; + + assert_eq!( + simplify_expr(&make_conjunction(&exprs).unwrap(), schema).unwrap(), + and(col("int_col").gt_eq(lit(4)), col("bool_col").is_true()) + ); + } +} diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index ba7330848f..2529ecc4cf 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -1,7 +1,8 @@ //! Connectors to enable DataFusion to read Vortex data. 
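A minimal usage sketch for this connector (names mirror the tests at the bottom of this file; `array` stands in for any struct-typed Vortex array, and the snippet is illustrative rather than part of the patch):

use datafusion::prelude::SessionContext;
use vortex_datafusion::{SessionContextExt, VortexMemTableOptions};

// Inside an async context:
let ctx = SessionContext::new();
// Filter pushdown is enabled by default; opt out through the options struct.
let df = ctx
    .read_vortex_opts(array, VortexMemTableOptions::default().with_disable_pushdown(true))
    .unwrap();
let batches = df.collect().await.unwrap();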
use std::any::Any; -use std::fmt::Formatter; +use std::collections::HashSet; +use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -14,36 +15,63 @@ use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::prelude::SessionContext; -use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result as DFResult}; -use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult}; +use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType}; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, }; use futures::{Stream, StreamExt}; +use itertools::Itertools; use pin_project::pin_project; use vortex::array::chunked::ChunkedArray; use vortex::array::struct_::StructArray; -use vortex::{Array, ArrayDType, IntoArray, IntoCanonical}; +use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical}; use vortex_dtype::DType; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::VortexResult; use crate::datatype::infer_schema; +use crate::plans::{RowSelectorExec, TakeRowsExec}; mod datatype; +mod expr; +mod plans; + +/// Optional configurations to pass when loading a [VortexMemTable]. +#[derive(Default, Debug, Clone)] +pub struct VortexMemTableOptions { + pub disable_pushdown: bool, +} + +impl VortexMemTableOptions { + pub fn with_disable_pushdown(mut self, disable_pushdown: bool) -> Self { + self.disable_pushdown = disable_pushdown; + self + } +} pub trait SessionContextExt { - fn read_vortex(&self, array: Array) -> DFResult; + fn read_vortex(&self, array: Array) -> DFResult { + self.read_vortex_opts(array, VortexMemTableOptions::default()) + } + + fn read_vortex_opts(&self, array: Array, options: VortexMemTableOptions) + -> DFResult; } impl SessionContextExt for SessionContext { - fn read_vortex(&self, array: Array) -> DFResult { + fn read_vortex_opts( + &self, + array: Array, + options: VortexMemTableOptions, + ) -> DFResult { assert!( matches!(array.dtype(), DType::Struct(_, _)), "Vortex arrays must have struct type" ); - let vortex_table = VortexInMemoryTableProvider::try_new(array) + let vortex_table = VortexMemTable::try_new(array, options) .map_err(|error| DataFusionError::Internal(format!("vortex error: {error}")))?; self.read_table(Arc::new(vortex_table)) @@ -55,27 +83,32 @@ impl SessionContextExt for SessionContext { /// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as /// a table to DataFusion. #[derive(Debug, Clone)] -pub(crate) struct VortexInMemoryTableProvider { +pub struct VortexMemTable { array: Array, schema_ref: SchemaRef, + options: VortexMemTableOptions, } -impl VortexInMemoryTableProvider { +impl VortexMemTable { /// Build a new table provider from an existing [struct type](vortex_dtype::StructDType) array. - pub fn try_new(array: Array) -> VortexResult { - if !matches!(array.dtype(), DType::Struct(_, _)) { - vortex_bail!(InvalidArgument: "only DType::Struct arrays can produce a table provider"); - } - + /// + /// # Panics + /// + /// Creation will panic if the provided array is not of `DType::Struct` type. 
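For example (construction mirrors bench-vortex/benches/datafusion_benchmark.rs; illustrative only):

// let table = VortexMemTable::try_new(
//     struct_typed_array,
//     VortexMemTableOptions::default().with_disable_pushdown(false),
// )
// .unwrap();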
+ pub fn try_new(array: Array, options: VortexMemTableOptions) -> VortexResult { let arrow_schema = infer_schema(array.dtype()); let schema_ref = SchemaRef::new(arrow_schema); - Ok(Self { array, schema_ref }) + Ok(Self { + array, + schema_ref, + options, + }) } } #[async_trait] -impl TableProvider for VortexInMemoryTableProvider { +impl TableProvider for VortexMemTable { fn as_any(&self) -> &dyn Any { self } @@ -94,108 +127,280 @@ impl TableProvider for VortexInMemoryTableProvider { /// The array is flattened directly into the nearest Arrow-compatible encoding. async fn scan( &self, - _state: &SessionState, + state: &SessionState, projection: Option<&Vec>, filters: &[Expr], _limit: Option, ) -> DFResult> { - if !filters.is_empty() { - return exec_err!("vortex does not support filter pushdown"); + fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec { + let referenced_columns: HashSet = + exprs.iter().flat_map(get_column_references).collect(); + + let projection: Vec = referenced_columns + .iter() + .map(|col_name| schema.column_with_name(col_name).unwrap().0) + .sorted() + .collect(); + + projection } + let filter_exprs = if filters.is_empty() { + None + } else { + Some(filters) + }; + let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { Partitioning::RoundRobinBatch(chunked_array.nchunks()) } else { Partitioning::UnknownPartitioning(1) }; - let plan_properties = PlanProperties::new( - EquivalenceProperties::new(self.schema_ref.clone()), - partitioning, - ExecutionMode::Bounded, - ); + let output_projection: Vec = match projection { + None => (0..self.schema_ref.fields().len()).collect(), + Some(proj) => proj.clone(), + }; - Ok(Arc::new(VortexMemoryExec { - array: self.array.clone(), - projection: projection.cloned(), - plan_properties, - })) + match filter_exprs { + // If there is a filter expression, we execute in two phases, first performing a filter + // on the input to get back row indices, and then taking the remaining struct columns + // using the calculated indices from the filter. + Some(filter_exprs) => { + let filter_projection = + get_filter_projection(filter_exprs, self.schema_ref.clone()); + + Ok(make_filter_then_take_plan( + self.schema_ref.clone(), + filter_exprs, + filter_projection, + self.array.clone(), + output_projection.clone(), + state, + )) + } + + // If no filters were pushed down, we materialize the entire StructArray into a + // RecordBatch and let DataFusion process the entire query. + _ => { + let output_schema = Arc::new( + self.schema_ref + .project(output_projection.as_slice()) + .expect("project output schema"), + ); + let plan_properties = PlanProperties::new( + EquivalenceProperties::new(output_schema), + partitioning, + ExecutionMode::Bounded, + ); + + Ok(Arc::new(VortexScanExec { + array: self.array.clone(), + scan_projection: output_projection.clone(), + plan_properties, + })) + } + } } fn supports_filters_pushdown( &self, filters: &[&Expr], ) -> DFResult> { - // TODO(aduffy): add support for filter pushdown - Ok(filters + // In the case the caller has configured this provider with filter pushdown disabled, + // do not attempt to apply any filters at scan time. + if self.options.disable_pushdown { + return Ok(filters + .iter() + .map(|_| TableProviderFilterPushDown::Unsupported) + .collect()); + } + + filters .iter() - .map(|_| TableProviderFilterPushDown::Unsupported) - .collect()) + .map(|expr| { + if can_be_pushed_down(expr)? 
{ + Ok(TableProviderFilterPushDown::Exact) + } else { + Ok(TableProviderFilterPushDown::Unsupported) + } + }) + .try_collect() + } +} + +/// Construct an operator plan that executes in two stages. +/// +/// The first plan stage only materializes the columns related to the provided set of filter +/// expressions. It evaluates the filters into a row selection. +/// +/// The second stage receives the row selection above and dispatches a `take` on the remaining +/// columns. +fn make_filter_then_take_plan( + schema: SchemaRef, + filter_exprs: &[Expr], + filter_projection: Vec, + array: Array, + output_projection: Vec, + _session_state: &SessionState, +) -> Arc { + let struct_array = StructArray::try_from(array).unwrap(); + + let filter_struct = struct_array + .project(filter_projection.as_slice()) + .expect("projecting filter struct"); + + let row_selector_op = Arc::new(RowSelectorExec::new(filter_exprs, &filter_struct)); + + Arc::new(TakeRowsExec::new( + schema.clone(), + &output_projection, + row_selector_op.clone(), + &struct_array, + )) +} + +/// Check if the given expression tree can be pushed down into the scan. +fn can_be_pushed_down(expr: &Expr) -> DFResult { + // If the filter references a column not known to our schema, we reject the filter for pushdown. + fn is_supported(expr: &Expr) -> bool { + match expr { + Expr::BinaryExpr(binary_expr) => { + // Both the left and right sides must be column expressions, scalars, or casts. + + match binary_expr.op { + // Initially, we will only support pushdown for basic boolean operators + Operator::Eq + | Operator::NotEq + | Operator::Lt + | Operator::LtEq + | Operator::Gt + | Operator::GtEq => true, + + // TODO(aduffy): add support for LIKE + // TODO(aduffy): add support for basic mathematical ops +-*/ + // TODO(aduffy): add support for conjunctions, assuming all of the + // left and right are valid expressions. + _ => false, + } + } + Expr::IsNotNull(_) + | Expr::IsNull(_) + | Expr::IsTrue(_) + | Expr::IsFalse(_) + | Expr::IsNotTrue(_) + | Expr::IsNotFalse(_) + | Expr::Column(_) + | Expr::Literal(_) + // TODO(aduffy): ensure that cast can be pushed down. + | Expr::Cast(_) => true, + _ => false, + } + } + + // Visitor that traverses the expression tree and tracks if any unsupported expressions were + // encountered. + struct IsSupportedVisitor { + supported_expressions_only: bool, } + + impl TreeNodeVisitor<'_> for IsSupportedVisitor { + type Node = Expr; + + fn f_down(&mut self, node: &Self::Node) -> DFResult { + if !is_supported(node) { + self.supported_expressions_only = false; + return Ok(TreeNodeRecursion::Stop); + } + + Ok(TreeNodeRecursion::Continue) + } + } + + let mut visitor = IsSupportedVisitor { + supported_expressions_only: true, + }; + + // Traverse the tree. + // At the end of the traversal, the internal state of `visitor` will indicate if there were + // unsupported expressions encountered. + expr.visit(&mut visitor)?; + + Ok(visitor.supported_expressions_only) +} + +/// Extract out the columns from our table referenced by the expression. +fn get_column_references(expr: &Expr) -> HashSet { + let mut references = HashSet::new(); + + expr.apply(|node| match node { + Expr::Column(col) => { + references.insert(col.name.clone()); + + Ok(TreeNodeRecursion::Continue) + } + _ => Ok(TreeNodeRecursion::Continue), + }) + .unwrap(); + + references } /// Physical plan node for scans against an in-memory, possibly chunked Vortex Array. 
#[derive(Debug, Clone)] -struct VortexMemoryExec { +struct VortexScanExec { array: Array, - projection: Option>, + scan_projection: Vec, plan_properties: PlanProperties, } -impl DisplayAs for VortexMemoryExec { +impl DisplayAs for VortexScanExec { fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { write!(f, "{:?}", self) } } -impl VortexMemoryExec { - /// Read a single array chunk from the source as a RecordBatch. - /// - /// `array` must be a [`StructArray`] or flatten into one. Passing a different Array variant - /// may cause a panic. - fn execute_single_chunk( - array: Array, - projection: &Option>, - _context: Arc, - ) -> DFResult { - let data = array +/// Read a single array chunk from the source as a RecordBatch. +/// +/// # Errors +/// This function will return an Error if `array` is not struct-typed. It will also return an +/// error if the projection references columns +fn execute_unfiltered( + array: Array, + projection: &Vec, +) -> DFResult { + // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef. + let struct_array = array + .clone() + .into_struct() + .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?; + + let projected_struct = struct_array + .project(projection.as_slice()) + .map_err(|vortex_err| { + exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") + })?; + let batch = RecordBatch::from( + projected_struct .into_canonical() - .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))? - .into_array(); - - // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef. - let struct_array = StructArray::try_from(data).expect("array must be StructArray"); - - let field_order = if let Some(projection) = projection { - projection.clone() - } else { - (0..struct_array.names().len()).collect() - }; - - let projected_struct = - struct_array - .project(field_order.as_slice()) - .map_err(|vortex_err| { - exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") - })?; - let batch = RecordBatch::from( - projected_struct - .into_canonical() - .expect("struct arrays must flatten") - .into_arrow() - .as_any() - .downcast_ref::() - .expect("vortex StructArray must convert to arrow StructArray"), - ); - Ok(Box::pin(VortexRecordBatchStream { - schema_ref: batch.schema(), - inner: futures::stream::iter(vec![batch]), - })) - } + .expect("struct arrays must canonicalize") + .into_arrow() + .as_any() + .downcast_ref::() + .expect("vortex StructArray must convert to arrow StructArray"), + ); + Ok(Box::pin(VortexRecordBatchStream { + schema_ref: batch.schema(), + inner: futures::stream::iter(vec![batch]), + })) } +// Row selector stream. +// I.e., send a stream of RowSelector which allows us to pass in a bunch of binary arrays +// back down to the other systems here instead. 
+ #[pin_project] -struct VortexRecordBatchStream { +pub(crate) struct VortexRecordBatchStream { schema_ref: SchemaRef, #[pin] @@ -227,7 +432,7 @@ where } } -impl ExecutionPlan for VortexMemoryExec { +impl ExecutionPlan for VortexScanExec { fn as_any(&self) -> &dyn Any { self } @@ -251,7 +456,7 @@ impl ExecutionPlan for VortexMemoryExec { fn execute( &self, partition: usize, - context: Arc, + _context: Arc, ) -> DFResult { let chunk = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { chunked_array @@ -261,7 +466,7 @@ impl ExecutionPlan for VortexMemoryExec { self.array.clone() }; - Self::execute_single_chunk(chunk, &self.projection, context) + execute_unfiltered(chunk, &self.scan_projection) } } @@ -275,13 +480,12 @@ mod test { use vortex::array::struct_::StructArray; use vortex::array::varbin::VarBinArray; use vortex::validity::Validity; - use vortex::IntoArray; + use vortex::{Array, IntoArray}; use vortex_dtype::{DType, Nullability}; - use crate::SessionContextExt; + use crate::{SessionContextExt, VortexMemTableOptions}; - #[tokio::test] - async fn test_datafusion_simple() { + fn presidents_array() -> Array { let names = VarBinArray::from_vec( vec![ "Washington", @@ -298,19 +502,59 @@ mod test { Validity::NonNullable, ); - let presidents = StructArray::from_fields(&[ + StructArray::from_fields(&[ ("president", names.into_array()), ("term_start", term_start.into_array()), ]) - .into_array(); + .into_array() + } + + #[tokio::test] + async fn test_datafusion_pushdown() { + let ctx = SessionContext::new(); + + let df = ctx.read_vortex(presidents_array()).unwrap(); + + let distinct_names = df + .filter(col("term_start").gt_eq(lit(1795))) + .unwrap() + .aggregate(vec![], vec![count_distinct(col("president"))]) + .unwrap() + .collect() + .await + .unwrap(); + + assert_eq!(distinct_names.len(), 1); + + assert_eq!( + *distinct_names[0] + .column(0) + .as_primitive::() + .values() + .first() + .unwrap(), + 4i64 + ); + } + #[tokio::test] + async fn test_datafusion_no_pushdown() { let ctx = SessionContext::new(); - let df = ctx.read_vortex(presidents).unwrap(); + let df = ctx + .read_vortex_opts( + presidents_array(), + // Disable pushdown. We run this test to make sure that the naive codepath also + // produces correct results and does not panic anywhere. + VortexMemTableOptions::default().with_disable_pushdown(true), + ) + .unwrap(); let distinct_names = df .filter(col("term_start").gt_eq(lit(1795))) .unwrap() + .filter(col("term_start").lt(lit(2000))) + .unwrap() .aggregate(vec![], vec![count_distinct(col("president"))]) .unwrap() .collect() diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs new file mode 100644 index 0000000000..2e0daf3159 --- /dev/null +++ b/vortex-datafusion/src/plans.rs @@ -0,0 +1,470 @@ +//! Physical operators needed to implement scanning of Vortex arrays with pushdown. 
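The operator tree assembled by make_filter_then_take_plan (in lib.rs above) has roughly the shape sketched here (editorial illustration, not part of the module docs):

// TakeRowsExec            -- `take`s the projected output columns at the row indices it receives
//   └── RowSelectorExec   -- evaluates the pushed-down filter conjunction and emits a single
//                            `row_idx` (UInt64) column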
+ +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow_array::cast::AsArray; +use arrow_array::types::UInt64Type; +use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::{DFSchema, Result as DFResult}; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion_expr::Expr; +use datafusion_physical_expr::{create_physical_expr, EquivalenceProperties, Partitioning}; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, +}; +use futures::{ready, Stream}; +use lazy_static::lazy_static; +use pin_project::pin_project; +use vortex::array::struct_::StructArray; +use vortex::arrow::FromArrowArray; +use vortex::compute::take::take; +use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical}; + +use crate::datatype::infer_schema; +use crate::expr::{make_conjunction, simplify_expr}; + +/// Physical plan operator that applies a set of [filters][Expr] against the input, producing a +/// row mask that can be used downstream to force a take against the corresponding struct array +/// chunks but for different columns. +pub(crate) struct RowSelectorExec { + filter_exprs: Vec, + + // cached PlanProperties object. We do not make use of this. + cached_plan_props: PlanProperties, + + // A Vortex struct array that contains all columns necessary for executing the filter + // expressions. + filter_struct: StructArray, +} + +lazy_static! { + static ref ROW_SELECTOR_SCHEMA_REF: SchemaRef = Arc::new(Schema::new(vec![Field::new( + "row_idx", + DataType::UInt64, + false + )])); +} + +impl RowSelectorExec { + pub(crate) fn new(filter_exprs: &[Expr], filter_struct: &StructArray) -> Self { + let cached_plan_props = PlanProperties::new( + EquivalenceProperties::new(ROW_SELECTOR_SCHEMA_REF.clone()), + Partitioning::RoundRobinBatch(1), + ExecutionMode::Bounded, + ); + + Self { + filter_exprs: filter_exprs.to_owned(), + filter_struct: filter_struct.clone(), + cached_plan_props, + } + } +} + +impl Debug for RowSelectorExec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RowSelectorExec") + .field("filter_exprs", &self.filter_exprs) + .finish() + } +} + +impl DisplayAs for RowSelectorExec { + fn fmt_as( + &self, + _display_format_type: DisplayFormatType, + f: &mut Formatter, + ) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl ExecutionPlan for RowSelectorExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cached_plan_props + } + + fn children(&self) -> Vec<&Arc> { + // No children + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> DFResult> { + Ok(self) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DFResult { + assert_eq!( + partition, 0, + "single partitioning only supported by TakeOperator" + ); + + let stream_schema = Arc::new(infer_schema(self.filter_struct.dtype())); + + let filter_struct = self.filter_struct.clone(); + let one_shot = Box::pin(async move { filter_struct.into_array() }); + + let conjunction_expr = simplify_expr( + &make_conjunction(&self.filter_exprs)?, + stream_schema.clone(), + )?; + + Ok(Box::pin(RowIndicesStream { + one_shot, + polled_inner: false, + conjunction_expr, + schema_ref: stream_schema, + context: context.clone(), + })) + 
} +} + +/// [RecordBatchStream] of row indices, emitted by the [RowSelectorExec] physical plan node. +#[pin_project::pin_project] +pub(crate) struct RowIndicesStream { + /// The inner future that returns `DFResult`. + /// This future should only poll one time. + #[pin] + one_shot: F, + + polled_inner: bool, + + conjunction_expr: Expr, + schema_ref: SchemaRef, + context: Arc, +} + +impl Stream for RowIndicesStream +where + F: Future, +{ + type Item = DFResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + // If we have already polled the one-shot future of filter records, indicate + // that the stream has finished. + if *this.polled_inner { + return Poll::Ready(None); + } + + // Get the unfiltered record batch. + // Since this is a one-shot, we only want to poll the inner future once, to create the + // initial batch for us to process. + let vortex_struct = ready!(this.one_shot.poll(cx)); + *this.polled_inner = true; + + // Immediately convert to Arrow RecordBatch for processing. + // TODO(aduffy): attempt to pushdown the filter to Vortex without decoding. + let record_batch = RecordBatch::from( + vortex_struct + .into_canonical() + .unwrap() + .into_arrow() + .as_struct(), + ); + + // Generate a physical plan to execute the conjunction query against the filter columns. + // + // The result of a conjunction expression is a BooleanArray containing `true` for rows + // where the conjunction was satisfied, and `false` otherwise. + let df_schema = DFSchema::try_from(this.schema_ref.clone())?; + let physical_expr = + create_physical_expr(this.conjunction_expr, &df_schema, &Default::default())?; + let selection = physical_expr + .evaluate(&record_batch)? + .into_array(record_batch.num_rows())?; + + // Convert the `selection` BooleanArray into a UInt64Array of indices. + let selection_indices: Vec = selection + .as_boolean() + .clone() + .values() + .set_indices() + .map(|idx| idx as u64) + .collect(); + + let indices: ArrayRef = Arc::new(UInt64Array::from(selection_indices)); + let indices_batch = RecordBatch::try_new(ROW_SELECTOR_SCHEMA_REF.clone(), vec![indices])?; + + Poll::Ready(Some(Ok(indices_batch))) + } +} + +impl RecordBatchStream for RowIndicesStream +where + F: Future, +{ + fn schema(&self) -> SchemaRef { + self.schema_ref.clone() + } +} + +/// Physical that receives a stream of row indices from a child operator, and uses that to perform +/// a `take` operation on tha backing Vortex array. +pub(crate) struct TakeRowsExec { + plan_properties: PlanProperties, + + // Array storing the indices used to take the plan nodes. + projection: Vec, + + // Input plan, a stream of indices on which we perform a take against the original dataset. + input: Arc, + + output_schema: SchemaRef, + + // The original Vortex array holding the fields we have not decoded yet. 
+ table: StructArray, +} + +impl TakeRowsExec { + pub(crate) fn new( + schema_ref: SchemaRef, + projection: &[usize], + row_indices: Arc, + table: &StructArray, + ) -> Self { + let output_schema = Arc::new(schema_ref.project(projection).unwrap()); + let plan_properties = PlanProperties::new( + EquivalenceProperties::new(output_schema.clone()), + Partitioning::RoundRobinBatch(1), + ExecutionMode::Bounded, + ); + + Self { + plan_properties, + projection: projection.to_owned(), + input: row_indices, + output_schema: output_schema.clone(), + table: table.clone(), + } + } +} + +impl Debug for TakeRowsExec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TakeRowsExec") + .field("projection", &self.projection) + .field("output_schema", &self.output_schema) + .finish() + } +} + +impl DisplayAs for TakeRowsExec { + fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl ExecutionPlan for TakeRowsExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> DFResult> { + Ok(self) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DFResult { + assert_eq!( + partition, 0, + "single partitioning only supported by TakeOperator" + ); + + let row_indices_stream = self.input.execute(partition, context)?; + + Ok(Box::pin(TakeRowsStream { + row_indices_stream, + completed: false, + output_projection: self.projection.clone(), + output_schema: self.output_schema.clone(), + vortex_array: self.table.clone(), + })) + } +} + +/// Stream of outputs emitted by the [TakeRowsExec] physical operator. +#[pin_project] +pub(crate) struct TakeRowsStream { + // Stream of row indices arriving from upstream operator. + #[pin] + row_indices_stream: F, + + completed: bool, + + // Projection based on the schema here + output_projection: Vec, + output_schema: SchemaRef, + + // The original Vortex array we're taking from + vortex_array: StructArray, +} + +impl Stream for TakeRowsStream +where + F: Stream>, +{ + type Item = DFResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + // If `poll_next` has already fired, return None indicating end of the stream. + if *this.completed { + return Poll::Ready(None); + } + + // Get the indices provided by the upstream operator. + let record_batch = match ready!(this.row_indices_stream.poll_next(cx)) { + None => { + // Row indices stream is complete, we are also complete. + // This should never happen right now given we only emit one recordbatch upstream. + return Poll::Ready(None); + } + Some(result) => { + *this.completed = true; + result? + } + }; + + let row_indices = + ArrayData::from_arrow(record_batch.column(0).as_primitive::(), false) + .into_array(); + + // If no columns in the output projection, we send back a RecordBatch with empty schema. + // This is common for COUNT queries. + if this.output_projection.is_empty() { + let opts = RecordBatchOptions::new().with_row_count(Some(row_indices.len())); + return Poll::Ready(Some(Ok(RecordBatch::try_new_with_options( + Arc::new(Schema::empty()), + vec![], + &opts, + ) + .unwrap()))); + } + + // TODO(aduffy): this re-decodes the fields from the filter schema, which is wasteful. 
+ // We should find a way to avoid decoding the filter columns and only decode the other + // columns, then stitch the StructArray back together from those. + let projected_for_output = this.vortex_array.project(this.output_projection).unwrap(); + let decoded = take(&projected_for_output.into_array(), &row_indices) + .expect("take") + .into_canonical() + .expect("into_canonical") + .into_arrow(); + + // Send back a single record batch of the decoded data. + let output_batch = RecordBatch::from(decoded.as_struct()); + + Poll::Ready(Some(Ok(output_batch))) + } +} + +impl RecordBatchStream for TakeRowsStream +where + F: Stream>, +{ + fn schema(&self) -> SchemaRef { + self.output_schema.clone() + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{RecordBatch, UInt64Array}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion_expr::{and, col, lit}; + use itertools::Itertools; + use vortex::array::bool::BoolArray; + use vortex::array::primitive::PrimitiveArray; + use vortex::array::struct_::StructArray; + use vortex::validity::Validity; + use vortex::IntoArray; + use vortex_dtype::FieldName; + + use crate::plans::{RowIndicesStream, ROW_SELECTOR_SCHEMA_REF}; + + #[tokio::test] + async fn test_filtering_stream() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::UInt64, false), + Field::new("b", DataType::Boolean, false), + ])); + + let _schema = schema.clone(); + let one_shot = Box::pin(async move { + StructArray::try_new( + Arc::new([FieldName::from("a"), FieldName::from("b")]), + vec![ + PrimitiveArray::from(vec![0u64, 1, 2]).into_array(), + BoolArray::from(vec![false, false, true]).into_array(), + ], + 3, + Validity::NonNullable, + ) + .unwrap() + .into_array() + }); + + let _schema = schema.clone(); + let filtering_stream = RowIndicesStream { + one_shot, + polled_inner: false, + conjunction_expr: and((col("a") % lit(2u64)).eq(lit(0u64)), col("b").is_true()), + schema_ref: _schema, + context: Arc::new(Default::default()), + }; + + let rows: Vec = futures::executor::block_on_stream(filtering_stream) + .try_collect() + .unwrap(); + + assert_eq!(rows.len(), 1); + + // The output of row selection is a RecordBatch of indices that can be used as selectors + // against the original RecordBatch. + assert_eq!( + rows[0], + RecordBatch::try_new( + ROW_SELECTOR_SCHEMA_REF.clone(), + vec![Arc::new(UInt64Array::from(vec![2u64])),] + ) + .unwrap() + ); + } +}
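The benchmark target registered above in bench-vortex/Cargo.toml can be run with `cargo bench -p bench-vortex --bench datafusion_benchmark`; it compares the arrow, vortex-pushdown-{compressed,uncompressed}, and vortex-nopushdown-{compressed,uncompressed} groups defined in datafusion_benchmark.rs.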