diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java index d18e1e5ee8..dfbb7a8b13 100644 --- a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java @@ -22,25 +22,25 @@ public class NebulaConfiguration { public static final CommonVars ENGINE_CONCURRENT_LIMIT = - CommonVars.apply("wds.linkis.engineconn.concurrent.limit", 100); + CommonVars.apply("linkis.engineconn.concurrent.limit", 100); public static final CommonVars ENGINE_DEFAULT_LIMIT = - CommonVars.apply("wds.linkis.nebula.default.limit", 5000); + CommonVars.apply("linkis.nebula.default.limit", 5000); public static final CommonVars NEBULA_HOST = - CommonVars.apply("wds.linkis.nebula.host", "127.0.0.1"); + CommonVars.apply("linkis.nebula.host", "127.0.0.1"); public static final CommonVars NEBULA_PORT = - CommonVars.apply("wds.linkis.nebula.port", 9669); + CommonVars.apply("linkis.nebula.port", 9669); public static final CommonVars NEBULA_MAX_CONN_SIZE = - CommonVars.apply("wds.linkis.nebula.max.conn.size", 100); + CommonVars.apply("linkis.nebula.max.conn.size", 100); public static final CommonVars NEBULA_USER_NAME = - CommonVars.apply("wds.linkis.nebula.username", "root"); + CommonVars.apply("linkis.nebula.username", "root"); public static final CommonVars NEBULA_PASSWORD = - CommonVars.apply("wds.linkis.nebula.password", "nebula"); + CommonVars.apply("linkis.nebula.password", "nebula"); public static final CommonVars NEBULA_RECONNECT_ENABLED = CommonVars.apply( diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java index 45b904bca3..188ea60ec4 100644 --- a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java @@ -79,11 +79,11 @@ public class NebulaEngineConnExecutor extends ConcurrentComputationExecutor { private static final Logger logger = LoggerFactory.getLogger(NebulaEngineConnExecutor.class); private int id; private List> executorLabels = new ArrayList<>(2); - private Map resultSetCache = new ConcurrentHashMap<>(); + private Map sessionCache = new ConcurrentHashMap<>(); private Map configMap = new HashMap<>(); - private Cache sessionCache = + private Cache nebulaPoolCache = CacheBuilder.newBuilder() .expireAfterAccess( Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()), @@ -122,8 +122,8 @@ public ExecuteResponse execute(EngineConnTask engineConnTask) { new NebulaEngineConf().getCacheMap(new Tuple2<>(userCreatorLabel, engineTypeLabel)); } - sessionCache.put( - engineConnTask.getTaskId(), getSession(engineConnTask.getProperties(), configMap)); + nebulaPoolCache.put( + engineConnTask.getTaskId(), getNebulaPool(engineConnTask.getProperties(), configMap)); return super.execute(engineConnTask); } @@ -138,38 +138,34 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, logger.info("Nebula client begins to run ngql code:\n {}", realCode); String taskId = engineExecutorContext.getJobId().get(); - Session session = sessionCache.getIfPresent(taskId); + NebulaPool nebulaPool = nebulaPoolCache.getIfPresent(taskId); + Session session = getSession(taskId, nebulaPool); - try { - initialStatusUpdates(taskId, engineExecutorContext, session); - ResultSet resultSet = null; + initialStatusUpdates(taskId, engineExecutorContext, session); + ResultSet resultSet = null; - try { - resultSet = session.execute(code); - resultSetCache.put(taskId, resultSet); - } catch (Exception e) { - logger.error("Nebula executor error."); - throw new NebulaExecuteError( - NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorCode(), - NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorDesc()); - } + try { + resultSet = session.execute(code); + } catch (Exception e) { + logger.error("Nebula executor error."); + throw new NebulaExecuteError( + NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorCode(), + NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorDesc()); + } - if (resultSet.isSucceeded() && !resultSet.isEmpty()) { - queryOutput(taskId, engineExecutorContext, resultSet); - } - ErrorExecuteResponse errorResponse = null; - try { - errorResponse = verifyServerError(taskId, engineExecutorContext, resultSet); - } catch (ErrorException e) { - logger.error("Nebula execute failed (#{}): {}", e.getErrCode(), e.getMessage()); - } - if (errorResponse == null) { - return new SuccessExecuteResponse(); - } else { - return errorResponse; - } - } finally { - resultSetCache.remove(taskId); + if (resultSet.isSucceeded() && !resultSet.isEmpty()) { + queryOutput(taskId, engineExecutorContext, resultSet); + } + ErrorExecuteResponse errorResponse = null; + try { + errorResponse = verifyServerError(taskId, engineExecutorContext, resultSet); + } catch (ErrorException e) { + logger.error("Nebula execute failed (#{}): {}", e.getErrCode(), e.getMessage()); + } + if (errorResponse == null) { + return new SuccessExecuteResponse(); + } else { + return errorResponse; } } @@ -191,7 +187,10 @@ public JobProgressInfo[] getProgressInfo(String taskID) { @Override public void killTask(String taskId) { - resultSetCache.remove(taskId); + Session session = sessionCache.remove(taskId); + if (null != session) { + session.release(); + } super.killTask(taskId); } @@ -274,24 +273,29 @@ private NebulaPool getNebulaPool(Map taskParams, Map taskParams, Map cacheMap) { - NebulaPool nebulaPool = getNebulaPool(taskParams, cacheMap); + private Session getSession(String taskId, NebulaPool nebulaPool) { + if (sessionCache.containsKey(taskId) + && sessionCache.get(taskId) != null + && sessionCache.get(taskId).ping()) { + return sessionCache.get(taskId); + } else { + Session session; + String username = NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap); + String password = NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap); + Boolean reconnect = NebulaConfiguration.NEBULA_RECONNECT_ENABLED.getValue(configMap); - Session session; - String username = NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap); - String password = NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap); - Boolean reconnect = NebulaConfiguration.NEBULA_RECONNECT_ENABLED.getValue(configMap); + try { + session = nebulaPool.getSession(username, password, reconnect); + } catch (Exception e) { + logger.error("Nebula Session initialization failed."); + throw new NebulaClientException( + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(), + NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc()); + } - try { - session = nebulaPool.getSession(username, password, reconnect); - } catch (Exception e) { - logger.error("Nebula Session initialization failed."); - throw new NebulaClientException( - NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(), - NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc()); + sessionCache.put(taskId, session); + return session; } - - return session; } private void initialStatusUpdates( @@ -366,7 +370,14 @@ private ErrorExecuteResponse verifyServerError( @Override public void killAll() { - resultSetCache.clear(); + Iterator iterator = sessionCache.values().iterator(); + while (iterator.hasNext()) { + Session session = iterator.next(); + if (session != null) { + session.release(); + } + } + sessionCache.clear(); } @Override