[SPARK-41199][SS] Fix metrics issue when DSv1 streaming source and DSv2 streaming source are co-used

### What changes were proposed in this pull request?

This PR proposes to fix the metrics issue for a streaming query in which a DSv1 streaming source and a DSv2 streaming source are co-used. When a streaming query has both a DSv1 streaming source and a DSv2 streaming source, only the DSv1 streaming source produces correct metrics.

There is a bug in ProgressReporter: it tries to match the logical node for a DSv2 streaming source against an OffsetHolder (the association map holds OffsetHolder instances for DSv2 streaming sources), so the lookup never matches. Since the physical node for a DSv2 streaming source contains both the source information and the metrics, we can simply deduce all the necessary information from the physical node rather than looking the source up in the association map.
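
For illustration only (not part of the patch), the sketch below shows how per-source metrics can be observed from a query that unions a file-based streaming read (DSv1-style in this scenario) with the rate source (a DSv2 source in recent Spark versions). The object name, the choice of sources, the console sink, and the timeout are assumptions made for the example; before this fix, the entry backed by the DSv2 source would typically report zero input rows in `StreamingQueryProgress.sources`.

```scala
// Minimal sketch, assuming Spark 3.x, a local run, and an existing `parquet_streaming_tbl`
// table with columns (key INT, value_stream INT); all names here are illustrative.
import org.apache.spark.sql.SparkSession

object MixedSourceMetricsCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("mixed-source-metrics")
      .master("local[2]")
      .getOrCreate()

    // DSv1-style side: file-based streaming read of an existing parquet table.
    val v1Stream = spark.readStream.table("parquet_streaming_tbl")

    // DSv2 side: the rate source, projected to the same schema so the union is valid.
    val v2Stream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .selectExpr("CAST(value AS INT) AS key", "CAST(value AS INT) AS value_stream")

    val query = v1Stream.union(v2Stream).writeStream
      .format("console")
      .start()

    // Let a few micro-batches complete, then inspect the per-source metrics.
    query.awaitTermination(10000L)
    Option(query.lastProgress).foreach { progress =>
      progress.sources.foreach { s =>
        // With the fix, both entries reflect the rows read in the last batch;
        // previously the DSv2-backed entry stayed at 0.
        println(s"source=${s.description} numInputRows=${s.numInputRows}")
      }
    }

    query.stop()
    spark.stop()
  }
}
```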

### Why are the changes needed?

The metrics-collection logic does not properly collect metrics for DSv2 streaming sources.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test case.
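
The new test can also be run locally with, e.g., `build/sbt "sql/testOnly *StreamingQuerySuite -- -z SPARK-41199"` (assuming a standard Spark development environment; the exact invocation is illustrative).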

Closes apache#38719 from HeartSaVioR/SPARK-41999.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR committed Nov 19, 2022
1 parent 82a41d8 commit 2584968
Showing 2 changed files with 63 additions and 2 deletions.
@@ -345,7 +345,14 @@ trait ProgressReporter extends Logging {
       val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
       if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
         val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
-          case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
+          case (_, ep: MicroBatchScanExec) =>
+            // SPARK-41199: `logicalPlanLeafToSource` contains OffsetHolder instance for DSv2
+            // streaming source, hence we cannot lookup the actual source from the map.
+            // The physical node for DSv2 streaming source contains the information of the source
+            // by itself, so leverage it.
+            Some(ep -> ep.stream)
+          case (lp, ep) =>
+            logicalPlanLeafToSource.get(lp).map { source => ep -> source }
         }
         val sourceToInputRowsTuples = execLeafToSource.map { case (execLeaf, source) =>
           val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -657,6 +657,60 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") {
+    withTable("parquet_streaming_tbl") {
+      val streamInput = MemoryStream[Int]
+      val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream")
+
+      spark.sql(
+        """
+          |CREATE TABLE parquet_streaming_tbl
+          |(
+          |  key integer,
+          |  value_stream integer
+          |)
+          |USING parquet
+          |""".stripMargin)
+
+      val streamDf2 = spark.readStream.table("parquet_streaming_tbl")
+      val unionedDf = streamDf.union(streamDf2)
+
+      val clock = new StreamManualClock()
+      testStream(unionedDf)(
+        StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
+        AddData(streamInput, 1, 2, 3),
+        Execute { _ =>
+          spark.range(4, 6).selectExpr("id AS key", "id AS value_stream")
+            .write.format("parquet").mode(SaveMode.Append).saveAsTable("parquet_streaming_tbl")
+        },
+        AdvanceManualClock(150),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch((1, 1), (2, 2), (3, 3), (4, 4), (5, 5)),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 5)
+          assert(lastProgress.get.sources.length == 2)
+          assert(lastProgress.get.sources(0).numInputRows == 3)
+          assert(lastProgress.get.sources(1).numInputRows == 2)
+          true
+        }
+      )
+    }
+  }
+
+  private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
+    eventually(Timeout(streamingTimeout)) {
+      if (!q.exception.isDefined) {
+        assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+      }
+    }
+    if (q.exception.isDefined) {
+      throw q.exception.get
+    }
+    true
+  }
+
   testQuietly("StreamExecution metadata garbage collection") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)
