From 2347ae84768cf35247129f72bdaf484d359a5525 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Wed, 29 Nov 2023 22:24:56 +0800 Subject: [PATCH] Fix flink-1.16 ClassNotFoundException bug (#5001) * Fix flink-1.16 ClassNotFoundException bug * Fix flink-1.16 ClassNotFoundException bug --- .../flink/config/FlinkEnvConfiguration.scala | 6 ++++-- .../flink/factory/FlinkEngineConnFactory.scala | 9 +++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala index 6b521dceed..bcd721c162 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala @@ -38,7 +38,7 @@ object FlinkEnvConfiguration { val FLINK_DIST_JAR_PATH = CommonVars( "flink.dist.jar.path", - FLINK_HOME.getValue + s"/lib/flink-dist_2.11-${FLINK_VERSION.getValue}.jar" + FLINK_HOME.getValue + s"/lib/flink-dist-${FLINK_VERSION.getValue}.jar" ) val FLINK_PROVIDED_LIB_PATH = CommonVars("flink.lib.path", "") @@ -58,7 +58,9 @@ object FlinkEnvConfiguration { "The local lib path of each user in Flink EngineConn." ) - val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "") + val FLINK_SHIP_DIRECTORIES = + CommonVars("flink.yarn.ship-directories", FLINK_HOME.getValue + "/lib") + val FLINK_SHIP_REMOTE_DIRECTORIES = CommonVars("flink.yarn.remote.ship-directories", "") val FLINK_CHECK_POINT_ENABLE = CommonVars("flink.app.checkpoint.enable", false) diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala index 1c6db3bba9..1b9759d847 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala @@ -108,7 +108,13 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging val flinkHome = FLINK_HOME.getValue(options) val flinkConfDir = FLINK_CONF_DIR.getValue(options) val flinkProvidedLibPath = FLINK_PROVIDED_LIB_PATH.getValue(options) - val flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options) + val flinkVersion = FlinkEnvConfiguration.FLINK_VERSION.getValue(options) + var flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options) + if ( + StringUtils.isNotBlank(flinkVersion) && flinkVersion.equalsIgnoreCase(FLINK_1_12_2_VERSION) + ) { + flinkDistJarPath = flinkDistJarPath.replaceFirst("flink-dist", "flink-dist_2.11") + } // Local lib path val providedLibDirsArray = FLINK_LIB_LOCAL_PATH.getValue(options).split(",") // Ship directories @@ -126,7 +132,6 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging ) } otherParams.put(GovernanceCommonConf.EC_APP_MANAGE_MODE.key, flinkClientType.toLowerCase()) - val flinkVersion = FlinkEnvConfiguration.FLINK_VERSION.getValue(options) FlinkVersionThreadLocal.setFlinkVersion(flinkVersion) val context = new EnvironmentContext( defaultEnv,