Report stage properties
kotlovs committed Dec 12, 2023
1 parent 8a3f61c commit f317a93
Showing 2 changed files with 15 additions and 5 deletions.
1 change: 1 addition & 0 deletions lib/src/main/scala/com/joom/spark/monitoring/Parts.scala
@@ -36,6 +36,7 @@ case class StageSummary(
inputGB: Double,
shuffleWriteGB: Double,
peakExecutionMemoryGB: Double,
+ properties: Map[String, String],
)

case class ExecutorMetric(
Expand Down
14 changes: 14 additions & 5 deletions lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala
@@ -18,7 +18,8 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.collection.JavaConverters._

- class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String) extends SparkListener {
+ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
+                                   stagePropertyNames: Set[String] = Set()) extends SparkListener {

def this(sparkConf: SparkConf) = {
this(sparkConf, {
@@ -45,7 +46,8 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String) extends
var completed: Boolean = false,
var sent: Boolean = false,
var startedTaskCount: Int = 0,
- var failureReason: Option[String] = None)
+ var failureReason: Option[String] = None,
+ var properties: Map[String, String] = Map())
private val tasksPerStage = mutable.Map[StageFullId, mutable.ArrayBuffer[(TaskMetrics, TaskEndReason)]]()
private val stageState = mutable.Map[StageFullId, StageState]()
private val appStart: Instant = Instant.now()
@@ -148,7 +150,11 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String) extends

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
val stageFullId = StageFullId(stageSubmitted.stageInfo.stageId, stageSubmitted.stageInfo.attemptNumber())
- stageState.getOrElseUpdate(stageFullId, StageState())
+ val properties = stagePropertyNames.flatMap { prop =>
+   Option(stageSubmitted.properties.getProperty(prop)).map((prop, _))
+ }.toMap
+
+ stageState.getOrElseUpdate(stageFullId, StageState()).properties = properties
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
@@ -227,7 +233,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String) extends
val failureReason = state.failureReason
val startTime = state.startTime
val summary = summarizeStage(appId, stageFullId.stageId, stageFullId.attemptNumber, success, failureReason,
- startTime, tasks.toSeq)
+ startTime, tasks.toSeq, state.properties)

implicit val codec: JsonValueCodec[StageSummary] = JsonCodecMaker.make
send("stages", summary.get)
@@ -237,6 +243,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String) extends
// needed for those. In long running processes, such as Zeppelin, these can consume a lot of
// memory. Clean them up.
tasksPerStage.remove(stageFullId)
+ stageState.remove(stageFullId)
}
}
}
@@ -330,7 +337,8 @@ object StatsReportingSparkListener {

private def summarizeStage(appId: String, stageId: Int, attemptNumber: Int, succeeded: Boolean,
failureReason: Option[String], startTime: Instant,
- rawTaskMetrics: Seq[(TaskMetrics, TaskEndReason)]): Option[StageSummary] = {
+ rawTaskMetrics: Seq[(TaskMetrics, TaskEndReason)],
+ properties: Map[String, String]): Option[StageSummary] = {
val taskMetrics = rawTaskMetrics.map(_._1)
.filter(_ != null) // For failed tasks, there will be 'null' TaskMetrics instances.
val runTimes = taskMetrics.map(_.executorRunTime.toDouble / 1000.0)
@@ -363,6 +371,7 @@
inputGB = taskMetrics.map(_.inputMetrics.bytesRead).sum / GiB,
shuffleWriteGB = taskMetrics.map(_.shuffleWriteMetrics.bytesWritten).sum / GiB,
peakExecutionMemoryGB = taskMetrics.map(_.peakExecutionMemory).sum / GiB,
+ properties = properties,
))
}
}
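
Below is a minimal usage sketch of the new stagePropertyNames parameter; it is not part of this commit. It assumes programmatic registration of the listener via SparkContext.addSparkListener, and the property name "pipeline.step" and the apiKey placeholder are illustrative, not names defined by the library. The idea is that any local property listed in stagePropertyNames and set on the SparkContext is picked up in onStageSubmitted and reported on each StageSummary.

import org.apache.spark.sql.SparkSession
import com.joom.spark.monitoring.StatsReportingSparkListener

val spark = SparkSession.builder().appName("stage-properties-example").getOrCreate()
val sc = spark.sparkContext

// Ask the listener to capture the "pipeline.step" local property for every stage.
// "pipeline.step" is an arbitrary example key; the API key below is a placeholder.
val listener = new StatsReportingSparkListener(
  sc.getConf,
  apiKey = "<your-api-key>",
  stagePropertyNames = Set("pipeline.step")
)
sc.addSparkListener(listener)

// Local properties set on this thread are attached to the stages of jobs submitted afterwards,
// so the reported StageSummary.properties becomes Map("pipeline.step" -> "load-orders").
sc.setLocalProperty("pipeline.step", "load-orders")
spark.range(1000).count()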
