Skip to content

Commit

Permalink
feat: unnesting arbitrary subqueries (likely broken) (#180)
Browse files Browse the repository at this point in the history
Somewhere between a proof of concept and a draft—work still heavily in
progress. Will already successfully parse and fully unnest a subset of
correlated and uncorrelated subqueries (although I am uncertain about
correctness).

**TODO**:
- [ ] Formal testing
- [ ] EXISTS clauses
- [ ] IN clauses
- [ ] ANY/ALL clauses
- [ ] Correctness issue with COUNT(*) (requires adding left outer join
to plan)
- [x] Move some/all of this to rewriting stage to support multiple
subqueries/ordering operations
- [x] “Sideways information passing” (subplans are duplicated now
instead of making a DAG)
- It seems that a DAG representation is only supported by looking for
groups that appear the same. It looks to me that the cloned branches
generated by this PR are indeed marked with the same group ID. I marked
this bullet point as completed with this in mind.
- [ ] Support more pushdowns (e.g. limit, joins)
- [ ] Optimizations from the paper are all missing (Out of scope?)

---------

Signed-off-by: Alex Chi <[email protected]>
Co-authored-by: Alex Chi <[email protected]>
  • Loading branch information
jurplel and skyzh authored Oct 24, 2024
1 parent f8f714c commit 5065c42
Show file tree
Hide file tree
Showing 14 changed files with 898 additions and 72 deletions.
288 changes: 227 additions & 61 deletions optd-datafusion-bridge/src/into_optd.rs

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion optd-datafusion-repr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ use rules::{

pub use optd_core::rel_node::Value;

use crate::rules::{
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
};

pub mod cost;
mod explain;
pub mod plan_nodes;
Expand Down Expand Up @@ -88,6 +92,11 @@ impl DatafusionOptimizer {
Arc::new(EliminateLimitRule::new()),
Arc::new(EliminateDuplicatedSortExprRule::new()),
Arc::new(EliminateDuplicatedAggExprRule::new()),
Arc::new(DepJoinEliminateAtScan::new()),
Arc::new(DepInitialDistinct::new()),
Arc::new(DepJoinPastProj::new()),
Arc::new(DepJoinPastFilter::new()),
Arc::new(DepJoinPastAgg::new()),
Arc::new(ProjectMergeRule::new()),
Arc::new(FilterMergeRule::new()),
]
Expand Down Expand Up @@ -159,7 +168,7 @@ impl DatafusionOptimizer {
),
hueristic_optimizer: HeuristicsOptimizer::new_with_rules(
heuristic_rules,
ApplyOrder::BottomUp,
ApplyOrder::TopDown, // uhh TODO reconsider
property_builders.clone(),
),
enable_adaptive,
Expand Down
32 changes: 29 additions & 3 deletions optd-datafusion-repr/src/plan_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub(super) mod macros;
mod projection;
mod scan;
mod sort;
mod subquery;

use std::fmt::Debug;
use std::sync::Arc;
Expand All @@ -37,6 +38,7 @@ use pretty_xmlish::{Pretty, PrettyConfig};
pub use projection::{LogicalProjection, PhysicalProjection};
pub use scan::{LogicalScan, PhysicalScan};
pub use sort::{LogicalSort, PhysicalSort};
pub use subquery::{DependentJoin, ExternColumnRefExpr, RawDependentJoin}; // Add missing import

use crate::properties::schema::{Schema, SchemaPropertyBuilder};

Expand All @@ -52,6 +54,8 @@ pub enum OptRelNodeTyp {
Filter,
Scan,
Join(JoinType),
RawDepJoin(JoinType),
DepJoin(JoinType),
Sort,
Agg,
Apply(ApplyType),
Expand All @@ -70,6 +74,7 @@ pub enum OptRelNodeTyp {
// Expressions
Constant(ConstantType),
ColumnRef,
ExternColumnRef,
UnOp(UnOpType),
BinOp(BinOpType),
LogOp(LogOpType),
Expand All @@ -90,6 +95,8 @@ impl OptRelNodeTyp {
| Self::Filter
| Self::Scan
| Self::Join(_)
| Self::RawDepJoin(_)
| Self::DepJoin(_)
| Self::Apply(_)
| Self::Sort
| Self::Agg
Expand All @@ -112,6 +119,7 @@ impl OptRelNodeTyp {
self,
Self::Constant(_)
| Self::ColumnRef
| Self::ExternColumnRef
| Self::UnOp(_)
| Self::BinOp(_)
| Self::LogOp(_)
Expand Down Expand Up @@ -295,7 +303,7 @@ impl Expr {
/// the call stack, and no expression will be returned.
pub fn rewrite_column_refs(
&self,
rewrite_fn: &impl Fn(usize) -> Option<usize>,
rewrite_fn: &mut impl FnMut(usize) -> Option<usize>,
) -> Option<Self> {
assert!(self.typ().is_expression());
if let OptRelNodeTyp::ColumnRef = self.typ() {
Expand All @@ -314,8 +322,17 @@ impl Expr {
.into_iter()
.map(|child| {
if child.typ == OptRelNodeTyp::List {
// TODO: What should we do with List?
return Some(child);
return Some(
ExprList::new(
ExprList::from_rel_node(child.clone())
.unwrap()
.to_vec()
.into_iter()
.map(|x| x.rewrite_column_refs(rewrite_fn).unwrap())
.collect(),
)
.into_rel_node(),
);
}
Expr::from_rel_node(child.clone())
.unwrap()
Expand Down Expand Up @@ -392,6 +409,9 @@ pub fn explain(rel_node: OptRelNodeRef, meta_map: Option<&RelNodeMetaMap>) -> Pr
OptRelNodeTyp::ColumnRef => ColumnRefExpr::from_rel_node(rel_node)
.unwrap()
.dispatch_explain(meta_map),
OptRelNodeTyp::ExternColumnRef => ExternColumnRefExpr::from_rel_node(rel_node)
.unwrap()
.dispatch_explain(meta_map),
OptRelNodeTyp::Constant(_) => ConstantExpr::from_rel_node(rel_node)
.unwrap()
.dispatch_explain(meta_map),
Expand All @@ -407,6 +427,12 @@ pub fn explain(rel_node: OptRelNodeRef, meta_map: Option<&RelNodeMetaMap>) -> Pr
OptRelNodeTyp::Join(_) => LogicalJoin::from_rel_node(rel_node)
.unwrap()
.dispatch_explain(meta_map),
OptRelNodeTyp::RawDepJoin(_) => RawDependentJoin::from_rel_node(rel_node)
.unwrap()
.dispatch_explain(meta_map),
OptRelNodeTyp::DepJoin(_) => DependentJoin::from_rel_node(rel_node)
.unwrap()
.dispatch_explain(meta_map),
OptRelNodeTyp::Scan => LogicalScan::from_rel_node(rel_node)
.unwrap()
.dispatch_explain(meta_map),
Expand Down
78 changes: 78 additions & 0 deletions optd-datafusion-repr/src/plan_nodes/subquery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use optd_core::rel_node::{RelNode, RelNodeMetaMap, Value};
use pretty_xmlish::Pretty;

use super::macros::define_plan_node;
use super::{Expr, ExprList, JoinType, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};

#[derive(Clone, Debug)]
pub struct RawDependentJoin(pub PlanNode);

define_plan_node!(
RawDependentJoin : PlanNode,
RawDepJoin, [
{ 0, left: PlanNode },
{ 1, right: PlanNode }
], [
{ 2, cond: Expr },
{ 3, extern_cols: ExprList }
], { join_type: JoinType }
);

#[derive(Clone, Debug)]
pub struct DependentJoin(pub PlanNode);

define_plan_node!(
DependentJoin : PlanNode,
DepJoin, [
{ 0, left: PlanNode },
{ 1, right: PlanNode }
], [
{ 2, cond: Expr },
{ 3, extern_cols: ExprList }
], { join_type: JoinType }
);

#[derive(Clone, Debug)]
pub struct ExternColumnRefExpr(pub Expr);

impl ExternColumnRefExpr {
/// Creates a new `DepExternColumnRef` expression.
pub fn new(column_idx: usize) -> ExternColumnRefExpr {
// this conversion is always safe since usize is at most u64
let u64_column_idx = column_idx as u64;
ExternColumnRefExpr(Expr(
RelNode {
typ: OptRelNodeTyp::ExternColumnRef,
children: vec![],
data: Some(Value::UInt64(u64_column_idx)),
}
.into(),
))
}

fn get_data_usize(&self) -> usize {
self.0 .0.data.as_ref().unwrap().as_u64() as usize
}

/// Gets the column index.
pub fn index(&self) -> usize {
self.get_data_usize()
}
}

impl OptRelNode for ExternColumnRefExpr {
fn into_rel_node(self) -> OptRelNodeRef {
self.0.into_rel_node()
}

fn from_rel_node(rel_node: OptRelNodeRef) -> Option<Self> {
if rel_node.typ != OptRelNodeTyp::ExternColumnRef {
return None;
}
Expr::from_rel_node(rel_node).map(Self)
}

fn dispatch_explain(&self, _meta_map: Option<&RelNodeMetaMap>) -> Pretty<'static> {
Pretty::display(&format!("Extern(#{})", self.index()))
}
}
3 changes: 2 additions & 1 deletion optd-datafusion-repr/src/properties/column_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ impl PropertyBuilder<OptRelNodeTyp> for ColumnRefPropertyBuilder {
GroupColumnRefs::new(column_refs, child.output_correlation.clone())
}
// Should account for all physical join types.
OptRelNodeTyp::Join(join_type) => {
OptRelNodeTyp::Join(join_type) | OptRelNodeTyp::RawDepJoin(join_type) | OptRelNodeTyp::DepJoin(join_type)=> {
// Concatenate left and right children column refs.
let column_refs = Self::concat_children_col_refs(&children[0..2]);
// Merge the equal columns of two children as input correlation.
Expand Down Expand Up @@ -465,6 +465,7 @@ impl PropertyBuilder<OptRelNodeTyp> for ColumnRefPropertyBuilder {
GroupColumnRefs::new(column_refs, correlation)
}
OptRelNodeTyp::Constant(_)
| OptRelNodeTyp::ExternColumnRef // TODO Possibly very very wrong---consult cost model team
| OptRelNodeTyp::Func(_)
| OptRelNodeTyp::DataType(_)
| OptRelNodeTyp::Between
Expand Down
2 changes: 1 addition & 1 deletion optd-datafusion-repr/src/properties/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl PropertyBuilder<OptRelNodeTyp> for SchemaPropertyBuilder {
}
OptRelNodeTyp::Projection => children[1].clone(),
OptRelNodeTyp::Filter => children[0].clone(),
OptRelNodeTyp::Join(_) => {
OptRelNodeTyp::DepJoin(_) | OptRelNodeTyp::Join(_) => {
let mut schema = children[0].clone();
let schema2 = children[1].clone();
schema.fields.extend(schema2.fields);
Expand Down
4 changes: 4 additions & 0 deletions optd-datafusion-repr/src/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod joins;
mod macros;
mod physical;
mod project_transpose;
mod subquery;

// pub use filter_join::FilterJoinPullUpRule;
pub use eliminate_duplicated_expr::{
Expand All @@ -25,3 +26,6 @@ pub use project_transpose::{
project_join_transpose::ProjectionPullUpJoin,
project_merge::ProjectMergeRule,
};
pub use subquery::{
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
};
7 changes: 6 additions & 1 deletion optd-datafusion-repr/src/rules/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! At a high level, filter pushdown is responsible for pushing the filter node
//! further down the query plan whenever it is possible to do so.

use core::panic;
use std::collections::{HashMap, HashSet};
use std::vec;

Expand Down Expand Up @@ -64,6 +65,10 @@ fn determine_join_cond_dep(
left_col = true;
} else if index >= left_schema_size && index < left_schema_size + right_schema_size {
right_col = true;
} else {
panic!(
"Column index {index} out of bounds {left_schema_size} + {right_schema_size}"
);
}
}
}
Expand Down Expand Up @@ -238,7 +243,7 @@ fn filter_join_transpose(
match location {
JoinCondDependency::Left => left_conds.push(expr),
JoinCondDependency::Right => right_conds.push(
expr.rewrite_column_refs(&|idx| {
expr.rewrite_column_refs(&mut |idx| {
Some(LogicalJoin::map_through_join(
idx,
left_schema_size,
Expand Down
4 changes: 2 additions & 2 deletions optd-datafusion-repr/src/rules/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ fn apply_join_commute(
let right_schema = optimizer.get_property::<SchemaPropertyBuilder>(Arc::new(right.clone()), 0);
let cond = Expr::from_rel_node(cond.into())
.unwrap()
.rewrite_column_refs(&|idx| {
.rewrite_column_refs(&mut |idx| {
Some(if idx < left_schema.len() {
idx + right_schema.len()
} else {
Expand Down Expand Up @@ -129,7 +129,7 @@ fn apply_join_assoc(

let cond2 = Expr::from_rel_node(cond2.into()).unwrap();

let Some(cond2) = cond2.rewrite_column_refs(&|idx| {
let Some(cond2) = cond2.rewrite_column_refs(&mut |idx| {
if idx < a_schema.len() {
None
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl ProjectionMapping {
/// Join { cond: #1=#4 }
pub fn rewrite_join_cond(&self, cond: Expr, child_schema_len: usize) -> Expr {
let schema_size = self.forward.len();
cond.rewrite_column_refs(&|col_idx| {
cond.rewrite_column_refs(&mut |col_idx| {
if col_idx < schema_size {
self.projection_col_maps_to(col_idx)
} else {
Expand All @@ -79,7 +79,7 @@ impl ProjectionMapping {
/// ---->
/// Filter { cond: #1=0 and #4=1 }
pub fn rewrite_filter_cond(&self, cond: Expr, is_added: bool) -> Expr {
cond.rewrite_column_refs(&|col_idx| {
cond.rewrite_column_refs(&mut |col_idx| {
if is_added {
self.original_col_maps_to(col_idx)
} else {
Expand Down
5 changes: 5 additions & 0 deletions optd-datafusion-repr/src/rules/subquery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod depjoin_pushdown;

pub use depjoin_pushdown::{
DepInitialDistinct, DepJoinEliminateAtScan, DepJoinPastAgg, DepJoinPastFilter, DepJoinPastProj,
};
Loading

0 comments on commit 5065c42

Please sign in to comment.