From 876582bd12fd13407fc73cea227f7bc5caa280f0 Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Mon, 31 Jul 2023 10:03:34 -0700 Subject: [PATCH 01/12] tests --- .../scalar-subquery-set-op.sql.out | 22 +++++++++++++++++ .../scalar-subquery-set-op.sql | 5 ++++ .../scalar-subquery-set-op.sql.out | 24 +++++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out index 0f53d05ac3e7d..8517da127cefb 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out @@ -1802,3 +1802,25 @@ org.apache.spark.sql.AnalysisException "fragment" : "SELECT sum(t0a) as d\n FROM t1" } ] } + + +-- !query +SELECT 1 +FROM t0 +WHERE t0a = (SELECT MAX(tmp.s) FROM (SELECT SUM(t1a) OVER (partition by t1b order by t1c) as s FROM t1 where t1.t1a = t0.t0b) as tmp) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED", + "sqlState" : "0A000", + "messageParameters" : { + "treeNode" : "Filter (t1a#x = outer(t0b#x))\n+- SubqueryAlias t1\n +- View (`t1`, [t1a#x,t1b#x,t1c#x])\n +- Project [cast(col1#x as int) AS t1a#x, cast(col2#x as int) AS t1b#x, cast(col3#x as int) AS t1c#x]\n +- LocalRelation [col1#x, col2#x, col3#x]\n" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 55, + "stopIndex" : 142, + "fragment" : "SELECT SUM(t1a) OVER (partition by t1b order by t1c) as s FROM t1 where t1.t1a = t0.t0b" + } ] +} diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-set-op.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-set-op.sql index 8f03f7e41004b..bc6013963c65d 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-set-op.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-set-op.sql @@ -619,3 +619,8 @@ SELECT t0a, (SELECT sum(d) FROM FROM t2) ) FROM t0; + + +SELECT 1 +FROM t0 +WHERE t0a = (SELECT MAX(tmp.s) FROM (SELECT SUM(t1a) OVER (partition by t1b order by t1c) as s FROM t1 where t1.t1a = t0.t0b) as tmp); \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out index d21873cd3c8ed..619be3d15f754 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out @@ -1041,3 +1041,27 @@ org.apache.spark.sql.AnalysisException "fragment" : "SELECT sum(t0a) as d\n FROM t1" } ] } + + +-- !query +SELECT 1 +FROM t0 +WHERE t0a = (SELECT MAX(tmp.s) FROM (SELECT SUM(t1a) OVER (partition by t1b order by t1c) as s FROM t1 where t1.t1a = t0.t0b) as tmp) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED", + "sqlState" : "0A000", + "messageParameters" : { + "treeNode" : "Filter (t1a#x = outer(t0b#x))\n+- SubqueryAlias t1\n +- View (`t1`, [t1a#x,t1b#x,t1c#x])\n +- Project [cast(col1#x as int) AS t1a#x, cast(col2#x as int) AS t1b#x, cast(col3#x as int) AS t1c#x]\n +- LocalRelation [col1#x, col2#x, col3#x]\n" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 55, + "stopIndex" : 142, + "fragment" : "SELECT SUM(t1a) OVER (partition by t1b order by t1c) as s FROM t1 where t1.t1a = t0.t0b" + } ] +} From 51376ac0d6c5a11701158864ddeac0efb7216f82 Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Mon, 7 Aug 2023 12:54:57 -0700 Subject: [PATCH 02/12] Support window functions in correlated scalar subqueries --- .../sql/catalyst/analysis/CheckAnalysis.scala | 5 ++ .../optimizer/DecorrelateInnerQuery.scala | 19 ++++ .../DecorrelateInnerQuerySuite.scala | 44 ++++++++++ .../subquery/in-subquery/in-group-by.sql.out | 33 +++++++ .../scalar-subquery-predicate.sql.out | 87 +++++++++++++++++++ .../subquery/in-subquery/in-group-by.sql | 7 +- .../scalar-subquery-predicate.sql | 21 +++++ .../subquery/in-subquery/in-group-by.sql.out | 25 ++++++ .../scalar-subquery-predicate.sql.out | 54 ++++++++++++ 9 files changed, 293 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 8b04c8108bd08..caa1bbda46669 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -1328,6 +1328,11 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB failOnInvalidOuterReference(a) checkPlan(a.child, aggregated = true, canContainOuter) + // Same as Aggregate above. + case w: Window => + failOnInvalidOuterReference(w) + checkPlan(w.child, aggregated = true, canContainOuter) + // Distinct does not host any correlated expressions, but during the optimization phase // it will be rewritten as Aggregate, which can only be on a correlation path if the // correlation contains only the supported correlated equality predicates. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala index 86fa78e96a5f6..b266aaadd8dce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala @@ -654,6 +654,25 @@ object DecorrelateInnerQuery extends PredicateHelper { val newProject = Project(newProjectList ++ referencesToAdd, newChild) (newProject, joinCond, outerReferenceMap) + case w@Window(projectList, partitionSpec, orderSpec, child) => + val outerReferences = collectOuterReferences(w.expressions) + assert(outerReferences.isEmpty, s"Correlated column is not allowed in window " + + s"function: $w") + val newOuterReferences = parentOuterReferences ++ outerReferences + val (newChild, joinCond, outerReferenceMap) = + decorrelate(child, newOuterReferences, aggregated = true, underSetOp) + val newProjectList = replaceOuterReferences(projectList, outerReferenceMap) + val newPartitionSpec = replaceOuterReferences(partitionSpec, outerReferenceMap) + val newOrderSpec = replaceOuterReferences(orderSpec, outerReferenceMap) + val referencesToAdd = missingReferences( + newPartitionSpec.asInstanceOf[Seq[NamedExpression]], + joinCond) + + val newWindow = Window(newProjectList ++ referencesToAdd, + partitionSpec = newPartitionSpec ++ referencesToAdd, + orderSpec = newOrderSpec, newChild) + (newWindow, joinCond, outerReferenceMap) + case a @ Aggregate(groupingExpressions, aggregateExpressions, child) => val outerReferences = collectOuterReferences(a.expressions) val newOuterReferences = parentOuterReferences ++ outerReferences diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala index 304f7de4c6ab9..4e0cbbc096957 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala @@ -454,4 +454,48 @@ class DecorrelateInnerQuerySuite extends PlanTest { DomainJoin(Seq(x), testRelation)))) check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x)) } + + test("window function with correlated equality predicate") { + val outerPlan = testRelation2 + val innerPlan = + Window(Seq(b, c), + partitionSpec = Seq(c), orderSpec = b.asc :: Nil, + Filter(And(OuterReference(x) === a, b === 3), + testRelation)) + // Both the project list and the partition spec have added the correlated variable. + val correctAnswer = + Window(Seq(b, c, a), partitionSpec = Seq(c, a), orderSpec = b.asc :: Nil, + Filter(b === 3, + testRelation)) + check(innerPlan, outerPlan, correctAnswer, Seq(x === a)) + } + + test("window function with correlated non-equality predicate") { + val outerPlan = testRelation2 + val innerPlan = + Window(Seq(b, c), + partitionSpec = Seq(c), orderSpec = b.asc :: Nil, + Filter(And(OuterReference(x) > a, b === 3), + testRelation)) + // Both the project list and the partition spec have added the correlated variable. + // The input to the filter is a domain join that produces 'x' values. + val correctAnswer = + Window(Seq(b, c, x), partitionSpec = Seq(c, x), orderSpec = b.asc :: Nil, + Filter(And(b === 3, x > a), + DomainJoin(Seq(x), testRelation))) + check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x)) + } + + test("window function with correlated columns inside") { + val outerPlan = testRelation2 + val innerPlan = + Window(projectList = Seq(b, c), + partitionSpec = Seq(c, OuterReference(x)), orderSpec = b.asc :: Nil, + Filter(b === 3, + testRelation)) + val e = intercept[java.lang.AssertionError] { + DecorrelateInnerQuery(innerPlan, outerPlan.select()) + } + assert(e.getMessage.contains("Correlated column is not allowed in")) + } } diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out index 11c596eee1cb2..50f11c9445056 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out @@ -639,3 +639,36 @@ Filter isnotnull(min(t1b)#x) +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] +- SubqueryAlias t1 +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] + + +-- !query +select t1a +from t1 +where t1f IN (SELECT RANK() OVER (partition by t3c order by t2b) as s + FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < t1.t1a) +-- !query analysis +Project [t1a#x] ++- Filter t1f#x IN (list#x [t1a#x]) + : +- Project [cast(s#x as double) AS s#x] + : +- Window [rank(t2b#x) windowspecdefinition(t3c#x, t2b#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS s#x], [t3c#x], [t2b#x ASC NULLS FIRST] + : +- Project [t3c#x, t2b#x] + : +- Filter ((t2c#x = t3c#x) AND (t2a#x < outer(t1a#x))) + : +- Join Inner + : :- SubqueryAlias t2 + : : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x]) + : : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x] + : : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + : : +- SubqueryAlias t2 + : : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + : +- SubqueryAlias t3 + : +- View (`t3`, [t3a#x,t3b#x,t3c#x,t3d#xL,t3e#x,t3f#x,t3g#x,t3h#x,t3i#x]) + : +- Project [cast(t3a#x as string) AS t3a#x, cast(t3b#x as smallint) AS t3b#x, cast(t3c#x as int) AS t3c#x, cast(t3d#xL as bigint) AS t3d#xL, cast(t3e#x as float) AS t3e#x, cast(t3f#x as double) AS t3f#x, cast(t3g#x as decimal(4,0)) AS t3g#x, cast(t3h#x as timestamp) AS t3h#x, cast(t3i#x as date) AS t3i#x] + : +- Project [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] + : +- SubqueryAlias t3 + : +- LocalRelation [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] + +- SubqueryAlias t1 + +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x]) + +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x] + +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] + +- SubqueryAlias t1 + +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out index 311f3803902e3..4fa36fc74a633 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out @@ -880,6 +880,93 @@ Project [t1a#x] +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] +-- !query +SELECT 1 +FROM t1 +WHERE t1b < (SELECT MAX(tmp.s) FROM ( + SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s + FROM t2 WHERE t2.t2d = t1.t1d) as tmp) +-- !query analysis +Project [1 AS 1#x] ++- Filter (cast(t1b#x as bigint) < scalar-subquery#x [t1d#xL]) + : +- Aggregate [max(s#xL) AS max(s)#xL] + : +- SubqueryAlias tmp + : +- Window [sum(t2b#x) windowspecdefinition(t2c#x, t2d#xL ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS s#xL], [t2c#x], [t2d#xL ASC NULLS FIRST] + : +- Project [t2c#x, t2d#xL, t2b#x] + : +- Filter (t2d#xL = outer(t1d#xL)) + : +- SubqueryAlias t2 + : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x]) + : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x] + : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + : +- SubqueryAlias t2 + : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + +- SubqueryAlias t1 + +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x]) + +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x] + +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] + +- SubqueryAlias t1 + +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] + + +-- !query +SELECT t1b +FROM t1 +WHERE t1b > (SELECT MAX(tmp.s) FROM ( + SELECT RANK() OVER (partition by t3c, t2b order by t3c) as s + FROM t2, t3 where t2.t2c = t3.t3c AND t2.t2a = t1.t1a) as tmp) +-- !query analysis +Project [t1b#x] ++- Filter (cast(t1b#x as int) > scalar-subquery#x [t1a#x]) + : +- Aggregate [max(s#x) AS max(s)#x] + : +- SubqueryAlias tmp + : +- Window [rank(t3c#x) windowspecdefinition(t3c#x, t2b#x, t3c#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS s#x], [t3c#x, t2b#x], [t3c#x ASC NULLS FIRST] + : +- Project [t3c#x, t2b#x] + : +- Filter ((t2c#x = t3c#x) AND (t2a#x = outer(t1a#x))) + : +- Join Inner + : :- SubqueryAlias t2 + : : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x]) + : : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x] + : : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + : : +- SubqueryAlias t2 + : : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + : +- SubqueryAlias t3 + : +- View (`t3`, [t3a#x,t3b#x,t3c#x,t3d#xL,t3e#x,t3f#x,t3g#x,t3h#x,t3i#x]) + : +- Project [cast(t3a#x as string) AS t3a#x, cast(t3b#x as smallint) AS t3b#x, cast(t3c#x as int) AS t3c#x, cast(t3d#xL as bigint) AS t3d#xL, cast(t3e#x as float) AS t3e#x, cast(t3f#x as double) AS t3f#x, cast(t3g#x as decimal(4,0)) AS t3g#x, cast(t3h#x as timestamp) AS t3h#x, cast(t3i#x as date) AS t3i#x] + : +- Project [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] + : +- SubqueryAlias t3 + : +- LocalRelation [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] + +- SubqueryAlias t1 + +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x]) + +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x] + +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] + +- SubqueryAlias t1 + +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] + + +-- !query +SELECT 1 +FROM t1 +WHERE t1b = (SELECT MAX(tmp.s) FROM ( + SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s + FROM t2) as tmp) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE", + "sqlState" : "0A000", + "messageParameters" : { + "sqlExprs" : "\"sum(t2c) OVER (PARTITION BY t2c ORDER BY (t1d + t2d) ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s\",\"(t1d + t2d) ASC NULLS FIRST\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 69, + "stopIndex" : 157, + "fragment" : "SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s\n FROM t2" + } ] +} + + -- !query CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0) -- !query analysis diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-group-by.sql index 496285e3514ea..168faa0aee7c5 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-group-by.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-group-by.sql @@ -240,5 +240,8 @@ WHERE t1c IN (SELECT Min(t2c) GROUP BY t1a HAVING Min(t1b) IS NOT NULL; - - +-- Window functions are not supported in IN subqueries yet +select t1a +from t1 +where t1f IN (SELECT RANK() OVER (partition by t3c order by t2b) as s + FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < t1.t1a); diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql index e015d57754999..bf9e37736219e 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql @@ -280,6 +280,27 @@ HAVING max(t1b) <= (SELECT max(t2b) WHERE t2c = t1c GROUP BY t2c); +-- SPARK-44549: window function in the correlated subquery. +SELECT 1 +FROM t1 +WHERE t1b < (SELECT MAX(tmp.s) FROM ( + SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s + FROM t2 WHERE t2.t2d = t1.t1d) as tmp); + +-- SPARK-44549: window function in the correlated subquery over joins. +SELECT t1b +FROM t1 +WHERE t1b > (SELECT MAX(tmp.s) FROM ( + SELECT RANK() OVER (partition by t3c, t2b order by t3c) as s + FROM t2, t3 where t2.t2c = t3.t3c AND t2.t2a = t1.t1a) as tmp); + +-- SPARK-44549: correlation in window function itself is not supported yet. +SELECT 1 +FROM t1 +WHERE t1b = (SELECT MAX(tmp.s) FROM ( + SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s + FROM t2) as tmp); + -- Set operations in correlation path CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0); diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out index 4c715342bee0e..ba0ed9236bb5c 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out @@ -352,3 +352,28 @@ struct t1a 16 t1b 8 t1c 8 + + +-- !query +select t1a +from t1 +where t1f IN (SELECT RANK() OVER (partition by t3c order by t2b) as s + FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < t1.t1a) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE", + "sqlState" : "0A000", + "messageParameters" : { + "treeNode" : "(t2a#x < outer(t1a#x))\nFilter ((t2c#x = t3c#x) AND (t2a#x < outer(t1a#x)))\n+- Join Inner\n :- SubqueryAlias t2\n : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]\n : +- SubqueryAlias t2\n : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]\n +- SubqueryAlias t3\n +- View (`t3`, [t3a#x,t3b#x,t3c#x,t3d#xL,t3e#x,t3f#x,t3g#x,t3h#x,t3i#x])\n +- Project [cast(t3a#x as string) AS t3a#x, cast(t3b#x as smallint) AS t3b#x, cast(t3c#x as int) AS t3c#x, cast(t3d#xL as bigint) AS t3d#xL, cast(t3e#x as float) AS t3e#x, cast(t3f#x as double) AS t3f#x, cast(t3g#x as decimal(4,0)) AS t3g#x, cast(t3h#x as timestamp) AS t3h#x, cast(t3i#x as date) AS t3i#x]\n +- Project [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x]\n +- SubqueryAlias t3\n +- LocalRelation [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x]\n" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 34, + "stopIndex" : 172, + "fragment" : "SELECT RANK() OVER (partition by t3c order by t2b) as s\n FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < t1.t1a" + } ] +} diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out index 02f3f9eae790a..89bdc653558d7 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out @@ -443,6 +443,60 @@ val1b val1c +-- !query +SELECT 1 +FROM t1 +WHERE t1b < (SELECT MAX(tmp.s) FROM ( + SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s + FROM t2 WHERE t2.t2d = t1.t1d) as tmp) +-- !query schema +struct<1:int> +-- !query output +1 +1 +1 +1 + + +-- !query +SELECT t1b +FROM t1 +WHERE t1b > (SELECT MAX(tmp.s) FROM ( + SELECT RANK() OVER (partition by t3c, t2b order by t3c) as s + FROM t2, t3 where t2.t2c = t3.t3c AND t2.t2a = t1.t1a) as tmp) +-- !query schema +struct +-- !query output +8 +8 + + +-- !query +SELECT 1 +FROM t1 +WHERE t1b = (SELECT MAX(tmp.s) FROM ( + SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s + FROM t2) as tmp) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE", + "sqlState" : "0A000", + "messageParameters" : { + "sqlExprs" : "\"sum(t2c) OVER (PARTITION BY t2c ORDER BY (t1d + t2d) ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s\",\"(t1d + t2d) ASC NULLS FIRST\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 69, + "stopIndex" : 157, + "fragment" : "SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s\n FROM t2" + } ] +} + + -- !query CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0) -- !query schema From 973cd6f207a1d5b6f390d863c88ffc7d727b7680 Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Mon, 7 Aug 2023 12:58:30 -0700 Subject: [PATCH 03/12] Cleanup --- .../subquery/scalar-subquery/scalar-subquery-set-op.sql | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-set-op.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-set-op.sql index bc6013963c65d..8f03f7e41004b 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-set-op.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-set-op.sql @@ -619,8 +619,3 @@ SELECT t0a, (SELECT sum(d) FROM FROM t2) ) FROM t0; - - -SELECT 1 -FROM t0 -WHERE t0a = (SELECT MAX(tmp.s) FROM (SELECT SUM(t1a) OVER (partition by t1b order by t1c) as s FROM t1 where t1.t1a = t0.t0b) as tmp); \ No newline at end of file From c9687985bb311f0abe52095c34c6b6639bb419d4 Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Mon, 7 Aug 2023 16:38:57 -0700 Subject: [PATCH 04/12] compile --- .../sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala index 4e0cbbc096957..d31e2964f761e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala @@ -489,7 +489,7 @@ class DecorrelateInnerQuerySuite extends PlanTest { test("window function with correlated columns inside") { val outerPlan = testRelation2 val innerPlan = - Window(projectList = Seq(b, c), + Window(Seq(b, c), partitionSpec = Seq(c, OuterReference(x)), orderSpec = b.asc :: Nil, Filter(b === 3, testRelation)) From 83b4518a739df38339a70fa478bc083ae2477492 Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Tue, 8 Aug 2023 09:52:09 -0700 Subject: [PATCH 05/12] fix test --- .../subquery/in-subquery/in-group-by.sql.out | 40 +++++++------------ 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out index 50f11c9445056..407240c1329fe 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out @@ -647,28 +647,18 @@ from t1 where t1f IN (SELECT RANK() OVER (partition by t3c order by t2b) as s FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < t1.t1a) -- !query analysis -Project [t1a#x] -+- Filter t1f#x IN (list#x [t1a#x]) - : +- Project [cast(s#x as double) AS s#x] - : +- Window [rank(t2b#x) windowspecdefinition(t3c#x, t2b#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS s#x], [t3c#x], [t2b#x ASC NULLS FIRST] - : +- Project [t3c#x, t2b#x] - : +- Filter ((t2c#x = t3c#x) AND (t2a#x < outer(t1a#x))) - : +- Join Inner - : :- SubqueryAlias t2 - : : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x]) - : : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x] - : : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] - : : +- SubqueryAlias t2 - : : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] - : +- SubqueryAlias t3 - : +- View (`t3`, [t3a#x,t3b#x,t3c#x,t3d#xL,t3e#x,t3f#x,t3g#x,t3h#x,t3i#x]) - : +- Project [cast(t3a#x as string) AS t3a#x, cast(t3b#x as smallint) AS t3b#x, cast(t3c#x as int) AS t3c#x, cast(t3d#xL as bigint) AS t3d#xL, cast(t3e#x as float) AS t3e#x, cast(t3f#x as double) AS t3f#x, cast(t3g#x as decimal(4,0)) AS t3g#x, cast(t3h#x as timestamp) AS t3h#x, cast(t3i#x as date) AS t3i#x] - : +- Project [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] - : +- SubqueryAlias t3 - : +- LocalRelation [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] - +- SubqueryAlias t1 - +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x]) - +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x] - +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] - +- SubqueryAlias t1 - +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE", + "sqlState" : "0A000", + "messageParameters" : { + "treeNode" : "(t2a#x < outer(t1a#x))\nFilter ((t2c#x = t3c#x) AND (t2a#x < outer(t1a#x)))\n+- Join Inner\n :- SubqueryAlias t2\n : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]\n : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]\n : +- SubqueryAlias t2\n : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]\n +- SubqueryAlias t3\n +- View (`t3`, [t3a#x,t3b#x,t3c#x,t3d#xL,t3e#x,t3f#x,t3g#x,t3h#x,t3i#x])\n +- Project [cast(t3a#x as string) AS t3a#x, cast(t3b#x as smallint) AS t3b#x, cast(t3c#x as int) AS t3c#x, cast(t3d#xL as bigint) AS t3d#xL, cast(t3e#x as float) AS t3e#x, cast(t3f#x as double) AS t3f#x, cast(t3g#x as decimal(4,0)) AS t3g#x, cast(t3h#x as timestamp) AS t3h#x, cast(t3i#x as date) AS t3i#x]\n +- Project [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x]\n +- SubqueryAlias t3\n +- LocalRelation [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x]\n" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 34, + "stopIndex" : 172, + "fragment" : "SELECT RANK() OVER (partition by t3c order by t2b) as s\n FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < t1.t1a" + } ] +} From e0c99d2abacf12410730118b4d976660eca5f09b Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Tue, 8 Aug 2023 10:02:18 -0700 Subject: [PATCH 06/12] golden files --- .../scalar-subquery-predicate.sql.out | 89 ++++++++++--------- .../scalar-subquery-set-op.sql.out | 22 ----- .../scalar-subquery-predicate.sql.out | 18 +--- .../scalar-subquery-set-op.sql.out | 24 ----- 4 files changed, 51 insertions(+), 102 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out index 4fa36fc74a633..8409212f6e04e 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out @@ -891,15 +891,17 @@ Project [1 AS 1#x] +- Filter (cast(t1b#x as bigint) < scalar-subquery#x [t1d#xL]) : +- Aggregate [max(s#xL) AS max(s)#xL] : +- SubqueryAlias tmp - : +- Window [sum(t2b#x) windowspecdefinition(t2c#x, t2d#xL ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS s#xL], [t2c#x], [t2d#xL ASC NULLS FIRST] - : +- Project [t2c#x, t2d#xL, t2b#x] - : +- Filter (t2d#xL = outer(t1d#xL)) - : +- SubqueryAlias t2 - : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x]) - : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x] - : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] - : +- SubqueryAlias t2 - : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + : +- Project [s#xL] + : +- Project [t2b#x, t2c#x, t2d#xL, s#xL, s#xL] + : +- Window [sum(t2b#x) windowspecdefinition(t2c#x, t2d#xL ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS s#xL], [t2c#x], [t2d#xL ASC NULLS FIRST] + : +- Project [t2b#x, t2c#x, t2d#xL] + : +- Filter (t2d#xL = outer(t1d#xL)) + : +- SubqueryAlias t2 + : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x]) + : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x] + : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + : +- SubqueryAlias t2 + : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] +- SubqueryAlias t1 +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x]) +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x] @@ -919,22 +921,24 @@ Project [t1b#x] +- Filter (cast(t1b#x as int) > scalar-subquery#x [t1a#x]) : +- Aggregate [max(s#x) AS max(s)#x] : +- SubqueryAlias tmp - : +- Window [rank(t3c#x) windowspecdefinition(t3c#x, t2b#x, t3c#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS s#x], [t3c#x, t2b#x], [t3c#x ASC NULLS FIRST] - : +- Project [t3c#x, t2b#x] - : +- Filter ((t2c#x = t3c#x) AND (t2a#x = outer(t1a#x))) - : +- Join Inner - : :- SubqueryAlias t2 - : : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x]) - : : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x] - : : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] - : : +- SubqueryAlias t2 - : : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] - : +- SubqueryAlias t3 - : +- View (`t3`, [t3a#x,t3b#x,t3c#x,t3d#xL,t3e#x,t3f#x,t3g#x,t3h#x,t3i#x]) - : +- Project [cast(t3a#x as string) AS t3a#x, cast(t3b#x as smallint) AS t3b#x, cast(t3c#x as int) AS t3c#x, cast(t3d#xL as bigint) AS t3d#xL, cast(t3e#x as float) AS t3e#x, cast(t3f#x as double) AS t3f#x, cast(t3g#x as decimal(4,0)) AS t3g#x, cast(t3h#x as timestamp) AS t3h#x, cast(t3i#x as date) AS t3i#x] - : +- Project [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] - : +- SubqueryAlias t3 - : +- LocalRelation [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] + : +- Project [s#x] + : +- Project [t3c#x, t2b#x, s#x, s#x] + : +- Window [rank(t3c#x) windowspecdefinition(t3c#x, t2b#x, t3c#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS s#x], [t3c#x, t2b#x], [t3c#x ASC NULLS FIRST] + : +- Project [t3c#x, t2b#x] + : +- Filter ((t2c#x = t3c#x) AND (t2a#x = outer(t1a#x))) + : +- Join Inner + : :- SubqueryAlias t2 + : : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x]) + : : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x] + : : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + : : +- SubqueryAlias t2 + : : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + : +- SubqueryAlias t3 + : +- View (`t3`, [t3a#x,t3b#x,t3c#x,t3d#xL,t3e#x,t3f#x,t3g#x,t3h#x,t3i#x]) + : +- Project [cast(t3a#x as string) AS t3a#x, cast(t3b#x as smallint) AS t3b#x, cast(t3c#x as int) AS t3c#x, cast(t3d#xL as bigint) AS t3d#xL, cast(t3e#x as float) AS t3e#x, cast(t3f#x as double) AS t3f#x, cast(t3g#x as decimal(4,0)) AS t3g#x, cast(t3h#x as timestamp) AS t3h#x, cast(t3i#x as date) AS t3i#x] + : +- Project [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] + : +- SubqueryAlias t3 + : +- LocalRelation [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] +- SubqueryAlias t1 +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x]) +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x] @@ -950,21 +954,26 @@ WHERE t1b = (SELECT MAX(tmp.s) FROM ( SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s FROM t2) as tmp) -- !query analysis -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE", - "sqlState" : "0A000", - "messageParameters" : { - "sqlExprs" : "\"sum(t2c) OVER (PARTITION BY t2c ORDER BY (t1d + t2d) ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s\",\"(t1d + t2d) ASC NULLS FIRST\"" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 69, - "stopIndex" : 157, - "fragment" : "SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s\n FROM t2" - } ] -} +Project [1 AS 1#x] ++- Filter (cast(t1b#x as bigint) = scalar-subquery#x [t1d#xL]) + : +- Aggregate [max(s#xL) AS max(s)#xL] + : +- SubqueryAlias tmp + : +- Project [s#xL] + : +- Project [t2c#x, _w1#xL, s#xL, s#xL] + : +- Window [sum(t2c#x) windowspecdefinition(t2c#x, _w1#xL ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS s#xL], [t2c#x], [_w1#xL ASC NULLS FIRST] + : +- Project [t2c#x, (outer(t1d#xL) + t2d#xL) AS _w1#xL] + : +- SubqueryAlias t2 + : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x]) + : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x] + : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + : +- SubqueryAlias t2 + : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + +- SubqueryAlias t1 + +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x]) + +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x] + +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] + +- SubqueryAlias t1 + +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] -- !query diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out index 8517da127cefb..0f53d05ac3e7d 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out @@ -1802,25 +1802,3 @@ org.apache.spark.sql.AnalysisException "fragment" : "SELECT sum(t0a) as d\n FROM t1" } ] } - - --- !query -SELECT 1 -FROM t0 -WHERE t0a = (SELECT MAX(tmp.s) FROM (SELECT SUM(t1a) OVER (partition by t1b order by t1c) as s FROM t1 where t1.t1a = t0.t0b) as tmp) --- !query analysis -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED", - "sqlState" : "0A000", - "messageParameters" : { - "treeNode" : "Filter (t1a#x = outer(t0b#x))\n+- SubqueryAlias t1\n +- View (`t1`, [t1a#x,t1b#x,t1c#x])\n +- Project [cast(col1#x as int) AS t1a#x, cast(col2#x as int) AS t1b#x, cast(col3#x as int) AS t1c#x]\n +- LocalRelation [col1#x, col2#x, col3#x]\n" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 55, - "stopIndex" : 142, - "fragment" : "SELECT SUM(t1a) OVER (partition by t1b order by t1c) as s FROM t1 where t1.t1a = t0.t0b" - } ] -} diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out index 89bdc653558d7..954084127ef6f 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out @@ -478,23 +478,9 @@ WHERE t1b = (SELECT MAX(tmp.s) FROM ( SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s FROM t2) as tmp) -- !query schema -struct<> +struct<1:int> -- !query output -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_REFERENCE", - "sqlState" : "0A000", - "messageParameters" : { - "sqlExprs" : "\"sum(t2c) OVER (PARTITION BY t2c ORDER BY (t1d + t2d) ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s\",\"(t1d + t2d) ASC NULLS FIRST\"" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 69, - "stopIndex" : 157, - "fragment" : "SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s\n FROM t2" - } ] -} + -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out index 619be3d15f754..d21873cd3c8ed 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-set-op.sql.out @@ -1041,27 +1041,3 @@ org.apache.spark.sql.AnalysisException "fragment" : "SELECT sum(t0a) as d\n FROM t1" } ] } - - --- !query -SELECT 1 -FROM t0 -WHERE t0a = (SELECT MAX(tmp.s) FROM (SELECT SUM(t1a) OVER (partition by t1b order by t1c) as s FROM t1 where t1.t1a = t0.t0b) as tmp) --- !query schema -struct<> --- !query output -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED", - "sqlState" : "0A000", - "messageParameters" : { - "treeNode" : "Filter (t1a#x = outer(t0b#x))\n+- SubqueryAlias t1\n +- View (`t1`, [t1a#x,t1b#x,t1c#x])\n +- Project [cast(col1#x as int) AS t1a#x, cast(col2#x as int) AS t1b#x, cast(col3#x as int) AS t1c#x]\n +- LocalRelation [col1#x, col2#x, col3#x]\n" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 55, - "stopIndex" : 142, - "fragment" : "SELECT SUM(t1a) OVER (partition by t1b order by t1c) as s FROM t1 where t1.t1a = t0.t0b" - } ] -} From 3a3f7f14ee5ab5e7ae920495c99ec61ee143db42 Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Tue, 8 Aug 2023 17:43:44 -0700 Subject: [PATCH 07/12] test --- .../analyzer-results/join-lateral.sql.out | 69 +++++++++++-------- .../sql-tests/results/join-lateral.sql.out | 46 ++++--------- .../org/apache/spark/sql/SubquerySuite.scala | 2 +- 3 files changed, 54 insertions(+), 63 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out index 96b6749b86703..388d19068cd04 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out @@ -1384,21 +1384,22 @@ SELECT * FROM t1 JOIN LATERAL FROM t2 WHERE t2.c1 >= t1.c1) -- !query analysis -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED", - "sqlState" : "0A000", - "messageParameters" : { - "treeNode" : "Filter (c1#x >= outer(c1#x))\n+- SubqueryAlias spark_catalog.default.t2\n +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n +- LocalRelation [col1#x, col2#x]\n" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 34, - "stopIndex" : 108, - "fragment" : "SELECT sum(t2.c2) over (order by t2.c1)\n FROM t2\n WHERE t2.c1 >= t1.c1" - } ] -} +Project [c1#x, c2#x, sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL] ++- LateralJoin lateral-subquery#x [c1#x], Inner + : +- SubqueryAlias __auto_generated_subquery_name + : +- Project [sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL] + : +- Project [c2#x, c1#x, sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL, sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL] + : +- Window [sum(c2#x) windowspecdefinition(c1#x ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL], [c1#x ASC NULLS FIRST] + : +- Project [c2#x, c1#x] + : +- Filter (c1#x >= outer(c1#x)) + : +- SubqueryAlias spark_catalog.default.t2 + : +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x]) + : +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x] + : +- LocalRelation [col1#x, col2#x] + +- SubqueryAlias spark_catalog.default.t1 + +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x]) + +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x] + +- LocalRelation [col1#x, col2#x] -- !query @@ -1941,21 +1942,29 @@ SELECT * FROM t1 JOIN LATERAL SELECT t4.c2 FROM t4) -- !query analysis -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED", - "sqlState" : "0A000", - "messageParameters" : { - "treeNode" : "Filter (c1#x >= outer(c1#x))\n+- SubqueryAlias spark_catalog.default.t2\n +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n +- LocalRelation [col1#x, col2#x]\n" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 34, - "stopIndex" : 108, - "fragment" : "SELECT sum(t2.c2) over (order by t2.c1)\n FROM t2\n WHERE t2.c1 >= t1.c1" - } ] -} +Project [c1#x, c2#x, sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL] ++- LateralJoin lateral-subquery#x [c1#x], Inner + : +- SubqueryAlias __auto_generated_subquery_name + : +- Union false, false + : :- Project [sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL] + : : +- Project [c2#x, c1#x, sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL, sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL] + : : +- Window [sum(c2#x) windowspecdefinition(c1#x ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL], [c1#x ASC NULLS FIRST] + : : +- Project [c2#x, c1#x] + : : +- Filter (c1#x >= outer(c1#x)) + : : +- SubqueryAlias spark_catalog.default.t2 + : : +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x]) + : : +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x] + : : +- LocalRelation [col1#x, col2#x] + : +- Project [cast(c2#x as bigint) AS c2#xL] + : +- Project [c2#x] + : +- SubqueryAlias spark_catalog.default.t4 + : +- View (`spark_catalog`.`default`.`t4`, [c1#x,c2#x]) + : +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x] + : +- LocalRelation [col1#x, col2#x] + +- SubqueryAlias spark_catalog.default.t1 + +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x]) + +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x] + +- LocalRelation [col1#x, col2#x] -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out index ddee595372b09..7eab5d7d8f072 100644 --- a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out @@ -858,23 +858,10 @@ SELECT * FROM t1 JOIN LATERAL FROM t2 WHERE t2.c1 >= t1.c1) -- !query schema -struct<> +struct -- !query output -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED", - "sqlState" : "0A000", - "messageParameters" : { - "treeNode" : "Filter (c1#x >= outer(c1#x))\n+- SubqueryAlias spark_catalog.default.t2\n +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n +- LocalRelation [col1#x, col2#x]\n" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 34, - "stopIndex" : 108, - "fragment" : "SELECT sum(t2.c2) over (order by t2.c1)\n FROM t2\n WHERE t2.c1 >= t1.c1" - } ] -} +0 1 5 +0 1 5 -- !query @@ -1191,23 +1178,18 @@ SELECT * FROM t1 JOIN LATERAL SELECT t4.c2 FROM t4) -- !query schema -struct<> +struct -- !query output -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED", - "sqlState" : "0A000", - "messageParameters" : { - "treeNode" : "Filter (c1#x >= outer(c1#x))\n+- SubqueryAlias spark_catalog.default.t2\n +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n +- LocalRelation [col1#x, col2#x]\n" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 34, - "stopIndex" : 108, - "fragment" : "SELECT sum(t2.c2) over (order by t2.c1)\n FROM t2\n WHERE t2.c1 >= t1.c1" - } ] -} +0 1 1 +0 1 1 +0 1 2 +0 1 3 +0 1 5 +0 1 5 +1 2 1 +1 2 1 +1 2 2 +1 2 3 -- !query diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index d235d2a15fea3..2e6ff19d66662 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -860,7 +860,7 @@ class SubquerySuite extends QueryTest checkErrorMatchPVals( exception4, errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." + - "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED", + "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE", parameters = Map("treeNode" -> "(?s).*"), sqlState = None, context = ExpectedContext( From abeb2946c8001269b3ddcf129c80bbd90a952a32 Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Tue, 8 Aug 2023 18:07:13 -0700 Subject: [PATCH 08/12] test --- .../analyzer-results/subquery/in-subquery/in-group-by.sql.out | 2 +- .../sql-tests/results/subquery/in-subquery/in-group-by.sql.out | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out index 407240c1329fe..b249a75ab6dbd 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out @@ -647,7 +647,7 @@ from t1 where t1f IN (SELECT RANK() OVER (partition by t3c order by t2b) as s FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < t1.t1a) -- !query analysis -org.apache.spark.sql.AnalysisException +org.apache.spark.sql.catalyst.ExtendedAnalysisException { "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE", "sqlState" : "0A000", diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out index ba0ed9236bb5c..b2456891f92cf 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out @@ -362,7 +362,7 @@ where t1f IN (SELECT RANK() OVER (partition by t3c order by t2b) as s -- !query schema struct<> -- !query output -org.apache.spark.sql.AnalysisException +org.apache.spark.sql.catalyst.ExtendedAnalysisException { "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE", "sqlState" : "0A000", From ac2e360891f93e968236de4eca47bbea520f1bf2 Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Mon, 21 Aug 2023 11:28:53 -0700 Subject: [PATCH 09/12] comments --- .../optimizer/DecorrelateInnerQuery.scala | 8 +-- .../exists-subquery/exists-aggregate.sql.out | 24 +++++++ .../scalar-subquery-predicate.sql.out | 63 +++++++++++++++++++ .../sql-tests/inputs/join-lateral.sql | 2 +- .../exists-subquery/exists-aggregate.sql | 7 +++ .../scalar-subquery-predicate.sql | 17 +++++ .../scalar-subquery-select.sql | 3 + .../exists-subquery/exists-aggregate.sql.out | 26 ++++++++ .../scalar-subquery-predicate.sql.out | 39 ++++++++++++ 9 files changed, 184 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala index b266aaadd8dce..230b32ea3bf40 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala @@ -654,19 +654,19 @@ object DecorrelateInnerQuery extends PredicateHelper { val newProject = Project(newProjectList ++ referencesToAdd, newChild) (newProject, joinCond, outerReferenceMap) - case w@Window(projectList, partitionSpec, orderSpec, child) => + case w @ Window(projectList, partitionSpec, orderSpec, child) => val outerReferences = collectOuterReferences(w.expressions) assert(outerReferences.isEmpty, s"Correlated column is not allowed in window " + s"function: $w") val newOuterReferences = parentOuterReferences ++ outerReferences val (newChild, joinCond, outerReferenceMap) = decorrelate(child, newOuterReferences, aggregated = true, underSetOp) + // For now these are no-op, as we don't allow correlated references in the window + // function itself. val newProjectList = replaceOuterReferences(projectList, outerReferenceMap) val newPartitionSpec = replaceOuterReferences(partitionSpec, outerReferenceMap) val newOrderSpec = replaceOuterReferences(orderSpec, outerReferenceMap) - val referencesToAdd = missingReferences( - newPartitionSpec.asInstanceOf[Seq[NamedExpression]], - joinCond) + val referencesToAdd = AttributeSet(joinCond.flatMap(_.references)) val newWindow = Window(newProjectList ++ referencesToAdd, partitionSpec = newPartitionSpec ++ referencesToAdd, diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-aggregate.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-aggregate.sql.out index baeb88169d5f7..549a87f37d3a2 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-aggregate.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-aggregate.sql.out @@ -310,3 +310,27 @@ Project [emp_name#x, bonus_amt#x] +- Project [emp_name#x, bonus_amt#x] +- SubqueryAlias BONUS +- LocalRelation [emp_name#x, bonus_amt#x] + + +-- !query +SELECT * +FROM BONUS +WHERE EXISTS(SELECT RANK() OVER (PARTITION BY hiredate ORDER BY salary) AS s + FROM EMP, DEPT where EMP.dept_id = DEPT.dept_id + AND DEPT.dept_name < BONUS.emp_name) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE", + "sqlState" : "0A000", + "messageParameters" : { + "treeNode" : "(dept_name#x < outer(emp_name#x))\nFilter ((dept_id#x = dept_id#x) AND (dept_name#x < outer(emp_name#x)))\n+- Join Inner\n :- SubqueryAlias emp\n : +- View (`EMP`, [id#x,emp_name#x,hiredate#x,salary#x,dept_id#x])\n : +- Project [cast(id#x as int) AS id#x, cast(emp_name#x as string) AS emp_name#x, cast(hiredate#x as date) AS hiredate#x, cast(salary#x as double) AS salary#x, cast(dept_id#x as int) AS dept_id#x]\n : +- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]\n : +- SubqueryAlias EMP\n : +- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]\n +- SubqueryAlias dept\n +- View (`DEPT`, [dept_id#x,dept_name#x,state#x])\n +- Project [cast(dept_id#x as int) AS dept_id#x, cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) AS state#x]\n +- Project [dept_id#x, dept_name#x, state#x]\n +- SubqueryAlias DEPT\n +- LocalRelation [dept_id#x, dept_name#x, state#x]\n" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 34, + "stopIndex" : 224, + "fragment" : "SELECT RANK() OVER (PARTITION BY hiredate ORDER BY salary) AS s\n FROM EMP, DEPT where EMP.dept_id = DEPT.dept_id\n AND DEPT.dept_name < BONUS.emp_name" + } ] +} diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out index c9c8887383d9e..7f0ed2c512bdf 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out @@ -910,6 +910,36 @@ Project [1 AS 1#x] +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] +-- !query +SELECT 1 +FROM t1 +WHERE t1b < (SELECT MAX(tmp.s) FROM ( + SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s + FROM t2 WHERE t2.t2d <= t1.t1d) as tmp) +-- !query analysis +Project [1 AS 1#x] ++- Filter (cast(t1b#x as bigint) < scalar-subquery#x [t1d#xL]) + : +- Aggregate [max(s#xL) AS max(s)#xL] + : +- SubqueryAlias tmp + : +- Project [s#xL] + : +- Project [t2b#x, t2c#x, t2d#xL, s#xL, s#xL] + : +- Window [sum(t2b#x) windowspecdefinition(t2c#x, t2d#xL ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS s#xL], [t2c#x], [t2d#xL ASC NULLS FIRST] + : +- Project [t2b#x, t2c#x, t2d#xL] + : +- Filter (t2d#xL <= outer(t1d#xL)) + : +- SubqueryAlias t2 + : +- View (`t2`, [t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x]) + : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x] + : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + : +- SubqueryAlias t2 + : +- LocalRelation [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x] + +- SubqueryAlias t1 + +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x]) + +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x] + +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] + +- SubqueryAlias t1 + +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] + + -- !query SELECT t1b FROM t1 @@ -947,6 +977,39 @@ Project [t1b#x] +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] +-- !query +SELECT t1b +FROM t1 +WHERE t1b > (SELECT MAX(tmp.s) FROM ( + SELECT RANK() OVER (partition by t3c, t3d order by t3c) as s + FROM (SELECT t3b, t3c, max(t3d) as t3d FROM t3 GROUP BY t3b, t3c) as g) as tmp) +ORDER BY t1b +-- !query analysis +Sort [t1b#x ASC NULLS FIRST], true ++- Project [t1b#x] + +- Filter (cast(t1b#x as int) > scalar-subquery#x []) + : +- Aggregate [max(s#x) AS max(s)#x] + : +- SubqueryAlias tmp + : +- Project [s#x] + : +- Project [t3c#x, t3d#xL, s#x, s#x] + : +- Window [rank(t3c#x) windowspecdefinition(t3c#x, t3d#xL, t3c#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS s#x], [t3c#x, t3d#xL], [t3c#x ASC NULLS FIRST] + : +- Project [t3c#x, t3d#xL] + : +- SubqueryAlias g + : +- Aggregate [t3b#x, t3c#x], [t3b#x, t3c#x, max(t3d#xL) AS t3d#xL] + : +- SubqueryAlias t3 + : +- View (`t3`, [t3a#x,t3b#x,t3c#x,t3d#xL,t3e#x,t3f#x,t3g#x,t3h#x,t3i#x]) + : +- Project [cast(t3a#x as string) AS t3a#x, cast(t3b#x as smallint) AS t3b#x, cast(t3c#x as int) AS t3c#x, cast(t3d#xL as bigint) AS t3d#xL, cast(t3e#x as float) AS t3e#x, cast(t3f#x as double) AS t3f#x, cast(t3g#x as decimal(4,0)) AS t3g#x, cast(t3h#x as timestamp) AS t3h#x, cast(t3i#x as date) AS t3i#x] + : +- Project [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] + : +- SubqueryAlias t3 + : +- LocalRelation [t3a#x, t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x] + +- SubqueryAlias t1 + +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x]) + +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) AS t1i#x] + +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] + +- SubqueryAlias t1 + +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, t1i#x] + + -- !query SELECT 1 FROM t1 diff --git a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql index 29ff29d6630b9..09e156e7406b0 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql @@ -178,7 +178,7 @@ SELECT * FROM t3 JOIN LATERAL (SELECT EXPLODE_OUTER(c2)); SELECT * FROM t3 JOIN LATERAL (SELECT EXPLODE(c2)) t(c3) ON c1 = c3; SELECT * FROM t3 LEFT JOIN LATERAL (SELECT EXPLODE(c2)) t(c3) ON c1 = c3; --- Window func - unsupported +-- Window func SELECT * FROM t1 JOIN LATERAL (SELECT sum(t2.c2) over (order by t2.c1) FROM t2 diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-aggregate.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-aggregate.sql index ae6a9641aae66..1c4ef982c662f 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-aggregate.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-aggregate.sql @@ -118,3 +118,10 @@ WHERE NOT EXISTS (SELECT 1 FROM dept WHERE emp.dept_id = dept.dept_id GROUP BY dept.dept_id)); + +-- Window functions are not supported in EXISTS subqueries yet +SELECT * +FROM BONUS +WHERE EXISTS(SELECT RANK() OVER (PARTITION BY hiredate ORDER BY salary) AS s + FROM EMP, DEPT where EMP.dept_id = DEPT.dept_id + AND DEPT.dept_name < BONUS.emp_name); diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql index bf9e37736219e..2a53fa89864cb 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql @@ -287,6 +287,14 @@ WHERE t1b < (SELECT MAX(tmp.s) FROM ( SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s FROM t2 WHERE t2.t2d = t1.t1d) as tmp); + +-- SPARK-44549: window function in the correlated subquery with non-equi predicate. +SELECT 1 +FROM t1 +WHERE t1b < (SELECT MAX(tmp.s) FROM ( + SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s + FROM t2 WHERE t2.t2d <= t1.t1d) as tmp); + -- SPARK-44549: window function in the correlated subquery over joins. SELECT t1b FROM t1 @@ -294,6 +302,15 @@ WHERE t1b > (SELECT MAX(tmp.s) FROM ( SELECT RANK() OVER (partition by t3c, t2b order by t3c) as s FROM t2, t3 where t2.t2c = t3.t3c AND t2.t2a = t1.t1a) as tmp); +-- SPARK-44549: window function in the correlated subquery over aggregation. +SELECT t1b +FROM t1 +WHERE t1b > (SELECT MAX(tmp.s) FROM ( + SELECT RANK() OVER (partition by t3c, t3d order by t3c) as s + FROM (SELECT t3b, t3c, max(t3d) as t3d FROM t3 GROUP BY t3b, t3c) as g) as tmp) +ORDER BY t1b; + + -- SPARK-44549: correlation in window function itself is not supported yet. SELECT 1 FROM t1 diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql index e4f7b25a1684d..cc656792864d2 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql @@ -258,3 +258,6 @@ where t.c2 is not null; -- SPARK-43838: Subquery on single table with having clause SELECT c1, c2, (SELECT count(*) cnt FROM t1 t2 WHERE t1.c1 = t2.c1 HAVING cnt = 0) FROM t1 + + +SELECT *, (SELECT RANK() OVER(ORDER BY c2) FROM t2 WHERE t1.c2 > t2.c1) FROM t1 \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-aggregate.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-aggregate.sql.out index 5a2f3aad7157a..f62efdcbad447 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-aggregate.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-aggregate.sql.out @@ -178,3 +178,29 @@ struct -- !query output emp 5 1000.0 emp 6 - no dept 500.0 + + +-- !query +SELECT * +FROM BONUS +WHERE EXISTS(SELECT RANK() OVER (PARTITION BY hiredate ORDER BY salary) AS s + FROM EMP, DEPT where EMP.dept_id = DEPT.dept_id + AND DEPT.dept_name < BONUS.emp_name) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE", + "sqlState" : "0A000", + "messageParameters" : { + "treeNode" : "(dept_name#x < outer(emp_name#x))\nFilter ((dept_id#x = dept_id#x) AND (dept_name#x < outer(emp_name#x)))\n+- Join Inner\n :- SubqueryAlias emp\n : +- View (`EMP`, [id#x,emp_name#x,hiredate#x,salary#x,dept_id#x])\n : +- Project [cast(id#x as int) AS id#x, cast(emp_name#x as string) AS emp_name#x, cast(hiredate#x as date) AS hiredate#x, cast(salary#x as double) AS salary#x, cast(dept_id#x as int) AS dept_id#x]\n : +- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]\n : +- SubqueryAlias EMP\n : +- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]\n +- SubqueryAlias dept\n +- View (`DEPT`, [dept_id#x,dept_name#x,state#x])\n +- Project [cast(dept_id#x as int) AS dept_id#x, cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) AS state#x]\n +- Project [dept_id#x, dept_name#x, state#x]\n +- SubqueryAlias DEPT\n +- LocalRelation [dept_id#x, dept_name#x, state#x]\n" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 34, + "stopIndex" : 224, + "fragment" : "SELECT RANK() OVER (PARTITION BY hiredate ORDER BY salary) AS s\n FROM EMP, DEPT where EMP.dept_id = DEPT.dept_id\n AND DEPT.dept_name < BONUS.emp_name" + } ] +} diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out index b2a3dd6d52f57..05e8b5fb4a822 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out @@ -458,6 +458,23 @@ struct<1:int> 1 +-- !query +SELECT 1 +FROM t1 +WHERE t1b < (SELECT MAX(tmp.s) FROM ( + SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s + FROM t2 WHERE t2.t2d <= t1.t1d) as tmp) +-- !query schema +struct<1:int> +-- !query output +1 +1 +1 +1 +1 +1 + + -- !query SELECT t1b FROM t1 @@ -471,6 +488,28 @@ struct 8 +-- !query +SELECT t1b +FROM t1 +WHERE t1b > (SELECT MAX(tmp.s) FROM ( + SELECT RANK() OVER (partition by t3c, t3d order by t3c) as s + FROM (SELECT t3b, t3c, max(t3d) as t3d FROM t3 GROUP BY t3b, t3c) as g) as tmp) +ORDER BY t1b +-- !query schema +struct +-- !query output +6 +6 +8 +8 +10 +10 +10 +10 +16 +16 + + -- !query SELECT 1 FROM t1 From b0a5ad491f8fe0397b4926d8cf0564a6ec63a069 Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Mon, 21 Aug 2023 12:00:13 -0700 Subject: [PATCH 10/12] comment --- .../spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala index ce1b39bba1dd9..76c8214555ec3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala @@ -478,7 +478,8 @@ object DecorrelateInnerQuery extends PredicateHelper { // parentOuterReferences: a set of parent outer references. As we recurse down we collect the // set of outer references that are part of the Domain, and use it to construct the DomainJoins // and join conditions. - // aggregated: a boolean flag indicating whether the result of the plan will be aggregated. + // aggregated: a boolean flag indicating whether the result of the plan will be aggregated + // (or used as an input for a window function) // underSetOp: a boolean flag indicating whether a set operator (e.g. UNION) is a parent of the // inner plan. // From c9c23c100dbd0ab4c10b899a7c47c90368f8aa24 Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Mon, 21 Aug 2023 12:52:06 -0700 Subject: [PATCH 11/12] referencesToAdd --- .../spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala index 76c8214555ec3..a07177f6e8a08 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala @@ -667,7 +667,7 @@ object DecorrelateInnerQuery extends PredicateHelper { val newProjectList = replaceOuterReferences(projectList, outerReferenceMap) val newPartitionSpec = replaceOuterReferences(partitionSpec, outerReferenceMap) val newOrderSpec = replaceOuterReferences(orderSpec, outerReferenceMap) - val referencesToAdd = AttributeSet(joinCond.flatMap(_.references)) + val referencesToAdd = missingReferences(newProjectList, joinCond) val newWindow = Window(newProjectList ++ referencesToAdd, partitionSpec = newPartitionSpec ++ referencesToAdd, From b17b2e7bcd8cddff7df4f65aeaabaf97a8ddcd33 Mon Sep 17 00:00:00 2001 From: Andrey Gubichev Date: Tue, 22 Aug 2023 10:35:57 -0700 Subject: [PATCH 12/12] remove invalid test --- .../inputs/subquery/scalar-subquery/scalar-subquery-select.sql | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql index cc656792864d2..e4f7b25a1684d 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-select.sql @@ -258,6 +258,3 @@ where t.c2 is not null; -- SPARK-43838: Subquery on single table with having clause SELECT c1, c2, (SELECT count(*) cnt FROM t1 t2 WHERE t1.c1 = t2.c1 HAVING cnt = 0) FROM t1 - - -SELECT *, (SELECT RANK() OVER(ORDER BY c2) FROM t2 WHERE t1.c2 > t2.c1) FROM t1 \ No newline at end of file