diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java index a5ac102033..594f8dd98d 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java @@ -51,7 +51,7 @@ public abstract class ClusterDescriptorAdapter implements Closeable { public static final long CLIENT_REQUEST_TIMEOUT = FlinkEnvConfiguration.FLINK_CLIENT_REQUEST_TIMEOUT().getValue().toLong(); - protected final ExecutionContext executionContext; + public final ExecutionContext executionContext; // jobId is not null only after job is submitted private JobID jobId; protected ApplicationId clusterID; diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java index 4329acb804..9f7d8deeff 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/OperationFactoryImpl.java @@ -49,6 +49,7 @@ public Operation createOperation(SqlCommandCall call, FlinkEngineConnContext con context, call.operands[0], Boolean.parseBoolean(call.operands[1])); break; case CREATE_TABLE: + case CREATE_FUNCTION: case DROP_TABLE: case ALTER_TABLE: case CREATE_CATALOG: diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java index 21f6081123..ec674d5159 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/operation/impl/DDLOperation.java @@ -59,6 +59,9 @@ private String getExceptionMsg() { case CREATE_TABLE: actionMsg = "create a table"; break; + case CREATE_FUNCTION: + actionMsg = "create a function"; + break; case CREATE_DATABASE: actionMsg = "create a database"; break; diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java index 9f6ef738e1..03c18c3bbd 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommand.java @@ -46,6 +46,8 @@ public enum SqlCommand { CREATE_DATABASE, + CREATE_FUNCTION, + ALTER_DATABASE, DROP_DATABASE, diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java index f8eb32605c..211d899bcf 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/sql/parser/SqlCommandParserImpl.java @@ -152,6 +152,9 @@ private Optional parseStmt(String stmt, boolean isBlinkPlanner) } else if (node instanceof SqlCreateDatabase) { cmd = SqlCommand.CREATE_DATABASE; operands = new String[] {stmt}; + } else if (node instanceof SqlCreateFunction) { + cmd = SqlCommand.CREATE_FUNCTION; + operands = new String[] {stmt}; } else if (node instanceof SqlDropDatabase) { cmd = SqlCommand.DROP_DATABASE; operands = new String[] {stmt}; diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java new file mode 100644 index 0000000000..e299c08f70 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/java/org/apache/linkis/engineconnplugin/flink/client/utils/FlinkUdfUtils.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.client.utils; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.functions.UserDefinedFunction; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlinkUdfUtils { + + private static final Logger logger = LoggerFactory.getLogger(FlinkUdfUtils.class); + + private static final String CREATE_TEMP_FUNCTION_PATTERN = + "create\\s+temporary\\s+function\\s+(\\w+)\\s+as\\s+\"(.*?)\""; + + private static final String CREATE_TEMP_FUNCTION_SQL = + "CREATE TEMPORARY FUNCTION IF NOT EXISTS %s AS '%s' "; + + public static void addFlinkPipelineClasspaths(StreamExecutionEnvironment env, String path) { + logger.info("Flink udf start add pipeline classpaths, jar path: {}", path); + + try { + Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration"); + configuration.setAccessible(true); + Configuration conf = (Configuration) configuration.get(env); + + Field confData = Configuration.class.getDeclaredField("confData"); + confData.setAccessible(true); + Map map = (Map) confData.get(conf); + List jarList = new ArrayList<>(); + List oldList = + conf.getOptional(PipelineOptions.CLASSPATHS).orElseGet(Collections::emptyList); + if (CollectionUtils.isNotEmpty(oldList)) { + jarList.addAll(oldList); + } + jarList.add(path); + map.put(PipelineOptions.CLASSPATHS.key(), jarList); + } catch (Exception e) { + logger.warn("Flink udf add pipeline classpaths failed", e); + } + } + + public static void loadJar(String jarPath) { + logger.info("Flink udf URLClassLoader start loadJar: {}", jarPath); + + Method method = null; + Boolean accessible = null; + try { + method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); + accessible = method.isAccessible(); + + if (accessible == false) { + method.setAccessible(true); + } + URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader(); + method.invoke(classLoader, new URL(jarPath)); + + } catch (Exception e) { + logger.warn("Flink udf URLClassLoader loadJar failed", e); + } finally { + if (accessible != null) { + method.setAccessible(accessible); + } + } + } + + public static String extractUdfClass(String statement) { + Pattern pattern = Pattern.compile(CREATE_TEMP_FUNCTION_PATTERN); + Matcher matcher = pattern.matcher(statement); + if (matcher.find() && matcher.groupCount() >= 2) { + return matcher.group(2); + } + return ""; + } + + public static boolean isFlinkUdf(ClassLoader classLoader, String className) { + try { + Class udfClass = classLoader.loadClass(className); + if (UserDefinedFunction.class.isAssignableFrom(udfClass)) { + return true; + } + + } catch (ClassNotFoundException e) { + logger.warn("flink udf load isFlinkUdf failed, ClassNotFoundException: {}", className); + } + return false; + } + + public static String generateFlinkUdfSql(String name, String className) { + return String.format(CREATE_TEMP_FUNCTION_SQL, name, className); + } +} diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties index 2012023076..587a150eda 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/resources/linkis-engineconn.properties @@ -18,5 +18,5 @@ wds.linkis.server.version=v1 wds.linkis.engineconn.debug.enable=true #wds.linkis.keytab.enable=true wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineconnplugin.flink.FlinkEngineConnPlugin -wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook +wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconnplugin.flink.hook.FlinkJarUdfEngineHook wds.linkis.engineconn.executor.manager.class=org.apache.linkis.engineconnplugin.flink.executormanager.FlinkExecutorManager \ No newline at end of file diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala index 60f1d9088b..f835db9694 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkSQLComputationExecutor.scala @@ -80,7 +80,7 @@ class FlinkSQLComputationExecutor( with FlinkExecutor { private var operation: JobOperation = _ - private var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _ + var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _ override def init(): Unit = { setCodeParser(new SQLCodeParser) diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala new file mode 100644 index 0000000000..bc3d0f1f4a --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/hook/FlinkJarUdfEngineHook.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineconnplugin.flink.hook + +import org.apache.linkis.engineconn.common.creation.EngineCreationContext +import org.apache.linkis.engineconn.common.engineconn.EngineConn +import org.apache.linkis.engineconn.computation.executor.hook.UDFLoadEngineConnHook +import org.apache.linkis.engineconn.core.executor.ExecutorManager +import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils +import org.apache.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor +import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, EngineTypeLabel, RunType} +import org.apache.linkis.udf.utils.ConstantVar +import org.apache.linkis.udf.vo.UDFInfoVo + +import org.apache.commons.lang3.StringUtils + +import scala.collection.JavaConverters.asScalaBufferConverter + +class FlinkJarUdfEngineHook extends UDFLoadEngineConnHook { + override val udfType: BigInt = ConstantVar.UDF_JAR + override val category: String = ConstantVar.UDF + override val runType = RunType.SQL + + var labels: Array[Label[_]] = null + + override protected def constructCode(udfInfo: UDFInfoVo): String = { + val path: String = udfInfo.getPath + val registerFormat: String = udfInfo.getRegisterFormat + + if (StringUtils.isBlank(path) && StringUtils.isBlank(registerFormat)) { + logger.warn("Flink udfInfo path or registerFormat cannot is empty") + return "" + } + + val udfClassName: String = FlinkUdfUtils.extractUdfClass(registerFormat) + if (StringUtils.isBlank(udfClassName)) { + logger.warn("Flink extract udf class name cannot is empty") + return "" + } + + FlinkUdfUtils.loadJar(path) + + if (!FlinkUdfUtils.isFlinkUdf(ClassLoader.getSystemClassLoader(), udfClassName)) { + logger.warn( + "There is no extends Flink UserDefinedFunction, skip loading flink udf: {} ", + path + ) + return "" + } + + val flinkUdfSql: String = + FlinkUdfUtils.generateFlinkUdfSql(udfInfo.getUdfName, udfClassName) + + logger.info( + s"Flink start load udf, udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}\n" + ) + + if (labels != null && labels.nonEmpty) { + val executor = ExecutorManager.getInstance.getExecutorByLabels(labels) + executor match { + case computationExecutor: FlinkSQLComputationExecutor => + FlinkUdfUtils.addFlinkPipelineClasspaths( + computationExecutor.clusterDescriptor.executionContext.getStreamExecutionEnvironment, + path + ) + case _ => + } + } + + "%sql\n" + flinkUdfSql + } + + override def afterExecutionExecute( + engineCreationContext: EngineCreationContext, + engineConn: EngineConn + ): Unit = { + val codeLanguageLabel = new CodeLanguageLabel + engineCreationContext.getLabels().asScala.find(_.isInstanceOf[EngineTypeLabel]) match { + case Some(engineTypeLabel) => + codeLanguageLabel.setCodeType( + getRealRunType(engineTypeLabel.asInstanceOf[EngineTypeLabel].getEngineType).toString + ) + case None => + codeLanguageLabel.setCodeType(runType.toString) + } + labels = Array[Label[_]](codeLanguageLabel) + + super.afterExecutionExecute(engineCreationContext, engineConn) + } + +} diff --git a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala index 70b3ad1b20..13a5bae4d5 100644 --- a/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/flink/flink-core/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala @@ -37,12 +37,15 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConsta addPathToClassPath, CLASS_PATH_SEPARATOR } -import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel +import org.apache.linkis.manager.label.entity.engine.{EngineConnMode, UserCreatorLabel} +import org.apache.linkis.manager.label.utils.LabelUtil import java.util import scala.collection.JavaConverters._ +import com.google.common.collect.Lists + class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { override protected def getCommands(implicit @@ -136,4 +139,17 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { override protected def ifAddHiveConfigPath: Boolean = true + override protected def getEngineConnManagerHooks(implicit + engineConnBuildRequest: EngineConnBuildRequest + ): java.util.List[String] = if (isOnceMode) { + super.getEngineConnManagerHooks(engineConnBuildRequest) + } else { + Lists.newArrayList("JarUDFLoadECMHook") + } + + def isOnceMode: Boolean = { + val engineConnMode = LabelUtil.getEngineConnMode(engineConnBuildRequest.labels) + EngineConnMode.toEngineConnMode(engineConnMode) == EngineConnMode.Once + } + } diff --git a/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java b/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java new file mode 100644 index 0000000000..ce2b05e693 --- /dev/null +++ b/linkis-engineconn-plugins/flink/flink-core/src/test/java/org/apache/linkis/engineplugin/flink/LinkisFlinkUdfExample.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.flink; + +import org.apache.flink.table.functions.ScalarFunction; + +public class LinkisFlinkUdfExample extends ScalarFunction { + public String eval(String str) { + return String.format("linkis flink udf test: %s", str); + } +}