Skip to content

Commit

Permalink
[LI-HOTFIX] Add the li.alter.isr.enable config and use the ZK based AlterIsrManager by default (#386)
Browse files Browse the repository at this point in the history

TICKET = LIKAFKA-46520
LI_DESCRIPTION =
We are seeing that AlterISR requests can take a long time under extreme conditions.

When a leader has an in-flight AlterISR request, subsequent AlterISR requests are dropped,
as indicated by the following log message from the leader:

"Failed to enqueue ISR change state LeaderAndIsr(leader=..., leaderEpoch=.., isr=List(...),
zkVersion=...) for partition ..."
This means that followers are not able to rejoin the ISR quickly, which causes UnderMinISR
partitions to persist for a long time.

EXIT_CRITERIA = When we have verified that AlterISR requests are efficient enough for our use case.
  • Loading branch information
gitlw authored Sep 1, 2022
1 parent 3fcfb1c commit 634e3d2
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ object Defaults {

/** Linkedin Internal states */
val LiCombinedControlRequestEnabled = false
val LiAlterIsrEnabled = false
val LiAsyncFetcherEnabled = false
val LiNumControllerInitThreads = 1
}
Expand Down Expand Up @@ -436,6 +437,7 @@ object KafkaConfig {
val PreferredControllerProp = "preferred.controller"
val LiAsyncFetcherEnableProp = "li.async.fetcher.enable"
val LiCombinedControlRequestEnableProp = "li.combined.control.request.enable"
val LiAlterIsrEnableProp = "li.alter.isr.enable"
val LiNumControllerInitThreadsProp = "li.num.controller.init.threads"
val AllowPreferredControllerFallbackProp = "allow.preferred.controller.fallback"
val UnofficialClientLoggingEnableProp = "unofficial.client.logging.enable"
Expand Down Expand Up @@ -771,6 +773,7 @@ object KafkaConfig {
val PreferredControllerDoc = "Specifies whether the broker is a dedicated controller node. If set to true, the broker is a preferred controller node."
val LiAsyncFetcherEnableDoc = "Specifies whether the event-based async fetcher should be used."
val LiCombinedControlRequestEnableDoc = "Specifies whether the controller should use the LiCombinedControlRequest."
val LiAlterIsrEnabledDoc = "Specifies whether the brokers should use the AlterISR request to propagate ISR changes to the controller. If set to false, brokers will propagate the updates via Zookeeper."
val LiNumControllerInitThreadsDoc = "Number of threads (and Zookeeper clients + connections) to be used while recursing the topic-partitions tree in Zookeeper during controller startup/failover."
// Although AllowPreferredControllerFallback is expected to be configured dynamically at per cluster level, providing a static configuration entry
// here allows its value to be obtained without holding the dynamic broker configuration lock.
Expand Down Expand Up @@ -1209,6 +1212,7 @@ object KafkaConfig {
.define(PreferredControllerProp, BOOLEAN, Defaults.PreferredController, HIGH, PreferredControllerDoc)
.define(LiAsyncFetcherEnableProp, BOOLEAN, Defaults.LiAsyncFetcherEnabled, HIGH, LiAsyncFetcherEnableDoc)
.define(LiCombinedControlRequestEnableProp, BOOLEAN, Defaults.LiCombinedControlRequestEnabled, HIGH, LiCombinedControlRequestEnableDoc)
.define(LiAlterIsrEnableProp, BOOLEAN, Defaults.LiAlterIsrEnabled, HIGH, LiAlterIsrEnabledDoc)
.define(LiNumControllerInitThreadsProp, INT, Defaults.LiNumControllerInitThreads, atLeast(1), LOW, LiNumControllerInitThreadsDoc)
.define(AllowPreferredControllerFallbackProp, BOOLEAN, Defaults.AllowPreferredControllerFallback, HIGH, AllowPreferredControllerFallbackDoc)
.define(UnofficialClientLoggingEnableProp, BOOLEAN, Defaults.UnofficialClientLoggingEnable, LOW, UnofficialClientLoggingEnableDoc)
Expand Down Expand Up @@ -1717,6 +1721,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO

val liAsyncFetcherEnable = getBoolean(KafkaConfig.LiAsyncFetcherEnableProp)
def liCombinedControlRequestEnable = getBoolean(KafkaConfig.LiCombinedControlRequestEnableProp)
def liAlterIsrEnable = getBoolean(KafkaConfig.LiAlterIsrEnableProp)
def liNumControllerInitThreads = getInt(KafkaConfig.LiNumControllerInitThreadsProp)

def unofficialClientLoggingEnable = getBoolean(KafkaConfig.UnofficialClientLoggingEnableProp)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ class KafkaServer(
socketServer.startup(startProcessingRequests = false)

/* start replica manager */
alterIsrManager = if (config.interBrokerProtocolVersion.isAlterIsrSupported) {
alterIsrManager = if (config.interBrokerProtocolVersion.isAlterIsrSupported && config.liAlterIsrEnable) {
AlterIsrManager(
config = config,
metadataCache = metadataCache,
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import kafka.api.ApiVersion
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, fail}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.{Disabled, Test}

import java.util.Properties

Expand Down Expand Up @@ -116,6 +116,7 @@ class KafkaServerTest extends ZooKeeperTestHarness {
}

@Test
@Disabled // LIKAFKA-46520 Disabled since the AlterISR request is not efficient enough for the LI use case
def testAlterIsrManager(): Unit = {
val props = TestUtils.createBrokerConfigs(1, zkConnect).head
props.put(KafkaConfig.InterBrokerProtocolVersionProp, ApiVersion.latestVersion.toString)
Expand Down

0 comments on commit 634e3d2

Please sign in to comment.