diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala index 8fcb4af737..f9e4718472 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/ServiceInstance.scala @@ -20,11 +20,17 @@ package org.apache.linkis.common class ServiceInstance { private var applicationName: String = _ private var instance: String = _ + private var registryTimestamp: Long = _ def setApplicationName(applicationName: String): Unit = this.applicationName = applicationName def getApplicationName: String = applicationName def setInstance(instance: String): Unit = this.instance = instance def getInstance: String = instance + def setRegistryTimestamp(registryTimestamp: Long): Unit = this.registryTimestamp = + registryTimestamp + + def getRegistryTimestamp: Long = registryTimestamp + override def equals(other: Any): Boolean = other match { case that: ServiceInstance => applicationName == that.applicationName && @@ -42,7 +48,9 @@ class ServiceInstance { .foldLeft(0)((a, b) => 31 * a + b) } - override def toString: String = s"ServiceInstance($applicationName, $instance)" + override def toString: String = + s"ServiceInstance($applicationName, $instance, $registryTimestamp)" + } object ServiceInstance { @@ -54,6 +62,14 @@ object ServiceInstance { serviceInstance } + def apply(applicationName: String, instance: String, registryTimestamp: Long): ServiceInstance = { + val serviceInstance = new ServiceInstance + serviceInstance.setApplicationName(applicationName) + serviceInstance.setInstance(instance) + serviceInstance.setRegistryTimestamp(registryTimestamp) + serviceInstance + } + def unapply(serviceInstance: ServiceInstance): Option[(String, String)] = if (serviceInstance != null) { Some(serviceInstance.applicationName, serviceInstance.instance) diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala index 3870fe6e58..917ac53261 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala @@ -21,6 +21,8 @@ import org.apache.linkis.common.conf.CommonVars import org.apache.commons.lang3.StringUtils +import scala.collection.mutable + object CodeAndRunTypeUtils { private val CONF_LOCK = new Object() @@ -101,7 +103,14 @@ object CodeAndRunTypeUtils { def getLanguageTypeAndCodeTypeRelationMap: Map[String, String] = { val codeTypeAndRunTypeRelationMap = getCodeTypeAndLanguageTypeRelationMap if (codeTypeAndRunTypeRelationMap.isEmpty) Map() - else codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1))) + else { +// codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1))) + val map = mutable.Map[String, String]() + codeTypeAndRunTypeRelationMap.foreach(kv => { + kv._2.foreach(v => map.put(v, kv._1)) + }) + map.toMap + } } def getLanguageTypeByCodeType(codeType: String, defaultLanguageType: String = ""): String = { diff --git a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/conf/ServerConfiguration.scala b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/conf/ServerConfiguration.scala index 
582568e626..3c6a25a343 100644 --- a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/conf/ServerConfiguration.scala +++ b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/conf/ServerConfiguration.scala @@ -207,4 +207,7 @@ object ServerConfiguration extends Logging { val LINKIS_SERVER_SESSION_PROXY_TICKETID_KEY = CommonVars("wds.linkis.session.proxy.user.ticket.key", "linkis_user_session_proxy_ticket_id_v1") + val LINKIS_SERVER_ENTRANCE_HEADER_KEY = + CommonVars("linkis.server.entrance.header.key", "jobInstanceKey") + } diff --git a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java index 6eb97c84d9..a90a1eb3b7 100644 --- a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java +++ b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java @@ -69,6 +69,7 @@ public interface TaskConstant { String TICKET_ID = "ticketId"; String ENGINE_CONN_TASK_ID = "engineConnTaskId"; String ENGINE_CONN_SUBMIT_TIME = "engineConnSubmitTime"; + String FAILOVER_FLAG = "failoverFlag"; String DEBUG_ENBALE = "debug.enable"; String PARAMS_DATA_SOURCE = "dataSources"; diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/JobInstance.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/JobInstance.scala new file mode 100644 index 0000000000..5e2eb10a59 --- /dev/null +++ b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/JobInstance.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.protocol.engine + +case class JobInstance( + status: String, + instances: String, + jobReqId: String, + createTimestamp: Long, + instanceRegistryTimestamp: Long +) diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ZuulEntranceUtils.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ZuulEntranceUtils.scala index 95c7a81873..ad30484c46 100644 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ZuulEntranceUtils.scala +++ b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/ZuulEntranceUtils.scala @@ -23,7 +23,7 @@ object ZuulEntranceUtils { private val INSTANCE_SPLIT_TOKEN = "_" - private val EXEC_ID = "exec_id" + val EXEC_ID = "exec_id" private val SPLIT_LEN = 3 diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala index 6e9ecbd26f..b123682b56 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/AbstractGroup.scala @@ -23,11 +23,18 @@ abstract class AbstractGroup extends Group { private var _status: GroupStatus = _ private var maxRunningJobs: Int = _ + private var maxAllowRunningJobs: Int = 0 private var maxAskExecutorTimes: Long = 0L def setMaxRunningJobs(maxRunningJobs: Int): Unit = this.maxRunningJobs = maxRunningJobs def getMaxRunningJobs: Int = maxRunningJobs + def setMaxAllowRunningJobs(maxAllowRunningJobs: Int): Unit = this.maxAllowRunningJobs = + maxAllowRunningJobs + + def getMaxAllowRunningJobs: Int = + if (maxAllowRunningJobs <= 0) maxRunningJobs else Math.min(maxAllowRunningJobs, maxRunningJobs) + def setMaxAskExecutorTimes(maxAskExecutorTimes: Long): Unit = this.maxAskExecutorTimes = maxAskExecutorTimes diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala index 4edc1d5d17..26087d99f0 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala @@ -38,4 +38,8 @@ object SchedulerEventState extends Enumeration { SchedulerEventState.withName(jobState) ) + def isInitedByStr(jobState: String): Boolean = SchedulerEventState.withName(jobState) == Inited + + def isRunningByStr(jobState: String): Boolean = isRunning(SchedulerEventState.withName(jobState)) + } diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala index d541d8a2eb..fcab44a731 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala @@ -27,6 +27,7 @@ import org.apache.linkis.scheduler.executer.Executor import org.apache.linkis.scheduler.future.{BDPFuture, BDPFutureTask} import org.apache.linkis.scheduler.queue._ +import java.util import 
java.util.concurrent.{ExecutorService, Future} import scala.beans.BeanProperty @@ -122,9 +123,10 @@ class FIFOUserConsumer( } var event: Option[SchedulerEvent] = getWaitForRetryEvent if (event.isEmpty) { - val completedNums = runningJobs.filter(job => job == null || job.isCompleted) - if (completedNums.length < 1) { - Utils.tryQuietly(Thread.sleep(1000)) + val maxAllowRunningJobs = fifoGroup.getMaxAllowRunningJobs + val currentRunningJobs = runningJobs.count(e => e != null && !e.isCompleted) + if (maxAllowRunningJobs <= currentRunningJobs) { + Utils.tryQuietly(Thread.sleep(1000)) // TODO Could be optimized further by implementing a JobListener return } while (event.isEmpty) { @@ -207,6 +209,19 @@ class FIFOUserConsumer( runningJobs(index) = job } + protected def scanAllRetryJobsAndRemove(): util.List[Job] = { + val jobs = new util.ArrayList[Job]() + for (index <- runningJobs.indices) { + val job = runningJobs(index) + if (job != null && job.isJobCanRetry) { + jobs.add(job) + runningJobs(index) = null + logger.info(s"Job $job can retry, removed from runningJobs") + } + } + jobs + } + override def shutdown(): Unit = { future.cancel(true) val waitEvents = queue.getWaitingEvents diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/entity/job/JobRequest.java b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/entity/job/JobRequest.java index d5d97aa364..46fa8a69ef 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/entity/job/JobRequest.java +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/entity/job/JobRequest.java @@ -49,6 +49,9 @@ public class JobRequest { /** result location */ private String resultLocation; + /** Task status updates are ordered; if false, ordering is not checked */ + private Boolean updateOrderFlag = true; + private String observeInfo; private Map metrics = new HashMap<>(); @@ -205,6 +208,14 @@ public void setObserveInfo(String observeInfo) { this.observeInfo = observeInfo; } + public Boolean getUpdateOrderFlag() { + return updateOrderFlag; + } + + public void setUpdateOrderFlag(Boolean updateOrderFlag) { + this.updateOrderFlag = updateOrderFlag; + } + @Override public String toString() { return "JobRequest{" diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala index 2e44739787..df197ddb2c 100644 --- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala +++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/job/JobReqProcotol.scala @@ -51,3 +51,10 @@ class RequestOneJob extends JobReq { } case class RequestAllJob(instance: String) extends JobReq + +case class RequestFailoverJob( + reqMap: util.Map[String, java.lang.Long], + statusList: util.List[String], + startTimestamp: Long, + limit: Int = 10 +) extends JobReq
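
Taken together, AbstractGroup.getMaxAllowRunningJobs and the reworked loop gate in FIFOUserConsumer above form a softer concurrency cap: a non-positive maxAllowRunningJobs means "no extra cap", and a positive one can only tighten, never loosen, maxRunningJobs. A minimal standalone Scala sketch of that gate, assuming only the semantics visible in this diff (the function name is illustrative):

// Returns true when the consumer may take another job from its queue.
// Mirrors AbstractGroup.getMaxAllowRunningJobs combined with the loop gate
// in FIFOUserConsumer; when this is false the loop sleeps 1s and retries.
def canScheduleMore(maxRunningJobs: Int, maxAllowRunningJobs: Int, currentRunning: Int): Boolean = {
  val cap =
    if (maxAllowRunningJobs <= 0) maxRunningJobs
    else math.min(maxAllowRunningJobs, maxRunningJobs)
  currentRunning < cap
}

// e.g. canScheduleMore(10, 4, 4) == false even though 6 raw slots remain

diff --git 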
a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala index bc738d5498..ca1887e746 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala @@ -187,7 +187,7 @@ class TaskExecutionServiceImpl if (!lockService.isLockExist(requestTask.getLock)) { logger.error(s"Lock ${requestTask.getLock} not exist, cannot execute.") return ErrorExecuteResponse( - "Lock not exixt", + "Lock not exist", new EngineConnExecutorErrorException( EngineConnExecutorErrorCode.INVALID_LOCK, "Lock : " + requestTask.getLock + " not exist(您的锁无效,请重新获取后再提交)." diff --git a/linkis-computation-governance/linkis-entrance/pom.xml b/linkis-computation-governance/linkis-entrance/pom.xml index dea4d1d4d7..bda458c356 100644 --- a/linkis-computation-governance/linkis-entrance/pom.xml +++ b/linkis-computation-governance/linkis-entrance/pom.xml @@ -90,6 +90,12 @@ ${project.version} + + org.apache.linkis + linkis-ps-common-lock + ${project.version} + + diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java index 0bf27a68b3..cf520c3823 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java @@ -42,6 +42,7 @@ import org.apache.linkis.entrance.persistence.QueryPersistenceManager; import org.apache.linkis.entrance.persistence.ResultSetEngine; import org.apache.linkis.entrance.scheduler.EntranceGroupFactory; +import org.apache.linkis.entrance.scheduler.EntranceParallelConsumerManager; import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext; import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder; import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder$; @@ -51,7 +52,6 @@ import org.apache.linkis.scheduler.executer.ExecutorManager; import org.apache.linkis.scheduler.queue.ConsumerManager; import org.apache.linkis.scheduler.queue.GroupFactory; -import org.apache.linkis.scheduler.queue.parallelqueue.ParallelConsumerManager; import org.apache.linkis.scheduler.queue.parallelqueue.ParallelScheduler; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -190,7 +190,7 @@ public GroupFactory groupFactory() { @Bean @ConditionalOnMissingBean public ConsumerManager consumerManager() { - return new ParallelConsumerManager( + return new EntranceParallelConsumerManager( ENTRANCE_SCHEDULER_MAX_PARALLELISM_USERS().getValue(), "EntranceJobScheduler"); } diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/constant/ServiceNameConsts.java 
b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/constant/ServiceNameConsts.java index cb37279c11..bee17b8ed4 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/constant/ServiceNameConsts.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/constant/ServiceNameConsts.java @@ -26,4 +26,6 @@ private ServiceNameConsts() {} public static final String ENTRANCE_SERVER = "entranceServer"; public static final String ENTRANCE_INTERCEPTOR = "entranceInterceptors"; + + public static final String ENTRANCE_FAILOVER_SERVER = "entranceFailoverServer"; } diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/errorcode/EntranceErrorCodeSummary.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/errorcode/EntranceErrorCodeSummary.java index 2f045a1760..b5f90e3070 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/errorcode/EntranceErrorCodeSummary.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/errorcode/EntranceErrorCodeSummary.java @@ -71,7 +71,11 @@ public enum EntranceErrorCodeSummary implements LinkisErrorCode { SHELL_BLACKLISTED_CODE(50081, "Shell code contains blacklisted code(shell中包含黑名单代码)"), JOB_HISTORY_FAILED_ID(50081, ""), - LOGPATH_NOT_NULL(20301, "The logPath cannot be empty(日志路径不能为空)"); + LOGPATH_NOT_NULL(20301, "The logPath cannot be empty(日志路径不能为空)"), + + FAILOVER_RUNNING_TO_CANCELLED( + 30001, + "Job {0} failover, status changed from Running to Cancelled (任务故障转移,状态从Running变更为Cancelled)"); /** (errorCode)错误码 */ private final int errorCode; diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java index 2ab457747c..841a6a3fb0 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceLabelRestfulApi.java @@ -18,14 +18,18 @@ package org.apache.linkis.entrance.restful; import org.apache.linkis.common.conf.Configuration; +import org.apache.linkis.entrance.EntranceServer; +import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext; import org.apache.linkis.instance.label.client.InstanceLabelClient; import org.apache.linkis.manager.label.constant.LabelKeyConstant; import org.apache.linkis.manager.label.constant.LabelValueConstant; import org.apache.linkis.protocol.label.InsLabelRefreshRequest; import org.apache.linkis.rpc.Sender; +import org.apache.linkis.scheduler.SchedulerContext; import org.apache.linkis.server.Message; import org.apache.linkis.server.utils.ModuleUserUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; @@ -46,6 +50,12 @@ public class EntranceLabelRestfulApi { private static final Logger logger = LoggerFactory.getLogger(EntranceLabelRestfulApi.class); + private EntranceServer entranceServer; + + @Autowired + public void setEntranceServer(EntranceServer entranceServer) { + this.entranceServer = entranceServer; + } @ApiOperation(value = "update", notes = 
"update route label", response = Message.class) @ApiOperationSupport(ignoreParameters = {"jsonNode"}) @@ -79,6 +89,16 @@ public Message updateRouteLabel(HttpServletRequest req) { insLabelRefreshRequest.setServiceInstance(Sender.getThisServiceInstance()); InstanceLabelClient.getInstance().refreshLabelsToInstance(insLabelRefreshRequest); logger.info("Finished to modify the routelabel of entry to offline"); + + logger.info("Prepare to update all not execution task instances to empty string"); + SchedulerContext schedulerContext = + entranceServer.getEntranceContext().getOrCreateScheduler().getSchedulerContext(); + if (schedulerContext instanceof EntranceSchedulerContext) { + ((EntranceSchedulerContext) schedulerContext).setOfflineFlag(true); + } + entranceServer.updateAllNotExecutionTaskInstances(true); + logger.info("Finished to update all not execution task instances to empty string"); + return Message.ok(); } } diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java index c1479efd8a..cf9cd33653 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java @@ -29,13 +29,16 @@ import org.apache.linkis.governance.common.entity.job.JobRequest; import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus; import org.apache.linkis.protocol.constants.TaskConstant; +import org.apache.linkis.protocol.engine.JobInstance; import org.apache.linkis.protocol.engine.JobProgressInfo; import org.apache.linkis.protocol.utils.ZuulEntranceUtils; import org.apache.linkis.rpc.Sender; import org.apache.linkis.scheduler.listener.LogListener; import org.apache.linkis.scheduler.queue.Job; import org.apache.linkis.scheduler.queue.SchedulerEventState; +import org.apache.linkis.server.BDPJettyServerHelper; import org.apache.linkis.server.Message; +import org.apache.linkis.server.conf.ServerConfiguration; import org.apache.linkis.server.security.SecurityFilter; import org.apache.linkis.server.utils.ModuleUserUtils; @@ -61,6 +64,7 @@ import scala.Option; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport; import io.swagger.annotations.Api; @@ -198,6 +202,13 @@ private void pushLog(String log, Job job) { entranceServer.getEntranceContext().getOrCreateLogManager().onLogUpdate(job, log); } + private JobInstance parseHeaderToJobInstance(HttpServletRequest req) + throws JsonProcessingException { + String jobStr = + req.getHeader(ServerConfiguration.LINKIS_SERVER_ENTRANCE_HEADER_KEY().getValue()); + return BDPJettyServerHelper.gson().fromJson(jobStr, JobInstance.class); + } + @ApiOperation(value = "status", notes = "get task stats", response = Message.class) @ApiImplicitParams({ @ApiImplicitParam(name = "taskID", required = false, dataType = "String", value = " task id"), @@ -209,28 +220,74 @@ public Message status( HttpServletRequest req, @PathVariable("id") String id, @RequestParam(value = "taskID", required = false) String taskID) { + ModuleUserUtils.getOperationUser(req, "job status"); Message message = null; - String realId = ZuulEntranceUtils.parseExecID(id)[3]; - 
ModuleUserUtils.getOperationUser(req, "status realId: " + realId); - Option job = Option.apply(null); + String realId; + String execID; + if (id.startsWith(ZuulEntranceUtils.EXEC_ID())) { + // execID + realId = ZuulEntranceUtils.parseExecID(id)[3]; + execID = id; + } else { + // taskID + JobInstance jobInstance; + try { + jobInstance = parseHeaderToJobInstance(req); + } catch (JsonProcessingException e) { + logger.error("parse JobInstance json error, id: {}", id); + message = Message.error("parse JobInstance json error"); + message.setMethod("/api/entrance/" + id + "/status"); + return message; + } + + // return ok when job complete + if (SchedulerEventState.isCompletedByStr(jobInstance.status())) { + message = Message.ok(); + message.setMethod("/api/entrance/" + id + "/status"); + message.data("status", jobInstance.status()).data("execID", "").data("taskID", id); + return message; + } else if (jobInstance.instanceRegistryTimestamp() > jobInstance.createTimestamp()) { + logger.warn("The job {} wait failover, return status is Inited", id); + String status = SchedulerEventState.Inited().toString(); + message = Message.ok(); + message.setMethod("/api/entrance/" + id + "/status"); + message.data("status", status).data("execID", "").data("taskID", id); + return message; + } else { + realId = jobInstance.jobReqId(); + execID = + ZuulEntranceUtils.generateExecID( + realId, + Sender.getThisServiceInstance().getApplicationName(), + new String[] {Sender.getThisInstance()}); + } + } + + Option job = null; try { job = entranceServer.getJob(realId); } catch (Exception e) { - logger.warn("获取任务 {} 状态时出现错误", realId, e.getMessage()); + logger.warn("get {} status error", realId, e); + if (StringUtils.isEmpty(taskID)) { + message = + Message.error( + "Get job by ID error and cannot obtain the corresponding task status.(获取job时发生异常,不能获取相应的任务状态)"); + return message; + } long realTaskID = Long.parseLong(taskID); String status = JobHistoryHelper.getStatusByTaskID(realTaskID); message = Message.ok(); message.setMethod("/api/entrance/" + id + "/status"); - message.data("status", status).data("execID", id); + message.data("status", status).data("execID", execID); return message; } - if (job.isDefined()) { + if (job != null && job.isDefined()) { if (job.get() instanceof EntranceJob) { ((EntranceJob) job.get()).updateNewestAccessByClientTimestamp(); } message = Message.ok(); message.setMethod("/api/entrance/" + id + "/status"); - message.data("status", job.get().getState().toString()).data("execID", id); + message.data("status", job.get().getState().toString()).data("execID", execID); } else { message = Message.error( @@ -246,9 +303,56 @@ public Message status( @Override @RequestMapping(path = "/{id}/progress", method = RequestMethod.GET) public Message progress(HttpServletRequest req, @PathVariable("id") String id) { + ModuleUserUtils.getOperationUser(req, "job progress"); Message message = null; - String realId = ZuulEntranceUtils.parseExecID(id)[3]; - ModuleUserUtils.getOperationUser(req, "progress realId: " + realId); + String realId; + String execID; + if (id.startsWith(ZuulEntranceUtils.EXEC_ID())) { + // execID + realId = ZuulEntranceUtils.parseExecID(id)[3]; + execID = id; + } else { + // taskID + JobInstance jobInstance; + try { + jobInstance = parseHeaderToJobInstance(req); + } catch (JsonProcessingException e) { + logger.error("parse JobInstance json error, id: {}", id); + message = Message.error("parse JobInstance json error"); + message.setMethod("/api/entrance/" + id + "/progress"); + return message; + } 
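
The branch above recurs with the same structure in the status, progress, progressWithResource, log and kill endpoints: an id prefixed with exec_id is parsed as before, while a bare taskID is resolved through the JobInstance JSON carried in the jobInstanceKey header (ServerConfiguration.LINKIS_SERVER_ENTRANCE_HEADER_KEY). A condensed standalone sketch of that three-way decision, in Scala rather than the surrounding Java; the JobInstance fields are exactly those of the new case class, the Resolution names are illustrative:

case class JobInstance(
    status: String,
    instances: String,
    jobReqId: String,
    createTimestamp: Long,
    instanceRegistryTimestamp: Long
)

sealed trait Resolution
case object AnswerFromJobHistory extends Resolution // job already completed
case object WaitForFailover extends Resolution // entrance restarted after the job was created
final case class ServeLocally(jobReqId: String) extends Resolution // job owned by this entrance

def resolve(ji: JobInstance, isCompleted: String => Boolean): Resolution =
  if (isCompleted(ji.status)) AnswerFromJobHistory
  // A registry timestamp newer than the job means this entrance re-registered
  // after the job was created, so the job has not been failed over yet.
  else if (ji.instanceRegistryTimestamp > ji.createTimestamp) WaitForFailover
  else ServeLocally(ji.jobReqId)

// isCompleted stands in for SchedulerEventState.isCompletedByStr; assuming the
// usual Linkis terminal states it could be
// Set("Succeed", "Failed", "Cancelled", "Timeout").contains _

In the ServeLocally case the endpoints rebuild an execID via ZuulEntranceUtils.generateExecID before looking the job up in the local scheduler.
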
+ + // return ok when job complete + if (SchedulerEventState.isCompletedByStr(jobInstance.status())) { + message = Message.ok(); + message.setMethod("/api/entrance/" + id + "/progress"); + message + .data("progress", "1.0") + .data("execID", "") + .data("taskID", id) + .data("progressInfo", new ArrayList<>()); + return message; + } else if (jobInstance.instanceRegistryTimestamp() > jobInstance.createTimestamp()) { + logger.warn("The job {} wait failover, return progress is 0", id); + message = Message.ok(); + message.setMethod("/api/entrance/" + id + "/progress"); + message + .data("progress", 0) + .data("execID", "") + .data("taskID", id) + .data("progressInfo", new ArrayList<>()); + return message; + } else { + realId = jobInstance.jobReqId(); + execID = + ZuulEntranceUtils.generateExecID( + realId, + Sender.getThisServiceInstance().getApplicationName(), + new String[] {Sender.getThisInstance()}); + } + } + Option job = null; try { job = entranceServer.getJob(realId); @@ -275,7 +379,7 @@ public Message progress(HttpServletRequest req, @PathVariable("id") String id) { message .data("progress", Math.abs(job.get().getProgress())) - .data("execID", id) + .data("execID", execID) .data("progressInfo", list); } } else { @@ -296,9 +400,60 @@ public Message progress(HttpServletRequest req, @PathVariable("id") String id) { @Override @RequestMapping(path = "/{id}/progressWithResource", method = RequestMethod.GET) public Message progressWithResource(HttpServletRequest req, @PathVariable("id") String id) { + ModuleUserUtils.getOperationUser(req, "job progressWithResource"); Message message = null; - String realId = ZuulEntranceUtils.parseExecID(id)[3]; - ModuleUserUtils.getOperationUser(req, "progressWithResource realId: " + realId); + String realId; + String execID; + if (id.startsWith(ZuulEntranceUtils.EXEC_ID())) { + // execID + realId = ZuulEntranceUtils.parseExecID(id)[3]; + execID = id; + } else { + // taskID + JobInstance jobInstance; + try { + jobInstance = parseHeaderToJobInstance(req); + } catch (JsonProcessingException e) { + logger.error("parse JobInstance json error, id: {}", id); + message = Message.error("parse JobInstance json error"); + message.setMethod("/api/entrance/" + id + "/progressWithResource"); + return message; + } + + // return ok when job complete + if (SchedulerEventState.isCompletedByStr(jobInstance.status())) { + long realTaskID = Long.parseLong(id); + JobRequest jobRequest = JobHistoryHelper.getTaskByTaskID(realTaskID); + message = Message.ok(); + message.setMethod("/api/entrance/" + id + "/progressWithResource"); + Map metricsVo = new HashMap<>(); + buildYarnResource(jobRequest, metricsVo, message); + message + .data("progress", "1.0") + .data("execID", "") + .data("taskID", id) + .data("progressInfo", new ArrayList<>()); + return message; + } else if (jobInstance.instanceRegistryTimestamp() > jobInstance.createTimestamp()) { + logger.warn("The job {} wait failover, return progress is 0 and resource is null", id); + message = Message.ok(); + message.setMethod("/api/entrance/" + id + "/progressWithResource"); + message + .data(TaskConstant.JOB_YARNRESOURCE, null) + .data("progress", 0) + .data("execID", "") + .data("taskID", id) + .data("progressInfo", new ArrayList<>()); + return message; + } else { + realId = jobInstance.jobReqId(); + execID = + ZuulEntranceUtils.generateExecID( + realId, + Sender.getThisServiceInstance().getApplicationName(), + new String[] {Sender.getThisInstance()}); + } + } Option job = null; try { job = entranceServer.getJob(realId); @@ -324,57 
+479,12 @@ public Message progressWithResource(HttpServletRequest req, @PathVariable("id") message.setMethod("/api/entrance/" + id + "/progressWithResource"); JobRequest jobRequest = ((EntranceJob) job.get()).getJobRequest(); - Map metrics = jobRequest.getMetrics(); Map metricsVo = new HashMap<>(); - if (metrics.containsKey(TaskConstant.JOB_YARNRESOURCE)) { - HashMap resourceMap = - (HashMap) metrics.get(TaskConstant.JOB_YARNRESOURCE); - ArrayList resoureList = new ArrayList<>(12); - if (null != resourceMap && !resourceMap.isEmpty()) { - resourceMap.forEach( - (applicationId, resource) -> { - resoureList.add(new YarnResourceWithStatusVo(applicationId, resource)); - }); - metricsVo.put(TaskConstant.JOB_YARNRESOURCE, resoureList); - Optional cores = - resourceMap.values().stream() - .map(resource -> resource.getQueueCores()) - .reduce((x, y) -> x + y); - Optional memory = - resourceMap.values().stream() - .map(resource -> resource.queueMemory()) - .reduce((x, y) -> x + y); - float corePercent = 0.0f; - float memoryPercent = 0.0f; - if (cores.isPresent() && memory.isPresent()) { - corePercent = - cores.get().floatValue() - / EntranceConfiguration.YARN_QUEUE_CORES_MAX().getHotValue(); - memoryPercent = - memory.get().floatValue() - / (EntranceConfiguration.YARN_QUEUE_MEMORY_MAX().getHotValue().longValue() - * 1024 - * 1024 - * 1024); - } - String coreRGB = RGBUtils.getRGB(corePercent); - String memoryRGB = RGBUtils.getRGB(memoryPercent); - metricsVo.put(TaskConstant.JOB_CORE_PERCENT, corePercent); - metricsVo.put(TaskConstant.JOB_MEMORY_PERCENT, memoryPercent); - metricsVo.put(TaskConstant.JOB_CORE_RGB, coreRGB); - metricsVo.put(TaskConstant.JOB_MEMORY_RGB, memoryRGB); - - message.data(TaskConstant.JOB_YARN_METRICS, metricsVo); - } else { - message.data(TaskConstant.JOB_YARNRESOURCE, null); - } - } else { - message.data(TaskConstant.JOB_YARNRESOURCE, null); - } + buildYarnResource(jobRequest, metricsVo, message); message .data("progress", Math.abs(job.get().getProgress())) - .data("execID", id) + .data("execID", execID) .data("progressInfo", list); } } else { @@ -385,6 +495,60 @@ public Message progressWithResource(HttpServletRequest req, @PathVariable("id") return message; } + private void buildYarnResource( + JobRequest jobRequest, Map metricsVo, Message message) { + try { + Map metrics = jobRequest.getMetrics(); + if (metrics.containsKey(TaskConstant.JOB_YARNRESOURCE)) { + + HashMap resourceMap = + (HashMap) metrics.get(TaskConstant.JOB_YARNRESOURCE); + ArrayList resoureList = new ArrayList<>(12); + if (null != resourceMap && !resourceMap.isEmpty()) { + resourceMap.forEach( + (applicationId, resource) -> { + resoureList.add(new YarnResourceWithStatusVo(applicationId, resource)); + }); + metricsVo.put(TaskConstant.JOB_YARNRESOURCE, resoureList); + Optional cores = + resourceMap.values().stream() + .map(resource -> resource.getQueueCores()) + .reduce((x, y) -> x + y); + Optional memory = + resourceMap.values().stream() + .map(resource -> resource.queueMemory()) + .reduce((x, y) -> x + y); + float corePercent = 0.0f; + float memoryPercent = 0.0f; + if (cores.isPresent() && memory.isPresent()) { + corePercent = + cores.get().floatValue() / EntranceConfiguration.YARN_QUEUE_CORES_MAX().getValue(); + memoryPercent = + memory.get().floatValue() + / (EntranceConfiguration.YARN_QUEUE_MEMORY_MAX().getValue().longValue() + * 1024 + * 1024 + * 1024); + } + String coreRGB = RGBUtils.getRGB(corePercent); + String memoryRGB = RGBUtils.getRGB(memoryPercent); + metricsVo.put(TaskConstant.JOB_CORE_PERCENT, 
corePercent); + metricsVo.put(TaskConstant.JOB_MEMORY_PERCENT, memoryPercent); + metricsVo.put(TaskConstant.JOB_CORE_RGB, coreRGB); + metricsVo.put(TaskConstant.JOB_MEMORY_RGB, memoryRGB); + + message.data(TaskConstant.JOB_YARN_METRICS, metricsVo); + } else { + message.data(TaskConstant.JOB_YARNRESOURCE, null); + } + } else { + message.data(TaskConstant.JOB_YARNRESOURCE, null); + } + } catch (Exception e) { + logger.error("build yarnResource error", e); + } + } + private void setJobProgressInfos( List> list, JobProgressInfo jobProgressInfo) { Map map = new HashMap<>(); @@ -403,10 +567,78 @@ private void setJobProgressInfos( @Override @RequestMapping(path = "/{id}/log", method = RequestMethod.GET) public Message log(HttpServletRequest req, @PathVariable("id") String id) { - String realId = ZuulEntranceUtils.parseExecID(id)[3]; - ModuleUserUtils.getOperationUser(req, "log realId: " + realId); - Option job = Option.apply(null); + ModuleUserUtils.getOperationUser(req, "get job log"); Message message = null; + int fromLine = 0; + int size = 100; + boolean distinctLevel = true; + String fromLineStr = req.getParameter("fromLine"); + String sizeStr = req.getParameter("size"); + if (StringUtils.isNotBlank(fromLineStr)) { + fromLine = Math.max(Integer.parseInt(fromLineStr), 0); + } + if (StringUtils.isNotBlank(sizeStr)) { + size = Integer.parseInt(sizeStr) >= 0 ? Integer.parseInt(sizeStr) : 10000; + } + String distinctLevelStr = req.getParameter("distinctLevel"); + if ("false".equals(distinctLevelStr)) { + distinctLevel = false; + } + + String realId; + String execID; + if (id.startsWith(ZuulEntranceUtils.EXEC_ID())) { + // execID + realId = ZuulEntranceUtils.parseExecID(id)[3]; + execID = id; + } else { + // taskID + JobInstance jobInstance; + try { + jobInstance = parseHeaderToJobInstance(req); + } catch (JsonProcessingException e) { + logger.error("parse JobInstance json error, id: {}", id); + message = Message.error("parse JobInstance json error"); + message.setMethod("/api/entrance/" + id + "/log"); + return message; + } + + // return ok when job complete + if (SchedulerEventState.isCompletedByStr(jobInstance.status())) { + message = + Message.error( + "The job you just executed has ended. This interface no longer provides a query. 
It is recommended that you download the log file for viewing.(您刚刚执行的job已经结束,本接口不再提供查询,建议您下载日志文件进行查看)"); + message.setMethod("/api/entrance/" + id + "/log"); + return message; + } else if (jobInstance.instanceRegistryTimestamp() > jobInstance.createTimestamp()) { + logger.warn("The job {} wait failover, return customer log", id); + message = Message.ok(); + message.setMethod("/api/entrance/" + id + "/log"); + String log = + LogUtils.generateInfo( + "The job will failover soon, please try again later.(job很快就会failover,请稍后再试)"); + Object retLog; + if (distinctLevel) { + String[] array = new String[4]; + array[2] = log; + array[3] = log; + retLog = new ArrayList(Arrays.asList(array)); + } else { + retLog = log; + } + message.data("log", retLog).data("execID", "").data("taskID", id).data("fromLine", 0); + return message; + } else { + realId = jobInstance.jobReqId(); + execID = + ZuulEntranceUtils.generateExecID( + realId, + Sender.getThisServiceInstance().getApplicationName(), + new String[] {Sender.getThisInstance()}); + } + } + + Option job = null; try { job = entranceServer.getJob(realId); } catch (final Throwable t) { @@ -416,27 +648,10 @@ public Message log(HttpServletRequest req, @PathVariable("id") String id) { message.setMethod("/api/entrance/" + id + "/log"); return message; } - if (job.isDefined()) { + if (job != null && job.isDefined()) { logger.debug("begin to get log for {}(开始获取 {} 的日志)", job.get().getId(), job.get().getId()); LogReader logReader = entranceServer.getEntranceContext().getOrCreateLogManager().getLogReader(realId); - int fromLine = 0; - int size = 100; - boolean distinctLevel = true; - if (req != null) { - String fromLineStr = req.getParameter("fromLine"); - String sizeStr = req.getParameter("size"); - if (StringUtils.isNotBlank(fromLineStr)) { - fromLine = Math.max(Integer.parseInt(fromLineStr), 0); - } - if (StringUtils.isNotBlank(sizeStr)) { - size = Integer.parseInt(sizeStr) >= 0 ? 
Integer.parseInt(sizeStr) : 10000; - } - String distinctLevelStr = req.getParameter("distinctLevel"); - if ("false".equals(distinctLevelStr)) { - distinctLevel = false; - } - } Object retLog = null; int retFromLine = 0; @@ -458,7 +673,7 @@ public Message log(HttpServletRequest req, @PathVariable("id") String id) { e); message = Message.ok(); message.setMethod("/api/entrance/" + id + "/log"); - message.data("log", "").data("execID", id).data("fromLine", retFromLine + fromLine); + message.data("log", "").data("execID", execID).data("fromLine", retFromLine + fromLine); } catch (final IllegalArgumentException e) { logger.debug( "Failed to get log information for :{}(为 {} 获取日志失败)", @@ -467,7 +682,7 @@ public Message log(HttpServletRequest req, @PathVariable("id") String id) { e); message = Message.ok(); message.setMethod("/api/entrance/" + id + "/log"); - message.data("log", "").data("execID", id).data("fromLine", retFromLine + fromLine); + message.data("log", "").data("execID", execID).data("fromLine", retFromLine + fromLine); return message; } catch (final Exception e1) { logger.debug( @@ -477,7 +692,7 @@ public Message log(HttpServletRequest req, @PathVariable("id") String id) { e1); message = Message.error("Failed to get log information(获取日志信息失败)"); message.setMethod("/api/entrance/" + id + "/log"); - message.data("log", "").data("execID", id).data("fromLine", retFromLine + fromLine); + message.data("log", "").data("execID", execID).data("fromLine", retFromLine + fromLine); return message; } finally { if (null != logReader && job.get().isCompleted()) { @@ -486,7 +701,7 @@ public Message log(HttpServletRequest req, @PathVariable("id") String id) { } message = Message.ok(); message.setMethod("/api/entrance/" + id + "/log"); - message.data("log", retLog).data("execID", id).data("fromLine", retFromLine + fromLine); + message.data("log", retLog).data("execID", execID).data("fromLine", retFromLine + fromLine); logger.debug("success to get log for {} (获取 {} 日志成功)", job.get().getId(), job.get().getId()); } else { message = @@ -514,7 +729,6 @@ public Message killJobs( JsonNode taskIDNode = jsonNode.get("taskIDList"); ArrayList waitToForceKill = new ArrayList<>(); String userName = ModuleUserUtils.getOperationUser(req, "killJobs"); - if (idNode.size() != taskIDNode.size()) { return Message.error( "The length of the ID list does not match the length of the TASKID list(id列表的长度与taskId列表的长度不一致)"); @@ -527,7 +741,7 @@ public Message killJobs( String id = idNode.get(i).asText(); Long taskID = taskIDNode.get(i).asLong(); String realId = ZuulEntranceUtils.parseExecID(id)[3]; - Option job = Option.apply(null); + Option job = null; try { job = entranceServer.getJob(realId); } catch (Exception e) { @@ -541,7 +755,7 @@ public Message killJobs( continue; } Message message = null; - if (job.isEmpty()) { + if (job == null || job.isEmpty()) { logger.warn("can not find a job in entranceServer, will force to kill it"); waitToForceKill.add(taskID); message = Message.ok("Forced Kill task (强制杀死任务)"); @@ -577,11 +791,12 @@ public Message killJobs( if (null != logListener) { logListener.onLogUpdate( entranceJob, - "Job " - + jobReq.getId() - + " was kill by user successfully(任务" - + jobReq.getId() - + "已成功取消)"); + LogUtils.generateInfo( + "Job " + + jobReq.getId() + + " was kill by user successfully(任务" + + jobReq.getId() + + "已成功取消)")); } this.entranceServer .getEntranceContext() @@ -609,7 +824,7 @@ public Message killJobs( @ApiOperation(value = "kill", notes = "kill", response = Message.class) @ApiImplicitParams({ - 
@ApiImplicitParam(name = "id", required = true, dataType = "String", value = "excute id"), + @ApiImplicitParam(name = "id", required = true, dataType = "String", value = "exec id"), @ApiImplicitParam(name = "taskID", required = false, dataType = "String", value = "task id") }) @Override @@ -618,23 +833,68 @@ public Message kill( HttpServletRequest req, @PathVariable("id") String id, @RequestParam(value = "taskID", required = false) Long taskID) { - String realId = ZuulEntranceUtils.parseExecID(id)[3]; - String userName = ModuleUserUtils.getOperationUser(req, "kill task realId:" + realId); + String userName = ModuleUserUtils.getOperationUser(req, "kill job"); + Message message = null; + String realId; + String execID; + if (id.startsWith(ZuulEntranceUtils.EXEC_ID())) { + // execID + realId = ZuulEntranceUtils.parseExecID(id)[3]; + execID = id; + } else { + // taskID + JobInstance jobInstance; + try { + jobInstance = parseHeaderToJobInstance(req); + } catch (JsonProcessingException e) { + logger.error("parse JobInstance json error, id: {}", id); + message = Message.error("parse JobInstance json error"); + message.setMethod("/api/entrance/" + id + "/kill"); + return message; + } - Option job = Option.apply(null); + // return ok when job complete + if (SchedulerEventState.isCompletedByStr(jobInstance.status())) { + message = Message.error("The job already completed. Do not support kill.(任务已经结束,不支持kill)"); + message.setMethod("/api/entrance/" + id + "/kill"); + return message; + } else if (jobInstance.instanceRegistryTimestamp() > jobInstance.createTimestamp()) { + logger.warn("The job {} wait failover, but now force kill", id); + // TODO If failover during force kill, the job status may change from Cancelled to Running + long taskId = Long.parseLong(id); + JobHistoryHelper.forceKill(taskId); + message = Message.ok("Forced Kill task (强制杀死任务)"); + message.setMethod("/api/entrance/" + id + "/kill"); + message.data("execID", "").data("taskID", id); + return message; + } else { + realId = jobInstance.jobReqId(); + execID = + ZuulEntranceUtils.generateExecID( + realId, + Sender.getThisServiceInstance().getApplicationName(), + new String[] {Sender.getThisInstance()}); + } + } + + Option job = null; try { job = entranceServer.getJob(realId); } catch (Exception e) { logger.warn("can not find a job in entranceServer, will force to kill it", e); // 如果在内存中找不到该任务,那么该任务可能已经完成了,或者就是重启导致的 + if (taskID == null || taskID <= 0) { + message = Message.error("Get job by ID error, kill failed.(获取job时发生异常,kill失败)"); + return message; + } JobHistoryHelper.forceKill(taskID); - Message message = Message.ok("Forced Kill task (强制杀死任务)"); + message = Message.ok("Forced Kill task (强制杀死任务)"); message.setMethod("/api/entrance/" + id + "/kill"); message.setStatus(0); return message; } - Message message = null; - if (job.isEmpty()) { + + if (job == null || job.isEmpty()) { logger.warn("can not find a job in entranceServer, will force to kill it"); // 如果在内存中找不到该任务,那么该任务可能已经完成了,或者就是重启导致的 JobHistoryHelper.forceKill(taskID); @@ -660,8 +920,7 @@ public Message kill( job.get().kill(); message = Message.ok("Successfully killed the job(成功kill了job)"); message.setMethod("/api/entrance/" + id + "/kill"); - message.setStatus(0); - message.data("execID", id); + message.data("execID", execID); // ensure the job's state is cancelled in database if (job.get() instanceof EntranceJob) { EntranceJob entranceJob = (EntranceJob) job.get(); diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java index 7558ab6dc2..94531cd5fe 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java @@ -20,13 +20,17 @@ import org.apache.linkis.common.ServiceInstance; import org.apache.linkis.entrance.EntranceContext; import org.apache.linkis.entrance.EntranceServer; +import org.apache.linkis.entrance.conf.EntranceConfiguration; import org.apache.linkis.entrance.conf.EntranceConfiguration$; import org.apache.linkis.entrance.constant.ServiceNameConsts; import org.apache.linkis.entrance.execute.EntranceJob; +import org.apache.linkis.entrance.job.EntranceExecutionJob; import org.apache.linkis.entrance.log.LogReader; import org.apache.linkis.governance.common.protocol.conf.EntranceInstanceConfRequest; import org.apache.linkis.rpc.Sender; +import org.apache.commons.io.IOUtils; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.EventListener; @@ -94,13 +98,19 @@ private void shutdownEntrance(ContextClosedEvent event) { if (shutdownFlag) { logger.warn("event has been handled"); } else { + if (EntranceConfiguration.ENTRANCE_SHUTDOWN_FAILOVER_CONSUME_QUEUE_ENABLED()) { + logger.warn("Entrance exit to update and clean all ConsumeQueue task instances"); + updateAllNotExecutionTaskInstances(false); + } + logger.warn("Entrance exit to stop all job"); - EntranceJob[] allUndoneJobs = getAllUndoneTask(null); - if (null != allUndoneJobs) { - for (EntranceJob job : allUndoneJobs) { + EntranceJob[] allUndoneTask = getAllUndoneTask(null); + if (null != allUndoneTask) { + for (EntranceJob job : allUndoneTask) { job.onFailure( "Your job will be marked as canceled because the Entrance service restarted(因为Entrance服务重启,您的任务将被标记为取消)", null); + IOUtils.closeQuietly(((EntranceExecutionJob) job).getLogWriter().get()); } } } diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java new file mode 100644 index 0000000000..4e66da5cc3 --- /dev/null +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.entrance.server; + +import org.apache.linkis.common.ServiceInstance; +import org.apache.linkis.common.utils.Utils; +import org.apache.linkis.entrance.EntranceServer; +import org.apache.linkis.entrance.conf.EntranceConfiguration; +import org.apache.linkis.entrance.constant.ServiceNameConsts; +import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext; +import org.apache.linkis.entrance.utils.JobHistoryHelper; +import org.apache.linkis.governance.common.entity.job.JobRequest; +import org.apache.linkis.publicservice.common.lock.entity.CommonLock; +import org.apache.linkis.publicservice.common.lock.service.CommonLockService; +import org.apache.linkis.rpc.Sender; +import org.apache.linkis.scheduler.queue.SchedulerEventState; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +import scala.Enumeration; +import scala.collection.Iterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Component(ServiceNameConsts.ENTRANCE_FAILOVER_SERVER) +public class EntranceFailoverJobServer { + + private static final Logger logger = LoggerFactory.getLogger(EntranceFailoverJobServer.class); + + @Autowired private EntranceServer entranceServer; + + @Autowired private CommonLockService commonLockService; + + private static String ENTRANCE_FAILOVER_LOCK = "ENTRANCE_FAILOVER_LOCK"; + + private ScheduledExecutorService scheduledExecutor; + + private Future future; + + @PostConstruct + public void init() { + if (EntranceConfiguration.ENTRANCE_FAILOVER_ENABLED()) { + this.scheduledExecutor = + Executors.newSingleThreadScheduledExecutor( + Utils.threadFactory("Linkis-Failover-Scheduler-Thread-", true)); + failoverTask(); + } + } + + @EventListener + private void shutdownFailover(ContextClosedEvent event) { + if (future != null && !future.isDone()) { + future.cancel(true); + } + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + logger.info("Entrance Failover Server exit!"); + } + } + + public void failoverTask() { + future = + scheduledExecutor.scheduleWithFixedDelay( + () -> { + EntranceSchedulerContext schedulerContext = + (EntranceSchedulerContext) + entranceServer + .getEntranceContext() + .getOrCreateScheduler() + .getSchedulerContext(); + + // entrance do not failover job when it is offline + if (schedulerContext.getOfflineFlag()) return; + + CommonLock commonLock = new CommonLock(); + commonLock.setLockObject(ENTRANCE_FAILOVER_LOCK); + Boolean locked = false; + try { + locked = commonLockService.lock(commonLock, 30 * 1000L); + if (!locked) return; + logger.info("success locked {}", ENTRANCE_FAILOVER_LOCK); + + // get all entrance server from eureka + ServiceInstance[] serviceInstances = + Sender.getInstances(Sender.getThisServiceInstance().getApplicationName()); + if (serviceInstances == null || serviceInstances.length <= 0) return; + + // serverInstance to map + Map serverInstanceMap = + Arrays.stream(serviceInstances) + .collect( + Collectors.toMap( + ServiceInstance::getInstance, + ServiceInstance::getRegistryTimestamp, + (k1, k2) -> 
k2)); + + // It is very important to avoid repeated execute job + // when failover self job, if self instance is empty, the job can be repeated + // execute + if (!serverInstanceMap.containsKey(Sender.getThisInstance())) { + logger.warn( + "server has just started and has not get self info, it does not failover"); + return; + } + + // get failover job expired time (获取任务故障转移过期时间,配置为0表示不过期, 过期则不处理) + long expiredTimestamp = 0L; + if (EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME() > 0) { + expiredTimestamp = + System.currentTimeMillis() + - EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME(); + } + + List jobRequests = + JobHistoryHelper.queryWaitForFailoverTask( + serverInstanceMap, + getUnCompleteStatus(), + expiredTimestamp, + EntranceConfiguration.ENTRANCE_FAILOVER_DATA_NUM_LIMIT()); + if (jobRequests.isEmpty()) return; + List ids = + jobRequests.stream().map(JobRequest::getId).collect(Collectors.toList()); + logger.info("success query failover jobs , job size: {}, ids: {}", ids.size(), ids); + + // failover to local server + for (JobRequest jobRequest : jobRequests) { + entranceServer.failoverExecute(jobRequest); + } + logger.info("finished execute failover jobs, job ids: {}", ids); + + } catch (Exception e) { + logger.error("failover failed", e); + } finally { + if (locked) commonLockService.unlock(commonLock); + } + }, + EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INIT_TIME(), + EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INTERVAL(), + TimeUnit.MILLISECONDS); + } + + private List getUnCompleteStatus() { + List status = new ArrayList<>(); + Enumeration.ValueSet values = SchedulerEventState.values(); + Iterator iterator = values.iterator(); + while (iterator.hasNext()) { + Enumeration.Value next = iterator.next(); + if (!SchedulerEventState.isCompleted(next)) status.add(next.toString()); + } + return status; + } +} diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala index 1035de1e2b..b023065ee4 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala @@ -17,28 +17,45 @@ package org.apache.linkis.entrance +import org.apache.linkis.common.ServiceInstance import org.apache.linkis.common.exception.{ErrorException, LinkisException, LinkisRuntimeException} +import org.apache.linkis.common.io.FsPath import org.apache.linkis.common.log.LogUtils import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.entrance.conf.EntranceConfiguration import org.apache.linkis.entrance.cs.CSEntranceHelper +import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._ import org.apache.linkis.entrance.exception.{EntranceErrorException, SubmitFailedException} import org.apache.linkis.entrance.execute.EntranceJob -import org.apache.linkis.entrance.log.LogReader +import org.apache.linkis.entrance.job.EntranceExecutionJob +import org.apache.linkis.entrance.log.{Cache, CacheLogWriter, HDFSCacheLogWriter, LogReader} +import org.apache.linkis.entrance.parser.ParserUtils import org.apache.linkis.entrance.timeout.JobTimeoutManager import org.apache.linkis.entrance.utils.JobHistoryHelper +import 
org.apache.linkis.governance.common.conf.GovernanceCommonConf import org.apache.linkis.governance.common.entity.job.JobRequest +import org.apache.linkis.governance.common.protocol.task.RequestTaskKill import org.apache.linkis.governance.common.utils.LoggerUtils +import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest +import org.apache.linkis.manager.label.entity.entrance.ExecuteOnceLabel import org.apache.linkis.protocol.constants.TaskConstant import org.apache.linkis.rpc.Sender +import org.apache.linkis.rpc.conf.RPCConfiguration import org.apache.linkis.scheduler.queue.{Job, SchedulerEventState} import org.apache.linkis.server.conf.ServerConfiguration +import org.apache.linkis.storage.utils.StorageUtils import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.exception.ExceptionUtils -import java.text.MessageFormat -import java.util +import org.springframework.beans.BeanUtils + +import java.{lang, util} +import java.text.{MessageFormat, SimpleDateFormat} +import java.util.Date + +import scala.collection.JavaConverters._ abstract class EntranceServer extends Logging { @@ -82,6 +99,283 @@ abstract class EntranceServer extends Logging { LoggerUtils.setJobIdMDC(jobRequest.getId.toString) val logAppender = new java.lang.StringBuilder() + jobRequest = dealInitedJobRequest(jobRequest, logAppender) + + val job = getEntranceContext.getOrCreateEntranceParser().parseToJob(jobRequest) + Utils.tryThrow { + job.init() + job.setLogListener(getEntranceContext.getOrCreateLogManager()) + job.setProgressListener(getEntranceContext.getOrCreatePersistenceManager()) + job.setJobListener(getEntranceContext.getOrCreatePersistenceManager()) + job match { + case entranceJob: EntranceJob => + entranceJob.setEntranceListenerBus(getEntranceContext.getOrCreateEventListenerBus) + case _ => + } + Utils.tryCatch { + if (logAppender.length() > 0) { + job.getLogListener.foreach(_.onLogUpdate(job, logAppender.toString.trim)) + } + } { t => + logger.error("Failed to write init log, reason: ", t) + } + + /** + * job.afterStateChanged() method is only called in job.run(), and job.run() is called only + * after job is scheduled so it suggest that we lack a hook for job init, currently we call + * this to trigger JobListener.onJobinit() + */ + Utils.tryAndWarn(job.getJobListener.foreach(_.onJobInited(job))) + getEntranceContext.getOrCreateScheduler().submit(job) + val msg = LogUtils.generateInfo( + s"Job with jobId : ${jobRequest.getId} and execID : ${job.getId()} submitted " + ) + logger.info(msg) + + job match { + case entranceJob: EntranceJob => + entranceJob.getJobRequest.setReqId(job.getId()) + if (jobTimeoutManager.timeoutCheck && JobTimeoutManager.hasTimeoutLabel(entranceJob)) { + jobTimeoutManager.add(job.getId(), entranceJob) + } + entranceJob.getLogListener.foreach(_.onLogUpdate(entranceJob, msg)) + case _ => + } + LoggerUtils.removeJobIdMDC() + job + } { t => + LoggerUtils.removeJobIdMDC() + job.onFailure("Submitting the query failed!(提交查询失败!)", t) + val _jobRequest: JobRequest = + getEntranceContext.getOrCreateEntranceParser().parseToJobRequest(job) + getEntranceContext + .getOrCreatePersistenceManager() + .createPersistenceEngine() + .updateIfNeeded(_jobRequest) + t match { + case e: LinkisException => e + case e: LinkisRuntimeException => e + case t: Throwable => + new SubmitFailedException( + SUBMITTING_QUERY_FAILED.getErrorCode, + SUBMITTING_QUERY_FAILED.getErrorDesc + ExceptionUtils.getRootCauseMessage(t), + t + ) + } + } + } + + def logReader(execId: String): LogReader + + 
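
The failover entry points below consume the candidates that EntranceFailoverJobServer fetches through JobHistoryHelper.queryWaitForFailoverTask, which maps onto the RequestFailoverJob message. The actual filtering happens in the jobhistory service, which this diff does not show; the following in-memory Scala sketch is one plausible reading of the request's four fields, with JobRow purely illustrative:

case class JobRow(id: Long, instance: String, status: String, createdTime: Long)

def waitForFailoverCandidates(
    rows: Seq[JobRow],
    aliveRegistry: Map[String, Long], // reqMap: entrance instance -> registryTimestamp
    unfinished: Set[String], // statusList, see getUnCompleteStatus()
    startTimestamp: Long, // expiry window; 0 disables it
    limit: Int
): Seq[JobRow] =
  rows
    .filter(r => unfinished.contains(r.status))
    .filter(r => startTimestamp <= 0 || r.createdTime >= startTimestamp)
    .filter { r =>
      aliveRegistry.get(r.instance) match {
        // entrance restarted after the job was created
        case Some(registeredAt) => registeredAt > r.createdTime
        // recorded entrance is gone from the registry entirely
        case None => true
      }
    }
    .take(limit)
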
def getJob(execId: String): Option[Job] = + getEntranceContext.getOrCreateScheduler().get(execId).map(_.asInstanceOf[Job]) + + private[entrance] def getEntranceWebSocketService: Option[EntranceWebSocketService] = + if (ServerConfiguration.BDP_SERVER_SOCKET_MODE.getValue) { + if (entranceWebSocketService.isEmpty) synchronized { + if (entranceWebSocketService.isEmpty) { + entranceWebSocketService = Some(new EntranceWebSocketService) + entranceWebSocketService.foreach(_.setEntranceServer(this)) + entranceWebSocketService.foreach( + getEntranceContext.getOrCreateEventListenerBus.addListener + ) + } + } + entranceWebSocketService + } else None + + def getAllUndoneTask(filterWords: String): Array[EntranceJob] = { + val consumers = getEntranceContext + .getOrCreateScheduler() + .getSchedulerContext + .getOrCreateConsumerManager + .listConsumers() + .toSet + val filterConsumer = if (StringUtils.isNotBlank(filterWords)) { + consumers.filter(_.getGroup.getGroupName.contains(filterWords)) + } else { + consumers + } + filterConsumer + .flatMap { consumer => + consumer.getRunningEvents ++ consumer.getConsumeQueue.getWaitingEvents + } + .filter(job => job != null && job.isInstanceOf[EntranceJob]) + .map(_.asInstanceOf[EntranceJob]) + .toArray + } + + def getAllConsumeQueueTask(): Array[EntranceJob] = { + val consumers = getEntranceContext + .getOrCreateScheduler() + .getSchedulerContext + .getOrCreateConsumerManager + .listConsumers() + .toSet + + consumers + .flatMap { consumer => + consumer.getConsumeQueue.getWaitingEvents + } + .filter(job => job != null && job.isInstanceOf[EntranceJob]) + .map(_.asInstanceOf[EntranceJob]) + .toArray + } + + def clearAllConsumeQueue(): Unit = { + getEntranceContext + .getOrCreateScheduler() + .getSchedulerContext + .getOrCreateConsumerManager + .listConsumers() + .foreach(_.getConsumeQueue.clearAll()) + } + + def updateAllNotExecutionTaskInstances(retryWhenUpdateFail: Boolean): Unit = { + val consumeQueueTasks = getAllConsumeQueueTask() + + clearAllConsumeQueue() + logger.info("Finished cleaning all consume queues") + + if (consumeQueueTasks != null && consumeQueueTasks.length > 0) { + val taskIds = new util.ArrayList[Long]() + consumeQueueTasks.foreach(job => { + taskIds.add(job.getJobRequest.getId.asInstanceOf[Long]) + job match { + case entranceExecutionJob: EntranceExecutionJob => + val msg = LogUtils.generateWarn( + s"job ${job.getJobRequest.getId} cleaned from ConsumeQueue, waiting for failover" + ) + entranceExecutionJob.getLogListener.foreach(_.onLogUpdate(entranceExecutionJob, msg)) + entranceExecutionJob.getLogWriter.foreach(_.close()) + case _ => + } + }) + + JobHistoryHelper.updateAllConsumeQueueTask(taskIds, retryWhenUpdateFail) + logger.info("Finished updating all unexecuted task instances") + } + } + + /** + * execute failover job (submit the failover task and return a new execId) + * + * @param jobRequest + */ + def failoverExecute(jobRequest: JobRequest): Unit = { + + if (null == jobRequest || null == jobRequest.getId || jobRequest.getId <= 0) { + throw new EntranceErrorException( + PERSIST_JOBREQUEST_ERROR.getErrorCode, + PERSIST_JOBREQUEST_ERROR.getErrorDesc + ) + } + + val logAppender = new java.lang.StringBuilder() + logAppender.append( + "*************************************FAILOVER************************************** \n" + ) + + // try to kill the old EC + killOldEC(jobRequest, logAppender) + + // an Inited jobRequest must go through all interceptors again, e.g. to set log_path + if (SchedulerEventState.isInitedByStr(jobRequest.getStatus)) { + 
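// the interceptors may regenerate derived fields such as log_path; the refreshed request is persisted later by initAndSubmitJobRequest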
dealInitedJobRequest(jobRequest, logAppender) + } + + if ( + EntranceConfiguration.ENTRANCE_FAILOVER_RUNNING_KILL_ENABLED.getValue && + SchedulerEventState.isRunningByStr(jobRequest.getStatus) + ) { + // deal Running jobRequest, if enabled, status changed from Running to Cancelled + dealRunningJobRequest(jobRequest, logAppender) + } else { + // init and submit + initAndSubmitJobRequest(jobRequest, logAppender) + } + } + + def killOldEC(jobRequest: JobRequest, logAppender: lang.StringBuilder): Unit = { + Utils.tryCatch { + logAppender.append( + LogUtils + .generateInfo(s"job ${jobRequest.getId} start to kill old EC \n") + ) + if ( + !SchedulerEventState.isRunning(SchedulerEventState.withName(jobRequest.getStatus)) + && !SchedulerEventState.isScheduled(SchedulerEventState.withName(jobRequest.getStatus)) + ) { + val msg = s"job ${jobRequest.getId} status is not running or scheduled, ignore it" + logger.info(msg) + logAppender.append(LogUtils.generateInfo(msg) + "\n") + return + } + + if ( + jobRequest.getMetrics == null + || !jobRequest.getMetrics.containsKey(TaskConstant.JOB_ENGINECONN_MAP) + ) { + val msg = s"job ${jobRequest.getId} has no EC info, ignore it" + logger.info(msg) + logAppender.append(LogUtils.generateInfo(msg) + "\n") + return + } + + val engineMap = jobRequest.getMetrics + .get(TaskConstant.JOB_ENGINECONN_MAP) + .asInstanceOf[util.Map[String, Object]] + + val engineInstance = + engineMap.asScala + .map(_._2.asInstanceOf[util.Map[String, Object]]) + .filter(_.containsKey(TaskConstant.ENGINE_INSTANCE)) + .maxBy(_.getOrDefault(TaskConstant.ENGINE_CONN_SUBMIT_TIME, "0").toString) + + if (engineInstance == null || engineInstance.containsKey(TaskConstant.FAILOVER_FLAG)) { + val msg = + s"job ${jobRequest.getId} was not submitted to an EC or has already failed over, no need to kill the EC" + logger.info(msg) + logAppender.append(LogUtils.generateInfo(msg) + "\n") + return + } + engineInstance.put(TaskConstant.FAILOVER_FLAG, "") + + val ecInstance = ServiceInstance( + GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue, + engineInstance.get(TaskConstant.ENGINE_INSTANCE).toString + ) + if (jobRequest.getLabels.asScala.exists(_.isInstanceOf[ExecuteOnceLabel])) { + // kill the EC via LinkisManager + val engineStopRequest = new EngineStopRequest + engineStopRequest.setServiceInstance(ecInstance) + // ask LinkisManager to stop the EC + Sender + .getSender(RPCConfiguration.LINKIS_MANAGER_SERVICE_NAME.getValue) + .send(engineStopRequest) + val msg = + s"job ${jobRequest.getId} send EngineStopRequest to linkismanager, kill EC instance $ecInstance" + logger.info(msg) + logAppender.append(LogUtils.generateInfo(msg) + "\n") + } else if (engineInstance.containsKey(TaskConstant.ENGINE_CONN_TASK_ID)) { + // get the EC task id + val engineTaskId = engineInstance.get(TaskConstant.ENGINE_CONN_TASK_ID).toString + // ask the EC to kill the task + Sender + .getSender(ecInstance) + .send(RequestTaskKill(engineTaskId)) + val msg = + s"job ${jobRequest.getId} send RequestTaskKill to kill engineConn $ecInstance, execID $engineTaskId" + logger.info(msg) + logAppender.append(LogUtils.generateInfo(msg) + "\n") + } + } { t => + logger.error(s"job ${jobRequest.getId} failed to kill old EC", t) + } + } + + def dealInitedJobRequest(jobReq: JobRequest, logAppender: lang.StringBuilder): JobRequest = { + var jobRequest = jobReq Utils.tryThrow( getEntranceContext .getOrCreateEntranceInterceptors() @@ -128,6 +422,68 @@ abstract class EntranceServer extends Logging { .updateIfNeeded(jobRequest) error } + jobRequest + } + + def dealRunningJobRequest(jobRequest: 
JobRequest, logAppender: lang.StringBuilder): Unit = { + Utils.tryCatch { + // build the error message + val msg = + MessageFormat.format( + EntranceErrorCodeSummary.FAILOVER_RUNNING_TO_CANCELLED.getErrorDesc, + jobRequest.getId.toString + ) + // init jobRequest properties + jobRequest.setStatus(SchedulerEventState.Cancelled.toString) + jobRequest.setProgress("1.0") + jobRequest.setInstances(Sender.getThisInstance) + jobRequest.setErrorCode(EntranceErrorCodeSummary.FAILOVER_RUNNING_TO_CANCELLED.getErrorCode) + jobRequest.setErrorDesc(msg) + + // update jobRequest + getEntranceContext + .getOrCreatePersistenceManager() + .createPersistenceEngine() + .updateIfNeeded(jobRequest) + + // get or generate the log_path + var logPath = jobRequest.getLogPath + if (StringUtils.isBlank(logPath)) { + ParserUtils.generateLogPath(jobRequest, null) + logPath = jobRequest.getLogPath + logAppender.append( + LogUtils.generateInfo(s"job ${jobRequest.getId} generated new logPath $logPath \n") + ) + } + val job = getEntranceContext.getOrCreateEntranceParser().parseToJob(jobRequest) + val logWriter = getEntranceContext.getOrCreateLogManager().createLogWriter(job) + if (logAppender.length() > 0) { + logWriter.write(logAppender.toString.trim) + } + + logWriter.write(LogUtils.generateInfo(msg) + "\n") + logWriter.flush() + logWriter.close() + + } { case e: Exception => + logger.error(s"Job ${jobRequest.getId} failover: failed to change status", e) + } + } + + def initAndSubmitJobRequest(jobRequest: JobRequest, logAppender: lang.StringBuilder): Unit = { + // init properties + initJobRequestProperties(jobRequest, logAppender) + + // update jobRequest + getEntranceContext + .getOrCreatePersistenceManager() + .createPersistenceEngine() + .updateIfNeeded(jobRequest) + + // reset `UpdateOrderFlag` + jobRequest.setUpdateOrderFlag(true) + + logger.info(s"job ${jobRequest.getId} updated JobRequest successfully") val job = getEntranceContext.getOrCreateEntranceParser().parseToJob(jobRequest) Utils.tryThrow { @@ -136,16 +492,16 @@ abstract class EntranceServer extends Logging { job.setProgressListener(getEntranceContext.getOrCreatePersistenceManager()) job.setJobListener(getEntranceContext.getOrCreatePersistenceManager()) job match { - case entranceJob: EntranceJob => + case entranceJob: EntranceJob => { entranceJob.setEntranceListenerBus(getEntranceContext.getOrCreateEventListenerBus) + } case _ => } Utils.tryCatch { - if (logAppender.length() > 0) { + if (logAppender.length() > 0) job.getLogListener.foreach(_.onLogUpdate(job, logAppender.toString.trim)) - } } { t => - logger.error("Failed to write init log, reason: ", t) + logger.error("Failed to write init JobRequest log, reason: ", t) } /** @@ -156,25 +512,21 @@ abstract class EntranceServer extends Logging { Utils.tryAndWarn(job.getJobListener.foreach(_.onJobInited(job))) getEntranceContext.getOrCreateScheduler().submit(job) val msg = LogUtils.generateInfo( - s"Job with jobId : ${jobRequest.getId} and execID : ${job.getId()} submitted " + s"Job with jobId : ${jobRequest.getId} and execID : ${job.getId()} submitted, failover succeeded" ) logger.info(msg) job match { case entranceJob: EntranceJob => entranceJob.getJobRequest.setReqId(job.getId()) - if (jobTimeoutManager.timeoutCheck && JobTimeoutManager.hasTimeoutLabel(entranceJob)) { + if (jobTimeoutManager.timeoutCheck && JobTimeoutManager.hasTimeoutLabel(entranceJob)) jobTimeoutManager.add(job.getId(), entranceJob) - } entranceJob.getLogListener.foreach(_.onLogUpdate(entranceJob, msg)) case _ => } - LoggerUtils.removeJobIdMDC() - job } { t => - 
LoggerUtils.removeJobIdMDC() job.onFailure("Submitting the query failed!(提交查询失败!)", t) - val _jobRequest: JobRequest = + val _jobRequest = getEntranceContext.getOrCreateEntranceParser().parseToJobRequest(job) getEntranceContext .getOrCreatePersistenceManager() @@ -193,44 +545,80 @@ } } - def logReader(execId: String): LogReader + private def initJobRequestProperties( + jobRequest: JobRequest, + logAppender: lang.StringBuilder + ): Unit = { + logger.info(s"job ${jobRequest.getId} start to initialize the properties") + val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + val initInstance = Sender.getThisInstance + val initDate = new Date(System.currentTimeMillis) + val initStatus = SchedulerEventState.Inited.toString + val initProgress = "0.0" + val initReqId = "" - def getJob(execId: String): Option[Job] = - getEntranceContext.getOrCreateScheduler().get(execId).map(_.asInstanceOf[Job]) + logAppender.append( + LogUtils + .generateInfo(s"job ${jobRequest.getId} start to initialize the properties \n") + ) + logAppender.append( + LogUtils.generateInfo(s"the instances ${jobRequest.getInstances} -> ${initInstance} \n") + ) + logAppender.append( + LogUtils.generateInfo( + s"the created_time ${sdf.format(jobRequest.getCreatedTime)} -> ${sdf.format(initDate)} \n" + ) + ) + logAppender.append( + LogUtils.generateInfo(s"the status ${jobRequest.getStatus} -> $initStatus \n") + ) + logAppender.append( + LogUtils.generateInfo(s"the progress ${jobRequest.getProgress} -> $initProgress \n") + ) - private[entrance] def getEntranceWebSocketService: Option[EntranceWebSocketService] = - if (ServerConfiguration.BDP_SERVER_SOCKET_MODE.getValue) { - if (entranceWebSocketService.isEmpty) synchronized { - if (entranceWebSocketService.isEmpty) { - entranceWebSocketService = Some(new EntranceWebSocketService) - entranceWebSocketService.foreach(_.setEntranceServer(this)) - entranceWebSocketService.foreach( - getEntranceContext.getOrCreateEventListenerBus.addListener + val metricMap = new util.HashMap[String, Object]() + if (EntranceConfiguration.ENTRANCE_FAILOVER_RETAIN_METRIC_ENGINE_CONN_ENABLED.getValue) { + if ( + jobRequest.getMetrics != null && jobRequest.getMetrics.containsKey( + TaskConstant.JOB_ENGINECONN_MAP ) - } + ) { + val oldEngineconnMap = jobRequest.getMetrics + .get(TaskConstant.JOB_ENGINECONN_MAP) + .asInstanceOf[util.Map[String, Object]] + metricMap.put(TaskConstant.JOB_ENGINECONN_MAP, oldEngineconnMap) } - entranceWebSocketService - } else None - - def getAllUndoneTask(filterWords: String): Array[EntranceJob] = { - val consumers = getEntranceContext - .getOrCreateScheduler() - .getSchedulerContext - .getOrCreateConsumerManager - .listConsumers() - .toSet - val filterConsumer = if (StringUtils.isNotBlank(filterWords)) { - consumers.filter(_.getGroup.getGroupName.contains(filterWords)) - } else { - consumers } - filterConsumer - .flatMap { consumer => - consumer.getRunningEvents ++ consumer.getConsumeQueue.getWaitingEvents + + if (EntranceConfiguration.ENTRANCE_FAILOVER_RETAIN_METRIC_YARN_RESOURCE_ENABLED.getValue) { + if ( + jobRequest.getMetrics != null && jobRequest.getMetrics.containsKey( + TaskConstant.JOB_YARNRESOURCE + ) + ) { + val oldResourceMap = jobRequest.getMetrics + .get(TaskConstant.JOB_YARNRESOURCE) + .asInstanceOf[util.Map[String, Object]] + metricMap.put(TaskConstant.JOB_YARNRESOURCE, oldResourceMap) } - .filter(job => job != null && job.isInstanceOf[EntranceJob]) - .map(_.asInstanceOf[EntranceJob]) - .toArray + } + + 
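// the assignments below reset the request to a fresh Inited state: the failover entrance takes ownership of `instances`, the submit clock restarts, and previous error fields are cleared before re-submission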
jobRequest.setInstances(initInstance) + jobRequest.setCreatedTime(initDate) + jobRequest.setStatus(initStatus) + jobRequest.setProgress(initProgress) + jobRequest.setReqId(initReqId) + jobRequest.setErrorCode(0) + jobRequest.setErrorDesc("") + jobRequest.setMetrics(metricMap) + jobRequest.getMetrics.put(TaskConstant.JOB_SUBMIT_TIME, initDate) + // Allow task status updates to be unordered + jobRequest.setUpdateOrderFlag(false) + + logAppender.append( + LogUtils.generateInfo(s"job ${jobRequest.getId} finished initializing the properties \n") + ) + logger.info(s"job ${jobRequest.getId} finished initializing the properties") } } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala index 7c3935e69b..e053d3793c 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala @@ -223,4 +223,44 @@ object EntranceConfiguration { val ENABLE_ENTRANCE_DIRTY_DATA_CLEAR = CommonVars("linkis.entrance.auto.clean.dirty.data.enable", false) + val ENTRANCE_FAILOVER_ENABLED = CommonVars("linkis.entrance.failover.enable", true).getValue + + val ENTRANCE_FAILOVER_SCAN_INIT_TIME = + CommonVars("linkis.entrance.failover.scan.init.time", 3 * 1000).getValue + + val ENTRANCE_FAILOVER_SCAN_INTERVAL = + CommonVars("linkis.entrance.failover.scan.interval", 30 * 1000).getValue + + val ENTRANCE_FAILOVER_DATA_NUM_LIMIT = + CommonVars("linkis.entrance.failover.data.num.limit", 10).getValue + + val ENTRANCE_FAILOVER_DATA_INTERVAL_TIME = + CommonVars("linkis.entrance.failover.data.interval.time", new TimeType("1d").toLong).getValue + + // if true, waitForRetry jobs among runningJobs can be failed over + val ENTRANCE_FAILOVER_RETRY_JOB_ENABLED = + CommonVars("linkis.entrance.failover.retry.job.enable", false) + + val ENTRANCE_UPDATE_BATCH_SIZE = CommonVars("linkis.entrance.update.batch.size", 100) + + // if true, jobs in the ConsumeQueue can be failed over + val ENTRANCE_SHUTDOWN_FAILOVER_CONSUME_QUEUE_ENABLED = + CommonVars("linkis.entrance.shutdown.failover.consume.queue.enable", true).getValue + + val ENTRANCE_GROUP_SCAN_ENABLED = CommonVars("linkis.entrance.group.scan.enable", true) + + val ENTRANCE_GROUP_SCAN_INIT_TIME = CommonVars("linkis.entrance.group.scan.init.time", 3 * 1000) + + val ENTRANCE_GROUP_SCAN_INTERVAL = CommonVars("linkis.entrance.group.scan.interval", 60 * 1000) + + val ENTRANCE_FAILOVER_RETAIN_METRIC_ENGINE_CONN_ENABLED = + CommonVars("linkis.entrance.failover.retain.metric.engine.conn.enable", false) + + val ENTRANCE_FAILOVER_RETAIN_METRIC_YARN_RESOURCE_ENABLED = + CommonVars("linkis.entrance.failover.retain.metric.yarn.resource.enable", false) + + // if true, jobs whose status is Running will be set to Cancelled + val ENTRANCE_FAILOVER_RUNNING_KILL_ENABLED = + CommonVars("linkis.entrance.failover.running.kill.enable", false) + } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala index 7a7cb7463a..a40c3fa35d 100644 --- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala @@ -63,7 +63,7 @@ object CustomVariableUtils extends Logging { } val variableMap = TaskUtils .getVariableMap(jobRequest.getParams) - .asInstanceOf[util.HashMap[String, String]] + .asInstanceOf[util.Map[String, String]] variables.putAll(variableMap) if (!variables.containsKey("user")) { variables.put("user", jobRequest.getExecuteUser) diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/UserCreatorIPCheckUtils.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/UserCreatorIPCheckUtils.scala index 573c134493..653e9ad78b 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/UserCreatorIPCheckUtils.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/UserCreatorIPCheckUtils.scala @@ -67,7 +67,7 @@ object UserCreatorIPCheckUtils extends Logging { def checkUserIp(jobRequest: JobRequest, logAppender: lang.StringBuilder): JobRequest = { // Get IP address - val jobIp = jobRequest.getSource.get(TaskConstant.REQUEST_IP) + val jobIp = jobRequest.getSource.getOrDefault(TaskConstant.REQUEST_IP, "") logger.debug(s"start to checkTenantLabel $jobIp") if (StringUtils.isNotBlank(jobIp)) { jobRequest match { diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.scala new file mode 100644 index 0000000000..e2f0ab1d5a --- /dev/null +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.entrance.scheduler + +import org.apache.linkis.common.utils.Utils +import org.apache.linkis.entrance.conf.EntranceConfiguration +import org.apache.linkis.entrance.job.EntranceExecutionJob +import org.apache.linkis.entrance.utils.JobHistoryHelper +import org.apache.linkis.scheduler.SchedulerContext +import org.apache.linkis.scheduler.queue.Group +import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer + +import java.util +import java.util.concurrent.ExecutorService + +import scala.collection.JavaConverters.collectionAsScalaIterableConverter + +class EntranceFIFOUserConsumer( + schedulerContext: SchedulerContext, + executeService: ExecutorService, + private var group: Group +) extends FIFOUserConsumer(schedulerContext, executeService, group) { + + override def loop(): Unit = { + // When offlineFlag=true, unsubmitted tasks are failed over and running tasks wait for completion. + // In this case super.loop would only submit retry tasks, but retry tasks can be failed over as well, + // which speeds up taking the entrance offline. + schedulerContext match { + case entranceSchedulerContext: EntranceSchedulerContext => + if ( + entranceSchedulerContext.getOfflineFlag && EntranceConfiguration.ENTRANCE_FAILOVER_RETRY_JOB_ENABLED.getValue + ) { + val jobs = scanAllRetryJobsAndRemove() + if (!jobs.isEmpty) { + val ids = new util.ArrayList[Long]() + jobs.asScala.foreach { + case entranceJob: EntranceExecutionJob => + entranceJob.getLogWriter.foreach(_.close()) + ids.add(entranceJob.getJobRequest.getId) + case _ => + } + JobHistoryHelper.updateBatchInstancesEmpty(ids) + } + Utils.tryQuietly(Thread.sleep(5000)) + return + } + case _ => + } + + // default behavior + super.loop() + + } + +} diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala index 7f16dd2463..0f31351b48 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala @@ -17,7 +17,6 @@ package org.apache.linkis.entrance.scheduler -import org.apache.linkis.common.ServiceInstance import org.apache.linkis.common.conf.{CommonVars, Configuration} import org.apache.linkis.common.utils.{Logging, Utils} import org.apache.linkis.entrance.conf.EntranceConfiguration @@ -28,16 +27,12 @@ import org.apache.linkis.governance.common.protocol.conf.{ RequestQueryEngineConfigWithGlobalConfig, ResponseQueryConfig } -import org.apache.linkis.instance.label.client.InstanceLabelClient -import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext -import org.apache.linkis.manager.label.constant.{LabelKeyConstant, LabelValueConstant} import org.apache.linkis.manager.label.entity.Label import org.apache.linkis.manager.label.entity.engine.{ ConcurrentEngineConnLabel, EngineTypeLabel, UserCreatorLabel } -import org.apache.linkis.manager.label.entity.route.RouteLabel import org.apache.linkis.manager.label.utils.LabelUtil import org.apache.linkis.protocol.constants.TaskConstant import org.apache.linkis.protocol.utils.TaskUtils @@ -157,40 +152,10 @@ class EntranceGroupFactory extends GroupFactory with Logging { } 
private def getUserMaxRunningJobs(keyAndValue: util.Map[String, String]): Int = { - var userDefinedRunningJobs = EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue) - var entranceNum = Sender.getInstances(Sender.getThisServiceInstance.getApplicationName).length - val labelList = new util.ArrayList[Label[_]]() - val offlineRouteLabel = LabelBuilderFactoryContext.getLabelBuilderFactory - .createLabel[RouteLabel](LabelKeyConstant.ROUTE_KEY, LabelValueConstant.OFFLINE_VALUE) - labelList.add(offlineRouteLabel) - var offlineIns: Array[ServiceInstance] = null - Utils.tryAndWarn { - offlineIns = InstanceLabelClient.getInstance - .getInstanceFromLabel(labelList) - .asScala - .filter(l => - null != l && l.getApplicationName - .equalsIgnoreCase(Sender.getThisServiceInstance.getApplicationName) - ) - .toArray - } - if (null != offlineIns) { - logger.info(s"There are ${offlineIns.length} offlining instance.") - entranceNum = entranceNum - offlineIns.length - } - /* - Sender.getInstances may get 0 instances due to cache in Sender. So this instance is the one instance. - */ - if (0 >= entranceNum) { - logger.error( - s"Got ${entranceNum} ${Sender.getThisServiceInstance.getApplicationName} instances." - ) - entranceNum = 1 - } Math.max( EntranceConfiguration.ENTRANCE_INSTANCE_MIN.getValue, - userDefinedRunningJobs / entranceNum - ); + EntranceConfiguration.WDS_LINKIS_INSTANCE.getValue(keyAndValue) + ) } } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala new file mode 100644 index 0000000000..2cdee97cc7 --- /dev/null +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.linkis.entrance.scheduler + +import org.apache.linkis.common.ServiceInstance +import org.apache.linkis.common.utils.Utils +import org.apache.linkis.entrance.conf.EntranceConfiguration +import org.apache.linkis.instance.label.client.InstanceLabelClient +import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext +import org.apache.linkis.manager.label.constant.{LabelKeyConstant, LabelValueConstant} +import org.apache.linkis.manager.label.entity.Label +import org.apache.linkis.manager.label.entity.route.RouteLabel +import org.apache.linkis.rpc.Sender +import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer +import org.apache.linkis.scheduler.queue.parallelqueue.{ParallelConsumerManager, ParallelGroup} + +import java.util +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ + +class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String) + extends ParallelConsumerManager(maxParallelismUsers, schedulerName) { + + override protected def createConsumer(groupName: String): FIFOUserConsumer = { + val group = getSchedulerContext.getOrCreateGroupFactory.getGroup(groupName) + new EntranceFIFOUserConsumer(getSchedulerContext, getOrCreateExecutorService, group) + } + + if (EntranceConfiguration.ENTRANCE_GROUP_SCAN_ENABLED.getValue) { + Utils.defaultScheduler.scheduleAtFixedRate( + new Runnable { + override def run(): Unit = { + Utils.tryAndError { + logger.info("start to refresh consumer group maxAllowRunningJobs") + // get all entrance servers from eureka + val serviceInstances = + Sender.getInstances(Sender.getThisServiceInstance.getApplicationName) + if (null == serviceInstances || serviceInstances.isEmpty) return + + // get all servers labeled offline + val routeLabel = LabelBuilderFactoryContext.getLabelBuilderFactory + .createLabel[RouteLabel](LabelKeyConstant.ROUTE_KEY, LabelValueConstant.OFFLINE_VALUE) + val labels = new util.ArrayList[Label[_]] + labels.add(routeLabel) + val labelInstances = InstanceLabelClient.getInstance.getInstanceFromLabel(labels) + + // get active entrance servers + val allInstances = new util.ArrayList[ServiceInstance]() + allInstances.addAll(serviceInstances.toList.asJava) + allInstances.removeAll(labelInstances) + // refresh all group maxAllowRunningJobs + refreshAllGroupMaxAllowRunningJobs(allInstances.size()) + logger.info("Finished refreshing consumer group maxAllowRunningJobs") + } + } + }, + EntranceConfiguration.ENTRANCE_GROUP_SCAN_INIT_TIME.getValue, + EntranceConfiguration.ENTRANCE_GROUP_SCAN_INTERVAL.getValue, + TimeUnit.MILLISECONDS + ) + } + + def refreshAllGroupMaxAllowRunningJobs(validInsCount: Int): Unit = { + listConsumers() + .foreach(item => { + item.getGroup match { + case group: ParallelGroup => + val maxAllowRunningJobs = Math.round(group.getMaxRunningJobs * 1.0f / Math.max(validInsCount, 1)) + group.setMaxAllowRunningJobs(maxAllowRunningJobs) + logger + .info( + "group {} refresh maxAllowRunningJobs => {}/{}={}", + Array( + group.getGroupName, + group.getMaxRunningJobs.toString, + validInsCount.toString, + maxAllowRunningJobs.toString + ): _* + ) + case _ => + } + }) + } + +} diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceSchedulerContext.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceSchedulerContext.scala index d5de2cc2da..1638b0fb1c 100644 --- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceSchedulerContext.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceSchedulerContext.scala @@ -28,6 +28,11 @@ class EntranceSchedulerContext extends SchedulerContext { private var consumerManager: ConsumerManager = _ private var executorManager: ExecutorManager = _ + private var offlineFlag: Boolean = false + + def setOfflineFlag(offlineFlag: Boolean): Unit = this.offlineFlag = offlineFlag + def getOfflineFlag: Boolean = this.offlineFlag + def this( groupFactory: GroupFactory, consumerManager: ConsumerManager, diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala index ec29128889..44e2357b34 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala @@ -69,6 +69,11 @@ object JobHistoryHelper extends Logging { else task.getStatus } + def getProgressByTaskID(taskID: Long): String = { + val task = getTaskByTaskID(taskID) + if (task == null) "0" else task.getProgress + } + def getRequestIpAddr(req: HttpServletRequest): String = { val addrList = List( Option(req.getHeader("x-forwarded-for")).getOrElse("").split(",")(0), @@ -123,7 +128,144 @@ sender.ask(jobReqBatchUpdate) } - private def getTaskByTaskID(taskID: Long): JobRequest = { + /** + * Get all tasks in the consume queue and batch-update their instances field + * + * @param taskIdList + * @param retryWhenUpdateFail + */ + def updateAllConsumeQueueTask( + taskIdList: util.List[Long], + retryWhenUpdateFail: Boolean = false + ): Unit = { + + if (taskIdList.isEmpty) return + + val updateTaskIds = new util.ArrayList[Long]() + + if ( + EntranceConfiguration.ENTRANCE_UPDATE_BATCH_SIZE.getValue > 0 && + taskIdList.size() > EntranceConfiguration.ENTRANCE_UPDATE_BATCH_SIZE.getValue + ) { + for (i <- 0 until EntranceConfiguration.ENTRANCE_UPDATE_BATCH_SIZE.getValue) { + updateTaskIds.add(taskIdList.get(i)) + } + } else { + updateTaskIds.addAll(taskIdList) + } + val list = new util.ArrayList[Long]() + list.addAll(taskIdList) + try { + val successTaskIds = updateBatchInstancesEmpty(updateTaskIds) + if (retryWhenUpdateFail) { + list.removeAll(successTaskIds) + } else { + list.removeAll(updateTaskIds) + } + } catch { + case e: Exception => + logger.warn("update batch instances failed, wait for retry", e) + Thread.sleep(1000) + } + updateAllConsumeQueueTask(list, retryWhenUpdateFail) + + } + + /** + * Batch-update the instances field (set it to empty) + * + * @param taskIdList + * @return + */ + def updateBatchInstancesEmpty(taskIdList: util.List[Long]): util.List[Long] = { + val jobReqList = new util.ArrayList[JobRequest]() + taskIdList.asScala.foreach(taskID => { + val jobRequest = new JobRequest + jobRequest.setId(taskID) + jobRequest.setInstances("") + jobReqList.add(jobRequest) + }) + val jobReqBatchUpdate = JobReqBatchUpdate(jobReqList) + Utils.tryCatch { + val response = sender.ask(jobReqBatchUpdate) + response match { + case resp: util.List[JobRespProtocol] => + // TODO filter successful data; the RPC currently has a bug +// resp.asScala +// .filter(r => +// r.getStatus == 
SUCCESS_FLAG && r.getData.containsKey(JobRequestConstants.JOB_ID) +// ) +// .map(_.getData.get(JobRequestConstants.JOB_ID).asInstanceOf[java.lang.Long]) +// .toList + + taskIdList + case _ => + throw JobHistoryFailedException( + "update batch instances response from jobhistory is not the expected List type" + ) + } + } { + case errorException: ErrorException => throw errorException + case e: Exception => + val e1 = + JobHistoryFailedException( + s"update batch instances ${taskIdList.asScala.mkString(",")} error" + ) + e1.initCause(e) + throw e1 + } + } + + /** + * Query tasks waiting for failover + * + * @param reqMap + * @param statusList + * @param startTimestamp + * @param limit + * @return + */ + def queryWaitForFailoverTask( + reqMap: util.Map[String, java.lang.Long], + statusList: util.List[String], + startTimestamp: Long, + limit: Int + ): util.List[JobRequest] = { + val requestFailoverJob = RequestFailoverJob(reqMap, statusList, startTimestamp, limit) + val tasks = Utils.tryCatch { + val response = sender.ask(requestFailoverJob) + response match { + case responsePersist: JobRespProtocol => + val status = responsePersist.getStatus + if (status != SUCCESS_FLAG) { + logger.error(s"query from jobHistory status failed, status is $status") + throw JobHistoryFailedException("query from jobHistory status failed") + } + val data = responsePersist.getData + data.get(JobRequestConstants.JOB_HISTORY_LIST) match { + case tasks: List[JobRequest] => + tasks.asJava + case _ => + throw JobHistoryFailedException( + s"response from jobhistory is not the expected List type, instances ${reqMap.keySet()}" + ) + } + case _ => + logger.error("got an unexpected query response") + throw JobHistoryFailedException("got an unexpected query response") + } + } { + case errorException: ErrorException => throw errorException + case e: Exception => + val e1 = + JobHistoryFailedException(s"query failover task error, instances ${reqMap.keySet()} ") + e1.initCause(e) + throw e1 + } + tasks + } + + def getTaskByTaskID(taskID: Long): JobRequest = { val jobRequest = new JobRequest jobRequest.setId(taskID) jobRequest.setSource(null) @@ -176,15 +318,15 @@ val ecResourceMap = if (resourceInfo == null) new util.HashMap[String, ResourceWithStatus] else resourceInfo if (resourceMap != null) { - resourceMap.asInstanceOf[util.HashMap[String, ResourceWithStatus]].putAll(ecResourceMap) + resourceMap.asInstanceOf[util.Map[String, ResourceWithStatus]].putAll(ecResourceMap) } else { metricsMap.put(TaskConstant.JOB_YARNRESOURCE, ecResourceMap) } - var engineInstanceMap: util.HashMap[String, AnyRef] = null + var engineInstanceMap: util.Map[String, AnyRef] = null if (metricsMap.containsKey(TaskConstant.JOB_ENGINECONN_MAP)) { engineInstanceMap = metricsMap .get(TaskConstant.JOB_ENGINECONN_MAP) - .asInstanceOf[util.HashMap[String, AnyRef]] + .asInstanceOf[util.Map[String, AnyRef]] } else { engineInstanceMap = new util.HashMap[String, AnyRef]() metricsMap.put(TaskConstant.JOB_ENGINECONN_MAP, engineInstanceMap) @@ -194,7 +336,7 @@ val ticketId = infoMap.get(TaskConstant.TICKET_ID).asInstanceOf[String] val engineExtraInfoMap = engineInstanceMap .getOrDefault(ticketId, new util.HashMap[String, AnyRef]) - .asInstanceOf[util.HashMap[String, AnyRef]] + .asInstanceOf[util.Map[String, AnyRef]] engineExtraInfoMap.putAll(infoMap) engineInstanceMap.put(ticketId, engineExtraInfoMap) } else { diff --git a/linkis-dist/helm/charts/linkis/templates/configmap-linkis-config.yaml 
b/linkis-dist/helm/charts/linkis/templates/configmap-linkis-config.yaml index e6f419744c..6437188298 100644 --- a/linkis-dist/helm/charts/linkis/templates/configmap-linkis-config.yaml +++ b/linkis-dist/helm/charts/linkis/templates/configmap-linkis-config.yaml @@ -241,8 +241,8 @@ data: wds.linkis.gateway.conf.enable.token.auth=true wds.linkis.is.gateway=true wds.linkis.server.mybatis.mapperLocations=classpath*:mapper/common/*.xml,classpath*:mapper/mysql/*.xml - wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.instance.label.entity - wds.linkis.server.mybatis.BasePackage=org.apache.linkis.instance.label.dao,org.apache.linkis.gateway.authentication.dao + wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.instance.label.entity,org.apache.linkis.jobhistory.entity + wds.linkis.server.mybatis.BasePackage=org.apache.linkis.instance.label.dao,org.apache.linkis.gateway.authentication.dao,org.apache.linkis.jobhistory.dao wds.linkis.label.entity.packages=org.apache.linkis.gateway.ujes.route.label wds.linkis.login_encrypt.enable=false ##LDAP:q @@ -317,6 +317,11 @@ data: wds.linkis.resultSet.store.path=hdfs://{{ .Values.linkis.locations.runtimeDir }} {{- end }} + ##mybatis + wds.linkis.server.mybatis.mapperLocations=classpath*:mapper/common/*.xml,classpath*:mapper/mysql/*.xml + wds.linkis.server.mybatis.BasePackage=org.apache.linkis.publicservice.common.lock.dao + wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.publicservice.common.lock.entity + ##Spring spring.server.port={{ .Values.cgEntrance.port }} diff --git a/linkis-dist/package/conf/linkis-cg-entrance.properties b/linkis-dist/package/conf/linkis-cg-entrance.properties index bc43125b1d..4e2741e13e 100644 --- a/linkis-dist/package/conf/linkis-cg-entrance.properties +++ b/linkis-dist/package/conf/linkis-cg-entrance.properties @@ -35,5 +35,5 @@ wds.linkis.entrance.user.creator.ip.interceptor.switch=false ## you may set service version if you want to distinguish different configuration version spring.eureka.instance.metadata-map.linkis.conf.version=v1 -## clean dirty data when the entrance start -linkis.entrance.auto.clean.dirty.data.enable=true \ No newline at end of file +wds.linkis.server.mybatis.BasePackage=org.apache.linkis.publicservice.common.lock.dao +wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.publicservice.common.lock.entity \ No newline at end of file diff --git a/linkis-dist/package/conf/linkis-mg-gateway.properties b/linkis-dist/package/conf/linkis-mg-gateway.properties index ef2d740c40..1f1d2416b4 100644 --- a/linkis-dist/package/conf/linkis-mg-gateway.properties +++ b/linkis-dist/package/conf/linkis-mg-gateway.properties @@ -20,8 +20,8 @@ wds.linkis.gateway.conf.enable.proxy.user=false wds.linkis.gateway.conf.url.pass.auth=/dss/ wds.linkis.gateway.conf.enable.token.auth=true wds.linkis.is.gateway=true -wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.instance.label.entity -wds.linkis.server.mybatis.BasePackage=org.apache.linkis.instance.label.dao,org.apache.linkis.gateway.authentication.dao +wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.instance.label.entity,org.apache.linkis.jobhistory.entity +wds.linkis.server.mybatis.BasePackage=org.apache.linkis.instance.label.dao,org.apache.linkis.gateway.authentication.dao,org.apache.linkis.jobhistory.dao wds.linkis.label.entity.packages=org.apache.linkis.gateway.ujes.route.label wds.linkis.login_encrypt.enable=false ##LDAP diff --git 
a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala index 50dbef632c..10f3a64d13 100644 --- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala +++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala @@ -48,7 +48,7 @@ object OrchestratorConfiguration { CommonVars("wds.linkis.orchestrator.execution.task.max.parallelism", 5) val TASK_RUNNER_MAX_SIZE = - CommonVars("wds.linkis.orchestrator.execution.task.runner.max.size", 200) + CommonVars("wds.linkis.orchestrator.execution.task.runner.max.size", 1000) val EXEC_RUNNER_FACTORY_CLASS = CommonVars("wds.linkis.orchestrator.exec.task.runner.factory.class", "") diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java index 7bb7656346..806d8ec70c 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java @@ -23,6 +23,7 @@ import java.util.Date; import java.util.List; +import java.util.Map; public interface JobHistoryMapper { @@ -108,4 +109,27 @@ Integer countUndoneTaskWithCreatorOnly( void updateJobHistoryCancelById( @Param("idList") List idList, @Param("errorDesc") String errorDesc); + + /** + * query wait for failover job + * + *
<p>
Sql example: SELECT a.* FROM linkis_ps_job_history_group_history a where (a.instances = '' + * or a.instances is null or a.instances not in ('192.168.1.123:9104','192.168.1.124:9104') or + * EXISTS ( select 1 from ( select '192.168.1.123:9104' as instances, 1697775054098 as + * registryTime union all select '192.168.1.124:9104' as instances, 1666239054098 as registryTime + * ) b where a.instances = b.instances and a.created_time < FROM_UNIXTIME(b.registryTime/1000) ) ) + * and status in ('Inited','Running','Scheduled','WaitForRetry') and a.created_time >= + * FROM_UNIXTIME(1666239054098/1000) limit 10 + * + * @param instancesMap + * @param statusList + * @param startTimestamp + * @param limit + * @return + */ + List selectFailoverJobHistory( + @Param("instancesMap") Map instancesMap, + @Param("statusList") List statusList, + @Param("startTimestamp") Long startTimestamp, + @Param("limit") Integer limit); } diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml index 7a81b6c87a..509acdd08b 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/mysql/JobHistoryMapper.xml @@ -231,4 +231,27 @@ #{id} + + diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/postgresql/JobHistoryMapper.xml b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/postgresql/JobHistoryMapper.xml index 30e4e85b34..f7e75dea0e 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/postgresql/JobHistoryMapper.xml +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/postgresql/JobHistoryMapper.xml @@ -229,4 +229,26 @@ #{id} + diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java index 433cbe0474..b8731554d4 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/JobHistoryQueryService.java @@ -38,6 +38,8 @@ public interface JobHistoryQueryService { JobRespProtocol query(JobReqQuery jobReqQuery); + JobRespProtocol queryFailoverJobs(RequestFailoverJob requestFailoverJob); + JobHistory getJobHistoryByIdAndName(Long jobID, String userName); List search(Long jobId, String username, String creator, String status, Date sDate, Date eDate, String engineType, Long startJobId, String instance); diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala index f00abc5568..59801bd333 100644 --- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala +++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala @@ -113,7 +113,7 @@ class JobHistoryQueryServiceImpl extends 
JobHistoryQueryService with Logging { logger.info(s"${jobReq.getErrorDesc}") } } - if (jobReq.getStatus != null) { + if (jobReq.getUpdateOrderFlag && jobReq.getStatus != null) { val oldStatus: String = jobHistoryMapper.selectJobHistoryStatusForUpdate(jobReq.getId) if (oldStatus != null && !shouldUpdate(oldStatus, jobReq.getStatus)) { throw new QueryException( @@ -178,7 +178,7 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging { logger.info(s"${jobReq.getErrorDesc}") } } - if (jobReq.getStatus != null) { + if (jobReq.getUpdateOrderFlag && jobReq.getStatus != null) { val oldStatus: String = jobHistoryMapper.selectJobHistoryStatusForUpdate(jobReq.getId) if (oldStatus != null && !shouldUpdate(oldStatus, jobReq.getStatus)) { throw new QueryException( @@ -247,6 +247,30 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging { jobResp } + @Receiver + override def queryFailoverJobs(requestFailoverJob: RequestFailoverJob): JobRespProtocol = { + val reqMap = requestFailoverJob.reqMap + val statusList = requestFailoverJob.statusList + val startTimestamp = requestFailoverJob.startTimestamp + val limit = requestFailoverJob.limit + logger.info(s"query failover jobs, start timestamp:${startTimestamp}, limit:${limit}") + val jobResp = new JobRespProtocol + Utils.tryCatch { + val jobList = + jobHistoryMapper.selectFailoverJobHistory(reqMap, statusList, startTimestamp, limit) + val jobReqList = jobList.asScala.map(jobHistory2JobRequest).toList + val map = new util.HashMap[String, Object]() + map.put(JobRequestConstants.JOB_HISTORY_LIST, jobReqList) + jobResp.setStatus(0) + jobResp.setData(map) + } { case e: Exception => + logger.error(s"Failed to query failover job, instances ${reqMap.keySet()}", e) + jobResp.setStatus(1) + jobResp.setMsg(ExceptionUtils.getRootCauseMessage(e)) + } + jobResp + } + override def getJobHistoryByIdAndName(jobId: java.lang.Long, userName: String): JobHistory = { val jobReq = new JobHistory jobReq.setId(jobId) diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/pom.xml b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/pom.xml index b089c4bfe9..344fce259c 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/pom.xml +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/pom.xml @@ -81,6 +81,13 @@ ${project.version} + + + org.apache.linkis + linkis-jobhistory + ${project.version} + + com.fasterxml.jackson.core jackson-databind diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala index 2ee0f4b023..04f206d6f6 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala @@ -22,12 +22,25 @@ import org.apache.linkis.gateway.config.GatewayConfiguration import org.apache.linkis.gateway.http.GatewayContext import org.apache.linkis.gateway.parser.AbstractGatewayParser import 
org.apache.linkis.gateway.ujes.parser.EntranceExecutionGatewayParser._ +import org.apache.linkis.jobhistory.service.JobHistoryQueryService +import org.apache.linkis.protocol.engine.JobInstance import org.apache.linkis.protocol.utils.ZuulEntranceUtils +import org.apache.linkis.rpc.interceptor.ServiceInstanceUtils +import org.apache.linkis.server.BDPJettyServerHelper +import org.apache.linkis.server.conf.ServerConfiguration + +import org.apache.commons.lang3.StringUtils import org.springframework.stereotype.Component +import javax.annotation.Resource + @Component class EntranceRequestGatewayParser extends AbstractGatewayParser { + + @Resource + private var jobHistoryQueryService: JobHistoryQueryService = _ + override def shouldContainRequestBody(gatewayContext: GatewayContext): Boolean = false override def parse(gatewayContext: GatewayContext): Unit = @@ -50,13 +63,57 @@ class EntranceRequestGatewayParser extends AbstractGatewayParser { } else { ServiceInstance(GatewayConfiguration.ENTRANCE_SPRING_NAME.getValue, null) } - } else { + } else if (execId.startsWith(ZuulEntranceUtils.EXEC_ID)) { + // parse by execId ZuulEntranceUtils.parseServiceInstanceByExecID(execId)(0) + } else { + // build JobInstance by taskId + val jobInstance = buildJobInstance(execId.toLong, gatewayContext) + if (jobInstance == null) return + val str = BDPJettyServerHelper.gson.toJson(jobInstance) + gatewayContext.getRequest.addHeader( + ServerConfiguration.LINKIS_SERVER_ENTRANCE_HEADER_KEY.getValue, + Array(str) + ) + + ServiceInstance(GatewayConfiguration.ENTRANCE_SPRING_NAME.getValue, jobInstance.instances) } gatewayContext.getGatewayRoute.setServiceInstance(serviceInstance) case _ => } + def buildJobInstance(taskId: Long, gatewayContext: GatewayContext): JobInstance = { + val histories = + jobHistoryQueryService.search(taskId, null, null, null, null, null, null, null, null) + if (histories.isEmpty) { + sendErrorResponse(s"taskId $taskId does not exist.", gatewayContext) + return null + } + val history = histories.get(0) + if (StringUtils.isEmpty(history.getInstances)) { + return JobInstance( + history.getStatus, + null, + history.getJobReqId, + history.getCreatedTime.getTime, + Long.MaxValue + ) + } + val activeInstances = ServiceInstanceUtils.getRPCServerLoader.getServiceInstances( + GatewayConfiguration.ENTRANCE_SPRING_NAME.getValue + ) + val instance = activeInstances + .find(_.getInstance.equals(history.getInstances)) + .getOrElse(ServiceInstance(null, null, Long.MaxValue)) + JobInstance( + history.getStatus, + instance.getInstance, + history.getJobReqId, + history.getCreatedTime.getTime, + instance.getRegistryTimestamp + ) + } + } object EntranceRequestGatewayParser { diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/scala/org/apache/linkis/gateway/springcloud/http/SpringCloudGatewayHttpRequest.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/scala/org/apache/linkis/gateway/springcloud/http/SpringCloudGatewayHttpRequest.scala index d591e5ce94..929ed6ae62 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/scala/org/apache/linkis/gateway/springcloud/http/SpringCloudGatewayHttpRequest.scala +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/scala/org/apache/linkis/gateway/springcloud/http/SpringCloudGatewayHttpRequest.scala @@ -87,8 +87,10 @@ class SpringCloudGatewayHttpRequest(request: AbstractServerHttpRequest) extends 
override def getHeaders: JMap[String, Array[String]] = headers - override def addHeader(headerName: String, headers: Array[String]): Unit = + override def addHeader(headerName: String, headers: Array[String]): Unit = { + this.headers.put(headerName, headers) addHeaders.put(headerName, headers) + } override def addCookie(cookieName: String, cookies: Array[Cookie]): Unit = { this.cookies.put(cookieName, cookies)
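The scheduled component that actually finds orphaned jobs and replays them is not part of this excerpt. A minimal sketch of how the pieces above plausibly fit together, assuming a hypothetical wiring object (FailoverScanSketch) started alongside the entrance server; the status list mirrors the SQL example documented on JobHistoryMapper.selectFailoverJobHistory, and all configuration keys are the ones added to EntranceConfiguration in this patch:

    import org.apache.linkis.common.utils.Utils
    import org.apache.linkis.entrance.EntranceServer
    import org.apache.linkis.entrance.conf.EntranceConfiguration
    import org.apache.linkis.entrance.utils.JobHistoryHelper
    import org.apache.linkis.rpc.Sender

    import java.util
    import java.util.concurrent.TimeUnit

    import scala.collection.JavaConverters._

    // Hypothetical wiring, not part of this patch: periodically queries job history
    // for orphaned jobs and feeds each one into EntranceServer.failoverExecute.
    object FailoverScanSketch {

      def start(entranceServer: EntranceServer): Unit = {
        if (!EntranceConfiguration.ENTRANCE_FAILOVER_ENABLED) return
        Utils.defaultScheduler.scheduleAtFixedRate(
          new Runnable {
            override def run(): Unit = Utils.tryAndWarn {
              // healthy entrance instances and their registry timestamps: a job is an
              // orphan if it is bound to none of them, or was created before its
              // instance re-registered (see the SQL example on selectFailoverJobHistory)
              val reqMap = new util.HashMap[String, java.lang.Long]()
              Sender
                .getInstances(Sender.getThisServiceInstance.getApplicationName)
                .foreach(ins => reqMap.put(ins.getInstance, ins.getRegistryTimestamp))

              val statusList = new util.ArrayList[String]()
              statusList.add("Inited")
              statusList.add("Running")
              statusList.add("Scheduled")
              statusList.add("WaitForRetry")

              val startTimestamp = System.currentTimeMillis() -
                EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME

              JobHistoryHelper
                .queryWaitForFailoverTask(
                  reqMap,
                  statusList,
                  startTimestamp,
                  EntranceConfiguration.ENTRANCE_FAILOVER_DATA_NUM_LIMIT
                )
                .asScala
                .foreach(entranceServer.failoverExecute)
            }
          },
          EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INIT_TIME,
          EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INTERVAL,
          TimeUnit.MILLISECONDS
        )
      }
    }

Under this reading, each entrance reports its own registryTimestamp through ServiceInstance, so a restarted entrance automatically reclaims the jobs it owned before the restart: those jobs were created before its re-registration and therefore match the failover query.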