From fec5b27c56b15780bbd21d76024fef6019947343 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Fri, 2 Sep 2022 15:20:51 -0700 Subject: [PATCH] [LI-HOTFIX] Fix the SumOfTopicNameLength metric (#388) TICKET = N/A LI_DESCRIPTION = There is a bug in the current implementation of sumOfTopicNameLength: controllerContext.allTopics.map(_.size + topicNameBytesOverheadOnZk).sum Since controllerContext.allTopics is a Set, the result of controllerContext.allTopics.map(_.size + topicNameBytesOverheadOnZk) is also a Set, i.e. Set[Int]. That means if two topics have the same length, their size gets deduped and only one topic's name length gets counted. This PR fixes the bug by using the foldLeft operator on the allTopics. EXIT_CRITERIA = N/A --- .../scala/kafka/controller/KafkaController.scala | 9 +++------ .../kafka/controller/ControllerIntegrationTest.scala | 12 ++++++++++++ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 592c594e8e1fe..040016147eb0d 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -18,13 +18,11 @@ package kafka.controller import java.util import java.util.concurrent.{Callable, Executors, TimeUnit} - import kafka.admin.AdminOperationException import kafka.api._ import kafka.common._ -import kafka.controller.KafkaController.AlterIsrCallback +import kafka.controller.KafkaController.{AlterIsrCallback, AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback, topicNameBytesOverheadOnZk} import kafka.cluster.Broker -import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback} import kafka.coordinator.transaction.ZkProducerIdManager import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server._ @@ -62,7 +60,7 @@ final case object AdminClientTriggered extends ElectionTrigger object KafkaController extends Logging { val InitialControllerEpoch = 0 val InitialControllerEpochZkVersion = 0 - + val topicNameBytesOverheadOnZk = 4 type ElectLeadersCallback = Map[TopicPartition, Either[ApiError, Int]] => Unit type ListReassignmentsCallback = Either[Map[TopicPartition, ReplicaAssignment], ApiError] => Unit type AlterReassignmentsCallback = Either[Map[TopicPartition, ApiError], ApiError] => Unit @@ -185,7 +183,6 @@ class KafkaController(val config: KafkaConfig, @volatile private var globalTopicCount = 0 // https://docs.google.com/document/d/1s9_klx0OFZBf8-Hw87ZaXWPPqs1kf0EEgAckz4n0mzc/edit#heading=h.j3hmr1fm6kkj // shows that each topic has a 4 bytes overhead when they are counted toward the 1M zk jute.maxbuffer limit. - val topicNameBytesOverheadOnZk = 4 @volatile private var sumOfTopicNameLength = 0 @volatile private var globalPartitionCount = 0 @volatile private var topicsToDeleteCount = 0 @@ -1807,7 +1804,7 @@ class KafkaController(val config: KafkaConfig, globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size - sumOfTopicNameLength = if (!isActive) 0 else controllerContext.allTopics.map(_.size + topicNameBytesOverheadOnZk).sum + sumOfTopicNameLength = if (!isActive) 0 else controllerContext.allTopics.foldLeft(0)((prevSum: Int, topicName: String) => prevSum + topicName.size + topicNameBytesOverheadOnZk) globalPartitionCount = if (!isActive) 0 else controllerContext.partitionWithLeadersCount diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index f61901b450024..dfaae2f1b317c 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -120,6 +120,18 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { assertTrue(dataPlaneMetricMap("network-io-total").metricValue().asInstanceOf[Double] == 0.0) } + @Test + def testSumOfTopicNameLength(): Unit = { + servers = makeServers(1) + val topic1 = "topic1" + TestUtils.createTopic(zkClient, topic1, 1, 1, servers) + val topic2 = "topic2" + TestUtils.createTopic(zkClient, topic2, 1, 1, servers) + + val sumOfTopicNameLength = TestUtils.yammerMetricValue("SumOfTopicNameLength") + assertEquals(topic1.size + topic2.size + 2 * KafkaController.topicNameBytesOverheadOnZk, sumOfTopicNameLength) + } + // This test case is used to ensure that there will be no correctness issue after we avoid sending out full // UpdateMetadataRequest to all brokers in the cluster @Test