Skip to content

Commit

Permalink
Add companion metrics for all nsTiming metrics without semaphore (#11331)
Browse files Browse the repository at this point in the history

* Add companion metrics for all nsTiming metrics

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

* refine companion metrics

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>

---------

Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
  • Loading branch information
binmahone authored Sep 6, 2024
1 parent 77615e6 commit 7de3fc4
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
42 changes: 37 additions & 5 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.nvidia.spark.rapids

import scala.collection.immutable.TreeMap

import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.filecache.FileCacheConf
Expand Down Expand Up @@ -127,9 +129,18 @@ object GpuMetric extends Logging {
case i => throw new IllegalArgumentException(s"found unsupported GpuMetric ${i.getClass}")
}

def unwrap(input: Map[String, GpuMetric]): Map[String, SQLMetric] = input.collect {
// remove the metrics that are not registered
case (k, w) if w != NoopMetric => (k, unwrap(w))
/**
 * Converts a map of named GpuMetrics into a key-sorted map of the underlying SQLMetrics.
 *
 * Entries whose metric is `NoopMetric` are left out. For every entry whose metric has a
 * companion metric, an extra entry keyed `<name>_exSemWait` is added for that companion.
 *
 * @param input metrics keyed by name
 * @return a `TreeMap` holding the unwrapped metrics plus the unwrapped companions
 */
def unwrap(input: Map[String, GpuMetric]): Map[String, SQLMetric] = {
  // Keep only the metrics that are actually registered.
  val registered = input.collect {
    case (name, metric) if metric != NoopMetric => name -> unwrap(metric)
  }

  // One additional entry per metric that carries a companion.
  val exSemWait = input.flatMap { case (name, metric) =>
    metric.companionGpuMetric.map(companion => (name + "_exSemWait") -> unwrap(companion))
  }

  // Companion entries are added last, so they win on a key clash, as before.
  TreeMap.empty[String, SQLMetric] ++ registered ++ exSemWait
}

def wrap(input: SQLMetric): GpuMetric = WrappedGpuMetric(input)
Expand Down Expand Up @@ -161,9 +172,15 @@ sealed abstract class GpuMetric extends Serializable {

private var isTimerActive = false

// For timing GpuMetrics, we additionally create a companion GpuMetric to track elapsed time
// excluding semaphore wait time
var companionGpuMetric: Option[GpuMetric] = None
private var semWaitTimeWhenActivated = 0L

final def tryActivateTimer(): Boolean = {
if (!isTimerActive) {
isTimerActive = true
semWaitTimeWhenActivated = GpuTaskMetrics.get.getSemWaitTime()
true
} else {
false
Expand All @@ -173,6 +190,9 @@ sealed abstract class GpuMetric extends Serializable {
/**
 * Stops the timer, if it is active, and records the measured duration.
 *
 * Does nothing when the timer is not active. Otherwise the companion metric (when present)
 * receives `duration` minus the growth of the task's semaphore wait time since
 * `semWaitTimeWhenActivated` was captured, and this metric receives the full `duration`.
 *
 * @param duration elapsed time to record
 */
final def deactivateTimer(duration: Long): Unit = {
  if (isTimerActive) {
    isTimerActive = false
    // Only consult the task metrics when there is a companion to update.
    companionGpuMetric.foreach { companion =>
      val semWaitWhileActive = GpuTaskMetrics.get.getSemWaitTime() - semWaitTimeWhenActivated
      companion.add(duration - semWaitWhileActive)
    }
    semWaitTimeWhenActivated = 0L
    add(duration)
  }
}
Expand All @@ -198,7 +218,18 @@ object NoopMetric extends GpuMetric {
override def value: Long = 0
}

final case class WrappedGpuMetric(sqlMetric: SQLMetric) extends GpuMetric {
final case class WrappedGpuMetric(sqlMetric: SQLMetric, withMetricsExclSemWait: Boolean = false)
extends GpuMetric {

if (withMetricsExclSemWait) {
// SQLMetrics.NS_TIMING_METRIC and SQLMetrics.TIMING_METRIC are private,
// so we have to use the string literal directly
if (sqlMetric.metricType == "nsTiming") {
companionGpuMetric = Some(WrappedGpuMetric.apply(SQLMetrics.createNanoTimingMetric(
SparkSession.getActiveSession.get.sparkContext, sqlMetric.name.get + " (excl. SemWait)")))
}
}

def +=(v: Long): Unit = sqlMetric.add(v)
def add(v: Long): Unit = sqlMetric.add(v)
override def set(v: Long): Unit = sqlMetric.set(v)
Expand Down Expand Up @@ -279,7 +310,8 @@ trait GpuExec extends SparkPlan {

private [this] def createMetricInternal(level: MetricsLevel, f: => SQLMetric): GpuMetric = {
if (level >= metricsConf) {
WrappedGpuMetric(f)
// only enable companion metrics (excluding semaphore wait time) for DEBUG_LEVEL
WrappedGpuMetric(f, withMetricsExclSemWait = GpuMetric.DEBUG_LEVEL >= metricsConf)
} else {
NoopMetric
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class GpuTaskMetrics extends Serializable {
}
}

// Returns the value currently held by semWaitTimeNs, the metric that semWaitTime records into.
// NOTE(review): presumably nanoseconds, going by the Ns suffix -- confirm.
def getSemWaitTime(): Long = semWaitTimeNs.value.value

// Evaluates `f` through timeIt, passing semWaitTimeNs as the metric, "Acquire GPU" as the label
// and NvtxColor.RED as the color, and returns whatever timeIt returns.
// NOTE(review): timeIt is defined outside this view; presumably it adds the elapsed time of `f`
// to semWaitTimeNs -- confirm.
def semWaitTime[A](f: => A): A = timeIt(semWaitTimeNs, "Acquire GPU", NvtxColor.RED, f)

def spillToHostTime[A](f: => A): A = {
Expand Down

0 comments on commit 7de3fc4

Please sign in to comment.