diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 592c594e8e1fe..040016147eb0d 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -18,13 +18,11 @@ package kafka.controller import java.util import java.util.concurrent.{Callable, Executors, TimeUnit} - import kafka.admin.AdminOperationException import kafka.api._ import kafka.common._ -import kafka.controller.KafkaController.AlterIsrCallback +import kafka.controller.KafkaController.{AlterIsrCallback, AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback, topicNameBytesOverheadOnZk} import kafka.cluster.Broker -import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback} import kafka.coordinator.transaction.ZkProducerIdManager import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server._ @@ -62,7 +60,7 @@ final case object AdminClientTriggered extends ElectionTrigger object KafkaController extends Logging { val InitialControllerEpoch = 0 val InitialControllerEpochZkVersion = 0 - + val topicNameBytesOverheadOnZk = 4 type ElectLeadersCallback = Map[TopicPartition, Either[ApiError, Int]] => Unit type ListReassignmentsCallback = Either[Map[TopicPartition, ReplicaAssignment], ApiError] => Unit type AlterReassignmentsCallback = Either[Map[TopicPartition, ApiError], ApiError] => Unit @@ -185,7 +183,6 @@ class KafkaController(val config: KafkaConfig, @volatile private var globalTopicCount = 0 // https://docs.google.com/document/d/1s9_klx0OFZBf8-Hw87ZaXWPPqs1kf0EEgAckz4n0mzc/edit#heading=h.j3hmr1fm6kkj // shows that each topic has a 4 bytes overhead when they are counted toward the 1M zk jute.maxbuffer limit. - val topicNameBytesOverheadOnZk = 4 @volatile private var sumOfTopicNameLength = 0 @volatile private var globalPartitionCount = 0 @volatile private var topicsToDeleteCount = 0 @@ -1807,7 +1804,7 @@ class KafkaController(val config: KafkaConfig, globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size - sumOfTopicNameLength = if (!isActive) 0 else controllerContext.allTopics.map(_.size + topicNameBytesOverheadOnZk).sum + sumOfTopicNameLength = if (!isActive) 0 else controllerContext.allTopics.foldLeft(0)((prevSum: Int, topicName: String) => prevSum + topicName.size + topicNameBytesOverheadOnZk) globalPartitionCount = if (!isActive) 0 else controllerContext.partitionWithLeadersCount diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index f61901b450024..dfaae2f1b317c 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -120,6 +120,18 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { assertTrue(dataPlaneMetricMap("network-io-total").metricValue().asInstanceOf[Double] == 0.0) } + @Test + def testSumOfTopicNameLength(): Unit = { + servers = makeServers(1) + val topic1 = "topic1" + TestUtils.createTopic(zkClient, topic1, 1, 1, servers) + val topic2 = "topic2" + TestUtils.createTopic(zkClient, topic2, 1, 1, servers) + + val sumOfTopicNameLength = TestUtils.yammerMetricValue("SumOfTopicNameLength") + assertEquals(topic1.size + topic2.size + 2 * KafkaController.topicNameBytesOverheadOnZk, sumOfTopicNameLength) + } + // This test case is used to ensure that there will be no correctness issue after we avoid sending out full // UpdateMetadataRequest to all brokers in the cluster @Test