[SPARK-45942][CORE] Only do the thread interruption check for putIterator on executors

### What changes were proposed in this pull request?
Only do the thread interruption check for putIterator on executors

### Why are the changes needed?

[SPARK-45025](https://issues.apache.org/jira/browse/SPARK-45025) introduced graceful handling of thread interruption. However, there is an edge case: when a streaming query is stopped on the driver, the stream execution thread is interrupted. If that thread is performing memory store operations on the driver and is inside doPutIterator at that moment, the [unroll loop is cut short](https://github.com/apache/spark/blob/39fc6108bfaaa0ce471f6460880109f948ba5c62/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L224) and the method [returns the unroll memory used so far](https://github.com/apache/spark/blob/39fc6108bfaaa0ce471f6460880109f948ba5c62/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L245-L247).

This can result in a `ClosedChannelException`, because execution falls into this [case clause](https://github.com/apache/spark/blob/aa646d3050028272f7333deaef52f20e6975e0ed/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1614-L1622), which opens an I/O channel and persists the data to disk. Because the thread is already interrupted, the channel is closed at `begin()` (https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/nio/channels/spi/AbstractInterruptibleChannel.java#L172) and the write throws a `ClosedChannelException`.
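The failure mode can be reproduced outside Spark. The sketch below is a standalone illustration, not Spark code: the object `InterruptedChannelDemo` and its method are hypothetical names. It sets the interrupt flag on the current thread and then writes to a `java.nio` `FileChannel`. The write is expected to fail with `ClosedByInterruptException`, which is a subclass of `ClosedChannelException`.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{ClosedChannelException, FileChannel}
import java.nio.file.{Files, StandardOpenOption}

// Hypothetical demo object; not part of Spark.
object InterruptedChannelDemo {

  /** Returns true if a write on an already-interrupted thread fails with ClosedChannelException. */
  def writeWhileInterrupted(): Boolean = {
    val path = Files.createTempFile("interrupt-demo", ".bin")
    try {
      // Simulate the driver-side situation: the thread is interrupted before the disk write.
      Thread.currentThread().interrupt()
      val channel = FileChannel.open(path, StandardOpenOption.WRITE)
      try {
        channel.write(ByteBuffer.wrap(Array[Byte](1, 2, 3)))
        false // the write unexpectedly succeeded
      } catch {
        case _: ClosedChannelException => true // includes ClosedByInterruptException
      } finally {
        channel.close() // no-op if the channel was already closed by the interrupt
      }
    } finally {
      Thread.interrupted() // clear the interrupt flag so later code is unaffected
      Files.deleteIfExists(path)
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"write failed with ClosedChannelException: ${writeWhileInterrupted()}")
  }
}
```

The point of the sketch is that the exception comes from the pending interrupt status alone: nothing else closes the channel before the write.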

On executors, [the task will be killed if the thread is interrupted](https://github.com/apache/spark/blob/39fc6108bfaaa0ce471f6460880109f948ba5c62/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L374). The driver has no equivalent handling.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Ran MemoryStoreSuite
```
[info] MemoryStoreSuite:
[info] - reserve/release unroll memory (36 milliseconds)
[info] - safely unroll blocks (70 milliseconds)
[info] - safely unroll blocks through putIteratorAsValues (10 milliseconds)
[info] - safely unroll blocks through putIteratorAsValues off-heap (21 milliseconds)
[info] - safely unroll blocks through putIteratorAsBytes (138 milliseconds)
[info] - PartiallySerializedBlock.valuesIterator (6 milliseconds)
[info] - PartiallySerializedBlock.finishWritingToStream (5 milliseconds)
[info] - multiple unrolls by the same thread (8 milliseconds)
[info] - lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore (3 milliseconds)
[info] - put a small ByteBuffer to MemoryStore (3 milliseconds)
[info] - SPARK-22083: Release all locks in evictBlocksToFreeSpace (43 milliseconds)
[info] - put user-defined objects to MemoryStore and remove (5 milliseconds)
[info] - put user-defined objects to MemoryStore and clear (4 milliseconds)
[info] Run completed in 1 second, 587 milliseconds.
[info] Total number of tests run: 13
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 13, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43823 from huanliwang-db/fix-interrupt.

Lead-authored-by: Huanli Wang <[email protected]>
Co-authored-by: Huanli Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
2 people authored and HyukjinKwon committed Nov 20, 2023
1 parent ed3a2ed commit 6f7ce91
Showing 1 changed file with 6 additions and 2 deletions.
@@ -219,9 +219,13 @@ private[spark] class MemoryStore(
       unrollMemoryUsedByThisBlock += initialMemoryThreshold
     }

+    // Only do the thread interruption check on the executors.
+    val shouldCheckThreadInterruption = Option(TaskContext.get()).isDefined
+
     // Unroll this block safely, checking whether we have exceeded our threshold periodically
     // and if no thread interrupts have been received.
-    while (values.hasNext && keepUnrolling && !Thread.currentThread().isInterrupted) {
+    while (values.hasNext && keepUnrolling &&
+      (!shouldCheckThreadInterruption || !Thread.currentThread().isInterrupted)) {
       valuesHolder.storeValue(values.next())
       if (elementsUnrolled % memoryCheckPeriod == 0) {
         val currentSize = valuesHolder.estimatedSize()
@@ -242,7 +246,7 @@ private[spark] class MemoryStore(

     // SPARK-45025 - if a thread interrupt was received, we log a warning and return used memory
     // to avoid getting killed by task reaper eventually.
-    if (Thread.currentThread().isInterrupted) {
+    if (shouldCheckThreadInterruption && Thread.currentThread().isInterrupted) {
       logInfo(s"Failed to unroll block=$blockId since thread interrupt was received")
       Left(unrollMemoryUsedByThisBlock)
     } else if (keepUnrolling) {
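
The effect of the gate in the diff above can be sketched without Spark. This is a simplified stand-in under stated assumptions, not the MemoryStore implementation: `UnrollGateSketch` and `unroll` are hypothetical names, the memory accounting is omitted, and the flag is passed in as a parameter, whereas the patch derives it from `Option(TaskContext.get()).isDefined`.

```scala
// Hypothetical, simplified model of the gated unroll loop; not Spark code.
object UnrollGateSketch {

  /**
   * Left(n): unrolling was abandoned after n elements because of an interrupt.
   * Right(n): all n elements were unrolled.
   */
  def unroll[T](values: Iterator[T], shouldCheckThreadInterruption: Boolean): Either[Int, Int] = {
    var elementsUnrolled = 0
    // The interrupt status is consulted only when the check is enabled (executor side).
    while (values.hasNext &&
      (!shouldCheckThreadInterruption || !Thread.currentThread().isInterrupted)) {
      values.next()
      elementsUnrolled += 1
    }
    if (shouldCheckThreadInterruption && Thread.currentThread().isInterrupted) {
      Left(elementsUnrolled)
    } else {
      Right(elementsUnrolled)
    }
  }

  def main(args: Array[String]): Unit = {
    Thread.currentThread().interrupt()
    try {
      // Driver-like call: the interrupt is ignored and the iterator is fully consumed.
      println(unroll(Iterator(1, 2, 3), shouldCheckThreadInterruption = false)) // Right(3)
      // Executor-like call: the loop stops immediately.
      println(unroll(Iterator(1, 2, 3), shouldCheckThreadInterruption = true)) // Left(0)
    } finally {
      Thread.interrupted() // clear the interrupt flag
    }
  }
}
```

With the check disabled, an interrupted driver thread finishes unrolling instead of falling through to the disk-write path with its interrupt flag still set.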