diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 56b7dd6d35..e9c4cfa3b0 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -22,6 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.util.Random import com.codahale.metrics._ @@ -59,8 +60,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val metricsCapacity: Int = conf.metricsCapacity - val innerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]() - val timerSupplier = new TimerSupplier(metricsSlidingWindowSize) val metricsCleaner: ScheduledExecutorService = @@ -79,12 +78,26 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val applicationLabel = "applicationId" + val timerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]() + protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] = JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]() + protected val namedTimers + : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])] = + JavaUtils.newConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])]() + + protected val namedCounters: ConcurrentHashMap[String, NamedCounter] = + JavaUtils.newConcurrentHashMap[String, NamedCounter]() + protected val namedMeters: ConcurrentHashMap[String, NamedMeter] = JavaUtils.newConcurrentHashMap[String, NamedMeter]() + def addTimerMetrics(namedTimer: NamedTimer): Unit = { + val timerMetricsString = getTimerMetrics(namedTimer) + timerMetrics.add(timerMetricsString) + } + def addGauge[T]( name: String, labels: Map[String, String], @@ -145,10 +158,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String) addMeter(name, Map.empty[String, String], meter) } - protected val namedTimers - : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])] = - JavaUtils.newConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])]() - def addTimer(name: String): Unit = addTimer(name, Map.empty[String, String]) def addTimer(name: String, labels: Map[String, String]): Unit = { @@ -165,9 +174,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String) }) } - protected val namedCounters: ConcurrentHashMap[String, NamedCounter] = - JavaUtils.newConcurrentHashMap[String, NamedCounter]() - def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String]) def addCounter(name: String, labels: Map[String, String]): Unit = { @@ -197,6 +203,18 @@ abstract class AbstractSource(conf: CelebornConf, role: String) namedTimers.values().asScala.toList.map(_._1) } + def getAndClearTimerMetrics(): List[String] = { + timerMetrics.synchronized { + var timerMetricsSize = timerMetrics.size() + val timerMetricsList = ArrayBuffer[String]() + while (timerMetricsSize > 0) { + timerMetricsList.append(timerMetrics.poll()) + timerMetricsSize = timerMetricsSize - 1 + } + timerMetricsList.toList + } + } + def gaugeExists(name: String, labels: Map[String, String]): Boolean = { namedGauges.containsKey(metricNameWithCustomizedLabels(name, labels)) } @@ -282,7 +300,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) case Some(t) => namedTimer.timer.update(System.nanoTime() - t, TimeUnit.NANOSECONDS) if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) { - recordTimer(namedTimer) + addTimerMetrics(namedTimer) } case None => } @@ -347,31 +365,22 @@ abstract class AbstractSource(conf: CelebornConf, role: String) metricsCleaner.scheduleWithFixedDelay(cleanTask, 10, 10, TimeUnit.MINUTES) } - private def updateInnerMetrics(str: String): Unit = { - innerMetrics.synchronized { - if (innerMetrics.size() >= metricsCapacity) { - innerMetrics.remove() - } - innerMetrics.offer(str) - } - } - - def recordCounter(nc: NamedCounter): Unit = { + def getCounterMetrics(nc: NamedCounter): String = { val timestamp = System.currentTimeMillis val label = nc.labelString - updateInnerMetrics(s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n") + val str = s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n" + str } - def recordGauge(ng: NamedGauge[_]): Unit = { + def getGaugeMetrics(ng: NamedGauge[_]): String = { val timestamp = System.currentTimeMillis val sb = new StringBuilder val label = ng.labelString sb.append(s"${normalizeKey(ng.name)}Value$label ${ng.gauge.getValue} $timestamp\n") - - updateInnerMetrics(sb.toString()) + sb.toString() } - def recordMeter(nm: NamedMeter): Unit = { + def getMeterMetrics(nm: NamedMeter): String = { val timestamp = System.currentTimeMillis val sb = new StringBuilder val label = nm.labelString @@ -383,11 +392,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String) s"${normalizeKey(nm.name)}FiveMinuteRate$label ${nm.meter.getFiveMinuteRate} $timestamp\n") sb.append( s"${normalizeKey(nm.name)}FifteenMinuteRate$label ${nm.meter.getFifteenMinuteRate} $timestamp\n") - - updateInnerMetrics(sb.toString()) + sb.toString() } - def recordHistogram(nh: NamedHistogram): Unit = { + def getHistogramMetrics(nh: NamedHistogram): String = { val timestamp = System.currentTimeMillis val sb = new mutable.StringBuilder val snapshot = nh.histogram.getSnapshot @@ -409,11 +417,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String) s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n") sb.append(s"${prefix}999thPercentile$label" + s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n") - - updateInnerMetrics(sb.toString()) + sb.toString() } - def recordTimer(nt: NamedTimer): Unit = { + def getTimerMetrics(nt: NamedTimer): String = { val timestamp = System.currentTimeMillis val sb = new mutable.StringBuilder val snapshot = nt.timer.getSnapshot @@ -435,32 +442,61 @@ abstract class AbstractSource(conf: CelebornConf, role: String) s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n") sb.append(s"${prefix}999thPercentile$label" + s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n") + sb.toString() + } - updateInnerMetrics(sb.toString()) + def getAllMetricsNum: Int = { + val sum = timerMetrics.size() + + namedTimers.size() + + namedMeters.size() + + namedGauges.size() + + namedCounters.size() + sum } override def getMetrics(): String = { - innerMetrics.synchronized { - counters().foreach(c => recordCounter(c)) - gauges().foreach(g => recordGauge(g)) - meters().foreach(m => recordMeter(m)) - histograms().foreach(h => { - recordHistogram(h) + var leftMetricsNum = metricsCapacity + val sb = new mutable.StringBuilder + leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb) + if (leftMetricsNum <= 0) { + logWarning( + s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum") + } + sb.toString() + } + + private def fillInnerMetricsSnapshot( + metricList: List[AnyRef], + leftNum: Int, + sb: mutable.StringBuilder): Int = { + if (leftNum <= 0) { + return 0 + } + val addList = metricList.take(leftNum) + addList.foreach { + case c: NamedCounter => + sb.append(getCounterMetrics(c)) + case g: NamedGauge[_] => + sb.append(getGaugeMetrics(g)) + case m: NamedMeter => + sb.append(getMeterMetrics(m)) + case h: NamedHistogram => + sb.append(getHistogramMetrics(h)) h.asInstanceOf[CelebornHistogram].reservoir .asInstanceOf[ResettableSlidingWindowReservoir].reset() - }) - timers().foreach(t => { - recordTimer(t) + case t: NamedTimer => + sb.append(getTimerMetrics(t)) t.timer.asInstanceOf[CelebornTimer].reservoir .asInstanceOf[ResettableSlidingWindowReservoir].reset() - }) - val sb = new mutable.StringBuilder - while (!innerMetrics.isEmpty) { - sb.append(innerMetrics.poll()) - } - innerMetrics.clear() - sb.toString() + case s => + sb.append(s.toString) } + leftNum - addList.size } override def destroy(): Unit = { @@ -469,7 +505,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) namedGauges.clear() namedMeters.clear() namedTimers.clear() - innerMetrics.clear() + timerMetrics.clear() metricRegistry.removeMatching(new MetricFilter { override def matches(s: String, metric: Metric): Boolean = true })