Skip to content

Commit

Permalink
spark support yarn cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Aug 11, 2023
1 parent 160ab55 commit e676775
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public EngineNode getEngineNodeInfoByDB(EngineNode engineNode) {

@Override
public EngineNode getEngineNodeInfoByTicketId(String ticketId) {
EngineNode dbEngineNode = nodeManagerPersistence.getEngineNode(ticketId);
EngineNode dbEngineNode = nodeManagerPersistence.getEngineNodeByTicketId(ticketId);
if (null == dbEngineNode) {
throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, ticketId + " not exists in db");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ private boolean ensuresIdle(EngineNode engineNode, String resourceTicketId) {
return false;
}

if (engineNodeInfo.getServiceInstance() != null) {
engineNode.setServiceInstance(engineNodeInfo.getServiceInstance());
}

if (NodeStatus.isCompleted(engineNodeInfo.getNodeStatus())) {
NodeMetrics metrics = nodeMetricManagerPersistence.getNodeMetrics(engineNodeInfo);
Pair<String, Optional<Boolean>> errorInfo = getStartErrorInfo(metrics.getHeartBeatMsg());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void updateEngineNode(ServiceInstance serviceInstance, Node node)
*/
EngineNode getEngineNode(ServiceInstance serviceInstance);

EngineNode getEngineNode(String ticketId);
EngineNode getEngineNodeByTicketId(String ticketId);

/**
* 通过Em的ServiceInstance 获取EM下面Engine的列表
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,34 +275,24 @@ public EngineNode getEngineNode(ServiceInstance serviceInstance) {
}

@Override
public EngineNode getEngineNode(String ticketId) {
public EngineNode getEngineNodeByTicketId(String ticketId) {
AMEngineNode amEngineNode = new AMEngineNode();
// amEngineNode.setServiceInstance(serviceInstance);
PersistenceNode engineNode = nodeManagerMapper.getNodeInstanceByTicketId(ticketId);

if (null == engineNode) {
return null;
}

ServiceInstance serviceInstance = new ServiceInstance();
serviceInstance.setInstance(engineNode.getInstance());
serviceInstance.setApplicationName(engineNode.getName());
amEngineNode.setServiceInstance(serviceInstance);

amEngineNode.setOwner(engineNode.getOwner());
amEngineNode.setMark(engineNode.getMark());
amEngineNode.setIdentifier(engineNode.getIdentifier());
amEngineNode.setTicketId(engineNode.getTicketId());
amEngineNode.setStartTime(engineNode.getCreateTime());
// PersistenceNode emNode =
//
// nodeManagerMapper.getEMNodeInstanceByEngineNode(serviceInstance.getInstance());
// if (emNode != null) {
// String emInstance = emNode.getInstance();
// String emName = emNode.getName();
// ServiceInstance emServiceInstance = new ServiceInstance();
// emServiceInstance.setApplicationName(emName);
// emServiceInstance.setInstance(emInstance);
// AMEMNode amemNode = new AMEMNode();
// amemNode.setMark(emNode.getMark());
// amemNode.setOwner(emNode.getOwner());
// amemNode.setServiceInstance(emServiceInstance);
// amemNode.setStartTime(emNode.getCreateTime());
// amEngineNode.setEMNode(amemNode);
// }
return amEngineNode;
}

Expand Down

0 comments on commit e676775

Please sign in to comment.