[MINOR][SQL] Move iterator.hasNext into try block in executeTask
### What changes were proposed in this pull request?

This patch moves `iterator.hasNext` into the try block of `tryWithSafeFinallyAndFailureCallbacks` in `FileFormatWriter.executeTask`.

### Why are the changes needed?

Not only can `dataWriter.writeWithIterator(iterator)` raise an error; `iterator.hasNext` can also fail, for example with:

```
org.apache.spark.shuffle.FetchFailedException: Block shuffle_1_106_21 is corrupted but checksum verification passed
```
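For context on how an error can escape through `hasNext`: shuffle-backed iterators do their fetching lazily, so the first `hasNext` call can be the point where a corrupted block surfaces. Below is a minimal sketch of that shape (a hypothetical `FetchingIterator`, not Spark's actual `ShuffleBlockFetcherIterator`):

```scala
// Toy sketch: an iterator that fetches its data on first use, so the
// underlying I/O failure escapes through hasNext rather than next().
class FetchingIterator[T](fetchBlock: () => Seq[T]) extends Iterator[T] {
  private var fetched: Iterator[T] = null

  override def hasNext: Boolean = {
    // The fetch happens lazily here; a corrupted shuffle block makes this
    // call throw (in Spark, a FetchFailedException).
    if (fetched == null) fetched = fetchBlock().iterator
    fetched.hasNext
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    fetched.next()
  }
}
```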

Because the `iterator.hasNext` call is not wrapped in the try block, `abort` is never called on the committer when it fails. Since `setupTask` has already been called by that point, it is safer to call `abort` for any error that happens after it.
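For reference, the helper's control flow can be modeled roughly as below. This is a simplified sketch assuming ordinary try/catch/finally semantics; the real `Utils.tryWithSafeFinallyAndFailureCallbacks` additionally marks the task as failed via `TaskContext` and guards against exceptions thrown by the callbacks themselves:

```scala
// Simplified model: only failures raised inside `block` reach `catchBlock`,
// which is why the hasNext call has to move inside `block` for abort to run.
def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)(
    catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
  try {
    block
  } catch {
    case original: Throwable =>
      try {
        catchBlock // the abort path: dataWriter.abort() / committer.abortTask(...)
      } catch {
        case t: Throwable if t ne original => original.addSuppressed(t)
      }
      throw original
  } finally {
    finallyBlock // always runs, e.g. closing the writer
  }
}
```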

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48360 from viirya/try_block.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: huaxingao <[email protected]>
viirya authored and huaxingao committed Oct 6, 2024
1 parent 37f2966 commit 06c70ba
Showing 1 changed file with 27 additions and 18 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

```diff
@@ -383,32 +383,41 @@ object FileFormatWriter extends Logging {
 
     committer.setupTask(taskAttemptContext)
 
-    val dataWriter =
-      if (sparkPartitionId != 0 && !iterator.hasNext) {
-        // In case of empty job, leave first partition to save meta for file format like parquet.
-        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
-      } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
-        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
-      } else {
-        concurrentOutputWriterSpec match {
-          case Some(spec) =>
-            new DynamicPartitionDataConcurrentWriter(
-              description, taskAttemptContext, committer, spec)
-          case _ =>
-            new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
-        }
-      }
+    var dataWriter: FileFormatDataWriter = null
 
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+      dataWriter =
+        if (sparkPartitionId != 0 && !iterator.hasNext) {
+          // In case of empty job, leave first partition to save meta for file format like parquet.
+          new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
+        } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
+          new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
+        } else {
+          concurrentOutputWriterSpec match {
+            case Some(spec) =>
+              new DynamicPartitionDataConcurrentWriter(
+                description, taskAttemptContext, committer, spec)
+            case _ =>
+              new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
+          }
+        }
+
       // Execute the task to write rows out and commit the task.
       dataWriter.writeWithIterator(iterator)
       dataWriter.commit()
     })(catchBlock = {
       // If there is an error, abort the task
-      dataWriter.abort()
-      logError(log"Job ${MDC(JOB_ID, jobId)} aborted.")
+      if (dataWriter != null) {
+        dataWriter.abort()
+      } else {
+        committer.abortTask(taskAttemptContext)
+      }
+      logError(log"Job: ${MDC(JOB_ID, jobId)}, Task: ${MDC(TASK_ID, taskId)}, " +
+        log"Task attempt ${MDC(TASK_ATTEMPT_ID, taskAttemptId)} aborted.")
     }, finallyBlock = {
-      dataWriter.close()
+      if (dataWriter != null) {
+        dataWriter.close()
+      }
     })
   }
```

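As a follow-up illustration, here is a self-contained toy (hypothetical names, not Spark's test suite) showing the property the patch establishes: once `hasNext` is evaluated inside the guarded block, the abort path runs even when the failure happens before any writer exists:

```scala
object AbortOnHasNextDemo extends App {
  // Stands in for a shuffle-backed iterator whose hasNext surfaces a fetch failure.
  val failingIterator: Iterator[Int] = new Iterator[Int] {
    override def hasNext: Boolean =
      throw new RuntimeException("Block shuffle_1_106_21 is corrupted")
    override def next(): Int = sys.error("unreachable")
  }

  var abortCalled = false

  // Patched shape: hasNext is evaluated inside the guarded block, so the
  // catch path (standing in for committer.abortTask) runs on failure.
  try {
    val writerNeeded = failingIterator.hasNext // throws here, inside the guard
    // ... a data writer would be created and used here ...
  } catch {
    case _: Throwable => abortCalled = true
  }

  assert(abortCalled, "abort path must run when hasNext fails")
  println(s"abort path reached: $abortCalled")
}
```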
