Skip to content

Commit

Permalink
code review update
Browse files Browse the repository at this point in the history
  • Loading branch information
aiceflower committed Nov 6, 2024
1 parent 4f19fbb commit b56480d
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,7 @@ abstract class UDFLoad extends Logging {
// 判断是否加载了特殊udf
val udfNames: String = ComputationExecutorConf.SPECIAL_UDF_NAMES.getValue
udfInfos.foreach { l =>
if (
ComputationExecutorConf.SPECIAL_UDF_CHECK_ENABLED.getValue && StringUtils.isNotBlank(
udfNames
) && udfNames.split(",").exists(l.getUdfName.contains)
) {
if (StringUtils.isNotBlank(udfNames) && udfNames.split(",").exists(l.getUdfName.contains)) {
logger.info(s"add spacial udf check for job with udfNames: {}", udfNames)
System.getProperties.put(ComputationExecutorConf.ONLY_SQL_USE_UDF_KEY, udfNames)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,10 @@ class TaskExecutionServiceImpl
}

// only sql can use udf check, udfName set in UDFLoad
if (ComputationExecutorConf.SPECIAL_UDF_CHECK_ENABLED.getValue) {
val codeType: String = LabelUtil.getCodeType(requestTask.getLabels)
val languageType: String = CodeAndRunTypeUtils.getLanguageTypeByCodeType(codeType)
System.getProperties.put(ComputationExecutorConf.CODE_TYPE, languageType)
logger.info(s"add spacial udf check for job ${jobId} with codeType: {}", languageType)
}
val codeType: String = LabelUtil.getCodeType(requestTask.getLabels)
val languageType: String = CodeAndRunTypeUtils.getLanguageTypeByCodeType(codeType)
System.getProperties.put(ComputationExecutorConf.CODE_TYPE, languageType)
logger.info(s"add spacial udf check for job ${jobId} with codeType: {}", languageType)

val task = new CommonEngineConnTask(taskId, retryAble)
task.setCode(requestTask.getCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public enum SparkErrorCodeSummary implements LinkisErrorCode {
43032, "The application start failed, since yarn applicationId is null."),

NOT_SUPPORT_METHOD(43040, "Not support method for requestExpectedResource."),
NOT_SUPPORT_FUNCTION(43050, "NoT support spacial udf in non-SQL script."),
NOT_SUPPORT_FUNCTION(43050, "Not support spacial udf in non-SQL script.(特殊UDF不支持在非sql脚本中使用)"),
;

/** (errorCode)错误码 */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ public static boolean containsUdf(MultiTreeNode multiTreeNode, String udfName) {
NamedExpression next = it.next();
if (next instanceof Alias) {
Alias alias = (Alias) next;
if (alias.name().contains(udfName)) {
return true;
}
Expression child = alias.child();
if (child instanceof ScalaUDF) {
ScalaUDF su = (ScalaUDF) child;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,23 @@ import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
import org.apache.linkis.engineplugin.spark.common.{Kind, SparkDataCalc}
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
import org.apache.linkis.engineplugin.spark.exception.RuleCheckFailedException
import org.apache.linkis.engineplugin.spark.extension.{
SparkPostExecutionHook,
SparkPreExecutionHook
}
import org.apache.linkis.engineplugin.spark.utils.JobProgressUtil
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.exception.LinkisJobRetryException
import org.apache.linkis.governance.common.exception.engineconn.{
EngineConnExecutorErrorCode,
EngineConnExecutorErrorException
}
import org.apache.linkis.governance.common.utils.JobUtils
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.entity.resource._
import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel
import org.apache.linkis.manager.label.utils.{LabelUtil, LabelUtils}
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.scheduler.executer.ExecuteResponse

Expand Down Expand Up @@ -130,7 +128,8 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
}

// 正则匹配校验
if (ComputationExecutorConf.SPECIAL_UDF_CHECK_BY_REGEX_ENABLED.getValue) {
val udfNames: String = System.getProperty(ComputationExecutorConf.ONLY_SQL_USE_UDF_KEY, "")
if (StringUtils.isNotBlank(udfNames)) {
val codeType: String = LabelUtil.getCodeType(engineExecutorContext.getLabels.toList.asJava)
val languageType: String = CodeAndRunTypeUtils.getLanguageTypeByCodeType(codeType)
if (!CodeAndRunTypeUtils.LANGUAGE_TYPE_SQL.equals(languageType)) {
Expand All @@ -139,9 +138,10 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
val funcNames: Array[String] = udfNames.split(",")
funcNames.foreach(funcName => {
if (code.contains(funcName)) {
throw new EngineConnExecutorErrorException(
EngineConnExecutorErrorCode.ILLEGAL_USE_UDF_FUNCTION,
"非法使用UDF函数,特殊加解密UDF函数只能在sql脚本使用"
logger.info("contains specific functionName: {}", udfNames)
throw new RuleCheckFailedException(
SparkErrorCodeSummary.NOT_SUPPORT_FUNCTION.getErrorCode,
SparkErrorCodeSummary.NOT_SUPPORT_FUNCTION.getErrorDesc
)
}
})
Expand Down

0 comments on commit b56480d

Please sign in to comment.