From 745699ca98b6e8855fc7bf5561bf899b6828eb34 Mon Sep 17 00:00:00 2001 From: Longping Jie Date: Tue, 26 Sep 2023 17:44:36 +0800 Subject: [PATCH] Add the function of kill task and add TEXT ResultSet --- .../licenses/LICENSE-jaxb-impl.txt | 2 +- .../licenses/LICENSE-jersey-guice.txt | 2 +- .../hbase/HBaseConnectionManager.java | 2 +- .../hbase/shell/HBaseShellSession.java | 3 ++ .../hbase/conf/HBaseConfiguration.scala | 2 +- .../executor/HBaseEngineConnExecutor.scala | 54 ++++++++++++++++--- pom.xml | 2 +- 7 files changed, 55 insertions(+), 12 deletions(-) diff --git a/linkis-dist/release-docs/licenses/LICENSE-jaxb-impl.txt b/linkis-dist/release-docs/licenses/LICENSE-jaxb-impl.txt index b1c74f95ed..68076ad96b 100644 --- a/linkis-dist/release-docs/licenses/LICENSE-jaxb-impl.txt +++ b/linkis-dist/release-docs/licenses/LICENSE-jaxb-impl.txt @@ -756,4 +756,4 @@ module. An independent module is a module which is not derived from or based on this library. If you modify this library, you may extend this exception to your version of the library, but you are not obligated to do so. If you do not wish to do so, delete this exception statement -from your version. +from your version. \ No newline at end of file diff --git a/linkis-dist/release-docs/licenses/LICENSE-jersey-guice.txt b/linkis-dist/release-docs/licenses/LICENSE-jersey-guice.txt index 9e9429fce4..68076ad96b 100644 --- a/linkis-dist/release-docs/licenses/LICENSE-jersey-guice.txt +++ b/linkis-dist/release-docs/licenses/LICENSE-jersey-guice.txt @@ -307,7 +307,7 @@ COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1 The Covered Software is a "commercial item," as that term is defined in 48 C.F.R. 2.101 (Oct. 1995), consisting of "commercial computer - software" (as that term is defined at 48 C.F.R. ß + software" (as that term is defined at 48 C.F.R. § 252.227-7014(a)(1)) and "commercial computer software documentation" as such terms are used in 48 C.F.R. 12.212 (Sept. 1995). Consistent with 48 C.F.R. 12.212 and 48 C.F.R. 227.7202-1 through 227.7202-4 diff --git a/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBaseConnectionManager.java b/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBaseConnectionManager.java index 0260a19e77..fb53747980 100644 --- a/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBaseConnectionManager.java +++ b/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/HBaseConnectionManager.java @@ -219,7 +219,7 @@ public void run() { while (times < KERBEROS_RE_LOGIN_MAX_RETRY) { if (runKerberosLogin()) { - LOG.info("Ran kerberos re login command successfully."); + LOG.info("Run kerberos re login command successfully."); break; } else { times++; diff --git a/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSession.java b/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSession.java index 0152741aa5..a0aef55797 100644 --- a/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSession.java +++ b/linkis-engineconn-plugins/hbase/hbase-core/src/main/java/org/apache/linkis/manager/engineplugin/hbase/shell/HBaseShellSession.java @@ -252,6 +252,9 @@ public Result execute(String cmd) { @Override public void destroy() { + if (!this.isConnected()) { + LOGGER.info("The hbase shell session has closed."); + } if (this.scriptingContainer != null) { this.scriptingContainer.terminate(); } diff --git a/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/conf/HBaseConfiguration.scala b/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/conf/HBaseConfiguration.scala index c6959e738d..5569b981f0 100644 --- a/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/conf/HBaseConfiguration.scala +++ b/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/conf/HBaseConfiguration.scala @@ -20,5 +20,5 @@ package org.apache.linkis.manager.engineplugin.hbase.conf import org.apache.linkis.common.conf.CommonVars object HBaseConfiguration { - val HBASE_CONCURRENT_LIMIT = CommonVars[Int]("wds.linkis.engineconn.hbase.concurrent.limit", 100) + val HBASE_CONCURRENT_LIMIT = CommonVars[Int]("linkis.engineconn.hbase.concurrent.limit", 100) } diff --git a/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/executor/HBaseEngineConnExecutor.scala b/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/executor/HBaseEngineConnExecutor.scala index 2d7c0d1899..db74116de4 100644 --- a/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/executor/HBaseEngineConnExecutor.scala +++ b/linkis-engineconn-plugins/hbase/hbase-core/src/main/scala/org/apache/linkis/manager/engineplugin/hbase/executor/HBaseEngineConnExecutor.scala @@ -19,6 +19,7 @@ package org.apache.linkis.manager.engineplugin.hbase.executor import org.apache.linkis.common.conf.Configuration import org.apache.linkis.common.utils.{OverloadUtils, Utils} +import org.apache.linkis.engineconn.common.conf.EngineConnConstant import org.apache.linkis.engineconn.computation.executor.execute.{ ConcurrentComputationExecutor, EngineExecutionContext @@ -47,20 +48,34 @@ import org.apache.linkis.protocol.CacheableProtocol import org.apache.linkis.protocol.engine.JobProgressInfo import org.apache.linkis.rpc.{RPCMapCache, Sender} import org.apache.linkis.scheduler.executer.{ + AliasOutputExecuteResponse, ErrorExecuteResponse, - ExecuteResponse, - SuccessExecuteResponse + ExecuteResponse } +import org.apache.linkis.storage.{LineMetaData, LineRecord} +import org.apache.linkis.storage.resultset.ResultSetFactory import org.apache.commons.collections.CollectionUtils +import org.apache.commons.io.IOUtils import java.util import java.util.Collections import scala.collection.JavaConverters._ +import com.google.common.cache.{Cache, CacheBuilder} + class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor { - private val shellSessionManager = HBaseShellSessionManager.getInstance(); + private val shellSessionManager = HBaseShellSessionManager.getInstance() + + private val hbaseShellTaskRunningContainer: Cache[String, String] = + CacheBuilder.newBuilder.maximumSize(EngineConnConstant.MAX_TASK_NUM).build[String, String] + + private val hbaseShellSessionCache: Cache[String, HBaseShellSession] = + CacheBuilder.newBuilder + .maximumSize(EngineConnConstant.MAX_TASK_NUM) + .build[String, HBaseShellSession] + private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]](2) override def init(): Unit = { @@ -73,6 +88,7 @@ class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor code: String ): ExecuteResponse = { val realCode = code.trim() + val taskId = engineExecutorContext.getJobId.get var properties: util.Map[String, String] = Collections.emptyMap() Utils.tryCatch({ properties = getHBaseRuntimeParams(engineExecutorContext) @@ -84,16 +100,29 @@ class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor var shellSession: HBaseShellSession = null Utils.tryCatch({ shellSession = shellSessionManager.getHBaseShellSession(properties) + hbaseShellSessionCache.put(taskId, shellSession) }) { e: Throwable => logger.error(s"created hbase shell session error! $e") return ErrorExecuteResponse("created hbase shell session error!", e) } + + hbaseShellTaskRunningContainer.put(taskId, "1") val result: Result = shellSession.execute(realCode) + hbaseShellTaskRunningContainer.invalidate(taskId) if (!result.isSuccess) { return ErrorExecuteResponse(result.getResult, result.getThrowable) } - engineExecutorContext.appendStdout(result.getResult) - SuccessExecuteResponse() + val resultSetWriter = + engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE) + resultSetWriter.addMetaData(new LineMetaData()) + resultSetWriter.addRecord(new LineRecord(result.getResult)) + + val output = if (resultSetWriter != null) resultSetWriter.toString else null + Utils.tryQuietly { + IOUtils.closeQuietly(resultSetWriter) + } + logger.info("HBase shell command executed completed.") + AliasOutputExecuteResponse(null, output) } private def getHBaseRuntimeParams( @@ -169,13 +198,24 @@ class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor override def killAll(): Unit = { logger.info("Killing all query task.") - + val concurrentMap = hbaseShellTaskRunningContainer.asMap() + if (concurrentMap.isEmpty) { + return + } + val taskIdSet = concurrentMap.keySet().asScala + for (taskId <- taskIdSet) { + killTask(taskId) + } logger.info("All query task has killed successfully.") } override def killTask(taskId: String): Unit = { logger.info(s"Killing hbase query task $taskId") - super.killTask(taskId) + val hbaseShellSession: HBaseShellSession = hbaseShellSessionCache.getIfPresent(taskId) + if (hbaseShellSession == null) { + logger.info(s"Can not get hbase shell session by taskId $taskId") + } + hbaseShellSession.destroy() logger.info(s"The query task $taskId has killed successfully.") } diff --git a/pom.xml b/pom.xml index 4699ad1b50..9ef6835630 100644 --- a/pom.xml +++ b/pom.xml @@ -1791,7 +1791,7 @@ hbase.profile - 2.5 + 1.2