From 6b27556c43ea1d83238dc95b508af308786a3298 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Tue, 29 Oct 2024 13:42:24 -0700
Subject: [PATCH] Add a new NVTX range for task GPU ownership (#11596)

* Add nvtx range for task owning GPU

Signed-off-by: Jihoon Son

* review comments

* Unused import

---------

Signed-off-by: Jihoon Son
---
 .../nvidia/spark/rapids/GpuSemaphore.scala    | 24 ++++++++++++++-----
 .../com/nvidia/spark/rapids/RapidsConf.scala  |  7 ++++++
 2 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
index 78d05efb0c2..719c4525373 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSemaphore.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import ai.rapids.cudf.{NvtxColor, NvtxRange}
+import ai.rapids.cudf.{NvtxColor, NvtxRange, NvtxUniqueRange}
 import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
 
 import org.apache.spark.TaskContext
@@ -162,7 +162,7 @@ object GpuSemaphore {
  * this is considered to be okay as there are other mechanisms in place, and it should be rather
  * rare.
  */
-private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
+private final class SemaphoreTaskInfo(val stageId: Int, val taskAttemptId: Long) extends Logging {
   /**
    * This holds threads that are not on the GPU yet. Most of the time they are
    * blocked waiting for the semaphore to let them on, but it may hold one
@@ -179,6 +179,7 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
    */
   private val activeThreads = new util.LinkedHashSet[Thread]()
   private lazy val numPermits = GpuSemaphore.computeNumPermits(SQLConf.get)
+  private lazy val trackSemaphore = RapidsConf.TRACE_TASK_GPU_OWNERSHIP.get(SQLConf.get)
   /**
    * If this task holds the GPU semaphore or not.
    */
@@ -187,6 +188,8 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
 
   type GpuBackingSemaphore = PrioritySemaphore[Long]
 
+  var nvtxRange: Option[NvtxUniqueRange] = None
+
   /**
    * Does this task have the GPU semaphore or not. Be careful because it can change at
    * any point in time. So only use it for logging.
@@ -258,6 +261,11 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
         // We now own the semaphore so we need to wake up all of the other tasks that are
         // waiting.
         hasSemaphore = true
+        if (trackSemaphore) {
+          nvtxRange =
+            Some(new NvtxUniqueRange(s"Stage ${stageId} Task ${taskAttemptId} owning GPU",
+              NvtxColor.ORANGE))
+        }
         moveToActive(t)
         notifyAll()
         done = true
@@ -309,6 +317,8 @@ private final class SemaphoreTaskInfo(val taskAttemptId: Long) extends Logging {
       semaphore.release(numPermits)
       hasSemaphore = false
       lastHeld = System.currentTimeMillis()
+      nvtxRange.foreach(_.close())
+      nvtxRange = None
     }
     // It should be impossible for the current thread to be blocked when releasing the semaphore
     // because no blocked thread should ever leave `blockUntilReady`, which is where we put it in
@@ -324,7 +334,9 @@ private final class GpuSemaphore() extends Logging {
   type GpuBackingSemaphore = PrioritySemaphore[Long]
 
   private val semaphore = new GpuBackingSemaphore(MAX_PERMITS)
-  // Keep track of all tasks that are both active on the GPU and blocked waiting on the GPU
+  // A map of taskAttemptId => SemaphoreTaskInfo.
+  // This map keeps track of all tasks that are both active on the GPU and blocked waiting
+  // on the GPU.
   private val tasks = new ConcurrentHashMap[Long, SemaphoreTaskInfo]
 
   def tryAcquire(context: TaskContext): TryAcquireResult = {
@@ -333,7 +345,7 @@ private final class GpuSemaphore() extends Logging {
     val taskAttemptId = context.taskAttemptId()
     val taskInfo = tasks.computeIfAbsent(taskAttemptId, _ => {
       onTaskCompletion(context, completeTask)
-      new SemaphoreTaskInfo(taskAttemptId)
+      new SemaphoreTaskInfo(context.stageId(), taskAttemptId)
     })
     if (taskInfo.tryAcquire(semaphore, taskAttemptId)) {
       GpuDeviceManager.initializeFromTask()
@@ -357,7 +369,7 @@ private final class GpuSemaphore() extends Logging {
     val taskAttemptId = context.taskAttemptId()
     val taskInfo = tasks.computeIfAbsent(taskAttemptId, _ => {
       onTaskCompletion(context, completeTask)
-      new SemaphoreTaskInfo(taskAttemptId)
+      new SemaphoreTaskInfo(context.stageId(), taskAttemptId)
     })
     taskInfo.blockUntilReady(semaphore)
     GpuDeviceManager.initializeFromTask()
@@ -426,4 +438,4 @@
       logDebug(s"shutting down with ${tasks.size} tasks still registered")
     }
   }
-}
\ No newline at end of file
+}
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index ca906baffcc..a83ad716d34 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -2388,6 +2388,13 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
     .booleanConf
     .createWithDefault(true)
 
+  val TRACE_TASK_GPU_OWNERSHIP = conf("spark.rapids.sql.nvtx.traceTaskGpuOwnership")
+    .doc("Enable tracing of the GPU ownership of tasks. This can be useful for debugging " +
+      "deadlocks and other issues related to the GPU semaphore.")
+    .internal()
+    .booleanConf
+    .createWithDefault(false)
+
   private def printSectionHeader(category: String): Unit =
     println(s"\n### $category")
 
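
A minimal sketch of the lifecycle this patch wires into SemaphoreTaskInfo, shown in
isolation for reference. TaskGpuOwnershipTracker is a hypothetical name used only for
illustration; the real change inlines this logic into SemaphoreTaskInfo itself:

import ai.rapids.cudf.{NvtxColor, NvtxUniqueRange}

final class TaskGpuOwnershipTracker(stageId: Int, taskAttemptId: Long, enabled: Boolean) {
  // The currently open range, if any. NvtxUniqueRange, unlike the push/pop
  // based NvtxRange, can be closed from a different thread than the one that
  // opened it, which matters here because the semaphore may be released by a
  // different thread than the one that acquired it.
  private var range: Option[NvtxUniqueRange] = None

  // Call once the task actually holds the GPU semaphore.
  def onAcquired(): Unit = if (enabled) {
    range = Some(new NvtxUniqueRange(
      s"Stage $stageId Task $taskAttemptId owning GPU", NvtxColor.ORANGE))
  }

  // Call when the task releases the GPU semaphore; a no-op when tracing is
  // off, since no range was ever opened.
  def onReleased(): Unit = {
    range.foreach(_.close())
    range = None
  }
}

To see the ranges in practice, set spark.rapids.sql.nvtx.traceTaskGpuOwnership=true and
capture the run with a profiler that records NVTX events, such as Nsight Systems
(nsys profile); each task then appears as an orange "Stage X Task Y owning GPU" range
spanning the time it held the semaphore.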