Skip to content

Commit

Permalink
Code optimization and remove wds prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Sep 19, 2023
1 parent a51ed40 commit 600638f
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,25 @@
public class NebulaConfiguration {

public static final CommonVars<Integer> ENGINE_CONCURRENT_LIMIT =
CommonVars.apply("wds.linkis.engineconn.concurrent.limit", 100);
CommonVars.apply("linkis.engineconn.concurrent.limit", 100);

public static final CommonVars<Integer> ENGINE_DEFAULT_LIMIT =
CommonVars.apply("wds.linkis.nebula.default.limit", 5000);
CommonVars.apply("linkis.nebula.default.limit", 5000);

public static final CommonVars<String> NEBULA_HOST =
CommonVars.apply("wds.linkis.nebula.host", "127.0.0.1");
CommonVars.apply("linkis.nebula.host", "127.0.0.1");

public static final CommonVars<Integer> NEBULA_PORT =
CommonVars.apply("wds.linkis.nebula.port", 9669);
CommonVars.apply("linkis.nebula.port", 9669);

public static final CommonVars<Integer> NEBULA_MAX_CONN_SIZE =
CommonVars.apply("wds.linkis.nebula.max.conn.size", 100);
CommonVars.apply("linkis.nebula.max.conn.size", 100);

public static final CommonVars<String> NEBULA_USER_NAME =
CommonVars.apply("wds.linkis.nebula.username", "root");
CommonVars.apply("linkis.nebula.username", "root");

public static final CommonVars<String> NEBULA_PASSWORD =
CommonVars.apply("wds.linkis.nebula.password", "nebula");
CommonVars.apply("linkis.nebula.password", "nebula");

public static final CommonVars<Boolean> NEBULA_RECONNECT_ENABLED =
CommonVars.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ public class NebulaEngineConnExecutor extends ConcurrentComputationExecutor {
private static final Logger logger = LoggerFactory.getLogger(NebulaEngineConnExecutor.class);
private int id;
private List<Label<?>> executorLabels = new ArrayList<>(2);
private Map<String, ResultSet> resultSetCache = new ConcurrentHashMap<>();
private Map<String, Session> sessionCache = new ConcurrentHashMap<>();

private Map<String, String> configMap = new HashMap<>();

private Cache<String, Session> sessionCache =
private Cache<String, NebulaPool> nebulaPoolCache =
CacheBuilder.newBuilder()
.expireAfterAccess(
Long.valueOf(EngineConnConf.ENGINE_TASK_EXPIRE_TIME().getValue().toString()),
Expand Down Expand Up @@ -122,8 +122,8 @@ public ExecuteResponse execute(EngineConnTask engineConnTask) {
new NebulaEngineConf().getCacheMap(new Tuple2<>(userCreatorLabel, engineTypeLabel));
}

sessionCache.put(
engineConnTask.getTaskId(), getSession(engineConnTask.getProperties(), configMap));
nebulaPoolCache.put(
engineConnTask.getTaskId(), getNebulaPool(engineConnTask.getProperties(), configMap));
return super.execute(engineConnTask);
}

Expand All @@ -138,38 +138,34 @@ public ExecuteResponse executeLine(EngineExecutionContext engineExecutorContext,
logger.info("Nebula client begins to run ngql code:\n {}", realCode);

String taskId = engineExecutorContext.getJobId().get();
Session session = sessionCache.getIfPresent(taskId);
NebulaPool nebulaPool = nebulaPoolCache.getIfPresent(taskId);
Session session = getSession(taskId, nebulaPool);

try {
initialStatusUpdates(taskId, engineExecutorContext, session);
ResultSet resultSet = null;
initialStatusUpdates(taskId, engineExecutorContext, session);
ResultSet resultSet = null;

try {
resultSet = session.execute(code);
resultSetCache.put(taskId, resultSet);
} catch (Exception e) {
logger.error("Nebula executor error.");
throw new NebulaExecuteError(
NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorCode(),
NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorDesc());
}
try {
resultSet = session.execute(code);
} catch (Exception e) {
logger.error("Nebula executor error.");
throw new NebulaExecuteError(
NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorCode(),
NebulaErrorCodeSummary.NEBULA_EXECUTOR_ERROR.getErrorDesc());
}

if (resultSet.isSucceeded() && !resultSet.isEmpty()) {
queryOutput(taskId, engineExecutorContext, resultSet);
}
ErrorExecuteResponse errorResponse = null;
try {
errorResponse = verifyServerError(taskId, engineExecutorContext, resultSet);
} catch (ErrorException e) {
logger.error("Nebula execute failed (#{}): {}", e.getErrCode(), e.getMessage());
}
if (errorResponse == null) {
return new SuccessExecuteResponse();
} else {
return errorResponse;
}
} finally {
resultSetCache.remove(taskId);
if (resultSet.isSucceeded() && !resultSet.isEmpty()) {
queryOutput(taskId, engineExecutorContext, resultSet);
}
ErrorExecuteResponse errorResponse = null;
try {
errorResponse = verifyServerError(taskId, engineExecutorContext, resultSet);
} catch (ErrorException e) {
logger.error("Nebula execute failed (#{}): {}", e.getErrCode(), e.getMessage());
}
if (errorResponse == null) {
return new SuccessExecuteResponse();
} else {
return errorResponse;
}
}

Expand All @@ -191,7 +187,10 @@ public JobProgressInfo[] getProgressInfo(String taskID) {

@Override
public void killTask(String taskId) {
resultSetCache.remove(taskId);
Session session = sessionCache.remove(taskId);
if (null != session) {
session.release();
}
super.killTask(taskId);
}

Expand Down Expand Up @@ -274,24 +273,29 @@ private NebulaPool getNebulaPool(Map<String, Object> taskParams, Map<String, Str
return nebulaPool;
}

private Session getSession(Map<String, Object> taskParams, Map<String, String> cacheMap) {
NebulaPool nebulaPool = getNebulaPool(taskParams, cacheMap);
private Session getSession(String taskId, NebulaPool nebulaPool) {
if (sessionCache.containsKey(taskId)
&& sessionCache.get(taskId) != null
&& sessionCache.get(taskId).ping()) {
return sessionCache.get(taskId);
} else {
Session session;
String username = NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap);
String password = NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap);
Boolean reconnect = NebulaConfiguration.NEBULA_RECONNECT_ENABLED.getValue(configMap);

Session session;
String username = NebulaConfiguration.NEBULA_USER_NAME.getValue(configMap);
String password = NebulaConfiguration.NEBULA_PASSWORD.getValue(configMap);
Boolean reconnect = NebulaConfiguration.NEBULA_RECONNECT_ENABLED.getValue(configMap);
try {
session = nebulaPool.getSession(username, password, reconnect);
} catch (Exception e) {
logger.error("Nebula Session initialization failed.");
throw new NebulaClientException(
NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(),
NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
}

try {
session = nebulaPool.getSession(username, password, reconnect);
} catch (Exception e) {
logger.error("Nebula Session initialization failed.");
throw new NebulaClientException(
NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorCode(),
NebulaErrorCodeSummary.NEBULA_CLIENT_INITIALIZATION_FAILED.getErrorDesc());
sessionCache.put(taskId, session);
return session;
}

return session;
}

private void initialStatusUpdates(
Expand Down Expand Up @@ -366,7 +370,14 @@ private ErrorExecuteResponse verifyServerError(

@Override
public void killAll() {
resultSetCache.clear();
Iterator<Session> iterator = sessionCache.values().iterator();
while (iterator.hasNext()) {
Session session = iterator.next();
if (session != null) {
session.release();
}
}
sessionCache.clear();
}

@Override
Expand Down

0 comments on commit 600638f

Please sign in to comment.