Skip to content

Commit

Permalink
[SPARK-49646][SQL] fix subquery decorrelation for union/set operation…
Browse files Browse the repository at this point in the history
…s when parentOuterReferences has references not covered in collectedChildOuterReferences

### What changes were proposed in this pull request?
fix bug when encounter union/setOp under limit/aggregation with filter predicates cannot pulled up directly in lateral join. eg:
```
create table IF NOT EXISTS t(t1 INT,t2 int) using json;
CREATE TABLE IF NOT EXISTS a (a1 INT) using json;

select 1
from t as t_outer
left join
   lateral(
       select b1,b2
       from
       (
           select
               a.a1 as b1,
               1 as b2
           from a
           union
           select t_outer.t1 as b1,
                  null as b2
       ) as t_inner
       where (t_inner.b1 < t_outer.t2  or t_inner.b1 is null) and  t_inner.b1 = t_outer.t1
       order by t_inner.b1,t_inner.b2 desc limit 1
   ) as lateral_table
```

### Why are the changes needed?
In general, spark cannot handle this query because:
1. Decorrelation logic tries to rewrite limit operator into Window aggregation and pull up correlated predicates, and Union operator is rewritten to have DomainJoin within its children with outer references.
2. When we're rewriting DomainJoin to real join execution, it needs attribute reference map based on pulled up correlated predicates to rewrite outer references in DomainJoin. However, each child of Union/SetOp operator are using different attribute references even they are referring to the same column of outer table. We need Union/SetOp output and its children output to map between these references.
3. Combined with aggregation and filters with inequality comparison, more outer references are remained within children of Union operator, and these references are not covered in Union/SetOp output which leads to lacking of information when we're trying to map different attributed references within children of Union/SetOp operator.

More context -> please read this short investigation doc(I've changed the link and it's now public):
https://docs.google.com/document/d/1_pJIi_8GuLHOXabLEgRy2e7OHw-OIBnWbwGwSkwIcxg/edit?usp=sharing

### Does this PR introduce _any_ user-facing change?

yes, bug is fixed and the above query can be handled without error.

### How was this patch tested?

added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48109 from averyqi-db/averyqi-db/SPARK-49646.

Authored-by: Avery Qi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
averyqi-db authored and cloud-fan committed Sep 16, 2024
1 parent b4f4d9b commit 738db07
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
// Project, they could get added at the beginning or the end of the output columns
// depending on the child plan.
// The inner expressions for the domain are the values of newOuterReferenceMap.
val domainProjections = collectedChildOuterReferences.map(newOuterReferenceMap(_))
val domainProjections = newOuterReferences.map(newOuterReferenceMap(_))
val newChild = Project(child.output ++ domainProjections, decorrelatedChild)
(newChild, newJoinCond, newOuterReferenceMap)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3017,6 +3017,53 @@ Project [c1#x, c2#x, t#x]
+- LocalRelation [col1#x, col2#x]


-- !query
select 1
from t1 as t_outer
left join
lateral(
select b1,b2
from
(
select
t2.c1 as b1,
1 as b2
from t2
union
select t_outer.c1 as b1,
null as b2
) as t_inner
where (t_inner.b1 < t_outer.c2 or t_inner.b1 is null)
and t_inner.b1 = t_outer.c1
order by t_inner.b1,t_inner.b2 desc limit 1
) as lateral_table
-- !query analysis
Project [1 AS 1#x]
+- LateralJoin lateral-subquery#x [c2#x && c1#x && c1#x], LeftOuter
: +- SubqueryAlias lateral_table
: +- GlobalLimit 1
: +- LocalLimit 1
: +- Sort [b1#x ASC NULLS FIRST, b2#x DESC NULLS LAST], true
: +- Project [b1#x, b2#x]
: +- Filter (((b1#x < outer(c2#x)) OR isnull(b1#x)) AND (b1#x = outer(c1#x)))
: +- SubqueryAlias t_inner
: +- Distinct
: +- Union false, false
: :- Project [c1#x AS b1#x, 1 AS b2#x]
: : +- SubqueryAlias spark_catalog.default.t2
: : +- View (`spark_catalog`.`default`.`t2`, [c1#x, c2#x])
: : +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
: : +- LocalRelation [col1#x, col2#x]
: +- Project [b1#x, cast(b2#x as int) AS b2#x]
: +- Project [outer(c1#x) AS b1#x, null AS b2#x]
: +- OneRowRelation
+- SubqueryAlias t_outer
+- SubqueryAlias spark_catalog.default.t1
+- View (`spark_catalog`.`default`.`t1`, [c1#x, c2#x])
+- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+- LocalRelation [col1#x, col2#x]


-- !query
DROP VIEW t1
-- !query analysis
Expand Down
21 changes: 21 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,27 @@ select * from t1 join lateral
(select t4.c1 as t from t4 where t1.c1 = t4.c1)) as foo
order by foo.t limit 5);


select 1
from t1 as t_outer
left join
lateral(
select b1,b2
from
(
select
t2.c1 as b1,
1 as b2
from t2
union
select t_outer.c1 as b1,
null as b2
) as t_inner
where (t_inner.b1 < t_outer.c2 or t_inner.b1 is null)
and t_inner.b1 = t_outer.c1
order by t_inner.b1,t_inner.b2 desc limit 1
) as lateral_table;

-- clean up
DROP VIEW t1;
DROP VIEW t2;
Expand Down
27 changes: 27 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -1878,6 +1878,33 @@ struct<c1:int,c2:int,t:int>
1 2 3


-- !query
select 1
from t1 as t_outer
left join
lateral(
select b1,b2
from
(
select
t2.c1 as b1,
1 as b2
from t2
union
select t_outer.c1 as b1,
null as b2
) as t_inner
where (t_inner.b1 < t_outer.c2 or t_inner.b1 is null)
and t_inner.b1 = t_outer.c1
order by t_inner.b1,t_inner.b2 desc limit 1
) as lateral_table
-- !query schema
struct<1:int>
-- !query output
1
1


-- !query
DROP VIEW t1
-- !query schema
Expand Down

0 comments on commit 738db07

Please sign in to comment.