From 4dc6e25edab334e13ce06d649ac6e11755ed6ffb Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Tue, 12 Sep 2023 15:36:59 +0800 Subject: [PATCH] [WIP][Feature] Add nebula engine to linkis --- linkis-engineconn-plugins/nebula/pom.xml | 111 +++++ .../nebula/src/main/assembly/distribution.xml | 71 +++ .../nebula/NebulaEngineConnPlugin.java | 72 +++ .../NebulaProcessEngineConnLaunchBuilder.java | 30 ++ .../nebula/conf/NebulaConfiguration.java | 63 +++ .../nebula/conf/NebulaEngineConf.java | 53 ++ .../errorcode/NebulaErrorCodeSummary.java | 47 ++ .../exception/NebulaClientException.java | 27 + .../NebulaStateInvalidException.java | 27 + .../executor/NebulaEngineConnExecutor.java | 468 ++++++++++++++++++ .../nebula/utils/NebulaSQLHook.java | 34 ++ .../resources/linkis-engineconn.properties | 23 + .../nebula/src/main/resources/log4j2.xml | 91 ++++ .../factory/NebulaEngineConnFactory.scala | 44 ++ pom.xml | 1 + 15 files changed, 1162 insertions(+) create mode 100644 linkis-engineconn-plugins/nebula/pom.xml create mode 100644 linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/utils/NebulaSQLHook.java create mode 100644 linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties create mode 100644 linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml create mode 100644 linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala diff --git a/linkis-engineconn-plugins/nebula/pom.xml b/linkis-engineconn-plugins/nebula/pom.xml new file mode 100644 index 0000000000..6b7cc32d81 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/pom.xml @@ -0,0 +1,111 @@ + + + + 4.0.0 + + org.apache.linkis + linkis + ${revision} + ../../pom.xml + + + linkis-engineplugin-nebula + + + + org.apache.linkis + linkis-engineconn-plugin-core + ${project.version} + + + + org.apache.linkis + linkis-computation-engineconn + ${project.version} + + + + org.apache.linkis + linkis-storage + ${project.version} + provided + + + + org.apache.linkis + linkis-rpc + ${project.version} + provided + + + + org.apache.linkis + linkis-common + ${project.version} + provided + + + + + com.vesoft + client + ${nebula.version} + + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + org.apache.maven.plugins + maven-assembly-plugin + false + + false + out + false + false + + src/main/assembly/distribution.xml + + + + + make-assembly + + single + + package + + + src/main/assembly/distribution.xml + + + + + + + + + diff --git a/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml b/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml new file mode 100644 index 0000000000..eaa9c296f1 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml @@ -0,0 +1,71 @@ + + + + + linkis-engineplugin-nebula + + dir + zip + + true + nebula + + + + + + /dist/${nebula.version}/lib + true + true + false + false + true + + + + + + + + ${basedir}/src/main/resources + + linkis-engineconn.properties + log4j2.xml + + 0777 + dist/${nebula.version}/conf + unix + + + + ${basedir}/target + + *.jar + + + *doc.jar + + 0777 + plugin/${nebula.version} + + + + + + diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java new file mode 100644 index 0000000000..a22d2c8a84 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula; + +import org.apache.linkis.engineplugin.nebula.builder.NebulaProcessEngineConnLaunchBuilder; +import org.apache.linkis.engineplugin.nebula.factory.NebulaEngineConnFactory; +import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin; +import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory; +import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder; +import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory; +import org.apache.linkis.manager.engineplugin.common.resource.GenericEngineResourceFactory; +import org.apache.linkis.manager.label.entity.Label; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class NebulaEngineConnPlugin implements EngineConnPlugin { + private Object resourceLocker = new Object(); + private Object engineFactoryLocker = new Object(); + private volatile EngineResourceFactory engineResourceFactory; + private volatile EngineConnFactory engineFactory; + private List> defaultLabels = new ArrayList<>(); + + @Override + public void init(Map params) {} + + @Override + public EngineResourceFactory getEngineResourceFactory() { + if (null == engineResourceFactory) { + synchronized (resourceLocker) { + engineResourceFactory = new GenericEngineResourceFactory(); + } + } + return engineResourceFactory; + } + + @Override + public EngineConnLaunchBuilder getEngineConnLaunchBuilder() { + return new NebulaProcessEngineConnLaunchBuilder(); + } + + @Override + public EngineConnFactory getEngineConnFactory() { + if (null == engineFactory) { + synchronized (engineFactoryLocker) { + engineFactory = new NebulaEngineConnFactory(); + } + } + return engineFactory; + } + + @Override + public List> getDefaultLabels() { + return defaultLabels; + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java new file mode 100644 index 0000000000..4efcf85765 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.builder; + +import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder; +import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; +import org.apache.linkis.storage.utils.StorageConfiguration; + +public class NebulaProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { + + @Override + public String getEngineStartUser(UserCreatorLabel label) { + return StorageConfiguration.HDFS_ROOT_USER.getValue(); + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java new file mode 100644 index 0000000000..64f186d075 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.conf; + +import org.apache.linkis.common.conf.CommonVars; + +public class NebulaConfiguration { + + public static final CommonVars ENGINE_CONCURRENT_LIMIT = + CommonVars.apply("wds.linkis.engineconn.concurrent.limit", 100); + + // unit in seconds + public static final CommonVars PRESTO_HTTP_CONNECT_TIME_OUT = + CommonVars.apply("wds.linkis.presto.http.connectTimeout", 60L); + + public static final CommonVars PRESTO_HTTP_READ_TIME_OUT = + CommonVars.apply("wds.linkis.presto.http.readTimeout", 60L); + + public static final CommonVars ENGINE_DEFAULT_LIMIT = + CommonVars.apply("wds.linkis.presto.default.limit", 5000); + + public static final CommonVars PRESTO_URL = + CommonVars.apply("wds.linkis.presto.url", "http://127.0.0.1:8080"); + + public static final CommonVars PRESTO_RESOURCE_CONFIG_PATH = + CommonVars.apply("wds.linkis.presto.resource.config", ""); + + public static final CommonVars PRESTO_USER_NAME = + CommonVars.apply("wds.linkis.presto.username", "default"); + + public static final CommonVars PRESTO_PASSWORD = + CommonVars.apply("wds.linkis.presto.password", ""); + + public static final CommonVars PRESTO_CATALOG = + CommonVars.apply("wds.linkis.presto.catalog", "system"); + + public static final CommonVars PRESTO_SCHEMA = + CommonVars.apply("wds.linkis.presto.schema", ""); + + public static final CommonVars PRESTO_SOURCE = + CommonVars.apply("wds.linkis.presto.source", "global"); + + public static final CommonVars PRESTO_REQUEST_MEMORY = + CommonVars.apply("presto.session.query_max_total_memory", "8GB"); + + public static final CommonVars PRESTO_SQL_HOOK_ENABLED = + CommonVars.apply("linkis.presto.sql.hook.enabled", true, "presto sql hook"); +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java new file mode 100644 index 0000000000..92cc32ca01 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.conf; + +import org.apache.linkis.common.conf.Configuration; +import org.apache.linkis.governance.common.protocol.conf.RequestQueryEngineConfigWithGlobalConfig; +import org.apache.linkis.governance.common.protocol.conf.ResponseQueryConfig; +import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; +import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; +import org.apache.linkis.protocol.CacheableProtocol; +import org.apache.linkis.rpc.RPCMapCache; + +import java.util.Map; + +import scala.Tuple2; + +public class NebulaEngineConf + extends RPCMapCache, String, String> { + + public NebulaEngineConf() { + super(Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME().getValue()); + } + + @Override + public CacheableProtocol createRequest(Tuple2 labelTuple) { + return new RequestQueryEngineConfigWithGlobalConfig(labelTuple._1(), labelTuple._2(), null); + } + + @Override + public Map createMap(Object obj) { + if (obj instanceof ResponseQueryConfig) { + ResponseQueryConfig response = (ResponseQueryConfig) obj; + return response.getKeyAndValue(); + } else { + return null; + } + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java new file mode 100644 index 0000000000..082361ef6d --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.errorcode; + +import org.apache.linkis.common.errorcode.ErrorCodeUtils; +import org.apache.linkis.common.errorcode.LinkisErrorCode; + +public enum NebulaErrorCodeSummary implements LinkisErrorCode { + PRESTO_STATE_INVALID( + 26001, "Presto status error,statement is not finished(Presto服务状态异常, 查询语句没有执行结束)"), + PRESTO_CLIENT_ERROR(26002, "Presto client error(Presto客户端异常)"); + + private final int errorCode; + + private final String errorDesc; + + NebulaErrorCodeSummary(int errorCode, String errorDesc) { + ErrorCodeUtils.validateErrorCode(errorCode, 26000, 29999); + this.errorCode = errorCode; + this.errorDesc = errorDesc; + } + + @Override + public int getErrorCode() { + return errorCode; + } + + @Override + public String getErrorDesc() { + return errorDesc; + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java new file mode 100644 index 0000000000..59b3620b03 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.exception; + +import org.apache.linkis.common.exception.ErrorException; + +public class NebulaClientException extends ErrorException { + + public NebulaClientException(int errorCode, String message) { + super(errorCode, message); + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java new file mode 100644 index 0000000000..202d478b76 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.exception; + +import org.apache.linkis.common.exception.ErrorException; + +public class NebulaStateInvalidException extends ErrorException { + + public NebulaStateInvalidException(int errorCode, String message) { + super(errorCode, message); + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java new file mode 100644 index 0000000000..76e1821172 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.executor; + +import org.apache.linkis.common.exception.ErrorException; +import org.apache.linkis.common.io.resultset.ResultSetWriter; +import org.apache.linkis.common.log.LogUtils; +import org.apache.linkis.common.utils.OverloadUtils; +import org.apache.linkis.engineconn.common.conf.EngineConnConf; +import org.apache.linkis.engineconn.common.conf.EngineConnConstant; +import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask; +import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor; +import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext; +import org.apache.linkis.engineconn.core.EngineConnObject; +import org.apache.linkis.engineplugin.nebula.conf.NebulaConfiguration; +import org.apache.linkis.engineplugin.nebula.conf.NebulaEngineConf; +import org.apache.linkis.engineplugin.nebula.errorcode.NebulaErrorCodeSummary; +import org.apache.linkis.engineplugin.nebula.exception.NebulaClientException; +import org.apache.linkis.engineplugin.nebula.exception.NebulaStateInvalidException; +import org.apache.linkis.engineplugin.nebula.utils.NebulaSQLHook; +import org.apache.linkis.governance.common.paser.SQLCodeParser; +import org.apache.linkis.manager.common.entity.resource.CommonNodeResource; +import org.apache.linkis.manager.common.entity.resource.LoadResource; +import org.apache.linkis.manager.common.entity.resource.NodeResource; +import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils; +import org.apache.linkis.manager.label.entity.Label; +import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; +import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; +import org.apache.linkis.protocol.engine.JobProgressInfo; +import org.apache.linkis.rpc.Sender; +import org.apache.linkis.scheduler.executer.ErrorExecuteResponse; +import org.apache.linkis.scheduler.executer.ExecuteResponse; +import org.apache.linkis.scheduler.executer.SuccessExecuteResponse; +import org.apache.linkis.storage.domain.Column; +import org.apache.linkis.storage.domain.DataType; +import org.apache.linkis.storage.resultset.ResultSetFactory; +import org.apache.linkis.storage.resultset.table.TableMetaData; +import org.apache.linkis.storage.resultset.table.TableRecord; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; + +import org.springframework.util.CollectionUtils; + +import java.net.URI; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import scala.Tuple2; + +import com.facebook.presto.client.*; +import com.facebook.presto.spi.security.SelectedRole; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import okhttp3.OkHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NebulaEngineConnExecutor extends ConcurrentComputationExecutor { + + private static final Logger logger = LoggerFactory.getLogger(NebulaEngineConnExecutor.class); + + private static OkHttpClient okHttpClient = + new OkHttpClient.Builder() + .socketFactory(new SocketChannelSocketFactory()) + .connectTimeout( + NebulaConfiguration.PRESTO_HTTP_CONNECT_TIME_OUT.getValue(), TimeUnit.SECONDS) + .readTimeout(NebulaConfiguration.PRESTO_HTTP_READ_TIME_OUT.getValue(), TimeUnit.SECONDS) + .build(); + private int id; + private List> executorLabels = new ArrayList<>(2); + private Map statementClientCache = new ConcurrentHashMap<>(); + private Cache clientSessionCache = + CacheBuilder.newBuilder() + .expireAfterAccess( + Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()), + TimeUnit.MILLISECONDS) + .maximumSize(EngineConnConstant.MAX_TASK_NUM()) + .build(); + + public NebulaEngineConnExecutor(int outputPrintLimit, int id) { + super(outputPrintLimit); + this.id = id; + } + + @Override + public void init() { + setCodeParser(new SQLCodeParser()); + super.init(); + } + + @Override + public ExecuteResponse execute(EngineConnTask engineConnTask) { + String user = getUserCreatorLabel(engineConnTask.getLables()).getUser(); + Optional> userCreatorLabelOp = + Arrays.stream(engineConnTask.getLables()) + .filter(label -> label instanceof UserCreatorLabel) + .findFirst(); + Optional> engineTypeLabelOp = + Arrays.stream(engineConnTask.getLables()) + .filter(label -> label instanceof EngineTypeLabel) + .findFirst(); + + Map configMap = null; + if (userCreatorLabelOp.isPresent() && engineTypeLabelOp.isPresent()) { + UserCreatorLabel userCreatorLabel = (UserCreatorLabel) userCreatorLabelOp.get(); + EngineTypeLabel engineTypeLabel = (EngineTypeLabel) engineTypeLabelOp.get(); + + configMap = + new NebulaEngineConf().getCacheMap(new Tuple2<>(userCreatorLabel, engineTypeLabel)); + } + + clientSessionCache.put( + engineConnTask.getTaskId(), + getClientSession(user, engineConnTask.getProperties(), configMap)); + return super.execute(engineConnTask); + } + + @Override + public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) { + boolean enableSqlHook = NebulaConfiguration.PRESTO_SQL_HOOK_ENABLED.getValue(); + String realCode; + if (StringUtils.isBlank(code)) { + realCode = "SELECT 1"; + } else if (enableSqlHook) { + realCode = NebulaSQLHook.preExecuteHook(code.trim()); + } else { + realCode = code.trim(); + } + logger.info("presto client begins to run psql code:\n {}", realCode); + + String taskId = engineExecutorContext.getJobId().get(); + ClientSession clientSession = clientSessionCache.getIfPresent(taskId); + StatementClient statement = + StatementClientFactory.newStatementClient(okHttpClient, clientSession, realCode); + statementClientCache.put(taskId, statement); + + try { + initialStatusUpdates(taskId, engineExecutorContext, statement); + if (statement.isRunning() + || (statement.isFinished() && statement.finalStatusInfo().getError() == null)) { + queryOutput(taskId, engineExecutorContext, statement); + } + ErrorExecuteResponse errorResponse = null; + try { + errorResponse = verifyServerError(taskId, engineExecutorContext, statement); + } catch (ErrorException e) { + logger.error("Presto execute failed (#{}): {}", e.getErrCode(), e.getMessage()); + } + if (errorResponse == null) { + // update session + clientSessionCache.put(taskId, updateSession(clientSession, statement)); + return new SuccessExecuteResponse(); + } else { + return errorResponse; + } + } finally { + statementClientCache.remove(taskId); + } + } + + @Override + public ExecuteResponse executeCompletely( + EngineExecutionContext engineExecutorContext, String code, String completedLine) { + return null; + } + + // todo + @Override + public float progress(String taskID) { + return 0.0f; + } + + @Override + public JobProgressInfo[] getProgressInfo(String taskID) { + return new JobProgressInfo[0]; + } + + @Override + public void killTask(String taskId) { + StatementClient statement = statementClientCache.remove(taskId); + if (null != statement) { + statement.cancelLeafStage(); + } + super.killTask(taskId); + } + + @Override + public List> getExecutorLabels() { + return executorLabels; + } + + @Override + public void setExecutorLabels(List> labels) { + if (!CollectionUtils.isEmpty(labels)) { + executorLabels.clear(); + executorLabels.addAll(labels); + } + } + + @Override + public boolean supportCallBackLogs() { + return false; + } + + @Override + public NodeResource requestExpectedResource(NodeResource expectedResource) { + return null; + } + + @Override + public NodeResource getCurrentNodeResource() { + NodeResourceUtils.appendMemoryUnitIfMissing( + EngineConnObject.getEngineCreationContext().getOptions()); + + CommonNodeResource resource = new CommonNodeResource(); + LoadResource usedResource = new LoadResource(OverloadUtils.getProcessMaxMemory(), 1); + resource.setUsedResource(usedResource); + return resource; + } + + @Override + public String getId() { + return Sender.getThisServiceInstance().getInstance() + "_" + id; + } + + @Override + public int getConcurrentLimit() { + return NebulaConfiguration.ENGINE_CONCURRENT_LIMIT.getValue(); + } + + private ClientSession getClientSession( + String user, Map taskParams, Map cacheMap) { + Map configMap = new HashMap<>(); + // The parameter priority specified at runtime is higher than the configuration priority of the + // management console + if (!CollectionUtils.isEmpty(cacheMap)) { + configMap.putAll(cacheMap); + } + taskParams.entrySet().stream() + .filter(entry -> entry.getValue() != null) + .forEach(entry -> configMap.put(entry.getKey(), String.valueOf(entry.getValue()))); + + URI httpUri = URI.create(NebulaConfiguration.PRESTO_URL.getValue(configMap)); + String source = NebulaConfiguration.PRESTO_SOURCE.getValue(configMap); + String catalog = NebulaConfiguration.PRESTO_CATALOG.getValue(configMap); + String schema = NebulaConfiguration.PRESTO_SCHEMA.getValue(configMap); + + Map properties = + configMap.entrySet().stream() + .filter(entry -> entry.getKey().startsWith("presto.session.")) + .collect( + Collectors.toMap( + entry -> entry.getKey().substring("presto.session.".length()), + Map.Entry::getValue)); + + String clientInfo = "Linkis"; + String transactionId = null; + Optional traceToken = Optional.empty(); + Set clientTags = Collections.emptySet(); + String timeZonId = TimeZone.getDefault().getID(); + Locale locale = Locale.getDefault(); + Map resourceEstimates = Collections.emptyMap(); + Map preparedStatements = Collections.emptyMap(); + Map roles = Collections.emptyMap(); + Map extraCredentials = Collections.emptyMap(); + io.airlift.units.Duration clientRequestTimeout = + new io.airlift.units.Duration(0, TimeUnit.MILLISECONDS); + + return new ClientSession( + httpUri, + user, + source, + traceToken, + clientTags, + clientInfo, + catalog, + schema, + timeZonId, + locale, + resourceEstimates, + properties, + preparedStatements, + roles, + extraCredentials, + transactionId, + clientRequestTimeout); + } + + private UserCreatorLabel getUserCreatorLabel(Label[] labels) { + return (UserCreatorLabel) + Arrays.stream(labels).filter(label -> label instanceof UserCreatorLabel).findFirst().get(); + } + + private void initialStatusUpdates( + String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) { + while (statement.isRunning() + && (statement.currentData().getData() == null + || statement.currentStatusInfo().getUpdateType() != null)) { + engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId)); + statement.advance(); + } + } + + private void queryOutput( + String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) { + int columnCount = 0; + int rows = 0; + ResultSetWriter resultSetWriter = + engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE); + try { + QueryStatusInfo results = null; + if (statement.isRunning()) { + results = statement.currentStatusInfo(); + } else { + results = statement.finalStatusInfo(); + } + if (results.getColumns() == null) { + throw new RuntimeException("presto columns is null."); + } + List columns = + results.getColumns().stream() + .map( + column -> new Column(column.getName(), DataType.toDataType(column.getType()), "")) + .collect(Collectors.toList()); + columnCount = columns.size(); + resultSetWriter.addMetaData(new TableMetaData(columns.toArray(new Column[0]))); + while (statement.isRunning()) { + Iterable> data = statement.currentData().getData(); + if (data != null) { + for (List row : data) { + String[] rowArray = row.stream().map(r -> String.valueOf(r)).toArray(String[]::new); + resultSetWriter.addRecord(new TableRecord(rowArray)); + rows += 1; + } + } + engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId)); + statement.advance(); + } + } catch (Exception e) { + IOUtils.closeQuietly(resultSetWriter); + } + String message = String.format("Fetched %d col(s) : %d row(s) in presto", columnCount, rows); + logger.info(message); + engineExecutorContext.appendStdout(LogUtils.generateInfo(message)); + engineExecutorContext.sendResultSet(resultSetWriter); + } + + private ErrorExecuteResponse verifyServerError( + String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) + throws ErrorException { + engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId)); + if (statement.isFinished()) { + QueryStatusInfo info = statement.finalStatusInfo(); + if (info.getError() != null) { + QueryError error = Objects.requireNonNull(info.getError()); + logger.error("Presto execute failed (#{}): {}", info.getId(), error.getMessage()); + Throwable cause = null; + if (error.getFailureInfo() != null) { + cause = error.getFailureInfo().toException(); + } + engineExecutorContext.appendStdout( + LogUtils.generateERROR(ExceptionUtils.getStackTrace(cause))); + return new ErrorExecuteResponse(ExceptionUtils.getMessage(cause), cause); + } else { + return null; + } + } else if (statement.isClientAborted()) { + logger.warn("Presto statement is killed."); + return null; + } else if (statement.isClientError()) { + throw new NebulaClientException( + NebulaErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorCode(), + NebulaErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorDesc()); + } else { + throw new NebulaStateInvalidException( + NebulaErrorCodeSummary.PRESTO_STATE_INVALID.getErrorCode(), + NebulaErrorCodeSummary.PRESTO_STATE_INVALID.getErrorDesc()); + } + } + + private ClientSession updateSession(ClientSession clientSession, StatementClient statement) { + ClientSession newSession = clientSession; + + // update catalog and schema if present + if (statement.getSetCatalog().isPresent() || statement.getSetSchema().isPresent()) { + newSession = + ClientSession.builder(newSession) + .withCatalog(statement.getSetCatalog().orElse(newSession.getCatalog())) + .withSchema(statement.getSetSchema().orElse(newSession.getSchema())) + .build(); + } + + // update transaction ID if necessary + if (statement.isClearTransactionId()) { + newSession = ClientSession.stripTransactionId(newSession); + } + + ClientSession.Builder builder = ClientSession.builder(newSession); + + if (statement.getStartedTransactionId() != null) { + builder = builder.withTransactionId(statement.getStartedTransactionId()); + } + + // update session properties if present + if (!statement.getSetSessionProperties().isEmpty() + || !statement.getResetSessionProperties().isEmpty()) { + Map sessionProperties = new HashMap<>(newSession.getProperties()); + sessionProperties.putAll(statement.getSetSessionProperties()); + sessionProperties.keySet().removeAll(statement.getResetSessionProperties()); + builder = builder.withProperties(sessionProperties); + } + + // update session roles + if (!statement.getSetRoles().isEmpty()) { + Map roles = new HashMap<>(newSession.getRoles()); + roles.putAll(statement.getSetRoles()); + builder = builder.withRoles(roles); + } + + // update prepared statements if present + if (!statement.getAddedPreparedStatements().isEmpty() + || !statement.getDeallocatedPreparedStatements().isEmpty()) { + Map preparedStatements = new HashMap<>(newSession.getPreparedStatements()); + preparedStatements.putAll(statement.getAddedPreparedStatements()); + preparedStatements.keySet().removeAll(statement.getDeallocatedPreparedStatements()); + builder = builder.withPreparedStatements(preparedStatements); + } + + return builder.build(); + } + + @Override + public void killAll() { + Iterator iterator = statementClientCache.values().iterator(); + while (iterator.hasNext()) { + StatementClient statement = iterator.next(); + if (statement != null) { + statement.cancelLeafStage(); + } + } + statementClientCache.clear(); + } + + @Override + public void close() { + killAll(); + super.close(); + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/utils/NebulaSQLHook.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/utils/NebulaSQLHook.java new file mode 100644 index 0000000000..37d1f8d658 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/utils/NebulaSQLHook.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.utils; + +import org.apache.commons.lang3.StringUtils; + +public class NebulaSQLHook { + public static String preExecuteHook(String code) { + return replaceBackQuoted(code); + } + + private static String replaceBackQuoted(String code) { + if (StringUtils.isNotBlank(code)) { + return code.replaceAll("`", "\""); + } else { + return code; + } + } +} diff --git a/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties new file mode 100644 index 0000000000..059eccb793 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +wds.linkis.server.version=v1 +#wds.linkis.engineconn.debug.enable=true +#wds.linkis.keytab.enable=true +wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.nebula.NebulaEngineConnPlugin + +wds.linkis.engineconn.support.parallelism=true + +wds.linkis.engineconn.max.free.time=0 \ No newline at end of file diff --git a/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml b/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..2cd3e264c3 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml @@ -0,0 +1,91 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala b/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala new file mode 100644 index 0000000000..9a3263f596 --- /dev/null +++ b/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.engineplugin.nebula.factory + +import org.apache.linkis.engineconn.common.creation.EngineCreationContext +import org.apache.linkis.engineconn.common.engineconn.EngineConn +import org.apache.linkis.engineconn.computation.executor.creation.ComputationSingleExecutorEngineConnFactory +import org.apache.linkis.engineconn.executor.entity.LabelExecutor +import org.apache.linkis.engineplugin.nebula.conf.NebulaConfiguration +import org.apache.linkis.engineplugin.nebula.executor.NebulaEngineConnExecutor +import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType +import org.apache.linkis.manager.label.entity.engine.RunType.RunType +import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType} + +class NebulaEngineConnFactory extends ComputationSingleExecutorEngineConnFactory { + + override def newExecutor( + id: Int, + engineCreationContext: EngineCreationContext, + engineConn: EngineConn + ): LabelExecutor = { + new NebulaEngineConnExecutor(NebulaConfiguration.ENGINE_DEFAULT_LIMIT.getValue, id) + } + + override protected def getEngineConnType: EngineType = EngineType.PRESTO + + override protected def getRunType: RunType = RunType.PRESTO_SQL + +} diff --git a/pom.xml b/pom.xml index 001e8189d6..f9930b12b8 100644 --- a/pom.xml +++ b/pom.xml @@ -128,6 +128,7 @@ 1.5.0 1 0.234 + 3.0.0 python2 2.1.2 1