Add endTs to StageSummary
kotlovs committed Oct 17, 2024
1 parent 111a7b4 commit 656198c
Showing 2 changed files with 18 additions and 10 deletions.
1 change: 1 addition & 0 deletions lib/src/main/scala/com/joom/spark/monitoring/Parts.scala
@@ -37,6 +37,7 @@ case class StageSummary(
shuffleWriteGB: Double,
peakExecutionMemoryGB: Double,
properties: Map[String, String],
+ endTs: Long,
)

case class ExecutorMetric(
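
For context, endTs complements the existing ts field: both are epoch-millisecond timestamps, so a consumer of the emitted stage records can derive wall-clock stage duration directly. A minimal consumer-side sketch, assuming a hypothetical StageTiming projection of the two fields (only ts and endTs come from this commit):

import java.time.{Duration, Instant}

// Hypothetical projection of the two timestamp fields emitted in StageSummary.
final case class StageTiming(ts: Long, endTs: Long) // both in epoch milliseconds

// Wall-clock time between the start and the end of the stage.
def stageDuration(t: StageTiming): Duration =
  Duration.between(Instant.ofEpochMilli(t.ts), Instant.ofEpochMilli(t.endTs))

// stageDuration(StageTiming(ts = 1729166400000L, endTs = 1729166442000L)) == Duration.ofSeconds(42)
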
27 changes: 17 additions & 10 deletions lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala
@@ -134,7 +134,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,

override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
stageState.filter { case (_, state) => !state.sent && state.completed }.foreach { case (stageFullId, _) =>
- sendStageSummaryIfReady(stageFullId, force = true)
+ sendStageSummaryIfReady(stageFullId, Some(applicationEnd.time), force = true)
}

send("apps", ApplicationSummary(appStart.toEpochMilli, appId, appName, sparkConf.getAll.toMap,
@@ -154,27 +154,30 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
Option(stageSubmitted.properties.getProperty(prop)).map((prop, _))
}.toMap

- stageState.getOrElseUpdate(stageFullId, StageState()).properties = properties
+ val startTime = stageSubmitted.stageInfo.submissionTime.map(Instant.ofEpochMilli).getOrElse(Instant.now())
+ stageState.getOrElseUpdate(stageFullId, StageState(startTime)).properties = properties
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
val stageFullId = StageFullId(stageCompleted.stageInfo.stageId, stageCompleted.stageInfo.attemptNumber())
- val state = stageState.getOrElseUpdate(stageFullId, StageState())
+ val startTime = stageCompleted.stageInfo.submissionTime.map(Instant.ofEpochMilli).getOrElse(Instant.now())
+ val state = stageState.getOrElseUpdate(stageFullId, StageState(startTime))
state.failureReason = stageCompleted.stageInfo.failureReason
state.completed = true
- sendStageSummaryIfReady(stageFullId)
+ sendStageSummaryIfReady(stageFullId, stageCompleted.stageInfo.completionTime)
}

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
val stageFullId = StageFullId(taskStart.stageId, taskStart.stageAttemptId)
- stageState.getOrElseUpdate(stageFullId, StageState()).startedTaskCount += 1
+ val startTime = Instant.ofEpochMilli(taskStart.taskInfo.launchTime)
+ stageState.getOrElseUpdate(stageFullId, StageState(startTime)).startedTaskCount += 1
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val stageFullId = StageFullId(taskEnd.stageId, taskEnd.stageAttemptId)
tasksPerStage.getOrElseUpdate(stageFullId, mutable.ArrayBuffer[(TaskMetrics, TaskEndReason)]())
.append((taskEnd.taskMetrics, taskEnd.reason))
- sendStageSummaryIfReady(stageFullId)
+ sendStageSummaryIfReady(stageFullId, Some(taskEnd.taskInfo.finishTime))
}

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
@@ -224,7 +227,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
addedExecutorIds.add(executorId)
}

- private def sendStageSummaryIfReady(stageFullId: StageFullId, force: Boolean = false): Unit = {
+ private def sendStageSummaryIfReady(stageFullId: StageFullId, endTs: Option[Long], force: Boolean = false): Unit = {
stageState.get(stageFullId).foreach { state =>
val tasks = tasksPerStage.getOrElse(stageFullId, mutable.ArrayBuffer[(TaskMetrics, TaskEndReason)]())

@@ -233,7 +236,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
val failureReason = state.failureReason
val startTime = state.startTime
val summary = summarizeStage(appId, stageFullId.stageId, stageFullId.attemptNumber, success, failureReason,
- startTime, tasks.toSeq, state.properties)
+ startTime, tasks.toSeq, state.properties, endTs)

implicit val codec: JsonValueCodec[StageSummary] = JsonCodecMaker.make
send("stages", summary.get)
@@ -338,15 +341,18 @@ object StatsReportingSparkListener {
private def summarizeStage(appId: String, stageId: Int, attemptNumber: Int, succeeded: Boolean,
failureReason: Option[String], startTime: Instant,
rawTaskMetrics: Seq[(TaskMetrics, TaskEndReason)],
- properties: Map[String, String]): Option[StageSummary] = {
+ properties: Map[String, String],
+ endTime: Option[Long]): Option[StageSummary] = {
val taskMetrics = rawTaskMetrics.map(_._1)
.filter(_ != null) // For failed tasks, there will be 'null' TaskMetrics instances.
val runTimes = taskMetrics.map(_.executorRunTime.toDouble / 1000.0)
val shuffleRemoteReadGb = taskMetrics.map(_.shuffleReadMetrics.remoteBytesRead / GiB)
val failedTaskMetrics = rawTaskMetrics.filter(_._2 != Success).map(_._1)
+ val startTs = startTime.toEpochMilli
+ val endTs = endTime.getOrElse(Instant.now().toEpochMilli)

Some(StageSummary(
- ts = startTime.toEpochMilli,
+ ts = startTs,
appId = appId,
stageId = stageId,
attemptNumber = attemptNumber,
@@ -372,6 +378,7 @@ object StatsReportingSparkListener {
shuffleWriteGB = taskMetrics.map(_.shuffleWriteMetrics.bytesWritten).sum / GiB,
peakExecutionMemoryGB = taskMetrics.map(_.peakExecutionMemory).sum / GiB,
properties = properties,
+ endTs = endTs,
))
}
}
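
A note on the StageState(startTime) call sites above: this commit does not touch the StageState definition, so it presumably already carried a startTime field (it is read back as state.startTime before summarizeStage is called), most likely with a defaulted constructor parameter so that the old no-argument StageState() also compiled. A rough sketch of that assumed shape, limited to the fields referenced in this diff:

import java.time.Instant

// Assumed shape of StageState (its definition is not part of this diff).
// A mutable per-stage accumulator, keyed by StageFullId in the stageState map.
// The defaulted startTime is what let the pre-commit call sites write StageState();
// after this commit the listener passes Spark's own timestamps instead.
case class StageState(
  startTime: Instant = Instant.now(),
  var properties: Map[String, String] = Map.empty,
  var startedTaskCount: Int = 0,
  var failureReason: Option[String] = None,
  var completed: Boolean = false,
  var sent: Boolean = false,
)

The practical effect of the change is that ts and endTs now reflect Spark's own submissionTime, launchTime, completionTime and finishTime values where available, with Instant.now() kept only as a fallback, so the reported window matches the stage's actual lifetime rather than the moment the listener callbacks happened to run.
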
