From fa4d9b9928e477dd3841349f97dcb5ebeaf9316f Mon Sep 17 00:00:00 2001 From: everpcpc Date: Wed, 30 May 2018 17:46:10 +0800 Subject: [PATCH] Fix getting topic/partition for LogSegments/Directory with jmx --- app/kafka/manager/jmx/KafkaJMX.scala | 59 +++++++++++++--------------- 1 file changed, 28 insertions(+), 31 deletions(-) diff --git a/app/kafka/manager/jmx/KafkaJMX.scala b/app/kafka/manager/jmx/KafkaJMX.scala index 8127275ba..dbaff3f17 100644 --- a/app/kafka/manager/jmx/KafkaJMX.scala +++ b/app/kafka/manager/jmx/KafkaJMX.scala @@ -23,7 +23,7 @@ import scala.util.matching.Regex import scala.util.{Failure, Try} object KafkaJMX extends Logging { - + private[this] val defaultJmxConnectorProperties = Map[String, Any] ( "jmx.remote.x.request.waiting.timeout" -> "3000", "jmx.remote.x.notification.fetch.timeout" -> "3000", @@ -99,7 +99,7 @@ object KafkaMetrics { private def getBrokerTopicMeterMetrics(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, metricName: String, topicOption: Option[String]) = { getMeterMetric(mbsc, getObjectName(kafkaVersion, metricName, topicOption)) } - + private def getSep(kafkaVersion: KafkaVersion) : String = { kafkaVersion match { case Kafka_0_8_1_1 => "\"" @@ -110,7 +110,7 @@ object KafkaMetrics { def getObjectName(kafkaVersion: KafkaVersion, name: String, topicOption: Option[String] = None) = { val sep = getSep(kafkaVersion) val topicAndName = kafkaVersion match { - case Kafka_0_8_1_1 => + case Kafka_0_8_1_1 => topicOption.map( topic => s"${sep}$topic-$name${sep}").getOrElse(s"${sep}AllTopics$name${sep}") case _ => val topicProp = topicOption.map(topic => s",topic=$topic").getOrElse("") @@ -127,11 +127,11 @@ object KafkaMetrics { /* Gauge, Value : 0 */ private val replicaFetcherManagerMaxLag = new ObjectName( "kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica") - + /* Gauge, Value : 0 */ private val kafkaControllerActiveControllerCount = new ObjectName( "kafka.controller:type=KafkaController,name=ActiveControllerCount") - + /* Gauge, Value : 0 */ private val kafkaControllerOfflinePartitionsCount = new ObjectName( "kafka.controller:type=KafkaController,name=OfflinePartitionsCount") @@ -144,16 +144,18 @@ object KafkaMetrics { private val operatingSystemObjectName = new ObjectName("java.lang:type=OperatingSystem") /* Log Segments */ - private val logSegmentObjectName = new ObjectName("kafka.log:type=Log,name=*-LogSegments") + private val logSegmentObjectName = new ObjectName("kafka.log:type=Log,name=LogSegments,topic=*,partition=*") - private val directoryObjectName = new ObjectName("kafka.log:type=Log,name=*-Directory") + private val directoryObjectName = new ObjectName("kafka.log:type=Log,name=Directory,topic=*,partition=*") - private val LogSegmentsNameRegex = new Regex("%s-LogSegments".format("""(.*)-(\d*)"""), "topic", "partition") + // exp: kafka.log:type=Log,name=LogSegments,topic=WEB_post,partition=19/Value (ArrayList) = [baseOffset=0, created=1527666489299, logSize=55232565, indexSize=252680] + // private val LogSegmentsNameRegex = new Regex("%s-LogSegments".format("""(.*)-(\d*)"""), "topic", "partition") - private val DirectoryNameRegex = new Regex("%s-Directory".format("""(.*)-(\d*)"""), "topic", "partition") + // exp: kafka.log:type=Log,name=Directory,topic=WEB_post,partition=16/Value (String) = /log/kafka/WEB_post-16 + // private val DirectoryNameRegex = new Regex("%s-Directory".format("""(.*)-(\d*)"""), "topic", "partition") val LogSegmentRegex = new Regex( - "baseOffset=(.*), created=(.*), logSize=(.*), indexSize=(.*)", + "baseOffset=(.*); created=(.*); logSize=(.*); indexSize=(.*)", "baseOffset", "created", "logSize", "indexSize" ) @@ -172,7 +174,7 @@ object KafkaMetrics { case _: InstanceNotFoundException => OSMetric(0D, 0D) } } - + private def getMeterMetric(mbsc: MBeanServerConnection, name: ObjectName) = { import scala.collection.JavaConverters._ try { @@ -187,7 +189,7 @@ object KafkaMetrics { case _: InstanceNotFoundException => MeterMetric(0,0,0,0,0) } } - + private def getLongValue(attributes: Seq[Attribute], name: String) = { attributes.find(_.getName == name).map(_.getValue.asInstanceOf[Long]).getOrElse(0L) } @@ -196,27 +198,22 @@ object KafkaMetrics { attributes.find(_.getName == name).map(_.getValue.asInstanceOf[Double]).getOrElse(0D) } - private def topicAndPartition(name: String, regex: Regex) = { + private def topicAndPartition(objectName: ObjectName) = { try { - val matches = regex.findAllIn(name).matchData.toSeq - require(matches.size == 1) - val m = matches.head - - val topic = m.group("topic") - val partition = m.group("partition").toInt - + val topic = objectName.getKeyProperty("topic") + val partition = objectName.getKeyProperty("partition").toInt (topic, partition) } catch { case e: Exception => - throw new IllegalStateException("Can't parse topic and partition from: <%s>".format(name), e) + throw new IllegalStateException("Can't parse topic and partition from: <%s>".format(objectName), e) } } private def queryValues[K, V]( mbsc: MBeanServerConnection, objectName: ObjectName, - keyConverter: String => K, + keyConverter: ObjectName => K, valueConverter: Object => V ) = { val logsSizeObjectNames = mbsc.queryNames(objectName, null).asScala.toSeq @@ -228,12 +225,12 @@ object KafkaMetrics { private def queryValue[K, V]( mbsc: MBeanServerConnection, objectName: ObjectName, - keyConverter: String => K, + keyConverter: ObjectName => K, valueConverter: Object => V ) = { - val name = objectName.getKeyProperty("name") + // val name = objectName.getKeyProperty("name") val mbean = MBeanServerInvocationHandler.newProxyInstance(mbsc, objectName, classOf[GaugeMBean], true) - (keyConverter(name), valueConverter(mbean.getValue)) + (keyConverter(objectName), valueConverter(mbean.getValue)) } private def parseLogSegment(str: String): LogSegment = { @@ -259,7 +256,7 @@ object KafkaMetrics { queryValues( mbsc, logSegmentObjectName, - key => topicAndPartition(key, LogSegmentsNameRegex), + key => topicAndPartition(key), value => { val lst = value.asInstanceOf[ju.List[String]] lst.asScala.map(parseLogSegment).toSeq @@ -271,7 +268,7 @@ object KafkaMetrics { queryValues( mbsc, directoryObjectName, - key => topicAndPartition(key, DirectoryNameRegex), + key => topicAndPartition(key), value => value.asInstanceOf[String] ) }.toMap @@ -355,10 +352,10 @@ case class MeterMetric(count: Long, def +(o: MeterMetric) : MeterMetric = { MeterMetric( - o.count + count, - o.fifteenMinuteRate + fifteenMinuteRate, - o.fiveMinuteRate + fiveMinuteRate, - o.oneMinuteRate + oneMinuteRate, + o.count + count, + o.fifteenMinuteRate + fifteenMinuteRate, + o.fiveMinuteRate + fiveMinuteRate, + o.oneMinuteRate + oneMinuteRate, o.meanRate + meanRate) } }