Skip to content

Commit

Permalink
Merge branch 'master' into master-nebula-pom
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Nov 28, 2023
2 parents ee0aa1f + b704a2a commit a51fef5
Show file tree
Hide file tree
Showing 378 changed files with 21,916 additions and 2,203 deletions.
2 changes: 1 addition & 1 deletion .github/actions/chart-testing-action
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ package org.apache.linkis.common
class ServiceInstance {
private var applicationName: String = _
private var instance: String = _
private var registryTimestamp: Long = _
def setApplicationName(applicationName: String): Unit = this.applicationName = applicationName
def getApplicationName: String = applicationName
def setInstance(instance: String): Unit = this.instance = instance
def getInstance: String = instance

def setRegistryTimestamp(registryTimestamp: Long): Unit = this.registryTimestamp =
registryTimestamp

def getRegistryTimestamp: Long = registryTimestamp

override def equals(other: Any): Boolean = other match {
case that: ServiceInstance =>
applicationName == that.applicationName &&
Expand All @@ -42,7 +48,9 @@ class ServiceInstance {
.foldLeft(0)((a, b) => 31 * a + b)
}

override def toString: String = s"ServiceInstance($applicationName, $instance)"
override def toString: String =
s"ServiceInstance($applicationName, $instance, $registryTimestamp)"

}

object ServiceInstance {
Expand All @@ -54,6 +62,14 @@ object ServiceInstance {
serviceInstance
}

def apply(applicationName: String, instance: String, registryTimestamp: Long): ServiceInstance = {
val serviceInstance = new ServiceInstance
serviceInstance.setApplicationName(applicationName)
serviceInstance.setInstance(instance)
serviceInstance.setRegistryTimestamp(registryTimestamp)
serviceInstance
}

def unapply(serviceInstance: ServiceInstance): Option[(String, String)] =
if (serviceInstance != null) {
Some(serviceInstance.applicationName, serviceInstance.instance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,19 +232,20 @@ private[conf] object BDPConfiguration extends Logging {

private[common] def formatValue[T](defaultValue: T, value: Option[String]): Option[T] = {
if (value.isEmpty || value.exists(StringUtils.isEmpty)) return Option(defaultValue)
val trimValue = value.map(_.trim)
val formattedValue = defaultValue match {
case _: String => value
case _: Byte => value.map(_.toByte)
case _: Short => value.map(_.toShort)
case _: Char => value.map(_.toCharArray.apply(0))
case _: Int => value.map(_.toInt)
case _: Long => value.map(_.toLong)
case _: Float => value.map(_.toFloat)
case _: Double => value.map(_.toDouble)
case _: Boolean => value.map(_.toBoolean)
case _: TimeType => value.map(new TimeType(_))
case _: ByteType => value.map(new ByteType(_))
case null => value
case _: String => trimValue
case _: Byte => trimValue.map(_.toByte)
case _: Short => trimValue.map(_.toShort)
case _: Char => trimValue.map(_.toCharArray.apply(0))
case _: Int => trimValue.map(_.toInt)
case _: Long => trimValue.map(_.toLong)
case _: Float => trimValue.map(_.toFloat)
case _: Double => trimValue.map(_.toDouble)
case _: Boolean => trimValue.map(_.toBoolean)
case _: TimeType => trimValue.map(new TimeType(_))
case _: ByteType => trimValue.map(new ByteType(_))
case null => trimValue
}
formattedValue.asInstanceOf[Option[T]]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object LogUtils {
}

def generateERROR(rawLog: String): String = {
getTimeFormat + " " + "ERROR" + " " + rawLog
getTimeFormat + " " + ERROR_STR + " " + rawLog
}

def generateWarn(rawLog: String): String = {
Expand All @@ -52,4 +52,6 @@ object LogUtils {
getTimeFormat + " " + "SYSTEM-WARN" + " " + rawLog
}

val ERROR_STR = "ERROR"

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import org.apache.linkis.common.conf.CommonVars

import org.apache.commons.lang3.StringUtils

import scala.collection.mutable

object CodeAndRunTypeUtils {
private val CONF_LOCK = new Object()

Expand Down Expand Up @@ -101,7 +103,14 @@ object CodeAndRunTypeUtils {
def getLanguageTypeAndCodeTypeRelationMap: Map[String, String] = {
val codeTypeAndRunTypeRelationMap = getCodeTypeAndLanguageTypeRelationMap
if (codeTypeAndRunTypeRelationMap.isEmpty) Map()
else codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
else {
// codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
val map = mutable.Map[String, String]()
codeTypeAndRunTypeRelationMap.foreach(kv => {
kv._2.foreach(v => map.put(v, kv._1))
})
map.toMap
}
}

def getLanguageTypeByCodeType(codeType: String, defaultLanguageType: String = ""): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,7 @@ object ServerConfiguration extends Logging {
val LINKIS_SERVER_SESSION_PROXY_TICKETID_KEY =
CommonVars("wds.linkis.session.proxy.user.ticket.key", "linkis_user_session_proxy_ticket_id_v1")

val LINKIS_SERVER_ENTRANCE_HEADER_KEY =
CommonVars("linkis.server.entrance.header.key", "jobInstanceKey")

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object RedisClient {
SessionHAConfiguration.RedisHost,
SessionHAConfiguration.RedisPort,
redisTimeout,
SessionHAConfiguration.RedisSentinalServer
SessionHAConfiguration.RedisPassword
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public interface TaskConstant {
String TICKET_ID = "ticketId";
String ENGINE_CONN_TASK_ID = "engineConnTaskId";
String ENGINE_CONN_SUBMIT_TIME = "engineConnSubmitTime";
String FAILOVER_FLAG = "failoverFlag";
String DEBUG_ENBALE = "debug.enable";

String PARAMS_DATA_SOURCE = "dataSources";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.protocol.engine

case class JobInstance(
status: String,
instances: String,
jobReqId: String,
createTimestamp: Long,
instanceRegistryTimestamp: Long
)
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ object TaskUtils {
}
} else params.put(key, waitToAdd)

private def clearMap(params: util.Map[String, AnyRef], key: String): Unit =
if (params != null && params.containsKey(key)) {
params.get(key) match {
case map: util.Map[String, AnyRef] => map.clear()
case _ => params.put(key, new util.HashMap[String, AnyRef]())
}
}

private def getConfigurationMap(
params: util.Map[String, AnyRef],
key: String
Expand Down Expand Up @@ -84,13 +92,20 @@ object TaskUtils {
def addStartupMap(params: util.Map[String, AnyRef], startupMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, startupMap, TaskConstant.PARAMS_CONFIGURATION_STARTUP)

def clearStartupMap(params: util.Map[String, AnyRef]): Unit = {
val configurationMap = getMap(params, TaskConstant.PARAMS_CONFIGURATION)
if (!configurationMap.isEmpty) {
clearMap(configurationMap, TaskConstant.PARAMS_CONFIGURATION_STARTUP)
}
}

def addRuntimeMap(params: util.Map[String, AnyRef], runtimeMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, runtimeMap, TaskConstant.PARAMS_CONFIGURATION_RUNTIME)

def addSpecialMap(params: util.Map[String, AnyRef], specialMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, specialMap, TaskConstant.PARAMS_CONFIGURATION_SPECIAL)

// tdoo
// todo
def getLabelsMap(params: util.Map[String, AnyRef]): util.Map[String, AnyRef] =
getMap(params, TaskConstant.LABELS)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object ZuulEntranceUtils {

private val INSTANCE_SPLIT_TOKEN = "_"

private val EXEC_ID = "exec_id"
val EXEC_ID = "exec_id"

private val SPLIT_LEN = 3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@ abstract class AbstractGroup extends Group {

private var _status: GroupStatus = _
private var maxRunningJobs: Int = _
private var maxAllowRunningJobs: Int = 0
private var maxAskExecutorTimes: Long = 0L

def setMaxRunningJobs(maxRunningJobs: Int): Unit = this.maxRunningJobs = maxRunningJobs
def getMaxRunningJobs: Int = maxRunningJobs

def setMaxAllowRunningJobs(maxAllowRunningJobs: Int): Unit = this.maxAllowRunningJobs =
maxAllowRunningJobs

def getMaxAllowRunningJobs: Int =
if (maxAllowRunningJobs <= 0) maxRunningJobs else Math.min(maxAllowRunningJobs, maxRunningJobs)

def setMaxAskExecutorTimes(maxAskExecutorTimes: Long): Unit = this.maxAskExecutorTimes =
maxAskExecutorTimes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ object SchedulerEventState extends Enumeration {
SchedulerEventState.withName(jobState)
)

def isInitedByStr(jobState: String): Boolean = SchedulerEventState.withName(jobState) == Inited

def isRunningByStr(jobState: String): Boolean = isRunning(SchedulerEventState.withName(jobState))

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.linkis.scheduler.executer.Executor
import org.apache.linkis.scheduler.future.{BDPFuture, BDPFutureTask}
import org.apache.linkis.scheduler.queue._

import java.util
import java.util.concurrent.{ExecutorService, Future}

import scala.beans.BeanProperty
Expand Down Expand Up @@ -122,9 +123,10 @@ class FIFOUserConsumer(
}
var event: Option[SchedulerEvent] = getWaitForRetryEvent
if (event.isEmpty) {
val completedNums = runningJobs.filter(job => job == null || job.isCompleted)
if (completedNums.length < 1) {
Utils.tryQuietly(Thread.sleep(1000))
val maxAllowRunningJobs = fifoGroup.getMaxAllowRunningJobs
val currentRunningJobs = runningJobs.count(e => e != null && !e.isCompleted)
if (maxAllowRunningJobs <= currentRunningJobs) {
Utils.tryQuietly(Thread.sleep(1000)) // TODO 还可以优化,通过实现JobListener进行优化
return
}
while (event.isEmpty) {
Expand Down Expand Up @@ -207,6 +209,19 @@ class FIFOUserConsumer(
runningJobs(index) = job
}

protected def scanAllRetryJobsAndRemove(): util.List[Job] = {
val jobs = new util.ArrayList[Job]()
for (index <- runningJobs.indices) {
val job = runningJobs(index)
if (job != null && job.isJobCanRetry) {
jobs.add(job)
runningJobs(index) = null
logger.info(s"Job $job can retry, remove from runningJobs")
}
}
jobs
}

override def shutdown(): Unit = {
future.cancel(true)
val waitEvents = queue.getWaitingEvents
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,30 @@ public static void createNewFileWithFileSystem(
}
}

/**
* create new file and set file owner by FileSystem
*
* @param fileSystem
* @param filePath
* @param user
* @param createParentWhenNotExists
*/
public static void createNewFileAndSetOwnerWithFileSystem(
FileSystem fileSystem, FsPath filePath, String user, boolean createParentWhenNotExists)
throws Exception {
if (!fileSystem.exists(filePath)) {
if (!fileSystem.exists(filePath.getParent())) {
if (!createParentWhenNotExists) {
throw new IOException(
"parent dir " + filePath.getParent().getPath() + " dose not exists.");
}
mkdirs(fileSystem, filePath.getParent(), user);
}
fileSystem.createNewFile(filePath);
fileSystem.setOwner(filePath, user);
}
}

/**
* Recursively create a directory
*
Expand Down Expand Up @@ -133,4 +157,39 @@ public static boolean mkdirs(FileSystem fileSystem, FsPath dest, String user) th
}
return true;
}

/**
* Recursively create a directory(递归创建目录) add owner info
*
* @param fileSystem
* @param dest
* @param user
* @throws IOException
* @return
*/
public static boolean mkdirsAndSetOwner(FileSystem fileSystem, FsPath dest, String user)
throws IOException {
FsPath parentPath = dest.getParent();
Stack<FsPath> dirsToMake = new Stack<>();
dirsToMake.push(dest);
while (!fileSystem.exists(parentPath)) {
dirsToMake.push(parentPath);

if (Objects.isNull(parentPath.getParent())) {
// parent path of root is null
break;
}

parentPath = parentPath.getParent();
}
if (!fileSystem.canExecute(parentPath)) {
throw new IOException("You have not permission to access path " + dest.getPath());
}
while (!dirsToMake.empty()) {
FsPath path = dirsToMake.pop();
fileSystem.mkdir(path);
fileSystem.setOwner(path, user);
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,8 @@ public static byte[] mergeByteArrays(byte[] arr1, byte[] arr2) {
System.arraycopy(arr2, 0, mergedArray, arr1.length, arr2.length);
return mergedArray;
}

public static boolean isHDFSPath(FsPath fsPath) {
return HDFS.equalsIgnoreCase(fsPath.getFsType());
}
}
Loading

0 comments on commit a51fef5

Please sign in to comment.