Skip to content

Commit

Permalink
Reuse nebula session
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Sep 17, 2023
1 parent e7e0ec7 commit a51ed40
Showing 1 changed file with 14 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.linkis.engineplugin.nebula.errorcode.NebulaErrorCodeSummary;
import org.apache.linkis.engineplugin.nebula.exception.NebulaClientException;
import org.apache.linkis.engineplugin.nebula.exception.NebulaExecuteError;
import org.apache.linkis.governance.common.paser.SQLCodeParser;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.LoadResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
Expand Down Expand Up @@ -78,11 +79,11 @@ public class NebulaEngineConnExecutor extends ConcurrentComputationExecutor {
private static final Logger logger = LoggerFactory.getLogger(NebulaEngineConnExecutor.class);
private int id;
private List<Label<?>> executorLabels = new ArrayList<>(2);
private Map<String, Session> sessionCache = new ConcurrentHashMap<>();
private Map<String, ResultSet> resultSetCache = new ConcurrentHashMap<>();

private Map<String, String> configMap = new HashMap<>();

private Cache<String, NebulaPool> nebulaPoolCache =
private Cache<String, Session> sessionCache =
CacheBuilder.newBuilder()
.expireAfterAccess(
Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()),
Expand All @@ -97,6 +98,7 @@ public NebulaEngineConnExecutor(int outputPrintLimit, int id) {

@Override
public void init() {
setCodeParser(new SQLCodeParser());
super.init();
}

Expand All @@ -120,8 +122,8 @@ public ExecuteResponse execute(EngineConnTask engineConnTask) {
new NebulaEngineConf().getCacheMap(new Tuple2<>(userCreatorLabel, engineTypeLabel));
}

nebulaPoolCache.put(
engineConnTask.getTaskId(), getNebulaPool(engineConnTask.getProperties(), configMap));
sessionCache.put(
engineConnTask.getTaskId(), getSession(engineConnTask.getProperties(), configMap));
return super.execute(engineConnTask);
}

Expand All @@ -136,16 +138,15 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext,
logger.info("Nebula client begins to run ngql code:\n {}", realCode);

String taskId = engineExecutorContext.getJobId().get();
NebulaPool nebulaPool = nebulaPoolCache.getIfPresent(taskId);
Session session = getSession(nebulaPool);
sessionCache.put(taskId, session);
Session session = sessionCache.getIfPresent(taskId);

try {
initialStatusUpdates(taskId, engineExecutorContext, session);
ResultSet resultSet = null;

try {
resultSet = session.execute(code);
resultSetCache.put(taskId, resultSet);
} catch (Exception e) {
logger.error("Nebula executor error.");
throw new NebulaExecuteError(
Expand All @@ -168,7 +169,7 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext,
return errorResponse;
}
} finally {
sessionCache.remove(taskId);
resultSetCache.remove(taskId);
}
}

Expand All @@ -190,10 +191,7 @@ public JobProgressInfo[] getProgressInfo(String taskID) {

@Override
public void killTask(String taskId) {
Session session = sessionCache.remove(taskId);
if (null != session) {
session.release();
}
resultSetCache.remove(taskId);
super.killTask(taskId);
}

Expand Down Expand Up @@ -276,7 +274,9 @@ private NebulaPool getNebulaPool(Map<String, Object> taskParams, Map<String, Str
return nebulaPool;
}

private Session getSession(NebulaPool nebulaPool) {
private Session getSession(Map<String, Object> taskParams, Map<String, String> cacheMap) {
NebulaPool nebulaPool = getNebulaPool(taskParams, cacheMap);

Session session;
String username = NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap);
String password = NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap);
Expand Down Expand Up @@ -366,14 +366,7 @@ private ErrorExecuteResponse verifyServerError(

@Override
public void killAll() {
Iterator<Session> iterator = sessionCache.values().iterator();
while (iterator.hasNext()) {
Session session = iterator.next();
if (session != null) {
session.release();
}
}
sessionCache.clear();
resultSetCache.clear();
}

@Override
Expand Down

0 comments on commit a51ed40

Please sign in to comment.