diff --git a/lib/src/main/scala/com/joom/spark/monitoring/Parts.scala b/lib/src/main/scala/com/joom/spark/monitoring/Parts.scala
index 977ba88..09eaef6 100644
--- a/lib/src/main/scala/com/joom/spark/monitoring/Parts.scala
+++ b/lib/src/main/scala/com/joom/spark/monitoring/Parts.scala
@@ -37,6 +37,7 @@ case class StageSummary(
   shuffleWriteGB: Double,
   peakExecutionMemoryGB: Double,
   properties: Map[String, String],
+  endTs: Long,
 )

 case class ExecutorMetric(
diff --git a/lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala b/lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala
index e6cd3a3..ee41362 100644
--- a/lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala
+++ b/lib/src/main/scala/com/joom/spark/monitoring/StatsReportingSparkListener.scala
@@ -134,7 +134,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,

   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
     stageState.filter { case (_, state) => !state.sent && state.completed }.foreach { case (stageFullId, _) =>
-      sendStageSummaryIfReady(stageFullId, force = true)
+      sendStageSummaryIfReady(stageFullId, Some(applicationEnd.time), force = true)
     }

     send("apps", ApplicationSummary(appStart.toEpochMilli, appId, appName, sparkConf.getAll.toMap,
@@ -154,27 +154,30 @@
       Option(stageSubmitted.properties.getProperty(prop)).map((prop, _))
     }.toMap

-    stageState.getOrElseUpdate(stageFullId, StageState()).properties = properties
+    val startTime = stageSubmitted.stageInfo.submissionTime.map(Instant.ofEpochMilli).getOrElse(Instant.now())
+    stageState.getOrElseUpdate(stageFullId, StageState(startTime)).properties = properties
   }

   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
     val stageFullId = StageFullId(stageCompleted.stageInfo.stageId, stageCompleted.stageInfo.attemptNumber())
-    val state = stageState.getOrElseUpdate(stageFullId, StageState())
+    val startTime = stageCompleted.stageInfo.submissionTime.map(Instant.ofEpochMilli).getOrElse(Instant.now())
+    val state = stageState.getOrElseUpdate(stageFullId, StageState(startTime))
     state.failureReason = stageCompleted.stageInfo.failureReason
     state.completed = true
-    sendStageSummaryIfReady(stageFullId)
+    sendStageSummaryIfReady(stageFullId, stageCompleted.stageInfo.completionTime)
   }

   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
     val stageFullId = StageFullId(taskStart.stageId, taskStart.stageAttemptId)
-    stageState.getOrElseUpdate(stageFullId, StageState()).startedTaskCount += 1
+    val startTime = Instant.ofEpochMilli(taskStart.taskInfo.launchTime)
+    stageState.getOrElseUpdate(stageFullId, StageState(startTime)).startedTaskCount += 1
   }

   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
     val stageFullId = StageFullId(taskEnd.stageId, taskEnd.stageAttemptId)
     tasksPerStage.getOrElseUpdate(stageFullId, mutable.ArrayBuffer[(TaskMetrics, TaskEndReason)]())
       .append((taskEnd.taskMetrics, taskEnd.reason))
-    sendStageSummaryIfReady(stageFullId)
+    sendStageSummaryIfReady(stageFullId, Some(taskEnd.taskInfo.finishTime))
   }

   override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
@@ -224,7 +227,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
     addedExecutorIds.add(executorId)
   }

-  private def sendStageSummaryIfReady(stageFullId: StageFullId, force: Boolean = false): Unit = {
+  private def sendStageSummaryIfReady(stageFullId: StageFullId, endTs: Option[Long], force: Boolean = false): Unit = {
     stageState.get(stageFullId).foreach { state =>
       val tasks = tasksPerStage.getOrElse(stageFullId, mutable.ArrayBuffer[(TaskMetrics, TaskEndReason)]())
@@ -233,7 +236,7 @@ class StatsReportingSparkListener(sparkConf: SparkConf, apiKey: String,
         val failureReason = state.failureReason
         val startTime = state.startTime
         val summary = summarizeStage(appId, stageFullId.stageId, stageFullId.attemptNumber, success, failureReason,
-          startTime, tasks.toSeq, state.properties)
+          startTime, tasks.toSeq, state.properties, endTs)

         implicit val codec: JsonValueCodec[StageSummary] = JsonCodecMaker.make
         send("stages", summary.get)
@@ -338,15 +341,18 @@ object StatsReportingSparkListener {
   private def summarizeStage(appId: String, stageId: Int, attemptNumber: Int, succeeded: Boolean,
                              failureReason: Option[String], startTime: Instant,
                              rawTaskMetrics: Seq[(TaskMetrics, TaskEndReason)],
-                             properties: Map[String, String]): Option[StageSummary] = {
+                             properties: Map[String, String],
+                             endTime: Option[Long]): Option[StageSummary] = {
     val taskMetrics = rawTaskMetrics.map(_._1)
       .filter(_ != null) // For failed tasks, there will be 'null' TaskMetrics instances.
     val runTimes = taskMetrics.map(_.executorRunTime.toDouble / 1000.0)
     val shuffleRemoteReadGb = taskMetrics.map(_.shuffleReadMetrics.remoteBytesRead / GiB)
     val failedTaskMetrics = rawTaskMetrics.filter(_._2 != Success).map(_._1)
+    val startTs = startTime.toEpochMilli
+    val endTs = endTime.getOrElse(Instant.now().toEpochMilli)

     Some(StageSummary(
-      ts = startTime.toEpochMilli,
+      ts = startTs,
       appId = appId,
       stageId = stageId,
       attemptNumber = attemptNumber,
@@ -372,6 +378,7 @@ object StatsReportingSparkListener {
      shuffleWriteGB = taskMetrics.map(_.shuffleWriteMetrics.bytesWritten).sum / GiB,
      peakExecutionMemoryGB = taskMetrics.map(_.peakExecutionMemory).sum / GiB,
      properties = properties,
+      endTs = endTs,
     ))
   }
 }