diff --git a/Cargo.lock b/Cargo.lock index cb9ac8d..2a1a81d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -133,9 +133,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.87" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10f00e1f6e58a40e807377c75c6a7f97bf9044fab57816f2414e6f5f4499d7b8" +checksum = "4e1496f8fb1fbf272686b8d37f523dab3e4a7443300055e74cdaa449f3114356" [[package]] name = "apache-avro" @@ -858,7 +858,7 @@ dependencies = [ [[package]] name = "datafusion" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "ahash", "apache-avro", @@ -916,7 +916,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "arrow-schema", "async-trait", @@ -924,12 +924,13 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-physical-plan", + "parking_lot", ] [[package]] name = "datafusion-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "ahash", "apache-avro", @@ -954,7 +955,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "log", "tokio", @@ -963,7 +964,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "arrow", "chrono", @@ -983,7 +984,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "ahash", "arrow", @@ -1004,7 +1005,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "arrow", "datafusion-common", @@ -1014,7 +1015,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "arrow", "arrow-buffer", @@ -1040,7 +1041,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "ahash", "arrow", @@ -1060,7 +1061,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "ahash", "arrow", @@ -1073,7 +1074,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "arrow", "arrow-array", @@ -1095,7 +1096,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "datafusion-common", "datafusion-expr", @@ -1106,7 +1107,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "arrow", "async-trait", @@ -1125,7 +1126,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "ahash", "arrow", @@ -1156,7 +1157,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "ahash", "arrow", @@ -1169,7 +1170,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "arrow-schema", "datafusion-common", @@ -1182,7 +1183,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "ahash", "arrow", @@ -1216,7 +1217,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "41.0.0" -source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?branch=main#e99c259407c014e1606908320f037ede72f6689d" +source = "git+https://github.com/probably-nothing-labs/arrow-datafusion?rev=e1558c0856cb84a823680ff2b53097b9372b798b#e1558c0856cb84a823680ff2b53097b9372b798b" dependencies = [ "arrow", "arrow-array", @@ -2450,9 +2451,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" +checksum = "0884ad60e090bf1345b93da0a5de8923c93884cd03f40dfcfddd3b4bee661853" dependencies = [ "bitflags 2.6.0", ] @@ -2531,9 +2532,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.36" +version = "0.38.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f55e80d50763938498dd5ebb18647174e0c76dc38c5505294bb224624f30f36" +checksum = "8acb788b847c24f28525660c4d7758620a7210875711f79e7f663cc152726811" dependencies = [ "bitflags 2.6.0", "errno", @@ -3022,9 +3023,9 @@ checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" -version = "1.0.12" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" [[package]] name = "unicode-normalization" diff --git a/Cargo.toml b/Cargo.toml index 6deccca..4fabc51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ denormalized = { path = "crates/core" } denormalized-common = { path = "crates/common" } denormalized-orchestrator = { path = "crates/orchestrator" } -datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "main" } +datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", rev = "e1558c0856cb84a823680ff2b53097b9372b798b" } arrow = { version = "53.0.0", features = ["prettyprint"] } arrow-array = { version = "53.0.0", default-features = false, features = [ diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs index 39ba88b..c652540 100644 --- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs +++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs @@ -52,7 +52,7 @@ pub struct GroupedWindowAggStream { pub schema: SchemaRef, input: SendableRecordBatchStream, baseline_metrics: BaselineMetrics, - exec_aggregate_expressions: Vec>, + exec_aggregate_expressions: Vec, aggregate_expressions: Vec>>, filter_expressions: Vec>>, latest_watermark: Arc>>, @@ -87,8 +87,12 @@ impl GroupedWindowAggStream { .input .execute(partition, Arc::clone(&context))?; - let aggregate_expressions = - aggregate_expressions(&exec_operator.aggregate_expressions, &exec_operator.mode, 0)?; + let aggregate_expressions = aggregate_expressions( + exec_operator.aggregate_expressions.as_slice(), + &exec_operator.mode, + 0, + )?; + let filter_expressions = match exec_operator.mode { AggregateMode::Partial | AggregateMode::Single | AggregateMode::SinglePartitioned => { agg_filter_expr @@ -181,7 +185,7 @@ impl GroupedWindowAggStream { let accumulators: Vec<_> = self .exec_aggregate_expressions .iter() - .map(create_group_accumulator) + .map(|i| create_group_accumulator(&Arc::new(i.to_owned()))) .collect::>()?; let elapsed = start_time.elapsed().unwrap().as_millis(); let name = format!("GroupedHashAggregateStream WindowStart[{elapsed}]"); diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs index c846887..f69b3d1 100644 --- a/crates/core/src/physical_plan/continuous/streaming_window.rs +++ b/crates/core/src/physical_plan/continuous/streaming_window.rs @@ -195,7 +195,7 @@ pub enum PhysicalStreamingWindowType { #[derive(Debug)] pub struct StreamingWindowExec { pub(crate) input: Arc, - pub aggregate_expressions: Vec>, + pub aggregate_expressions: Vec, pub filter_expressions: Vec>>, /// Schema after the window is run pub group_by: PhysicalGroupBy, @@ -216,7 +216,7 @@ impl StreamingWindowExec { pub fn try_new( mode: AggregateMode, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + aggr_expr: Vec, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, @@ -248,7 +248,7 @@ impl StreamingWindowExec { pub fn try_new_with_schema( mode: AggregateMode, group_by: PhysicalGroupBy, - mut aggr_expr: Vec>, + mut aggr_expr: Vec, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, @@ -365,7 +365,7 @@ impl StreamingWindowExec { PlanProperties::new(eq_properties, output_partitioning, ExecutionMode::Unbounded) } /// Aggregate expressions - pub fn aggr_expr(&self) -> &[Arc] { + pub fn aggr_expr(&self) -> &[AggregateFunctionExpr] { &self.aggregate_expressions } @@ -610,7 +610,7 @@ pub struct WindowAggStream { pub schema: SchemaRef, input: SendableRecordBatchStream, baseline_metrics: BaselineMetrics, - exec_aggregate_expressions: Vec>, + exec_aggregate_expressions: Vec, aggregate_expressions: Vec>>, filter_expressions: Vec>>, latest_watermark: Arc>>, @@ -811,7 +811,7 @@ impl FullWindowAggFrame { pub fn new( start_time: SystemTime, end_time: SystemTime, - exec_aggregate_expressions: &[Arc], + exec_aggregate_expressions: &[AggregateFunctionExpr], aggregate_expressions: Vec>>, filter_expressions: Vec>>, @@ -852,7 +852,7 @@ struct FullWindowAggStream { pub schema: SchemaRef, input: SendableRecordBatchStream, baseline_metrics: BaselineMetrics, - exec_aggregate_expressions: Vec>, + exec_aggregate_expressions: Vec, aggregate_expressions: Vec>>, filter_expressions: Vec>>, cached_frames: BTreeMap, @@ -1058,7 +1058,7 @@ fn snap_to_window_start(timestamp: SystemTime, window_length: Duration) -> Syste fn create_schema( input_schema: &Schema, group_expr: &[(Arc, String)], - aggr_expr: &[Arc], + aggr_expr: &[AggregateFunctionExpr], contains_null_expr: bool, mode: AggregateMode, ) -> Result { diff --git a/crates/core/src/physical_plan/utils/accumulators.rs b/crates/core/src/physical_plan/utils/accumulators.rs index 87f0978..cb8318d 100644 --- a/crates/core/src/physical_plan/utils/accumulators.rs +++ b/crates/core/src/physical_plan/utils/accumulators.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use datafusion::common::Result; use datafusion::logical_expr::Accumulator; use datafusion::physical_expr::aggregate::AggregateFunctionExpr; @@ -7,7 +5,7 @@ use datafusion::physical_expr::aggregate::AggregateFunctionExpr; pub(crate) type AccumulatorItem = Box; pub(crate) fn create_accumulators( - aggr_expr: &[Arc], + aggr_expr: &[AggregateFunctionExpr], ) -> Result> { aggr_expr .iter()