Skip to content

Commit

Permalink
[Feature] Entrance support ha and failover task
Browse files Browse the repository at this point in the history
[Feature] Entrance support ha and failover task
  • Loading branch information
peacewong authored Nov 19, 2023
2 parents b6af5f0 + 9f4a9c8 commit 0453b07
Show file tree
Hide file tree
Showing 41 changed files with 1,672 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ package org.apache.linkis.common
class ServiceInstance {
private var applicationName: String = _
private var instance: String = _
private var registryTimestamp: Long = _
def setApplicationName(applicationName: String): Unit = this.applicationName = applicationName
def getApplicationName: String = applicationName
def setInstance(instance: String): Unit = this.instance = instance
def getInstance: String = instance

def setRegistryTimestamp(registryTimestamp: Long): Unit = this.registryTimestamp =
registryTimestamp

def getRegistryTimestamp: Long = registryTimestamp

override def equals(other: Any): Boolean = other match {
case that: ServiceInstance =>
applicationName == that.applicationName &&
Expand All @@ -42,7 +48,9 @@ class ServiceInstance {
.foldLeft(0)((a, b) => 31 * a + b)
}

override def toString: String = s"ServiceInstance($applicationName, $instance)"
override def toString: String =
s"ServiceInstance($applicationName, $instance, $registryTimestamp)"

}

object ServiceInstance {
Expand All @@ -54,6 +62,14 @@ object ServiceInstance {
serviceInstance
}

def apply(applicationName: String, instance: String, registryTimestamp: Long): ServiceInstance = {
val serviceInstance = new ServiceInstance
serviceInstance.setApplicationName(applicationName)
serviceInstance.setInstance(instance)
serviceInstance.setRegistryTimestamp(registryTimestamp)
serviceInstance
}

def unapply(serviceInstance: ServiceInstance): Option[(String, String)] =
if (serviceInstance != null) {
Some(serviceInstance.applicationName, serviceInstance.instance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import org.apache.linkis.common.conf.CommonVars

import org.apache.commons.lang3.StringUtils

import scala.collection.mutable

object CodeAndRunTypeUtils {
private val CONF_LOCK = new Object()

Expand Down Expand Up @@ -101,7 +103,14 @@ object CodeAndRunTypeUtils {
def getLanguageTypeAndCodeTypeRelationMap: Map[String, String] = {
val codeTypeAndRunTypeRelationMap = getCodeTypeAndLanguageTypeRelationMap
if (codeTypeAndRunTypeRelationMap.isEmpty) Map()
else codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
else {
// codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
val map = mutable.Map[String, String]()
codeTypeAndRunTypeRelationMap.foreach(kv => {
kv._2.foreach(v => map.put(v, kv._1))
})
map.toMap
}
}

def getLanguageTypeByCodeType(codeType: String, defaultLanguageType: String = ""): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,7 @@ object ServerConfiguration extends Logging {
val LINKIS_SERVER_SESSION_PROXY_TICKETID_KEY =
CommonVars("wds.linkis.session.proxy.user.ticket.key", "linkis_user_session_proxy_ticket_id_v1")

val LINKIS_SERVER_ENTRANCE_HEADER_KEY =
CommonVars("linkis.server.entrance.header.key", "jobInstanceKey")

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public interface TaskConstant {
String TICKET_ID = "ticketId";
String ENGINE_CONN_TASK_ID = "engineConnTaskId";
String ENGINE_CONN_SUBMIT_TIME = "engineConnSubmitTime";
String FAILOVER_FLAG = "failoverFlag";
String DEBUG_ENBALE = "debug.enable";

String PARAMS_DATA_SOURCE = "dataSources";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.protocol.engine

case class JobInstance(
status: String,
instances: String,
jobReqId: String,
createTimestamp: Long,
instanceRegistryTimestamp: Long
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object ZuulEntranceUtils {

private val INSTANCE_SPLIT_TOKEN = "_"

private val EXEC_ID = "exec_id"
val EXEC_ID = "exec_id"

private val SPLIT_LEN = 3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@ abstract class AbstractGroup extends Group {

private var _status: GroupStatus = _
private var maxRunningJobs: Int = _
private var maxAllowRunningJobs: Int = 0
private var maxAskExecutorTimes: Long = 0L

def setMaxRunningJobs(maxRunningJobs: Int): Unit = this.maxRunningJobs = maxRunningJobs
def getMaxRunningJobs: Int = maxRunningJobs

def setMaxAllowRunningJobs(maxAllowRunningJobs: Int): Unit = this.maxAllowRunningJobs =
maxAllowRunningJobs

def getMaxAllowRunningJobs: Int =
if (maxAllowRunningJobs <= 0) maxRunningJobs else Math.min(maxAllowRunningJobs, maxRunningJobs)

def setMaxAskExecutorTimes(maxAskExecutorTimes: Long): Unit = this.maxAskExecutorTimes =
maxAskExecutorTimes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ object SchedulerEventState extends Enumeration {
SchedulerEventState.withName(jobState)
)

def isInitedByStr(jobState: String): Boolean = SchedulerEventState.withName(jobState) == Inited

def isRunningByStr(jobState: String): Boolean = isRunning(SchedulerEventState.withName(jobState))

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.linkis.scheduler.executer.Executor
import org.apache.linkis.scheduler.future.{BDPFuture, BDPFutureTask}
import org.apache.linkis.scheduler.queue._

import java.util
import java.util.concurrent.{ExecutorService, Future}

import scala.beans.BeanProperty
Expand Down Expand Up @@ -122,9 +123,10 @@ class FIFOUserConsumer(
}
var event: Option[SchedulerEvent] = getWaitForRetryEvent
if (event.isEmpty) {
val completedNums = runningJobs.filter(job => job == null || job.isCompleted)
if (completedNums.length < 1) {
Utils.tryQuietly(Thread.sleep(1000))
val maxAllowRunningJobs = fifoGroup.getMaxAllowRunningJobs
val currentRunningJobs = runningJobs.count(e => e != null && !e.isCompleted)
if (maxAllowRunningJobs <= currentRunningJobs) {
Utils.tryQuietly(Thread.sleep(1000)) // TODO 还可以优化,通过实现JobListener进行优化
return
}
while (event.isEmpty) {
Expand Down Expand Up @@ -207,6 +209,19 @@ class FIFOUserConsumer(
runningJobs(index) = job
}

protected def scanAllRetryJobsAndRemove(): util.List[Job] = {
val jobs = new util.ArrayList[Job]()
for (index <- runningJobs.indices) {
val job = runningJobs(index)
if (job != null && job.isJobCanRetry) {
jobs.add(job)
runningJobs(index) = null
logger.info(s"Job $job can retry, remove from runningJobs")
}
}
jobs
}

override def shutdown(): Unit = {
future.cancel(true)
val waitEvents = queue.getWaitingEvents
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class JobRequest {
/** result location */
private String resultLocation;

/** Task status updates is ordered, if false, not checked */
private Boolean updateOrderFlag = true;

private String observeInfo;

private Map<String, Object> metrics = new HashMap<>();
Expand Down Expand Up @@ -205,6 +208,14 @@ public void setObserveInfo(String observeInfo) {
this.observeInfo = observeInfo;
}

public Boolean getUpdateOrderFlag() {
return updateOrderFlag;
}

public void setUpdateOrderFlag(Boolean updateOrderFlag) {
this.updateOrderFlag = updateOrderFlag;
}

@Override
public String toString() {
return "JobRequest{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,10 @@ class RequestOneJob extends JobReq {
}

case class RequestAllJob(instance: String) extends JobReq

case class RequestFailoverJob(
reqMap: util.Map[String, java.lang.Long],
statusList: util.List[String],
startTimestamp: Long,
limit: Int = 10
) extends JobReq
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class TaskExecutionServiceImpl
if (!lockService.isLockExist(requestTask.getLock)) {
logger.error(s"Lock ${requestTask.getLock} not exist, cannot execute.")
return ErrorExecuteResponse(
"Lock not exixt",
"Lock not exist",
new EngineConnExecutorErrorException(
EngineConnExecutorErrorCode.INVALID_LOCK,
"Lock : " + requestTask.getLock + " not exist(您的锁无效,请重新获取后再提交)."
Expand Down
6 changes: 6 additions & 0 deletions linkis-computation-governance/linkis-entrance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.linkis</groupId>
<artifactId>linkis-ps-common-lock</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.linkis.entrance.persistence.QueryPersistenceManager;
import org.apache.linkis.entrance.persistence.ResultSetEngine;
import org.apache.linkis.entrance.scheduler.EntranceGroupFactory;
import org.apache.linkis.entrance.scheduler.EntranceParallelConsumerManager;
import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder;
import org.apache.linkis.orchestrator.ecm.EngineConnManagerBuilder$;
Expand All @@ -51,7 +52,6 @@
import org.apache.linkis.scheduler.executer.ExecutorManager;
import org.apache.linkis.scheduler.queue.ConsumerManager;
import org.apache.linkis.scheduler.queue.GroupFactory;
import org.apache.linkis.scheduler.queue.parallelqueue.ParallelConsumerManager;
import org.apache.linkis.scheduler.queue.parallelqueue.ParallelScheduler;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -190,7 +190,7 @@ public GroupFactory groupFactory() {
@Bean
@ConditionalOnMissingBean
public ConsumerManager consumerManager() {
return new ParallelConsumerManager(
return new EntranceParallelConsumerManager(
ENTRANCE_SCHEDULER_MAX_PARALLELISM_USERS().getValue(), "EntranceJobScheduler");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ private ServiceNameConsts() {}
public static final String ENTRANCE_SERVER = "entranceServer";

public static final String ENTRANCE_INTERCEPTOR = "entranceInterceptors";

public static final String ENTRANCE_FAILOVER_SERVER = "entranceFailoverServer";
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ public enum EntranceErrorCodeSummary implements LinkisErrorCode {
SHELL_BLACKLISTED_CODE(50081, "Shell code contains blacklisted code(shell中包含黑名单代码)"),
JOB_HISTORY_FAILED_ID(50081, ""),

LOGPATH_NOT_NULL(20301, "The logPath cannot be empty(日志路径不能为空)");
LOGPATH_NOT_NULL(20301, "The logPath cannot be empty(日志路径不能为空)"),

FAILOVER_RUNNING_TO_CANCELLED(
30001,
"Job {0} failover, status changed from Running to Cancelled (任务故障转移,状态从Running变更为Cancelled)");

/** (errorCode)错误码 */
private final int errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
package org.apache.linkis.entrance.restful;

import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.scheduler.EntranceSchedulerContext;
import org.apache.linkis.instance.label.client.InstanceLabelClient;
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
import org.apache.linkis.manager.label.constant.LabelValueConstant;
import org.apache.linkis.protocol.label.InsLabelRefreshRequest;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.SchedulerContext;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.utils.ModuleUserUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
Expand All @@ -46,6 +50,12 @@
public class EntranceLabelRestfulApi {

private static final Logger logger = LoggerFactory.getLogger(EntranceLabelRestfulApi.class);
private EntranceServer entranceServer;

@Autowired
public void setEntranceServer(EntranceServer entranceServer) {
this.entranceServer = entranceServer;
}

@ApiOperation(value = "update", notes = "update route label", response = Message.class)
@ApiOperationSupport(ignoreParameters = {"jsonNode"})
Expand Down Expand Up @@ -79,6 +89,16 @@ public Message updateRouteLabel(HttpServletRequest req) {
insLabelRefreshRequest.setServiceInstance(Sender.getThisServiceInstance());
InstanceLabelClient.getInstance().refreshLabelsToInstance(insLabelRefreshRequest);
logger.info("Finished to modify the routelabel of entry to offline");

logger.info("Prepare to update all not execution task instances to empty string");
SchedulerContext schedulerContext =
entranceServer.getEntranceContext().getOrCreateScheduler().getSchedulerContext();
if (schedulerContext instanceof EntranceSchedulerContext) {
((EntranceSchedulerContext) schedulerContext).setOfflineFlag(true);
}
entranceServer.updateAllNotExecutionTaskInstances(true);
logger.info("Finished to update all not execution task instances to empty string");

return Message.ok();
}
}
Loading

0 comments on commit 0453b07

Please sign in to comment.