From 25849684b78cca6651e25d6efc9644a576e7e20f Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Sat, 19 Nov 2022 22:42:26 +0900
Subject: [PATCH] [SPARK-41199][SS] Fix metrics issue when DSv1 streaming source and DSv2 streaming source are co-used

### What changes were proposed in this pull request?

This PR proposes to fix the metrics issue for a streaming query when a DSv1 streaming source and a DSv2 streaming source are co-used. If the streaming query has both a DSv1 streaming source and a DSv2 streaming source, only the DSv1 streaming source produces correct metrics.

There is a bug in ProgressReporter: it tries to match the logical node for a DSv2 streaming source against an OffsetHolder (the association map holds OffsetHolder instances for DSv2 streaming sources), which never matches.

Given that the physical node for a DSv2 streaming source contains both the source information and the metrics, we can simply deduce all the necessary information from the physical node rather than trying to find the source in the association map.

### Why are the changes needed?

The logic for collecting metrics does not collect metrics for DSv2 streaming sources properly.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test case.

Closes #38719 from HeartSaVioR/SPARK-41999.

Authored-by: Jungtaek Lim
Signed-off-by: Jungtaek Lim
---
 .../streaming/ProgressReporter.scala        |  9 ++-
 .../sql/streaming/StreamingQuerySuite.scala | 56 ++++++++++++++++++-
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 8a89ca7b85dba..a4c975861c5f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -345,7 +345,14 @@ trait ProgressReporter extends Logging {
       val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
       if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
         val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
-          case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
+          case (_, ep: MicroBatchScanExec) =>
+            // SPARK-41199: `logicalPlanLeafToSource` contains OffsetHolder instance for DSv2
+            // streaming source, hence we cannot lookup the actual source from the map.
+            // The physical node for DSv2 streaming source contains the information of the source
+            // by itself, so leverage it.
+            Some(ep -> ep.stream)
+          case (lp, ep) =>
+            logicalPlanLeafToSource.get(lp).map { source => ep -> source }
         }
         val sourceToInputRowsTuples = execLeafToSource.map { case (execLeaf, source) =>
           val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 090a2081219f2..71eb4c15701e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -657,6 +657,60 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") {
+    withTable("parquet_streaming_tbl") {
+      val streamInput = MemoryStream[Int]
+      val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream")
+
+      spark.sql(
+        """
+          |CREATE TABLE parquet_streaming_tbl
+          |(
+          |  key integer,
+          |  value_stream integer
+          |)
+          |USING parquet
+          |""".stripMargin)
+
+      val streamDf2 = spark.readStream.table("parquet_streaming_tbl")
+      val unionedDf = streamDf.union(streamDf2)
+
+      val clock = new StreamManualClock()
+      testStream(unionedDf)(
+        StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
+        AddData(streamInput, 1, 2, 3),
+        Execute { _ =>
+          spark.range(4, 6).selectExpr("id AS key", "id AS value_stream")
+            .write.format("parquet").mode(SaveMode.Append).saveAsTable("parquet_streaming_tbl")
+        },
+        AdvanceManualClock(150),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch((1, 1), (2, 2), (3, 3), (4, 4), (5, 5)),
+        AssertOnQuery { q =>
+          val lastProgress = getLastProgressWithData(q)
+          assert(lastProgress.nonEmpty)
+          assert(lastProgress.get.numInputRows == 5)
+          assert(lastProgress.get.sources.length == 2)
+          assert(lastProgress.get.sources(0).numInputRows == 3)
+          assert(lastProgress.get.sources(1).numInputRows == 2)
+          true
+        }
+      )
+    }
+  }
+
+  private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
+    eventually(Timeout(streamingTimeout)) {
+      if (!q.exception.isDefined) {
+        assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+      }
+    }
+    if (q.exception.isDefined) {
+      throw q.exception.get
+    }
+    true
+  }
+
   testQuietly("StreamExecution metadata garbage collection") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)