From a51ed4037d51803469655d1f0675c52bd0aa2973 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Sun, 17 Sep 2023 17:54:00 +0800 Subject: [PATCH] Reuse nebula session --- .../executor/NebulaEngineConnExecutor.java | 35 ++++++++----------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java index 9be5d0efee..45b904bca3 100644 --- a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java +++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java @@ -32,6 +32,7 @@ import org.apache.linkis.engineplugin.nebula.errorcode.NebulaErrorCodeSummary; import org.apache.linkis.engineplugin.nebula.exception.NebulaClientException; import org.apache.linkis.engineplugin.nebula.exception.NebulaExecuteError; +import org.apache.linkis.governance.common.paser.SQLCodeParser; import org.apache.linkis.manager.common.entity.resource.CommonNodeResource; import org.apache.linkis.manager.common.entity.resource.LoadResource; import org.apache.linkis.manager.common.entity.resource.NodeResource; @@ -78,11 +79,11 @@ public class NebulaEngineConnExecutor extends ConcurrentComputationExecutor { private static final Logger logger = LoggerFactory.getLogger(NebulaEngineConnExecutor.class); private int id; private List> executorLabels = new ArrayList<>(2); - private Map sessionCache = new ConcurrentHashMap<>(); + private Map resultSetCache = new ConcurrentHashMap<>(); private Map configMap = new HashMap<>(); - private Cache nebulaPoolCache = + private Cache sessionCache = CacheBuilder.newBuilder() .expireAfterAccess( Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()), @@ -97,6 +98,7 @@ public NebulaEngineConnExecutor(int outputPrintLimit, int id) { @Override public void init() { + setCodeParser(new SQLCodeParser()); super.init(); } @@ -120,8 +122,8 @@ public ExecuteResponse execute(EngineConnTask engineConnTask) { new NebulaEngineConf().getCacheMap(new Tuple2<>(userCreatorLabel, engineTypeLabel)); } - nebulaPoolCache.put( - engineConnTask.getTaskId(), getNebulaPool(engineConnTask.getProperties(), configMap)); + sessionCache.put( + engineConnTask.getTaskId(), getSession(engineConnTask.getProperties(), configMap)); return super.execute(engineConnTask); } @@ -136,9 +138,7 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, logger.info("Nebula client begins to run ngql code:\n {}", realCode); String taskId = engineExecutorContext.getJobId().get(); - NebulaPool nebulaPool = nebulaPoolCache.getIfPresent(taskId); - Session session = getSession(nebulaPool); - sessionCache.put(taskId, session); + Session session = sessionCache.getIfPresent(taskId); try { initialStatusUpdates(taskId, engineExecutorContext, session); @@ -146,6 +146,7 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, try { resultSet = session.execute(code); + resultSetCache.put(taskId, resultSet); } catch (Exception e) { logger.error("Nebula executor error."); throw new NebulaExecuteError( @@ -168,7 +169,7 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext, return errorResponse; } } finally { - sessionCache.remove(taskId); + resultSetCache.remove(taskId); } } @@ -190,10 +191,7 @@ public JobProgressInfo[] getProgressInfo(String taskID) { @Override public void killTask(String taskId) { - Session session = sessionCache.remove(taskId); - if (null != session) { - session.release(); - } + resultSetCache.remove(taskId); super.killTask(taskId); } @@ -276,7 +274,9 @@ private NebulaPool getNebulaPool(Map taskParams, Map taskParams, Map cacheMap) { + NebulaPool nebulaPool = getNebulaPool(taskParams, cacheMap); + Session session; String username = NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap); String password = NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap); @@ -366,14 +366,7 @@ private ErrorExecuteResponse verifyServerError( @Override public void killAll() { - Iterator iterator = sessionCache.values().iterator(); - while (iterator.hasNext()) { - Session session = iterator.next(); - if (session != null) { - session.release(); - } - } - sessionCache.clear(); + resultSetCache.clear(); } @Override