Skip to content

Commit

Permalink
Upgrade datafusion (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee authored Sep 12, 2024
1 parent 64b6e82 commit c5135cf
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 42 deletions.
53 changes: 27 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ denormalized = { path = "crates/core" }
denormalized-common = { path = "crates/common" }
denormalized-orchestrator = { path = "crates/orchestrator" }

datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", branch = "main" }
datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion", rev = "e1558c0856cb84a823680ff2b53097b9372b798b" }

arrow = { version = "53.0.0", features = ["prettyprint"] }
arrow-array = { version = "53.0.0", default-features = false, features = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub struct GroupedWindowAggStream {
pub schema: SchemaRef,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
exec_aggregate_expressions: Vec<Arc<AggregateFunctionExpr>>,
exec_aggregate_expressions: Vec<AggregateFunctionExpr>,
aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
latest_watermark: Arc<Mutex<Option<SystemTime>>>,
Expand Down Expand Up @@ -87,8 +87,12 @@ impl GroupedWindowAggStream {
.input
.execute(partition, Arc::clone(&context))?;

let aggregate_expressions =
aggregate_expressions(&exec_operator.aggregate_expressions, &exec_operator.mode, 0)?;
let aggregate_expressions = aggregate_expressions(
exec_operator.aggregate_expressions.as_slice(),
&exec_operator.mode,
0,
)?;

let filter_expressions = match exec_operator.mode {
AggregateMode::Partial | AggregateMode::Single | AggregateMode::SinglePartitioned => {
agg_filter_expr
Expand Down Expand Up @@ -181,7 +185,7 @@ impl GroupedWindowAggStream {
let accumulators: Vec<_> = self
.exec_aggregate_expressions
.iter()
.map(create_group_accumulator)
.map(|i| create_group_accumulator(&Arc::new(i.to_owned())))
.collect::<Result<_>>()?;
let elapsed = start_time.elapsed().unwrap().as_millis();
let name = format!("GroupedHashAggregateStream WindowStart[{elapsed}]");
Expand Down
16 changes: 8 additions & 8 deletions crates/core/src/physical_plan/continuous/streaming_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ pub enum PhysicalStreamingWindowType {
#[derive(Debug)]
pub struct StreamingWindowExec {
pub(crate) input: Arc<dyn ExecutionPlan>,
pub aggregate_expressions: Vec<Arc<AggregateFunctionExpr>>,
pub aggregate_expressions: Vec<AggregateFunctionExpr>,
pub filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
/// Schema after the window is run
pub group_by: PhysicalGroupBy,
Expand All @@ -216,7 +216,7 @@ impl StreamingWindowExec {
pub fn try_new(
mode: AggregateMode,
group_by: PhysicalGroupBy,
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
aggr_expr: Vec<AggregateFunctionExpr>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
Expand Down Expand Up @@ -248,7 +248,7 @@ impl StreamingWindowExec {
pub fn try_new_with_schema(
mode: AggregateMode,
group_by: PhysicalGroupBy,
mut aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
mut aggr_expr: Vec<AggregateFunctionExpr>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
Expand Down Expand Up @@ -365,7 +365,7 @@ impl StreamingWindowExec {
PlanProperties::new(eq_properties, output_partitioning, ExecutionMode::Unbounded)
}
/// Aggregate expressions
pub fn aggr_expr(&self) -> &[Arc<AggregateFunctionExpr>] {
pub fn aggr_expr(&self) -> &[AggregateFunctionExpr] {
&self.aggregate_expressions
}

Expand Down Expand Up @@ -610,7 +610,7 @@ pub struct WindowAggStream {
pub schema: SchemaRef,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
exec_aggregate_expressions: Vec<Arc<AggregateFunctionExpr>>,
exec_aggregate_expressions: Vec<AggregateFunctionExpr>,
aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
latest_watermark: Arc<Mutex<Option<SystemTime>>>,
Expand Down Expand Up @@ -811,7 +811,7 @@ impl FullWindowAggFrame {
pub fn new(
start_time: SystemTime,
end_time: SystemTime,
exec_aggregate_expressions: &[Arc<AggregateFunctionExpr>],
exec_aggregate_expressions: &[AggregateFunctionExpr],
aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,

Expand Down Expand Up @@ -852,7 +852,7 @@ struct FullWindowAggStream {
pub schema: SchemaRef,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
exec_aggregate_expressions: Vec<Arc<AggregateFunctionExpr>>,
exec_aggregate_expressions: Vec<AggregateFunctionExpr>,
aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
cached_frames: BTreeMap<SystemTime, FullWindowAggFrame>,
Expand Down Expand Up @@ -1058,7 +1058,7 @@ fn snap_to_window_start(timestamp: SystemTime, window_length: Duration) -> Syste
fn create_schema(
input_schema: &Schema,
group_expr: &[(Arc<dyn PhysicalExpr>, String)],
aggr_expr: &[Arc<AggregateFunctionExpr>],
aggr_expr: &[AggregateFunctionExpr],
contains_null_expr: bool,
mode: AggregateMode,
) -> Result<Schema> {
Expand Down
4 changes: 1 addition & 3 deletions crates/core/src/physical_plan/utils/accumulators.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::logical_expr::Accumulator;
use datafusion::physical_expr::aggregate::AggregateFunctionExpr;

pub(crate) type AccumulatorItem = Box<dyn Accumulator>;

pub(crate) fn create_accumulators(
aggr_expr: &[Arc<AggregateFunctionExpr>],
aggr_expr: &[AggregateFunctionExpr],
) -> Result<Vec<AccumulatorItem>> {
aggr_expr
.iter()
Expand Down

0 comments on commit c5135cf

Please sign in to comment.