From 0e6e15ca6331d37a6c38c970556903c6df5d5dfb Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Thu, 7 Sep 2023 19:42:58 +0900
Subject: [PATCH] [SPARK-45080][SS] Explicitly call out support for columnar in DSv2 streaming data sources

### What changes were proposed in this pull request?

This PR proposes to override `Scan.columnarSupportMode` for DSv2 streaming data sources. None of them support columnar.

This applies [SPARK-44505](https://issues.apache.org/jira/browse/SPARK-44505) to the DSv2 streaming data sources. The rationale is explained in the next section.

### Why are the changes needed?

The default value for `Scan.columnarSupportMode` is `PARTITION_DEFINED`, which requires `inputPartitions` to be called/evaluated, and that can happen multiple times during planning.

In `MicroBatchScanExec`, we define `inputPartitions` as a lazy val so that `inputPartitions`, which calls `MicroBatchStream.planInputPartitions`, is not evaluated more than once. But we missed that there is no guarantee the node instance itself is created only once (although the actual execution happens once). For example, `executedPlan` clones the plan (internally calling the constructor to make a deep copy of the node), `explain` is called internally to build a SQL execution start event, etc.

I see `MicroBatchStream.planInputPartitions` get called 4 times per microbatch, which is concerning when the overhead of `planInputPartitions` is non-trivial, specifically for Kafka.
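To illustrate the pitfall, here is a minimal, self-contained sketch. The names (`LazyValClonePitfall`, `ScanNode`) are hypothetical stand-ins, not the actual Spark classes; it only shows that a `lazy val` memoizes per instance, so cloning the node re-triggers the evaluation:

```scala
// Hypothetical stand-ins for MicroBatchScanExec and
// MicroBatchStream.planInputPartitions -- not the real Spark code.
object LazyValClonePitfall {
  var planCalls = 0

  // Stand-in for the potentially expensive planInputPartitions call.
  def planInputPartitions(): Seq[Int] = {
    planCalls += 1
    Seq(0, 1, 2)
  }

  // A lazy val caches its result per *instance*, not per logical node.
  case class ScanNode(name: String) {
    lazy val inputPartitions: Seq[Int] = planInputPartitions()
  }

  def main(args: Array[String]): Unit = {
    val node = ScanNode("scan")
    node.inputPartitions     // evaluated once on this instance
    val cloned = node.copy() // a deep copy, as executedPlan/explain effectively do
    cloned.inputPartitions   // evaluated again on the fresh instance
    println(s"planInputPartitions called $planCalls times") // prints: 2
  }
}
```

Declaring the scan as `UNSUPPORTED` sidesteps this, since planning no longer needs to evaluate `inputPartitions` to decide on columnar support.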
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42823 from HeartSaVioR/SPARK-45080.

Authored-by: Jungtaek Lim
Signed-off-by: Jungtaek Lim
---
 .../org/apache/spark/sql/kafka010/KafkaSourceProvider.scala  | 3 +++
 .../org/apache/spark/sql/execution/streaming/memory.scala    | 3 +++
 .../streaming/sources/RatePerMicroBatchProvider.scala        | 3 +++
 .../sql/execution/streaming/sources/RateStreamProvider.scala | 3 +++
 .../streaming/sources/TextSocketSourceProvider.scala         | 3 +++
 .../org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 4 ++--
 6 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index de78992533b22..d9e3a1256ea47 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -525,6 +525,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     override def supportedCustomMetrics(): Array[CustomMetric] = {
       Array(new OffsetOutOfRangeMetric, new DataLossMetric)
     }
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 34076f26fe82a..732eaa8d783d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -139,6 +139,9 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w
   override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
     stream.asInstanceOf[ContinuousStream]
   }
+
+  override def columnarSupportMode(): Scan.ColumnarSupportMode =
+    Scan.ColumnarSupportMode.UNSUPPORTED
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
index 41878a6a54975..17cc1860fbdcd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
@@ -111,6 +111,9 @@ class RatePerMicroBatchTable(
     override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
       throw new UnsupportedOperationException("continuous mode is not supported!")
     }
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index bf2cc770d79bc..24e283f4ad6fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -100,6 +100,9 @@ class RateStreamTable(
     override def toContinuousStream(checkpointLocation: String): ContinuousStream =
       new RateStreamContinuousStream(rowsPerSecond, numPartitions)
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index 1ab88cd41d875..e4251cc7d3936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -98,6 +98,9 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest
     override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
       new TextSocketContinuousStream(host, port, numPartitions, options)
     }
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4a6325eb06074..c3729d50ed09c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -329,10 +329,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       assert(progress.processedRowsPerSecond === 4.0)
 
       assert(progress.durationMs.get("latestOffset") === 50)
-      assert(progress.durationMs.get("queryPlanning") === 100)
+      assert(progress.durationMs.get("queryPlanning") === 0)
       assert(progress.durationMs.get("walCommit") === 0)
       assert(progress.durationMs.get("commitOffsets") === 0)
-      assert(progress.durationMs.get("addBatch") === 350)
+      assert(progress.durationMs.get("addBatch") === 450)
       assert(progress.durationMs.get("triggerExecution") === 500)
 
       assert(progress.sources.length === 1)