diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 91749ddd794fb..5e6107c4f49c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -383,32 +383,41 @@ object FileFormatWriter extends Logging {
     committer.setupTask(taskAttemptContext)
 
-    val dataWriter =
-      if (sparkPartitionId != 0 && !iterator.hasNext) {
-        // In case of empty job, leave first partition to save meta for file format like parquet.
-        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
-      } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
-        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
-      } else {
-        concurrentOutputWriterSpec match {
-          case Some(spec) =>
-            new DynamicPartitionDataConcurrentWriter(
-              description, taskAttemptContext, committer, spec)
-          case _ =>
-            new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
-        }
-      }
+    var dataWriter: FileFormatDataWriter = null
 
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+      dataWriter =
+        if (sparkPartitionId != 0 && !iterator.hasNext) {
+          // In case of empty job, leave first partition to save meta for file format like parquet.
+          new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
+        } else if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
+          new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
+        } else {
+          concurrentOutputWriterSpec match {
+            case Some(spec) =>
+              new DynamicPartitionDataConcurrentWriter(
+                description, taskAttemptContext, committer, spec)
+            case _ =>
+              new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
+          }
+        }
+
       // Execute the task to write rows out and commit the task.
       dataWriter.writeWithIterator(iterator)
       dataWriter.commit()
     })(catchBlock = {
       // If there is an error, abort the task
-      dataWriter.abort()
-      logError(log"Job ${MDC(JOB_ID, jobId)} aborted.")
+      if (dataWriter != null) {
+        dataWriter.abort()
+      } else {
+        committer.abortTask(taskAttemptContext)
+      }
+      logError(log"Job: ${MDC(JOB_ID, jobId)}, Task: ${MDC(TASK_ID, taskId)}, " +
+        log"Task attempt ${MDC(TASK_ATTEMPT_ID, taskAttemptId)} aborted.")
     }, finallyBlock = {
-      dataWriter.close()
+      if (dataWriter != null) {
+        dataWriter.close()
+      }
     })
   }
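
The diff moves the `dataWriter` construction inside the guarded block, so a failure while *creating* the writer (not just while writing) now reaches the `catchBlock`, where `committer.abortTask` stands in for the not-yet-created writer, and the `finallyBlock` null-checks before closing. Below is a minimal standalone sketch of that pattern using plain `try`/`catch`/`finally` in place of Spark's internal `Utils.tryWithSafeFinallyAndFailureCallbacks`; the `Writer`/`Committer` traits and `executeTask` are hypothetical stand-ins, not Spark's actual classes.

```scala
// Hypothetical stand-ins for the writer and commit protocol in the diff.
trait Writer {
  def write(rows: Iterator[String]): Unit
  def commit(): Unit
  def abort(): Unit
  def close(): Unit
}

trait Committer {
  def abortTask(): Unit
}

def executeTask(rows: Iterator[String], committer: Committer)(mkWriter: => Writer): Unit = {
  // Declared as a nullable var so the catch/finally paths can tell whether
  // construction ever succeeded.
  var writer: Writer = null
  try {
    // Constructor failures now land in the catch block below, instead of
    // escaping before any cleanup hook is installed.
    writer = mkWriter
    writer.write(rows)
    writer.commit()
  } catch {
    case t: Throwable =>
      if (writer != null) writer.abort() // writer exists: let it clean up
      else committer.abortTask()         // writer never created: fall back to the committer
      throw t
  } finally {
    if (writer != null) writer.close()   // guard against the never-created case
  }
}
```

The design choice mirrors the diff: rather than constructing the writer eagerly (where a constructor exception would skip task abort entirely), initialization is deferred into the protected region and every cleanup path is made robust to a still-null writer.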