From d99f967b074a6c834b3e6496baaa11408824bc46 Mon Sep 17 00:00:00 2001 From: peacewong Date: Tue, 10 Sep 2024 11:32:17 +0800 Subject: [PATCH] [Feature][Commons] Linkis Commons Function optimization (#5168) * Custom variable feature enhancement to support more script types for replacement, as well as the YYMMDDHH time-based replacement based on the task submission time. * LDAP Support cache * HDFS cache fs do not close * Support log request info * Optimize rpc retry * add new utils method * Fix build * Fix build --- .../common/exception/FatalException.java | 2 +- .../linkis/common/utils/LinkisUtils.java | 165 ++++++++++++++++++ .../apache/linkis/common/utils/MD5Utils.java | 45 +++++ .../common/utils/VariableOperationUtils.java | 19 +- .../linkis/common/conf/Configuration.scala | 26 ++- .../common/utils/CodeAndRunTypeUtils.scala | 8 +- .../linkis/common/utils/LDAPUtils.scala | 34 ++++ .../linkis/common/utils/VariableUtils.scala | 52 ++++-- .../exception/ExceptionManagerTest.java | 3 +- .../variable/VariableOperationTest.java | 15 +- .../hadoop/common/utils/HDFSUtils.scala | 118 ++++++++----- linkis-commons/linkis-module/pom.xml | 7 + .../linkis/DataWorkCloudApplication.java | 8 +- .../linkis/server/InterceptorConfigure.java | 31 ++++ .../linkis/server/PerformanceInterceptor.java | 57 ++++++ .../apache/linkis/server/Knife4jConfig.scala | 4 +- .../protocol/AbstractRetryableProtocol.java | 6 +- .../protocol/constants/TaskConstant.java | 2 +- .../linkis/protocol/RetryableProtocol.scala | 6 +- .../errorcode/LinkisRpcErrorCodeSummary.java | 2 + .../utils/LoadBalancerOptionsUtils.java | 42 +++++ .../linkis/rpc/conf/RPCConfiguration.scala | 32 +++- .../common/RetryableRPCInterceptor.scala | 10 +- .../rpc/sender/SpringMVCRPCSender.scala | 4 + .../storage/fs/impl/HDFSFileSystem.java | 2 +- .../linkis/storage/fs/impl/OSSFileSystem.java | 3 +- .../impl/DefaultECMRegisterService.scala | 2 +- .../parser/CommonEntranceParser.scala | 2 +- .../spark/imexport/LoadData.scala | 8 +- 
tool/dependencies/known-dependencies.txt | 1 + 30 files changed, 609 insertions(+), 107 deletions(-) create mode 100644 linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/LinkisUtils.java create mode 100644 linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/MD5Utils.java create mode 100644 linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/InterceptorConfigure.java create mode 100644 linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/PerformanceInterceptor.java create mode 100644 linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtils.java diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/exception/FatalException.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/exception/FatalException.java index 4847d76d63..26a6992c05 100644 --- a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/exception/FatalException.java +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/exception/FatalException.java @@ -17,7 +17,7 @@ package org.apache.linkis.common.exception; -public class FatalException extends LinkisRuntimeException { +public class FatalException extends LinkisException { private ExceptionLevel level = ExceptionLevel.FATAL; public FatalException(int errCode, String desc) { diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/LinkisUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/LinkisUtils.java new file mode 100644 index 0000000000..353f80f1da --- /dev/null +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/LinkisUtils.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.common.utils; + +import org.apache.linkis.common.exception.ErrorException; +import org.apache.linkis.common.exception.FatalException; +import org.apache.linkis.common.exception.WarnException; + +import java.util.concurrent.Callable; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LinkisUtils { + private static final Logger logger = LoggerFactory.getLogger(LinkisUtils.class); + + public static T tryCatch(Callable tryOp, Function catchOp) { + T result = null; + try { + result = tryOp.call(); + } catch (Throwable t) { + if (t instanceof FatalException) { + logger.error("Fatal error, system exit...", t); + System.exit(((FatalException) t).getErrCode()); + } else if (t instanceof VirtualMachineError) { + logger.error("Fatal error, system exit...", t); + System.exit(-1); + } else if (null != t.getCause() + && (t.getCause() instanceof FatalException + || t.getCause() instanceof VirtualMachineError)) { + logger.error("Caused by fatal error, system exit...", t); + System.exit(-1); + } else if (t instanceof Error) { + logger.error("Throw error", t); + throw (Error) t; + } else { + result = catchOp.apply(t); + } + } + return result; + } + + public static void tryFinally(Runnable tryOp, Runnable 
finallyOp) { + try { + tryOp.run(); + } finally { + finallyOp.run(); + } + } + + public static T tryAndWarn(Callable tryOp, Logger log) { + return tryCatch( + tryOp, + t -> { + if (t instanceof ErrorException) { + ErrorException error = (ErrorException) t; + log.error( + "Warning code(警告码): {}, Warning message(警告信息): {}.", + error.getErrCode(), + error.getDesc(), + error); + + } else if (t instanceof WarnException) { + WarnException warn = (WarnException) t; + log.warn( + "Warning code(警告码): {}, Warning message(警告信息): {}.", + warn.getErrCode(), + warn.getDesc(), + warn); + + } else { + log.warn("", t); + } + return null; + }); + } + + public static void tryAndErrorMsg(Runnable tryOp, String message, Logger log) { + try { + tryOp.run(); + } catch (WarnException t) { + WarnException warn = (WarnException) t; + log.warn( + "Warning code(警告码): {}, Warning message(警告信息): {}.", warn.getErrCode(), warn.getDesc()); + log.warn(message, warn); + } catch (Exception t) { + log.warn(message, t); + } + } + + public static void tryAndWarn(Runnable tryOp, Logger log) { + try { + tryOp.run(); + } catch (Throwable error) { + if (error instanceof WarnException) { + WarnException warn = (WarnException) error; + log.warn( + "Warning code(警告码): {}, Warning message(警告信息): {}.", + warn.getErrCode(), + warn.getDesc(), + error); + } else { + log.warn("", error); + } + } + } + + public static void tryAndWarnMsg(Runnable tryOp, String message, Logger log) { + try { + tryOp.run(); + } catch (WarnException t) { + WarnException warn = (WarnException) t; + log.warn( + "Warning code(警告码): {}, Warning message(警告信息): {}.", warn.getErrCode(), warn.getDesc()); + log.warn(message, warn); + } catch (Exception t) { + log.warn(message, t); + } + } + + public static T tryAndWarnMsg(Callable tryOp, String message, Logger log) { + return tryCatch( + tryOp, + t -> { + if (t instanceof ErrorException) { + ErrorException error = (ErrorException) t; + log.warn( + "Warning code(警告码): {}, Warning message(警告信息): 
{}.", + error.getErrCode(), + error.getDesc()); + log.warn(message, error); + } else if (t instanceof WarnException) { + WarnException warn = (WarnException) t; + log.warn( + "Warning code(警告码): {}, Warning message(警告信息): {}.", + warn.getErrCode(), + warn.getDesc()); + log.warn(message, warn); + } else { + log.warn(message, t); + } + return null; + }); + } + + public static String getJvmUser() { + return System.getProperty("user.name"); + } +} diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/MD5Utils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/MD5Utils.java new file mode 100644 index 0000000000..1291b8bb68 --- /dev/null +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/MD5Utils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.common.utils; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +public class MD5Utils { + + /** + * @param plaintext + * @return + * @throws NoSuchAlgorithmException + */ + public static String encrypt(String plaintext) throws NoSuchAlgorithmException { + // 使用 MD5 算法创建 MessageDigest 对象 + MessageDigest md = MessageDigest.getInstance("MD5"); + // 更新 MessageDigest 对象中的字节数据 + md.update(plaintext.getBytes()); + // 对更新后的数据计算哈希值,存储在 byte 数组中 + byte[] digest = md.digest(); + // 将 byte 数组转换为十六进制字符串 + StringBuilder sb = new StringBuilder(); + for (byte b : digest) { + sb.append(String.format("%02x", b & 0xff)); + } + // 返回十六进制字符串 + return sb.toString(); + } +} diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java index 615472474d..d1cb59c397 100644 --- a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java @@ -17,12 +17,10 @@ package org.apache.linkis.common.utils; +import org.apache.linkis.common.conf.Configuration; import org.apache.linkis.common.exception.VariableOperationFailedException; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.ZonedDateTime; +import java.time.*; import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.Iterator; @@ -62,9 +60,16 @@ public class VariableOperationUtils { * @return */ public static ZonedDateTime toZonedDateTime(Date date, ZoneId zoneId) { - Instant instant = date.toInstant(); - LocalDateTime localDateTime = instant.atZone(zoneId).toLocalDateTime(); - return ZonedDateTime.of(localDateTime, zoneId); + if (Configuration.VARIABLE_OPERATION_USE_NOW()) { + LocalTime 
currentTime = LocalTime.now(); + LocalDate localDate = date.toInstant().atZone(zoneId).toLocalDate(); + LocalDateTime localDateTime = LocalDateTime.of(localDate, currentTime); + return ZonedDateTime.of(localDateTime, zoneId); + } else { + Instant instant = date.toInstant(); + LocalDateTime localDateTime = instant.atZone(zoneId).toLocalDateTime(); + return ZonedDateTime.of(localDateTime, zoneId); + } } /** diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala index 157c7389f4..ec65fd0a67 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala @@ -33,7 +33,7 @@ object Configuration extends Logging { val IS_PROMETHEUS_ENABLE = CommonVars("wds.linkis.prometheus.enable", false) - val IS_MULTIPLE_YARN_CLUSTER = CommonVars("linkis.multiple.yarn.cluster", false) + val IS_MULTIPLE_YARN_CLUSTER = CommonVars("linkis.multiple.yarn.cluster", false).getValue val PROMETHEUS_ENDPOINT = CommonVars("wds.linkis.prometheus.endpoint", "/actuator/prometheus") @@ -70,7 +70,10 @@ object Configuration extends Logging { // Only the specified token has permission to call some api val GOVERNANCE_STATION_ADMIN_TOKEN_STARTWITH = "ADMIN-" - val VARIABLE_OPERATION: Boolean = CommonVars("wds.linkis.variable.operation", true).getValue + val VARIABLE_OPERATION_USE_NOW: Boolean = + CommonVars("wds.linkis.variable.operation.use.now", true).getValue + + val IS_VIEW_FS_ENV = CommonVars("wds.linkis.env.is.viewfs", true) val ERROR_MSG_TIP = CommonVars( @@ -78,6 +81,18 @@ object Configuration extends Logging { "The request interface %s is abnormal. 
You can try to troubleshoot common problems in the knowledge base document" ) + val LINKIS_TOKEN = CommonVars("wds.linkis.token", "LINKIS-AUTH-eTaYLbQpmIulPyrXcMl") + + val GLOBAL_CONF_CHN_NAME = "全局设置" + + val GLOBAL_CONF_CHN_OLDNAME = "通用设置" + + val GLOBAL_CONF_CHN_EN_NAME = "GlobalSettings" + + val GLOBAL_CONF_SYMBOL = "*" + + val GLOBAL_CONF_LABEL = "*-*,*-*" + def isAdminToken(token: String): Boolean = { if (StringUtils.isBlank(token)) { false @@ -137,4 +152,11 @@ object Configuration extends Logging { (adminUsers ++ historyAdminUsers).distinct } + def getGlobalCreator(creator: String): String = creator match { + case Configuration.GLOBAL_CONF_CHN_NAME | Configuration.GLOBAL_CONF_CHN_OLDNAME | + Configuration.GLOBAL_CONF_CHN_EN_NAME => + GLOBAL_CONF_SYMBOL + case _ => creator + } + } diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala index 917ac53261..e6e63a9779 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala @@ -21,6 +21,8 @@ import org.apache.linkis.common.conf.CommonVars import org.apache.commons.lang3.StringUtils +import java.util.Locale + import scala.collection.mutable object CodeAndRunTypeUtils { @@ -31,7 +33,7 @@ object CodeAndRunTypeUtils { */ val CODE_TYPE_AND_RUN_TYPE_RELATION = CommonVars( "linkis.codeType.language.relation", - "sql=>sql|hql|jdbc|hive|psql|fql|tsql,python=>python|py|pyspark,java=>java,scala=>scala,shell=>sh|shell,json=>json|data_calc" + "sql=>sql|hql|jdbc|hive|psql|fql|tsql|nebula|ngql,python=>python|py|pyspark,java=>java,scala=>scala,shell=>sh|shell,json=>json|data_calc" ) val LANGUAGE_TYPE_SQL = "sql" @@ -117,7 +119,9 @@ object CodeAndRunTypeUtils { if (StringUtils.isBlank(codeType)) { return "" } 
- getLanguageTypeAndCodeTypeRelationMap.getOrElse(codeType, defaultLanguageType) + val lowerCaseCodeType = codeType.toLowerCase(Locale.getDefault) + getLanguageTypeAndCodeTypeRelationMap.getOrElse(lowerCaseCodeType, defaultLanguageType) + } /** diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/LDAPUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/LDAPUtils.scala index b53184eceb..e021b9a482 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/LDAPUtils.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/LDAPUtils.scala @@ -19,12 +19,17 @@ package org.apache.linkis.common.utils import org.apache.linkis.common.conf.CommonVars +import org.apache.commons.codec.binary.Hex import org.apache.commons.lang3.StringUtils import javax.naming.Context import javax.naming.ldap.InitialLdapContext +import java.nio.charset.StandardCharsets import java.util.Hashtable +import java.util.concurrent.TimeUnit + +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification} object LDAPUtils extends Logging { @@ -38,7 +43,33 @@ object LDAPUtils extends Logging { val baseDN = CommonVars("wds.linkis.ldap.proxy.baseDN", "").getValue val userNameFormat = CommonVars("wds.linkis.ldap.proxy.userNameFormat", "").getValue + private val storeUser: Cache[String, String] = CacheBuilder + .newBuilder() + .maximumSize(1000) + .expireAfterWrite(60, TimeUnit.MINUTES) + .removalListener(new RemovalListener[String, String] { + + override def onRemoval(removalNotification: RemovalNotification[String, String]): Unit = { + logger.info(s"store user remove key: ${removalNotification.getKey}") + } + + }) + .build() + def login(userID: String, password: String): Unit = { + + val saltPwd = storeUser.getIfPresent(userID) + if (StringUtils.isNotBlank(saltPwd)) { + Utils.tryAndWarn { + if ( + 
saltPwd.equalsIgnoreCase(Hex.encodeHexString(password.getBytes(StandardCharsets.UTF_8))) + ) { + logger.info(s"user $userID login success for storeUser") + return + } + } + } + val env = new Hashtable[String, String]() val bindDN = if (StringUtils.isBlank(userNameFormat)) userID @@ -53,6 +84,9 @@ object LDAPUtils extends Logging { env.put(Context.SECURITY_CREDENTIALS, bindPassword) new InitialLdapContext(env, null) + Utils.tryAndWarn { + storeUser.put(userID, Hex.encodeHexString(password.getBytes(StandardCharsets.UTF_8))) + } logger.info(s"user $userID login success.") } diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala index c79a169b4b..bd2fab4930 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala @@ -43,6 +43,8 @@ object VariableUtils extends Logging { val RUN_TODAY_H = "run_today_h" + val RUN_TODAY_HOUR = "run_today_hour" + private val codeReg = "\\$\\{\\s*[A-Za-z][A-Za-z0-9_\\.]*\\s*[\\+\\-\\*/]?\\s*[A-Za-z0-9_\\.]*\\s*\\}".r @@ -83,6 +85,13 @@ object VariableUtils extends Logging { nameAndType(RUN_TODAY_H) = HourType(runTodayH) } } + if (variables.containsKey(RUN_TODAY_HOUR)) { + val runTodayHourStr = variables.get(RUN_TODAY_HOUR).asInstanceOf[String] + if (StringUtils.isNotBlank(runTodayHourStr)) { + val runTodayHour = new CustomHourType(runTodayHourStr, false) + nameAndType(RUN_TODAY_HOUR) = HourType(runTodayHour) + } + } initAllDateVars(run_date, nameAndType) val codeOperation = parserVar(replaceStr, nameAndType) parserDate(codeOperation, run_date) @@ -141,6 +150,13 @@ object VariableUtils extends Logging { nameAndType(RUN_TODAY_H) = HourType(runTodayH) } } + if (variables.containsKey(RUN_TODAY_HOUR)) { + val runTodayHourStr = 
variables.get(RUN_TODAY_HOUR).asInstanceOf[String] + if (StringUtils.isNotBlank(runTodayHourStr)) { + val runTodayHour = new CustomHourType(runTodayHourStr, false) + nameAndType(RUN_TODAY_HOUR) = HourType(runTodayHour) + } + } initAllDateVars(run_date, nameAndType) val codeOperation = parserVar(code, nameAndType) parserDate(codeType, codeOperation, run_date) @@ -148,21 +164,13 @@ object VariableUtils extends Logging { @deprecated private def parserDate(code: String, run_date: CustomDateType): String = { - if (Configuration.VARIABLE_OPERATION) { - val zonedDateTime: ZonedDateTime = VariableOperationUtils.toZonedDateTime(run_date.getDate) - VariableOperationUtils.replaces(zonedDateTime, code) - } else { - code - } + val zonedDateTime: ZonedDateTime = VariableOperationUtils.toZonedDateTime(run_date.getDate) + VariableOperationUtils.replaces(zonedDateTime, code) } private def parserDate(codeType: String, code: String, run_date: CustomDateType): String = { - if (Configuration.VARIABLE_OPERATION) { - val zonedDateTime: ZonedDateTime = VariableOperationUtils.toZonedDateTime(run_date.getDate) - VariableOperationUtils.replaces(codeType, zonedDateTime, code) - } else { - code - } + val zonedDateTime: ZonedDateTime = VariableOperationUtils.toZonedDateTime(run_date.getDate) + VariableOperationUtils.replaces(codeType, zonedDateTime, code) } private def initAllDateVars( @@ -265,10 +273,30 @@ object VariableUtils extends Logging { nameAndType("run_today_h_std") = HourType( new CustomHourType(nameAndType(RUN_TODAY_H).asInstanceOf[HourType].getValue, true) ) + // calculate run_today_hour base on run_date + if (nameAndType.contains("run_today_hour")) { + nameAndType("run_today_hour").asInstanceOf[HourType] + } else { + val run_today_hour = new CustomHourType(getCurHour(false, run_today.toString), false) + nameAndType("run_today_hour") = HourType(run_today_hour) + } + nameAndType("run_today_hour_std") = HourType( + new 
CustomHourType(nameAndType("run_today_hour").asInstanceOf[HourType].getValue, true) + ) // calculate run_last_mon base on run_today val run_roday_mon = new CustomMonType(getMonthDay(false, run_today.getDate), false) nameAndType("run_last_mon_now") = MonType(new CustomMonType(run_roday_mon - 1, false, false)) nameAndType("run_last_mon_now_std") = MonType(new CustomMonType(run_roday_mon - 1, true, false)) + // calculate run_current_mon_now base on run_today + nameAndType("run_current_mon_now") = MonType( + new CustomMonType(run_roday_mon.toString, false, false) + ) + nameAndType("run_current_mon_now_std") = MonType( + new CustomMonType(run_roday_mon.toString, true, false) + ) + // calculate run_mon_now base on run_today + nameAndType("run_mon_now") = MonType(new CustomMonType(run_roday_mon.toString, false, false)) + nameAndType("run_mon_now_std") = MonType(new CustomMonType(run_roday_mon.toString, true, false)) } /** diff --git a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/exception/ExceptionManagerTest.java b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/exception/ExceptionManagerTest.java index 839a34f859..d45bebc125 100644 --- a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/exception/ExceptionManagerTest.java +++ b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/exception/ExceptionManagerTest.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.Map; -import java.util.TreeMap; import org.junit.jupiter.api.Test; @@ -41,7 +40,7 @@ void testGenerateException() { + "null"); assertEquals(errorException.getClass(), ExceptionManager.generateException(null).getClass()); assertEquals(errorException.toString(), ExceptionManager.generateException(null).toString()); - Map map = new TreeMap<>(); + Map map = new HashMap<>(); map.put("level", null); map.put("errCode", 1); map.put("desc", "test"); diff --git 
a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/variable/VariableOperationTest.java b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/variable/VariableOperationTest.java index b24bad2467..5d77cb323b 100644 --- a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/variable/VariableOperationTest.java +++ b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/variable/VariableOperationTest.java @@ -38,21 +38,10 @@ public class VariableOperationTest { @Test public void testSqlFormat() throws VariableOperationFailedException { - String jsonOld = - "select \n" - + "\"&{yyyy-MM}\",\n" - + "\"&{yyyy-MM-dd HHmmss}\",\n" - + "\"&yyyyMMddHH\",\n" - + "\"&{yyyy-MM-dd-HH}\""; + String jsonOld = "select \n" + "\"&{yyyy-MM}\""; String jsonNew = VariableOperationUtils.replaces(zonedDateTime, jsonOld); System.out.println(jsonNew); - assertEquals( - jsonNew, - "select \n" - + "\"2022-04\",\n" - + "\"2022-04-02 173507\",\n" - + "\"&yyyyMMddHH\",\n" - + "\"2022-04-02-17\""); + assertEquals(jsonNew, "select \n" + "\"2022-04\""); } @Test diff --git a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala index 5e8d8a64fc..3ebbbc33ba 100644 --- a/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala +++ b/linkis-commons/linkis-hadoop-common/src/main/scala/org/apache/linkis/hadoop/common/utils/HDFSUtils.scala @@ -128,43 +128,75 @@ object HDFSUtils extends Logging { ) def getHDFSRootUserFileSystem(conf: org.apache.hadoop.conf.Configuration): FileSystem = - getHDFSUserFileSystem(HADOOP_ROOT_USER.getValue, conf) + getHDFSUserFileSystem(HADOOP_ROOT_USER.getValue, null, conf) - def getHDFSUserFileSystem(userName: String): FileSystem = - getHDFSUserFileSystem(userName, getConfiguration(userName)) + /** + * If the cache 
switch is turned on, fs will be obtained from the cache first + * @param userName + * @return + */ + def getHDFSUserFileSystem(userName: String): FileSystem = { + getHDFSUserFileSystem(userName, null) + } - def getHDFSUserFileSystem( - userName: String, - conf: org.apache.hadoop.conf.Configuration - ): FileSystem = getHDFSUserFileSystem(userName, null, conf) + def getHDFSUserFileSystem(userName: String, label: String): FileSystem = { + + if (HadoopConf.HDFS_ENABLE_CACHE) { + val cacheLabel = if (label == null) DEFAULT_CACHE_LABEL else label + val cacheKey = userName + JOINT + cacheLabel + val locker = userName + LOCKER_SUFFIX + locker.intern().synchronized { + if (fileSystemCache.containsKey(cacheKey)) { + val hdfsFileSystemContainer = fileSystemCache.get(cacheKey) + hdfsFileSystemContainer.addAccessCount() + hdfsFileSystemContainer.updateLastAccessTime + hdfsFileSystemContainer.getFileSystem + } else { + getHDFSUserFileSystem(userName, label, getConfiguration(userName, label)) + } + } + } else { + getHDFSUserFileSystem(userName, label, getConfiguration(userName, label)) + } + } def getHDFSUserFileSystem( userName: String, label: String, conf: org.apache.hadoop.conf.Configuration - ): FileSystem = if (HadoopConf.HDFS_ENABLE_CACHE) { - val cacheLabel = if (label == null) DEFAULT_CACHE_LABEL else label - val cacheKey = userName + JOINT + cacheLabel - val locker = cacheKey + LOCKER_SUFFIX - locker.intern().synchronized { - val hdfsFileSystemContainer = if (fileSystemCache.containsKey(cacheKey)) { - fileSystemCache.get(cacheKey) - } else { - // we use cacheLabel to create HDFSFileSystemContainer, and in the rest part of HDFSUtils, we consistently - // use the same cacheLabel to operate HDFSFileSystemContainer, like close or remove. - // At the same time, we don't want to change the behavior of createFileSystem which is out of HDFSUtils, - // so we continue to use the original label to createFileSystem. 
- val newHDFSFileSystemContainer = - new HDFSFileSystemContainer(createFileSystem(userName, label, conf), userName, cacheLabel) - fileSystemCache.put(cacheKey, newHDFSFileSystemContainer) - newHDFSFileSystemContainer + ): FileSystem = { + + if (HadoopConf.FS_CACHE_DISABLE.getValue && null != conf) { + conf.set("fs.hdfs.impl.disable.cache", "true") + } + if (HadoopConf.HDFS_ENABLE_CACHE) { + val locker = userName + LOCKER_SUFFIX + val cacheLabel = if (label == null) DEFAULT_CACHE_LABEL else label + val cacheKey = userName + JOINT + cacheLabel + locker.intern().synchronized { + val hdfsFileSystemContainer = if (fileSystemCache.containsKey(cacheKey)) { + fileSystemCache.get(cacheKey) + } else { + // we use cacheLabel to create HDFSFileSystemContainer, and in the rest part of HDFSUtils, we consistently + // use the same cacheLabel to operate HDFSFileSystemContainer, like close or remove. + // At the same time, we don't want to change the behavior of createFileSystem which is out of HDFSUtils, + // so we continue to use the original label to createFileSystem. 
+ val newHDFSFileSystemContainer = + new HDFSFileSystemContainer( + createFileSystem(userName, label, conf), + userName, + cacheLabel + ) + fileSystemCache.put(cacheKey, newHDFSFileSystemContainer) + newHDFSFileSystemContainer + } + hdfsFileSystemContainer.addAccessCount() + hdfsFileSystemContainer.updateLastAccessTime + hdfsFileSystemContainer.getFileSystem } - hdfsFileSystemContainer.addAccessCount() - hdfsFileSystemContainer.updateLastAccessTime - hdfsFileSystemContainer.getFileSystem + } else { + createFileSystem(userName, label, conf) } - } else { - createFileSystem(userName, label, conf) } def createFileSystem(userName: String, conf: org.apache.hadoop.conf.Configuration): FileSystem = @@ -174,16 +206,19 @@ object HDFSUtils extends Logging { userName: String, label: String, conf: org.apache.hadoop.conf.Configuration - ): FileSystem = + ): FileSystem = { + val createCount = count.getAndIncrement() + logger.info(s"user ${userName} to create Fs, create time ${createCount}") getUserGroupInformation(userName, label) .doAs(new PrivilegedExceptionAction[FileSystem] { - // scalastyle:off FileSystemGet - def run: FileSystem = FileSystem.get(conf) - // scalastyle:on FileSystemGet + def run: FileSystem = FileSystem.newInstance(conf) }) + } def closeHDFSFIleSystem(fileSystem: FileSystem, userName: String): Unit = - closeHDFSFIleSystem(fileSystem, userName, null, false) + if (null != fileSystem && StringUtils.isNotBlank(userName)) { + closeHDFSFIleSystem(fileSystem, userName, null, false) + } def closeHDFSFIleSystem(fileSystem: FileSystem, userName: String, label: String): Unit = closeHDFSFIleSystem(fileSystem, userName, label, false) @@ -198,23 +233,28 @@ object HDFSUtils extends Logging { isForce: Boolean ): Unit = if (null != fileSystem && StringUtils.isNotBlank(userName)) { - if (HadoopConf.HDFS_ENABLE_CACHE) { + val locker = userName + LOCKER_SUFFIX + if (HadoopConf.HDFS_ENABLE_CACHE) locker.intern().synchronized { val cacheLabel = if (label == null) 
DEFAULT_CACHE_LABEL else label val cacheKey = userName + JOINT + cacheLabel val hdfsFileSystemContainer = fileSystemCache.get(cacheKey) - if (null != hdfsFileSystemContainer) { - val locker = cacheKey + LOCKER_SUFFIX + if ( + null != hdfsFileSystemContainer && fileSystem == hdfsFileSystemContainer.getFileSystem + ) { if (isForce) { - locker synchronized fileSystemCache.remove(cacheKey) + fileSystemCache.remove(hdfsFileSystemContainer.getUser) IOUtils.closeQuietly(hdfsFileSystemContainer.getFileSystem) logger.info( s"user${hdfsFileSystemContainer.getUser} to Force remove hdfsFileSystemContainer" ) } else { - locker synchronized hdfsFileSystemContainer.minusAccessCount() + hdfsFileSystemContainer.minusAccessCount() } + } else { + IOUtils.closeQuietly(fileSystem) } - } else { + } + else { IOUtils.closeQuietly(fileSystem) } } diff --git a/linkis-commons/linkis-module/pom.xml b/linkis-commons/linkis-module/pom.xml index 328480f25b..cb8d23c095 100644 --- a/linkis-commons/linkis-module/pom.xml +++ b/linkis-commons/linkis-module/pom.xml @@ -266,6 +266,13 @@ org.springframework.cloud spring-cloud-openfeign-core + + + org.springframework.retry + spring-retry + 1.3.4 + + diff --git a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/DataWorkCloudApplication.java b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/DataWorkCloudApplication.java index 5610ded412..10ab8f9268 100644 --- a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/DataWorkCloudApplication.java +++ b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/DataWorkCloudApplication.java @@ -26,8 +26,6 @@ import org.apache.linkis.server.conf.ServerConfiguration; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; @@ -50,6 +48,7 @@ import org.springframework.core.env.Environment; import 
org.springframework.core.env.PropertySource; import org.springframework.core.env.StandardEnvironment; +import org.springframework.retry.annotation.EnableRetry; import org.springframework.web.filter.CharacterEncodingFilter; import javax.servlet.DispatcherType; @@ -63,13 +62,16 @@ import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.webapp.WebAppContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SpringBootApplication(scanBasePackages = {"org.apache.linkis", "com.webank.wedatasphere"}) @EnableDiscoveryClient @RefreshScope @EnableFeignClients +@EnableRetry public class DataWorkCloudApplication extends SpringBootServletInitializer { - private static final Log logger = LogFactory.getLog(DataWorkCloudApplication.class); + private static final Logger logger = LoggerFactory.getLogger(DataWorkCloudApplication.class); private static ConfigurableApplicationContext applicationContext; private static ServiceInstance serviceInstance; diff --git a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/InterceptorConfigure.java b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/InterceptorConfigure.java new file mode 100644 index 0000000000..c9c52fc430 --- /dev/null +++ b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/InterceptorConfigure.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.server; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.InterceptorRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +@Configuration +public class InterceptorConfigure implements WebMvcConfigurer { + + @Override + public void addInterceptors(InterceptorRegistry registry) { + registry.addInterceptor(new PerformanceInterceptor()); + } +} diff --git a/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/PerformanceInterceptor.java b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/PerformanceInterceptor.java new file mode 100644 index 0000000000..2a9cb2dd02 --- /dev/null +++ b/linkis-commons/linkis-module/src/main/java/org/apache/linkis/server/PerformanceInterceptor.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.server; + +import org.apache.linkis.utils.LinkisSpringUtils; + +import org.springframework.web.servlet.HandlerInterceptor; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PerformanceInterceptor implements HandlerInterceptor { + + private static final Logger logger = LoggerFactory.getLogger(PerformanceInterceptor.class); + + @Override + public boolean preHandle( + HttpServletRequest request, HttpServletResponse response, Object handler) { + request.setAttribute("Linkis_startTime", System.currentTimeMillis()); + return true; + } + + @Override + public void afterCompletion( + HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) { + Object startObject = request.getAttribute("Linkis_startTime"); + if (null != startObject) { + long startTime = (Long) startObject; + long endTime = System.currentTimeMillis(); + long executeTime = endTime - startTime; + logger.info( + "Request client address:{} request URL: {} Method: {} taken: {} ms", + LinkisSpringUtils.getClientIP(request), + request.getRequestURI(), + request.getMethod(), + executeTime); + } + } +} diff --git a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala index c454aaacb2..23d07bdf90 100644 --- a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala +++ b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/Knife4jConfig.scala @@ -47,9 +47,7 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2WebMvc * 4, in your browser,add dataworkcloud_inner_request=true, bdp-user-ticket-id's value and workspaceId's value into cookie * */ 
-@EnableSwagger2WebMvc -@EnableKnife4j -@Configuration + class Knife4jConfig extends WebMvcConfigurer { @Value("${spring.application.name}") private var appName = "linkis service" diff --git a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java index aa7ddece50..3dfd166846 100644 --- a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java +++ b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/AbstractRetryableProtocol.java @@ -21,7 +21,7 @@ public class AbstractRetryableProtocol implements RetryableProtocol { @Override public long maxPeriod() { - return 30000L; + return 3000L; } @Override @@ -31,11 +31,11 @@ public Class[] retryExceptions() { @Override public int retryNum() { - return 5; + return 2; } @Override public long period() { - return 10000L; + return 1000L; } } diff --git a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java index a90a1eb3b7..48d9bb4846 100644 --- a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java +++ b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java @@ -63,7 +63,7 @@ public interface TaskConstant { String JOB_MEMORY_PERCENT = "memoryPercent"; String JOB_CORE_RGB = "coreRGB"; String JOB_MEMORY_RGB = "memoryRGB"; - + String JOB_IS_REUSE = "isReuse"; String JOB_ENGINECONN_MAP = "engineconnMap"; String ENGINE_INSTANCE = "engineInstance"; String TICKET_ID = "ticketId"; diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala 
b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala index 6ebee7d0e2..51509d6883 100644 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala +++ b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/RetryableProtocol.scala @@ -18,8 +18,8 @@ package org.apache.linkis.protocol trait RetryableProtocol extends Protocol { - def retryNum: Int = 5 - def period: Long = 10000L - def maxPeriod: Long = 30000L + def retryNum: Int = 2 + def period: Long = 1000L + def maxPeriod: Long = 3000L def retryExceptions: Array[Class[_ <: Throwable]] = Array.empty[Class[_ <: Throwable]] } diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java index a8daece891..5aabaccea2 100644 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java +++ b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java @@ -28,6 +28,8 @@ public enum LinkisRpcErrorCodeSummary implements LinkisErrorCode { 10003, "The corresponding anti-sequence class was not found:{0}(找不到对应的反序列类:{0})"), CORRESPONDING_TO_INITIALIZE( 10004, "The corresponding anti-sequence class:{0} failed to initialize(对应的反序列类:{0} 初始化失败)"), + CORRESPONDING_CLASS_ILLEGAL( + 10005, "The corresponding anti-sequence class:{0} is illegal (对应的反序列类:{0} 不合法)"), APPLICATION_IS_NOT_EXISTS( 10051, "The instance:{0} of application {1} does not exist(应用程序:{0} 的实例:{1} 不存在)."), diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtils.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtils.java new file mode 100644 index 0000000000..93762d1f30 --- /dev/null +++ 
b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/message/utils/LoadBalancerOptionsUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.rpc.message.utils; + +import java.lang.reflect.Field; + +import feign.Request.Options; + +public class LoadBalancerOptionsUtils { + + private static Options DEFAULT_OPTIONS = null; + + private static Object locker = new Object(); + + public static Options getDefaultOptions() throws NoSuchFieldException, IllegalAccessException { + if (null == DEFAULT_OPTIONS) { + synchronized (locker) { + Class<?> clazz = org.springframework.cloud.openfeign.loadbalancer.FeignBlockingLoadBalancerClient.class; + Field optionField = clazz.getDeclaredField("DEFAULT_OPTIONS"); + optionField.setAccessible(true); + Object o = optionField.get(clazz); + DEFAULT_OPTIONS = (Options) o; + } + } + return DEFAULT_OPTIONS; + } +} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala index bbe2d4acd3..dd52687f71 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala +++
b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala @@ -97,13 +97,17 @@ object RPCConfiguration { val SERVICE_SCAN_PACKAGE: String = CommonVars("wds.linkis.ms.service.scan.package", "org.apache.linkis").getValue - val ENABLE_SPRING_PARAMS: Boolean = - CommonVars("wds.linkis.rpc.spring.params.enable", false).getValue - // unit is HOUR val SENDER_CACHE_CLEANING_HOUR = CommonVars("linkis.rpc.sender.cache.cleaning.time.hour", 6).getValue + // unit is HOUR + val RPC_RETRY_NUMBER = + CommonVars("linkis.rpc.retry.number", 5).getValue + + val RPC_RETRY_PERIOD = + CommonVars[Long]("linkis.rpc.retry.period", 30000L).getValue + val REFLECTIONS = new Reflections( SERVICE_SCAN_PACKAGE, new MethodAnnotationsScanner(), @@ -114,6 +118,15 @@ object RPCConfiguration { val BDP_RPC_CACHE_CONF_EXPIRE_TIME: CommonVars[Long] = CommonVars("wds.linkis.rpc.cache.expire.time", 120000L) + val ENABLE_SPRING_PARAMS: Boolean = + CommonVars("wds.linkis.rpc.spring.params.enable", false).getValue + + val RPC_READ_TIME_OUT: Int = + CommonVars[Int]("spring.ribbon.ReadTimeout", 100000).getValue + + val RPC_CONNECT_TIME_OUT: Int = + CommonVars[Int]("spring.ribbon.ConnectTimeout", 100000).getValue + val CONTEXT_SERVICE_REQUEST_PREFIX = "contextservice" val CONTEXT_SERVICE_NAME: String = @@ -126,4 +139,17 @@ object RPCConfiguration { CONTEXT_SERVICE_APPLICATION_NAME.getValue } + val configOptions: feign.Request.Options = + new feign.Request.Options(RPC_CONNECT_TIME_OUT, RPC_READ_TIME_OUT, true) + + val RPC_OBJECT_PREFIX_WHITE_LIST: Array[String] = + CommonVars( + "wds.linkis.rpc.object.class.prefix.whitelist", + "org.apache.linkis,com.webank.wedatasphere,com.wedatasphere" + ).getValue + .split(",") + + val ENABLE_RPC_OBJECT_PREFIX_WHITE_LIST_CHECK: Boolean = + CommonVars("wds.linkis.rpc.object.class.prefix.whitelist.check.enable", true).getValue + } diff --git 
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala index d835eef328..0fc2a39100 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/interceptor/common/RetryableRPCInterceptor.scala @@ -21,6 +21,7 @@ import org.apache.linkis.common.ServiceInstance import org.apache.linkis.common.exception.LinkisRetryException import org.apache.linkis.common.utils.RetryHandler import org.apache.linkis.protocol.RetryableProtocol +import org.apache.linkis.rpc.conf.RPCConfiguration import org.apache.linkis.rpc.exception.DWCRPCRetryException import org.apache.linkis.rpc.interceptor.{ RPCInterceptor, @@ -34,7 +35,7 @@ import org.apache.commons.lang3.StringUtils import org.springframework.stereotype.Component -import java.net.ConnectException +import java.net.{ConnectException, SocketTimeoutException} import feign.RetryableException @@ -57,12 +58,13 @@ class RetryableRPCInterceptor extends RPCInterceptor { class RPCRetryHandler extends RetryHandler { addRetryException(classOf[ConnectException]) addRetryException(classOf[RetryableException]) + addRetryException(classOf[SocketTimeoutException]) private var serviceInstance: Option[ServiceInstance] = None def setRetryInfo(retry: RetryableProtocol, chain: RPCInterceptorChain): Unit = { - setRetryNum(retry.retryNum) - setRetryPeriod(retry.period) - setRetryMaxPeriod(retry.maxPeriod) + setRetryNum(RPCConfiguration.RPC_RETRY_NUMBER) + setRetryPeriod(RPCConfiguration.RPC_RETRY_PERIOD) + setRetryMaxPeriod(RPCConfiguration.RPC_RETRY_PERIOD * 2) retry.retryExceptions.foreach(addRetryException) chain match { case s: ServiceInstanceRPCInterceptorChain => diff --git 
a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala index ae1070865a..9bb2fdea96 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala @@ -20,6 +20,7 @@ package org.apache.linkis.rpc.sender import org.apache.linkis.common.ServiceInstance import org.apache.linkis.common.utils.Logging import org.apache.linkis.rpc.{BaseRPCSender, RPCMessageEvent, RPCSpringBeanCache} +import org.apache.linkis.rpc.conf.RPCConfiguration import org.apache.linkis.rpc.interceptor.{RPCInterceptor, ServiceInstanceRPCInterceptorChain} import org.apache.linkis.server.conf.ServerConfiguration @@ -56,6 +57,9 @@ private[rpc] class SpringMVCRPCSender private[rpc] ( }) } super.doBuilder(builder) + if (RPCConfiguration.ENABLE_SPRING_PARAMS) { + builder.options(RPCConfiguration.configOptions) + } if (StringUtils.isBlank(serviceInstance.getInstance)) { builder .contract(getContract) diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java index 56d846b31d..f40d75c040 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java @@ -182,7 +182,7 @@ public void init(Map properties) throws IOException { throw new IOException("User cannot be empty(用户不能为空)"); } - if (label == null && (boolean) Configuration.IS_MULTIPLE_YARN_CLUSTER().getValue()) { + if (label == null && (boolean) Configuration.IS_MULTIPLE_YARN_CLUSTER()) { label = StorageConfiguration.LINKIS_STORAGE_FS_LABEL.getValue(); } conf = 
HDFSUtils.getConfigurationByLabel(user, label); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java index 16e5beb539..2d52b83049 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/OSSFileSystem.java @@ -152,8 +152,7 @@ public FsPathListWithError listPathWithError(FsPath path) throws IOException { public void init(Map properties) throws IOException { // read origin configs from hadoop conf if (label == null - && (boolean) - org.apache.linkis.common.conf.Configuration.IS_MULTIPLE_YARN_CLUSTER().getValue()) { + && (boolean) org.apache.linkis.common.conf.Configuration.IS_MULTIPLE_YARN_CLUSTER()) { label = StorageConfiguration.LINKIS_STORAGE_FS_LABEL.getValue(); } conf = HDFSUtils.getConfigurationByLabel(user, label); diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala index d88f270862..51f223e476 100644 --- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala +++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/service/impl/DefaultECMRegisterService.scala @@ -62,7 +62,7 @@ class DefaultECMRegisterService extends ECMRegisterService with ECMEventListener ENGINE_CONN_MANAGER_SPRING_NAME ) - if 
(Configuration.IS_MULTIPLE_YARN_CLUSTER.getValue.asInstanceOf[Boolean]) { + if (Configuration.IS_MULTIPLE_YARN_CLUSTER) { labels.asScala += LabelKeyConstant.YARN_CLUSTER_KEY -> (ECM_YARN_CLUSTER_TYPE + "_" + ECM_YARN_CLUSTER_NAME) } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala index afc18bdc19..58fc1f45c3 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/parser/CommonEntranceParser.scala @@ -195,7 +195,7 @@ class CommonEntranceParser(val persistenceManager: PersistenceManager) } private def generateAndVerifyClusterLabel(labels: util.Map[String, Label[_]]): Unit = { - if (!Configuration.IS_MULTIPLE_YARN_CLUSTER.getValue.asInstanceOf[Boolean]) { + if (!Configuration.IS_MULTIPLE_YARN_CLUSTER) { return } var clusterLabel = labels diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala index cb4970bbe2..dafc90a06d 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala @@ -105,7 +105,7 @@ object LoadData { if (".xls".equalsIgnoreCase(suffix)) { val config = HDFSUtils.getConfiguration(HadoopConf.HADOOP_ROOT_USER.getValue) config.setBoolean("fs.hdfs.impl.disable.cache", true) - fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), config) + fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), null, config) 
path = XlsUtils.excelToCsv(fs.open(new Path(path)), fs, hasHeader, sheetNames) hasHeader = false } else { @@ -117,13 +117,13 @@ object LoadData { } else if (".xls".equalsIgnoreCase(suffix)) { val config = HDFSUtils.getConfiguration(HadoopConf.HADOOP_ROOT_USER.getValue) config.setBoolean("fs.hdfs.impl.disable.cache", true) - fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), config) + fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), null, config) path = XlsUtils.excelToCsv(new FileInputStream(path), fs, hasHeader, sheetNames) hasHeader = false } else { val config = HDFSUtils.getConfiguration(HadoopConf.HADOOP_ROOT_USER.getValue) config.setBoolean("fs.hdfs.impl.disable.cache", true) - fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), config) + fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), null, config) path = copyFileToHdfs(path, fs) } } @@ -193,7 +193,7 @@ object LoadData { } finally { if (fs != null) { fs.delete(new Path(path), true) - fs.close() + // fs.close() } } // warn(s"create table $database $tableName Success") diff --git a/tool/dependencies/known-dependencies.txt b/tool/dependencies/known-dependencies.txt index 652c878957..b8c73b6c88 100644 --- a/tool/dependencies/known-dependencies.txt +++ b/tool/dependencies/known-dependencies.txt @@ -739,6 +739,7 @@ spring-tx-5.3.27.jar spring-web-5.3.27.jar spring-webflux-5.3.27.jar spring-webmvc-5.3.27.jar +spring-retry-1.3.4.jar springfox-bean-validators-3.0.0.jar springfox-boot-starter-3.0.0.jar springfox-core-3.0.0.jar