Skip to content

Commit

Permalink
[LI-HOTFIX] Fix the SumOfTopicNameLength metric (#388)
Browse files Browse the repository at this point in the history
TICKET = N/A
LI_DESCRIPTION =
There is a bug in the current implementation of sumOfTopicNameLength:
controllerContext.allTopics.map(_.size + topicNameBytesOverheadOnZk).sum

Since controllerContext.allTopics is a Set, the result of
controllerContext.allTopics.map(_.size + topicNameBytesOverheadOnZk)
is also a Set, i.e. Set[Int]. That means if two topics have the same length, their size
gets deduped and only one topic's name length gets counted.

This PR fixes the bug by using the foldLeft operator on the allTopics.

EXIT_CRITERIA = N/A
  • Loading branch information
gitlw authored Sep 2, 2022
1 parent 0e7ab47 commit fec5b27
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
9 changes: 3 additions & 6 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ package kafka.controller

import java.util
import java.util.concurrent.{Callable, Executors, TimeUnit}

import kafka.admin.AdminOperationException
import kafka.api._
import kafka.common._
import kafka.controller.KafkaController.AlterIsrCallback
import kafka.controller.KafkaController.{AlterIsrCallback, AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback, topicNameBytesOverheadOnZk}
import kafka.cluster.Broker
import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback}
import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server._
Expand Down Expand Up @@ -62,7 +60,7 @@ final case object AdminClientTriggered extends ElectionTrigger
object KafkaController extends Logging {
val InitialControllerEpoch = 0
val InitialControllerEpochZkVersion = 0

val topicNameBytesOverheadOnZk = 4
type ElectLeadersCallback = Map[TopicPartition, Either[ApiError, Int]] => Unit
type ListReassignmentsCallback = Either[Map[TopicPartition, ReplicaAssignment], ApiError] => Unit
type AlterReassignmentsCallback = Either[Map[TopicPartition, ApiError], ApiError] => Unit
Expand Down Expand Up @@ -185,7 +183,6 @@ class KafkaController(val config: KafkaConfig,
@volatile private var globalTopicCount = 0
// https://docs.google.com/document/d/1s9_klx0OFZBf8-Hw87ZaXWPPqs1kf0EEgAckz4n0mzc/edit#heading=h.j3hmr1fm6kkj
// shows that each topic has a 4 bytes overhead when they are counted toward the 1M zk jute.maxbuffer limit.
val topicNameBytesOverheadOnZk = 4
@volatile private var sumOfTopicNameLength = 0
@volatile private var globalPartitionCount = 0
@volatile private var topicsToDeleteCount = 0
Expand Down Expand Up @@ -1807,7 +1804,7 @@ class KafkaController(val config: KafkaConfig,

globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size

sumOfTopicNameLength = if (!isActive) 0 else controllerContext.allTopics.map(_.size + topicNameBytesOverheadOnZk).sum
sumOfTopicNameLength = if (!isActive) 0 else controllerContext.allTopics.foldLeft(0)((prevSum: Int, topicName: String) => prevSum + topicName.size + topicNameBytesOverheadOnZk)

globalPartitionCount = if (!isActive) 0 else controllerContext.partitionWithLeadersCount

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
assertTrue(dataPlaneMetricMap("network-io-total").metricValue().asInstanceOf[Double] == 0.0)
}

@Test
def testSumOfTopicNameLength(): Unit = {
servers = makeServers(1)
val topic1 = "topic1"
TestUtils.createTopic(zkClient, topic1, 1, 1, servers)
val topic2 = "topic2"
TestUtils.createTopic(zkClient, topic2, 1, 1, servers)

val sumOfTopicNameLength = TestUtils.yammerMetricValue("SumOfTopicNameLength")
assertEquals(topic1.size + topic2.size + 2 * KafkaController.topicNameBytesOverheadOnZk, sumOfTopicNameLength)
}

// This test case is used to ensure that there will be no correctness issue after we avoid sending out full
// UpdateMetadataRequest to all brokers in the cluster
@Test
Expand Down

0 comments on commit fec5b27

Please sign in to comment.