diff --git a/linkis-dist/release-docs/licenses/LICENSE-jaxb-impl.txt b/linkis-dist/release-docs/licenses/LICENSE-jaxb-impl.txt
index b1c74f95ed..68076ad96b 100644
--- a/linkis-dist/release-docs/licenses/LICENSE-jaxb-impl.txt
+++ b/linkis-dist/release-docs/licenses/LICENSE-jaxb-impl.txt
@@ -756,4 +756,4 @@ module. An independent module is a module which is not derived from or
based on this library. If you modify this library, you may extend this
exception to your version of the library, but you are not obligated to
do so. If you do not wish to do so, delete this exception statement
-from your version.
+from your version.
\ No newline at end of file
diff --git a/linkis-dist/release-docs/licenses/LICENSE-jersey-guice.txt b/linkis-dist/release-docs/licenses/LICENSE-jersey-guice.txt
index 9e9429fce4..68076ad96b 100644
--- a/linkis-dist/release-docs/licenses/LICENSE-jersey-guice.txt
+++ b/linkis-dist/release-docs/licenses/LICENSE-jersey-guice.txt
@@ -307,7 +307,7 @@ COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1
The Covered Software is a "commercial item," as that term is defined
in 48 C.F.R. 2.101 (Oct. 1995), consisting of "commercial computer
- software" (as that term is defined at 48 C.F.R. ß
+ software" (as that term is defined at 48 C.F.R. §
252.227-7014(a)(1)) and "commercial computer software documentation"
as such terms are used in 48 C.F.R. 12.212 (Sept. 1995). Consistent
with 48 C.F.R. 12.212 and 48 C.F.R. 227.7202-1 through 227.7202-4
diff --git a/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBaseConnectionManager.java b/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBaseConnectionManager.java
index 0260a19e77..fb53747980 100644
--- a/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBaseConnectionManager.java
+++ b/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBaseConnectionManager.java
@@ -219,7 +219,7 @@ public void run() {
while (times < KERBEROS_RE_LOGIN_MAX_RETRY) {
if (runKerberosLogin()) {
- LOG.info("Ran kerberos re login command successfully.");
+ LOG.info("Run kerberos re login command successfully.");
break;
} else {
times++;
diff --git a/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSession.java b/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSession.java
index 0152741aa5..a0aef55797 100644
--- a/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSession.java
+++ b/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSession.java
@@ -252,6 +252,9 @@ public Result execute(String cmd) {
@Override
public void destroy() {
+ if (!this.isConnected()) {
+ LOGGER.info("The hbase shell session has closed.");
+ }
if (this.scriptingContainer != null) {
this.scriptingContainer.terminate();
}
diff --git a/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/conf/HBaseConfiguration.scala b/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/conf/HBaseConfiguration.scala
index c6959e738d..5569b981f0 100644
--- a/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/conf/HBaseConfiguration.scala
+++ b/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/conf/HBaseConfiguration.scala
@@ -20,5 +20,5 @@ package org.apache.linkis.manager.engineplugin.hbase.conf
import org.apache.linkis.common.conf.CommonVars
object HBaseConfiguration {
- val HBASE_CONCURRENT_LIMIT = CommonVars[Int]("wds.linkis.engineconn.hbase.concurrent.limit", 100)
+ val HBASE_CONCURRENT_LIMIT = CommonVars[Int]("linkis.engineconn.hbase.concurrent.limit", 100)
}
diff --git a/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/executor/HBaseEngineConnExecutor.scala b/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/executor/HBaseEngineConnExecutor.scala
index 2d7c0d1899..db74116de4 100644
--- a/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/executor/HBaseEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/executor/HBaseEngineConnExecutor.scala
@@ -19,6 +19,7 @@ package org.apache.linkis.manager.engineplugin.hbase.executor
import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.{OverloadUtils, Utils}
+import org.apache.linkis.engineconn.common.conf.EngineConnConstant
import org.apache.linkis.engineconn.computation.executor.execute.{
ConcurrentComputationExecutor,
EngineExecutionContext
@@ -47,20 +48,34 @@ import org.apache.linkis.protocol.CacheableProtocol
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.rpc.{RPCMapCache, Sender}
import org.apache.linkis.scheduler.executer.{
+ AliasOutputExecuteResponse,
ErrorExecuteResponse,
- ExecuteResponse,
- SuccessExecuteResponse
+ ExecuteResponse
}
+import org.apache.linkis.storage.{LineMetaData, LineRecord}
+import org.apache.linkis.storage.resultset.ResultSetFactory
import org.apache.commons.collections.CollectionUtils
+import org.apache.commons.io.IOUtils
import java.util
import java.util.Collections
import scala.collection.JavaConverters._
+import com.google.common.cache.{Cache, CacheBuilder}
+
class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor {
- private val shellSessionManager = HBaseShellSessionManager.getInstance();
+ private val shellSessionManager = HBaseShellSessionManager.getInstance()
+
+ private val hbaseShellTaskRunningContainer: Cache[String, String] =
+ CacheBuilder.newBuilder.maximumSize(EngineConnConstant.MAX_TASK_NUM).build[String, String]
+
+ private val hbaseShellSessionCache: Cache[String, HBaseShellSession] =
+ CacheBuilder.newBuilder
+ .maximumSize(EngineConnConstant.MAX_TASK_NUM)
+ .build[String, HBaseShellSession]
+
private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]](2)
override def init(): Unit = {
@@ -73,6 +88,7 @@ class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor
code: String
): ExecuteResponse = {
val realCode = code.trim()
+ val taskId = engineExecutorContext.getJobId.get
var properties: util.Map[String, String] = Collections.emptyMap()
Utils.tryCatch({
properties = getHBaseRuntimeParams(engineExecutorContext)
@@ -84,16 +100,29 @@ class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor
var shellSession: HBaseShellSession = null
Utils.tryCatch({
shellSession = shellSessionManager.getHBaseShellSession(properties)
+ hbaseShellSessionCache.put(taskId, shellSession)
}) { e: Throwable =>
logger.error(s"created hbase shell session error! $e")
return ErrorExecuteResponse("created hbase shell session error!", e)
}
+
+ hbaseShellTaskRunningContainer.put(taskId, "1")
val result: Result = shellSession.execute(realCode)
+ hbaseShellTaskRunningContainer.invalidate(taskId)
if (!result.isSuccess) {
return ErrorExecuteResponse(result.getResult, result.getThrowable)
}
- engineExecutorContext.appendStdout(result.getResult)
- SuccessExecuteResponse()
+ val resultSetWriter =
+ engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE)
+ resultSetWriter.addMetaData(new LineMetaData())
+ resultSetWriter.addRecord(new LineRecord(result.getResult))
+
+ val output = if (resultSetWriter != null) resultSetWriter.toString else null
+ Utils.tryQuietly {
+ IOUtils.closeQuietly(resultSetWriter)
+ }
+ logger.info("HBase shell command executed completed.")
+ AliasOutputExecuteResponse(null, output)
}
private def getHBaseRuntimeParams(
@@ -169,13 +198,24 @@ class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor
override def killAll(): Unit = {
logger.info("Killing all query task.")
-
+ val concurrentMap = hbaseShellTaskRunningContainer.asMap()
+ if (concurrentMap.isEmpty) {
+ return
+ }
+ val taskIdSet = concurrentMap.keySet().asScala
+ for (taskId <- taskIdSet) {
+ killTask(taskId)
+ }
logger.info("All query task has killed successfully.")
}
override def killTask(taskId: String): Unit = {
logger.info(s"Killing hbase query task $taskId")
- super.killTask(taskId)
+ val hbaseShellSession: HBaseShellSession = hbaseShellSessionCache.getIfPresent(taskId)
+ if (hbaseShellSession == null) {
+ logger.info(s"Can not get hbase shell session by taskId $taskId")
+ }
+ hbaseShellSession.destroy()
logger.info(s"The query task $taskId has killed successfully.")
}
diff --git a/pom.xml b/pom.xml
index 4699ad1b50..9ef6835630 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1791,7 +1791,7 @@
hbase.profile
- 2.5
+ 1.2