[SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

### What changes were proposed in this pull request?

This PR proposes to assert the end offset of the current batch for the Kafka data source with Trigger.AvailableNow, by fetching the latest offsets from the known topic partitions and comparing them with the end offset. The Kafka data source will throw an error when the latest information for a topic-partition indicates a loss, e.g. the topic-partition is missing, or its latest offset is less than the corresponding offset in the end offset.

This constructs a safeguard for Trigger.AvailableNow against hanging, although it won't catch every case a future bug might introduce. The checks are summarized in the sketch below.
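At a high level, the verification performs four checks per microbatch. The following is a condensed, self-contained sketch of that logic; the real implementation is `verifyEndOffsetForTriggerAvailableNow` in the diff below, which throws the dedicated exceptions from `KafkaExceptions` instead of using `require`:

```scala
import org.apache.kafka.common.TopicPartition

// Condensed sketch of the four checks; the actual code throws typed
// KafkaExceptions rather than calling require().
def verifyEndOffset(
    prefetched: Map[TopicPartition, Long],  // offsets captured at query start
    latest: Map[TopicPartition, Long],      // offsets re-fetched for this batch
    endOffset: Map[TopicPartition, Long]): Unit = {
  // 1. The end offset must cover exactly the pre-fetched topic-partitions.
  require(prefetched.keySet == endOffset.keySet, "mismatched topic-partitions")
  // 2. The end offset must not run past any pre-fetched offset.
  require(endOffset.forall { case (tp, off) => off <= prefetched(tp) },
    "end offset greater than pre-fetched offset")
  // 3. Every planned topic-partition must still exist in the latest offsets.
  require(endOffset.keySet.subsetOf(latest.keySet), "lost topic-partitions")
  // 4. The latest available offset must not have fallen below the end offset.
  require(endOffset.forall { case (tp, off) => off <= latest(tp) },
    "end offset greater than latest offset")
}
```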

### Why are the changes needed?

This change lets the Kafka data source make a best effort to fail fast when the query cannot reach the end state required by Trigger.AvailableNow.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test cases.

Closes apache#38911 from HeartSaVioR/SPARK-41387.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR committed Dec 16, 2022
1 parent 5aca8b4 commit bc6351d
Showing 5 changed files with 307 additions and 1 deletion.
@@ -0,0 +1,80 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

import org.apache.spark.{ErrorClassesJsonReader, SparkException}

/**
 * Helpers to build the exceptions the Kafka connector throws, with messages
 * resolved from the error-class templates in error/kafka-error-classes.json.
 */
object KafkaExceptions {
  private val errorClassesJsonReader: ErrorClassesJsonReader =
    new ErrorClassesJsonReader(
      Seq(getClass.getClassLoader.getResource("error/kafka-error-classes.json")))

  def mismatchedTopicPartitionsBetweenEndOffsetAndPrefetched(
      tpsForPrefetched: Set[TopicPartition],
      tpsForEndOffset: Set[TopicPartition]): SparkException = {
    val errMsg = errorClassesJsonReader.getErrorMessage(
      "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_END_OFFSET_AND_PREFETCHED",
      Map(
        "tpsForPrefetched" -> tpsForPrefetched.toString(),
        "tpsForEndOffset" -> tpsForEndOffset.toString()
      )
    )
    new SparkException(errMsg)
  }

  def endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
      prefetchedOffset: Map[TopicPartition, Long],
      endOffset: Map[TopicPartition, Long]): SparkException = {
    val errMsg = errorClassesJsonReader.getErrorMessage(
      "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED",
      Map(
        "prefetchedOffset" -> prefetchedOffset.toString(),
        "endOffset" -> endOffset.toString()
      )
    )
    new SparkException(errMsg)
  }

  def lostTopicPartitionsInEndOffsetWithTriggerAvailableNow(
      tpsForLatestOffset: Set[TopicPartition],
      tpsForEndOffset: Set[TopicPartition]): SparkException = {
    val errMsg = errorClassesJsonReader.getErrorMessage(
      "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW",
      Map(
        "tpsForLatestOffset" -> tpsForLatestOffset.toString(),
        "tpsForEndOffset" -> tpsForEndOffset.toString()
      )
    )
    new SparkException(errMsg)
  }

  def endOffsetHasGreaterOffsetForTopicPartitionThanLatestWithTriggerAvailableNow(
      latestOffset: Map[TopicPartition, Long],
      endOffset: Map[TopicPartition, Long]): SparkException = {
    val errMsg = errorClassesJsonReader.getErrorMessage(
      "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_LATEST_WITH_TRIGGER_AVAILABLENOW",
      Map(
        "latestOffset" -> latestOffset.toString(),
        "endOffset" -> endOffset.toString()
      )
    )
    new SparkException(errMsg)
  }
}
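As a usage illustration (a hypothetical call site; the topic name and partition counts are made up), a caller compares the two partition sets and throws the corresponding exception:

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical call site for the helper above.
val tpsForPrefetched = Set(new TopicPartition("events", 0), new TopicPartition("events", 1))
val tpsForEndOffset = Set(new TopicPartition("events", 0))

if (tpsForPrefetched != tpsForEndOffset) {
  throw KafkaExceptions.mismatchedTopicPartitionsBetweenEndOffsetAndPrefetched(
    tpsForPrefetched, tpsForEndOffset)
}
```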
@@ -191,6 +191,10 @@ private[kafka010] class KafkaMicroBatchStream(
    val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
    val endPartitionOffsets = end.asInstanceOf[KafkaSourceOffset].partitionToOffsets

    if (allDataForTriggerAvailableNow != null) {
      verifyEndOffsetForTriggerAvailableNow(endPartitionOffsets)
    }

    val offsetRanges = kafkaOffsetReader.getOffsetRangesFromResolvedOffsets(
      startPartitionOffsets,
      endPartitionOffsets,
@@ -313,6 +317,50 @@
    }
  }

  private def verifyEndOffsetForTriggerAvailableNow(
      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
    val tpsForEndOffset = endPartitionOffsets.keySet

    // The end offset must cover exactly the topic-partitions pre-fetched at query start.
    if (tpsForPrefetched != tpsForEndOffset) {
      throw KafkaExceptions.mismatchedTopicPartitionsBetweenEndOffsetAndPrefetched(
        tpsForPrefetched, tpsForEndOffset)
    }

    // The end offset must not run past the pre-fetched offset of any topic-partition.
    val endOffsetHasGreaterThanPrefetched = {
      allDataForTriggerAvailableNow.keySet.exists { tp =>
        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
        val offsetFromEndOffset = endPartitionOffsets(tp)
        offsetFromEndOffset > offsetFromPrefetched
      }
    }
    if (endOffsetHasGreaterThanPrefetched) {
      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
        allDataForTriggerAvailableNow, endPartitionOffsets)
    }

    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))
    val tpsForLatestOffsets = latestOffsets.keySet

    // Every topic-partition planned in the end offset must still exist.
    if (!tpsForEndOffset.subsetOf(tpsForLatestOffsets)) {
      throw KafkaExceptions.lostTopicPartitionsInEndOffsetWithTriggerAvailableNow(
        tpsForLatestOffsets, tpsForEndOffset)
    }

    // The latest available offset must not have fallen below the end offset.
    val endOffsetHasGreaterThanLatest = {
      tpsForEndOffset.exists { tp =>
        val offsetFromLatest = latestOffsets(tp)
        val offsetFromEndOffset = endPartitionOffsets(tp)
        offsetFromEndOffset > offsetFromLatest
      }
    }
    if (endOffsetHasGreaterThanLatest) {
      throw KafkaExceptions
        .endOffsetHasGreaterOffsetForTopicPartitionThanLatestWithTriggerAvailableNow(
          latestOffsets, endPartitionOffsets)
    }
  }

  override def prepareForTriggerAvailableNow(): Unit = {
    allDataForTriggerAvailableNow = kafkaOffsetReader.fetchLatestOffsets(
      Some(getOrCreateInitialPartitionOffsets()))
@@ -293,6 +293,11 @@ private[kafka010] class KafkaSource(

logInfo(s"GetBatch called with start = $start, end = $end")
val untilPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(end)

if (allDataForTriggerAvailableNow != null) {
verifyEndOffsetForTriggerAvailableNow(untilPartitionOffsets)
}

// On recovery, getBatch will get called before getOffset
if (currentPartitionOffsets.isEmpty) {
currentPartitionOffsets = Some(untilPartitionOffsets)
@@ -349,6 +354,50 @@
    }
  }

  private def verifyEndOffsetForTriggerAvailableNow(
      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
    val tpsForEndOffset = endPartitionOffsets.keySet

    // The end offset must cover exactly the topic-partitions pre-fetched at query start.
    if (tpsForPrefetched != tpsForEndOffset) {
      throw KafkaExceptions.mismatchedTopicPartitionsBetweenEndOffsetAndPrefetched(
        tpsForPrefetched, tpsForEndOffset)
    }

    // The end offset must not run past the pre-fetched offset of any topic-partition.
    val endOffsetHasGreaterThanPrefetched = {
      allDataForTriggerAvailableNow.keySet.exists { tp =>
        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
        val offsetFromEndOffset = endPartitionOffsets(tp)
        offsetFromEndOffset > offsetFromPrefetched
      }
    }
    if (endOffsetHasGreaterThanPrefetched) {
      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
        allDataForTriggerAvailableNow, endPartitionOffsets)
    }

    val latestOffsets = kafkaReader.fetchLatestOffsets(Some(endPartitionOffsets))
    val tpsForLatestOffsets = latestOffsets.keySet

    // Every topic-partition planned in the end offset must still exist.
    if (!tpsForEndOffset.subsetOf(tpsForLatestOffsets)) {
      throw KafkaExceptions.lostTopicPartitionsInEndOffsetWithTriggerAvailableNow(
        tpsForLatestOffsets, tpsForEndOffset)
    }

    // The latest available offset must not have fallen below the end offset.
    val endOffsetHasGreaterThanLatest = {
      tpsForEndOffset.exists { tp =>
        val offsetFromLatest = latestOffsets(tp)
        val offsetFromEndOffset = endPartitionOffsets(tp)
        offsetFromEndOffset > offsetFromLatest
      }
    }
    if (endOffsetHasGreaterThanLatest) {
      throw KafkaExceptions
        .endOffsetHasGreaterOffsetForTopicPartitionThanLatestWithTriggerAvailableNow(
          latestOffsets, endPartitionOffsets)
    }
  }

  override def prepareForTriggerAvailableNow(): Unit = {
    allDataForTriggerAvailableNow = kafkaReader.fetchLatestOffsets(Some(initialPartitionOffsets))
  }
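For context, both `KafkaMicroBatchStream` and `KafkaSource` now run this verification for queries like the following sketch (broker address and topic name are placeholders): `prepareForTriggerAvailableNow` captures the latest offsets once at query start, and every subsequent batch's end offset is checked against them.

```scala
import org.apache.spark.sql.streaming.Trigger

// Placeholder broker/topic; Trigger.AvailableNow processes all data available
// at query start, then stops the query.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

query.awaitTermination()
```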
@@ -0,0 +1,26 @@
{
  "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_END_OFFSET_AND_PREFETCHED" : {
    "message" : [
      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in pre-fetched offset as in end offset for each microbatch. The error could be transient - restart your query, and report if you still see the same issue.",
      "topic-partitions for pre-fetched offset: <tpsForPrefetched>, topic-partitions for end offset: <tpsForEndOffset>."
    ]
  },
  "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED" : {
    "message" : [
      "For Kafka data source with Trigger.AvailableNow, the end offset should have an offset lower than or equal to the pre-fetched offset for each topic partition. The error could be transient - restart your query, and report if you still see the same issue.",
      "pre-fetched offset: <prefetchedOffset>, end offset: <endOffset>."
    ]
  },
  "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW" : {
    "message" : [
      "Some of the partitions in Kafka topic(s) have been lost while running the query with Trigger.AvailableNow. The error could be transient - restart your query, and report if you still see the same issue.",
      "topic-partitions for latest offset: <tpsForLatestOffset>, topic-partitions for end offset: <tpsForEndOffset>"
    ]
  },
  "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_LATEST_WITH_TRIGGER_AVAILABLENOW" : {
    "message" : [
      "Some of the partitions in Kafka topic(s) report an available offset which is less than the end offset while running the query with Trigger.AvailableNow. The error could be transient - restart your query, and report if you still see the same issue.",
      "latest offset: <latestOffset>, end offset: <endOffset>"
    ]
  }
}
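The `<name>` tokens in these templates are filled from the parameter map passed to `getErrorMessage`. A minimal sketch of that substitution, assuming the reader replaces each token with the matching map value (this is an illustration, not `ErrorClassesJsonReader`'s actual implementation):

```scala
// Minimal sketch of the placeholder substitution (assumed behavior).
val template = "topic-partitions for latest offset: <tpsForLatestOffset>, " +
  "topic-partitions for end offset: <tpsForEndOffset>"
val params = Map(
  "tpsForLatestOffset" -> "Set(events-0)",
  "tpsForEndOffset" -> "Set(events-0, events-1)")
val message = params.foldLeft(template) { case (msg, (key, value)) =>
  msg.replace(s"<$key>", value)
}
// message: "topic-partitions for latest offset: Set(events-0),
//           topic-partitions for end offset: Set(events-0, events-1)"
```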
@@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

import org.apache.spark.TestUtils
import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -44,7 +45,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.streaming.{StreamingQuery, StreamTest, Trigger}
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
    assert(index == 3)
  }

test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
"during subsequent batches") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 5)

testUtils.sendMessages(topic, (0 until 15).map { case x =>
s"foo-$x"
}.toArray, Some(0))

val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("maxOffsetsPerTrigger", 5)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
// the query should fail regardless of this option
.option("failOnDataLoss", "true")
.load()

def startTriggerAvailableNowQuery(): StreamingQuery = {
reader.writeStream
.foreachBatch((_: Dataset[Row], batchId: Long) => {
testUtils.deleteTopic(topic)
// create partitions less than the kafka data source figured out as an end state
testUtils.createTopic(topic, partitions = 3)
// offset will keep the same
testUtils.sendMessages(topic, (0 until 15).map { case x =>
s"foo-$x"
}.toArray, Some(0))
null.asInstanceOf[Unit]
})
.trigger(Trigger.AvailableNow)
.start()
}

val exc = intercept[Exception] {
val query = startTriggerAvailableNowQuery()
try {
assert(query.awaitTermination(streamingTimeout.toMillis))
} finally {
query.stop()
}
}
TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +
"during running query with Trigger.AvailableNow.")
TestUtils.assertExceptionMsg(exc, "topic-partitions for latest offset: ")
TestUtils.assertExceptionMsg(exc, "topic-partitions for end offset: ")
}

test("Query with Trigger.AvailableNow should throw error when offset(s) in planned topic " +
"partitions got unavailable during subsequent batches") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 5)

testUtils.sendMessages(topic, (0 until 15).map { case x =>
s"foo-$x"
}.toArray, Some(0))

val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("maxOffsetsPerTrigger", 5)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
// the query should fail regardless of this option
.option("failOnDataLoss", "true")
.load()

def startTriggerAvailableNowQuery(): StreamingQuery = {
reader.writeStream
.foreachBatch((_: Dataset[Row], batchId: Long) => {
testUtils.deleteTopic(topic)
// the number of topic partitions remain the same
testUtils.createTopic(topic, partitions = 5)
// the number of available records will change to lower than the end state
testUtils.sendMessages(topic, (0 until 10).map { case x =>
s"foo-$x"
}.toArray, Some(0))
null.asInstanceOf[Unit]
})
.trigger(Trigger.AvailableNow)
.start()
}

val exc = intercept[StreamingQueryException] {
val query = startTriggerAvailableNowQuery()
try {
assert(query.awaitTermination(streamingTimeout.toMillis))
} finally {
query.stop()
}
}
TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) report available" +
" offset which is less than end offset during running query with Trigger.AvailableNow.")
TestUtils.assertExceptionMsg(exc, "latest offset: ")
TestUtils.assertExceptionMsg(exc, "end offset: ")
}

test("(de)serialization of initial offsets") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 5)