Skip to content

Commit

Permalink
Log Stage and Task ID to understand I/O bottlenecks.
Browse files (browse the repository at this point in the history)
Signed-off-by: Pascal Spörri <[email protected]>
  • Loading branch information
pspoerri committed Sep 13, 2023
1 parent 11df887 commit 0b476c0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.spark.shuffle

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging

import java.io.{IOException, OutputStream}
Expand Down Expand Up @@ -52,8 +53,12 @@ class S3MeasureOutputStream(var out: OutputStream, label: String = "") extends O
isOpen = false
super.close()

val tc = TaskContext.get()
val sId = tc.stageId()
val sAt = tc.stageAttemptNumber()
val t = timings / 1000000
val bw = bytes.toDouble / (t.toDouble / 1000) / (1024 * 1024)
logInfo(s"Statistics: Writing ${label} ${bytes} took ${t} ms (${bw} MiB/s)")
logInfo(s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
s"Writing ${label} ${bytes} took ${t} ms (${bw} MiB/s)")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.apache.spark.storage

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.helper.S3ShuffleDispatcher

Expand Down Expand Up @@ -152,6 +153,9 @@ class S3BufferedPrefetchIterator(iter: Iterator[(BlockId, S3ShuffleBlockStream)]

private def printStatistics(): Unit = synchronized {
val totalRuntime = System.nanoTime() - startTime
val tc = TaskContext.get()
val sId = tc.stageId()
val sAt = tc.stageAttemptNumber()
try {
val tR = totalRuntime / 1000000
val wPer = 100 * timeWaiting / totalRuntime
Expand All @@ -169,7 +173,8 @@ class S3BufferedPrefetchIterator(iter: Iterator[(BlockId, S3ShuffleBlockStream)]
val bs = bR / r
// Threads
val ta = desiredActiveThreads.get()
logInfo(s"Statistics: ${bR} bytes, ${tW} ms waiting (${atW} avg), " +
logInfo(s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
s"${bR} bytes, ${tW} ms waiting (${atW} avg), " +
s"${tP} ms prefetching (avg: ${atP} ms - ${bs} block size - ${bW} MiB/s). " +
s"Total: ${tR} ms - ${wPer}% waiting. ${ta} active threads.")
} catch {
Expand Down

0 comments on commit 0b476c0

Please sign in to comment.