From e13e1f3864255f102897201e3a600eb6f17b060e Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Thu, 11 Jan 2024 18:27:28 +0800 Subject: [PATCH 1/3] Flink supports udf function --- .../deployment/ClusterDescriptorAdapter.java | 2 +- .../flink/client/utils/FlinkUdfUtils.java | 116 ++++++++++++++++++ .../FlinkSQLComputationExecutor.scala | 93 ++++++++++++++ .../launch/FlinkEngineConnLaunchBuilder.scala | 18 ++- 4 files changed, 227 insertions(+), 2 deletions(-) create mode 100644 linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java index a5ac102033..594f8dd98d 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java @@ -51,7 +51,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable { public static final long CLIENT_REQUEST_TIMEOUT = FlinkEnvConfiguration.FLINK_CLIENT_REQUEST_TIMEOUT().getValue().toLong(); - protected final ExecutionContext executionContext; + public final ExecutionContext executionContext; // jobId is not null only after job is submitted private JobID jobId; protected ApplicationId clusterID; diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java new file mode 100644 index 0000000000..9df14197d0 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineconnplugin.flink.client.utils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.functions.UserDefinedFunction; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkUdfUtils { + + private static final Logger logger = LoggerFactory.getLogger(FlinkUdfUtils.class); + + private static final String CREATE_TEMP_FUNCTION_PATTERN = + "create\\s+temporary\\s+function\\s+(\\w+)\\s+as\\s+\"(.*?)\""; + + private static final String CREATE_TEMP_FUNCTION_SQL = + "CREATE TEMPORARY FUNCTION IF NOT EXISTS %s AS '%s' "; + + public static void addFlinkPipelineClasspaths(StreamExecutionEnvironment env, String path) { + logger.info("Flink udf start add pipeline classpaths, jar path: {}", path); + + try { + Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration"); + configuration.setAccessible(true); + Configuration conf = (Configuration) configuration.get(env); + + Field confData = Configuration.class.getDeclaredField("confData"); + confData.setAccessible(true); + Map<String, Object> map = (Map<String, Object>) confData.get(conf); + List<String> jarList = new ArrayList<>(); + jarList.add(path); + map.put(PipelineOptions.CLASSPATHS.key(), jarList); + } catch (Exception e) { + logger.warn("Flink udf add pipeline classpaths failed", e); + } + } + + public static void loadJar(String jarPath) { + logger.info("Flink udf URLClassLoader start loadJar: {}", jarPath); + + Method method = null; + Boolean accessible = null; + try { + method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); + accessible = method.isAccessible(); + + if (!accessible) { + method.setAccessible(true); + } + URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader(); + method.invoke(classLoader, new URL(jarPath)); + + } catch (Exception e) { + logger.warn("Flink udf URLClassLoader loadJar failed", e); + } finally { + if (accessible != null) { + method.setAccessible(accessible); + } + } + } + + public static String extractUdfClass(String statement) { + Pattern pattern = Pattern.compile(CREATE_TEMP_FUNCTION_PATTERN); + Matcher matcher = pattern.matcher(statement); + if (matcher.find() && matcher.groupCount() >= 2) { + return matcher.group(2); + } + return ""; + } + + public static boolean isFlinkUdf(ClassLoader classLoader, String className) { + try { + Class<?> udfClass = classLoader.loadClass(className); + if (UserDefinedFunction.class.isAssignableFrom(udfClass)) { + return true; + } + + } catch (ClassNotFoundException e) { + logger.warn("flink udf load isFlinkUdf failed, ClassNotFoundException: {}", className); + } + return false; + } + + public static String generateFlinkUdfSql(String name, String className) { + return String.format(CREATE_TEMP_FUNCTION_SQL, name, className); + } +} diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala index 60f1d9088b..09d28cfd57 100644 ---
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala @@ -39,6 +39,7 @@ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.{ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.impl.InsertOperation import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ResultKind import org.apache.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandParser +import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils import org.apache.linkis.engineconnplugin.flink.config.{ FlinkEnvConfiguration, FlinkExecutionTargetType @@ -50,6 +51,7 @@ import org.apache.linkis.engineconnplugin.flink.listener.{ } import org.apache.linkis.engineconnplugin.flink.listener.RowsType.RowsType import org.apache.linkis.governance.common.paser.SQLCodeParser +import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel import org.apache.linkis.protocol.engine.JobProgressInfo import org.apache.linkis.scheduler.executer.{ ErrorExecuteResponse, @@ -57,11 +59,14 @@ import org.apache.linkis.scheduler.executer.{ SuccessExecuteResponse } import org.apache.linkis.storage.resultset.ResultSetFactory +import org.apache.linkis.udf.UDFClient import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase} +import org.apache.commons.lang3.StringUtils import org.apache.flink.api.common.JobStatus._ import org.apache.flink.configuration.DeploymentOptions import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions +import org.apache.flink.table.api.TableResult import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider import org.apache.flink.yarn.configuration.YarnConfigOptions import org.apache.hadoop.yarn.util.ConverterUtils @@ -125,6 +130,9 @@ class FlinkSQLComputationExecutor( engineExecutionContext: EngineExecutionContext, code: String ): ExecuteResponse = { + // A flink udf load failure does not affect task execution + Utils.tryAndWarn(loadFlinkUdf(engineExecutionContext)) + val callOpt = SqlCommandParser.getSqlCommandParser.parse(code.trim, true) val callSQL = if (!callOpt.isPresent) @@ -194,6 +202,91 @@ class FlinkSQLComputationExecutor( } } + private def getExecSqlUser(engineExecutionContext: EngineExecutionContext): String = { + val userCreatorLabel = engineExecutionContext.getLabels + .find(_.isInstanceOf[UserCreatorLabel]) + .get + .asInstanceOf[UserCreatorLabel] + userCreatorLabel.getUser + } + + private def loadFlinkUdf(engineExecutionContext: EngineExecutionContext) = { + logger.info("Flink start load udf") + + val execSqlUser = getExecSqlUser(engineExecutionContext) + val udfAllLoad: String = + engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.all.load", "true").toString + val udfIdStr: String = + engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.custom.ids", "").toString + val udfIds = udfIdStr.split(",").filter(StringUtils.isNotBlank).map(s => s.toLong) + + logger.info(s"start loading UDFs, user: $execSqlUser, load all: $udfAllLoad, udfIds: ${udfIds + .mkString("Array(", ", ", ")")}") + + val udfInfos = + if (udfAllLoad.toBoolean) UDFClient.getJarUdf(execSqlUser) + else UDFClient.getJarUdfByIds(execSqlUser, udfIds) + + if (udfInfos.nonEmpty) { + import scala.util.control.Breaks._ + udfInfos.foreach { udfInfo => + val path: String = udfInfo.getPath + val registerFormat: String = udfInfo.getRegisterFormat + + breakable { + if (StringUtils.isBlank(path) || StringUtils.isBlank(registerFormat)) { + logger.warn("flink udf udfInfo path or registerFormat cannot be empty") + break() + } + logger.info( + s"udfName:${udfInfo.getUdfName}, bml_resource_id:${udfInfo.getBmlResourceId}, bml_id:${udfInfo.getId}\n" + ) + + val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat) + if (StringUtils.isBlank(udfClassName)) { + logger.warn("flink udf extracted udf class cannot be empty") + break() + } + + FlinkUdfUtils.loadJar(path) + + if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) { + logger.warn( + "Class does not extend UserDefinedFunction, skip loading flink udf: {} ", + path + ) + break() + } + + val context = clusterDescriptor.executionContext + val flinkUdfSql: String = + FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName) + + FlinkUdfUtils.addFlinkPipelineClasspaths(context.getStreamExecutionEnvironment, path) + val tableEnv = context.getTableEnvironment + logger.info("Flink execute udf sql:{}", flinkUdfSql) + val tableResult: TableResult = tableEnv.executeSql(flinkUdfSql) + + var loadUdfLog = + s"udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}, Flink load udf %s ." + if ( + tableResult.getResultKind != null && tableResult.getResultKind + .name() + .equalsIgnoreCase("SUCCESS") + ) { + loadUdfLog = String.format(loadUdfLog, "success") + logger.info(loadUdfLog) + } else { + loadUdfLog = String.format(loadUdfLog, "failed") + logger.error(loadUdfLog) + } + engineExecutionContext.appendStdout(loadUdfLog) + } + } + + } + } + override def executeCompletely( engineExecutorContext: EngineExecutionContext, code: String, diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala index 70b3ad1b20..13a5bae4d5 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala @@ -37,12 +37,15 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConsta addPathToClassPath, CLASS_PATH_SEPARATOR } -import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel +import org.apache.linkis.manager.label.entity.engine.{EngineConnMode, UserCreatorLabel} +import org.apache.linkis.manager.label.utils.LabelUtil import java.util import scala.collection.JavaConverters._ +import com.google.common.collect.Lists + class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { override protected def getCommands(implicit @@ -136,4 +139,17 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { override protected def ifAddHiveConfigPath: Boolean = true + override protected def getEngineConnManagerHooks(implicit + engineConnBuildRequest: EngineConnBuildRequest + ): java.util.List[String] = if (isOnceMode) { + super.getEngineConnManagerHooks(engineConnBuildRequest) + } else { + Lists.newArrayList("JarUDFLoadECMHook") + } + + def isOnceMode: Boolean = { + val engineConnMode =
LabelUtil.getEngineConnMode(engineConnBuildRequest.labels) + EngineConnMode.toEngineConnMode(engineConnMode) == EngineConnMode.Once + } + } From 5378a184de5bfca71991496d4d48c33371687dbb Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 12 Jan 2024 15:26:26 +0800 Subject: [PATCH 2/3] add flink udf hook --- .../sql/operation/OperationFactoryImpl.java | 1 + .../sql/operation/impl/DDLOperation.java | 3 + .../flink/client/sql/parser/SqlCommand.java | 2 + .../sql/parser/SqlCommandParserImpl.java | 3 + .../resources/linkis-engineconn.properties | 2 +- .../FlinkSQLComputationExecutor.scala | 93 ------------------- .../flink/hook/FlinkJarUdfEngineHook.scala | 68 ++++++++++++++ .../flink/LinkisFlinkUdfExample.java | 26 ++++++ 8 files changed, 104 insertions(+), 94 deletions(-) create mode 100644 linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala create mode 100644 linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java index 4329acb804..9f7d8deeff 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java @@ -49,6 +49,7 @@ public Operation createOperation(SqlCommandCall call, FlinkEngineConnContext con context, call.operands[0], Boolean.parseBoolean(call.operands[1])); break; case CREATE_TABLE: + case CREATE_FUNCTION: case DROP_TABLE: case ALTER_TABLE: case CREATE_CATALOG: diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java index 21f6081123..ec674d5159 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java @@ -59,6 +59,9 @@ private String getExceptionMsg() { case CREATE_TABLE: actionMsg = "create a table"; break; + case CREATE_FUNCTION: + actionMsg = "create a function"; + break; case CREATE_DATABASE: actionMsg = "create a database"; break; diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java index 9f6ef738e1..03c18c3bbd 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java @@ -46,6 +46,8 @@ public enum SqlCommand { CREATE_DATABASE, + CREATE_FUNCTION, + 
ALTER_DATABASE, DROP_DATABASE, diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java index f8eb32605c..211d899bcf 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java @@ -152,6 +152,9 @@ private Optional<SqlCommandCall> parseStmt(String stmt, boolean isBlinkPlanner) } else if (node instanceof SqlCreateDatabase) { cmd = SqlCommand.CREATE_DATABASE; operands = new String[] {stmt}; + } else if (node instanceof SqlCreateFunction) { + cmd = SqlCommand.CREATE_FUNCTION; + operands = new String[] {stmt}; } else if (node instanceof SqlDropDatabase) { cmd = SqlCommand.DROP_DATABASE; operands = new String[] {stmt}; diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties index 2012023076..587a150eda 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties @@ -18,5 +18,5 @@ wds.linkis.server.version=v1 wds.linkis.engineconn.debug.enable=true #wds.linkis.keytab.enable=true wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineconnplugin.flink.FlinkEngineConnPlugin -wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook +wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconnplugin.flink.hook.FlinkJarUdfEngineHook wds.linkis.engineconn.executor.manager.class=org.apache.linkis.engineconnplugin.flink.executormanager.FlinkExecutorManager \ No newline at end of file diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala index 09d28cfd57..60f1d9088b 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala @@ -39,7 +39,6 @@ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.{ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.impl.InsertOperation import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ResultKind import org.apache.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandParser -import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils import org.apache.linkis.engineconnplugin.flink.config.{ FlinkEnvConfiguration, FlinkExecutionTargetType @@ -50,7 +50,6 @@ import org.apache.linkis.engineconnplugin.flink.listener.{ } import org.apache.linkis.engineconnplugin.flink.listener.RowsType.RowsType import
org.apache.linkis.governance.common.paser.SQLCodeParser -import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel import org.apache.linkis.protocol.engine.JobProgressInfo import org.apache.linkis.scheduler.executer.{ ErrorExecuteResponse, @@ -59,14 +57,11 @@ import org.apache.linkis.scheduler.executer.{ SuccessExecuteResponse } import org.apache.linkis.storage.resultset.ResultSetFactory -import org.apache.linkis.udf.UDFClient import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase} -import org.apache.commons.lang3.StringUtils import org.apache.flink.api.common.JobStatus._ import org.apache.flink.configuration.DeploymentOptions import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions -import org.apache.flink.table.api.TableResult import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider import org.apache.flink.yarn.configuration.YarnConfigOptions import org.apache.hadoop.yarn.util.ConverterUtils @@ -130,9 +125,6 @@ class FlinkSQLComputationExecutor( engineExecutionContext: EngineExecutionContext, code: String ): ExecuteResponse = { - // A flink udf load failure does not affect task execution - Utils.tryAndWarn(loadFlinkUdf(engineExecutionContext)) - val callOpt = SqlCommandParser.getSqlCommandParser.parse(code.trim, true) val callSQL = if (!callOpt.isPresent) @@ -202,91 +194,6 @@ class FlinkSQLComputationExecutor( } } - private def getExecSqlUser(engineExecutionContext: EngineExecutionContext): String = { - val userCreatorLabel = engineExecutionContext.getLabels - .find(_.isInstanceOf[UserCreatorLabel]) - .get - .asInstanceOf[UserCreatorLabel] - userCreatorLabel.getUser - } - - private def loadFlinkUdf(engineExecutionContext: EngineExecutionContext) = { - logger.info("Flink start load udf") - - val execSqlUser = getExecSqlUser(engineExecutionContext) - val udfAllLoad: String = - engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.all.load", "true").toString - val udfIdStr: String = - engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.custom.ids", "").toString - val udfIds = udfIdStr.split(",").filter(StringUtils.isNotBlank).map(s => s.toLong) - - logger.info(s"start loading UDFs, user: $execSqlUser, load all: $udfAllLoad, udfIds: ${udfIds - .mkString("Array(", ", ", ")")}") - - val udfInfos = - if (udfAllLoad.toBoolean) UDFClient.getJarUdf(execSqlUser) - else UDFClient.getJarUdfByIds(execSqlUser, udfIds) - - if (udfInfos.nonEmpty) { - import scala.util.control.Breaks._ - udfInfos.foreach { udfInfo => - val path: String = udfInfo.getPath - val registerFormat: String = udfInfo.getRegisterFormat - - breakable { - if (StringUtils.isBlank(path) || StringUtils.isBlank(registerFormat)) { - logger.warn("flink udf udfInfo path or registerFormat cannot be empty") - break() - } - logger.info( - s"udfName:${udfInfo.getUdfName}, bml_resource_id:${udfInfo.getBmlResourceId}, bml_id:${udfInfo.getId}\n" - ) - - val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat) - if (StringUtils.isBlank(udfClassName)) { - logger.warn("flink udf extracted udf class cannot be empty") - break() - } - - FlinkUdfUtils.loadJar(path) - - if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) { - logger.warn( - "Class does not extend UserDefinedFunction, skip loading flink udf: {} ", - path - ) - break() - } - - val context = clusterDescriptor.executionContext - val flinkUdfSql: String = - FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName) - -
FlinkUdfUtils.addFlinkPipelineClasspaths(context.getStreamExecutionEnvironment, path) - val tableEnv = context.getTableEnvironment - logger.info("Flink execute udf sql:{}", flinkUdfSql) - val tableResult: TableResult = tableEnv.executeSql(flinkUdfSql) - - var loadUdfLog = - s"udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}, Flink load udf %s ." - if ( - tableResult.getResultKind != null && tableResult.getResultKind - .name() - .equalsIgnoreCase("SUCCESS") - ) { - loadUdfLog = String.format(loadUdfLog, "success") - logger.info(loadUdfLog) - } else { - loadUdfLog = String.format(loadUdfLog, "failed") - logger.error(loadUdfLog) - } - engineExecutionContext.appendStdout(loadUdfLog) - } - } - - } - } - override def executeCompletely( engineExecutorContext: EngineExecutionContext, code: String, diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala new file mode 100644 index 0000000000..fe590f46f9 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.engineconnplugin.flink.hook + +import org.apache.linkis.engineconn.computation.executor.hook.UDFLoadEngineConnHook +import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils +import org.apache.linkis.manager.label.entity.engine.RunType +import org.apache.linkis.udf.utils.ConstantVar +import org.apache.linkis.udf.vo.UDFInfoVo + +import org.apache.commons.lang3.StringUtils + +class FlinkJarUdfEngineHook extends UDFLoadEngineConnHook { + override val udfType: BigInt = ConstantVar.UDF_JAR + override val category: String = ConstantVar.UDF + override val runType = RunType.SQL + + override protected def constructCode(udfInfo: UDFInfoVo): String = { + val path: String = udfInfo.getPath + val registerFormat: String = udfInfo.getRegisterFormat + + if (StringUtils.isBlank(path) || StringUtils.isBlank(registerFormat)) { + logger.warn("Flink udfInfo path or registerFormat cannot be empty") + return "" + } + + val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat) + if (StringUtils.isBlank(udfClassName)) { + logger.warn("Flink extracted udf class name cannot be empty") + return "" + } + + FlinkUdfUtils.loadJar(path) + + if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) { + logger.warn( + "Class does not extend Flink UserDefinedFunction, skip loading flink udf: {} ", + path + ) + return "" + } + + val flinkUdfSql: String = + FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName) + + logger.info( + s"Flink start load udf, udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}\n" + ) + + "%sql\n" + flinkUdfSql + } + +} diff --git a/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java b/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java new file mode 100644 index 0000000000..ce2b05e693 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.linkis.engineplugin.flink; + +import org.apache.flink.table.functions.ScalarFunction; + +public class LinkisFlinkUdfExample extends ScalarFunction { + public String eval(String str) { + return String.format("linkis flink udf test: %s", str); + } +} From 952efd3e0815f4801ae0c5ff4aa8331dcca7919f Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 12 Jan 2024 19:05:17 +0800 Subject: [PATCH 3/3] Code optimization --- .../flink/client/utils/FlinkUdfUtils.java | 7 ++++ .../FlinkSQLComputationExecutor.scala | 2 +- .../flink/hook/FlinkJarUdfEngineHook.scala | 41 ++++++++++++++++++- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java index 9df14197d0..e299c08f70 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java @@ -17,6 +17,7 @@ package org.apache.linkis.engineconnplugin.flink.client.utils; +import org.apache.commons.collections.CollectionUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -27,6 +28,7 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.regex.Matcher; @@ -57,6 +59,11 @@ public static void addFlinkPipelineClasspaths(StreamExecutionEnvironment env, St confData.setAccessible(true); Map<String, Object> map = (Map<String, Object>) confData.get(conf); List<String> jarList = new ArrayList<>(); + List<String> oldList = + conf.getOptional(PipelineOptions.CLASSPATHS).orElseGet(Collections::emptyList); + if (CollectionUtils.isNotEmpty(oldList)) { + jarList.addAll(oldList); + } jarList.add(path); map.put(PipelineOptions.CLASSPATHS.key(), jarList); } catch (Exception e) { diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala index 60f1d9088b..f835db9694 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala @@ -80,7 +80,7 @@ class FlinkSQLComputationExecutor( with FlinkExecutor { private var operation: JobOperation = _ - private var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _ + var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _ override def init(): Unit = { setCodeParser(new SQLCodeParser) diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala index fe590f46f9..bc3d0f1f4a 100644 ---
a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala @@ -17,19 +17,28 @@ package org.apache.linkis.engineconnplugin.flink.hook +import org.apache.linkis.engineconn.common.creation.EngineCreationContext +import org.apache.linkis.engineconn.common.engineconn.EngineConn import org.apache.linkis.engineconn.computation.executor.hook.UDFLoadEngineConnHook +import org.apache.linkis.engineconn.core.executor.ExecutorManager import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils -import org.apache.linkis.manager.label.entity.engine.RunType +import org.apache.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor +import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, EngineTypeLabel, RunType} import org.apache.linkis.udf.utils.ConstantVar import org.apache.linkis.udf.vo.UDFInfoVo import org.apache.commons.lang3.StringUtils +import scala.collection.JavaConverters.asScalaBufferConverter + class FlinkJarUdfEngineHook extends UDFLoadEngineConnHook { override val udfType: BigInt = ConstantVar.UDF_JAR override val category: String = ConstantVar.UDF override val runType = RunType.SQL + var labels: Array[Label[_]] = null + override protected def constructCode(udfInfo: UDFInfoVo): String = { val path: String = udfInfo.getPath val registerFormat: String = udfInfo.getRegisterFormat @@ -62,7 +71,37 @@ class FlinkJarUdfEngineHook extends UDFLoadEngineConnHook { s"Flink start load udf, udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}\n" ) + if (labels != null && labels.nonEmpty) { + val executor = ExecutorManager.getInstance.getExecutorByLabels(labels) + executor match { + case computationExecutor: FlinkSQLComputationExecutor => + FlinkUdfUtils.addFlinkPipelineClasspaths( + computationExecutor.clusterDescriptor.executionContext.getStreamExecutionEnvironment, + path + ) + case _ => + } + } + "%sql\n" + flinkUdfSql } + override def afterExecutionExecute( + engineCreationContext: EngineCreationContext, + engineConn: EngineConn + ): Unit = { + val codeLanguageLabel = new CodeLanguageLabel + engineCreationContext.getLabels().asScala.find(_.isInstanceOf[EngineTypeLabel]) match { + case Some(engineTypeLabel) => + codeLanguageLabel.setCodeType( + getRealRunType(engineTypeLabel.asInstanceOf[EngineTypeLabel].getEngineType).toString + ) + case None => + codeLanguageLabel.setCodeType(runType.toString) + } + labels = Array[Label[_]](codeLanguageLabel) + + super.afterExecutionExecute(engineCreationContext, engineConn) + } + }
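Usage note: the flow introduced by these patches reduces to extracting the implementation class from a UDF's registerFormat statement and re-registering it through Flink SQL. The standalone sketch below mirrors FlinkUdfUtils.extractUdfClass and generateFlinkUdfSql; the regex and SQL template are copied from PATCH 1/3, while the class name UdfSqlDemo and the sample registerFormat value are illustrative only.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UdfSqlDemo {

  // Copied verbatim from FlinkUdfUtils in PATCH 1/3.
  private static final Pattern CREATE_TEMP_FUNCTION_PATTERN =
      Pattern.compile("create\\s+temporary\\s+function\\s+(\\w+)\\s+as\\s+\"(.*?)\"");

  private static final String CREATE_TEMP_FUNCTION_SQL =
      "CREATE TEMPORARY FUNCTION IF NOT EXISTS %s AS '%s' ";

  public static void main(String[] args) {
    // A registerFormat statement as a Linkis jar UDF might store it (sample value).
    String registerFormat =
        "create temporary function linkis_flink_udf_example as "
            + "\"org.apache.linkis.engineplugin.flink.LinkisFlinkUdfExample\"";

    Matcher matcher = CREATE_TEMP_FUNCTION_PATTERN.matcher(registerFormat);
    if (matcher.find() && matcher.groupCount() >= 2) {
      // group(2) is what extractUdfClass returns; note that in the hook the
      // function name comes from udfInfo.getUdfName rather than group(1).
      String sql = String.format(CREATE_TEMP_FUNCTION_SQL, matcher.group(1), matcher.group(2));
      System.out.println(sql);
      // -> CREATE TEMPORARY FUNCTION IF NOT EXISTS linkis_flink_udf_example
      //    AS 'org.apache.linkis.engineplugin.flink.LinkisFlinkUdfExample'
    }
  }
}

Once the generated statement has been executed (the hook returns it prefixed with "%sql\n"), a query such as SELECT linkis_flink_udf_example('hello') can resolve the function, provided the jar has also been placed on the pipeline classpath.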
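Design note on addFlinkPipelineClasspaths: it reaches the StreamExecutionEnvironment's private configuration and confData fields via reflection, presumably because the environment does not expose a writable view of its configuration. Where a Configuration instance is directly at hand, the merge-and-set that PATCH 3/3 performs can be expressed through the public API alone; a minimal sketch, assuming only flink-core on the classpath and a hypothetical jar URL:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PipelineClasspathDemo {

  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Merge with any classpaths already configured, as PATCH 3/3 does reflectively.
    List<String> classpaths =
        new ArrayList<>(
            conf.getOptional(PipelineOptions.CLASSPATHS).orElseGet(Collections::emptyList));
    classpaths.add("file:///tmp/udf/LinkisFlinkUdfExample.jar"); // hypothetical jar location

    conf.set(PipelineOptions.CLASSPATHS, classpaths);
    System.out.println(conf.get(PipelineOptions.CLASSPATHS));
  }
}

The reflective route in the patch serves the case where only the environment object is reachable; the merge added in PATCH 3/3 keeps previously registered UDF jars on the classpath instead of overwriting them.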