Skip to content

Commit

Permalink
Fix getting topic/partition for LogSegments/Directory with jmx
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc committed May 31, 2018
1 parent bf068d0 commit ea71ab3
Showing 1 changed file with 27 additions and 30 deletions.
57 changes: 27 additions & 30 deletions app/kafka/manager/jmx/KafkaJMX.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.util.matching.Regex
import scala.util.{Failure, Try}

object KafkaJMX extends Logging {

private[this] val defaultJmxConnectorProperties = Map[String, Any] (
"jmx.remote.x.request.waiting.timeout" -> "3000",
"jmx.remote.x.notification.fetch.timeout" -> "3000",
Expand Down Expand Up @@ -99,7 +99,7 @@ object KafkaMetrics {
private def getBrokerTopicMeterMetrics(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, metricName: String, topicOption: Option[String]) = {
getMeterMetric(mbsc, getObjectName(kafkaVersion, metricName, topicOption))
}

private def getSep(kafkaVersion: KafkaVersion) : String = {
kafkaVersion match {
case Kafka_0_8_1_1 => "\""
Expand All @@ -110,7 +110,7 @@ object KafkaMetrics {
def getObjectName(kafkaVersion: KafkaVersion, name: String, topicOption: Option[String] = None) = {
val sep = getSep(kafkaVersion)
val topicAndName = kafkaVersion match {
case Kafka_0_8_1_1 =>
case Kafka_0_8_1_1 =>
topicOption.map( topic => s"${sep}$topic-$name${sep}").getOrElse(s"${sep}AllTopics$name${sep}")
case _ =>
val topicProp = topicOption.map(topic => s",topic=$topic").getOrElse("")
Expand All @@ -127,11 +127,11 @@ object KafkaMetrics {
/* Gauge, Value : 0 */
private val replicaFetcherManagerMaxLag = new ObjectName(
"kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica")

/* Gauge, Value : 0 */
private val kafkaControllerActiveControllerCount = new ObjectName(
"kafka.controller:type=KafkaController,name=ActiveControllerCount")

/* Gauge, Value : 0 */
private val kafkaControllerOfflinePartitionsCount = new ObjectName(
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount")
Expand All @@ -144,13 +144,15 @@ object KafkaMetrics {
private val operatingSystemObjectName = new ObjectName("java.lang:type=OperatingSystem")

/* Log Segments */
private val logSegmentObjectName = new ObjectName("kafka.log:type=Log,name=*-LogSegments")
private val logSegmentObjectName = new ObjectName("kafka.log:type=Log,name=LogSegments,topic=*,partition=*")

private val directoryObjectName = new ObjectName("kafka.log:type=Log,name=*-Directory")
private val directoryObjectName = new ObjectName("kafka.log:type=Log,name=Directory,topic=*,partition=*")

private val LogSegmentsNameRegex = new Regex("%s-LogSegments".format("""(.*)-(\d*)"""), "topic", "partition")
// exp: kafka.log:type=Log,name=LogSegments,topic=WEB_post,partition=19/Value (ArrayList) = [baseOffset=0, created=1527666489299, logSize=55232565, indexSize=252680]
// private val LogSegmentsNameRegex = new Regex("%s-LogSegments".format("""(.*)-(\d*)"""), "topic", "partition")

private val DirectoryNameRegex = new Regex("%s-Directory".format("""(.*)-(\d*)"""), "topic", "partition")
// exp: kafka.log:type=Log,name=Directory,topic=WEB_post,partition=16/Value (String) = /log/kafka/WEB_post-16
// private val DirectoryNameRegex = new Regex("%s-Directory".format("""(.*)-(\d*)"""), "topic", "partition")

val LogSegmentRegex = new Regex(
"baseOffset=(.*), created=(.*), logSize=(.*), indexSize=(.*)",
Expand All @@ -172,7 +174,7 @@ object KafkaMetrics {
case _: InstanceNotFoundException => OSMetric(0D, 0D)
}
}

private def getMeterMetric(mbsc: MBeanServerConnection, name: ObjectName) = {
import scala.collection.JavaConverters._
try {
Expand All @@ -187,7 +189,7 @@ object KafkaMetrics {
case _: InstanceNotFoundException => MeterMetric(0,0,0,0,0)
}
}

private def getLongValue(attributes: Seq[Attribute], name: String) = {
attributes.find(_.getName == name).map(_.getValue.asInstanceOf[Long]).getOrElse(0L)
}
Expand All @@ -196,27 +198,22 @@ object KafkaMetrics {
attributes.find(_.getName == name).map(_.getValue.asInstanceOf[Double]).getOrElse(0D)
}

private def topicAndPartition(name: String, regex: Regex) = {
private def topicAndPartition(objectName: ObjectName) = {
try {
val matches = regex.findAllIn(name).matchData.toSeq
require(matches.size == 1)
val m = matches.head

val topic = m.group("topic")
val partition = m.group("partition").toInt

val topic = objectName.getKeyProperty("topic")
val partition = objectName.getKeyProperty("partition").toInt
(topic, partition)
}
catch {
case e: Exception =>
throw new IllegalStateException("Can't parse topic and partition from: <%s>".format(name), e)
throw new IllegalStateException("Can't parse topic and partition from: <%s>".format(objectName), e)
}
}

private def queryValues[K, V](
mbsc: MBeanServerConnection,
objectName: ObjectName,
keyConverter: String => K,
keyConverter: ObjectName => K,
valueConverter: Object => V
) = {
val logsSizeObjectNames = mbsc.queryNames(objectName, null).asScala.toSeq
Expand All @@ -228,12 +225,12 @@ object KafkaMetrics {
private def queryValue[K, V](
mbsc: MBeanServerConnection,
objectName: ObjectName,
keyConverter: String => K,
keyConverter: ObjectName => K,
valueConverter: Object => V
) = {
val name = objectName.getKeyProperty("name")
// val name = objectName.getKeyProperty("name")
val mbean = MBeanServerInvocationHandler.newProxyInstance(mbsc, objectName, classOf[GaugeMBean], true)
(keyConverter(name), valueConverter(mbean.getValue))
(keyConverter(objectName), valueConverter(mbean.getValue))
}

private def parseLogSegment(str: String): LogSegment = {
Expand All @@ -259,7 +256,7 @@ object KafkaMetrics {
queryValues(
mbsc,
logSegmentObjectName,
key => topicAndPartition(key, LogSegmentsNameRegex),
key => topicAndPartition(key),
value => {
val lst = value.asInstanceOf[ju.List[String]]
lst.asScala.map(parseLogSegment).toSeq
Expand All @@ -271,7 +268,7 @@ object KafkaMetrics {
queryValues(
mbsc,
directoryObjectName,
key => topicAndPartition(key, DirectoryNameRegex),
key => topicAndPartition(key),
value => value.asInstanceOf[String]
)
}.toMap
Expand Down Expand Up @@ -355,10 +352,10 @@ case class MeterMetric(count: Long,

def +(o: MeterMetric) : MeterMetric = {
MeterMetric(
o.count + count,
o.fifteenMinuteRate + fifteenMinuteRate,
o.fiveMinuteRate + fiveMinuteRate,
o.oneMinuteRate + oneMinuteRate,
o.count + count,
o.fifteenMinuteRate + fifteenMinuteRate,
o.fiveMinuteRate + fiveMinuteRate,
o.oneMinuteRate + oneMinuteRate,
o.meanRate + meanRate)
}
}
Expand Down

0 comments on commit ea71ab3

Please sign in to comment.