From 634e3d21601fdd7359c33888832c07faa1ff007c Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Thu, 1 Sep 2022 16:39:09 -0700 Subject: [PATCH] [LI-HOTFIX] Add the li.alter.isr.enable config and use the ZK based AlterIsrManager by default (#386) TICKET = LIKAFKA-46520 LI_DESCRIPTION = We are seeing that the AlterISR request can take a long time in extreme conditions. When a leader has an in-flight AlterISR request, subsequent AlterISR requests are dropped, as indicated by the following log from the leader: "Failed to enqueue ISR change state LeaderAndIsr(leader=..., leaderEpoch=.., isr=List(...), zkVersion=...) for partition ..." This means that the followers are not able to join the ISR quickly, which in turn causes UnderMinISR partitions to last for a long time. EXIT_CRITERIA = When we have made sure that the AlterISR requests are efficient enough for our use case. --- core/src/main/scala/kafka/server/KafkaConfig.scala | 5 +++++ core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- core/src/test/scala/unit/kafka/server/KafkaServerTest.scala | 3 ++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 591b031c33460..bab804c65de0a 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -322,6 +322,7 @@ object Defaults { /** Linkedin Internal states */ val LiCombinedControlRequestEnabled = false + val LiAlterIsrEnabled = false val LiAsyncFetcherEnabled = false val LiNumControllerInitThreads = 1 } @@ -436,6 +437,7 @@ object KafkaConfig { val PreferredControllerProp = "preferred.controller" val LiAsyncFetcherEnableProp = "li.async.fetcher.enable" val LiCombinedControlRequestEnableProp = "li.combined.control.request.enable" + val LiAlterIsrEnableProp = "li.alter.isr.enable" val LiNumControllerInitThreadsProp = "li.num.controller.init.threads" val AllowPreferredControllerFallbackProp = 
"allow.preferred.controller.fallback" val UnofficialClientLoggingEnableProp = "unofficial.client.logging.enable" @@ -771,6 +773,7 @@ object KafkaConfig { val PreferredControllerDoc = "Specifies whether the broker is a dedicated controller node. If set to true, the broker is a preferred controller node." val LiAsyncFetcherEnableDoc = "Specifies whether the event-based async fetcher should be used." val LiCombinedControlRequestEnableDoc = "Specifies whether the controller should use the LiCombinedControlRequest." + val LiAlterIsrEnabledDoc = "Specifies whether the brokers should use the AlterISR request to propagate ISR changes to the controller. If set to false, brokers will propagate the updates via Zookeeper." val LiNumControllerInitThreadsDoc = "Number of threads (and Zookeeper clients + connections) to be used while recursing the topic-partitions tree in Zookeeper during controller startup/failover." // Although AllowPreferredControllerFallback is expected to be configured dynamically at per cluster level, providing a static configuration entry // here allows its value to be obtained without holding the dynamic broker configuration lock. 
@@ -1209,6 +1212,7 @@ object KafkaConfig { .define(PreferredControllerProp, BOOLEAN, Defaults.PreferredController, HIGH, PreferredControllerDoc) .define(LiAsyncFetcherEnableProp, BOOLEAN, Defaults.LiAsyncFetcherEnabled, HIGH, LiAsyncFetcherEnableDoc) .define(LiCombinedControlRequestEnableProp, BOOLEAN, Defaults.LiCombinedControlRequestEnabled, HIGH, LiCombinedControlRequestEnableDoc) + .define(LiAlterIsrEnableProp, BOOLEAN, Defaults.LiAlterIsrEnabled, HIGH, LiAlterIsrEnabledDoc) .define(LiNumControllerInitThreadsProp, INT, Defaults.LiNumControllerInitThreads, atLeast(1), LOW, LiNumControllerInitThreadsDoc) .define(AllowPreferredControllerFallbackProp, BOOLEAN, Defaults.AllowPreferredControllerFallback, HIGH, AllowPreferredControllerFallbackDoc) .define(UnofficialClientLoggingEnableProp, BOOLEAN, Defaults.UnofficialClientLoggingEnable, LOW, UnofficialClientLoggingEnableDoc) @@ -1717,6 +1721,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val liAsyncFetcherEnable = getBoolean(KafkaConfig.LiAsyncFetcherEnableProp) def liCombinedControlRequestEnable = getBoolean(KafkaConfig.LiCombinedControlRequestEnableProp) + def liAlterIsrEnable = getBoolean(KafkaConfig.LiAlterIsrEnableProp) def liNumControllerInitThreads = getInt(KafkaConfig.LiNumControllerInitThreadsProp) def unofficialClientLoggingEnable = getBoolean(KafkaConfig.UnofficialClientLoggingEnableProp) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b3d1b0d761179..0a39c45694ce3 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -337,7 +337,7 @@ class KafkaServer( socketServer.startup(startProcessingRequests = false) /* start replica manager */ - alterIsrManager = if (config.interBrokerProtocolVersion.isAlterIsrSupported) { + alterIsrManager = if (config.interBrokerProtocolVersion.isAlterIsrSupported && config.liAlterIsrEnable) { 
AlterIsrManager( config = config, metadataCache = metadataCache, diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala index 8056e23a909de..aaf24712fc4a5 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala @@ -21,7 +21,7 @@ import kafka.api.ApiVersion import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, fail} -import org.junit.jupiter.api.Test +import org.junit.jupiter.api.{Disabled, Test} import java.util.Properties @@ -116,6 +116,7 @@ class KafkaServerTest extends ZooKeeperTestHarness { } @Test + @Disabled // LIKAFKA-46520 Disabled since the AlterISR request is not efficient enough for the LI use case def testAlterIsrManager(): Unit = { val props = TestUtils.createBrokerConfigs(1, zkConnect).head props.put(KafkaConfig.InterBrokerProtocolVersionProp, ApiVersion.latestVersion.toString)