Skip to content

Commit

Permalink
[SPARK-41198][SS] Fix metrics in streaming query having CTE and DSv1 …
Browse files Browse the repository at this point in the history
…streaming source

### What changes were proposed in this pull request?

This PR proposes to fix the broken metrics when the streaming query has CTE and DSv1 streaming source, via applying InlineCTE manually against analyzed plan when collecting metrics.

Suppose a streaming query contains below part as batch side which is joined with streaming source:

```
with batch_tbl as (
  SELECT col1, col2 FROM parquet_tbl
)

SELECT col1 AS key, col2 as value_batch FROM batch_tbl
```

Currently, Spark adds WithCTE node with CTERelationDef and CTERelationRef when there is a usage of CTE. Below is an analyzed plan:

```
WriteToMicroBatchDataSource MemorySink, 2cbb2afa-6513-4a23-b4c2-37910fc9cdf9, Append, 0
+- Project [key#15, value_stream#16, value_batch#9L]
   +- Join Inner, (cast(key#15 as bigint) = key#8L)
      :- SubqueryAlias spark_catalog.default.parquet_streaming_tbl
      :  +- Project [key#55 AS key#15, value_stream#56 AS value_stream#16]
      :     +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
      +- WithCTE
         :- CTERelationDef 0, false
         :  +- SubqueryAlias batch_tbl
         :     +- Project [col1#10L, col2#11L]
         :        +- SubqueryAlias spark_catalog.default.parquet_tbl
         :           +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
         +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
            +- SubqueryAlias batch_tbl
               +- CTERelationRef 0, true, [col1#10L, col2#11L]
```

Here, there are 3 leaf nodes in the plan, but the actual sources in the leaf nodes are 2. During the optimization, inlining CTE happens and there are 2 leaf nodes. Below is the optimized plan:

```
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite622c7c7f]
+- Project [key#55, value_stream#56, value_batch#9L]
   +- Join Inner, (cast(key#55 as bigint) = key#8L)
      :- Filter isnotnull(key#55)
      :  +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
      +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
         +- Filter isnotnull(col1#10L)
            +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
```

Hence executed plan will also have 2 leaf nodes, which does not match with the number of leaf nodes in analyzed plan, and ProgressReporter will give up collecting metrics.

Applying InlineCTE against analyzed plan during collecting metrics would resolve this. For example, below is the logical plan which applies InlineCTE against above analyzed plan.

```
WriteToMicroBatchDataSource MemorySink, 2cbb2afa-6513-4a23-b4c2-37910fc9cdf9, Append, 0
+- Project [key#15, value_stream#16, value_batch#9L]
   +- Join Inner, (cast(key#15 as bigint) = key#8L)
      :- SubqueryAlias spark_catalog.default.parquet_streaming_tbl
      :  +- Project [key#55 AS key#15, value_stream#56 AS value_stream#16]
      :     +- Relation spark_catalog.default.parquet_streaming_tbl[key#55,value_stream#56] parquet
      +- Project [col1#10L AS key#8L, col2#11L AS value_batch#9L]
         +- SubqueryAlias batch_tbl
            +- SubqueryAlias batch_tbl
               +- Project [col1#10L, col2#11L]
                  +- SubqueryAlias spark_catalog.default.parquet_tbl
                     +- Relation spark_catalog.default.parquet_tbl[col1#10L,col2#11L] parquet
```

Note that this is only required for the case where there is at least one of DSv1 streaming source in the streaming query. If streaming query only contains DSv2 data sources as streaming sources, ProgressReporter can just pick up dedicated physical node(s) from executed plan.

### Why are the changes needed?

The metrics in streaming query are broken if the query contains CTE.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test case.

Closes apache#38717 from HeartSaVioR/SPARK-41198.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
HeartSaVioR committed Nov 20, 2022
1 parent 865c5f6 commit 1669af1
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.catalyst.optimizer.InlineCTE
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan, WithCTE}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.Table
Expand Down Expand Up @@ -293,6 +294,19 @@ trait ProgressReporter extends Logging {
tuples.groupBy(_._1).mapValues(_.map(_._2).sum).toMap // sum up rows for each source
}

def unrollCTE(plan: LogicalPlan): LogicalPlan = {
val containsCTE = plan.exists {
case _: WithCTE => true
case _ => false
}

if (containsCTE) {
InlineCTE(alwaysInline = true).apply(plan)
} else {
plan
}
}

val onlyDataSourceV2Sources = {
// Check whether the streaming query's logical plan has only V2 micro-batch data sources
val allStreamingLeaves = logicalPlan.collect {
Expand Down Expand Up @@ -341,7 +355,13 @@ trait ProgressReporter extends Logging {
val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
logicalPlan.collectLeaves().map { leaf => leaf -> source }
}
val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming

// SPARK-41198: CTE is inlined in optimization phase, which ends up with having different
// number of leaf nodes between (analyzed) logical plan and executed plan. Here we apply
// inlining CTE against logical plan manually if there is a CTE node.
val finalLogicalPlan = unrollCTE(lastExecution.logical)

val allLogicalPlanLeaves = finalLogicalPlan.collectLeaves() // includes non-streaming
val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,58 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
)
}

test("SPARK-41198: input row calculation with CTE") {
withTable("parquet_tbl", "parquet_streaming_tbl") {
spark.range(0, 10).selectExpr("id AS col1", "id AS col2")
.write.format("parquet").saveAsTable("parquet_tbl")

val dfWithClause = spark.sql(
"""
|with batch_tbl as (
| SELECT col1, col2 FROM parquet_tbl
|)
|
|SELECT col1 AS key, col2 as value_batch FROM batch_tbl
|""".stripMargin)

spark.sql(
"""
|CREATE TABLE parquet_streaming_tbl
|(
| key integer,
| value_stream integer
|)
|USING parquet
|""".stripMargin)

// NOTE: if we only have DSv2 streaming source(s) as all streaming sources in the query, it
// simply collects the corresponding physical nodes from executed plan and does not encounter
// the issue. Here we use DSv1 streaming source to reproduce the issue.
val streamDf = spark.readStream.table("parquet_streaming_tbl")
val joinedDf = streamDf.join(dfWithClause, Seq("key"), "inner")

val clock = new StreamManualClock()
testStream(joinedDf)(
StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
Execute { _ =>
spark.range(1, 5).selectExpr("id AS key", "id AS value_stream")
.write.format("parquet").mode(SaveMode.Append).saveAsTable("parquet_streaming_tbl")
},
AdvanceManualClock(150),
waitUntilBatchProcessed(clock),
CheckLastBatch((1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)),
AssertOnQuery { q =>
val lastProgress = getLastProgressWithData(q)
assert(lastProgress.nonEmpty)
assert(lastProgress.get.numInputRows == 4)
assert(lastProgress.get.sources.length == 1)
assert(lastProgress.get.sources(0).numInputRows == 4)
true
}
)
}
}

test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") {
withTable("parquet_streaming_tbl") {
val streamInput = MemoryStream[Int]
Expand Down

0 comments on commit 1669af1

Please sign in to comment.