Skip to content

Commit

Permalink
Fix flink-1.16 ClassNotFoundException bug (#5001)
Browse files Browse the repository at this point in the history
* Fix flink-1.16 ClassNotFoundException bug

* Fix flink-1.16 ClassNotFoundException bug
  • Loading branch information
ChengJie1053 authored Nov 29, 2023
1 parent 03540ea commit 2347ae8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object FlinkEnvConfiguration {

val FLINK_DIST_JAR_PATH = CommonVars(
"flink.dist.jar.path",
FLINK_HOME.getValue + s"/lib/flink-dist_2.11-${FLINK_VERSION.getValue}.jar"
FLINK_HOME.getValue + s"/lib/flink-dist-${FLINK_VERSION.getValue}.jar"
)

val FLINK_PROVIDED_LIB_PATH = CommonVars("flink.lib.path", "")
Expand All @@ -58,7 +58,9 @@ object FlinkEnvConfiguration {
"The local lib path of each user in Flink EngineConn."
)

val FLINK_SHIP_DIRECTORIES = CommonVars("flink.yarn.ship-directories", "")
val FLINK_SHIP_DIRECTORIES =
CommonVars("flink.yarn.ship-directories", FLINK_HOME.getValue + "/lib")

val FLINK_SHIP_REMOTE_DIRECTORIES = CommonVars("flink.yarn.remote.ship-directories", "")

val FLINK_CHECK_POINT_ENABLE = CommonVars("flink.app.checkpoint.enable", false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,13 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
val flinkHome = FLINK_HOME.getValue(options)
val flinkConfDir = FLINK_CONF_DIR.getValue(options)
val flinkProvidedLibPath = FLINK_PROVIDED_LIB_PATH.getValue(options)
val flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options)
val flinkVersion = FlinkEnvConfiguration.FLINK_VERSION.getValue(options)
var flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options)
if (
StringUtils.isNotBlank(flinkVersion) && flinkVersion.equalsIgnoreCase(FLINK_1_12_2_VERSION)
) {
flinkDistJarPath = flinkDistJarPath.replaceFirst("flink-dist", "flink-dist_2.11")
}
// Local lib path
val providedLibDirsArray = FLINK_LIB_LOCAL_PATH.getValue(options).split(",")
// Ship directories
Expand All @@ -126,7 +132,6 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
)
}
otherParams.put(GovernanceCommonConf.EC_APP_MANAGE_MODE.key, flinkClientType.toLowerCase())
val flinkVersion = FlinkEnvConfiguration.FLINK_VERSION.getValue(options)
FlinkVersionThreadLocal.setFlinkVersion(flinkVersion)
val context = new EnvironmentContext(
defaultEnv,
Expand Down

0 comments on commit 2347ae8

Please sign in to comment.