
Commit

Code optimization
ChengJie1053 committed Jan 12, 2024
1 parent 5378a18 commit 952efd3
Showing 3 changed files with 48 additions and 2 deletions.
File 1 of 3: FlinkUdfUtils.java (org.apache.linkis.engineconnplugin.flink.client.utils)
@@ -17,6 +17,7 @@

package org.apache.linkis.engineconnplugin.flink.client.utils;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -27,6 +28,7 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
@@ -57,6 +59,11 @@ public static void addFlinkPipelineClasspaths(StreamExecutionEnvironment env, St
confData.setAccessible(true);
Map<String, Object> map = (Map<String, Object>) confData.get(conf);
List<String> jarList = new ArrayList<>();
List<String> oldList =
conf.getOptional(PipelineOptions.CLASSPATHS).orElseGet(Collections::emptyList);
if (CollectionUtils.isNotEmpty(oldList)) {
jarList.addAll(oldList);
}
jarList.add(path);
map.put(PipelineOptions.CLASSPATHS.key(), jarList);
} catch (Exception e) {
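The change above makes FlinkUdfUtils.addFlinkPipelineClasspaths additive: the current value of PipelineOptions.CLASSPATHS is read first and carried over, so registering a new UDF jar no longer discards classpaths configured earlier. Below is a minimal sketch of that merge step written directly against Flink's Configuration API (the patch itself reaches the environment's internal configuration via reflection); ClasspathMergeSketch and appendClasspath are illustrative names, not part of this commit, and the sketch assumes a Flink version where Configuration.getOptional and Configuration.set(ConfigOption, value) are available.

import java.util.{ArrayList => JArrayList, Collections}

import org.apache.flink.configuration.{Configuration, PipelineOptions}

object ClasspathMergeSketch {

  // Append `path` to pipeline.classpaths while keeping whatever is already configured,
  // mirroring the merge performed in addFlinkPipelineClasspaths above.
  def appendClasspath(conf: Configuration, path: String): Unit = {
    val existing = conf.getOptional(PipelineOptions.CLASSPATHS).orElse(Collections.emptyList[String]())
    val merged: java.util.List[String] = new JArrayList[String](existing)
    merged.add(path) // the newly registered UDF jar goes last
    conf.set(PipelineOptions.CLASSPATHS, merged)
  }
}

The reflective write into StreamExecutionEnvironment used in the patch is presumably needed because the targeted environment does not expose a writable Configuration; the merge logic is the same either way.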
File 2 of 3: FlinkSQLComputationExecutor.scala (org.apache.linkis.engineconnplugin.flink.executor)
@@ -80,7 +80,7 @@ class FlinkSQLComputationExecutor(
with FlinkExecutor {

private var operation: JobOperation = _
private var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _
var clusterDescriptor: AbstractSessionClusterDescriptorAdapter = _

override def init(): Unit = {
setCodeParser(new SQLCodeParser)
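The only change in this file is dropping the private modifier from clusterDescriptor, which lets FlinkJarUdfEngineHook (next file) walk from the SQL executor to its StreamExecutionEnvironment. A short sketch of the access path this opens up; UdfJarRegistrationSketch, registerUdfJar, and udfJarPath are illustrative names, and the labels argument is assumed to resolve to the running Flink SQL executor.

import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils
import org.apache.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor
import org.apache.linkis.manager.label.entity.Label

object UdfJarRegistrationSketch {

  // Look up the executor by label and, if it is the Flink SQL executor,
  // add the UDF jar to the pipeline classpath of its streaming environment.
  def registerUdfJar(labels: Array[Label[_]], udfJarPath: String): Unit =
    ExecutorManager.getInstance.getExecutorByLabels(labels) match {
      case flinkSql: FlinkSQLComputationExecutor =>
        val env = flinkSql.clusterDescriptor.executionContext.getStreamExecutionEnvironment
        FlinkUdfUtils.addFlinkPipelineClasspaths(env, udfJarPath)
      case _ => // not a Flink SQL session executor; nothing to register
    }
}

A narrower alternative would be to expose only the StreamExecutionEnvironment through a dedicated accessor rather than the whole descriptor; the commit opts for the simpler visibility change.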
File 3 of 3: FlinkJarUdfEngineHook.scala (org.apache.linkis.engineconnplugin.flink.hook)
@@ -17,19 +17,28 @@

package org.apache.linkis.engineconnplugin.flink.hook

import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.computation.executor.hook.UDFLoadEngineConnHook
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.engineconnplugin.flink.client.utils.FlinkUdfUtils
import org.apache.linkis.manager.label.entity.engine.RunType
import org.apache.linkis.engineconnplugin.flink.executor.FlinkSQLComputationExecutor
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, EngineTypeLabel, RunType}
import org.apache.linkis.udf.utils.ConstantVar
import org.apache.linkis.udf.vo.UDFInfoVo

import org.apache.commons.lang3.StringUtils

import scala.collection.JavaConverters.asScalaBufferConverter

class FlinkJarUdfEngineHook extends UDFLoadEngineConnHook {
override val udfType: BigInt = ConstantVar.UDF_JAR
override val category: String = ConstantVar.UDF
override val runType = RunType.SQL

var labels: Array[Label[_]] = null

override protected def constructCode(udfInfo: UDFInfoVo): String = {
val path: String = udfInfo.getPath
val registerFormat: String = udfInfo.getRegisterFormat
@@ -62,7 +71,37 @@ class FlinkJarUdfEngineHook extends UDFLoadEngineConnHook {
s"Flink start load udf, udfName:${udfInfo.getUdfName}, udfJar:${path}, udfClass:${udfClassName}\n"
)

if (labels != null && labels.nonEmpty) {
val executor = ExecutorManager.getInstance.getExecutorByLabels(labels)
executor match {
case computationExecutor: FlinkSQLComputationExecutor =>
FlinkUdfUtils.addFlinkPipelineClasspaths(
computationExecutor.clusterDescriptor.executionContext.getStreamExecutionEnvironment,
path
)
case _ =>
}
}

"%sql\n" + flinkUdfSql
}

override def afterExecutionExecute(
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): Unit = {
val codeLanguageLabel = new CodeLanguageLabel
engineCreationContext.getLabels().asScala.find(_.isInstanceOf[EngineTypeLabel]) match {
case Some(engineTypeLabel) =>
codeLanguageLabel.setCodeType(
getRealRunType(engineTypeLabel.asInstanceOf[EngineTypeLabel].getEngineType).toString
)
case None =>
codeLanguageLabel.setCodeType(runType.toString)
}
labels = Array[Label[_]](codeLanguageLabel)

super.afterExecutionExecute(engineCreationContext, engineConn)
}

}
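Taken together, the hook changes work in two steps: the new afterExecutionExecute derives a CodeLanguageLabel from the EngineTypeLabel in the engine creation context (falling back to the hook's runType) and caches it in labels before delegating to super, and constructCode then uses those labels to locate the running FlinkSQLComputationExecutor and put the UDF jar on the Flink pipeline classpath before emitting the %sql registration code. A minimal sketch of the label-resolution step follows; CodeLabelSketch and resolveCodeLabel are illustrative names, and engineTypeToCodeType stands in for the hook's getRealRunType helper.

import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, EngineTypeLabel}

import scala.collection.JavaConverters.asScalaBufferConverter

object CodeLabelSketch {

  // Build the label array later passed to getExecutorByLabels: prefer the engine type
  // reported in the creation context, otherwise fall back to a default code type.
  def resolveCodeLabel(
      ctx: EngineCreationContext,
      engineTypeToCodeType: String => String, // e.g. getRealRunType(_).toString in the hook
      fallbackCodeType: String // e.g. RunType.SQL.toString
  ): Array[Label[_]] = {
    val codeLabel = new CodeLanguageLabel
    val codeType = ctx.getLabels.asScala
      .collectFirst { case l: EngineTypeLabel => engineTypeToCodeType(l.getEngineType) }
      .getOrElse(fallbackCodeType)
    codeLabel.setCodeType(codeType)
    Array[Label[_]](codeLabel)
  }
}

Note the ordering this implies: labels is only populated once afterExecutionExecute runs (it sets the array before calling super, which presumably drives the actual UDF load), which is why the executor lookup in constructCode is guarded by a null and non-empty check.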
