Skip to content

Commit

Permalink
fix(df-repr): depjoin elimination rule (#212)
Browse files Browse the repository at this point in the history
This is not the most efficient way to implement this, but at least it
works. Reimplemented the eliminate depjoin rule, so that it can remove
depjoin and insert join when there are no correlated columns. This is
done by recursively inspecting whether `ExternCol` exists on the right
side of the plan tree. This also means that this eliminate depjoin rule
must be used as a heuristics rule.

Added a new subquery regression test and enabled TPC-H 1-5 to use the
optd logical optimizer.

---------

Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh authored Nov 3, 2024
1 parent 36e28ee commit 61b6e49
Show file tree
Hide file tree
Showing 12 changed files with 308 additions and 210 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions optd-datafusion-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ async-recursion = "1"
futures-lite = "2"
futures-util = "0.3"
itertools = "0.11"
tracing = "0.1"
21 changes: 21 additions & 0 deletions optd-datafusion-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl OptdQueryPlanner {
}));
}
let mut optd_rel = ctx.conv_into_optd(logical_plan)?;

if let Some(explains) = &mut explains {
explains.push(StringifiedPlan::new(
PlanType::OptimizedLogicalPlan {
Expand All @@ -142,9 +143,17 @@ impl OptdQueryPlanner {
.explain_to_string(None),
));
}

tracing::trace!(
optd_unoptimized_plan = %("\n".to_string()
+ &PlanNode::from_rel_node(optd_rel.clone())
.unwrap()
.explain_to_string(None)));

let mut optimizer = self.optimizer.lock().unwrap().take().unwrap();

if optimizer.is_heuristic_enabled() {
// TODO: depjoin pushdown might need to run multiple times
optd_rel = optimizer.heuristic_optimize(optd_rel);
if let Some(explains) = &mut explains {
explains.push(StringifiedPlan::new(
Expand All @@ -156,6 +165,11 @@ impl OptdQueryPlanner {
.explain_to_string(None),
))
}
tracing::trace!(
optd_optimized_plan = %("\n".to_string()
+ &PlanNode::from_rel_node(optd_rel.clone())
.unwrap()
.explain_to_string(None)));
}

let (group_id, optimized_rel, meta) = optimizer.cascades_optimize(optd_rel)?;
Expand All @@ -180,6 +194,13 @@ impl OptdQueryPlanner {
join_orders.iter().map(|x| x.to_string()).join("\n"),
));
}

tracing::trace!(
optd_physical_plan = %("\n".to_string()
+ &PlanNode::from_rel_node(optimized_rel.clone())
.unwrap()
.explain_to_string(None)));

ctx.optimizer = Some(&optimizer);
let physical_plan = ctx.conv_from_optd(optimized_rel, meta).await?;
if let Some(explains) = &mut explains {
Expand Down
4 changes: 2 additions & 2 deletions optd-datafusion-repr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use rules::{
pub use optd_core::rel_node::Value;

use crate::rules::{
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
DepInitialDistinct, DepJoinEliminate, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
};

pub use memo_ext::{LogicalJoinOrder, MemoExt};
Expand Down Expand Up @@ -93,7 +93,7 @@ impl DatafusionOptimizer {
Arc::new(EliminateLimitRule::new()),
Arc::new(EliminateDuplicatedSortExprRule::new()),
Arc::new(EliminateDuplicatedAggExprRule::new()),
Arc::new(DepJoinEliminateAtScan::new()),
Arc::new(DepJoinEliminate::new()),
Arc::new(DepInitialDistinct::new()),
Arc::new(DepJoinPastProj::new()),
Arc::new(DepJoinPastFilter::new()),
Expand Down
2 changes: 1 addition & 1 deletion optd-datafusion-repr/src/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ pub use project_transpose::{
project_merge::{EliminateProjectRule, ProjectMergeRule},
};
pub use subquery::{
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
DepInitialDistinct, DepJoinEliminate, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
};
2 changes: 1 addition & 1 deletion optd-datafusion-repr/src/rules/subquery.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod depjoin_pushdown;

pub use depjoin_pushdown::{
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
DepInitialDistinct, DepJoinEliminate, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
};
47 changes: 29 additions & 18 deletions optd-datafusion-repr/src/rules/subquery/depjoin_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,10 @@ fn apply_dep_join_past_agg(
vec![new_agg.into_rel_node().as_ref().clone()]
}

// Heuristics-only rule. If we don't have references to the external columns on the right side,
// we can rewrite the dependent join into a normal join.
define_rule!(
DepJoinEliminateAtScan,
DepJoinEliminate,
apply_dep_join_eliminate_at_scan, // TODO matching is all wrong
(DepJoin(JoinType::Cross), left, right, [cond], [extern_cols])
);
Expand All @@ -428,31 +430,40 @@ define_rule!(
/// for an inner join! Our main mission is complete!
fn apply_dep_join_eliminate_at_scan(
_optimizer: &impl Optimizer<OptRelNodeTyp>,
DepJoinEliminateAtScanPicks {
DepJoinEliminatePicks {
left,
right,
cond,
extern_cols: _,
}: DepJoinEliminateAtScanPicks,
}: DepJoinEliminatePicks,
) -> Vec<RelNode<OptRelNodeTyp>> {
// TODO: Is there ever a situation we need to detect that we can convert earlier?
// Technically we can convert as soon as we clear the last externcolumnref...

// Cross join should always have true cond
assert!(cond == *ConstantExpr::bool(true).into_rel_node());

if right.typ != OptRelNodeTyp::Scan {
return vec![];
fn inspect(node: &RelNode<OptRelNodeTyp>) -> bool {
if matches!(node.typ, OptRelNodeTyp::Placeholder(_)) {
unimplemented!("this is a heuristics rule");
}
if node.typ == OptRelNodeTyp::ExternColumnRef {
return false;
}
for child in &node.children {
if !inspect(child) {
return false;
}
}
true
}

// let scan = LogicalScan::new("test".to_string()).into_rel_node();

let new_join = LogicalJoin::new(
PlanNode::from_group(left.into()),
PlanNode::from_group(right.into()),
ConstantExpr::bool(true).into_expr(),
JoinType::Inner,
);

vec![new_join.into_rel_node().as_ref().clone()]
if inspect(&right) {
let new_join = LogicalJoin::new(
PlanNode::from_group(left.into()),
PlanNode::from_group(right.into()),
ConstantExpr::bool(true).into_expr(),
JoinType::Inner,
);
vec![new_join.into_rel_node().as_ref().clone()]
} else {
vec![]
}
}
2 changes: 1 addition & 1 deletion optd-sqlplannertest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ Currently we have the following options for the explain task:
## Tracing a query

```
RUST_LOG=optd_core=trace cargo run -p optd-sqlplannertest --bin planner_test_apply -- pushdowns &> log
RUST_BACKTRACE=1 RUST_LOG=optd_core=trace,optd_datafusion_bridge=trace cargo run -p optd-sqlplannertest --bin planner_test_apply -- pushdowns &> log
```
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,87 @@ PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=9147.220000000001,io=3000
└── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} }
*/

-- Test whether the optimizer can unnest correlated subqueries.
select * from t1 where (select sum(t2v3) from (select * from t2, t3 where t2v1 = t1v1 and t2v3 = t3v2)) > 100;

/*
LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalFilter
├── cond:Gt
│ ├── #2
│ └── 100(i64)
└── RawDependentJoin { join_type: Cross, cond: true, extern_cols: [ Extern(#0) ] }
├── LogicalScan { table: t1 }
└── LogicalProjection { exprs: [ #0 ] }
└── LogicalAgg
├── exprs:Agg(Sum)
│ └── [ Cast { cast_to: Int64, expr: #1 } ]
├── groups: []
└── LogicalProjection { exprs: [ #0, #1, #2, #3 ] }
└── LogicalFilter
├── cond:And
│ ├── Eq
│ │ ├── #0
│ │ └── Extern(#0)
│ └── Eq
│ ├── #1
│ └── #2
└── LogicalJoin { join_type: Cross, cond: true }
├── LogicalScan { table: t2 }
└── LogicalScan { table: t3 }
LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalFilter
├── cond:Gt
│ ├── #2
│ └── 100(i64)
└── LogicalProjection { exprs: [ #0, #1, #3 ] }
└── LogicalJoin
├── join_type: Inner
├── cond:Eq
│ ├── #0
│ └── #2
├── LogicalScan { table: t1 }
└── LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalAgg
├── exprs:Agg(Sum)
│ └── [ Cast { cast_to: Int64, expr: #2 } ]
├── groups: [ #1 ]
└── LogicalProjection { exprs: [ #0, #1, #2, #3, #4 ] }
└── LogicalFilter
├── cond:And
│ ├── Eq
│ │ ├── #1
│ │ └── #0
│ └── Eq
│ ├── #2
│ └── #3
└── LogicalJoin { join_type: Inner, cond: true }
├── LogicalAgg { exprs: [], groups: [ #0 ] }
│ └── LogicalScan { table: t1 }
└── LogicalJoin { join_type: Cross, cond: true }
├── LogicalScan { table: t2 }
└── LogicalScan { table: t3 }
PhysicalProjection { exprs: [ #2, #3 ], cost: {compute=10153.240000000002,io=4000}, stat: {row_cnt=1} }
└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=10145.220000000001,io=4000}, stat: {row_cnt=1} }
├── PhysicalAgg
│ ├── aggrs:Agg(Sum)
│ │ └── [ Cast { cast_to: Int64, expr: #2 } ]
│ ├── groups: [ #1 ]
│ ├── cost: {compute=9139.2,io=3000}
│ ├── stat: {row_cnt=1}
│ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #2 ], right_keys: [ #0 ], cost: {compute=9051.080000000002,io=3000}, stat: {row_cnt=1} }
│ ├── PhysicalProjection { exprs: [ #2, #0, #1 ], cost: {compute=8045.06,io=2000}, stat: {row_cnt=1} }
│ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ], cost: {compute=8033.04,io=2000}, stat: {row_cnt=1} }
│ │ ├── PhysicalFilter
│ │ │ ├── cond:Gt
│ │ │ │ ├── #0
│ │ │ │ └── 100(i64)
│ │ │ ├── cost: {compute=5005,io=1000}
│ │ │ ├── stat: {row_cnt=1}
│ │ │ └── PhysicalScan { table: t2, cost: {compute=0,io=1000}, stat: {row_cnt=1000} }
│ │ └── PhysicalAgg { aggrs: [], groups: [ #0 ], cost: {compute=2022.0199999999995,io=1000}, stat: {row_cnt=1000} }
│ │ └── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} }
│ └── PhysicalScan { table: t3, cost: {compute=0,io=1000}, stat: {row_cnt=1000} }
└── PhysicalScan { table: t1, cost: {compute=0,io=1000}, stat: {row_cnt=1000} }
*/

5 changes: 5 additions & 0 deletions optd-sqlplannertest/tests/subqueries/subquery_unnesting.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@
desc: Test whether the optimizer can unnest correlated subqueries.
tasks:
- explain[verbose]:logical_optd,optimized_logical_optd,physical_optd
- sql: |
select * from t1 where (select sum(t2v3) from (select * from t2, t3 where t2v1 = t1v1 and t2v3 = t3v2)) > 100;
desc: Test whether the optimizer can unnest correlated subqueries.
tasks:
- explain[verbose]:logical_optd,optimized_logical_optd,physical_optd
Loading

0 comments on commit 61b6e49

Please sign in to comment.