From 7de3fc4a9b23b819be632c6cfea85a706cdafef3 Mon Sep 17 00:00:00 2001 From: "Hongbin Ma (Mahone)" Date: Fri, 6 Sep 2024 09:47:41 +0800 Subject: [PATCH] Add companion metrics for all nsTiming metrics without semaphore (#11331) * Add companion metrics for all nsTiming metrics Signed-off-by: Hongbin Ma (Mahone) * refine companion metrics Signed-off-by: Hongbin Ma (Mahone) --------- Signed-off-by: Hongbin Ma (Mahone) --- .../com/nvidia/spark/rapids/GpuExec.scala | 42 ++++++++++++++++--- .../spark/sql/rapids/GpuTaskMetrics.scala | 2 + 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala index 9b60cd1efca..17f2c35a8eb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala @@ -16,6 +16,8 @@ package com.nvidia.spark.rapids +import scala.collection.immutable.TreeMap + import ai.rapids.cudf.NvtxColor import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.filecache.FileCacheConf @@ -127,9 +129,18 @@ object GpuMetric extends Logging { case i => throw new IllegalArgumentException(s"found unsupported GpuMetric ${i.getClass}") } - def unwrap(input: Map[String, GpuMetric]): Map[String, SQLMetric] = input.collect { - // remove the metrics that are not registered - case (k, w) if w != NoopMetric => (k, unwrap(w)) + def unwrap(input: Map[String, GpuMetric]): Map[String, SQLMetric] = { + val ret = input.collect { + // remove the metrics that are not registered + case (k, w) if w != NoopMetric => (k, unwrap(w)) + } + val companions = input.collect { + // add the companions + case (k, w) if w.companionGpuMetric.isDefined => + (k + "_exSemWait", unwrap(w.companionGpuMetric.get)) + } + + TreeMap.apply((ret ++ companions).toSeq: _*) } def wrap(input: SQLMetric): GpuMetric = WrappedGpuMetric(input) @@ -161,9 +172,15 @@ sealed abstract class GpuMetric extends Serializable { private var isTimerActive = false + // For timing GpuMetrics, we additionally create a companion GpuMetric to track elapsed time + // excluding semaphore wait time + var companionGpuMetric: Option[GpuMetric] = None + private var semWaitTimeWhenActivated = 0L + final def tryActivateTimer(): Boolean = { if (!isTimerActive) { isTimerActive = true + semWaitTimeWhenActivated = GpuTaskMetrics.get.getSemWaitTime() true } else { false @@ -173,6 +190,9 @@ sealed abstract class GpuMetric extends Serializable { final def deactivateTimer(duration: Long): Unit = { if (isTimerActive) { isTimerActive = false + companionGpuMetric.foreach(c => + c.add(duration - (GpuTaskMetrics.get.getSemWaitTime() - semWaitTimeWhenActivated))) + semWaitTimeWhenActivated = 0L add(duration) } } @@ -198,7 +218,18 @@ object NoopMetric extends GpuMetric { override def value: Long = 0 } -final case class WrappedGpuMetric(sqlMetric: SQLMetric) extends GpuMetric { +final case class WrappedGpuMetric(sqlMetric: SQLMetric, withMetricsExclSemWait: Boolean = false) + extends GpuMetric { + + if (withMetricsExclSemWait) { + // SQLMetrics.NS_TIMING_METRIC and SQLMetrics.TIMING_METRIC is private, + // so we have to use the string directly + if (sqlMetric.metricType == "nsTiming") { + companionGpuMetric = Some(WrappedGpuMetric.apply(SQLMetrics.createNanoTimingMetric( + SparkSession.getActiveSession.get.sparkContext, sqlMetric.name.get + " (excl. SemWait)"))) + } + } + def +=(v: Long): Unit = sqlMetric.add(v) def add(v: Long): Unit = sqlMetric.add(v) override def set(v: Long): Unit = sqlMetric.set(v) @@ -279,7 +310,8 @@ trait GpuExec extends SparkPlan { private [this] def createMetricInternal(level: MetricsLevel, f: => SQLMetric): GpuMetric = { if (level >= metricsConf) { - WrappedGpuMetric(f) + // only enable companion metrics (excluding semaphore wait time) for DEBUG_LEVEL + WrappedGpuMetric(f, withMetricsExclSemWait = GpuMetric.DEBUG_LEVEL >= metricsConf) } else { NoopMetric } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala index da4f385ddfc..c89e26f0a24 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuTaskMetrics.scala @@ -137,6 +137,8 @@ class GpuTaskMetrics extends Serializable { } } + def getSemWaitTime(): Long = semWaitTimeNs.value.value + def semWaitTime[A](f: => A): A = timeIt(semWaitTimeNs, "Acquire GPU", NvtxColor.RED, f) def spillToHostTime[A](f: => A): A = {