diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java index 392f4dd097..c12e2791b3 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java @@ -48,9 +48,7 @@ public class UJESSQLDriver extends UJESSQLDriverMain implements Driver { static String TOKEN_VALUE = "value"; static String PASSWORD = "password"; static boolean TABLEAU_SERVER = false; - static String LIMIT_ENABLED = "true"; - static String LIMIT = "limit"; - + static String FIXED_SESSION = "fixedSession"; static String VERSION = "version"; static int DEFAULT_VERSION = 1; static String MAX_CONNECTION_SIZE = "maxConnectionSize"; diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/utils/JDBCUtils.java b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/utils/JDBCUtils.java index 8f7dcfed12..3e1e7e3182 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/utils/JDBCUtils.java +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/utils/JDBCUtils.java @@ -17,10 +17,16 @@ package org.apache.linkis.ujes.jdbc.utils; +import org.apache.linkis.common.utils.Utils; + +import java.util.concurrent.atomic.AtomicInteger; + public class JDBCUtils { private static final char SEARCH_STRING_ESCAPE = '\\'; + public static final AtomicInteger idCreator = new AtomicInteger(); + public static String convertPattern(final String pattern) { if (pattern == null) { return ".*"; @@ -50,4 +56,8 @@ public static String convertPattern(final String pattern) { return result.toString(); } } + + public static String getUniqId() { + return Utils.getLocalHostname() + "_" + JDBCUtils.idCreator.getAndIncrement(); + } } diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala index b800698766..a9b58e153b 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala @@ -25,6 +25,7 @@ import org.apache.linkis.ujes.client.UJESClient import org.apache.linkis.ujes.client.request.JobSubmitAction import org.apache.linkis.ujes.client.response.JobExecuteResult import org.apache.linkis.ujes.jdbc.UJESSQLDriverMain._ +import org.apache.linkis.ujes.jdbc.utils.JDBCUtils import org.apache.commons.lang3.StringUtils @@ -99,6 +100,18 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope private[jdbc] val serverURL = props.getProperty("URL") + private[jdbc] val fixedSessionEnabled = + if ( + props + .containsKey(FIXED_SESSION) && "true".equalsIgnoreCase(props.getProperty(FIXED_SESSION)) + ) { + true + } else { + false + } + + private val connectionId = JDBCUtils.getUniqId() + private val labelMap: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef] private val startupParams: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef] @@ -444,6 +457,10 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope labelMap.put(LabelKeyConstant.ENGINE_TYPE_KEY, engineTypeLabel.getStringValue) labelMap.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, s"$user-$creator") labelMap.put(LabelKeyConstant.CODE_TYPE_KEY, engineToCodeType(engineTypeLabel.getEngineType)) + if (fixedSessionEnabled) { + labelMap.put(LabelKeyConstant.FIXED_EC_KEY, connectionId) + logger.info("Fixed session is enable session id is {}", connectionId) + } val jobSubmitAction = JobSubmitAction.builder .addExecuteCode(code) @@ -462,4 +479,6 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope result } + override def toString: String = "LinkisConnection_" + connectionId + } diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala index 517f8b07ae..bea24181a3 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESClientFactory.scala @@ -66,7 +66,6 @@ object UJESClientFactory extends Logging { clientConfigBuilder.setAuthenticationStrategy(new StaticAuthenticationStrategy()) clientConfigBuilder.readTimeout(100000) clientConfigBuilder.maxConnectionSize(20) - clientConfigBuilder.readTimeout(10000) val params = props.getProperty(PARAMS) var versioned = false if (StringUtils.isNotBlank(params)) { diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala index 6296f399e5..ab2f6dda10 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala @@ -29,10 +29,10 @@ import java.sql.{ DriverPropertyInfo, SQLFeatureNotSupportedException } -import java.util.Properties +import java.util.{Locale, Properties} import java.util.logging.Logger -import scala.collection.JavaConversions +import scala.collection.JavaConverters._ class UJESSQLDriverMain extends Driver with Logging { @@ -72,9 +72,8 @@ class UJESSQLDriverMain extends Driver with Logging { case Array(TOKEN_VALUE, value) => props.setProperty(TOKEN_VALUE, value) false - case Array(LIMIT, value) => - props.setProperty(LIMIT, value) - UJESSQLDriverMain.LIMIT_ENABLED = value.toLowerCase() + case Array(FIXED_SESSION, value) => + props.setProperty(FIXED_SESSION, value) false case Array(key, _) => if (StringUtils.isBlank(key)) { @@ -138,8 +137,7 @@ object UJESSQLDriverMain { val TOKEN_VALUE = UJESSQLDriver.TOKEN_VALUE val PASSWORD = UJESSQLDriver.PASSWORD val TABLEAU_SERVER = UJESSQLDriver.TABLEAU_SERVER - val LIMIT = UJESSQLDriver.LIMIT - var LIMIT_ENABLED = UJESSQLDriver.LIMIT_ENABLED + val FIXED_SESSION = UJESSQLDriver.FIXED_SESSION val VERSION = UJESSQLDriver.VERSION val DEFAULT_VERSION = UJESSQLDriver.DEFAULT_VERSION @@ -157,8 +155,7 @@ object UJESSQLDriverMain { connectionParams: String, variableMap: java.util.Map[String, Any] ): String = { - val variables = JavaConversions - .mapAsScalaMap(variableMap) + val variables = variableMap.asScala .map(kv => VARIABLE_HEADER + kv._1 + KV_SPLIT + kv._2) .mkString(PARAM_SPLIT) if (StringUtils.isNotBlank(connectionParams)) connectionParams + PARAM_SPLIT + variables @@ -186,17 +183,20 @@ object UJESSQLDriverMain { ): String = { val sb = new StringBuilder if (StringUtils.isNotBlank(version)) sb.append(VERSION).append(KV_SPLIT).append(version) - if (maxConnectionSize > 0) + if (maxConnectionSize > 0) { sb.append(PARAM_SPLIT).append(MAX_CONNECTION_SIZE).append(KV_SPLIT).append(maxConnectionSize) - if (readTimeout > 0) + } + if (readTimeout > 0) { sb.append(PARAM_SPLIT).append(READ_TIMEOUT).append(KV_SPLIT).append(readTimeout) + } if (enableDiscovery) { sb.append(PARAM_SPLIT).append(ENABLE_DISCOVERY).append(KV_SPLIT).append(enableDiscovery) - if (enableLoadBalancer) + if (enableLoadBalancer) { sb.append(PARAM_SPLIT) .append(ENABLE_LOADBALANCER) .append(KV_SPLIT) .append(enableLoadBalancer) + } } if (sb.startsWith(PARAM_SPLIT)) sb.toString.substring(PARAM_SPLIT.length) else sb.toString } diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala index 5df56733e4..0ed47925c6 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala @@ -193,7 +193,7 @@ class UJESSQLResultSet( if (metaData == null) init() currentRowCursor += 1 if (null == resultSetRow || currentRowCursor > resultSetRow.size() - 1) { - if (UJESSQLDriverMain.LIMIT_ENABLED.equals("false") && !isCompleted) { + if (!isCompleted) { updateResultSet() if (isCompleted) { return false @@ -242,7 +242,7 @@ class UJESSQLResultSet( } else { dataType.toLowerCase(Locale.getDefault) match { case null => throw new LinkisSQLException(LinkisSQLErrorCode.METADATA_EMPTY) - case "string" => value.toString + case "char" | "varchar" | "nvarchar" | "string" => value case "short" => value.toShort case "int" => value.toInt case "long" => value.toLong @@ -250,11 +250,8 @@ class UJESSQLResultSet( case "double" => value.toDouble case "boolean" => value.toBoolean case "byte" => value.toByte - case "char" => value.toString - case "timestamp" => value.toString - case "varchar" => value.toString - case "nvarchar" => value.toString - case "date" => value.toString + case "timestamp" => value + case "date" => value case "bigint" => value.toLong case "decimal" => value.toDouble case "array" => value.toArray @@ -974,8 +971,9 @@ class UJESSQLResultSet( logger.info(s"the value of value is $value and the value of localTimeZone is $localTimeZone") if (wasNull()) { null - } else + } else { new Timestamp(TIMESTAMP_FORMATTER.withZone(localTimeZone).parseMillis(String.valueOf(value))) + } } override def getTimestamp(columnIndex: Int, cal: Calendar): Timestamp = { diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/hook/JDBCDriverPreExecutionHook.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/hook/JDBCDriverPreExecutionHook.scala index ed8936cc77..c7de7d3734 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/hook/JDBCDriverPreExecutionHook.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/hook/JDBCDriverPreExecutionHook.scala @@ -34,8 +34,7 @@ object JDBCDriverPreExecutionHook extends Logging { val hooks = new ArrayBuffer[JDBCDriverPreExecutionHook]() CommonVars( "wds.linkis.jdbc.pre.hook.class", - "org.apache.linkis.ujes.jdbc.hook.impl.TableauPreExecutionHook," + - "org.apache.linkis.ujes.jdbc.hook.impl.NoLimitExecutionHook" + "org.apache.linkis.ujes.jdbc.hook.impl.TableauPreExecutionHook" ).getValue.split(",") foreach { hookStr => Utils.tryCatch { val clazz = Class.forName(hookStr.trim)