From 89157b829e8662e4762f50daaf3242ed1dee10db Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Wed, 13 Sep 2023 21:53:19 +0800 Subject: [PATCH] Add nebula engine to linkis --- .../ujes/jdbc/LinkisSQLConnection.scala | 1 + .../manager/am/conf/AMConfiguration.java | 7 +- .../manager/label/conf/LabelCommonConfig.java | 3 + .../label/entity/engine/EngineType.scala | 3 + .../manager/label/entity/engine/RunType.scala | 1 + .../label/utils/EngineTypeLabelCreator.java | 2 + linkis-engineconn-plugins/nebula/pom.xml | 7 +- .../NebulaProcessEngineConnLaunchBuilder.java | 10 +- .../nebula/conf/NebulaConfiguration.java | 45 +-- .../errorcode/NebulaErrorCodeSummary.java | 6 +- .../executor/NebulaEngineConnExecutor.java | 326 +++++++----------- .../nebula/utils/NebulaSQLHook.java | 34 -- .../factory/NebulaEngineConnFactory.scala | 6 +- 13 files changed, 161 insertions(+), 290 deletions(-) delete mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/utils/NebulaSQLHook.java diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala index b800698766..e111615cee 100644 --- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala +++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala @@ -431,6 +431,7 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope case EngineType.HIVE => RunType.HIVE case EngineType.TRINO => RunType.TRINO_SQL case EngineType.PRESTO => RunType.PRESTO_SQL + case EngineType.NEBULA => RunType.NEBULA_SQL case EngineType.ELASTICSEARCH => RunType.ES_SQL case EngineType.JDBC => RunType.JDBC case EngineType.PYTHON => RunType.SHELL diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java index d916387d29..8aba142670 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java @@ -68,7 +68,8 @@ public class AMConfiguration { public static final CommonVars MULTI_USER_ENGINE_TYPES = CommonVars.apply( - "wds.linkis.multi.user.engine.types", "jdbc,es,presto,io_file,appconn,openlookeng,trino"); + "wds.linkis.multi.user.engine.types", + "jdbc,es,presto,io_file,appconn,openlookeng,trino,nebula"); public static final CommonVars ALLOW_BATCH_KILL_ENGINE_TYPES = CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "spark,hive,python"); @@ -104,8 +105,8 @@ public class AMConfiguration { public static String getDefaultMultiEngineUser() { String jvmUser = Utils.getJvmUser(); return String.format( - "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", io_file:\"root\"}", - jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser); + "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", nebula:\"%s\",io_file:\"root\"}", + jvmUser, jvmUser, jvmUser, 
jvmUser, jvmUser, jvmUser, jvmUser); } public static boolean isMultiUserEngine(String engineType) { diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java index d0854186a5..f4b52a156b 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java @@ -69,6 +69,9 @@ public class LabelCommonConfig { public static final CommonVars DATAX_ENGINE_VERSION = CommonVars.apply("wds.linkis.datax.engine.version", "3.0.0"); + public static final CommonVars NEBULA_ENGINE_VERSION = + CommonVars.apply("wds.linkis.nebula.engine.version", "3.0.0"); + public static final CommonVars PRESTO_ENGINE_VERSION = CommonVars.apply("wds.linkis.presto.engine.version", "0.234"); diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala index d47bb8ec39..77e7204a73 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala @@ -45,6 +45,8 @@ object EngineType extends Enumeration with Logging { val PRESTO = Value("presto") + val NEBULA = Value("nebula") + val FLINK = Value("flink") val APPCONN = Value("appconn") @@ -89,6 +91,7 @@ object EngineType extends Enumeration with Logging { case _ if IO_ENGINE_HDFS.toString.equalsIgnoreCase(str) => IO_ENGINE_HDFS case _ if PIPELINE.toString.equalsIgnoreCase(str) => PIPELINE case _ if PRESTO.toString.equalsIgnoreCase(str) => PRESTO + case _ if NEBULA.toString.equalsIgnoreCase(str) => NEBULA case _ if FLINK.toString.equalsIgnoreCase(str) => FLINK case _ if APPCONN.toString.equals(str) => APPCONN case _ if SQOOP.toString.equalsIgnoreCase(str) => SQOOP diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala index 21a067ed45..abb3e010f8 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala @@ -35,6 +35,7 @@ object RunType extends Enumeration { val PIPELINE = Value("pipeline") val JDBC = Value("jdbc") val PRESTO_SQL = Value("psql") + val NEBULA_SQL = Value("ngql") val JAR = Value("jar") val APPCONN = Value("appconn") val FUNCTION_MDQ_TYPE = Value("function.mdq") diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java 
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java index 0d6ae3c5c0..e90f282aaf 100644 --- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java +++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java @@ -69,6 +69,8 @@ private static void init() { EngineType.FLINK().toString(), LabelCommonConfig.FLINK_ENGINE_VERSION.getValue()); defaultVersion.put( EngineType.PRESTO().toString(), LabelCommonConfig.PRESTO_ENGINE_VERSION.getValue()); + defaultVersion.put( + EngineType.NEBULA().toString(), LabelCommonConfig.NEBULA_ENGINE_VERSION.getValue()); defaultVersion.put( EngineType.SQOOP().toString(), LabelCommonConfig.SQOOP_ENGINE_VERSION.getValue()); defaultVersion.put( diff --git a/linkis-engineconn-plugins/nebula/pom.xml b/linkis-engineconn-plugins/nebula/pom.xml index 6b7cc32d81..bfe9714569 100644 --- a/linkis-engineconn-plugins/nebula/pom.xml +++ b/linkis-engineconn-plugins/nebula/pom.xml @@ -62,13 +62,12 @@ - com.vesoft - client - ${nebula.version} + com.vesoft + client + ${nebula.version} - diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java index 4efcf85765..fb95910cf5 100644 --- a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java @@ -18,13 +18,5 @@ package org.apache.linkis.engineplugin.nebula.builder; import org.apache.linkis.manager.engineplugin.common.launch.process.JavaProcessEngineConnLaunchBuilder; -import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel; -import org.apache.linkis.storage.utils.StorageConfiguration; -public class NebulaProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder { - - @Override - public String getEngineStartUser(UserCreatorLabel label) { - return StorageConfiguration.HDFS_ROOT_USER.getValue(); - } -} +public class NebulaProcessEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {} diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java index 64f186d075..d18e1e5ee8 100644 --- a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java @@ -24,40 +24,27 @@ public class NebulaConfiguration { public static final CommonVars ENGINE_CONCURRENT_LIMIT = CommonVars.apply("wds.linkis.engineconn.concurrent.limit", 100); - // unit in seconds - public static final CommonVars PRESTO_HTTP_CONNECT_TIME_OUT = - CommonVars.apply("wds.linkis.presto.http.connectTimeout", 60L); - - public static final CommonVars PRESTO_HTTP_READ_TIME_OUT = - CommonVars.apply("wds.linkis.presto.http.readTimeout", 60L); - public static final CommonVars 
ENGINE_DEFAULT_LIMIT = - CommonVars.apply("wds.linkis.presto.default.limit", 5000); - - public static final CommonVars PRESTO_URL = - CommonVars.apply("wds.linkis.presto.url", "http://127.0.0.1:8080"); - - public static final CommonVars PRESTO_RESOURCE_CONFIG_PATH = - CommonVars.apply("wds.linkis.presto.resource.config", ""); - - public static final CommonVars PRESTO_USER_NAME = - CommonVars.apply("wds.linkis.presto.username", "default"); + CommonVars.apply("wds.linkis.nebula.default.limit", 5000); - public static final CommonVars PRESTO_PASSWORD = - CommonVars.apply("wds.linkis.presto.password", ""); + public static final CommonVars NEBULA_HOST = + CommonVars.apply("wds.linkis.nebula.host", "127.0.0.1"); - public static final CommonVars PRESTO_CATALOG = - CommonVars.apply("wds.linkis.presto.catalog", "system"); + public static final CommonVars NEBULA_PORT = + CommonVars.apply("wds.linkis.nebula.port", 9669); - public static final CommonVars PRESTO_SCHEMA = - CommonVars.apply("wds.linkis.presto.schema", ""); + public static final CommonVars NEBULA_MAX_CONN_SIZE = + CommonVars.apply("wds.linkis.nebula.max.conn.size", 100); - public static final CommonVars PRESTO_SOURCE = - CommonVars.apply("wds.linkis.presto.source", "global"); + public static final CommonVars NEBULA_USER_NAME = + CommonVars.apply("wds.linkis.nebula.username", "root"); - public static final CommonVars PRESTO_REQUEST_MEMORY = - CommonVars.apply("presto.session.query_max_total_memory", "8GB"); + public static final CommonVars NEBULA_PASSWORD = + CommonVars.apply("wds.linkis.nebula.password", "nebula"); - public static final CommonVars PRESTO_SQL_HOOK_ENABLED = - CommonVars.apply("linkis.presto.sql.hook.enabled", true, "presto sql hook"); + public static final CommonVars NEBULA_RECONNECT_ENABLED = + CommonVars.apply( + "linkis.nebula.reconnect.enabled", + false, + "whether to retry after the connection is disconnected"); } diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java index 082361ef6d..80aa2e197e 100644 --- a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java @@ -21,9 +21,9 @@ import org.apache.linkis.common.errorcode.LinkisErrorCode; public enum NebulaErrorCodeSummary implements LinkisErrorCode { - PRESTO_STATE_INVALID( - 26001, "Presto status error,statement is not finished(Presto服务状态异常, 查询语句没有执行结束)"), - PRESTO_CLIENT_ERROR(26002, "Presto client error(Presto客户端异常)"); + NEBULA_CLIENT_INITIALIZATION_FAILED(28001, "Nebula client initialization failed(Nebula客户端初始化失败)"), + NEBULA_EXECUTOR_ERROR(28002, "Nebula executor error(Nebula执行异常)"), + NEBULA_CLIENT_ERROR(28003, "Nebula client error(Nebula客户端异常)"); private final int errorCode; diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java index 76e1821172..9be5d0efee 100644 --- a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java +++ 
b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java @@ -31,9 +31,7 @@ import org.apache.linkis.engineplugin.nebula.conf.NebulaEngineConf; import org.apache.linkis.engineplugin.nebula.errorcode.NebulaErrorCodeSummary; import org.apache.linkis.engineplugin.nebula.exception.NebulaClientException; -import org.apache.linkis.engineplugin.nebula.exception.NebulaStateInvalidException; -import org.apache.linkis.engineplugin.nebula.utils.NebulaSQLHook; -import org.apache.linkis.governance.common.paser.SQLCodeParser; +import org.apache.linkis.engineplugin.nebula.exception.NebulaExecuteError; import org.apache.linkis.manager.common.entity.resource.CommonNodeResource; import org.apache.linkis.manager.common.entity.resource.LoadResource; import org.apache.linkis.manager.common.entity.resource.NodeResource; @@ -54,11 +52,9 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.util.CollectionUtils; -import java.net.URI; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -66,29 +62,27 @@ import scala.Tuple2; -import com.facebook.presto.client.*; -import com.facebook.presto.spi.security.SelectedRole; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import okhttp3.OkHttpClient; +import com.vesoft.nebula.ErrorCode; +import com.vesoft.nebula.client.graph.NebulaPoolConfig; +import com.vesoft.nebula.client.graph.data.HostAddress; +import com.vesoft.nebula.client.graph.data.ResultSet; +import com.vesoft.nebula.client.graph.net.NebulaPool; +import com.vesoft.nebula.client.graph.net.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NebulaEngineConnExecutor extends ConcurrentComputationExecutor { private static final Logger logger = LoggerFactory.getLogger(NebulaEngineConnExecutor.class); - - private static OkHttpClient okHttpClient = - new OkHttpClient.Builder() - .socketFactory(new SocketChannelSocketFactory()) - .connectTimeout( - NebulaConfiguration.PRESTO_HTTP_CONNECT_TIME_OUT.getValue(), TimeUnit.SECONDS) - .readTimeout(NebulaConfiguration.PRESTO_HTTP_READ_TIME_OUT.getValue(), TimeUnit.SECONDS) - .build(); private int id; private List> executorLabels = new ArrayList<>(2); - private Map statementClientCache = new ConcurrentHashMap<>(); - private Cache clientSessionCache = + private Map sessionCache = new ConcurrentHashMap<>(); + + private Map configMap = new HashMap<>(); + + private Cache nebulaPoolCache = CacheBuilder.newBuilder() .expireAfterAccess( Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()), @@ -103,13 +97,11 @@ public NebulaEngineConnExecutor(int outputPrintLimit, int id) { @Override public void init() { - setCodeParser(new SQLCodeParser()); super.init(); } @Override public ExecuteResponse execute(EngineConnTask engineConnTask) { - String user = getUserCreatorLabel(engineConnTask.getLables()).getUser(); Optional> userCreatorLabelOp = Arrays.stream(engineConnTask.getLables()) .filter(label -> label instanceof UserCreatorLabel) @@ -128,52 +120,55 @@ public ExecuteResponse execute(EngineConnTask engineConnTask) { new NebulaEngineConf().getCacheMap(new Tuple2<>(userCreatorLabel, engineTypeLabel)); } - clientSessionCache.put( - engineConnTask.getTaskId(), - getClientSession(user, engineConnTask.getProperties(), configMap)); + nebulaPoolCache.put( + 
engineConnTask.getTaskId(), getNebulaPool(engineConnTask.getProperties(), configMap)); return super.execute(engineConnTask); } @Override public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, String code) { - boolean enableSqlHook = NebulaConfiguration.PRESTO_SQL_HOOK_ENABLED.getValue(); String realCode; if (StringUtils.isBlank(code)) { - realCode = "SELECT 1"; - } else if (enableSqlHook) { - realCode = NebulaSQLHook.preExecuteHook(code.trim()); + realCode = "SHOW SPACES"; } else { realCode = code.trim(); } - logger.info("presto client begins to run psql code:\n {}", realCode); + logger.info("Nebula client begins to run ngql code:\n {}", realCode); String taskId = engineExecutorContext.getJobId().get(); - ClientSession clientSession = clientSessionCache.getIfPresent(taskId); - StatementClient statement = - StatementClientFactory.newStatementClient(okHttpClient, clientSession, realCode); - statementClientCache.put(taskId, statement); + NebulaPool nebulaPool = nebulaPoolCache.getIfPresent(taskId); + Session session = getSession(nebulaPool); + sessionCache.put(taskId, session); try { - initialStatusUpdates(taskId, engineExecutorContext, statement); - if (statement.isRunning() - || (statement.isFinished() && statement.finalStatusInfo().getError() == null)) { - queryOutput(taskId, engineExecutorContext, statement); + initialStatusUpdates(taskId, engineExecutorContext, session); + ResultSet resultSet = null; + + try { + resultSet = session.execute(code); + } catch (Exception e) { + logger.error("Nebula executor error."); + throw new NebulaExecuteError( + NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorCode(), + NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorDesc()); + } + + if (resultSet.isSucceeded() && !resultSet.isEmpty()) { + queryOutput(taskId, engineExecutorContext, resultSet); } ErrorExecuteResponse errorResponse = null; try { - errorResponse = verifyServerError(taskId, engineExecutorContext, statement); + errorResponse = verifyServerError(taskId, engineExecutorContext, resultSet); } catch (ErrorException e) { - logger.error("Presto execute failed (#{}): {}", e.getErrCode(), e.getMessage()); + logger.error("Nebula execute failed (#{}): {}", e.getErrCode(), e.getMessage()); } if (errorResponse == null) { - // update session - clientSessionCache.put(taskId, updateSession(clientSession, statement)); return new SuccessExecuteResponse(); } else { return errorResponse; } } finally { - statementClientCache.remove(taskId); + sessionCache.remove(taskId); } } @@ -183,7 +178,6 @@ public ExecuteResponse executeCompletely( return null; } - // todo @Override public float progress(String taskID) { return 0.0f; @@ -196,9 +190,9 @@ public JobProgressInfo[] getProgressInfo(String taskID) { @Override public void killTask(String taskId) { - StatementClient statement = statementClientCache.remove(taskId); - if (null != statement) { - statement.cancelLeafStage(); + Session session = sessionCache.remove(taskId); + if (null != session) { + session.release(); } super.killTask(taskId); } @@ -247,11 +241,7 @@ public int getConcurrentLimit() { return NebulaConfiguration.ENGINE_CONCURRENT_LIMIT.getValue(); } - private ClientSession getClientSession( - String user, Map taskParams, Map cacheMap) { - Map configMap = new HashMap<>(); - // The parameter priority specified at runtime is higher than the configuration priority of the - // management console + private NebulaPool getNebulaPool(Map taskParams, Map cacheMap) { if (!CollectionUtils.isEmpty(cacheMap)) { 
configMap.putAll(cacheMap); } @@ -259,205 +249,131 @@ private ClientSession getClientSession( .filter(entry -> entry.getValue() != null) .forEach(entry -> configMap.put(entry.getKey(), String.valueOf(entry.getValue()))); - URI httpUri = URI.create(NebulaConfiguration.PRESTO_URL.getValue(configMap)); - String source = NebulaConfiguration.PRESTO_SOURCE.getValue(configMap); - String catalog = NebulaConfiguration.PRESTO_CATALOG.getValue(configMap); - String schema = NebulaConfiguration.PRESTO_SCHEMA.getValue(configMap); - - Map properties = - configMap.entrySet().stream() - .filter(entry -> entry.getKey().startsWith("presto.session.")) - .collect( - Collectors.toMap( - entry -> entry.getKey().substring("presto.session.".length()), - Map.Entry::getValue)); - - String clientInfo = "Linkis"; - String transactionId = null; - Optional traceToken = Optional.empty(); - Set clientTags = Collections.emptySet(); - String timeZonId = TimeZone.getDefault().getID(); - Locale locale = Locale.getDefault(); - Map resourceEstimates = Collections.emptyMap(); - Map preparedStatements = Collections.emptyMap(); - Map roles = Collections.emptyMap(); - Map extraCredentials = Collections.emptyMap(); - io.airlift.units.Duration clientRequestTimeout = - new io.airlift.units.Duration(0, TimeUnit.MILLISECONDS); - - return new ClientSession( - httpUri, - user, - source, - traceToken, - clientTags, - clientInfo, - catalog, - schema, - timeZonId, - locale, - resourceEstimates, - properties, - preparedStatements, - roles, - extraCredentials, - transactionId, - clientRequestTimeout); + String host = NebulaConfiguration.NEBULA_HOST.getValue(configMap); + Integer port = NebulaConfiguration.NEBULA_PORT.getValue(configMap); + Integer maxConnSize = NebulaConfiguration.NEBULA_MAX_CONN_SIZE.getValue(configMap); + + NebulaPool nebulaPool = new NebulaPool(); + Boolean initResult = false; + try { + + NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig(); + nebulaPoolConfig.setMaxConnSize(maxConnSize); + List addresses = Arrays.asList(new HostAddress(host, port)); + initResult = nebulaPool.init(addresses, nebulaPoolConfig); + } catch (Exception e) { + logger.error("NebulaPool initialization failed."); + throw new NebulaClientException( + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(), + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc()); + } + if (!initResult) { + logger.error("NebulaPool initialization failed."); + throw new NebulaClientException( + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(), + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc()); + } + return nebulaPool; } - private UserCreatorLabel getUserCreatorLabel(Label[] labels) { - return (UserCreatorLabel) - Arrays.stream(labels).filter(label -> label instanceof UserCreatorLabel).findFirst().get(); + private Session getSession(NebulaPool nebulaPool) { + Session session; + String username = NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap); + String password = NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap); + Boolean reconnect = NebulaConfiguration.NEBULA_RECONNECT_ENABLED.getValue(configMap); + + try { + session = nebulaPool.getSession(username, password, reconnect); + } catch (Exception e) { + logger.error("Nebula Session initialization failed."); + throw new NebulaClientException( + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(), + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc()); + } + + return 
session; } private void initialStatusUpdates( - String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) { - while (statement.isRunning() - && (statement.currentData().getData() == null - || statement.currentStatusInfo().getUpdateType() != null)) { + String taskId, EngineExecutionContext engineExecutorContext, Session session) { + if (session.ping()) { engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId)); - statement.advance(); } } private void queryOutput( - String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) { + String taskId, EngineExecutionContext engineExecutorContext, ResultSet resultSet) { int columnCount = 0; - int rows = 0; ResultSetWriter resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE); + try { - QueryStatusInfo results = null; - if (statement.isRunning()) { - results = statement.currentStatusInfo(); - } else { - results = statement.finalStatusInfo(); - } - if (results.getColumns() == null) { - throw new RuntimeException("presto columns is null."); + List colNames = resultSet.keys(); + + if (CollectionUtils.isEmpty(colNames)) { + throw new RuntimeException("Nebula columns is null."); } + List columns = - results.getColumns().stream() - .map( - column -> new Column(column.getName(), DataType.toDataType(column.getType()), "")) + colNames.stream() + .map(column -> new Column(column, DataType.toDataType("string"), "")) .collect(Collectors.toList()); columnCount = columns.size(); resultSetWriter.addMetaData(new TableMetaData(columns.toArray(new Column[0]))); - while (statement.isRunning()) { - Iterable> data = statement.currentData().getData(); - if (data != null) { - for (List row : data) { - String[] rowArray = row.stream().map(r -> String.valueOf(r)).toArray(String[]::new); + if (!resultSet.isEmpty()) { + for (int i = 0; i < resultSet.rowsSize(); i++) { + ResultSet.Record record = resultSet.rowValues(i); + if (record != null) { + String[] rowArray = + record.values().stream() + .map( + x -> { + try { + return x.asString(); + } catch (Exception e) { + return ""; + } + }) + .toArray(String[]::new); resultSetWriter.addRecord(new TableRecord(rowArray)); - rows += 1; } } engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId)); - statement.advance(); } } catch (Exception e) { IOUtils.closeQuietly(resultSetWriter); } - String message = String.format("Fetched %d col(s) : %d row(s) in presto", columnCount, rows); + String message = + String.format("Fetched %d col(s) : %d row(s) in Nebula", columnCount, resultSet.rowsSize()); logger.info(message); engineExecutorContext.appendStdout(LogUtils.generateInfo(message)); engineExecutorContext.sendResultSet(resultSetWriter); } private ErrorExecuteResponse verifyServerError( - String taskId, EngineExecutionContext engineExecutorContext, StatementClient statement) + String taskId, EngineExecutionContext engineExecutorContext, ResultSet resultSet) throws ErrorException { engineExecutorContext.pushProgress(progress(taskId), getProgressInfo(taskId)); - if (statement.isFinished()) { - QueryStatusInfo info = statement.finalStatusInfo(); - if (info.getError() != null) { - QueryError error = Objects.requireNonNull(info.getError()); - logger.error("Presto execute failed (#{}): {}", info.getId(), error.getMessage()); - Throwable cause = null; - if (error.getFailureInfo() != null) { - cause = error.getFailureInfo().toException(); - } - engineExecutorContext.appendStdout( - 
LogUtils.generateERROR(ExceptionUtils.getStackTrace(cause))); - return new ErrorExecuteResponse(ExceptionUtils.getMessage(cause), cause); - } else { - return null; - } - } else if (statement.isClientAborted()) { - logger.warn("Presto statement is killed."); - return null; - } else if (statement.isClientError()) { - throw new NebulaClientException( - NebulaErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorCode(), - NebulaErrorCodeSummary.PRESTO_CLIENT_ERROR.getErrorDesc()); - } else { - throw new NebulaStateInvalidException( - NebulaErrorCodeSummary.PRESTO_STATE_INVALID.getErrorCode(), - NebulaErrorCodeSummary.PRESTO_STATE_INVALID.getErrorDesc()); - } - } - - private ClientSession updateSession(ClientSession clientSession, StatementClient statement) { - ClientSession newSession = clientSession; - - // update catalog and schema if present - if (statement.getSetCatalog().isPresent() || statement.getSetSchema().isPresent()) { - newSession = - ClientSession.builder(newSession) - .withCatalog(statement.getSetCatalog().orElse(newSession.getCatalog())) - .withSchema(statement.getSetSchema().orElse(newSession.getSchema())) - .build(); - } - - // update transaction ID if necessary - if (statement.isClearTransactionId()) { - newSession = ClientSession.stripTransactionId(newSession); - } - - ClientSession.Builder builder = ClientSession.builder(newSession); - - if (statement.getStartedTransactionId() != null) { - builder = builder.withTransactionId(statement.getStartedTransactionId()); - } - // update session properties if present - if (!statement.getSetSessionProperties().isEmpty() - || !statement.getResetSessionProperties().isEmpty()) { - Map sessionProperties = new HashMap<>(newSession.getProperties()); - sessionProperties.putAll(statement.getSetSessionProperties()); - sessionProperties.keySet().removeAll(statement.getResetSessionProperties()); - builder = builder.withProperties(sessionProperties); + if (!resultSet.isSucceeded() || resultSet.getErrorCode() != ErrorCode.SUCCEEDED.getValue()) { + logger.error( + "Nebula execute failed (#{}): {}", resultSet.getErrorCode(), resultSet.getErrorMessage()); + engineExecutorContext.appendStdout(LogUtils.generateERROR(resultSet.getErrorMessage())); + return new ErrorExecuteResponse(resultSet.getErrorMessage(), null); } - - // update session roles - if (!statement.getSetRoles().isEmpty()) { - Map roles = new HashMap<>(newSession.getRoles()); - roles.putAll(statement.getSetRoles()); - builder = builder.withRoles(roles); - } - - // update prepared statements if present - if (!statement.getAddedPreparedStatements().isEmpty() - || !statement.getDeallocatedPreparedStatements().isEmpty()) { - Map preparedStatements = new HashMap<>(newSession.getPreparedStatements()); - preparedStatements.putAll(statement.getAddedPreparedStatements()); - preparedStatements.keySet().removeAll(statement.getDeallocatedPreparedStatements()); - builder = builder.withPreparedStatements(preparedStatements); - } - - return builder.build(); + return null; } @Override public void killAll() { - Iterator iterator = statementClientCache.values().iterator(); + Iterator iterator = sessionCache.values().iterator(); while (iterator.hasNext()) { - StatementClient statement = iterator.next(); - if (statement != null) { - statement.cancelLeafStage(); + Session session = iterator.next(); + if (session != null) { + session.release(); } } - statementClientCache.clear(); + sessionCache.clear(); } @Override diff --git 
a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/utils/NebulaSQLHook.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/utils/NebulaSQLHook.java deleted file mode 100644 index 37d1f8d658..0000000000 --- a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/utils/NebulaSQLHook.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.engineplugin.nebula.utils; - -import org.apache.commons.lang3.StringUtils; - -public class NebulaSQLHook { - public static String preExecuteHook(String code) { - return replaceBackQuoted(code); - } - - private static String replaceBackQuoted(String code) { - if (StringUtils.isNotBlank(code)) { - return code.replaceAll("`", "\""); - } else { - return code; - } - } -} diff --git a/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala b/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala index 9a3263f596..2f7c3c8fb8 100644 --- a/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala +++ b/linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala @@ -23,9 +23,9 @@ import org.apache.linkis.engineconn.computation.executor.creation.ComputationSin import org.apache.linkis.engineconn.executor.entity.LabelExecutor import org.apache.linkis.engineplugin.nebula.conf.NebulaConfiguration import org.apache.linkis.engineplugin.nebula.executor.NebulaEngineConnExecutor +import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType} import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType import org.apache.linkis.manager.label.entity.engine.RunType.RunType -import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType} class NebulaEngineConnFactory extends ComputationSingleExecutorEngineConnFactory { @@ -37,8 +37,8 @@ class NebulaEngineConnFactory extends ComputationSingleExecutorEngineConnFactory new NebulaEngineConnExecutor(NebulaConfiguration.ENGINE_DEFAULT_LIMIT.getValue, id) } - override protected def getEngineConnType: EngineType = EngineType.PRESTO + override protected def getEngineConnType: EngineType = EngineType.NEBULA - override protected def getRunType: RunType = RunType.PRESTO_SQL + override protected def getRunType: RunType = RunType.NEBULA_SQL }
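
The executor introduced above builds a NebulaPool from the wds.linkis.nebula.host, wds.linkis.nebula.port and wds.linkis.nebula.max.conn.size settings, then opens a Session per task with the configured username, password and reconnect flag. The following standalone sketch shows the same nebula-java calls outside of Linkis, hard-coding the defaults that NebulaConfiguration declares in this patch (127.0.0.1:9669, root/nebula, maxConnSize 100, reconnect disabled); it is an illustration of the client API the patch depends on, not part of the patch itself.

import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;

import java.util.Collections;

public class NebulaPoolQuickstart {
  public static void main(String[] args) throws Exception {
    // Pool sizing mirrors wds.linkis.nebula.max.conn.size (default 100).
    NebulaPoolConfig poolConfig = new NebulaPoolConfig();
    poolConfig.setMaxConnSize(100);

    // Address mirrors the wds.linkis.nebula.host / wds.linkis.nebula.port defaults.
    NebulaPool pool = new NebulaPool();
    boolean ok =
        pool.init(Collections.singletonList(new HostAddress("127.0.0.1", 9669)), poolConfig);
    if (!ok) {
      throw new IllegalStateException("NebulaPool initialization failed");
    }

    // Credentials and reconnect flag mirror wds.linkis.nebula.username / password
    // and linkis.nebula.reconnect.enabled.
    Session session = pool.getSession("root", "nebula", false);
    try {
      ResultSet rs = session.execute("SHOW SPACES;");
      System.out.println("succeeded = " + rs.isSucceeded() + ", rows = " + rs.rowsSize());
    } finally {
      session.release();
      pool.close();
    }
  }
}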
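
queryOutput() in the executor turns a ResultSet into a Linkis table by treating every column as a string: it takes the column names from resultSet.keys(), walks rowValues(i), and converts each ValueWrapper with asString(), falling back to an empty string when a value is not string-typed. A minimal sketch of that conversion, with the Linkis ResultSetWriter plumbing left out:

import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.data.ValueWrapper;

import java.util.ArrayList;
import java.util.List;

public class NebulaRows {
  /** Flattens a nebula-java ResultSet into string rows, mirroring queryOutput(). */
  public static List<String[]> toStringRows(ResultSet resultSet) {
    List<String[]> rows = new ArrayList<>();
    if (resultSet == null || !resultSet.isSucceeded() || resultSet.isEmpty()) {
      return rows;
    }
    for (int i = 0; i < resultSet.rowsSize(); i++) {
      ResultSet.Record record = resultSet.rowValues(i);
      List<ValueWrapper> values = record.values();
      String[] row = new String[values.size()];
      for (int j = 0; j < values.size(); j++) {
        try {
          // Non-string values (ints, dates, vertices, ...) raise here and fall back to "".
          row[j] = values.get(j).asString();
        } catch (Exception e) {
          row[j] = "";
        }
      }
      rows.add(row);
    }
    return rows;
  }
}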
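
verifyServerError() decides success by combining ResultSet.isSucceeded() with the numeric error code, comparing it against com.vesoft.nebula.ErrorCode.SUCCEEDED, and reports getErrorMessage() back to the job log on failure. A condensed sketch of the same check:

import com.vesoft.nebula.ErrorCode;
import com.vesoft.nebula.client.graph.data.ResultSet;

public class NebulaErrorCheck {
  /** Returns null on success, otherwise the server-side error message, as verifyServerError does. */
  public static String errorMessageOrNull(ResultSet resultSet) {
    if (!resultSet.isSucceeded() || resultSet.getErrorCode() != ErrorCode.SUCCEEDED.getValue()) {
      return resultSet.getErrorMessage();
    }
    return null;
  }
}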
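
On the lifecycle side, the executor keeps one NebulaPool per task in a Guava cache that expires entries after a period of inactivity derived from EngineConnConf.ENGINE_TASK_EXPIRE_TIME, while live Sessions sit in a ConcurrentHashMap so killTask()/killAll() can release them explicitly. A sketch of that layout, with a hard-coded one-hour expiry standing in for the Linkis setting:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class TaskConnectionRegistry {
  // Pools are evicted automatically once a task has been idle long enough;
  // the real executor reads the expiry from EngineConnConf.ENGINE_TASK_EXPIRE_TIME.
  private final Cache<String, NebulaPool> poolsByTask =
      CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).build();

  // Sessions are removed explicitly so killTask()/killAll() can release them.
  private final Map<String, Session> sessionsByTask = new ConcurrentHashMap<>();

  public void kill(String taskId) {
    Session session = sessionsByTask.remove(taskId);
    if (session != null) {
      session.release();
    }
  }
}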