diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index dc01c65be84..d58d47450bd 100644
--- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -18,10 +18,12 @@
 package org.apache.celeborn.common.metrics.source
 
 import java.util.{Map => JMap}
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
 import com.codahale.metrics._
@@ -59,8 +61,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
 
   val metricsCapacity: Int = conf.metricsCapacity
 
-  val innerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]()
-
   val timerSupplier = new TimerSupplier(metricsSlidingWindowSize)
 
   val metricsCleaner: ScheduledExecutorService =
@@ -79,11 +79,28 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
 
   val applicationLabel = "applicationId"
 
-  protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] =
-    JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]()
+  val atomicSortNum = new AtomicInteger(0)
+
+  val timerMetricsMap: ConcurrentHashMap[String, Int] =
+    JavaUtils.newConcurrentHashMap[String, Int]()
+
+  protected val namedGauges: ConcurrentHashMap[String, (Int, NamedGauge[_])] =
+    JavaUtils.newConcurrentHashMap[String, (Int, NamedGauge[_])]()
+
+  protected val namedTimers
+      : ConcurrentHashMap[String, (Int, NamedTimer, ConcurrentHashMap[String, Long])] =
+    JavaUtils.newConcurrentHashMap[String, (Int, NamedTimer, ConcurrentHashMap[String, Long])]()
+
+  protected val namedCounters: ConcurrentHashMap[String, (Int, NamedCounter)] =
+    JavaUtils.newConcurrentHashMap[String, (Int, NamedCounter)]()
 
-  protected val namedMeters: ConcurrentHashMap[String, NamedMeter] =
-    JavaUtils.newConcurrentHashMap[String, NamedMeter]()
+  protected val namedMeters: ConcurrentHashMap[String, (Int, NamedMeter)] =
+    JavaUtils.newConcurrentHashMap[String, (Int, NamedMeter)]()
+
+  def addTimerMetricsMap(namedTimer: NamedTimer): Unit = {
+    val timerMetrics = getTimerMetrics(namedTimer)
+    timerMetricsMap.putIfAbsent(timerMetrics, atomicSortNum.getAndIncrement())
+  }
 
   def addGauge[T](
       name: String,
@@ -93,7 +110,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
     if (gauge.getValue.isInstanceOf[Number]) {
       namedGauges.putIfAbsent(
         metricNameWithCustomizedLabels(name, labels),
-        NamedGauge(name, gauge, labels ++ staticLabels))
+        (atomicSortNum.getAndIncrement(), NamedGauge(name, gauge, labels ++ staticLabels)))
     } else {
       logWarning(
         s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number")
@@ -124,7 +141,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
       meter: Meter): Unit = {
     namedMeters.putIfAbsent(
       metricNameWithCustomizedLabels(name, labels),
-      NamedMeter(name, meter, labels ++ staticLabels))
+      (atomicSortNum.getAndIncrement(), NamedMeter(name, meter, labels ++ staticLabels)))
   }
 
   def addMeter(
@@ -145,10 +162,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
     addMeter(name, Map.empty[String, String], meter)
   }
 
-  protected val namedTimers
-      : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])] =
-    JavaUtils.newConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])]()
-
   def addTimer(name: String): Unit = addTimer(name, Map.empty[String, String])
 
   def addTimer(name: String, labels: Map[String, String]): Unit = {
@@ -161,32 +174,31 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
           metricRegistry.timer(metricNameWithLabel, timerSupplier),
           labels ++ staticLabels)
         val values = JavaUtils.newConcurrentHashMap[String, Long]()
-        (namedTimer, values)
+        (atomicSortNum.getAndIncrement(), namedTimer, values)
       })
   }
 
-  protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
-    JavaUtils.newConcurrentHashMap[String, NamedCounter]()
-
   def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String])
 
   def addCounter(name: String, labels: Map[String, String]): Unit = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
     namedCounters.putIfAbsent(
       metricNameWithLabel,
-      NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels))
+      (
+        atomicSortNum.getAndIncrement(),
+        NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels)))
   }
 
   def counters(): List[NamedCounter] = {
-    namedCounters.values().asScala.toList
+    namedCounters.values().asScala.toList.sortBy(_._1).map(_._2)
   }
 
   def gauges(): List[NamedGauge[_]] = {
-    namedGauges.values().asScala.toList
+    namedGauges.values().asScala.toList.sortBy(_._1).map(_._2)
   }
 
   def meters(): List[NamedMeter] = {
-    namedMeters.values().asScala.toList
+    namedMeters.values().asScala.toList.sortBy(_._1).map(_._2)
   }
 
   def histograms(): List[NamedHistogram] = {
@@ -194,7 +206,11 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
   }
 
   def timers(): List[NamedTimer] = {
-    namedTimers.values().asScala.toList.map(_._1)
+    namedTimers.values().asScala.toList.sortBy(_._1).map(_._2)
+  }
+
+  def timerMetrics(): List[String] = {
+    timerMetricsMap.asScala.toList.sortBy(_._2).map(_._1)
   }
 
   def gaugeExists(name: String, labels: Map[String, String]): Boolean = {
@@ -267,7 +283,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
     val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels)
     val pair = namedTimers.get(metricNameWithLabel)
     if (pair != null) {
-      pair._2.put(key, System.nanoTime())
+      pair._3.put(key, System.nanoTime())
     } else {
       logWarning(s"Metric $metricNameWithLabel not found!")
     }
