Skip to content

Commit

Permalink
[SPARK-40844][SS] Flip the default value of Kafka offset fetching config
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR proposes to flip the default value of Kafka offset fetching config (`spark.sql.streaming.kafka.useDeprecatedOffsetFetching`) from `true` to `false`, which enables AdminClient based offset fetching by default.

### Why are the changes needed?

We had been encountered several production issues with old offset fetching (e.g. hang, issue with Kafka consumer group rebalance) which could be mitigated with new offset fetching. Despite the breaking change on the ACL, there is no need for moderate users to suffer from the old way.

The discussion went through the dev. mailing list: https://lists.apache.org/thread/spkco94gw33sj8355mhlxz1vl7gl1g5c

### Does this PR introduce _any_ user-facing change?

Yes, especially users who relies on Kafka ACL based on consumer group. They need to either adjust the ACL to topic based one, or set the value to `true` for `spark.sql.streaming.kafka.useDeprecatedOffsetFetching` to use the old approach.

### How was this patch tested?

Existing UTs.

Closes apache#38306 from HeartSaVioR/SPARK-40844.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
HeartSaVioR committed Oct 19, 2022
1 parent 3ed2732 commit 706862c
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 3 deletions.
2 changes: 2 additions & 0 deletions docs/ss-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.

- Since Spark 3.4, `Trigger.Once` is deprecated, and users are encouraged to migrate from `Trigger.Once` to `Trigger.AvailableNow`. Please refer [SPARK-39805](https://issues.apache.org/jira/browse/SPARK-39805) for more details.

- Since Spark 3.4, the default value of configuration for Kafka offset fetching (`spark.sql.streaming.kafka.useDeprecatedOffsetFetching`) is changed from `true` to `false`. The default no longer relies consumer group based scheduling, which affect the required ACL. For further details please see [Structured Streaming Kafka Integration](structured-streaming-kafka-integration.html#offset-fetching).

## Upgrading from Structured Streaming 3.2 to 3.3

- Since Spark 3.3, all stateful operators require hash partitioning with exact grouping keys. In previous versions, all stateful operators except stream-stream join require loose partitioning criteria which opens the possibility on correctness issue. (See [SPARK-38204](https://issues.apache.org/jira/browse/SPARK-38204) for more details.) To ensure backward compatibility, we retain the old behavior with the checkpoint built from older versions.
Expand Down
5 changes: 3 additions & 2 deletions docs/structured-streaming-kafka-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,9 @@ Timestamp offset options require Kafka 0.10.1.0 or higher.
### Offset fetching

In Spark 3.0 and before Spark uses <code>KafkaConsumer</code> for offset fetching which could cause infinite wait in the driver.
In Spark 3.1 a new configuration option added <code>spark.sql.streaming.kafka.useDeprecatedOffsetFetching</code> (default: <code>true</code>)
which could be set to `false` allowing Spark to use new offset fetching mechanism using <code>AdminClient</code>.
In Spark 3.1 a new configuration option added <code>spark.sql.streaming.kafka.useDeprecatedOffsetFetching</code> (default: <code>false</code>)
which allows Spark to use new offset fetching mechanism using <code>AdminClient</code>. (Set this to `true` to use old offset fetching with <code>KafkaConsumer</code>.)

When the new mechanism used the following applies.

First of all the new approach supports Kafka brokers `0.11.0.0+`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1924,7 +1924,7 @@ object SQLConf {
"Integration Guide.")
.version("3.1.0")
.booleanConf
.createWithDefault(true)
.createWithDefault(false)

val STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED =
buildConf("spark.sql.streaming.statefulOperator.checkCorrectness.enabled")
Expand Down

0 comments on commit 706862c

Please sign in to comment.