Skip to content

Commit

Permalink
[SPARK-49269][SQL] Eagerly evaluate VALUES() list in AstBuilder
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This is a continuation of a prior performance improvement: #47428 that eagerly evaluates memory-heavy `UnresolvedUnlineTables` parse tree nodes as soon as they are constructed in the AstBuilder.

This PR applies this optimization to any statement that might contain one or more `VALUES()` clauses (such as subqueries etc), instead of just applying that optimization to `INSERT INTO ... VALUES` statements, which is what the prior PR did.

### Why are the changes needed?
With these changes we not only reduce the memory footprint of every statement that can contain the `VALUES()` clause, but we also improve upon the previous optimization as we avoid unnecessary traversals of the parse tree, which not only improves the runtime performance, but also minimizes the amount of time in which the `UnresolvedInlineTable` is kept in memory.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Provided scala tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47791 from costas-db/eagerlyEvaluateUnresolvedInlineTableInAstBuilder.

Authored-by: Costas Zarifis <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
costas-db authored and HyukjinKwon committed Aug 21, 2024
1 parent ba208b9 commit af4c738
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -468,19 +468,14 @@ class AstBuilder extends DataTypeAstBuilder
val (relationCtx, options, cols, partition, ifPartitionNotExists, byName)
= visitInsertIntoTable(table)
withIdentClause(relationCtx, Seq(query), (ident, otherPlans) => {
val insertIntoStatement = InsertIntoStatement(
InsertIntoStatement(
createUnresolvedRelation(relationCtx, ident, options),
partition,
cols,
otherPlans.head,
overwrite = false,
ifPartitionNotExists,
byName)
if (conf.getConf(SQLConf.OPTIMIZE_INSERT_INTO_VALUES_PARSER)) {
EvaluateUnresolvedInlineTable.evaluate(insertIntoStatement)
} else {
insertIntoStatement
}
})
case table: InsertOverwriteTableContext =>
val (relationCtx, options, cols, partition, ifPartitionNotExists, byName)
Expand Down Expand Up @@ -1897,7 +1892,12 @@ class AstBuilder extends DataTypeAstBuilder
Seq.tabulate(rows.head.size)(i => s"col${i + 1}")
}

