Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix inconsistent shuffle write time sum results in Profiler output #1450

Merged
merged 8 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,8 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
perStageRec.durationMax,
perStageRec.durationMin,
perStageRec.durationAvg,
perStageRec.executorCPUTimeSum,
perStageRec.executorDeserializeCpuTimeSum,
perStageRec.executorCPUTimeSum, // converted to milliseconds by the aggregator
perStageRec.executorDeserializeCpuTimeSum, // converted to milliseconds by the aggregator
perStageRec.executorDeserializeTimeSum,
perStageRec.executorRunTimeSum,
perStageRec.inputBytesReadSum,
Expand All @@ -448,7 +448,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
perStageRec.srTotalBytesReadSum,
perStageRec.swBytesWrittenSum,
perStageRec.swRecordsWrittenSum,
perStageRec.swWriteTimeSum)
perStageRec.swWriteTimeSum) // converted to milliseconds by the aggregator
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.nvidia.spark.rapids.tool.analysis.util

import java.util.concurrent.TimeUnit

import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult

import org.apache.spark.sql.rapids.tool.store.TaskModel
Expand All @@ -42,6 +44,12 @@ class AggAccumHelper {
val resRec = createStageAccumRecord()
taskRecords.foreach(resRec.addRecord)
resRec.finalizeAggregation()
// Convert nanosecond units to milliseconds at the stage level.
// This avoids overflow when aggregating across multiple stages at the SQL/Job level.
resRec.executorCPUTimeSum = TimeUnit.NANOSECONDS.toMillis(resRec.executorCPUTimeSum)
resRec.executorDeserializeCpuTimeSum =
TimeUnit.NANOSECONDS.toMillis(resRec.executorDeserializeCpuTimeSum)
resRec.swWriteTimeSum = TimeUnit.NANOSECONDS.toMillis(resRec.swWriteTimeSum)
resRec
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.nvidia.spark.rapids.tool.analysis.util

import java.util.concurrent.TimeUnit

import com.nvidia.spark.rapids.tool.profiling.StageAggTaskMetricsProfileResult

/**
Expand All @@ -43,7 +41,7 @@ case class StageAggPhoton(
// Re-calculate the photon specific fields only if the accumulator has tasks.
// Otherwise, leave it as 0.
if (shuffleWriteValues.nonEmpty) {
swWriteTimeSum = TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum)
swWriteTimeSum = shuffleWriteValues.sum
}
if (peakMemValues.nonEmpty) {
peakExecutionMemoryMax = peakMemValues.max
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ object GenerateTimeline {
tc.sr_fetchWaitTime
val opTimeMs = opMetrics.flatMap(_.taskUpdatesMap.get(taskId)).sum / 1000000
val writeTimeMs = writeMetrics.flatMap(_.taskUpdatesMap.get(taskId)).sum / 1000000 +
tc.sw_writeTime
tc.sw_writeTime / 1000000
val taskInfo = new TimelineTaskInfo(stageId, taskId, launchTime, finishTime, duration,
tc.executorDeserializeTime, readTimeMs, semTimeMs, opTimeMs, writeTimeMs)
val execHost = s"$execId/$host"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ trait BaseJobStageAggTaskMetricsProfileResult extends ProfileResult {
def durationMax: Long
def durationMin: Long
def durationAvg: Double
def executorCPUTimeSum: Long
def executorDeserializeCpuTimeSum: Long
def executorCPUTimeSum: Long // milliseconds
def executorDeserializeCpuTimeSum: Long // milliseconds
def executorDeserializeTimeSum: Long
def executorRunTimeSum: Long
def inputBytesReadSum: Long
Expand All @@ -443,20 +443,44 @@ trait BaseJobStageAggTaskMetricsProfileResult extends ProfileResult {
def srTotalBytesReadSum: Long
def swBytesWrittenSum: Long
def swRecordsWrittenSum: Long
def swWriteTimeSum: Long
def swWriteTimeSum: Long // milliseconds
amahussein marked this conversation as resolved.
Show resolved Hide resolved

def idHeader: String

override val outputHeaders = Seq("appIndex", idHeader, "numTasks", "Duration",
"diskBytesSpilled_sum", "duration_sum", "duration_max", "duration_min",
"duration_avg", "executorCPUTime_sum", "executorDeserializeCPUTime_sum",
"executorDeserializeTime_sum", "executorRunTime_sum", "input_bytesRead_sum",
"input_recordsRead_sum", "jvmGCTime_sum", "memoryBytesSpilled_sum",
"output_bytesWritten_sum", "output_recordsWritten_sum", "peakExecutionMemory_max",
"resultSerializationTime_sum", "resultSize_max", "sr_fetchWaitTime_sum",
"sr_localBlocksFetched_sum", "sr_localBytesRead_sum", "sr_remoteBlocksFetched_sum",
"sr_remoteBytesRead_sum", "sr_remoteBytesReadToDisk_sum", "sr_totalBytesRead_sum",
"sw_bytesWritten_sum", "sw_recordsWritten_sum", "sw_writeTime_sum")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated format only for better readability.

override val outputHeaders = {
  // Header row for the aggregated job/stage task-metrics table, grouped by metric family.
  val idCols = Seq("appIndex", idHeader, "numTasks", "Duration")
  val durationCols = Seq("diskBytesSpilled_sum", "duration_sum", "duration_max",
    "duration_min", "duration_avg")
  val executorCols = Seq("executorCPUTime_sum", "executorDeserializeCPUTime_sum",
    "executorDeserializeTime_sum", "executorRunTime_sum")
  val taskIoCols = Seq("input_bytesRead_sum", "input_recordsRead_sum", "jvmGCTime_sum",
    "memoryBytesSpilled_sum", "output_bytesWritten_sum", "output_recordsWritten_sum",
    "peakExecutionMemory_max", "resultSerializationTime_sum", "resultSize_max")
  val shuffleReadCols = Seq("sr_fetchWaitTime_sum", "sr_localBlocksFetched_sum",
    "sr_localBytesRead_sum", "sr_remoteBlocksFetched_sum", "sr_remoteBytesRead_sum",
    "sr_remoteBytesReadToDisk_sum", "sr_totalBytesRead_sum")
  val shuffleWriteCols = Seq("sw_bytesWritten_sum", "sw_recordsWritten_sum",
    "sw_writeTime_sum")
  idCols ++ durationCols ++ executorCols ++ taskIoCols ++ shuffleReadCols ++ shuffleWriteCols
}

val durStr = duration match {
case Some(dur) => dur.toString
Expand Down Expand Up @@ -511,8 +535,8 @@ case class JobAggTaskMetricsProfileResult(
durationMax: Long,
durationMin: Long,
durationAvg: Double,
executorCPUTimeSum: Long,
executorDeserializeCpuTimeSum: Long,
executorCPUTimeSum: Long, // milliseconds
executorDeserializeCpuTimeSum: Long, // milliseconds
executorDeserializeTimeSum: Long,
executorRunTimeSum: Long,
inputBytesReadSum: Long,
Expand All @@ -533,7 +557,8 @@ case class JobAggTaskMetricsProfileResult(
srTotalBytesReadSum: Long,
swBytesWrittenSum: Long,
swRecordsWrittenSum: Long,
swWriteTimeSum: Long) extends BaseJobStageAggTaskMetricsProfileResult {
swWriteTimeSum: Long // milliseconds
) extends BaseJobStageAggTaskMetricsProfileResult {
override def idHeader = "jobId"
}

Expand All @@ -547,8 +572,8 @@ case class StageAggTaskMetricsProfileResult(
durationMax: Long,
durationMin: Long,
durationAvg: Double,
executorCPUTimeSum: Long,
executorDeserializeCpuTimeSum: Long,
executorCPUTimeSum: Long, // milliseconds
executorDeserializeCpuTimeSum: Long, // milliseconds
executorDeserializeTimeSum: Long,
executorRunTimeSum: Long,
inputBytesReadSum: Long,
Expand All @@ -569,7 +594,8 @@ case class StageAggTaskMetricsProfileResult(
srTotalBytesReadSum: Long,
swBytesWrittenSum: Long,
swRecordsWrittenSum: Long,
swWriteTimeSum: Long) extends BaseJobStageAggTaskMetricsProfileResult {
swWriteTimeSum: Long // milliseconds
) extends BaseJobStageAggTaskMetricsProfileResult {
override def idHeader = "stageId"
}

Expand Down Expand Up @@ -747,16 +773,16 @@ case class SQLTaskAggMetricsProfileResult(
description: String,
numTasks: Int,
duration: Option[Long],
executorCpuTime: Long,
executorCpuTime: Long, // milliseconds
executorRunTime: Long,
executorCpuRatio: Double,
diskBytesSpilledSum: Long,
durationSum: Long,
durationMax: Long,
durationMin: Long,
durationAvg: Double,
executorCPUTimeSum: Long,
executorDeserializeCpuTimeSum: Long,
executorCPUTimeSum: Long, // milliseconds
executorDeserializeCpuTimeSum: Long, // milliseconds
executorDeserializeTimeSum: Long,
executorRunTimeSum: Long,
inputBytesReadSum: Long,
Expand All @@ -779,7 +805,8 @@ case class SQLTaskAggMetricsProfileResult(
srTotalBytesReadSum: Long,
swBytesWrittenSum: Long,
swRecordsWrittenSum: Long,
swWriteTimeSum: Long) extends ProfileResult {
swWriteTimeSum: Long // milliseconds
) extends ProfileResult {

override val outputHeaders = Seq("appIndex", "appID", "sqlID", "description", "numTasks",
"Duration", "executorCPUTime", "executorRunTime", "executorCPURatio",
Expand Down Expand Up @@ -924,12 +951,27 @@ case class IOAnalysisProfileResult(
}
}

case class SQLDurationExecutorTimeProfileResult(appIndex: Int, appId: String,
rootsqlID: Option[Long], sqlID: Long, duration: Option[Long], containsDataset: Boolean,
appDuration: Option[Long], potentialProbs: String,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated format only for better readability.

case class SQLDurationExecutorTimeProfileResult(
appIndex: Int,
appId: String,
rootsqlID: Option[Long],
sqlID: Long,
duration: Option[Long],
containsDataset: Boolean,
appDuration: Option[Long],
potentialProbs: String,
executorCpuRatio: Double) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "App ID", "RootSqlID", "sqlID", "SQL Duration",
"Contains Dataset or RDD Op", "App Duration", "Potential Problems", "Executor CPU Time Percent")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated format only for better readability.

// Header row for the SQL-duration / executor-CPU-time-percent output table.
override val outputHeaders = Seq(
  "appIndex", "App ID", "RootSqlID", "sqlID", "SQL Duration",
  "Contains Dataset or RDD Op", "App Duration", "Potential Problems",
  "Executor CPU Time Percent")
val durStr = duration match {
case Some(dur) => dur.toString
case None => ""
Expand All @@ -950,14 +992,27 @@ case class SQLDurationExecutorTimeProfileResult(appIndex: Int, appId: String,
}

// Formats this result as a row of display strings (one entry per output column).
// NOTE(review): the ordering here (rootsqlID before appId) does not match the
// outputHeaders ordering ("App ID" before "RootSqlID") — verify this is intended.
override def convertToSeq: Seq[String] = {
  Seq(appIndex.toString,
    rootsqlID.getOrElse("").toString,
    appId,
    sqlID.toString,
    durStr,
    containsDataset.toString,
    appDurStr,
    potentialStr,
    execCpuTimePercent)
}

// Formats this result as a CSV row. StringUtils.reformatCSVString is applied to the
// free-form string fields (appId, potentialStr) before emission.
override def convertToCSVSeq: Seq[String] = {
  Seq(appIndex.toString,
    StringUtils.reformatCSVString(appId),
    rootsqlID.getOrElse("").toString,
    sqlID.toString,
    durStr,
    containsDataset.toString,
    appDurStr,
    StringUtils.reformatCSVString(potentialStr),
    execCpuTimePercent)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,33 @@ object ToolUtils extends Logging {
df.showString(numRows, 0)
}

// given two duration values, calculate a human-readable percent
// rounded to 2 decimal places, e.g. 39.12%
def calculateDurationPercent(first: Long, total: Long): Double = {
val firstDec = BigDecimal.decimal(first)
val totalDec = BigDecimal.decimal(total)
if (firstDec == 0 || totalDec == 0) {
/**
* Calculate the duration percent given the numerator and total values.
* This is used to calculate the CPURatio which represents the percentage of CPU time to
* the runTime.
* There is an implicit check to ensure that the denominator is not zero. If it is, then the
* ratio will be set to 0.
* There is an option to force the cap to 100% if the calculated value is greater
* than the total. This can happen because the tasks' CPUTime is measured in
* nanoseconds, while the runTime is measured in milliseconds. This leads to a loss of precision
* causing the total percentage to exceed 100%.
* @param numerator the numerator value.
* @param total the total value.
* @param forceCap if true, then the value is capped at 100%.
* @return the calculated percentage.
*/
def calculateDurationPercent(numerator: Long, total: Long, forceCap: Boolean = true): Double = {
  if (numerator == 0 || total == 0) {
    // Implicit guard: a zero numerator or denominator yields 0 instead of dividing by zero.
    0.toDouble
  } else {
    // BigDecimal avoids the precision loss of Long/Long integer division.
    val numeratorDec = BigDecimal.decimal(numerator)
    val totalDec = BigDecimal.decimal(total)
    val res = formatDoubleValue((numeratorDec / totalDec) * 100, 2)
    if (forceCap) {
      // Cap at 100% to absorb unit-precision mismatch (ns CPU time vs ms run time).
      math.min(res, 100)
    } else {
      res
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package org.apache.spark.sql.rapids.tool.store

import java.util.concurrent.TimeUnit

import org.apache.spark.TaskFailedReason
import org.apache.spark.scheduler.SparkListenerTaskEnd
import org.apache.spark.sql.rapids.tool.annotation.Since
Expand All @@ -40,9 +38,9 @@ case class TaskModel(
speculative: Boolean,
gettingResultTime: Long,
executorDeserializeTime: Long,
executorDeserializeCPUTime: Long,
executorRunTime: Long,
executorCPUTime: Long,
executorDeserializeCPUTime: Long, // nanoseconds
executorRunTime: Long, // milliseconds
executorCPUTime: Long, // nanoseconds
amahussein marked this conversation as resolved.
Show resolved Hide resolved
peakExecutionMemory: Long,
resultSize: Long,
jvmGCTime: Long,
Expand All @@ -59,7 +57,7 @@ case class TaskModel(
sr_totalBytesRead: Long,
// Note: sw stands for ShuffleWrite
sw_bytesWritten: Long,
sw_writeTime: Long,
sw_writeTime: Long, // nanoseconds
sw_recordsWritten: Long,
input_bytesRead: Long,
input_recordsRead: Long,
Expand Down Expand Up @@ -92,9 +90,9 @@ object TaskModel {
event.taskInfo.speculative,
event.taskInfo.gettingResultTime,
event.taskMetrics.executorDeserializeTime,
TimeUnit.NANOSECONDS.toMillis(event.taskMetrics.executorDeserializeCpuTime),
event.taskMetrics.executorRunTime,
TimeUnit.NANOSECONDS.toMillis(event.taskMetrics.executorCpuTime),
event.taskMetrics.executorDeserializeCpuTime, // nanoseconds
event.taskMetrics.executorRunTime, // milliseconds
event.taskMetrics.executorCpuTime, // nanoseconds
event.taskMetrics.peakExecutionMemory,
event.taskMetrics.resultSize,
event.taskMetrics.jvmGCTime,
Expand All @@ -109,7 +107,7 @@ object TaskModel {
event.taskMetrics.shuffleReadMetrics.localBytesRead,
event.taskMetrics.shuffleReadMetrics.totalBytesRead,
event.taskMetrics.shuffleWriteMetrics.bytesWritten,
TimeUnit.NANOSECONDS.toMillis(event.taskMetrics.shuffleWriteMetrics.writeTime),
event.taskMetrics.shuffleWriteMetrics.writeTime, // nanoseconds
event.taskMetrics.shuffleWriteMetrics.recordsWritten,
event.taskMetrics.inputMetrics.bytesRead,
event.taskMetrics.inputMetrics.recordsRead,
Expand Down
Loading
Loading