Skip to content

Commit

Permalink
Add the function of kill task and add TEXT ResultSet
Browse files Browse the repository at this point in the history
  • Loading branch information
Longping Jie committed Sep 26, 2023
1 parent bb4672e commit 745699c
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 12 deletions.
2 changes: 1 addition & 1 deletion linkis-dist/release-docs/licenses/LICENSE-jaxb-impl.txt
Original file line number Diff line number Diff line change
Expand Up @@ -756,4 +756,4 @@ module. An independent module is a module which is not derived from or
based on this library. If you modify this library, you may extend this
exception to your version of the library, but you are not obligated to
do so. If you do not wish to do so, delete this exception statement
from your version.
from your version.
2 changes: 1 addition & 1 deletion linkis-dist/release-docs/licenses/LICENSE-jersey-guice.txt
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1

The Covered Software is a "commercial item," as that term is defined
in 48 C.F.R. 2.101 (Oct. 1995), consisting of "commercial computer
software" (as that term is defined at 48 C.F.R. ß
software" (as that term is defined at 48 C.F.R. §
252.227-7014(a)(1)) and "commercial computer software documentation"
as such terms are used in 48 C.F.R. 12.212 (Sept. 1995). Consistent
with 48 C.F.R. 12.212 and 48 C.F.R. 227.7202-1 through 227.7202-4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public void run() {

while (times < KERBEROS_RE_LOGIN_MAX_RETRY) {
if (runKerberosLogin()) {
LOG.info("Ran kerberos re login command successfully.");
LOG.info("Run kerberos re login command successfully.");
break;
} else {
times++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ public Result execute(String cmd) {

@Override
public void destroy() {
if (!this.isConnected()) {
LOGGER.info("The hbase shell session has closed.");
}
if (this.scriptingContainer != null) {
this.scriptingContainer.terminate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ package org.apache.linkis.manager.engineplugin.hbase.conf
import org.apache.linkis.common.conf.CommonVars

object HBaseConfiguration {
val HBASE_CONCURRENT_LIMIT = CommonVars[Int]("wds.linkis.engineconn.hbase.concurrent.limit", 100)
val HBASE_CONCURRENT_LIMIT = CommonVars[Int]("linkis.engineconn.hbase.concurrent.limit", 100)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.linkis.manager.engineplugin.hbase.executor

import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.{OverloadUtils, Utils}
import org.apache.linkis.engineconn.common.conf.EngineConnConstant
import org.apache.linkis.engineconn.computation.executor.execute.{
ConcurrentComputationExecutor,
EngineExecutionContext
Expand Down Expand Up @@ -47,20 +48,34 @@ import org.apache.linkis.protocol.CacheableProtocol
import org.apache.linkis.protocol.engine.JobProgressInfo
import org.apache.linkis.rpc.{RPCMapCache, Sender}
import org.apache.linkis.scheduler.executer.{
AliasOutputExecuteResponse,
ErrorExecuteResponse,
ExecuteResponse,
SuccessExecuteResponse
ExecuteResponse
}
import org.apache.linkis.storage.{LineMetaData, LineRecord}
import org.apache.linkis.storage.resultset.ResultSetFactory

import org.apache.commons.collections.CollectionUtils
import org.apache.commons.io.IOUtils

import java.util
import java.util.Collections

import scala.collection.JavaConverters._

import com.google.common.cache.{Cache, CacheBuilder}

class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor {
private val shellSessionManager = HBaseShellSessionManager.getInstance();
private val shellSessionManager = HBaseShellSessionManager.getInstance()

private val hbaseShellTaskRunningContainer: Cache[String, String] =
CacheBuilder.newBuilder.maximumSize(EngineConnConstant.MAX_TASK_NUM).build[String, String]

private val hbaseShellSessionCache: Cache[String, HBaseShellSession] =
CacheBuilder.newBuilder
.maximumSize(EngineConnConstant.MAX_TASK_NUM)
.build[String, HBaseShellSession]

private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]](2)

override def init(): Unit = {
Expand All @@ -73,6 +88,7 @@ class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor
code: String
): ExecuteResponse = {
val realCode = code.trim()
val taskId = engineExecutorContext.getJobId.get
var properties: util.Map[String, String] = Collections.emptyMap()
Utils.tryCatch({
properties = getHBaseRuntimeParams(engineExecutorContext)
Expand All @@ -84,16 +100,29 @@ class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor
var shellSession: HBaseShellSession = null
Utils.tryCatch({
shellSession = shellSessionManager.getHBaseShellSession(properties)
hbaseShellSessionCache.put(taskId, shellSession)
}) { e: Throwable =>
logger.error(s"created hbase shell session error! $e")
return ErrorExecuteResponse("created hbase shell session error!", e)
}

hbaseShellTaskRunningContainer.put(taskId, "1")
val result: Result = shellSession.execute(realCode)
hbaseShellTaskRunningContainer.invalidate(taskId)
if (!result.isSuccess) {
return ErrorExecuteResponse(result.getResult, result.getThrowable)
}
engineExecutorContext.appendStdout(result.getResult)
SuccessExecuteResponse()
val resultSetWriter =
engineExecutorContext.createResultSetWriter(ResultSetFactory.TEXT_TYPE)
resultSetWriter.addMetaData(new LineMetaData())
resultSetWriter.addRecord(new LineRecord(result.getResult))

val output = if (resultSetWriter != null) resultSetWriter.toString else null
Utils.tryQuietly {
IOUtils.closeQuietly(resultSetWriter)
}
logger.info("HBase shell command executed completed.")
AliasOutputExecuteResponse(null, output)
}

private def getHBaseRuntimeParams(
Expand Down Expand Up @@ -169,13 +198,24 @@ class HBaseEngineConnExecutor(val id: Int) extends ConcurrentComputationExecutor

override def killAll(): Unit = {
logger.info("Killing all query task.")

val concurrentMap = hbaseShellTaskRunningContainer.asMap()
if (concurrentMap.isEmpty) {
return
}
val taskIdSet = concurrentMap.keySet().asScala
for (taskId <- taskIdSet) {
killTask(taskId)
}
logger.info("All query task has killed successfully.")
}

override def killTask(taskId: String): Unit = {
logger.info(s"Killing hbase query task $taskId")
super.killTask(taskId)
val hbaseShellSession: HBaseShellSession = hbaseShellSessionCache.getIfPresent(taskId)
if (hbaseShellSession == null) {
logger.info(s"Can not get hbase shell session by taskId $taskId")
}
hbaseShellSession.destroy()
logger.info(s"The query task $taskId has killed successfully.")
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1791,7 +1791,7 @@
<activation>
<property>
<name>hbase.profile</name>
<value>2.5</value>
<value>1.2</value>
</property>
</activation>
<properties>
Expand Down

0 comments on commit 745699c

Please sign in to comment.