val table = UnresolvedInlineTable(aliases, rows.toSeq)
val unresolvedTable = UnresolvedInlineTable(aliases, rows.toSeq)
val table = if (conf.getConf(SQLConf.EAGER_EVAL_OF_UNRESOLVED_INLINE_TABLE_ENABLED)) {
EvaluateUnresolvedInlineTable.evaluate(unresolvedTable)
} else {
unresolvedTable
}
table.optionalMap(ctx.tableAlias.strictIdentifier)(aliasPlan)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,8 @@ import org.apache.spark.sql.types.{StructField, StructType}
object EvaluateUnresolvedInlineTable extends SQLConfHelper
with AliasHelper with EvalHelper with CastSupport {

def evaluate(plan: LogicalPlan): LogicalPlan = {
traversePlanAndEvalUnresolvedInlineTable(plan)
}

def traversePlanAndEvalUnresolvedInlineTable(plan: LogicalPlan): LogicalPlan = {
plan match {
case table: UnresolvedInlineTable if table.expressionsResolved =>
evaluateUnresolvedInlineTable(table)
case _ => plan.mapChildren(traversePlanAndEvalUnresolvedInlineTable)
}
}
def evaluate(plan: UnresolvedInlineTable): LogicalPlan =
if (plan.expressionsResolved) evaluateUnresolvedInlineTable(plan) else plan

def evaluateUnresolvedInlineTable(table: UnresolvedInlineTable): LogicalPlan = {
validateInputDimension(table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,11 +969,11 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val OPTIMIZE_INSERT_INTO_VALUES_PARSER =
buildConf("spark.sql.parser.optimizeInsertIntoValuesParser")
val EAGER_EVAL_OF_UNRESOLVED_INLINE_TABLE_ENABLED =
buildConf("spark.sql.parser.eagerEvalOfUnresolvedInlineTable")
.internal()
.doc("Controls whether we optimize the ASTree that gets generated when parsing " +
"`insert into ... values` DML statements.")
"VALUES lists (UnresolvedInlineTable) by eagerly evaluating it in the AST Builder.")
.booleanConf
.createWithDefault(true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2633,7 +2633,7 @@ class DDLParserSuite extends AnalysisTest {

for (optimizeInsertIntoValues <- Seq(true, false)) {
withSQLConf(
SQLConf.OPTIMIZE_INSERT_INTO_VALUES_PARSER.key ->
SQLConf.EAGER_EVAL_OF_UNRESOLVED_INLINE_TABLE_ENABLED.key ->
optimizeInsertIntoValues.toString) {
comparePlans(parsePlan(dateTypeSql), insertPartitionPlan(
"2019-01-02", optimizeInsertIntoValues))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import scala.annotation.nowarn

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{EvaluateUnresolvedInlineTable, FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, NamedParameter, PosParameter, RelationTimeTravel, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedStar, UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction, UnresolvedTVFAliases}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
Expand Down Expand Up @@ -1000,14 +1000,28 @@ class PlanParserSuite extends AnalysisTest {
}

test("inline table") {
assertEqual("values 1, 2, 3, 4",
UnresolvedInlineTable(Seq("col1"), Seq(1, 2, 3, 4).map(x => Seq(Literal(x)))))
for (optimizeValues <- Seq(true, false)) {
withSQLConf(SQLConf.EAGER_EVAL_OF_UNRESOLVED_INLINE_TABLE_ENABLED.key ->
optimizeValues.toString) {
val unresolvedTable1 =
UnresolvedInlineTable(Seq("col1"), Seq(1, 2, 3, 4).map(x => Seq(Literal(x))))
val table1 = if (optimizeValues) {
EvaluateUnresolvedInlineTable.evaluate(unresolvedTable1)
} else {
unresolvedTable1
}
assertEqual("values 1, 2, 3, 4", table1)

assertEqual(
"values (1, 'a'), (2, 'b') as tbl(a, b)",
UnresolvedInlineTable(
Seq("a", "b"),
Seq(Literal(1), Literal("a")) :: Seq(Literal(2), Literal("b")) :: Nil).as("tbl"))
val unresolvedTable2 = UnresolvedInlineTable(
Seq("a", "b"), Seq(Literal(1), Literal("a")) :: Seq(Literal(2), Literal("b")) :: Nil)
val table2 = if (optimizeValues) {
EvaluateUnresolvedInlineTable.evaluate(unresolvedTable2)
} else {
unresolvedTable2
}
assertEqual("values (1, 'a'), (2, 'b') as tbl(a, b)", table2.as("tbl"))
}
}
}

test("simple select query with !> and !<") {
Expand Down Expand Up @@ -1907,12 +1921,22 @@ class PlanParserSuite extends AnalysisTest {
}

test("SPARK-42553: NonReserved keyword 'interval' can be column name") {
comparePlans(
parsePlan("SELECT interval FROM VALUES ('abc') AS tbl(interval);"),
UnresolvedInlineTable(
Seq("interval"),
Seq(Literal("abc")) :: Nil).as("tbl").select($"interval")
)
for (optimizeValues <- Seq(true, false)) {
withSQLConf(SQLConf.EAGER_EVAL_OF_UNRESOLVED_INLINE_TABLE_ENABLED.key ->
optimizeValues.toString) {
val unresolvedTable =
UnresolvedInlineTable(Seq("interval"), Seq(Literal("abc")) :: Nil)
val table = if (optimizeValues) {
EvaluateUnresolvedInlineTable.evaluate(unresolvedTable)
} else {
unresolvedTable
}
comparePlans(
parsePlan("SELECT interval FROM VALUES ('abc') AS tbl(interval);"),
table.as("tbl").select($"interval")
)
}
}
}

test("SPARK-44066: parsing of positional parameters") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ org.apache.spark.sql.AnalysisException
-- !query
select * from values ("one", 2.0), ("two") as data(a, b)
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down Expand Up @@ -157,7 +157,7 @@ org.apache.spark.sql.AnalysisException
-- !query
select * from values ("one"), ("two") as data(a, b)
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ SELECT a, b,
SUM(b) OVER(ORDER BY A ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
FROM (VALUES(1,1),(2,2),(3,(cast('nan' as int))),(4,3),(5,4)) t(a,b)
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION",
"sqlState" : "42000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ org.apache.spark.sql.AnalysisException
-- !query
select udf(a), udf(b) from values ("one", 2.0), ("two") as data(a, b)
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down Expand Up @@ -143,7 +143,7 @@ org.apache.spark.sql.AnalysisException
-- !query
select udf(a), udf(b) from values ("one"), ("two") as data(a, b)
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ select * from values ("one", 2.0), ("two") as data(a, b)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down Expand Up @@ -177,7 +177,7 @@ select * from values ("one"), ("two") as data(a, b)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ FROM (VALUES(1,1),(2,2),(3,(cast('nan' as int))),(4,3),(5,4)) t(a,b)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION",
"sqlState" : "42000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ select udf(a), udf(b) from values ("one", 2.0), ("two") as data(a, b)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down Expand Up @@ -161,7 +161,7 @@ select udf(a), udf(b) from values ("one"), ("two") as data(a, b)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down
Loading

0 comments on commit af4c738

Please sign in to comment.