Skip to content

Commit

Permalink
Spark supports printing parameters to task logs
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Oct 16, 2023
1 parent 247616a commit 033b2a8
Showing 1 changed file with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,14 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
}
val kind: Kind = getKind
var preCode = code
engineExecutorContext.appendStdout(
LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}")
)

val isFirstParagraph = (engineExecutorContext.getCurrentParagraph == 1)
if (isFirstParagraph == true) {
engineExecutorContext.appendStdout(
LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}")
)
}

// Pre-execution hook
var executionHook: SparkPreExecutionHook = null
Utils.tryCatch {
Expand Down Expand Up @@ -138,6 +143,37 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
logger.info("Set jobGroup to " + jobGroup)
sc.setJobGroup(jobGroup, _code, true)

// print job configuration, only the first paragraph
if (isFirstParagraph == true) {
Utils.tryCatch({
val executorNum: Int = sc.getConf.get("spark.executor.instances").toInt
val executorMem: Long =
ByteTimeUtils.byteStringAsGb(sc.getConf.get("spark.executor.memory"))
val driverMem: Long = ByteTimeUtils.byteStringAsGb(sc.getConf.get("spark.driver.memory"))
val sparkExecutorCores = sc.getConf.get("spark.executor.cores", "2").toInt
val sparkDriverCores = sc.getConf.get("spark.driver.cores", "1").toInt
val queue = sc.getConf.get("spark.yarn.queue")
// with unit if set configuration with unit
// if not set sc get will get the value of spark.yarn.executor.memoryOverhead such as 512(without unit)
val memoryOverhead = sc.getConf.get("spark.executor.memoryOverhead", "1G")

val sb = new StringBuilder
sb.append(s"spark.executor.instances=$executorNum\n")
sb.append(s"spark.executor.memory=${executorMem}G\n")
sb.append(s"spark.driver.memory=${driverMem}G\n")
sb.append(s"spark.executor.cores=$sparkExecutorCores\n")
sb.append(s"spark.driver.cores=$sparkDriverCores\n")
sb.append(s"spark.yarn.queue=$queue\n")
sb.append(s"spark.executor.memoryOverhead=${memoryOverhead}\n")
sb.append("\n")
engineExecutionContext.appendStdout(
LogUtils.generateInfo(s" Your spark job exec with configs:\n${sb.toString()}")
)
})(t => {
logger.warn("Get actual used resource exception", t)
})
}

val response = Utils.tryFinally(runCode(this, _code, engineExecutorContext, jobGroup)) {
// Utils.tryAndWarn(this.engineExecutionContext.pushProgress(1, getProgressInfo("")))
jobGroup = null
Expand Down

0 comments on commit 033b2a8

Please sign in to comment.