Skip to content

Commit

Permalink
Merge branch 'dev-1.7.0-webank' of https://github.com/WeDataSphere/li…
Browse files Browse the repository at this point in the history
…nkis into dev-1.7.0-webank

# Conflicts:
#	linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java
#	linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala
#	linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/transform/RPCConsumer.scala
#	linkis-public-enhancements/linkis-cs-server/src/main/java/org/apache/linkis/cs/condition/construction/ContextValueTypeConditionParser.java
#	linkis-public-enhancements/linkis-cs-server/src/main/java/org/apache/linkis/cs/conf/CSConfiguration.java
  • Loading branch information
taoran1250 committed Aug 16, 2024
2 parents f61fa4c + 6fa215b commit e0c91f7
Show file tree
Hide file tree
Showing 21 changed files with 153 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package org.apache.linkis.server;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@EnableWebMvc
@Configuration
public class InterceptorConfigure implements WebMvcConfigurer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,24 @@ public class PerformanceInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(
HttpServletRequest request, HttpServletResponse response, Object handler) {
request.setAttribute("startTime", System.currentTimeMillis());
request.setAttribute("Linkis_startTime", System.currentTimeMillis());
return true;
}

@Override
public void afterCompletion(
HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
long startTime = (Long) request.getAttribute("startTime");
long endTime = System.currentTimeMillis();
long executeTime = endTime - startTime;
logger.info(
"Request client address:{} request URL: {} Method: {} taken: {} ms",
LinkisSpringUtils.getClientIP(request),
request.getRequestURI(),
request.getMethod(),
executeTime);
Object startObject = request.getAttribute("Linkis_startTime");
if (null != startObject) {
long startTime = (Long) startObject;
long endTime = System.currentTimeMillis();
long executeTime = endTime - startTime;
logger.info(
"Request client address:{} request URL: {} Method: {} taken: {} ms",
LinkisSpringUtils.getClientIP(request),
request.getRequestURI(),
request.getMethod(),
executeTime);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ public enum LinkisRpcErrorCodeSummary implements LinkisErrorCode {
CORRESPONDING_TO_INITIALIZE(
10004, "The corresponding anti-sequence class:{0} failed to initialize(对应的反序列类:{0} 初始化失败)"),
CORRESPONDING_CLASS_ILLEGAL(
10005,
"The corresponding anti-sequence class:{0} is illegal (对应的反序列类:{0} 不合法)"),
10005, "The corresponding anti-sequence class:{0} is illegal (对应的反序列类:{0} 不合法)"),
APPLICATION_IS_NOT_EXISTS(
10051, "The instance:{0} of application {1} does not exist(应用程序:{0} 的实例:{1} 不存在)."),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,10 @@ object RPCConfiguration {
new feign.Request.Options(RPC_CONNECT_TIME_OUT, RPC_READ_TIME_OUT, true)

val RPC_OBJECT_PREFIX_WHITE_LIST: Array[String] =
CommonVars(
"wds.linkis.rpc.object.class.prefix.whitelist",
"org.apache.linkis,com.webank.wedatasphere.dss,com.webank.wedatasphere.exchangis,com.webank.wedatasphere.streamis,com.webank.wedatasphere.visualis,com.wedatasphere.dss.visualishub"
).getValue
CommonVars("wds.linkis.rpc.object.class.prefix.whitelist", "org.apache.linkis").getValue
.split(",")

val ENABLE_RPC_OBJECT_PREFIX_WHITE_LIST_CHECK: Boolean =
CommonVars("wds.linkis.rpc.object.class.prefix.whitelist.check.enable", true).getValue

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ package org.apache.linkis.rpc.transform
import org.apache.linkis.common.exception.ExceptionManager
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.rpc.conf.RPCConfiguration
import org.apache.linkis.rpc.errorcode.LinkisRpcErrorCodeSummary.CORRESPONDING_CLASS_ILLEGAL
import org.apache.linkis.rpc.errorcode.LinkisRpcErrorCodeSummary.CORRESPONDING_NOT_FOUND
import org.apache.linkis.rpc.errorcode.LinkisRpcErrorCodeSummary.CORRESPONDING_TO_INITIALIZE
import org.apache.linkis.rpc.errorcode.LinkisRpcErrorCodeSummary.CORRESPONDING_CLASS_ILLEGAL
import org.apache.linkis.rpc.exception.DWCURIException
import org.apache.linkis.rpc.serializer.ProtostuffSerializeUtil
import org.apache.linkis.server.{EXCEPTION_MSG, JMap, Message}

import java.text.MessageFormat

import scala.runtime.BoxedUnit

import org.slf4j.LoggerFactory

private[linkis] trait RPCConsumer {
Expand All @@ -51,7 +53,10 @@ private[linkis] object RPCConsumer {
val objectStr = data.get(OBJECT_VALUE).toString
val objectClass = data.get(CLASS_VALUE).toString
logger.debug("The corresponding anti-sequence is class {}", objectClass)
if (RPCConfiguration.ENABLE_RPC_OBJECT_PREFIX_WHITE_LIST_CHECK && !RPCConfiguration.RPC_OBJECT_PREFIX_WHITE_LIST.exists(prefix => objectClass.startsWith(prefix))) {
if (
RPCConfiguration.ENABLE_RPC_OBJECT_PREFIX_WHITE_LIST_CHECK && !RPCConfiguration.RPC_OBJECT_PREFIX_WHITE_LIST
.exists(prefix => objectClass.startsWith(prefix))
) {
throw new DWCURIException(
CORRESPONDING_CLASS_ILLEGAL.getErrorCode,
MessageFormat.format(CORRESPONDING_CLASS_ILLEGAL.getErrorDesc, objectClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ object ResultSetAction {
private var nullValue: String = "LINKIS_NULL"

private var enableLimit: Boolean = false
private var columnPage: Int = _
private var columnPageSize: Int = _

def setUser(user: String): Builder = {
this.user = user
Expand Down Expand Up @@ -75,6 +77,16 @@ object ResultSetAction {
this
}

def setColumnPage(columnPage: Int): Builder = {
this.columnPage = columnPage
this
}

def setColumnPageSize(columnPageSize: Int): Builder = {
this.columnPageSize = columnPageSize
this
}

def build(): ResultSetAction = {
if (user == null) throw new UJESClientBuilderException("user is needed!")
if (path == null) throw new UJESClientBuilderException("path is needed!")
Expand All @@ -85,6 +97,8 @@ object ResultSetAction {
resultSetAction.setParameter("charset", charset)
resultSetAction.setParameter("enableLimit", enableLimit)
resultSetAction.setParameter("nullValue", nullValue)
resultSetAction.setParameter("columnPage", columnPage)
resultSetAction.setParameter("columnPageSize", columnPageSize)
resultSetAction.setUser(user)
resultSetAction
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.linkis.manager.common.entity.resource.{NodeResource, Resource,
import org.apache.linkis.manager.common.protocol.em.GetEMInfoRequest
import org.apache.linkis.manager.common.protocol.node.NodeHealthyRequest
import org.apache.linkis.manager.common.utils.ResourceUtils
import org.apache.linkis.manager.label.entity.em.EMInstanceLabel
import org.apache.linkis.manager.label.entity.node.AliasServiceInstanceLabel
import org.apache.linkis.manager.label.service.NodeLabelService
import org.apache.linkis.manager.label.utils.LabelUtil
Expand Down Expand Up @@ -147,11 +148,11 @@ class DefaultEMInfoService extends EMInfoService with Logging {

override def resetResource(serviceInstance: String, username: String): Unit = {
// ECM开关
if (AMConfiguration.AM_ECM_RESET_RESOURCE) {
val filteredECMs = if (StringUtils.isNotBlank(serviceInstance)) {
getAllEM().filter(_.getServiceInstance.getInstance.equals(serviceInstance))
} else {
if (AMConfiguration.AM_ECM_RESET_RESOURCE && StringUtils.isNotBlank(serviceInstance)) {
val filteredECMs = if (serviceInstance.equals("*")) {
getAllEM()
} else {
getAllEM().filter(_.getServiceInstance.getInstance.equals(serviceInstance))
}
// 遍历处理ECM
filteredECMs.foreach { ecmInstance =>
Expand All @@ -162,8 +163,9 @@ class DefaultEMInfoService extends EMInfoService with Logging {
ecmInstance.getServiceInstance.getInstance
)
)
val eMInstanceLabel = ecmInstance.getLabels.filter(_.isInstanceOf[EMInstanceLabel]).head
val lock =
resourceManager.tryLockOneLabel(ecmInstance.getLabels.head, -1, Utils.getJvmUser)
resourceManager.tryLockOneLabel(eMInstanceLabel, -1, Utils.getJvmUser)
engineInfoService
.updateEngineHealthyStatus(ecmInstance.getServiceInstance, NodeHealthy.UnHealthy)
Utils.tryFinally {
Expand All @@ -174,23 +176,23 @@ class DefaultEMInfoService extends EMInfoService with Logging {
val (realSumResource, useResource, lockResource) =
collectResource(nodeResource, ResourceType.LoadInstance)
// 收集ECM资源
val ecmResource =
ecmInstance.getNodeResource.getUsedResource + ecmInstance.getNodeResource.getLockedResource
val ecmNodeResource = ecmInstance.getNodeResource
// 资源对比,资源重置
if (!(ecmResource == realSumResource)) {
if (
(!(useResource == ecmNodeResource.getUsedResource)) || (!(lockResource == ecmNodeResource.getLockedResource))
) {
logger.info(
MessageFormat.format(
"ECM:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}",
ecmInstance.getServiceInstance.getInstance,
ecmResource,
ecmNodeResource.getUsedResource + ecmNodeResource.getLockedResource,
realSumResource
)
)
val ecmNodeResource = ecmInstance.getNodeResource
ecmNodeResource.setLockedResource(lockResource)
ecmNodeResource.setLeftResource(ecmNodeResource.getMaxResource - realSumResource)
ecmNodeResource.setUsedResource(useResource)
val persistence = ResourceUtils.toPersistenceResource(ecmInstance.getNodeResource)
val persistence = ResourceUtils.toPersistenceResource(ecmNodeResource)
val resourceLabel = labelManagerPersistence.getLabelByResource(persistence)
resourceManager.resetResource(resourceLabel.head, ecmNodeResource)
}
Expand All @@ -209,12 +211,12 @@ class DefaultEMInfoService extends EMInfoService with Logging {
}

// 用户资源重置
if (AMConfiguration.AM_USER_RESET_RESOURCE) {
if (AMConfiguration.AM_USER_RESET_RESOURCE && StringUtils.isNotBlank(username)) {
// 获取用户的标签
val user = if (StringUtils.isNotBlank(username)) {
username
} else {
val user = if (username.equals("*")) {
""
} else {
username
}
val labelValuePattern =
MessageFormat.format("%{0}%,%{1}%,%{2}%,%", "", user, "")
Expand All @@ -235,41 +237,38 @@ class DefaultEMInfoService extends EMInfoService with Logging {
val lock = resourceManager.tryLockOneLabel(resourceLabel.head, -1, labelUser)
Utils.tryFinally {
val userPersistenceResource = ResourceUtils.fromPersistenceResource(userLabelResource)
val userLabelResourceSum =
userPersistenceResource.getUsedResource + userPersistenceResource.getLockedResource
val userResourceType = ResourceType.valueOf(userLabelResource.getResourceType)
val matchResult = userLabelResourceSum.caseMore(Resource.initResource(userResourceType))
if (matchResult) {
val userEngineNodes = nodeLabelService.getEngineNodesWithResourceByUser(labelUser, true)
val userEngineNodeFilter = userEngineNodes
.filter { node =>
val userCreatorLabelStr =
LabelUtil.getUserCreatorLabel(node.getLabels).getStringValue
val engineTypeLabelStr = LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue
userLabelResource.getCreator.equalsIgnoreCase(
s"${userCreatorLabelStr},${engineTypeLabelStr}"
)
}
.map(_.getNodeResource)
// 收集所有node所使用的资源(汇总、已使用、上锁)
val (sumResource, uedResource, lockResource) =
collectResource(userEngineNodeFilter, userResourceType)
if (!(sumResource == userLabelResourceSum)) {
logger.info(
MessageFormat.format(
"LabelUser:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}",
labelUser,
userLabelResourceSum,
sumResource
)
val userEngineNodes = nodeLabelService.getEngineNodesWithResourceByUser(labelUser, true)
val userEngineNodeFilter = userEngineNodes
.filter { node =>
val userCreatorLabelStr =
LabelUtil.getUserCreatorLabel(node.getLabels).getStringValue
val engineTypeLabelStr = LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue
userLabelResource.getCreator.equalsIgnoreCase(
s"${userCreatorLabelStr},${engineTypeLabelStr}"
)
userPersistenceResource.setLeftResource(
userPersistenceResource.getMaxResource - sumResource
)
userPersistenceResource.setUsedResource(uedResource)
userPersistenceResource.setLockedResource(lockResource)
resourceManager.resetResource(resourceLabel.head, userPersistenceResource)
}
.map(_.getNodeResource)
// 收集所有node所使用的资源(汇总、已使用、上锁)
val (sumResource, uedResource, lockResource) =
collectResource(userEngineNodeFilter, userResourceType)
if (
(!(uedResource == userPersistenceResource.getUsedResource)) || (!(lockResource == userPersistenceResource.getLockedResource))
) {
logger.info(
MessageFormat.format(
"LabelUser:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}",
labelUser,
userPersistenceResource.getUsedResource + userPersistenceResource.getLockedResource,
sumResource
)
)
userPersistenceResource.setLeftResource(
userPersistenceResource.getMaxResource - sumResource
)
userPersistenceResource.setUsedResource(uedResource)
userPersistenceResource.setLockedResource(lockResource)
resourceManager.resetResource(resourceLabel.head, userPersistenceResource)
}
} {
resourceManager.unLock(lock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.linkis.manager.am.service.engine

import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.exception.LinkisRetryException
import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
Expand All @@ -25,10 +26,6 @@ import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.conf.GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME
import org.apache.linkis.governance.common.utils.JobUtils
import org.apache.linkis.manager.am.conf.{AMConfiguration, EngineConnConfigurationService}
import org.apache.linkis.manager.am.conf.AMConfiguration.{
HIVE_CLUSTER_EC_EXECUTE_ONCE_RULE_ENABLE,
SUPPORT_CLUSTER_RULE_EC_TYPES
}
import org.apache.linkis.manager.am.exception.AMErrorException
import org.apache.linkis.manager.am.label.EngineReuseLabelChooser
import org.apache.linkis.manager.am.selector.{ECAvailableRule, NodeSelector}
Expand All @@ -39,34 +36,26 @@ import org.apache.linkis.manager.common.entity.node.{EMNode, EngineNode}
import org.apache.linkis.manager.common.entity.resource.NodeResource
import org.apache.linkis.manager.common.protocol.engine.{EngineCreateRequest, EngineStopRequest}
import org.apache.linkis.manager.common.utils.ManagerUtils
import org.apache.linkis.manager.engineplugin.common.launch.entity.{
EngineConnBuildRequestImpl,
EngineConnCreationDescImpl
}
import org.apache.linkis.manager.engineplugin.common.launch.entity.{EngineConnBuildRequestImpl, EngineConnCreationDescImpl}
import org.apache.linkis.manager.engineplugin.common.resource.TimeoutEngineResourceRequest
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
import org.apache.linkis.manager.label.entity.{EngineNodeLabel, Label}
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel
import org.apache.linkis.manager.label.entity.entrance.ExecuteOnceLabel
import org.apache.linkis.manager.label.entity.node.AliasServiceInstanceLabel
import org.apache.linkis.manager.label.entity.{EngineNodeLabel, Label}
import org.apache.linkis.manager.label.service.{NodeLabelService, UserLabelService}
import org.apache.linkis.manager.label.utils.{LabelUtil, LabelUtils}
import org.apache.linkis.manager.label.utils.LabelUtils
import org.apache.linkis.manager.persistence.NodeMetricManagerPersistence
import org.apache.linkis.manager.rm.{AvailableResource, NotEnoughResource}
import org.apache.linkis.manager.rm.service.ResourceManager
import org.apache.linkis.manager.rm.{AvailableResource, NotEnoughResource}
import org.apache.linkis.manager.service.common.label.{LabelChecker, LabelFilter}
import org.apache.linkis.rpc.Sender
import org.apache.linkis.rpc.message.annotation.Receiver
import org.apache.linkis.server.BDPJettyServerHelper

import org.apache.commons.lang3.StringUtils

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service

import java.util
import java.util.concurrent.{TimeoutException, TimeUnit}

import java.util.concurrent.{TimeUnit, TimeoutException}
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.linkis.manager.rm.service

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.manager.am.conf.AMConfiguration
import org.apache.linkis.manager.am.conf.AMConfiguration.{
SUPPORT_CLUSTER_RULE_EC_TYPES,
YARN_QUEUE_NAME_CONFIG_KEY
Expand Down Expand Up @@ -171,11 +172,20 @@ abstract class RequestResourceService(labelResourceService: LabelResourceService
val labels: util.List[Label[_]] = labelContainer.getLabels
val engineType: String = LabelUtil.getEngineType(labels)
val props: util.Map[String, String] = engineCreateRequest.getProperties

// 是否是跨集群的任务
var acrossClusterTask: Boolean = false
if (props != null) {
acrossClusterTask = props.getOrDefault(AMConfiguration.ACROSS_CLUSTER_TASK, "false").toBoolean
}

// hive cluster check
if (
externalResourceService != null && StringUtils.isNotBlank(
engineType
) && SUPPORT_CLUSTER_RULE_EC_TYPES.contains(engineType) && props != null
) && SUPPORT_CLUSTER_RULE_EC_TYPES.contains(
engineType
) && props != null && acrossClusterTask && !"spark".equals(engineType)
) {
val queueName = props.getOrDefault(YARN_QUEUE_NAME_CONFIG_KEY, "default")
logger.info(s"hive cluster check with queue: $queueName")
Expand Down
Loading

0 comments on commit e0c91f7

Please sign in to comment.