[SPARK-45080][SS] Explicitly call out support for columnar in DSv2 streaming data sources

### What changes were proposed in this pull request?

This PR proposes to override `Scan.columnarSupportMode` for DSv2 streaming data sources, none of which support columnar reads. This applies [SPARK-44505](https://issues.apache.org/jira/browse/SPARK-44505) to the DSv2 streaming data sources.

The rationale is explained in the next section.

### Why are the changes needed?

The default value of `Scan.columnarSupportMode` is `PARTITION_DEFINED`, which requires `inputPartitions` to be called/evaluated in order to decide whether the scan supports columnar reads. That evaluation can be triggered multiple times during planning.
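
For intuition, here is a heavily simplified sketch of why `PARTITION_DEFINED` forces partition planning. This is not Spark's actual planner code; `supportsColumnar` and its parameters are hypothetical stand-ins, and only the `Scan`, `InputPartition`, and `PartitionReaderFactory` APIs are real:

```scala
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan}

object ColumnarDecisionSketch {
  // Hypothetical helper, for illustration only; the real decision lives
  // inside Spark's planner. The key point is the PARTITION_DEFINED branch.
  def supportsColumnar(
      scan: Scan,
      partitions: => Array[InputPartition], // forcing this calls planInputPartitions()
      readerFactory: PartitionReaderFactory): Boolean = {
    scan.columnarSupportMode() match {
      case Scan.ColumnarSupportMode.SUPPORTED => true
      case Scan.ColumnarSupportMode.UNSUPPORTED => false
      case Scan.ColumnarSupportMode.PARTITION_DEFINED =>
        // The partitions must be planned before we can ask each one
        // whether columnar reads are possible.
        partitions.forall(p => readerFactory.supportsColumnarReads(p))
    }
  }
}
```

Returning `UNSUPPORTED` lets the planner answer the question without ever touching the partitions.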

In `MicroBatchScanExec`, we define `inputPartitions` as a lazy val so that it is evaluated at most once per instance, since evaluating it calls `MicroBatchStream.planInputPartitions`. But we missed that there is no guarantee the instance itself is initialized only once (even though the actual execution happens only once). For example, `executedPlan` clones the plan (internally, the constructor is called to make a deep copy of each node), the explain output (built internally for the SQL execution start event) does so as well, and so on.
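
The pitfall is easy to reproduce in plain Scala; here is a minimal sketch with made-up names:

```scala
// Stand-in for a plan node whose partitions are expensive to compute.
case class FakeScanNode(streamId: String) {
  lazy val inputPartitions: Seq[Int] = {
    println(s"planInputPartitions called for $streamId") // expensive in reality
    Seq(0, 1, 2)
  }
}

object LazyValCloneDemo extends App {
  val node = FakeScanNode("source-0")
  node.inputPartitions // evaluated: prints once
  node.inputPartitions // memoized: no output
  val clone = node.copy() // analogous to executedPlan deep-copying the node
  clone.inputPartitions // evaluated again: prints a second time
}
```

Each `copy()` produces a fresh instance with its own unevaluated lazy val, which is exactly what happens when the plan tree is cloned.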

I see `MicroBatchStream.planInputPartitions` get called 4 times per microbatch, which is concerning when the overhead of `planInputPartitions` is non-trivial, as is the case for Kafka.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42823 from HeartSaVioR/SPARK-45080.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR committed Sep 7, 2023
1 parent af1615d commit 0e6e15c
Showing 6 changed files with 17 additions and 2 deletions.
```diff
@@ -525,6 +525,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     override def supportedCustomMetrics(): Array[CustomMetric] = {
       Array(new OffsetOutOfRangeMetric, new DataLossMetric)
     }
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
```

```diff
@@ -139,6 +139,9 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w
   override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
     stream.asInstanceOf[ContinuousStream]
   }
+
+  override def columnarSupportMode(): Scan.ColumnarSupportMode =
+    Scan.ColumnarSupportMode.UNSUPPORTED
 }

 /**
```

```diff
@@ -111,6 +111,9 @@ class RatePerMicroBatchTable(
     override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
       throw new UnsupportedOperationException("continuous mode is not supported!")
     }
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
```

```diff
@@ -100,6 +100,9 @@ class RateStreamTable(

     override def toContinuousStream(checkpointLocation: String): ContinuousStream =
       new RateStreamContinuousStream(rowsPerSecond, numPartitions)
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
```

```diff
@@ -98,6 +98,9 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest
     override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
       new TextSocketContinuousStream(host, port, numPartitions, options)
     }
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
```

```diff
@@ -329,10 +329,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     assert(progress.processedRowsPerSecond === 4.0)

     assert(progress.durationMs.get("latestOffset") === 50)
-    assert(progress.durationMs.get("queryPlanning") === 100)
+    assert(progress.durationMs.get("queryPlanning") === 0)
     assert(progress.durationMs.get("walCommit") === 0)
     assert(progress.durationMs.get("commitOffsets") === 0)
-    assert(progress.durationMs.get("addBatch") === 350)
+    assert(progress.durationMs.get("addBatch") === 450)
     assert(progress.durationMs.get("triggerExecution") === 500)

     assert(progress.sources.length === 1)
```
