From 2f9f9bbf0ab01f89a793d79f0672024b401a229d Mon Sep 17 00:00:00 2001 From: yangwenzea <45863771+yangwenzea@users.noreply.github.com> Date: Fri, 8 Dec 2023 23:25:58 +0800 Subject: [PATCH] flink load default configuration (#5025) * flink load default configuration * fix gc log bug * code format --- .../flink/config/FlinkEnvConfiguration.scala | 7 +++ .../factory/FlinkEngineConnFactory.scala | 54 +++++++++++++++++- .../flink/util/FlinkValueFormatUtil.scala | 57 +++++++++++++++++++ 3 files changed, 115 insertions(+), 3 deletions(-) diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index bcd721c162..a6bbceb585 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -174,4 +174,11 @@ object FlinkEnvConfiguration { val FLINK_HANDSHAKE_WAIT_TIME_MILLS = CommonVars("linkis.flink.handshake.wait.time.mills", 60 * 1000) + val FLINK_CONF_YAML = CommonVars("flink.conf.yaml.dir", "flink-conf.yaml") + + val FLINK_YAML_MERGE_ENABLE = CommonVars("flink.yaml.merge.enable", true) + + val FLINK_ENV_JAVA_OPTS = + CommonVars("flink.env.java.opts", "env.java.opts") + } diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 1b9759d847..c3da253803 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ 
b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -35,7 +35,7 @@ import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._ import org.apache.linkis.engineconnplugin.flink.config.FlinkResourceConfiguration._ import org.apache.linkis.engineconnplugin.flink.context.{EnvironmentContext, FlinkEngineConnContext} import org.apache.linkis.engineconnplugin.flink.setting.Settings -import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, ManagerUtil} +import org.apache.linkis.engineconnplugin.flink.util.{ClassUtil, FlinkValueFormatUtil, ManagerUtil} import org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.manager.engineplugin.common.conf.EnvConfiguration import org.apache.linkis.manager.engineplugin.common.creation.{ @@ -55,7 +55,7 @@ import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget} -import java.io.File +import java.io.{File, FileNotFoundException} import java.net.URL import java.text.MessageFormat import java.time.Duration @@ -63,8 +63,10 @@ import java.util import java.util.{Collections, Locale} import scala.collection.JavaConverters._ +import scala.io.Source import com.google.common.collect.{Lists, Sets} +import org.yaml.snakeyaml.Yaml class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging { @@ -196,7 +198,15 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots) // set extra configs options.asScala.filter { case (key, _) => key.startsWith(FLINK_CONFIG_PREFIX) }.foreach { - case (key, value) => flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), value) + case (key, value) => + var 
/**
 * Resolves the default `env.java.opts` from the Flink configuration YAML and merges it
 * with the user-supplied opts (user values win, see FlinkValueFormatUtil.mergeAndDeduplicate).
 *
 * Lookup order: `$FLINK_CONF_DIR/flink-conf.yaml` on the local file system first, then the
 * same path as a classpath resource.
 *
 * @param envJavaOpts the `env.java.opts` value provided by the user/job options
 * @return the merged, de-duplicated JVM options string
 * @throws FileNotFoundException when the YAML exists neither on disk nor on the classpath
 */