@@ -276,13 +292,13 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
   protected def doStopTimer(metricsName: String, key: String, labels: Map[String, String]): Unit = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels)
     try {
-      val (namedTimer, map) = namedTimers.get(metricNameWithLabel)
+      val (_, namedTimer, map) = namedTimers.get(metricNameWithLabel)
       val startTime = Option(map.remove(key))
       startTime match {
         case Some(t) =>
           namedTimer.timer.update(System.nanoTime() - t, TimeUnit.NANOSECONDS)
           if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) {
-            recordTimer(namedTimer)
+            addTimerMetricsMap(namedTimer)
           }
         case None =>
       }
@@ -298,7 +314,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
 
   def updateTimer(metricsName: String, value: Long, labels: Map[String, String]): Unit = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels)
-    val (namedTimer, _) = namedTimers.get(metricNameWithLabel)
+    val (_, namedTimer, _) = namedTimers.get(metricNameWithLabel)
     namedTimer.timer.update(value, TimeUnit.NANOSECONDS)
   }
 
@@ -312,7 +328,9 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
 
   def incCounter(metricsName: String, incV: Long, labels: Map[String, String]): Unit = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels)
-    val counter = namedCounters.get(metricNameWithLabel)
+    // Look the entry up before unpacking: destructuring a missing (null) entry would throw a
+    // MatchError, which would make the else branch below unreachable.
+    val counter = Option(namedCounters.get(metricNameWithLabel)).map(_._2).orNull
     if (counter != null) {
       counter.counter.inc(incV)
     } else {
@@ -338,37 +356,28 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
   protected def startCleaner(): Unit = {
     val cleanTask: Runnable = new Runnable {
       override def run(): Unit = Utils.tryLogNonFatalError {
-        namedTimers.values.asScala.toArray.map(_._2).foreach(clearOldValues)
+        namedTimers.values.asScala.toArray.map(_._3).foreach(clearOldValues)
       }
     }
     metricsCleaner.scheduleWithFixedDelay(cleanTask, 10, 10, TimeUnit.MINUTES)
   }
 
-  private def updateInnerMetrics(str: String): Unit = {
-    innerMetrics.synchronized {
-      if (innerMetrics.size() >= metricsCapacity) {
-        innerMetrics.remove()
-      }
-      innerMetrics.offer(str)
-    }
-  }
-
-  def recordCounter(nc: NamedCounter): Unit = {
+  def getCounterMetrics(nc: NamedCounter): String = {
     val timestamp = System.currentTimeMillis
     val label = nc.labelString
-    updateInnerMetrics(s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n")
+    val str = s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n"
+    str
   }
 
-  def recordGauge(ng: NamedGauge[_]): Unit = {
+  def getGaugeMetrics(ng: NamedGauge[_]): String = {
     val timestamp = System.currentTimeMillis
     val sb = new StringBuilder
     val label = ng.labelString
     sb.append(s"${normalizeKey(ng.name)}Value$label ${ng.gauge.getValue} $timestamp\n")
-
-    updateInnerMetrics(sb.toString())
+    sb.toString()
   }
 
-  def recordMeter(nm: NamedMeter): Unit = {
+  def getMeterMetrics(nm: NamedMeter): String = {
     val timestamp = System.currentTimeMillis
     val sb = new StringBuilder
     val label = nm.labelString
@@ -380,11 +389,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
       s"${normalizeKey(nm.name)}FiveMinuteRate$label ${nm.meter.getFiveMinuteRate} $timestamp\n")
     sb.append(
       s"${normalizeKey(nm.name)}FifteenMinuteRate$label ${nm.meter.getFifteenMinuteRate} $timestamp\n")
-
-    updateInnerMetrics(sb.toString())
+    sb.toString()
   }
 
-  def recordHistogram(nh: NamedHistogram): Unit = {
+  def getHistogramMetrics(nh: NamedHistogram): String = {
     val timestamp = System.currentTimeMillis
     val sb = new mutable.StringBuilder
     val snapshot = nh.histogram.getSnapshot
@@ -406,11 +414,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
       s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n")
     sb.append(s"${prefix}999thPercentile$label" +
       s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n")
-
-    updateInnerMetrics(sb.toString())
+    sb.toString()
   }
 
-  def recordTimer(nt: NamedTimer): Unit = {
+  def getTimerMetrics(nt: NamedTimer): String = {
     val timestamp = System.currentTimeMillis
     val sb = new mutable.StringBuilder
     val snapshot = nt.timer.getSnapshot
@@ -432,32 +439,61 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
       s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n")
     sb.append(s"${prefix}999thPercentile$label" +
       s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n")
-
-    updateInnerMetrics(sb.toString())
+    sb.toString()
   }
 
   override def getMetrics(): String = {
-    innerMetrics.synchronized {
-      counters().foreach(c => recordCounter(c))
-      gauges().foreach(g => recordGauge(g))
-      meters().foreach(m => recordMeter(m))
-      histograms().foreach(h => {
-        recordHistogram(h)
+    var leftMetricsNum = metricsCapacity
+    val metricsSnapshot = ArrayBuffer[String]()
+    // Drain the buffered timer snapshots that are about to be reported, so each one is emitted
+    // once and timerMetricsMap cannot grow without bound across calls.
+    val pendingTimerMetrics = timerMetrics().take(leftMetricsNum)
+    pendingTimerMetrics.foreach(metric => timerMetricsMap.remove(metric))
+    leftMetricsNum = fillInnerMetricsSnapshot(pendingTimerMetrics, leftMetricsNum, metricsSnapshot)
+    leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, metricsSnapshot)
+    leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, metricsSnapshot)
+    leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, metricsSnapshot)
+    leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, metricsSnapshot)
+    leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, metricsSnapshot)
+    val sb = new mutable.StringBuilder
+    metricsSnapshot.foreach(metric => sb.append(metric))
+    if (leftMetricsNum <= 0) {
+      logWarning("The number of metrics exceed the output metrics strings capacity!")
+    }
+    sb.toString()
+  }
+
+  /**
+   * Renders up to `leftNum` entries of `metricList` into `metricsSnapshot` and returns the
+   * remaining capacity. Histogram and timer reservoirs are reset once they have been rendered.
+   */
+  private def fillInnerMetricsSnapshot(
+      metricList: List[AnyRef],
+      leftNum: Int,
+      metricsSnapshot: ArrayBuffer[String]): Int = {
+    if (leftNum <= 0) {
+      return 0
+    }
+    val addList = metricList.take(leftNum)
+    addList.foreach {
+      case c: NamedCounter =>
+        metricsSnapshot += getCounterMetrics(c)
+      case g: NamedGauge[_] =>
+        metricsSnapshot += getGaugeMetrics(g)
+      case m: NamedMeter =>
+        metricsSnapshot += getMeterMetrics(m)
+      case h: NamedHistogram =>
+        metricsSnapshot += getHistogramMetrics(h)
         h.asInstanceOf[CelebornHistogram].reservoir
           .asInstanceOf[ResettableSlidingWindowReservoir].reset()
-      })
-      timers().foreach(t => {
-        recordTimer(t)
+      case t: NamedTimer =>
+        metricsSnapshot += getTimerMetrics(t)
         t.timer.asInstanceOf[CelebornTimer].reservoir
           .asInstanceOf[ResettableSlidingWindowReservoir].reset()
-      })
-      val sb = new mutable.StringBuilder
-      while (!innerMetrics.isEmpty) {
-        sb.append(innerMetrics.poll())
-      }
-      innerMetrics.clear()
-      sb.toString()
+      case s =>
+        metricsSnapshot += s.toString
     }
+    leftNum - addList.size
   }
 
   override def destroy(): Unit = {
@@ -466,7 +502,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
     namedGauges.clear()
     namedMeters.clear()
     namedTimers.clear()
-    innerMetrics.clear()
+    timerMetricsMap.clear()
+    atomicSortNum.set(0)
     metricRegistry.removeMatching(new MetricFilter {
       override def matches(s: String, metric: Metric): Boolean = true
     })
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 26532a6bf9a..33316dbcf1a 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -84,7 +84,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER)
 
   def getCounterCount(metricsName: String): Long = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, Map.empty)
-    namedCounters.get(metricNameWithLabel).counter.getCount
+    namedCounters.get(metricNameWithLabel)._2.counter.getCount
   }
 
   def connectionActive(client: TransportClient): Unit = {