From a8895fb11316c4ab5f080d85fd4670550815de39 Mon Sep 17 00:00:00 2001 From: Matt Green Date: Mon, 19 Aug 2024 10:38:30 -0700 Subject: [PATCH] Update datafusion --- Cargo.lock | 99 +++++++++++++++---- Cargo.toml | 2 +- crates/core/src/logical_plan/mod.rs | 10 +- .../continuous/grouped_window_agg_stream.rs | 8 +- .../continuous/streaming_window.rs | 20 ++-- 5 files changed, 100 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a9ae2c..06037c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -797,7 +797,7 @@ dependencies = [ [[package]] name = "datafusion" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "ahash", "arrow", @@ -818,9 +818,11 @@ dependencies = [ "datafusion-functions", "datafusion-functions-aggregate", "datafusion-functions-nested", + "datafusion-functions-window", "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-expr-common", + "datafusion-physical-expr-functions-aggregate", "datafusion-physical-optimizer", "datafusion-physical-plan", "datafusion-sql", @@ -852,7 +854,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "arrow-schema", "async-trait", @@ -865,7 +867,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "ahash", "arrow", @@ -883,6 +885,7 @@ dependencies = [ "num_cpus", "object_store", "parquet", + "paste", "serde", "serde_json", "sqlparser", @@ -891,7 +894,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "tokio", ] @@ -899,7 +902,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "arrow", "chrono", @@ -919,7 +922,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "ahash", "arrow", @@ -927,6 +930,9 @@ dependencies = [ "arrow-buffer", "chrono", "datafusion-common", + "datafusion-expr-common", + "datafusion-functions-aggregate-common", + "datafusion-physical-expr-common", "paste", "serde_json", "sqlparser", @@ -934,10 +940,20 @@ dependencies = [ "strum_macros 0.26.4", ] +[[package]] +name = "datafusion-expr-common" +version = "41.0.0" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" +dependencies = [ + "arrow", + "datafusion-common", + "paste", +] + [[package]] name = "datafusion-functions" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "arrow", "arrow-buffer", @@ -963,7 +979,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "ahash", "arrow", @@ -971,16 +987,32 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-functions-aggregate-common", + "datafusion-physical-expr", "datafusion-physical-expr-common", + "half", "log", "paste", "sqlparser", ] +[[package]] +name = "datafusion-functions-aggregate-common" +version = "41.0.0" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" +dependencies = [ + "ahash", + "arrow", + "datafusion-common", + "datafusion-expr-common", + "datafusion-physical-expr-common", + "rand", +] + [[package]] name = "datafusion-functions-nested" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "arrow", "arrow-array", @@ -998,10 +1030,21 @@ dependencies = [ "rand", ] +[[package]] +name = "datafusion-functions-window" +version = "41.0.0" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" +dependencies = [ + "datafusion-common", + "datafusion-expr", + "datafusion-physical-expr-common", + "log", +] + [[package]] name = "datafusion-optimizer" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "arrow", "async-trait", @@ -1020,7 +1063,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "ahash", "arrow", @@ -1034,6 +1077,8 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-expr-common", + "datafusion-functions-aggregate-common", "datafusion-physical-expr-common", "half", "hashbrown", @@ -1049,31 +1094,47 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "ahash", "arrow", "datafusion-common", - "datafusion-expr", + "datafusion-expr-common", "hashbrown", "rand", ] +[[package]] +name = "datafusion-physical-expr-functions-aggregate" +version = "41.0.0" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" +dependencies = [ + "ahash", + "arrow", + "datafusion-common", + "datafusion-expr", + "datafusion-expr-common", + "datafusion-functions-aggregate-common", + "datafusion-physical-expr-common", + "rand", +] + [[package]] name = "datafusion-physical-optimizer" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-physical-expr", "datafusion-physical-plan", + "itertools 0.12.1", ] [[package]] name = "datafusion-physical-plan" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "ahash", "arrow", @@ -1088,8 +1149,10 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-functions-aggregate", + "datafusion-functions-aggregate-common", "datafusion-physical-expr", "datafusion-physical-expr-common", + "datafusion-physical-expr-functions-aggregate", "futures", "half", "hashbrown", @@ -1106,7 +1169,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=fc67038#fc67038398c05f760b1bf910809444446d7348ba" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=80874c4#80874c424814574697e29585663cc52376a5e4fb" dependencies = [ "arrow", "arrow-array", @@ -2545,9 +2608,9 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.49.0" +version = "0.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a404d0e14905361b918cb8afdb73605e25c1d5029312bd9785142dcb3aa49e" +checksum = "b2e5b515a2bd5168426033e9efbfd05500114833916f1d5c268f938b4ee130ac" dependencies = [ "log", "sqlparser_derive", diff --git a/Cargo.toml b/Cargo.toml index cbdf84f..674bb9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,4 +55,4 @@ chrono = { version = "0.4.38", default-features = false } itertools = "0.13" [patch.crates-io] -datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", rev="fc67038" } +datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", rev = "80874c4" } diff --git a/crates/core/src/logical_plan/mod.rs b/crates/core/src/logical_plan/mod.rs index 234192a..3433994 100644 --- a/crates/core/src/logical_plan/mod.rs +++ b/crates/core/src/logical_plan/mod.rs @@ -33,18 +33,18 @@ impl StreamingLogicalPlanBuilder for LogicalPlanBuilder { window_length: Duration, slide: Option, ) -> Result { - let group_expr = normalize_cols(group_expr, &self.plan)?; - let aggr_expr = normalize_cols(aggr_expr, &self.plan)?; + let group_expr = normalize_cols(group_expr, self.plan())?; + let aggr_expr = normalize_cols(aggr_expr, self.plan())?; - let group_expr = add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?; + let group_expr = add_group_by_exprs_from_dependencies(group_expr, self.schema())?; let window: StreamingWindowType = slide.map_or_else( || StreamingWindowType::Tumbling(window_length), |_slide| StreamingWindowType::Sliding(window_length, _slide), ); - let plan = self.plan.clone(); + let plan = self.plan().clone(); - Aggregate::try_new(Arc::new(self.plan), group_expr, aggr_expr) + Aggregate::try_new(Arc::new(plan.clone()), group_expr, aggr_expr) .map(|new_aggr| { LogicalPlan::Extension(Extension { node: Arc::new(StreamingWindowPlanNode { diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs index e16eaf7..8cfb757 100644 --- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs +++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs @@ -98,7 +98,7 @@ impl GroupedWindowAggStream { }; let group_by = exec_operator.group_by.clone(); - let group_schema = group_schema(&agg_schema, group_by.expr.len()); + let group_schema = group_schema(&agg_schema, group_by.expr().len()); Ok(Self { schema: agg_schema, input, @@ -461,7 +461,7 @@ pub(crate) fn evaluate_group_by( batch: &RecordBatch, ) -> Result>> { let exprs: Vec = group_by - .expr + .expr() .iter() .map(|(expr, _)| { let value = expr.evaluate(batch)?; @@ -470,7 +470,7 @@ pub(crate) fn evaluate_group_by( .collect::>>()?; let null_exprs: Vec = group_by - .null_expr + .null_expr() .iter() .map(|(expr, _)| { let value = expr.evaluate(batch)?; @@ -479,7 +479,7 @@ pub(crate) fn evaluate_group_by( .collect::>>()?; Ok(group_by - .groups + .groups() .iter() .map(|group| { group diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs index e058aa0..361873a 100644 --- a/crates/core/src/physical_plan/continuous/streaming_window.rs +++ b/crates/core/src/physical_plan/continuous/streaming_window.rs @@ -218,7 +218,7 @@ impl FranzStreamingWindowExec { ) -> Result { let schema = create_schema( &input.schema(), - &group_by.expr, + group_by.expr(), &aggr_expr, group_by.contains_null(), mode, @@ -284,7 +284,7 @@ impl FranzStreamingWindowExec { }; // construct a map from the input expression to the output expression of the Aggregation group by - let projection_mapping = ProjectionMapping::try_new(&group_by.expr, &input.schema())?; + let projection_mapping = ProjectionMapping::try_new(group_by.expr(), &input.schema())?; let cache = FranzStreamingWindowExec::compute_properties( &input, @@ -437,9 +437,7 @@ impl ExecutionPlan for FranzStreamingWindowExec { fn statistics(&self) -> Result { let column_statistics = Statistics::unknown_column(&self.schema()); match self.mode { - AggregateMode::Final | AggregateMode::FinalPartitioned - if self.group_by.expr.is_empty() => - { + AggregateMode::Final | AggregateMode::FinalPartitioned if self.group_by.is_empty() => { Ok(Statistics { num_rows: Precision::Exact(1), column_statistics, @@ -495,7 +493,7 @@ impl DisplayAs for FranzStreamingWindowExec { write!(f, "FranzStreamingWindowExec: mode={:?}", self.mode)?; let g: Vec = if self.group_by.is_single() { self.group_by - .expr + .expr() .iter() .map(|(e, alias)| { let e = e.to_string(); @@ -508,7 +506,7 @@ impl DisplayAs for FranzStreamingWindowExec { .collect() } else { self.group_by - .groups + .groups() .iter() .map(|group| { let terms = group @@ -516,17 +514,17 @@ impl DisplayAs for FranzStreamingWindowExec { .enumerate() .map(|(idx, is_null)| { if *is_null { - let (e, alias) = &self.group_by.null_expr[idx]; + let (e, alias) = &self.group_by.null_expr()[idx]; let e = e.to_string(); - if &e != alias { + if e != *alias { format!("{e} as {alias}") } else { e } } else { - let (e, alias) = &self.group_by.expr[idx]; + let (e, alias) = &self.group_by.expr()[idx]; let e = e.to_string(); - if &e != alias { + if e != *alias { format!("{e} as {alias}") } else { e