diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java index 7908e5a506..a1c0839b2f 100644 --- a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.file.FileSystems; +import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; @@ -168,6 +170,10 @@ public File toFile() { return new File(uri); } + public Path toPath() { + return FileSystems.getDefault().getPath(uri.toString()); + } + public String getUriString() { return uri.toString(); } diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ByteTimeUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ByteTimeUtils.java index d23f4a0867..0ecb3dc2a5 100644 --- a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ByteTimeUtils.java +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ByteTimeUtils.java @@ -213,7 +213,6 @@ private static long parseByteString(String str, ByteUnit unit) { } else { throw new NumberFormatException("Failed to parse byte string: " + str); } - suffix = suffix.toLowerCase(); // Check for invalid suffixes if (suffix != null && !byteSuffixes.containsKey(suffix)) { throw new NumberFormatException("Invalid suffix: \"" + suffix + "\""); @@ -297,6 +296,18 @@ public static long byteStringAsGb(String str) { return parseByteString(str, ByteUnit.GiB); } + /** + * Convert a passed byte string (e.g. -50b, -100k, or -250m) to gibibytes for internal use. + * + *

If no suffix is provided, the passed number is assumed to be in gibibytes. + */ + public static long negativeByteStringAsGb(String str) { + if (str.startsWith("-")) { + return Math.negateExact(parseByteString(str.substring(1), ByteUnit.GiB)); + } + return parseByteString(str, ByteUnit.GiB); + } + /** * Returns a byte array with the buffer's contents, trying to avoid copying the data if possible. */ diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ResultSetUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ResultSetUtils.java new file mode 100644 index 0000000000..a367b38b80 --- /dev/null +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/ResultSetUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.common.utils; + +import org.apache.linkis.common.io.FsPath; + +import java.io.File; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ResultSetUtils { + + // Sort in ASC order by numx in the result set _numx.dolphin file name + public static Comparator getResultSetFileComparatorOrderByNameNum() { + + Comparator comparator = + (o1, o2) -> { + // get the num of file name + String regx = "\\d+"; + + String[] res1 = o1.getPath().split(File.separator); + String fileName1 = res1[res1.length - 1]; + Matcher matcher1 = Pattern.compile(regx).matcher(fileName1); + int num1 = matcher1.find() ? Integer.parseInt(matcher1.group()) : Integer.MAX_VALUE; + + String[] res2 = o2.getPath().split(File.separator); + String fileName2 = res2[res2.length - 1]; + Matcher matcher2 = Pattern.compile(regx).matcher(fileName2); + int num2 = matcher2.find() ? Integer.parseInt(matcher2.group()) : Integer.MAX_VALUE; + + return num1 - num2; + }; + return comparator; + } + + public static void sortByNameNum(List fsPathList) { + Collections.sort(fsPathList, getResultSetFileComparatorOrderByNameNum()); + } +} diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java index b891f99d15..615472474d 100644 --- a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/utils/VariableOperationUtils.java @@ -28,7 +28,10 @@ import java.util.Iterator; import java.util.Map; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -49,6 +52,9 @@ public class VariableOperationUtils { private static final String[] CYCLES = new String[] {CYCLE_YEAR, CYCLE_MONTH, CYCLE_DAY, CYCLE_HOUR, CYCLE_MINUTE, CYCLE_SECOND}; + private static final ObjectMapper mapper = + JsonMapper.builder().enable(DeserializationFeature.FAIL_ON_TRAILING_TOKENS).build(); + /** * yyyy-MM-dd HH:mm:ss * @@ -78,30 +84,44 @@ public static ZonedDateTime toZonedDateTime(Date date) { * @param str * @return */ + @Deprecated public static String replaces(ZonedDateTime dateTime, String str) throws VariableOperationFailedException { - return replaces(dateTime, str, true); + try { + JsonNode rootNode = mapper.readTree(str); + if (rootNode.isArray() || rootNode.isObject()) { + replaceJson(dateTime, rootNode); + return rootNode.toString(); + } + } catch (Exception e) { + return replace(dateTime, str); + } + return replace(dateTime, str); } /** * json support variable operation * + * @param codeType * @param dateTime * @param str - * @param format * @return */ - public static String replaces(ZonedDateTime dateTime, String str, boolean format) + public static String replaces(String codeType, ZonedDateTime dateTime, String str) throws VariableOperationFailedException { - try { - JsonNode rootNode = JsonUtils.jackson().readTree(str); - if (rootNode.isArray() || rootNode.isObject()) { - replaceJson(dateTime, rootNode); - return rootNode.toString(); + String languageType = CodeAndRunTypeUtils.getLanguageTypeByCodeType(codeType, ""); + if (languageType.equals(CodeAndRunTypeUtils.LANGUAGE_TYPE_JSON())) { + try { + JsonNode rootNode = mapper.readTree(str); + if (rootNode.isArray() || rootNode.isObject()) { + replaceJson(dateTime, rootNode); + return rootNode.toString(); + } + } catch (Exception e) { + return replace(dateTime, str); } - } catch (Exception e) { - return replace(dateTime, str); } + return replace(dateTime, str); } @@ -197,8 +217,7 @@ private static void replaceJson(ZonedDateTime dateTime, JsonNode object) } else if (temp.isObject()) { replaceJson(dateTime, temp); } else { - arrayNode.remove(i); - arrayNode.insert(i, replace(dateTime, temp.toString())); + arrayNode.set(i, replace(dateTime, temp.toString())); } } } else if (object.isObject()) { diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala index 30bdeb4b14..6c5bd7cf3c 100644 --- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala +++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala @@ -143,9 +143,10 @@ object VariableUtils extends Logging { } initAllDateVars(run_date, nameAndType) val codeOperation = parserVar(code, nameAndType) - parserDate(codeOperation, run_date) + parserDate(codeType, codeOperation, run_date) } + @deprecated private def parserDate(code: String, run_date: CustomDateType): String = { if (Configuration.VARIABLE_OPERATION) { val zonedDateTime: ZonedDateTime = VariableOperationUtils.toZonedDateTime(run_date.getDate) @@ -155,6 +156,15 @@ object VariableUtils extends Logging { } } + private def parserDate(codeType: String, code: String, run_date: CustomDateType): String = { + if (Configuration.VARIABLE_OPERATION) { + val zonedDateTime: ZonedDateTime = VariableOperationUtils.toZonedDateTime(run_date.getDate) + VariableOperationUtils.replaces(codeType, zonedDateTime, code) + } else { + code + } + } + private def initAllDateVars( run_date: CustomDateType, nameAndType: mutable.Map[String, variable.VariableType] @@ -337,7 +347,7 @@ object VariableUtils extends Logging { * * @param code * :code - * @param codeType + * @param languageType * :SQL,PYTHON * @return */ @@ -346,27 +356,37 @@ object VariableUtils extends Logging { var varString: String = null var errString: String = null + var rightVarString: String = null languageType match { case CodeAndRunTypeUtils.LANGUAGE_TYPE_SQL => varString = """\s*--@set\s*.+\s*""" + rightVarString = """^\s*--@set\s*.+\s*""" errString = """\s*--@.*""" case CodeAndRunTypeUtils.LANGUAGE_TYPE_PYTHON | CodeAndRunTypeUtils.LANGUAGE_TYPE_SHELL => varString = """\s*#@set\s*.+\s*""" + rightVarString = """^\s*#@set\s*.+\s*""" errString = """\s*#@""" case CodeAndRunTypeUtils.LANGUAGE_TYPE_SCALA => varString = """\s*//@set\s*.+\s*""" + rightVarString = """^\s*//@set\s*.+\s*""" errString = """\s*//@.+""" case CodeAndRunTypeUtils.LANGUAGE_TYPE_JAVA => varString = """\s*!!@set\s*.+\s*""" + rightVarString = """^\s*!!@set\s*.+\s*""" case _ => return nameAndValue } val customRegex = varString.r.unanchored + val customRightRegex = rightVarString.r.unanchored val errRegex = errString.r.unanchored code.split("\n").foreach { str => { + + if (customRightRegex.unapplySeq(str).size < customRegex.unapplySeq(str).size) { + logger.warn(s"code:$str is wrong custom variable format!!!") + } str match { case customRegex() => val clearStr = if (str.endsWith(";")) str.substring(0, str.length - 1) else str diff --git a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/utils/ByteTimeUtilsTest.java b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/utils/ByteTimeUtilsTest.java index 7b90f053ca..f548d89d46 100644 --- a/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/utils/ByteTimeUtilsTest.java +++ b/linkis-commons/linkis-common/src/test/java/org/apache/linkis/common/utils/ByteTimeUtilsTest.java @@ -17,31 +17,159 @@ package org.apache.linkis.common.utils; +import java.util.function.Function; + +import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; - class ByteTimeUtilsTest { + private static final ImmutableMap> opFunction = + ImmutableMap.>builder() + .put("byteStringAsBytes", tar -> ByteTimeUtils.byteStringAsBytes(tar)) + .put("byteStringAsKb", tar -> ByteTimeUtils.byteStringAsKb(tar)) + .put("byteStringAsMb", tar -> ByteTimeUtils.byteStringAsMb(tar)) + .put("byteStringAsGb", tar -> ByteTimeUtils.byteStringAsGb(tar)) + .build(); + + private static final ImmutableMap convertToByte = + ImmutableMap.builder() + .put("1", 1l) + .put("1b", 1l) + .put("1B", 1l) + .put("1k", 1024l) + .put("1K", 1024l) + .put("1kb", 1024l) + .put("1Kb", 1024l) + .put("1kB", 1024l) + .put("1KB", 1024l) + .put("1m", 1024l * 1024l) + .put("1M", 1024l * 1024l) + .put("1mb", 1024l * 1024l) + .put("1Mb", 1024l * 1024l) + .put("1mB", 1024l * 1024l) + .put("1MB", 1024l * 1024l) + .put("1g", 1024l * 1024l * 1024l) + .put("1G", 1024l * 1024l * 1024l) + .put("1gb", 1024l * 1024l * 1024l) + .put("1gB", 1024l * 1024l * 1024l) + .put("1Gb", 1024l * 1024l * 1024l) + .put("1GB", 1024l * 1024l * 1024l) + .put("1t", 1024l * 1024l * 1024l * 1024l) + .put("1T", 1024l * 1024l * 1024l * 1024l) + .put("1tb", 1024l * 1024l * 1024l * 1024l) + .put("1Tb", 1024l * 1024l * 1024l * 1024l) + .put("1tB", 1024l * 1024l * 1024l * 1024l) + .put("1TB", 1024l * 1024l * 1024l * 1024l) + .put("1p", 1024l * 1024l * 1024l * 1024l * 1024l) + .put("1P", 1024l * 1024l * 1024l * 1024l * 1024l) + .put("1pb", 1024l * 1024l * 1024l * 1024l * 1024l) + .put("1Pb", 1024l * 1024l * 1024l * 1024l * 1024l) + .put("1pB", 1024l * 1024l * 1024l * 1024l * 1024l) + .put("1PB", 1024l * 1024l * 1024l * 1024l * 1024l) + .build(); + + private static final ImmutableMap convertToKB = + ImmutableMap.builder() + .put("1", 1l) + .put("1024b", 1l) + .put("1024B", 1l) + .put("1k", 1l) + .put("1K", 1l) + .put("1kb", 1l) + .put("1Kb", 1l) + .put("1kB", 1l) + .put("1KB", 1l) + .put("1m", 1024l) + .put("1M", 1024l) + .put("1mb", 1024l) + .put("1Mb", 1024l) + .put("1mB", 1024l) + .put("1MB", 1024l) + .put("1g", 1024l * 1024l) + .put("1G", 1024l * 1024l) + .put("1gb", 1024l * 1024l) + .put("1gB", 1024l * 1024l) + .put("1Gb", 1024l * 1024l) + .put("1GB", 1024l * 1024l) + .build(); + + private static final ImmutableMap convertToMB = + ImmutableMap.builder() + .put("1", 1l) + .put("1024k", 1l) + .put("1024K", 1l) + .put("1024kb", 1l) + .put("1024Kb", 1l) + .put("1024kB", 1l) + .put("1024KB", 1l) + .put("1m", 1l) + .put("1M", 1l) + .put("1mb", 1l) + .put("1Mb", 1l) + .put("1mB", 1l) + .put("1MB", 1l) + .put("1g", 1024l) + .put("1G", 1024l) + .put("1gb", 1024l) + .put("1gB", 1024l) + .put("1Gb", 1024l) + .put("1GB", 1024l) + .build(); + + private static final ImmutableMap convertToGB = + ImmutableMap.builder() + .put("1", 1l) + .put("1024m", 1l) + .put("1024M", 1l) + .put("1024mb", 1l) + .put("1024Mb", 1l) + .put("1024mB", 1l) + .put("1024MB", 1l) + .put("1g", 1l) + .put("1G", 1l) + .put("1gb", 1l) + .put("1gB", 1l) + .put("1Gb", 1l) + .put("1GB", 1l) + .put("1t", 1024l) + .put("1T", 1024l) + .put("1tb", 1024l) + .put("1Tb", 1024l) + .put("1tB", 1024l) + .put("1TB", 1024l) + .build(); + @Test - void byteStringAsBytes() {} + void byteStringAsBytes() { + convertToByte.forEach( + (k, v) -> Assertions.assertEquals(opFunction.get("byteStringAsBytes").apply(k), v)); + Assertions.assertThrows( + IllegalArgumentException.class, () -> opFunction.get("byteStringAsBytes").apply("1A")); + } @Test - void byteStringAsKb() {} + void byteStringAsKb() { + convertToKB.forEach( + (k, v) -> Assertions.assertEquals(opFunction.get("byteStringAsKb").apply(k), v)); + Assertions.assertThrows( + IllegalArgumentException.class, () -> opFunction.get("byteStringAsKb").apply("1a")); + } @Test - void byteStringAsMb() {} + void byteStringAsMb() { + convertToMB.forEach( + (k, v) -> Assertions.assertEquals(opFunction.get("byteStringAsMb").apply(k), v)); + Assertions.assertThrows( + IllegalArgumentException.class, () -> opFunction.get("byteStringAsMb").apply("1c")); + } @Test void byteStringAsGb() { - Long res = ByteTimeUtils.byteStringAsGb("1G"); - Assertions.assertEquals(res, 1); - + convertToGB.forEach( + (k, v) -> Assertions.assertEquals(opFunction.get("byteStringAsGb").apply(k), v)); Assertions.assertThrows( - NullPointerException.class, - () -> { - ByteTimeUtils.byteStringAsGb("512"); - }); + IllegalArgumentException.class, () -> opFunction.get("byteStringAsGb").apply("1C")); } } diff --git a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/VariableUtilsTest.scala b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/VariableUtilsTest.scala index c0d4ad1d61..e7a105497c 100644 --- a/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/VariableUtilsTest.scala +++ b/linkis-commons/linkis-common/src/test/scala/org/apache/linkis/common/utils/VariableUtilsTest.scala @@ -22,6 +22,8 @@ import org.apache.linkis.common.variable.DateTypeUtils.{getCurHour, getToday} import java.util +import scala.collection.mutable + import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -63,4 +65,19 @@ class VariableUtilsTest { assertEquals(VariableUtils.replace(sql, "sql", varMap), resSql) } + @Test + def testGetCustomVar: Unit = { + var scalaCode = "" + + "-------@set globalpara=60--------\n" + + "--@set globalpara2=66\n" + + "select ${globalpara} as globalpara,\n" + + "-- ${globalpara1} as globalpara1, \n" + + "${globalpara2} as globalpara2;\n" + var pythonCode = "" + + val nameAndValue: mutable.Map[String, String] = + VariableUtils.getCustomVar(scalaCode, CodeAndRunTypeUtils.LANGUAGE_TYPE_SQL); + assertEquals(nameAndValue.size, 2) + } + } diff --git a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala index d901bb902e..ec61ebd66d 100644 --- a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala +++ b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/AbstractHttpClient.scala @@ -143,23 +143,31 @@ abstract class AbstractHttpClient(clientConfig: ClientConfig, clientName: String val req = prepareReq(action) val startTime = System.currentTimeMillis val response = executeRequest(req, Some(waitTime).filter(_ > 0)) + val taken = System.currentTimeMillis - startTime + attempts.add(taken) + val costTime = ByteTimeUtils.msDurationToString(taken) + logger.info( + s"invoke ${req.getURI} get status ${response.getStatusLine.getStatusCode} taken: ${costTime}." + ) if (response.getStatusLine.getStatusCode == 401) { tryLogin(action, getRequestUrl(action), true) - logger.info("The user is not logged in, please log in first, you can set a retry") val msg = Utils.tryCatch(EntityUtils.toString(response.getEntity)) { t => logger.warn("failed to parse entity", t) "" } IOUtils.closeQuietly(response) - throw new HttpClientRetryException( - "The user is not logged in, please log in first, you can set a retry, message: " + msg - ) + if (attempts.size() <= 1) { + logger.info("The user is not logged in, default retry once") + addAttempt() + } else { + logger.info("The user is not logged in, you can set a retry") + throw new HttpClientRetryException( + "The user is not logged in, please log in first, you can set a retry, message: " + msg + ) + } + } else { + response } - val taken = System.currentTimeMillis - startTime - attempts.add(taken) - val costTime = ByteTimeUtils.msDurationToString(taken) - logger.info(s"invoke ${req.getURI} taken: ${costTime}.") - response } val response = diff --git a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfigBuilder.scala b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfigBuilder.scala index 26170a4577..b1fc579f3c 100644 --- a/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfigBuilder.scala +++ b/linkis-commons/linkis-httpclient/src/main/scala/org/apache/linkis/httpclient/config/ClientConfigBuilder.scala @@ -17,9 +17,9 @@ package org.apache.linkis.httpclient.config +import org.apache.linkis.common.exception.LinkisRetryException import org.apache.linkis.common.utils.{DefaultRetryHandler, RetryHandler} import org.apache.linkis.httpclient.authentication.AuthenticationStrategy -import org.apache.linkis.httpclient.exception.HttpClientRetryException import org.apache.linkis.httpclient.loadbalancer.LoadBalancerStrategy import scala.concurrent.duration.TimeUnit @@ -39,11 +39,10 @@ class ClientConfigBuilder protected () { protected var readTimeout: Long = _ protected var maxConnection: Int = _ protected var retryEnabled: Boolean = true - protected var retryHandler: RetryHandler = buildDefaultRetryHandler() - def buildDefaultRetryHandler(): RetryHandler = { - retryHandler = new DefaultRetryHandler - retryHandler.addRetryException(classOf[HttpClientRetryException]) + protected var retryHandler: RetryHandler = { + val retryHandler = new DefaultRetryHandler + retryHandler.addRetryException(classOf[LinkisRetryException]) retryHandler } diff --git a/linkis-commons/linkis-mybatis/pom.xml b/linkis-commons/linkis-mybatis/pom.xml index b87a1ed66a..1481f2008d 100644 --- a/linkis-commons/linkis-mybatis/pom.xml +++ b/linkis-commons/linkis-mybatis/pom.xml @@ -37,6 +37,32 @@ com.baomidou mybatis-plus-boot-starter + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-autoconfigure + + + org.springframework + spring-beans + + + org.springframework + spring-jdbc + + + com.zaxxer + HikariCP + + + + + org.springframework + spring-jdbc com.github.pagehelper diff --git a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java index 42661f8283..6eb97c84d9 100644 --- a/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java +++ b/linkis-commons/linkis-protocol/src/main/java/org/apache/linkis/protocol/constants/TaskConstant.java @@ -69,6 +69,7 @@ public interface TaskConstant { String TICKET_ID = "ticketId"; String ENGINE_CONN_TASK_ID = "engineConnTaskId"; String ENGINE_CONN_SUBMIT_TIME = "engineConnSubmitTime"; + String DEBUG_ENBALE = "debug.enable"; String PARAMS_DATA_SOURCE = "dataSources"; diff --git a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/TaskUtils.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/TaskUtils.scala index 3b94bbdc14..9b2be16ef7 100644 --- a/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/TaskUtils.scala +++ b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/utils/TaskUtils.scala @@ -97,4 +97,15 @@ object TaskUtils { def addLabelsMap(params: util.Map[String, AnyRef], labels: util.Map[String, AnyRef]): Unit = addMap(params, labels, TaskConstant.LABELS) + def isWithDebugInfo(params: util.Map[String, AnyRef]): Boolean = { + val debug = getConfigurationMap(params, TaskConstant.PARAMS_CONFIGURATION_STARTUP).get( + TaskConstant.DEBUG_ENBALE + ) + if (debug != null && "true".equals(debug.toString)) { + true + } else { + false + } + } + } diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala index 32f12da273..3a34e42a0e 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/conf/RPCConfiguration.scala @@ -80,6 +80,16 @@ object RPCConfiguration { CommonVars("wds.linkis.gateway.conf.linkismanager.list", "linkisManager,engineplugin").getValue .split(",") + val LINKIS_DATASOURCE_SERVICE_NAME: CommonVars[String] = + CommonVars("linkis.gateway.conf.linkisdatasource.name", "linkis-ps-datasource") + + val LINKIS_DATASOURCE_SERVICE_LIST: Array[String] = + CommonVars( + "linkis.gateway.conf.linkisdatasource.list", + "data-source-manager,metadataquery,datasource,metadataQuery,metadatamanager" + ).getValue + .split(",") + val BDP_RPC_INSTANCE_ALIAS_SERVICE_REFRESH_INTERVAL: CommonVars[TimeType] = CommonVars("wds.linkis.rpc.instancealias.refresh.interval", new TimeType("3s")) diff --git a/linkis-commons/linkis-rpc/src/test/scala/org/apache/linkis/rpc/conf/RPCConfigurationTest.scala b/linkis-commons/linkis-rpc/src/test/scala/org/apache/linkis/rpc/conf/RPCConfigurationTest.scala index 5aaadd133c..ed1d39ce8c 100644 --- a/linkis-commons/linkis-rpc/src/test/scala/org/apache/linkis/rpc/conf/RPCConfigurationTest.scala +++ b/linkis-commons/linkis-rpc/src/test/scala/org/apache/linkis/rpc/conf/RPCConfigurationTest.scala @@ -37,7 +37,6 @@ class RPCConfigurationTest { val enablepublicservice = RPCConfiguration.ENABLE_PUBLIC_SERVICE.getValue val publicserviceapplicationname = RPCConfiguration.PUBLIC_SERVICE_APPLICATION_NAME.getValue val publicservicelist = RPCConfiguration.PUBLIC_SERVICE_LIST - val metadataqueryservicelist = RPCConfiguration.METADATAQUERY_SERVICE_LIST Assertions.assertTrue(25 == bdprpcbroadcastthreadsize.intValue()) Assertions.assertTrue(400 == bdprpcreceiverasynconsumerthreadmax.intValue()) @@ -48,7 +47,6 @@ class RPCConfigurationTest { Assertions.assertTrue(enablepublicservice) Assertions.assertEquals("linkis-ps-publicservice", publicserviceapplicationname) Assertions.assertTrue(publicservicelist.size > 0) - Assertions.assertTrue(metadataqueryservicelist.size > 0) } diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala index 50dce2ca12..165a274362 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/Consumer.scala @@ -41,7 +41,6 @@ abstract class Consumer(schedulerContext: SchedulerContext, executeService: Exec def start(): Unit def shutdown(): Unit = { - logger.info(s"$toString is ready to stop!") terminate = true logger.info(s"$toString stopped!") } diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala index f3471b07dd..be1716f238 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/GroupFactory.scala @@ -19,6 +19,11 @@ package org.apache.linkis.scheduler.queue abstract class GroupFactory { + /** + * Create a Group and set the concurrency limit of the group + * @param event + * @return + */ def getOrCreateGroup(event: SchedulerEvent): Group def getGroup(groupName: String): Group diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala index b0bbfd3c2b..8bea7e52b1 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/LoopArrayQueue.scala @@ -40,7 +40,12 @@ class LoopArrayQueue(var group: Group) extends ConsumeQueue with Logging { override def getWaitingEvents: Array[SchedulerEvent] = { eventQueue synchronized { - toIndexedSeq.filter(x => x.getState.equals(SchedulerEventState.Inited)).toArray + toIndexedSeq + .filter(x => + x.getState.equals(SchedulerEventState.Inited) || x.getState + .equals(SchedulerEventState.Scheduled) + ) + .toArray } } diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala index 2a40c2517b..d541d8a2eb 100644 --- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala +++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/fifoqueue/FIFOUserConsumer.scala @@ -73,6 +73,8 @@ class FIFOUserConsumer( override def getRunningEvents: Array[SchedulerEvent] = getEvents(e => e.isRunning || e.isWaitForRetry) + protected def getSchedulerContext: SchedulerContext = schedulerContext + private def getEvents(op: SchedulerEvent => Boolean): Array[SchedulerEvent] = { val result = ArrayBuffer[SchedulerEvent]() runningJobs.filter(_ != null).filter(x => op(x)).foreach(result += _) @@ -82,16 +84,28 @@ class FIFOUserConsumer( override def run(): Unit = { Thread.currentThread().setName(s"${toString}Thread") logger.info(s"$toString thread started!") - while (!terminate) { - Utils.tryAndError(loop()) - Utils.tryAndError(Thread.sleep(10)) + while (!terminate) Utils.tryAndError { + loop() + Thread.sleep(10) } logger.info(s"$toString thread stopped!") } protected def askExecutorGap(): Unit = {} + /** + * Task scheduling interception is used to judge the rules of task operation, and to judge other + * task rules based on Group. For example, Entrance makes Creator-level task judgment. + */ + protected def runScheduleIntercept(): Boolean = { + true + } + protected def loop(): Unit = { + if (!runScheduleIntercept()) { + Utils.tryQuietly(Thread.sleep(1000)) + return + } var isRetryJob = false def getWaitForRetryEvent: Option[SchedulerEvent] = { val waitForRetryJobs = runningJobs.filter(job => job != null && job.isJobCanRetry) @@ -110,7 +124,7 @@ class FIFOUserConsumer( if (event.isEmpty) { val completedNums = runningJobs.filter(job => job == null || job.isCompleted) if (completedNums.length < 1) { - Utils.tryQuietly(Thread.sleep(1000)) // TODO 还可以优化,通过实现JobListener进行优化 + Utils.tryQuietly(Thread.sleep(1000)) return } while (event.isEmpty) { @@ -119,7 +133,12 @@ class FIFOUserConsumer( if ( takeEvent.exists(e => Utils.tryCatch(e.turnToScheduled()) { t => - takeEvent.get.asInstanceOf[Job].onFailure("Job状态翻转为Scheduled失败!", t) + takeEvent.get + .asInstanceOf[Job] + .onFailure( + "Failed to change the job status to Scheduled(Job状态翻转为Scheduled失败)", + t + ) false } ) @@ -174,7 +193,7 @@ class FIFOUserConsumer( ) ) case error: Throwable => - job.onFailure("请求引擎失败,可能是由于后台进程错误!请联系管理员", error) + job.onFailure("Failed to request EngineConn", error) if (job.isWaitForRetry) { logger.warn(s"Ask executor for Job $job failed, wait for the next retry!", error) if (!isRetryJob) putToRunningJobs(job) @@ -190,6 +209,20 @@ class FIFOUserConsumer( override def shutdown(): Unit = { future.cancel(true) + val waitEvents = queue.getWaitingEvents + if (waitEvents.nonEmpty) { + waitEvents.foreach { + case job: Job => + job.onFailure("Your job will be marked as canceled because the consumer be killed", null) + case _ => + } + } + + this.runningJobs.foreach { job => + if (job != null && !job.isCompleted) { + job.onFailure("Your job will be marked as canceled because the consumer be killed", null) + } + } super.shutdown() } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java index 66fafeacda..74950c15fe 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java @@ -39,7 +39,7 @@ public class LinkisStorageConf { public static final String FILE_TYPE = CommonVars.apply( "wds.linkis.storage.file.type", - "dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql") + "dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql") .getValue(); private static volatile String[] fileTypeArr = null; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java index ba4d877f3a..d98be40337 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java @@ -91,14 +91,17 @@ public void addMetaData(MetaData metaData) throws IOException { private String compact(String[] row) { String quotationMarks = "\""; + String dealNewlineSymbolMarks = "\n"; StringBuilder rowBuilder = new StringBuilder(); for (String value : row) { - String decoratedValue = - StringUtils.isBlank(value) - ? value - : quoteRetouchEnable - ? quotationMarks + value.replaceAll(quotationMarks, "") + quotationMarks - : value; + String decoratedValue = value; + if (StringUtils.isNotBlank(value)) { + if (quoteRetouchEnable) { + decoratedValue = quotationMarks + value.replaceAll(quotationMarks, "") + quotationMarks; + } + decoratedValue = decoratedValue.replaceAll(dealNewlineSymbolMarks, " "); + logger.debug("decorateValue with input: {} output: {} ", value, decoratedValue); + } rowBuilder.append(decoratedValue).append(delimiter); } if (rowBuilder.length() > 0 && rowBuilder.toString().endsWith(delimiter)) { diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java index ad9e0ee882..6808f693ec 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java @@ -26,6 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.linkis.storage.domain.Dolphin.LINKIS_NULL; + public enum DataType { NullType("void", 0), StringType("string", 12), @@ -63,7 +65,8 @@ public enum DataType { public static final String LOWCASE_NULL_VALUE = "null"; // TODO Change to fine-grained regular expressions(改为精细化正则表达式) - public static final Pattern DECIMAL_REGEX = Pattern.compile("^decimal\\(\\d*\\,\\d*\\)"); + public static final Pattern DECIMAL_REGEX = + Pattern.compile("^decimal\\(\\s*\\d*\\s*,\\s*\\d*\\s*\\)"); public static final Pattern SHORT_REGEX = Pattern.compile("^short.*"); public static final Pattern INT_REGEX = Pattern.compile("^int.*"); @@ -130,7 +133,11 @@ public static DataType toDataType(String dataType) { } public static Object toValue(DataType dataType, String value) { + Object result = null; + if (isLinkisNull(value)) { + return result; + } try { switch (dataType) { case NullType: @@ -187,12 +194,16 @@ public static Object toValue(DataType dataType, String value) { result = value; } } catch (Exception e) { - logger.debug("Failed to " + value + " switch to dataType:", e); + logger.debug("Failed to {} switch to dataType:", value, e); result = value; } return result; } + public static boolean isLinkisNull(String value) { + return value == null || value.equals(LINKIS_NULL); + } + public static boolean isNull(String value) { return value == null || value.equals(NULL_VALUE) || value.trim().equals(""); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java index b6badd2849..35c71295e4 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java @@ -59,6 +59,9 @@ public class Dolphin { public static final String NULL = "NULL"; public static final byte[] NULL_BYTES = "NULL".getBytes(Charset.forName("utf-8")); + public static final String LINKIS_NULL = "LINKIS_NULL"; + public static final byte[] LINKIS_NULL_BYTES = LINKIS_NULL.getBytes(Charset.forName("utf-8")); + public static final int INT_LEN = 10; public static final int FILE_EMPTY = 31; @@ -79,6 +82,14 @@ public static String getString(byte[] bytes, int start, int len) { return new String(bytes, start, len, Dolphin.CHAR_SET); } + public static String toStringValue(String value) { + if (LINKIS_NULL.equals(value)) { + return NULL; + } else { + return value; + } + } + /** * Read an integer value that converts the array to a byte of length 10 bytes * 读取整数值,该值为将数组转换为10字节长度的byte diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java index 3c82ceb523..fad0d83a12 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java @@ -20,7 +20,10 @@ public enum StorageErrorCode { /** */ - FS_NOT_INIT(53001, "please init first(请先初始化)"); + FS_NOT_INIT(53001, "please init first"), + + INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"), + FS_OOM(53002, "OOM occurred while reading the file"); StorageErrorCode(int errorCode, String message) { this.code = errorCode; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java index 749ec9a24e..3047b715a0 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java @@ -99,6 +99,7 @@ public static ResultSetReader getTableResultReader(String res) { } Fs fs = FSFactory.getFs(resPath); + logger.info("Try to init Fs with path:{}", resPath.getPath()); try { fs.init(null); InputStream read = fs.read(resPath); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java index 35f7483c82..c0222cc848 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java @@ -33,7 +33,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +52,6 @@ public class StorageResultSetReader private Fs fs; private final int READ_CACHE = 1024; - private final byte[] bytes = new byte[READ_CACHE]; public StorageResultSetReader(ResultSet resultSet, InputStream inputStream) { super(resultSet, inputStream); @@ -84,21 +82,18 @@ public byte[] readLine() { return null; } - byte[] rowBuffer = new byte[0]; int len = 0; - - while (rowLen > 0 && len >= 0) { - if (rowLen > READ_CACHE) { - len = StorageUtils.readBytes(inputStream, bytes, READ_CACHE); - } else { - len = StorageUtils.readBytes(inputStream, bytes, rowLen); - } - - if (len > 0) { - rowLen -= len; - rowBuffer = Arrays.copyOf(rowBuffer, rowBuffer.length + len); - System.arraycopy(bytes, 0, rowBuffer, rowBuffer.length - len, len); - } + byte[] rowBuffer = null; + try { + rowBuffer = new byte[rowLen]; + len = StorageUtils.readBytes(inputStream, rowBuffer, rowLen); + } catch (OutOfMemoryError error) { + logger.error("Result set read oom, read size {} Byte", rowLen); + throw new RuntimeException(error); + } + if (len != rowLen) { + throw new RuntimeException( + "Can't get the value of the field, maybe the IO stream has been read or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)"); } rowCount++; return rowBuffer; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java index f13d42b0b6..c8270714ba 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java @@ -19,9 +19,6 @@ import org.apache.linkis.common.io.Record; import org.apache.linkis.storage.resultset.ResultRecord; -import org.apache.linkis.storage.utils.StorageUtils; - -import java.util.Arrays; public class TableRecord implements ResultRecord { @@ -35,10 +32,4 @@ public TableRecord(Object[] row) { public Record cloneRecord() { return new TableRecord(row.clone()); } - - public String[] tableRecordToString(String nullValue) { - return Arrays.stream(row) - .map(col -> StorageUtils.colToString(col, nullValue)) - .toArray(String[]::new); - } } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java index 7b7838fd25..7e1d6c35fe 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java @@ -52,13 +52,13 @@ public TableMetaData createMetaData(byte[] bytes) { List columns = new ArrayList<>(); for (int i = 0; i < colArray.length; i += 3) { int len = Integer.parseInt(colArray[i]); - String colName = Dolphin.getString(bytes, index, len); + String colName = Dolphin.toStringValue(Dolphin.getString(bytes, index, len)); index += len; len = Integer.parseInt(colArray[i + 1]); - String colType = Dolphin.getString(bytes, index, len); + String colType = Dolphin.toStringValue(Dolphin.getString(bytes, index, len)); index += len; len = Integer.parseInt(colArray[i + 2]); - String colComment = Dolphin.getString(bytes, index, len); + String colComment = Dolphin.toStringValue(Dolphin.getString(bytes, index, len)); index += len; columns.add(new Column(colName, DataType.toDataType(colType), colComment)); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java index 6abe4c56d5..5f40aa33f3 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java @@ -61,7 +61,7 @@ private byte[] lineToBytes(Object[] line) { int colByteLen = 0; int length = 0; for (Object data : line) { - byte[] bytes = data == null ? Dolphin.NULL_BYTES : Dolphin.getBytes(data); + byte[] bytes = data == null ? Dolphin.LINKIS_NULL_BYTES : Dolphin.getBytes(data); dataBytes.add(bytes); byte[] colBytes = Dolphin.getBytes(bytes.length); colIndex.add(colBytes); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java index cee72dfcd7..0ed650186d 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java @@ -135,6 +135,7 @@ static FileSource create(FsPath fsPath, InputStream is) { } static FileSplit createResultSetFileSplit(FsPath fsPath, InputStream is) { + logger.info("try create result set file split with path:{}", fsPath.getPath()); ResultSet resultset = ResultSetFactory.getInstance().getResultSetByPath(fsPath); ResultSetReader resultsetReader = ResultSetReaderFactory.getResultSetReader(resultset, is); return new FileSplit(resultsetReader, resultset.resultSetType()); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java index d8562b7c5b..fb064a8f4f 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java @@ -17,6 +17,7 @@ package org.apache.linkis.storage.source; +import org.apache.linkis.storage.domain.Dolphin; import org.apache.linkis.storage.resultset.table.TableRecord; import org.apache.linkis.storage.utils.StorageUtils; @@ -36,9 +37,18 @@ record -> { .map( r -> { if (r == null || r.equals("NULL")) { - return nullValue; + if (nullValue.equals(Dolphin.LINKIS_NULL)) { + return r; + } else { + return nullValue; + } } else if (r.equals("")) { - return getParams().getOrDefault("nullValue", ""); + String emptyValue = getParams().getOrDefault("nullValue", ""); + if (emptyValue.equals(Dolphin.LINKIS_NULL)) { + return ""; + } else { + return nullValue; + } } else if (r instanceof Double) { return StorageUtils.doubleToString((Double) r); } else { diff --git a/linkis-computation-governance/linkis-client/linkis-cli/pom.xml b/linkis-computation-governance/linkis-client/linkis-cli/pom.xml index 9d0cbbb849..76723c4adb 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/pom.xml +++ b/linkis-computation-governance/linkis-client/linkis-cli/pom.xml @@ -62,6 +62,12 @@ ${project.artifactId}-${project.version} + + + true + ${basedir}/src/main/resources + + org.apache.maven.plugins diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/CtxBuilder.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/CtxBuilder.java index 592725f857..07df8b79ad 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/CtxBuilder.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/CtxBuilder.java @@ -46,9 +46,11 @@ import org.apache.commons.lang3.StringUtils; +import java.io.*; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,6 +158,23 @@ public static CliCtx buildCtx(String[] args) throws LinkisClientRuntimeException .init(); logger.info("==========std_var============\n" + CliUtils.GSON.toJson(varAccess)); - return new CliCtxImpl(params.getCmdType(), template, varAccess, new HashMap<>()); + Properties props = new Properties(); + try (InputStream inputStream = + CtxBuilder.class.getClassLoader().getResourceAsStream("version.properties")) { + try (InputStreamReader reader = new InputStreamReader(inputStream)) { + try (BufferedReader bufferedReader = new BufferedReader(reader)) { + props.load(bufferedReader); + } + } + } catch (Exception e) { + logger.warn("Failed to load version info", e); + } + + String verion = props.getProperty(CliKeys.VERSION); + + Map extraMap = new HashMap<>(); + extraMap.put(CliKeys.VERSION, verion); + + return new CliCtxImpl(params.getCmdType(), template, varAccess, extraMap); } } diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/LinkisClientApplication.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/LinkisClientApplication.java index f13c7399fc..1fb21043a1 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/LinkisClientApplication.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/LinkisClientApplication.java @@ -26,9 +26,11 @@ import org.apache.linkis.cli.application.exception.CommandException; import org.apache.linkis.cli.application.interactor.command.CmdTemplateFactory; import org.apache.linkis.cli.application.interactor.command.template.UniversalCmdTemplate; +import org.apache.linkis.cli.application.interactor.job.help.HelpJob; import org.apache.linkis.cli.application.interactor.job.interactive.InteractiveJob; import org.apache.linkis.cli.application.interactor.job.jobcmd.JobCmdJob; import org.apache.linkis.cli.application.interactor.job.once.LinkisOnceJob; +import org.apache.linkis.cli.application.interactor.job.version.VersionJob; import org.apache.linkis.cli.application.operator.OperManager; import org.apache.linkis.cli.application.operator.once.OnceOperBuilder; import org.apache.linkis.cli.application.operator.ujes.LinkisOperBuilder; @@ -49,6 +51,8 @@ public class LinkisClientApplication { private static Logger logger = LoggerFactory.getLogger(LinkisClientApplication.class); + private static boolean showHelp = false; + public static void main(String[] args) { /* generate template @@ -64,7 +68,9 @@ public static void main(String[] args) { } catch (CommandException e) { CmdTemplate template = CmdTemplateFactory.getTemplateOri(e.getCmdType()); if (template != null) { - printHelp(template); + HelpInfoModel model = new HelpInfoModel(); + model.buildModel(ctx.getTemplate()); + new HelpPresenter().present(model); } LoggerManager.getInformationLogger().error("Failed to build CliCtx", e); System.exit(-1); @@ -80,7 +86,11 @@ public static void main(String[] args) { run job */ Job job; - if (isJobCmd(ctx)) { + if (isVersionCmd(ctx)) { + job = new VersionJob(); + } else if (isHelp(ctx)) { + job = new HelpJob(); + } else if (isJobCmd(ctx)) { job = new JobCmdJob(); } else if (isOnceCmd(ctx)) { job = new LinkisOnceJob(); @@ -134,21 +144,34 @@ public Map getExtraMessage() { } } - private static void printHelp(CmdTemplate template) { - HelpInfoModel model = new HelpInfoModel(); - model.buildModel(template); - new HelpPresenter().present(model); - } - private static void printIndicator(JobResult jobResult) { if (jobResult.isSuccess()) { LoggerManager.getPlaintTextLogger().info(CliConstants.SUCCESS_INDICATOR); } else { + LoggerManager.getPlaintTextLogger().info(jobResult.getMessage()); + StringBuilder b = new StringBuilder(); + for (Map.Entry e : jobResult.getExtraMessage().entrySet()) { + b.append(e.getKey()).append(":").append(e.getValue()).append(System.lineSeparator()); + } + LoggerManager.getPlaintTextLogger().info(b.toString()); LoggerManager.getPlaintTextLogger().info(CliConstants.FAILURE_INDICATOR); - LoggerManager.getPlaintTextLogger().error(jobResult.getMessage()); } } + private static Boolean isHelp(CliCtx ctx) { + if (ctx.getVarAccess().hasVar(CliKeys.LINKIS_CLIENT_HELP_OPT)) { + return true; + } + return false; + } + + private static Boolean isVersionCmd(CliCtx ctx) { + if (ctx.getVarAccess().hasVar(CliKeys.VERSION)) { + return true; + } + return false; + } + private static Boolean isJobCmd(CliCtx ctx) { if (ctx.getVarAccess().hasVar(CliKeys.LINKIS_CLIENT_KILL_OPT) || ctx.getVarAccess().hasVar(CliKeys.LINKIS_CLIENT_STATUS_OPT) diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/constants/CliKeys.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/constants/CliKeys.java index 5c04cbcd64..966836e0bf 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/constants/CliKeys.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/constants/CliKeys.java @@ -38,6 +38,8 @@ public class CliKeys { public static final String DEFAULT_CONFIG_FILE_NAME_KEY = "conf.file"; public static final String LINUX_USER_KEY = "user.name"; + public static final String VERSION = "cli.version"; + /** Configurable */ /* execution type diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java index 8483003132..170f1a8f86 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/constants/LinkisKeys.java @@ -47,4 +47,5 @@ public class LinkisKeys { public static final String KEY_YARN_QUEUE = "wds.linkis.rm.yarnqueue"; public static final String KEY_HIVE_RESULT_DISPLAY_TBALE = "hive.resultset.use.unique.column.names"; + public static final String CLI_VERSION = "cli.version"; } diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java index f761a6a47a..83b42032cf 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java @@ -271,6 +271,15 @@ public class UniversalCmdTemplate extends AbstractCmdTemplate implements Cloneab "specify jobContent map. Input format: -jobContentMap key1=value1 -jobContentMap key2=value2", true); + protected Flag versionFlag = + flag( + CliKeys.VERSION, + CliKeys.VERSION, + new String[] {"--version"}, + "show version", + true, + false); + protected Parameter argumentsParas = parameter( CliKeys.LINKIS_CLIENT_COMMON, @@ -286,6 +295,9 @@ public UniversalCmdTemplate() { @Override public void checkParams() throws CommandException { + if (versionFlag.hasVal()) { + return; + } int cnt = 0; if (statusOpt.hasVal()) { cnt++; diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/help/HelpJob.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/help/HelpJob.java new file mode 100644 index 0000000000..943c54ed63 --- /dev/null +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/help/HelpJob.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.cli.application.interactor.job.help; + +import org.apache.linkis.cli.application.entity.context.CliCtx; +import org.apache.linkis.cli.application.entity.job.Job; +import org.apache.linkis.cli.application.entity.job.JobResult; +import org.apache.linkis.cli.application.present.HelpPresenter; +import org.apache.linkis.cli.application.present.model.HelpInfoModel; + +import java.util.HashMap; +import java.util.Map; + +public class HelpJob implements Job { + private CliCtx ctx; + + @Override + public void build(CliCtx ctx) { + this.ctx = ctx; + } + + @Override + public JobResult run() { + HelpInfoModel model = new HelpInfoModel(); + model.buildModel(ctx.getTemplate()); + new HelpPresenter().present(model); + return new JobResult() { + @Override + public Boolean isSuccess() { + return true; + } + + @Override + public String getMessage() { + return ""; + } + + @Override + public Map getExtraMessage() { + return new HashMap<>(); + } + }; + } + + @Override + public void onDestroy() {} +} diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/version/VersionJob.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/version/VersionJob.java new file mode 100644 index 0000000000..599f97904f --- /dev/null +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/version/VersionJob.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.cli.application.interactor.job.version; + +import org.apache.linkis.cli.application.constants.CliKeys; +import org.apache.linkis.cli.application.entity.context.CliCtx; +import org.apache.linkis.cli.application.entity.job.Job; +import org.apache.linkis.cli.application.entity.job.JobResult; +import org.apache.linkis.cli.application.utils.LoggerManager; + +import java.util.HashMap; +import java.util.Map; + +public class VersionJob implements Job { + private CliCtx ctx; + + @Override + public void build(CliCtx cliCtx) { + this.ctx = cliCtx; + } + + @Override + public JobResult run() { + String version = (String) ctx.getExtraMap().get(CliKeys.VERSION); + Map extraMap = new HashMap<>(); + extraMap.put(CliKeys.VERSION, version); + LoggerManager.getPlaintTextLogger().info("Version=" + version); + return new VersionJobResult(true, "ok", extraMap); + } + + @Override + public void onDestroy() {} +} diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/entity/version/Version.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/version/VersionJobResult.java similarity index 53% rename from linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/entity/version/Version.java rename to linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/version/VersionJobResult.java index b0bcd8b6d9..e2f12cd7c2 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/entity/version/Version.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/version/VersionJobResult.java @@ -15,6 +15,35 @@ * limitations under the License. */ -package org.apache.linkis.cli.application.entity.version; +package org.apache.linkis.cli.application.interactor.job.version; -public interface Version {} +import org.apache.linkis.cli.application.entity.job.JobResult; + +import java.util.Map; + +public class VersionJobResult implements JobResult { + private Boolean success; + private String message; + private Map extraMsg; + + public VersionJobResult(Boolean success, String message, Map extraMsg) { + this.success = success; + this.message = message; + this.extraMsg = extraMsg; + } + + @Override + public Boolean isSuccess() { + return success; + } + + @Override + public String getMessage() { + return message; + } + + @Override + public Map getExtraMessage() { + return extraMsg; + } +} diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java index c4f7a3aadc..191061332b 100644 --- a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java @@ -269,7 +269,7 @@ private JobInfoResult queryJobInfoInternal(String user, String taskID) } String msg = MessageFormat.format( - "Get job info failed. retry time : {0}/{1}. taskID={0}, Reason: {1}", + "Get job info failed. retry time : {0}/{1}. taskID={2}, Reason: {3}", retryTime, MAX_RETRY_TIME, taskID, reason); logger.debug( @@ -299,7 +299,7 @@ private JobInfoResult queryJobInfoInternal(String user, String taskID) if (jobInfoResult == null) { reason = "JobInfoResult is null"; } else { - reason = "server returns non-zero status-code"; + reason = "server returns non-zero status-code. "; reason += jobInfoResult.getMessage(); } String msg = diff --git a/linkis-computation-governance/linkis-client/linkis-cli/src/main/resources/version.properties b/linkis-computation-governance/linkis-client/linkis-cli/src/main/resources/version.properties new file mode 100644 index 0000000000..0da37e5c03 --- /dev/null +++ b/linkis-computation-governance/linkis-client/linkis-cli/src/main/resources/version.properties @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +cli.version=${project.version} \ No newline at end of file diff --git a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala index 6b41b4c62b..9eb748691e 100644 --- a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala +++ b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala @@ -35,6 +35,9 @@ object ResultSetAction { private var pageSize: Int = _ private var charset: String = Configuration.BDP_ENCODING.getValue + // default value is :org.apache.linkis.storage.domain.Dolphin.LINKIS_NULL + private var nullValue: String = "LINKIS_NULL" + def setUser(user: String): Builder = { this.user = user this @@ -60,6 +63,11 @@ object ResultSetAction { this } + def setNullValue(nullValue: String): Builder = { + this.nullValue = nullValue + this + } + def build(): ResultSetAction = { if (user == null) throw new UJESClientBuilderException("user is needed!") if (path == null) throw new UJESClientBuilderException("path is needed!") @@ -68,6 +76,7 @@ object ResultSetAction { if (page > 0) resultSetAction.setParameter("page", page) if (pageSize > 0) resultSetAction.setParameter("pageSize", pageSize) resultSetAction.setParameter("charset", charset) + resultSetAction.setParameter("nullValue", nullValue) resultSetAction.setUser(user) resultSetAction } diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java index 392f4dd097..c12e2791b3 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java @@ -48,9 +48,7 @@ public class UJESSQLDriver extends UJESSQLDriverMain implements Driver { static String TOKEN_VALUE = "value"; static String PASSWORD = "password"; static boolean TABLEAU_SERVER = false; - static String LIMIT_ENABLED = "true"; - static String LIMIT = "limit"; - + static String FIXED_SESSION = "fixedSession"; static String VERSION = "version"; static int DEFAULT_VERSION = 1; static String MAX_CONNECTION_SIZE = "maxConnectionSize"; diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/utils/JDBCUtils.java b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/utils/JDBCUtils.java index 8f7dcfed12..3e1e7e3182 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/utils/JDBCUtils.java +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/utils/JDBCUtils.java @@ -17,10 +17,16 @@ package org.apache.linkis.ujes.jdbc.utils; +import org.apache.linkis.common.utils.Utils; + +import java.util.concurrent.atomic.AtomicInteger; + public class JDBCUtils { private static final char SEARCH_STRING_ESCAPE = '\\'; + public static final AtomicInteger idCreator = new AtomicInteger(); + public static String convertPattern(final String pattern) { if (pattern == null) { return ".*"; @@ -50,4 +56,8 @@ public static String convertPattern(final String pattern) { return result.toString(); } } + + public static String getUniqId() { + return Utils.getLocalHostname() + "_" + JDBCUtils.idCreator.getAndIncrement(); + } } diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala index b800698766..a9b58e153b 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala @@ -25,6 +25,7 @@ import org.apache.linkis.ujes.client.UJESClient import org.apache.linkis.ujes.client.request.JobSubmitAction import org.apache.linkis.ujes.client.response.JobExecuteResult import org.apache.linkis.ujes.jdbc.UJESSQLDriverMain._ +import org.apache.linkis.ujes.jdbc.utils.JDBCUtils import org.apache.commons.lang3.StringUtils @@ -99,6 +100,18 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope private[jdbc] val serverURL = props.getProperty("URL") + private[jdbc] val fixedSessionEnabled = + if ( + props + .containsKey(FIXED_SESSION) && "true".equalsIgnoreCase(props.getProperty(FIXED_SESSION)) + ) { + true + } else { + false + } + + private val connectionId = JDBCUtils.getUniqId() + private val labelMap: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef] private val startupParams: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef] @@ -444,6 +457,10 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope labelMap.put(LabelKeyConstant.ENGINE_TYPE_KEY, engineTypeLabel.getStringValue) labelMap.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, s"$user-$creator") labelMap.put(LabelKeyConstant.CODE_TYPE_KEY, engineToCodeType(engineTypeLabel.getEngineType)) + if (fixedSessionEnabled) { + labelMap.put(LabelKeyConstant.FIXED_EC_KEY, connectionId) + logger.info("Fixed session is enable session id is {}", connectionId) + } val jobSubmitAction = JobSubmitAction.builder .addExecuteCode(code) @@ -462,4 +479,6 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope result } + override def toString: String = "LinkisConnection_" + connectionId + } diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala index 517f8b07ae..bea24181a3 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala @@ -66,7 +66,6 @@ object UJESClientFactory extends Logging { clientConfigBuilder.setAuthenticationStrategy(new StaticAuthenticationStrategy()) clientConfigBuilder.readTimeout(100000) clientConfigBuilder.maxConnectionSize(20) - clientConfigBuilder.readTimeout(10000) val params = props.getProperty(PARAMS) var versioned = false if (StringUtils.isNotBlank(params)) { diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala index 6296f399e5..ab2f6dda10 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala @@ -29,10 +29,10 @@ import java.sql.{ DriverPropertyInfo, SQLFeatureNotSupportedException } -import java.util.Properties +import java.util.{Locale, Properties} import java.util.logging.Logger -import scala.collection.JavaConversions +import scala.collection.JavaConverters._ class UJESSQLDriverMain extends Driver with Logging { @@ -72,9 +72,8 @@ class UJESSQLDriverMain extends Driver with Logging { case Array(TOKEN_VALUE, value) => props.setProperty(TOKEN_VALUE, value) false - case Array(LIMIT, value) => - props.setProperty(LIMIT, value) - UJESSQLDriverMain.LIMIT_ENABLED = value.toLowerCase() + case Array(FIXED_SESSION, value) => + props.setProperty(FIXED_SESSION, value) false case Array(key, _) => if (StringUtils.isBlank(key)) { @@ -138,8 +137,7 @@ object UJESSQLDriverMain { val TOKEN_VALUE = UJESSQLDriver.TOKEN_VALUE val PASSWORD = UJESSQLDriver.PASSWORD val TABLEAU_SERVER = UJESSQLDriver.TABLEAU_SERVER - val LIMIT = UJESSQLDriver.LIMIT - var LIMIT_ENABLED = UJESSQLDriver.LIMIT_ENABLED + val FIXED_SESSION = UJESSQLDriver.FIXED_SESSION val VERSION = UJESSQLDriver.VERSION val DEFAULT_VERSION = UJESSQLDriver.DEFAULT_VERSION @@ -157,8 +155,7 @@ object UJESSQLDriverMain { connectionParams: String, variableMap: java.util.Map[String, Any] ): String = { - val variables = JavaConversions - .mapAsScalaMap(variableMap) + val variables = variableMap.asScala .map(kv => VARIABLE_HEADER + kv._1 + KV_SPLIT + kv._2) .mkString(PARAM_SPLIT) if (StringUtils.isNotBlank(connectionParams)) connectionParams + PARAM_SPLIT + variables @@ -186,17 +183,20 @@ object UJESSQLDriverMain { ): String = { val sb = new StringBuilder if (StringUtils.isNotBlank(version)) sb.append(VERSION).append(KV_SPLIT).append(version) - if (maxConnectionSize > 0) + if (maxConnectionSize > 0) { sb.append(PARAM_SPLIT).append(MAX_CONNECTION_SIZE).append(KV_SPLIT).append(maxConnectionSize) - if (readTimeout > 0) + } + if (readTimeout > 0) { sb.append(PARAM_SPLIT).append(READ_TIMEOUT).append(KV_SPLIT).append(readTimeout) + } if (enableDiscovery) { sb.append(PARAM_SPLIT).append(ENABLE_DISCOVERY).append(KV_SPLIT).append(enableDiscovery) - if (enableLoadBalancer) + if (enableLoadBalancer) { sb.append(PARAM_SPLIT) .append(ENABLE_LOADBALANCER) .append(KV_SPLIT) .append(enableLoadBalancer) + } } if (sb.startsWith(PARAM_SPLIT)) sb.toString.substring(PARAM_SPLIT.length) else sb.toString } diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala index 5df56733e4..0ed47925c6 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala @@ -193,7 +193,7 @@ class UJESSQLResultSet( if (metaData == null) init() currentRowCursor += 1 if (null == resultSetRow || currentRowCursor > resultSetRow.size() - 1) { - if (UJESSQLDriverMain.LIMIT_ENABLED.equals("false") && !isCompleted) { + if (!isCompleted) { updateResultSet() if (isCompleted) { return false @@ -242,7 +242,7 @@ class UJESSQLResultSet( } else { dataType.toLowerCase(Locale.getDefault) match { case null => throw new LinkisSQLException(LinkisSQLErrorCode.METADATA_EMPTY) - case "string" => value.toString + case "char" | "varchar" | "nvarchar" | "string" => value case "short" => value.toShort case "int" => value.toInt case "long" => value.toLong @@ -250,11 +250,8 @@ class UJESSQLResultSet( case "double" => value.toDouble case "boolean" => value.toBoolean case "byte" => value.toByte - case "char" => value.toString - case "timestamp" => value.toString - case "varchar" => value.toString - case "nvarchar" => value.toString - case "date" => value.toString + case "timestamp" => value + case "date" => value case "bigint" => value.toLong case "decimal" => value.toDouble case "array" => value.toArray @@ -974,8 +971,9 @@ class UJESSQLResultSet( logger.info(s"the value of value is $value and the value of localTimeZone is $localTimeZone") if (wasNull()) { null - } else + } else { new Timestamp(TIMESTAMP_FORMATTER.withZone(localTimeZone).parseMillis(String.valueOf(value))) + } } override def getTimestamp(columnIndex: Int, cal: Calendar): Timestamp = { diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/hook/JDBCDriverPreExecutionHook.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/hook/JDBCDriverPreExecutionHook.scala index ed8936cc77..c7de7d3734 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/hook/JDBCDriverPreExecutionHook.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/hook/JDBCDriverPreExecutionHook.scala @@ -34,8 +34,7 @@ object JDBCDriverPreExecutionHook extends Logging { val hooks = new ArrayBuffer[JDBCDriverPreExecutionHook]() CommonVars( "wds.linkis.jdbc.pre.hook.class", - "org.apache.linkis.ujes.jdbc.hook.impl.TableauPreExecutionHook," + - "org.apache.linkis.ujes.jdbc.hook.impl.NoLimitExecutionHook" + "org.apache.linkis.ujes.jdbc.hook.impl.TableauPreExecutionHook" ).getValue.split(",") foreach { hookStr => Utils.tryCatch { val clazz = Class.forName(hookStr.trim) diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/hook/impl/NoLimitExecutionHook.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/hook/impl/NoLimitExecutionHook.scala deleted file mode 100644 index 103a544772..0000000000 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/hook/impl/NoLimitExecutionHook.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.ujes.jdbc.hook.impl - -import org.apache.linkis.ujes.jdbc.UJESSQLDriverMain -import org.apache.linkis.ujes.jdbc.hook.JDBCDriverPreExecutionHook - -import java.util.Locale - -class NoLimitExecutionHook extends JDBCDriverPreExecutionHook { - - override def callPreExecutionHook(sql: String, skip: Boolean): String = { - if (UJESSQLDriverMain.LIMIT_ENABLED.equalsIgnoreCase("false")) { - var noLimitSql = "--set ide.engine.no.limit.allow=true\n" + sql - val lowerCaseLimitSql = noLimitSql.toLowerCase() - if (lowerCaseLimitSql.contains("limit ") && lowerCaseLimitSql.contains("tableausql")) { - val lastIndexOfLimit = lowerCaseLimitSql.lastIndexOf("limit ") - noLimitSql = noLimitSql.substring(0, lastIndexOfLimit) - } - noLimitSql - } else { - sql - } - - } - -}