From 5378a184de5bfca71991496d4d48c33371687dbb Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Fri, 12 Jan 2024 15:26:26 +0800 Subject: [PATCH] add flink udf hook --- .../sql/operation/OperationFactoryImpl.java | 1 + .../sql/operation/impl/DDLOperation.java | 3 + .../flink/client/sql/parser/SqlCommand.java | 2 + .../sql/parser/SqlCommandParserImpl.java | 3 + .../resources/linkis-engineconn.properties | 2 +- .../FlinkSQLComputationExecutor.scala | 93 ------------------- .../flink/hook/FlinkJarUdfEngineHook.scala | 68 ++++++++++++++ .../flink/LinkisFlinkUdfExample.java | 26 ++++++ 8 files changed, 104 insertions(+), 94 deletions(-) create mode 100644 linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala create mode 100644 linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java index 4329acb804..9f7d8deeff 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java @@ -49,6 +49,7 @@ public Operation createOperation(SqlCommandCall call, FlinkEngineConnContext con context, call.operands[0], Boolean.parseBoolean(call.operands[1])); break; case CREATE_TABLE: + case CREATE_FUNCTION: case DROP_TABLE: case ALTER_TABLE: case CREATE_CATALOG: diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java index 21f6081123..ec674d5159 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java @@ -59,6 +59,9 @@ private String getExceptionMsg() { case CREATE_TABLE: actionMsg = "create a table"; break; + case CREATE_FUNCTION: + actionMsg = "create a function"; + break; case CREATE_DATABASE: actionMsg = "create a database"; break; diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java index 9f6ef738e1..03c18c3bbd 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java @@ -46,6 +46,8 @@ public enum SqlCommand { CREATE_DATABASE, + CREATE_FUNCTION, + ALTER_DATABASE, DROP_DATABASE, diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java index f8eb32605c..211d899bcf 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java @@ -152,6 +152,9 @@ private Optional parseStmt(String stmt, boolean isBlinkPlanner) } else if (node instanceof SqlCreateDatabase) { cmd = SqlCommand.CREATE_DATABASE; operands = new String[] {stmt}; + } else if (node instanceof SqlCreateFunction) { + cmd = SqlCommand.CREATE_FUNCTION; + operands = new String[] {stmt}; } else if (node instanceof SqlDropDatabase) { cmd = SqlCommand.DROP_DATABASE; operands = new String[] {stmt}; diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties index 2012023076..587a150eda 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties @@ -18,5 +18,5 @@ wds.linkis.server.version=v1 wds.linkis.engineconn.debug.enable=true #wds.linkis.keytab.enable=true wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineconnplugin.flink.FlinkEngineConnPlugin -wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook +wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconnplugin.flink.hook.FlinkJarUdfEngineHook wds.linkis.engineconn.executor.manager.class=org.apache.linkis.engineconnplugin.flink.executormanager.FlinkExecutorManager \ No newline at end of file diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala index 09d28cfd57..60f1d9088b 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala @@ -39,7 +39,6 @@ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.{ import org.apache.linkis.engineconnplugin.flink.client.sql.operation.impl.InsertOperation import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ResultKind import org.apache.linkis.engineconnplugin.flink.client.sql.parser.SqlCommandParser -import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils import org.apache.linkis.engineconnplugin.flink.config.{ FlinkEnvConfiguration, FlinkExecutionTargetType @@ -51,7 +50,6 @@ import org.apache.linkis.engineconnplugin.flink.listener.{ } import org.apache.linkis.engineconnplugin.flink.listener.RowsType.RowsType import org.apache.linkis.governance.common.paser.SQLCodeParser -import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel import org.apache.linkis.protocol.engine.JobProgressInfo import org.apache.linkis.scheduler.executer.{ ErrorExecuteResponse, @@ -59,14 +57,11 @@ import org.apache.linkis.scheduler.executer.{ SuccessExecuteResponse } import org.apache.linkis.storage.resultset.ResultSetFactory -import org.apache.linkis.udf.UDFClient import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase} -import org.apache.commons.lang3.StringUtils import org.apache.flink.api.common.JobStatus._ import org.apache.flink.configuration.DeploymentOptions import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions -import org.apache.flink.table.api.TableResult import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider import org.apache.flink.yarn.configuration.YarnConfigOptions import org.apache.hadoop.yarn.util.ConverterUtils @@ -130,9 +125,6 @@ class FlinkSQLComputationExecutor( engineExecutionContext: EngineExecutionContext, code: String ): ExecuteResponse = { - // The load flink udf failure does not affect task execution - Utils.tryAndWarn(loadFlinkUdf(engineExecutionContext)) - val callOpt = SqlCommandParser.getSqlCommandParser.parse(code.trim, true) val callSQL = if (!callOpt.isPresent) @@ -202,91 +194,6 @@ class FlinkSQLComputationExecutor( } } - private def getExecSqlUser(engineExecutionContext: EngineExecutionContext): String = { - val userCreatorLabel = engineExecutionContext.getLabels - .find(_.isInstanceOf[UserCreatorLabel]) - .get - .asInstanceOf[UserCreatorLabel] - userCreatorLabel.getUser - } - - private def loadFlinkUdf(engineExecutionContext: EngineExecutionContext) = { - logger.info("Flink start load udf") - - val execSqlUser = getExecSqlUser(engineExecutionContext) - val udfAllLoad: String = - engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.all.load", "true").toString - val udfIdStr: String = - engineExecutionContext.getProperties.getOrDefault("linkis.user.udf.custom.ids", "").toString - val udfIds = udfIdStr.split(",").filter(StringUtils.isNotBlank).map(s => s.toLong) - - logger.info(s"start loading UDFs, user: $execSqlUser, load all: $udfAllLoad, udfIds: ${udfIds - .mkString("Array(", ", ", ")")}") - - val udfInfos = - if (udfAllLoad.toBoolean) UDFClient.getJarUdf(execSqlUser) - else UDFClient.getJarUdfByIds(execSqlUser, udfIds) - - if (udfInfos.nonEmpty) { - import scala.util.control.Breaks._ - udfInfos.foreach { udfInfo => - val path: String = udfInfo.getPath - val registerFormat: String = udfInfo.getRegisterFormat - - breakable { - if (StringUtils.isBlank(path) && StringUtils.isBlank(registerFormat)) { - logger.warn("flink udf udfInfo path or RegisterFormat cannot is empty") - break() - } - logger.info( - s"udfName:${udfInfo.getUdfName}, bml_resource_id:${udfInfo.getBmlResourceId}, bml_id:${udfInfo.getId}\n" - ) - - val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat) - if (StringUtils.isBlank(udfClassName)) { - logger.warn("flink udf extract Udf Class cannot is empty") - break() - } - - FlinkUdfUtils.loadJar(path) - - if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) { - logger.warn( - "There is no extends UserDefinedFunction, skip loading flink udf: {} ", - path - ) - break() - } - - val context = clusterDescriptor.executionContext - val flinkUdfSql: String = - FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName) - - FlinkUdfUtils.addFlinkPipelineClasspaths(context.getStreamExecutionEnvironment, path) - val tableEnv = context.getTableEnvironment - logger.info("Flink execute udf sql:{}", flinkUdfSql) - val tableResult: TableResult = tableEnv.executeSql(flinkUdfSql) - - var loadUdfLog = - s"udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}, Flink load udf %s ." - if ( - tableResult.getResultKind != null && tableResult.getResultKind - .name() - .equalsIgnoreCase("SUCCESS") - ) { - loadUdfLog = String.format(loadUdfLog, "success") - logger.info(loadUdfLog) - } else { - loadUdfLog = String.format(loadUdfLog, "failed") - logger.error(loadUdfLog) - } - engineExecutionContext.appendStdout(loadUdfLog) - } - } - - } - } - override def executeCompletely( engineExecutorContext: EngineExecutionContext, code: String, diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala new file mode 100644 index 0000000000..fe590f46f9 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.hook + +import org.apache.linkis.engineconn.computation.executor.hook.UDFLoadEngineConnHook +import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils +import org.apache.linkis.manager.label.entity.engine.RunType +import org.apache.linkis.udf.utils.ConstantVar +import org.apache.linkis.udf.vo.UDFInfoVo + +import org.apache.commons.lang3.StringUtils + +class FlinkJarUdfEngineHook extends UDFLoadEngineConnHook { + override val udfType: BigInt = ConstantVar.UDF_JAR + override val category: String = ConstantVar.UDF + override val runType = RunType.SQL + + override protected def constructCode(udfInfo: UDFInfoVo): String = { + val path: String = udfInfo.getPath + val registerFormat: String = udfInfo.getRegisterFormat + + if (StringUtils.isBlank(path) && StringUtils.isBlank(registerFormat)) { + logger.warn("Flink udfInfo path or registerFormat cannot is empty") + return "" + } + + val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat) + if (StringUtils.isBlank(udfClassName)) { + logger.warn("Flink extract udf class name cannot is empty") + return "" + } + + FlinkUdfUtils.loadJar(path) + + if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) { + logger.warn( + "There is no extends Flink UserDefinedFunction, skip loading flink udf: {} ", + path + ) + return "" + } + + val flinkUdfSql: String = + FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName) + + logger.info( + s"Flink start load udf, udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}\n" + ) + + "%sql\n" + flinkUdfSql + } + +} diff --git a/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java b/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java new file mode 100644 index 0000000000..ce2b05e693 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.flink; + +import org.apache.flink.table.functions.ScalarFunction; + +public class LinkisFlinkUdfExample extends ScalarFunction { + public String eval(String str) { + return String.format("linkis flink udf test: %s", str); + } +}