private def getExtractJavaOpts(envJavaOpts: String): String = {
  val yamlFilePath = FLINK_CONF_DIR.getValue
  val yamlFile = yamlFilePath + "/" + FLINK_CONF_YAML.getHotValue()

  // Shared parser for both branches (was duplicated verbatim). Always closes the source.
  // Option(...) guards against Yaml.loadAs returning null for an empty/blank document.
  def readJavaOpts(source: Source): String =
    try {
      val configMap =
        new Yaml().loadAs(source.mkString, classOf[util.LinkedHashMap[String, Object]])
      Option(configMap)
        .filter(_.containsKey(FLINK_ENV_JAVA_OPTS.getValue))
        .map(_.get(FLINK_ENV_JAVA_OPTS.getValue).toString)
        .getOrElse("")
    } finally {
      source.close()
    }

  val defaultJavaOpts =
    if (new File(yamlFile).exists()) {
      readJavaOpts(Source.fromFile(yamlFile))
    } else {
      // NOTE(review): the classpath lookup reuses the absolute file-system path verbatim;
      // presumably only hit in tests where the conf dir is a resource root — confirm.
      val inputStream = getClass.getResourceAsStream(yamlFile)
      if (inputStream == null) {
        throw new FileNotFoundException("YAML file not found in both file system and classpath.")
      }
      readJavaOpts(Source.fromInputStream(inputStream))
    }

  FlinkValueFormatUtil.mergeAndDeduplicate(defaultJavaOpts, envJavaOpts)
}
/**
 * Merges the default JVM options (typically read from flink-conf.yaml) with user-supplied
 * ones and de-duplicates the result.
 *
 * Precedence rules:
 *  - `-XX:key=value` and `-Dkey=value` pairs in `envJavaOpts` override the same key in
 *    `defaultJavaOpts`.
 *  - an `-Xloggc:<path>` in `envJavaOpts` replaces the default one.
 *  - remaining tokens from both strings are concatenated and de-duplicated, preserving order.
 *
 * @param defaultJavaOpts options from the default configuration
 * @param envJavaOpts user-provided options (take precedence)
 * @return the merged, space-separated options string
 */
def mergeAndDeduplicate(defaultJavaOpts: String, envJavaOpts: String): String = {
  // Collect the user's -XX:key=value and -Dkey=value overrides.
  val xxOverrides = """-XX:([^\s]+)=([^\s]+)""".r
    .findAllMatchIn(envJavaOpts)
    .map(m => (m.group(1), m.group(2)))
    .toMap
  val propOverrides = """-D([^\s]+)=([^\s]+)""".r
    .findAllMatchIn(envJavaOpts)
    .map(m => (m.group(1), m.group(2)))
    .toMap

  val xloggcPattern = """-Xloggc:[^\s]+""".r
  val defaultXloggc = xloggcPattern.findFirstIn(defaultJavaOpts).getOrElse("")
  val envXloggc = xloggcPattern.findFirstIn(envJavaOpts).getOrElse("")

  // The gc-log path may arrive with shell-escaped angle brackets ("\<", "\>"); unescape on merge.
  def unescape(opt: String): String = opt.replace("\\<", "<").replace("\\>", ">")

  var mergedDefault = defaultJavaOpts
  var remainingEnv = envJavaOpts
  if (defaultXloggc.nonEmpty && envXloggc.nonEmpty) {
    // User's -Xloggc wins: substitute it into the default string and drop it from env.
    mergedDefault = defaultJavaOpts.replace(defaultXloggc, unescape(envXloggc))
    remainingEnv = envJavaOpts.replace(envXloggc, "")
  } else if (defaultXloggc.nonEmpty) {
    mergedDefault = defaultJavaOpts.replace(defaultXloggc, unescape(defaultXloggc))
  } else if (envXloggc.nonEmpty) {
    // Bug fix: this combination previously fell through with empty intermediate strings,
    // silently discarding ALL options; keep both strings and just unescape the gc path.
    remainingEnv = envJavaOpts.replace(envXloggc, unescape(envXloggc))
  }

  // Rewrite "key=oldValue" to "key=newValue" for every override. Quote both sides so regex
  // metacharacters in keys and '$' / '\' in values are treated literally (previously unquoted).
  def applyOverrides(opts: String, overrides: Map[String, String]): String =
    overrides.foldLeft(opts) { case (acc, (key, value)) =>
      acc.replaceAll(
        java.util.regex.Pattern.quote(key) + "=[^\\s]+",
        java.util.regex.Matcher.quoteReplacement(key + "=" + value)
      )
    }

  val merged = applyOverrides(applyOverrides(mergedDefault, xxOverrides), propOverrides)
  // Concatenate, drop empty tokens (bug fix: removing -Xloggc used to leave blank tokens and
  // doubled spaces), then de-duplicate while preserving first-seen order.
  (merged.split("\\s+") ++ remainingEnv.split("\\s+"))
    .filter(_.nonEmpty)
    .distinct
    .mkString(" ")
}