Add a new NVTX range for task GPU ownership (#11596)
* Add nvtx range for task owning GPU

Signed-off-by: Jihoon Son <[email protected]>

* review comments

* Unused import

---------

Signed-off-by: Jihoon Son <[email protected]>
jihoonson authored Oct 29, 2024
1 parent a6c4b34 commit 6b27556
Showing 2 changed files with 25 additions and 6 deletions.
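
In short, the commit threads the stage ID into SemaphoreTaskInfo, opens an NvtxUniqueRange named after the stage and task when a task acquires the GPU semaphore, and closes that range when the semaphore is released, so each task's span of GPU ownership shows up on an NVTX timeline. A minimal sketch of that lifecycle, reduced from the diff below (the OwnershipTracker wrapper is illustrative, not part of the change):

import ai.rapids.cudf.{NvtxColor, NvtxUniqueRange}

// Illustrative wrapper: one NVTX range spans the time a task owns the GPU.
class OwnershipTracker(stageId: Int, taskAttemptId: Long, enabled: Boolean) {
  private var range: Option[NvtxUniqueRange] = None

  // Called once the task has acquired the GPU semaphore.
  def onAcquired(): Unit = if (enabled) {
    range = Some(new NvtxUniqueRange(
      s"Stage $stageId Task $taskAttemptId owning GPU", NvtxColor.ORANGE))
  }

  // Called when the task releases the semaphore; closing the range ends the
  // NVTX span even if a different thread performs the release.
  def onReleased(): Unit = {
    range.foreach(_.close())
    range = None
  }
}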
GpuSemaphore.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import ai.rapids.cudf.{NvtxColor, NvtxRange}
+import ai.rapids.cudf.{NvtxColor, NvtxRange, NvtxUniqueRange}
 import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
 
 import org.apache.spark.TaskContext
@@ -162,7 +162,7 @@ object GpuSemaphore {
  * this is considered to be okay as there are other mechanisms in place, and it should be rather
  * rare.
  */
-private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
+private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long) extends Logging {
   /**
    * This holds threads that are not on the GPU yet. Most of the time they are
    * blocked waiting for the semaphore to let them on, but it may hold one
@@ -179,6 +179,7 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
    */
   private val activeThreads = new util.LinkedHashSet[Thread]()
   private lazy val numPermits = GpuSemaphore.computeNumPermits(SQLConf.get)
+  private lazy val trackSemaphore = RapidsConf.TRACE_TASK_GPU_OWNERSHIP.get(SQLConf.get)
   /**
    * If this task holds the GPU semaphore or not.
    */
@@ -187,6 +188,8 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
 
   type GpuBackingSemaphore = PrioritySemaphore[Long]
 
+  var nvtxRange: Option[NvtxUniqueRange] = None
+
   /**
    * Does this task have the GPU semaphore or not. Be careful because it can change at
    * any point in time. So only use it for logging.
@@ -258,6 +261,11 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
         // We now own the semaphore so we need to wake up all of the other tasks that are
         // waiting.
         hasSemaphore = true
+        if (trackSemaphore) {
+          nvtxRange =
+            Some(new NvtxUniqueRange(s"Stage ${stageId} Task ${taskAttemptId} owning GPU",
+              NvtxColor.ORANGE))
+        }
         moveToActive(t)
         notifyAll()
         done = true
@@ -309,6 +317,8 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
       semaphore.release(numPermits)
       hasSemaphore = false
       lastHeld = System.currentTimeMillis()
+      nvtxRange.foreach(_.close())
+      nvtxRange = None
     }
     // It should be impossible for the current thread to be blocked when releasing the semaphore
     // because no blocked thread should ever leave `blockUntilReady`, which is where we put it in
@@ -324,7 +334,9 @@ private final class GpuSemaphore() extends Logging {
 
   type GpuBackingSemaphore = PrioritySemaphore[Long]
   private val semaphore = new GpuBackingSemaphore(MAX_PERMITS)
-  // Keep track of all tasks that are both active on the GPU and blocked waiting on the GPU
+  // A map of taskAttemptId => semaphoreTaskInfo.
+  // This map keeps track of all tasks that are both active on the GPU and blocked waiting
+  // on the GPU.
   private val tasks = new ConcurrentHashMap[Long, SemaphoreTaskInfo]
 
   def tryAcquire(context: TaskContext): TryAcquireResult = {
@@ -333,7 +345,7 @@ private final class GpuSemaphore() extends Logging {
     val taskAttemptId = context.taskAttemptId()
     val taskInfo = tasks.computeIfAbsent(taskAttemptId, _ => {
       onTaskCompletion(context, completeTask)
-      new SemaphoreTaskInfo(taskAttemptId)
+      new SemaphoreTaskInfo(context.stageId(), taskAttemptId)
     })
     if (taskInfo.tryAcquire(semaphore, taskAttemptId)) {
       GpuDeviceManager.initializeFromTask()
@@ -357,7 +369,7 @@ private final class GpuSemaphore() extends Logging {
     val taskAttemptId = context.taskAttemptId()
     val taskInfo = tasks.computeIfAbsent(taskAttemptId, _ => {
       onTaskCompletion(context, completeTask)
-      new SemaphoreTaskInfo(taskAttemptId)
+      new SemaphoreTaskInfo(context.stageId(), taskAttemptId)
     })
     taskInfo.blockUntilReady(semaphore)
     GpuDeviceManager.initializeFromTask()
@@ -426,4 +438,4 @@ private final class GpuSemaphore() extends Logging {
     logDebug(s"shutting down with ${tasks.size} tasks still registered")
   }
 }
-}
+}
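
Why NvtxUniqueRange rather than the existing push/pop NvtxRange? A push/pop range must be popped on the same thread that pushed it, while a start/end-style range can be closed from any thread, and in GpuSemaphore the thread that releases a task's semaphore is not necessarily the one that acquired it. A small sketch of that property, assuming only the constructor and close() shown in the diff above:

import ai.rapids.cudf.{NvtxColor, NvtxUniqueRange}

object CrossThreadRangeDemo {
  def main(args: Array[String]): Unit = {
    // Open the range on the main thread...
    val range = new NvtxUniqueRange("Stage 1 Task 42 owning GPU", NvtxColor.ORANGE)
    // ...and close it from a worker thread. The ownership span still ends
    // cleanly on the NVTX timeline, which a push/pop range would not allow.
    val closer = new Thread(() => range.close())
    closer.start()
    closer.join()
  }
}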
RapidsConf.scala
@@ -2388,6 +2388,13 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
     .booleanConf
     .createWithDefault(true)
 
+  val TRACE_TASK_GPU_OWNERSHIP = conf("spark.rapids.sql.nvtx.traceTaskGpuOwnership")
+    .doc("Enable tracing of the GPU ownership of tasks. This can be useful for debugging " +
+      "deadlocks and other issues related to GPU semaphore.")
+    .internal()
+    .booleanConf
+    .createWithDefault(false)
+
   private def printSectionHeader(category: String): Unit =
     println(s"\n### $category")
 
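Because the new flag is marked .internal(), it does not appear in the generated config docs, but it can be set like any other conf. A hedged example of turning it on at session startup (only the config key comes from this diff; the rest of the setup is the standard spark-rapids plugin wiring):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gpu-ownership-tracing")
  // Standard spark-rapids plugin setup.
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  // The new internal flag from this commit; defaults to false.
  .config("spark.rapids.sql.nvtx.traceTaskGpuOwnership", "true")
  .getOrCreate()

When the application is then profiled under Nsight Systems with NVTX tracing enabled, each task's "Stage X Task Y owning GPU" range should appear on the timeline for the duration the task holds the semaphore.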