diff --git a/bml/bml-engine-hook/pom.xml b/bml/bml-engine-hook/pom.xml new file mode 100644 index 0000000000..037a4ab3b0 --- /dev/null +++ b/bml/bml-engine-hook/pom.xml @@ -0,0 +1,58 @@ + + + + linkis + com.webank.wedatasphere.linkis + 0.9.1 + + 4.0.0 + + linkis-bml-hook + ${linkis.version} + + + + com.webank.wedatasphere.linkis + linkis-bmlclient + ${linkis.version} + + + com.webank.wedatasphere.linkis + linkis-bmlcommon + ${linkis.version} + + + com.webank.wedatasphere.linkis + linkis-ujes-engine + ${linkis.version} + provided + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + + + ${basedir}/src/main/resources + + + + + + \ No newline at end of file diff --git a/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/conf/BmlHookConf.scala b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/conf/BmlHookConf.scala new file mode 100644 index 0000000000..a8d04c8346 --- /dev/null +++ b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/conf/BmlHookConf.scala @@ -0,0 +1,11 @@ +package com.webank.wedatasphere.linkis.bml.conf + +import com.webank.wedatasphere.linkis.common.conf.CommonVars + +/** + * created by cooperyang on 2019/9/23 + * Description: + */ +object BmlHookConf { + val WORK_DIR_STR = CommonVars("wds.linkis.bml.work.dir", "user.dir") +} diff --git a/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/exception/BmlHookDownloadException.scala b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/exception/BmlHookDownloadException.scala new file mode 100644 index 0000000000..b67b065076 --- /dev/null +++ b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/exception/BmlHookDownloadException.scala @@ -0,0 +1,9 @@ +package com.webank.wedatasphere.linkis.bml.exception + +import com.webank.wedatasphere.linkis.common.exception.ErrorException + +/** + * created by cooperyang on 2019/9/25 + * Description: + */ +case class BmlHookDownloadException(errMsg:String) extends ErrorException(50046, errMsg) diff --git a/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlEnginePreExecuteHook.scala b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlEnginePreExecuteHook.scala new file mode 100644 index 0000000000..28f3cc0403 --- /dev/null +++ b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlEnginePreExecuteHook.scala @@ -0,0 +1,78 @@ +package com.webank.wedatasphere.linkis.bml.hook + +import java.io.File +import java.util + +import com.webank.wedatasphere.linkis.bml.client.{BmlClient, BmlClientFactory} +import com.webank.wedatasphere.linkis.bml.exception.BmlHookDownloadException +import com.webank.wedatasphere.linkis.bml.utils.BmlHookUtils +import com.webank.wedatasphere.linkis.common.exception.ErrorException +import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils} +import com.webank.wedatasphere.linkis.engine.ResourceExecuteRequest +import com.webank.wedatasphere.linkis.engine.execute.EngineExecutorContext +import com.webank.wedatasphere.linkis.engine.extension.EnginePreExecuteHook +import com.webank.wedatasphere.linkis.scheduler.executer.ExecuteRequest +import org.apache.commons.lang.StringUtils + +import scala.collection.JavaConversions._ +/** + * created by cooperyang on 2019/9/23 + * Description: + */ +class BmlEnginePreExecuteHook extends EnginePreExecuteHook with Logging{ + override val hookName: String = "BmlEnginePreExecuteHook" + + val RESOURCES_STR = "resources" + + val RESOURCE_ID_STR = "resourceId" + + val VERSION_STR = "version" + + val FILE_NAME_STR = "fileName" + + val processUser:String = System.getProperty("user.name") + + val defaultUser:String = "hadoop" + + val bmlClient:BmlClient = if (StringUtils.isNotEmpty(processUser)) + BmlClientFactory.createBmlClient(processUser) else BmlClientFactory.createBmlClient(defaultUser) + + val seperator:String = File.separator + + val pathType:String = "file://" + + override def callPreExecuteHook(engineExecutorContext: EngineExecutorContext, executeRequest: ExecuteRequest): Unit = { + //1.删除工作目录以前的资源文件 + //2.下载资源到当前进程的工作目录 + + val workDir = BmlHookUtils.getCurrentWorkDir + val jobId = engineExecutorContext.getJobId + executeRequest match { + case resourceExecuteRequest:ResourceExecuteRequest => val resources = resourceExecuteRequest.resources + resources foreach { + case resource:util.Map[String, Object] => val fileName = resource.get(FILE_NAME_STR).toString + val resourceId = resource.get(RESOURCE_ID_STR).toString + val version = resource.get(VERSION_STR).toString + val fullPath = if (workDir.endsWith(seperator)) pathType + workDir + fileName else + pathType + workDir + seperator + fileName + val response = Utils.tryCatch{ + bmlClient.downloadResource(processUser, resourceId, version, fullPath, true) + }{ + case error:ErrorException => logger.error("download resource for {} failed", error) + throw error + case t:Throwable => logger.error(s"download resource for $jobId failed", t) + val e1 = BmlHookDownloadException(t.getMessage) + e1.initCause(t) + throw t + } + if (response.isSuccess){ + logger.info(s"for job $jobId resourceId $resourceId version $version download to path $fullPath ok") + }else{ + logger.warn(s"for job $jobId resourceId $resourceId version $version download to path $fullPath Failed") + } + case _ => logger.warn("job resource cannot download") + } + case _ => + } + } +} diff --git a/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlResourceParser.scala b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlResourceParser.scala new file mode 100644 index 0000000000..632e9f9ba2 --- /dev/null +++ b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/hook/BmlResourceParser.scala @@ -0,0 +1,29 @@ +package com.webank.wedatasphere.linkis.bml.hook + +/** + * created by cooperyang on 2019/9/23 + * Description: + */ + +case class ResourceVersion(resourceId:String, version:String) + + +trait BmlResourceParser { + /** + * 通过传入的code + * @param code + * @return + */ + def getResource(code:String):Array[ResourceVersion] +} + + +object DefaultBmlResourceParser extends BmlResourceParser{ + /** + * 通过传入的code + * + * @param code + * @return + */ + override def getResource(code: String): Array[ResourceVersion] = Array.empty +} \ No newline at end of file diff --git a/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/utils/BmlHookUtils.scala b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/utils/BmlHookUtils.scala new file mode 100644 index 0000000000..d82b46bd00 --- /dev/null +++ b/bml/bml-engine-hook/src/main/scala/com/webank/wedatasphere/linkis/bml/utils/BmlHookUtils.scala @@ -0,0 +1,20 @@ +package com.webank.wedatasphere.linkis.bml.utils + +import com.webank.wedatasphere.linkis.common.utils.Utils + +/** + * created by cooperyang on 2019/9/24 + * Description: + */ +object BmlHookUtils { + val WORK_DIR_STR = "user.dir" + def getCurrentWorkDir:String = System.getProperty(WORK_DIR_STR) + + + def deleteAllFiles(workDir:String):Unit = { + + } + + + +} diff --git a/pom.xml b/pom.xml index f6f0d0b76a..e22161c567 100644 --- a/pom.xml +++ b/pom.xml @@ -77,8 +77,10 @@ publicService/configuration publicService/variable publicService/workspace + publicService/workspace/client/workspace-httpclient metadata ujes/engine + bml/bml-engine-hook ujes/enginemanager ujes/entrance ujes/definedEngines/spark/engine diff --git a/ujes/definedEngines/python/engine/pom.xml b/ujes/definedEngines/python/engine/pom.xml index eadecd3551..c6150329e3 100644 --- a/ujes/definedEngines/python/engine/pom.xml +++ b/ujes/definedEngines/python/engine/pom.xml @@ -65,6 +65,11 @@ scalatest_2.11 2.2.6 + + com.webank.wedatasphere.linkis + linkis-bml-hook + ${linkis.version} + diff --git a/ujes/definedEngines/spark/engine/pom.xml b/ujes/definedEngines/spark/engine/pom.xml index fcf86d5922..1981f34883 100644 --- a/ujes/definedEngines/spark/engine/pom.xml +++ b/ujes/definedEngines/spark/engine/pom.xml @@ -56,6 +56,11 @@ guava compile + + com.webank.wedatasphere.linkis + linkis-bml-hook + ${linkis.version} + org.apache.spark spark-core_${scala.binary.version} diff --git a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/ResourceExecuteRequest.scala b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/ResourceExecuteRequest.scala new file mode 100644 index 0000000000..52d2998222 --- /dev/null +++ b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/ResourceExecuteRequest.scala @@ -0,0 +1,9 @@ +package com.webank.wedatasphere.linkis.engine + +/** + * created by cooperyang on 2019/11/29 + * Description: + */ +trait ResourceExecuteRequest { + val resources:java.util.List[Object] +} diff --git a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/conf/EngineConfiguration.scala b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/conf/EngineConfiguration.scala index 2512a10c74..3de772fc8e 100644 --- a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/conf/EngineConfiguration.scala +++ b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/conf/EngineConfiguration.scala @@ -59,5 +59,5 @@ object EngineConfiguration { val ENGINE_PUSH_PROGRESS_TO_ENTRANCE = CommonVars("wds.linkis.engine.push.progress.enable", true) - + val ENGINE_PRE_EXECUTE_HOOK_CLASSES = CommonVars("wds.linkis.engine.pre.hook.class", "com.webank.wedatasphere.linkis.bml.hook.BmlEnginePreExecuteHook") } diff --git a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/execute/EngineExecutor.scala b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/execute/EngineExecutor.scala index dd192e8a67..95c4107c0e 100644 --- a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/execute/EngineExecutor.scala +++ b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/execute/EngineExecutor.scala @@ -18,13 +18,17 @@ package com.webank.wedatasphere.linkis.engine.execute import com.webank.wedatasphere.linkis.common.log.LogUtils import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils} +import com.webank.wedatasphere.linkis.engine.conf.EngineConfiguration import com.webank.wedatasphere.linkis.engine.exception.EngineErrorException +import com.webank.wedatasphere.linkis.engine.extension.EnginePreExecuteHook import com.webank.wedatasphere.linkis.resourcemanager.Resource import com.webank.wedatasphere.linkis.scheduler.executer._ import com.webank.wedatasphere.linkis.storage.resultset.ResultSetFactory import org.apache.commons.lang.StringUtils import org.apache.commons.lang.exception.ExceptionUtils +import scala.collection.mutable.ArrayBuffer + /** * Created by enjoyyin on 2018/9/17. */ @@ -41,6 +45,24 @@ abstract class EngineExecutor(outputPrintLimit: Int, isSupportParallelism: Boole private var succeedNum = 0 + private val enginePreExecuteHooks:Array[EnginePreExecuteHook] = { + val hooks = new ArrayBuffer[EnginePreExecuteHook]() + EngineConfiguration.ENGINE_PRE_EXECUTE_HOOK_CLASSES.getValue.split(",") foreach { + hookStr => Utils.tryCatch{ + val clazz = Class.forName(hookStr) + val obj = clazz.newInstance() + obj match { + case hook:EnginePreExecuteHook => hooks += hook + case _ => logger.warn(s"obj is not a engineHook obj is ${obj.getClass}") + } + }{ + case e:Exception => logger.error("failed to load class", e) + } + } + hooks.toArray + } + + def setCodeParser(codeParser: CodeParser) = this.codeParser = Some(codeParser) def setResultSetListener(resultSetListener: ResultSetListener) = this.resultSetListener = Some(resultSetListener) def getResultSetListener = resultSetListener @@ -95,6 +117,15 @@ abstract class EngineExecutor(outputPrintLimit: Int, isSupportParallelism: Boole else if(isSupportParallelism) whenAvailable(f) else ensureIdle(f) ensureOp { val engineExecutorContext = createEngineExecutorContext(executeRequest) + Utils.tryCatch{ + enginePreExecuteHooks foreach { + hook => logger.info(s"${hook.hookName} begins to do a hook") + hook.callPreExecuteHook(engineExecutorContext, executeRequest) + logger.info(s"${hook.hookName} ends to do a hook") + } + }{ + case e:Exception => logger.info("failed to do with hook") + } var response: ExecuteResponse = null val incomplete = new StringBuilder val codes = Utils.tryCatch(codeParser.map(_.parse(executeRequest.code, engineExecutorContext)).getOrElse(Array(executeRequest.code))){ diff --git a/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/extension/EnginePreExecuteHook.scala b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/extension/EnginePreExecuteHook.scala new file mode 100644 index 0000000000..1bbbfcbd34 --- /dev/null +++ b/ujes/engine/src/main/scala/com/webank/wedatasphere/linkis/engine/extension/EnginePreExecuteHook.scala @@ -0,0 +1,13 @@ +package com.webank.wedatasphere.linkis.engine.extension + +import com.webank.wedatasphere.linkis.engine.execute.EngineExecutorContext +import com.webank.wedatasphere.linkis.scheduler.executer.ExecuteRequest + +/** + * created by cooperyang on 2019/11/29 + * Description: + */ +trait EnginePreExecuteHook { + val hookName:String + def callPreExecuteHook(engineExecutorContext:EngineExecutorContext, executeRequest: ExecuteRequest) +} \ No newline at end of file