From 033b2a80db5d882177745244f5bb6a237bb4a9b7 Mon Sep 17 00:00:00 2001
From: peacewong
Date: Tue, 10 Oct 2023 21:12:09 +0800
Subject: [PATCH] Spark supports printing parameters to task logs

---
Review amendments to the original patch:
- restore the one-line-per-entry patch layout (context indentation reconstructed)
- use the `engineExecutorContext` parameter consistently; drop the redundant `== true`
- report unset keys as "<not set>" instead of aborting the whole configuration summary
The `index` blob id below predates these amendments.

 .../executor/SparkEngineConnExecutor.scala | 49 +++++++++++++++++--
 1 file changed, 46 insertions(+), 3 deletions(-)

diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 8d97e81525..4a14cd39c7 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -100,9 +100,14 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
     }
     val kind: Kind = getKind
     var preCode = code
-    engineExecutorContext.appendStdout(
-      LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}")
-    )
+
+    val isFirstParagraph = engineExecutorContext.getCurrentParagraph == 1
+    if (isFirstParagraph) {
+      engineExecutorContext.appendStdout(
+        LogUtils.generateInfo(s"yarn application id: ${sc.applicationId}")
+      )
+    }
+
     // Pre-execution hook
     var executionHook: SparkPreExecutionHook = null
     Utils.tryCatch {
@@ -138,6 +143,44 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
     logger.info("Set jobGroup to " + jobGroup)
     sc.setJobGroup(jobGroup, _code, true)
 
+    // print job configuration, only the first paragraph
+    if (isFirstParagraph) {
+      Utils.tryCatch({
+        // read the configuration once instead of calling sc.getConf for every key
+        val conf = sc.getConf
+        // keys that were never set are reported as "<not set>" instead of throwing
+        // NoSuchElementException and dropping the whole summary
+        def confValue(key: String): String = conf.getOption(key).getOrElse("<not set>")
+        def memoryInGb(key: String): String =
+          conf.getOption(key).fold("<not set>")(v => s"${ByteTimeUtils.byteStringAsGb(v)}G")
+
+        val executorNum = confValue("spark.executor.instances")
+        val executorMem = memoryInGb("spark.executor.memory")
+        val driverMem = memoryInGb("spark.driver.memory")
+        val sparkExecutorCores = conf.get("spark.executor.cores", "2")
+        val sparkDriverCores = conf.get("spark.driver.cores", "1")
+        val queue = confValue("spark.yarn.queue")
+        // the value keeps its unit when it was configured with one; when only the legacy
+        // spark.yarn.executor.memoryOverhead is set, its value (e.g. 512, no unit) is returned
+        val memoryOverhead = conf.get("spark.executor.memoryOverhead", "1G")
+
+        val configSummary = Seq(
+          s"spark.executor.instances=$executorNum",
+          s"spark.executor.memory=$executorMem",
+          s"spark.driver.memory=$driverMem",
+          s"spark.executor.cores=$sparkExecutorCores",
+          s"spark.driver.cores=$sparkDriverCores",
+          s"spark.yarn.queue=$queue",
+          s"spark.executor.memoryOverhead=$memoryOverhead"
+        ).mkString("", "\n", "\n\n")
+        engineExecutorContext.appendStdout(
+          LogUtils.generateInfo(s" Your spark job exec with configs:\n$configSummary")
+        )
+      })(t => {
+        logger.warn("Get actual used resource exception", t)
+      })
+    }
+
     val response = Utils.tryFinally(runCode(this, _code, engineExecutorContext, jobGroup)) {
       // Utils.tryAndWarn(this.engineExecutionContext.pushProgress(1, getProgressInfo("")))
       jobGroup = null