From f317a93097686f2caefe19181488594841741871 Mon Sep 17 00:00:00 2001
From: skotlov
Date: Tue, 12 Dec 2023 14:01:17 +0000
Subject: [PATCH] Report stage properties

---
 .../com/joom/spark/monitoring/Parts.scala |  1 +
 .../StatsReportingSparkListener.scala     | 19 ++++++++++++++-----
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/lib/src/main/scala/com/joom/spark/monitoring/Parts.scala b/lib/src/main/scala/com/joom/spark/monitoring/Parts.scala
index 59340e2..977ba88 100644
--- a/lib/src/main/scala/com/joom/spark/monitoring/Parts.scala
+++ b/lib/src/main/scala/com/joom/spark/monitoring/Parts.scala
@@ -36,6 +36,7 @@ case class StageSummary(
   inputGB: Double,
   shuffleWriteGB: Double,
   peakExecutionMemoryGB: Double,
+  properties: Map[String, String],
 )
 
 case class ExecutorMetric(
diff --git a/lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala b/lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala
index 3a0339d..e6cd3a3 100644
--- a/lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala
+++ b/lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala
@@ -18,7 +18,8 @@ import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 import scala.collection.JavaConverters._
 
-class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String) extends SparkListener {
+class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
+                                  stagePropertyNames: Set[String] = Set()) extends SparkListener {
 
   def this(sparkConf: SparkConf) = {
     this(sparkConf, {
@@ -45,7 +46,8 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String) extends
                         var completed: Boolean = false,
                         var sent: Boolean = false,
                         var startedTaskCount: Int = 0,
-                        var failureReason: Option[String] = None)
+                        var failureReason: Option[String] = None,
+                        var properties: Map[String, String] = Map())
   private val tasksPerStage = mutable.Map[StageFullId, mutable.ArrayBuffer[(TaskMetrics, TaskEndReason)]]()
   private val stageState = mutable.Map[StageFullId, StageState]()
   private val appStart: Instant = Instant.now()
@@ -148,7 +150,11 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String) extends
 
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
     val stageFullId = StageFullId(stageSubmitted.stageInfo.stageId, stageSubmitted.stageInfo.attemptNumber())
-    stageState.getOrElseUpdate(stageFullId, StageState())
+    val properties = stagePropertyNames.flatMap { prop =>
+      Option(stageSubmitted.properties.getProperty(prop)).map((prop, _))
+    }.toMap
+
+    stageState.getOrElseUpdate(stageFullId, StageState()).properties = properties
   }
 
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
@@ -227,7 +233,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String) extends
         val failureReason = state.failureReason
         val startTime = state.startTime
         val summary = summarizeStage(appId, stageFullId.stageId, stageFullId.attemptNumber, success, failureReason,
-          startTime, tasks.toSeq)
+          startTime, tasks.toSeq, state.properties)
 
         implicit val codec: JsonValueCodec[StageSummary] = JsonCodecMaker.make
         send("stages", summary.get)
@@ -237,6 +243,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String) extends
         // needed for those. In long running processes, such as Zeppelin, these can consume a lot of
         // memory. Clean them up.
         tasksPerStage.remove(stageFullId)
+        stageState.remove(stageFullId)
       }
     }
   }
@@ -330,7 +337,8 @@ object StatsReportingSparkListener {
 
   private def summarizeStage(appId: String, stageId: Int, attemptNumber: Int,
                              succeeded: Boolean, failureReason: Option[String], startTime: Instant,
-                             rawTaskMetrics: Seq[(TaskMetrics, TaskEndReason)]): Option[StageSummary] = {
+                             rawTaskMetrics: Seq[(TaskMetrics, TaskEndReason)],
+                             properties: Map[String, String]): Option[StageSummary] = {
     val taskMetrics = rawTaskMetrics.map(_._1)
       .filter(_ != null) // For failed tasks, there will be 'null' TaskMetrics instances.
     val runTimes = taskMetrics.map(_.executorRunTime.toDouble / 1000.0)
@@ -363,6 +371,7 @@ object StatsReportingSparkListener {
      inputGB = taskMetrics.map(_.inputMetrics.bytesRead).sum / GiB,
      shuffleWriteGB = taskMetrics.map(_.shuffleWriteMetrics.bytesWritten).sum / GiB,
      peakExecutionMemoryGB = taskMetrics.map(_.peakExecutionMemory).sum / GiB,
+      properties = properties,
     ))
   }
 }
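
---

A minimal usage sketch for the new parameter follows. The listener has to be registered programmatically to pass a non-empty stagePropertyNames, since spark.extraListeners goes through the single-SparkConf-argument auxiliary constructor, which leaves stagePropertyNames at its empty default. The object name, the MONITORING_API_KEY environment variable, and the "myTeam.pipelineStep" property are illustrative assumptions; "spark.jobGroup.id" is a standard Spark local property.

import org.apache.spark.sql.SparkSession
import com.joom.spark.monitoring.StatsReportingSparkListener

object StagePropertiesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("stage-properties-demo").getOrCreate()
    val sc = spark.sparkContext

    // Register programmatically so stagePropertyNames can be supplied.
    // Reading the API key from an environment variable is an assumption here.
    sc.addSparkListener(new StatsReportingSparkListener(
      sc.getConf,
      sys.env.getOrElse("MONITORING_API_KEY", ""),
      stagePropertyNames = Set("spark.jobGroup.id", "myTeam.pipelineStep")))

    // Local properties of the submitting thread are attached to each stage via
    // SparkListenerStageSubmitted.properties; the listener copies the requested
    // ones into StageSummary.properties.
    sc.setLocalProperty("myTeam.pipelineStep", "daily-aggregation")
    spark.range(1000000L).selectExpr("sum(id)").collect()

    spark.stop()
  }
}

Any property set with sc.setLocalProperty on the thread that submits a job is visible to onStageSubmitted for the resulting stages, so custom tags like the one above end up in the reported summaries alongside built-in properties such as the job group id.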