From 3bf75a2b78962b0a83099d8b0767bef0dbbb989d Mon Sep 17 00:00:00 2001 From: zhengtao Date: Wed, 27 Nov 2024 12:16:34 +0800 Subject: [PATCH 1/9] clb1734 bug fix --- .../metrics/source/AbstractSource.scala | 163 ++++++++++-------- .../service/deploy/worker/WorkerSource.scala | 2 +- 2 files changed, 96 insertions(+), 69 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 56b7dd6d353..974988ca08e 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -18,10 +18,12 @@ package org.apache.celeborn.common.metrics.source import java.util.{Map => JMap} -import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.util.Random import com.codahale.metrics._ @@ -59,8 +61,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val metricsCapacity: Int = conf.metricsCapacity - val innerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]() - val timerSupplier = new TimerSupplier(metricsSlidingWindowSize) val metricsCleaner: ScheduledExecutorService = @@ -79,11 +79,28 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val applicationLabel = "applicationId" - protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] = - JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]() + val atomicSortNum = new AtomicInteger(0) + + val timerMetricsMap: ConcurrentHashMap[String, Int] = + JavaUtils.newConcurrentHashMap[String, Int]() + + protected val namedGauges: ConcurrentHashMap[String, (Int, NamedGauge[_])] = + JavaUtils.newConcurrentHashMap[String, (Int, NamedGauge[_])]() + + protected val namedTimers + : ConcurrentHashMap[String, (Int, NamedTimer, ConcurrentHashMap[String, Long])] = + JavaUtils.newConcurrentHashMap[String, (Int, NamedTimer, ConcurrentHashMap[String, Long])]() + + protected val namedCounters: ConcurrentHashMap[String, (Int, NamedCounter)] = + JavaUtils.newConcurrentHashMap[String, (Int, NamedCounter)]() - protected val namedMeters: ConcurrentHashMap[String, NamedMeter] = - JavaUtils.newConcurrentHashMap[String, NamedMeter]() + protected val namedMeters: ConcurrentHashMap[String, (Int, NamedMeter)] = + JavaUtils.newConcurrentHashMap[String, (Int, NamedMeter)]() + + def addTimerMetricsMap(namedTimer: NamedTimer): Unit = { + val timerMetrics = getTimerMetrics(namedTimer) + timerMetricsMap.putIfAbsent(timerMetrics, atomicSortNum.getAndIncrement()) + } def addGauge[T]( name: String, @@ -93,7 +110,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) if (gauge.getValue.isInstanceOf[Number]) { namedGauges.putIfAbsent( metricNameWithCustomizedLabels(name, labels), - NamedGauge(name, gauge, labels ++ staticLabels)) + (atomicSortNum.getAndIncrement(), NamedGauge(name, gauge, labels ++ staticLabels))) } else { logWarning( s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number") @@ -124,7 +141,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) meter: Meter): Unit = { namedMeters.putIfAbsent( metricNameWithCustomizedLabels(name, labels), - NamedMeter(name, meter, labels ++ staticLabels)) + (atomicSortNum.getAndIncrement(), NamedMeter(name, meter, labels ++ staticLabels))) } def addMeter( @@ -145,10 +162,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String) addMeter(name, Map.empty[String, String], meter) } - protected val namedTimers - : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])] = - JavaUtils.newConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])]() - def addTimer(name: String): Unit = addTimer(name, Map.empty[String, String]) def addTimer(name: String, labels: Map[String, String]): Unit = { @@ -161,32 +174,31 @@ abstract class AbstractSource(conf: CelebornConf, role: String) metricRegistry.timer(metricNameWithLabel, timerSupplier), labels ++ staticLabels) val values = JavaUtils.newConcurrentHashMap[String, Long]() - (namedTimer, values) + (atomicSortNum.getAndIncrement(), namedTimer, values) }) } - protected val namedCounters: ConcurrentHashMap[String, NamedCounter] = - JavaUtils.newConcurrentHashMap[String, NamedCounter]() - def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String]) def addCounter(name: String, labels: Map[String, String]): Unit = { val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels) namedCounters.putIfAbsent( metricNameWithLabel, - NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels)) + ( + atomicSortNum.getAndIncrement(), + NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels))) } def counters(): List[NamedCounter] = { - namedCounters.values().asScala.toList + namedCounters.values().asScala.toList.sortBy(_._1).map(_._2) } def gauges(): List[NamedGauge[_]] = { - namedGauges.values().asScala.toList + namedGauges.values().asScala.toList.sortBy(_._1).map(_._2) } def meters(): List[NamedMeter] = { - namedMeters.values().asScala.toList + namedMeters.values().asScala.toList.sortBy(_._1).map(_._2) } def histograms(): List[NamedHistogram] = { @@ -194,7 +206,11 @@ abstract class AbstractSource(conf: CelebornConf, role: String) } def timers(): List[NamedTimer] = { - namedTimers.values().asScala.toList.map(_._1) + namedTimers.values().asScala.toList.sortBy(_._1).map(_._2) + } + + def timerMetrics(): List[String] = { + timerMetricsMap.asScala.toList.sortBy(_._2).map(_._1) } def gaugeExists(name: String, labels: Map[String, String]): Boolean = { @@ -267,7 +283,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels) val pair = namedTimers.get(metricNameWithLabel) if (pair != null) { - pair._2.put(key, System.nanoTime()) + pair._3.put(key, System.nanoTime()) } else { logWarning(s"Metric $metricNameWithLabel not found!") } @@ -276,13 +292,13 @@ abstract class AbstractSource(conf: CelebornConf, role: String) protected def doStopTimer(metricsName: String, key: String, labels: Map[String, String]): Unit = { val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels) try { - val (namedTimer, map) = namedTimers.get(metricNameWithLabel) + val (_, namedTimer, map) = namedTimers.get(metricNameWithLabel) val startTime = Option(map.remove(key)) startTime match { case Some(t) => namedTimer.timer.update(System.nanoTime() - t, TimeUnit.NANOSECONDS) if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) { - recordTimer(namedTimer) + addTimerMetricsMap(namedTimer) } case None => } @@ -301,7 +317,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) if (!namedTimers.containsKey(metricNameWithLabel)) { addTimer(metricsName, labels) } - val (namedTimer, _) = namedTimers.get(metricNameWithLabel) + val (_, namedTimer, _) = namedTimers.get(metricNameWithLabel) namedTimer.timer.update(value, TimeUnit.NANOSECONDS) } @@ -315,7 +331,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) def incCounter(metricsName: String, incV: Long, labels: Map[String, String]): Unit = { val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels) - val counter = namedCounters.get(metricNameWithLabel) + val (_, counter) = namedCounters.get(metricNameWithLabel) if (counter != null) { counter.counter.inc(incV) } else { @@ -341,37 +357,28 @@ abstract class AbstractSource(conf: CelebornConf, role: String) protected def startCleaner(): Unit = { val cleanTask: Runnable = new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { - namedTimers.values.asScala.toArray.map(_._2).foreach(clearOldValues) + namedTimers.values.asScala.toArray.map(_._3).foreach(clearOldValues) } } metricsCleaner.scheduleWithFixedDelay(cleanTask, 10, 10, TimeUnit.MINUTES) } - private def updateInnerMetrics(str: String): Unit = { - innerMetrics.synchronized { - if (innerMetrics.size() >= metricsCapacity) { - innerMetrics.remove() - } - innerMetrics.offer(str) - } - } - - def recordCounter(nc: NamedCounter): Unit = { + def getCounterMetrics(nc: NamedCounter): String = { val timestamp = System.currentTimeMillis val label = nc.labelString - updateInnerMetrics(s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n") + val str = s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} $timestamp\n" + str } - def recordGauge(ng: NamedGauge[_]): Unit = { + def getGaugeMetrics(ng: NamedGauge[_]): String = { val timestamp = System.currentTimeMillis val sb = new StringBuilder val label = ng.labelString sb.append(s"${normalizeKey(ng.name)}Value$label ${ng.gauge.getValue} $timestamp\n") - - updateInnerMetrics(sb.toString()) + sb.toString() } - def recordMeter(nm: NamedMeter): Unit = { + def getMeterMetrics(nm: NamedMeter): String = { val timestamp = System.currentTimeMillis val sb = new StringBuilder val label = nm.labelString @@ -383,11 +390,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String) s"${normalizeKey(nm.name)}FiveMinuteRate$label ${nm.meter.getFiveMinuteRate} $timestamp\n") sb.append( s"${normalizeKey(nm.name)}FifteenMinuteRate$label ${nm.meter.getFifteenMinuteRate} $timestamp\n") - - updateInnerMetrics(sb.toString()) + sb.toString() } - def recordHistogram(nh: NamedHistogram): Unit = { + def getHistogramMetrics(nh: NamedHistogram): String = { val timestamp = System.currentTimeMillis val sb = new mutable.StringBuilder val snapshot = nh.histogram.getSnapshot @@ -409,11 +415,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String) s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n") sb.append(s"${prefix}999thPercentile$label" + s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n") - - updateInnerMetrics(sb.toString()) + sb.toString() } - def recordTimer(nt: NamedTimer): Unit = { + def getTimerMetrics(nt: NamedTimer): String = { val timestamp = System.currentTimeMillis val sb = new mutable.StringBuilder val snapshot = nt.timer.getSnapshot @@ -435,32 +440,53 @@ abstract class AbstractSource(conf: CelebornConf, role: String) s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n") sb.append(s"${prefix}999thPercentile$label" + s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n") - - updateInnerMetrics(sb.toString()) + sb.toString() } override def getMetrics(): String = { - innerMetrics.synchronized { - counters().foreach(c => recordCounter(c)) - gauges().foreach(g => recordGauge(g)) - meters().foreach(m => recordMeter(m)) - histograms().foreach(h => { - recordHistogram(h) + var leftMetricsNum = metricsCapacity + val metricsSnapshot = ArrayBuffer[String]() + leftMetricsNum = fillInnerMetricsSnapshot(timerMetrics(), leftMetricsNum, metricsSnapshot) + leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, metricsSnapshot) + leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, metricsSnapshot) + leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, metricsSnapshot) + leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, metricsSnapshot) + leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, metricsSnapshot) + val sb = new mutable.StringBuilder + metricsSnapshot.foreach(metric => sb.append(metric)) + if (leftMetricsNum <= 0) { + logWarning("The number of metrics exceed the output metrics strings capacity!") + } + sb.toString() + } + + private def fillInnerMetricsSnapshot( + metricList: List[AnyRef], + leftNum: Int, + metricsSnapshot: ArrayBuffer[String]): Int = { + if (leftNum <= 0) { + return 0 + } + val addList = metricList.take(leftNum) + addList.foreach { + case c: NamedCounter => + metricsSnapshot += getCounterMetrics(c) + case g: NamedGauge[_] => + metricsSnapshot += getGaugeMetrics(g) + case m: NamedMeter => + metricsSnapshot += getMeterMetrics(m) + case h: NamedHistogram => + metricsSnapshot += getHistogramMetrics(h) h.asInstanceOf[CelebornHistogram].reservoir .asInstanceOf[ResettableSlidingWindowReservoir].reset() - }) - timers().foreach(t => { - recordTimer(t) + case t: NamedTimer => + metricsSnapshot += getTimerMetrics(t) t.timer.asInstanceOf[CelebornTimer].reservoir .asInstanceOf[ResettableSlidingWindowReservoir].reset() - }) - val sb = new mutable.StringBuilder - while (!innerMetrics.isEmpty) { - sb.append(innerMetrics.poll()) - } - innerMetrics.clear() - sb.toString() + case s => + metricsSnapshot += s.toString } + leftNum - addList.size } override def destroy(): Unit = { @@ -469,7 +495,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String) namedGauges.clear() namedMeters.clear() namedTimers.clear() - innerMetrics.clear() + timerMetricsMap.clear() + atomicSortNum.set(0) metricRegistry.removeMatching(new MetricFilter { override def matches(s: String, metric: Metric): Boolean = true }) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 26532a6bf9a..33316dbcf1a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -84,7 +84,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER) def getCounterCount(metricsName: String): Long = { val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, Map.empty) - namedCounters.get(metricNameWithLabel).counter.getCount + namedCounters.get(metricNameWithLabel)._2.counter.getCount } def connectionActive(client: TransportClient): Unit = { From 41f183475cc9e23d461f676cf2de4ccef723e34d Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 28 Nov 2024 10:52:01 +0800 Subject: [PATCH 2/9] remove sort --- .../metrics/source/AbstractSource.scala | 57 +++++++++---------- .../service/deploy/worker/WorkerSource.scala | 2 +- 2 files changed, 27 insertions(+), 32 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 974988ca08e..10c337d5666 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -17,9 +17,9 @@ package org.apache.celeborn.common.metrics.source +import java.{lang, util} import java.util.{Map => JMap} import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} -import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable @@ -79,27 +79,25 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val applicationLabel = "applicationId" - val atomicSortNum = new AtomicInteger(0) + val timerMetricsMap: ConcurrentHashMap.KeySetView[String, lang.Boolean] = + ConcurrentHashMap.newKeySet[String]() - val timerMetricsMap: ConcurrentHashMap[String, Int] = - JavaUtils.newConcurrentHashMap[String, Int]() - - protected val namedGauges: ConcurrentHashMap[String, (Int, NamedGauge[_])] = - JavaUtils.newConcurrentHashMap[String, (Int, NamedGauge[_])]() + protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] = + JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]() protected val namedTimers - : ConcurrentHashMap[String, (Int, NamedTimer, ConcurrentHashMap[String, Long])] = - JavaUtils.newConcurrentHashMap[String, (Int, NamedTimer, ConcurrentHashMap[String, Long])]() + : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])] = + JavaUtils.newConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, Long])]() - protected val namedCounters: ConcurrentHashMap[String, (Int, NamedCounter)] = - JavaUtils.newConcurrentHashMap[String, (Int, NamedCounter)]() + protected val namedCounters: ConcurrentHashMap[String, NamedCounter] = + JavaUtils.newConcurrentHashMap[String, NamedCounter]() - protected val namedMeters: ConcurrentHashMap[String, (Int, NamedMeter)] = - JavaUtils.newConcurrentHashMap[String, (Int, NamedMeter)]() + protected val namedMeters: ConcurrentHashMap[String, NamedMeter] = + JavaUtils.newConcurrentHashMap[String, NamedMeter]() def addTimerMetricsMap(namedTimer: NamedTimer): Unit = { val timerMetrics = getTimerMetrics(namedTimer) - timerMetricsMap.putIfAbsent(timerMetrics, atomicSortNum.getAndIncrement()) + timerMetricsMap.add(timerMetrics) } def addGauge[T]( @@ -110,7 +108,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) if (gauge.getValue.isInstanceOf[Number]) { namedGauges.putIfAbsent( metricNameWithCustomizedLabels(name, labels), - (atomicSortNum.getAndIncrement(), NamedGauge(name, gauge, labels ++ staticLabels))) + NamedGauge(name, gauge, labels ++ staticLabels)) } else { logWarning( s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number") @@ -141,7 +139,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) meter: Meter): Unit = { namedMeters.putIfAbsent( metricNameWithCustomizedLabels(name, labels), - (atomicSortNum.getAndIncrement(), NamedMeter(name, meter, labels ++ staticLabels))) + NamedMeter(name, meter, labels ++ staticLabels)) } def addMeter( @@ -174,7 +172,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) metricRegistry.timer(metricNameWithLabel, timerSupplier), labels ++ staticLabels) val values = JavaUtils.newConcurrentHashMap[String, Long]() - (atomicSortNum.getAndIncrement(), namedTimer, values) + (namedTimer, values) }) } @@ -184,21 +182,19 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels) namedCounters.putIfAbsent( metricNameWithLabel, - ( - atomicSortNum.getAndIncrement(), - NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels))) + NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels)) } def counters(): List[NamedCounter] = { - namedCounters.values().asScala.toList.sortBy(_._1).map(_._2) + namedCounters.values().asScala.toList } def gauges(): List[NamedGauge[_]] = { - namedGauges.values().asScala.toList.sortBy(_._1).map(_._2) + namedGauges.values().asScala.toList } def meters(): List[NamedMeter] = { - namedMeters.values().asScala.toList.sortBy(_._1).map(_._2) + namedMeters.values().asScala.toList } def histograms(): List[NamedHistogram] = { @@ -206,11 +202,11 @@ abstract class AbstractSource(conf: CelebornConf, role: String) } def timers(): List[NamedTimer] = { - namedTimers.values().asScala.toList.sortBy(_._1).map(_._2) + namedTimers.values().asScala.toList.map(_._1) } def timerMetrics(): List[String] = { - timerMetricsMap.asScala.toList.sortBy(_._2).map(_._1) + timerMetricsMap.asScala.toList } def gaugeExists(name: String, labels: Map[String, String]): Boolean = { @@ -283,7 +279,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels) val pair = namedTimers.get(metricNameWithLabel) if (pair != null) { - pair._3.put(key, System.nanoTime()) + pair._2.put(key, System.nanoTime()) } else { logWarning(s"Metric $metricNameWithLabel not found!") } @@ -292,7 +288,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) protected def doStopTimer(metricsName: String, key: String, labels: Map[String, String]): Unit = { val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels) try { - val (_, namedTimer, map) = namedTimers.get(metricNameWithLabel) + val (namedTimer, map) = namedTimers.get(metricNameWithLabel) val startTime = Option(map.remove(key)) startTime match { case Some(t) => @@ -317,7 +313,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) if (!namedTimers.containsKey(metricNameWithLabel)) { addTimer(metricsName, labels) } - val (_, namedTimer, _) = namedTimers.get(metricNameWithLabel) + val (namedTimer, _) = namedTimers.get(metricNameWithLabel) namedTimer.timer.update(value, TimeUnit.NANOSECONDS) } @@ -331,7 +327,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) def incCounter(metricsName: String, incV: Long, labels: Map[String, String]): Unit = { val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels) - val (_, counter) = namedCounters.get(metricNameWithLabel) + val counter = namedCounters.get(metricNameWithLabel) if (counter != null) { counter.counter.inc(incV) } else { @@ -357,7 +353,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) protected def startCleaner(): Unit = { val cleanTask: Runnable = new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { - namedTimers.values.asScala.toArray.map(_._3).foreach(clearOldValues) + namedTimers.values.asScala.toArray.map(_._2).foreach(clearOldValues) } } metricsCleaner.scheduleWithFixedDelay(cleanTask, 10, 10, TimeUnit.MINUTES) @@ -496,7 +492,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String) namedMeters.clear() namedTimers.clear() timerMetricsMap.clear() - atomicSortNum.set(0) metricRegistry.removeMatching(new MetricFilter { override def matches(s: String, metric: Metric): Boolean = true }) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 33316dbcf1a..26532a6bf9a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -84,7 +84,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER) def getCounterCount(metricsName: String): Long = { val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, Map.empty) - namedCounters.get(metricNameWithLabel)._2.counter.getCount + namedCounters.get(metricNameWithLabel).counter.getCount } def connectionActive(client: TransportClient): Unit = { From f5d21acf9e6e65af85fb350b9f7edb3453ae60fc Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 28 Nov 2024 10:56:40 +0800 Subject: [PATCH 3/9] remove useless sort --- .../apache/celeborn/common/metrics/source/AbstractSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 10c337d5666..6ac1bc8ca28 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -17,7 +17,7 @@ package org.apache.celeborn.common.metrics.source -import java.{lang, util} +import java.lang import java.util.{Map => JMap} import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} From 1696f163bd3c7f0e3d60ae4387171d0958616176 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Fri, 29 Nov 2024 09:40:04 +0800 Subject: [PATCH 4/9] fill log --- .../common/metrics/source/AbstractSource.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 6ac1bc8ca28..d4b38b432e9 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -439,6 +439,15 @@ abstract class AbstractSource(conf: CelebornConf, role: String) sb.toString() } + def getAllMetricsNum: Int = { + val sum = timerMetricsMap.size() + + namedTimers.size() + + namedMeters.size() + + namedGauges.size() + + namedCounters.size() + sum + } + override def getMetrics(): String = { var leftMetricsNum = metricsCapacity val metricsSnapshot = ArrayBuffer[String]() @@ -451,7 +460,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val sb = new mutable.StringBuilder metricsSnapshot.foreach(metric => sb.append(metric)) if (leftMetricsNum <= 0) { - logWarning("The number of metrics exceed the output metrics strings capacity!") + logWarning(s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum") } sb.toString() } From 52d7cd80289626b6d2bad776bd7ffc2a03a6413d Mon Sep 17 00:00:00 2001 From: zhengtao Date: Fri, 29 Nov 2024 09:57:26 +0800 Subject: [PATCH 5/9] format --- .../apache/celeborn/common/metrics/source/AbstractSource.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index d4b38b432e9..f507bb62ea9 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -460,7 +460,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val sb = new mutable.StringBuilder metricsSnapshot.foreach(metric => sb.append(metric)) if (leftMetricsNum <= 0) { - logWarning(s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum") + logWarning( + s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum") } sb.toString() } From b7021cb76220bee1c92fffa1b2f9a946f8f21660 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Mon, 2 Dec 2024 17:35:37 +0800 Subject: [PATCH 6/9] change ds --- .../metrics/source/AbstractSource.scala | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index f507bb62ea9..e94722c43ac 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -17,13 +17,11 @@ package org.apache.celeborn.common.metrics.source -import java.lang import java.util.{Map => JMap} -import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer import scala.util.Random import com.codahale.metrics._ @@ -79,8 +77,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val applicationLabel = "applicationId" - val timerMetricsMap: ConcurrentHashMap.KeySetView[String, lang.Boolean] = - ConcurrentHashMap.newKeySet[String]() + val timerMetricsMap: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]() protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] = JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]() @@ -450,15 +447,14 @@ abstract class AbstractSource(conf: CelebornConf, role: String) override def getMetrics(): String = { var leftMetricsNum = metricsCapacity - val metricsSnapshot = ArrayBuffer[String]() - leftMetricsNum = fillInnerMetricsSnapshot(timerMetrics(), leftMetricsNum, metricsSnapshot) - leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, metricsSnapshot) - leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, metricsSnapshot) - leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, metricsSnapshot) - leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, metricsSnapshot) - leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, metricsSnapshot) val sb = new mutable.StringBuilder - metricsSnapshot.foreach(metric => sb.append(metric)) + leftMetricsNum = fillInnerMetricsSnapshot(timerMetrics(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb) + if (leftMetricsNum <= 0) { logWarning( s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum") @@ -469,28 +465,28 @@ abstract class AbstractSource(conf: CelebornConf, role: String) private def fillInnerMetricsSnapshot( metricList: List[AnyRef], leftNum: Int, - metricsSnapshot: ArrayBuffer[String]): Int = { + sb: mutable.StringBuilder): Int = { if (leftNum <= 0) { return 0 } val addList = metricList.take(leftNum) addList.foreach { case c: NamedCounter => - metricsSnapshot += getCounterMetrics(c) + sb.append(getCounterMetrics(c)) case g: NamedGauge[_] => - metricsSnapshot += getGaugeMetrics(g) + sb.append(getGaugeMetrics(g)) case m: NamedMeter => - metricsSnapshot += getMeterMetrics(m) + sb.append(getMeterMetrics(m)) case h: NamedHistogram => - metricsSnapshot += getHistogramMetrics(h) + sb.append(getHistogramMetrics(h)) h.asInstanceOf[CelebornHistogram].reservoir .asInstanceOf[ResettableSlidingWindowReservoir].reset() case t: NamedTimer => - metricsSnapshot += getTimerMetrics(t) + sb.append(getTimerMetrics(t)) t.timer.asInstanceOf[CelebornTimer].reservoir .asInstanceOf[ResettableSlidingWindowReservoir].reset() case s => - metricsSnapshot += s.toString + sb.append(s.toString) } leftNum - addList.size } From 77bea73e398034e336b47fca0f917048d5089bc1 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Tue, 3 Dec 2024 10:33:49 +0800 Subject: [PATCH 7/9] clear timerMetrics after getMetrics --- .../metrics/source/AbstractSource.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index e94722c43ac..6e1d3392a57 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -77,7 +77,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val applicationLabel = "applicationId" - val timerMetricsMap: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]() + val timerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]() protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] = JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]() @@ -92,9 +92,9 @@ abstract class AbstractSource(conf: CelebornConf, role: String) protected val namedMeters: ConcurrentHashMap[String, NamedMeter] = JavaUtils.newConcurrentHashMap[String, NamedMeter]() - def addTimerMetricsMap(namedTimer: NamedTimer): Unit = { - val timerMetrics = getTimerMetrics(namedTimer) - timerMetricsMap.add(timerMetrics) + def addTimerMetrics(namedTimer: NamedTimer): Unit = { + val timerMetricsString = getTimerMetrics(namedTimer) + timerMetrics.add(timerMetricsString) } def addGauge[T]( @@ -202,8 +202,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String) namedTimers.values().asScala.toList.map(_._1) } - def timerMetrics(): List[String] = { - timerMetricsMap.asScala.toList + def timerMetricsList(): List[String] = { + timerMetrics.asScala.toList } def gaugeExists(name: String, labels: Map[String, String]): Boolean = { @@ -291,7 +291,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) case Some(t) => namedTimer.timer.update(System.nanoTime() - t, TimeUnit.NANOSECONDS) if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) { - addTimerMetricsMap(namedTimer) + addTimerMetrics(namedTimer) } case None => } @@ -437,7 +437,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) } def getAllMetricsNum: Int = { - val sum = timerMetricsMap.size() + + val sum = timerMetrics.size() + namedTimers.size() + namedMeters.size() + namedGauges.size() + @@ -448,17 +448,17 @@ abstract class AbstractSource(conf: CelebornConf, role: String) override def getMetrics(): String = { var leftMetricsNum = metricsCapacity val sb = new mutable.StringBuilder - leftMetricsNum = fillInnerMetricsSnapshot(timerMetrics(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(timerMetricsList(), leftMetricsNum, sb) leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb) leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb) leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb) leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb) leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb) - if (leftMetricsNum <= 0) { logWarning( s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum") } + timerMetrics.clear() sb.toString() } @@ -497,7 +497,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) namedGauges.clear() namedMeters.clear() namedTimers.clear() - timerMetricsMap.clear() + timerMetrics.clear() metricRegistry.removeMatching(new MetricFilter { override def matches(s: String, metric: Metric): Boolean = true }) From 785699d915fc6b9243b486ce1cc44d278448cbed Mon Sep 17 00:00:00 2001 From: zhengtao Date: Tue, 3 Dec 2024 11:11:53 +0800 Subject: [PATCH 8/9] add clear lock --- .../common/metrics/source/AbstractSource.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 6e1d3392a57..777b8a2f863 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -202,8 +202,12 @@ abstract class AbstractSource(conf: CelebornConf, role: String) namedTimers.values().asScala.toList.map(_._1) } - def timerMetricsList(): List[String] = { - timerMetrics.asScala.toList + def getAndClearTimerMetrics(): List[String] = { + timerMetrics.synchronized { + val timerList = timerMetrics.asScala.toList + timerMetrics.clear() + timerList + } } def gaugeExists(name: String, labels: Map[String, String]): Boolean = { @@ -448,7 +452,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) override def getMetrics(): String = { var leftMetricsNum = metricsCapacity val sb = new mutable.StringBuilder - leftMetricsNum = fillInnerMetricsSnapshot(timerMetricsList(), leftMetricsNum, sb) + leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb) leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb) leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb) leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb) @@ -458,7 +462,6 @@ abstract class AbstractSource(conf: CelebornConf, role: String) logWarning( s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum") } - timerMetrics.clear() sb.toString() } From 2ee3531fe70d2997b5abf06d3c352041f2ba0e02 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Tue, 3 Dec 2024 14:25:44 +0800 Subject: [PATCH 9/9] use poll to replace clear --- .../common/metrics/source/AbstractSource.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 777b8a2f863..e9c4cfa3b0a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -22,6 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.util.Random import com.codahale.metrics._ @@ -204,9 +205,13 @@ abstract class AbstractSource(conf: CelebornConf, role: String) def getAndClearTimerMetrics(): List[String] = { timerMetrics.synchronized { - val timerList = timerMetrics.asScala.toList - timerMetrics.clear() - timerList + var timerMetricsSize = timerMetrics.size() + val timerMetricsList = ArrayBuffer[String]() + while (timerMetricsSize > 0) { + timerMetricsList.append(timerMetrics.poll()) + timerMetricsSize = timerMetricsSize - 1 + } + timerMetricsList.toList } }