Check for connection count
Signed-off-by: pranikum <[email protected]>
pranikum committed Sep 8, 2022
1 parent 032bd5e commit 0a38021
Showing 2 changed files with 39 additions and 21 deletions.
First changed file:
@@ -68,7 +68,6 @@ public class DecommissionController {
     private final ClusterService clusterService;
     private final TransportService transportService;
     private final ThreadPool threadPool;
-    private final TimeValue decommissionedNodeRequestCheckInterval = TimeValue.timeValueMillis(5000);
 
     DecommissionController(
         ClusterService clusterService,
@@ -149,10 +148,11 @@ public void handleNodesDecommissionRequest(
         List<String> zones,
         String reason,
         TimeValue timeout,
+        TimeValue timeoutForNodeDecommission,
         ActionListener<Void> nodesRemovedListener
     ) {
         setWeightForDecommissionedZone(zones);
-        checkHttpStatsForDecommissionedNodes(nodesToBeDecommissioned, reason, timeout, nodesRemovedListener);
+        checkHttpStatsForDecommissionedNodes(nodesToBeDecommissioned, reason, timeout, timeoutForNodeDecommission, nodesRemovedListener);
     }
 
     private void setWeightForDecommissionedZone(List<String> zones) {
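For context, a minimal caller sketch (argument values assumed, not taken from this commit) showing how the new graceful-drain delay travels alongside the existing cluster-state-update timeout:

    decommissionController.handleNodesDecommissionRequest(
        nodesToBeDecommissioned,              // Set<DiscoveryNode> being drained
        List.of("zone-a"),                    // zones to decommission (assumed value)
        "nodes-decommissioned",               // reason recorded with the node-removal task
        TimeValue.timeValueSeconds(30L),      // timeout for the node-removal cluster-state update
        TimeValue.timeValueSeconds(10L),      // new: delay between HTTP-stats re-checks (assumed value)
        ActionListener.wrap(
            unused -> logger.info("decommissioned nodes drained and removed"),
            e -> logger.warn("graceful decommission failed", e)
        )
    );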
@@ -304,22 +304,16 @@ public void checkHttpStatsForDecommissionedNodes(
         Set<DiscoveryNode> decommissionedNodes,
         String reason,
         TimeValue timeout,
+        TimeValue timeoutForNodeDecommission,
         ActionListener<Void> listener) {
         ActionListener<NodesStatsResponse> nodesStatsResponseActionListener = new ActionListener<NodesStatsResponse>() {
             @Override
             public void onResponse(NodesStatsResponse nodesStatsResponse) {
-                boolean hasActiveConnections = false;
-                List<NodeStats> responseNodes = nodesStatsResponse.getNodes();
-                for (int i=0; i < responseNodes.size(); i++) {
-                    HttpStats httpStats = responseNodes.get(i).getHttp();
-                    if (httpStats != null && httpStats.getServerOpen() != 0) {
-                        hasActiveConnections = true;
-                        break;
-                    }
-                }
-                if (hasActiveConnections) {
+                boolean hasActiveConnections = hasActiveConnections(nodesStatsResponse, false);
+
+                if (hasActiveConnections && timeoutForNodeDecommission.getSeconds() > 0) {
                     // Slow down the next call to get the Http stats from the decommissioned nodes.
-                    scheduleDecommissionNodesRequestCheck(decommissionedNodes, this);
+                    scheduleDecommissionNodesRequestCheck(decommissionedNodes, this, timeoutForNodeDecommission);
                 } else {
                     updateClusterStatusForDecommissioning(decommissionedNodes, reason, timeout, listener);
                 }
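Note the guard: the re-check path is taken only while connections are still open and timeoutForNodeDecommission.getSeconds() > 0. Because TimeValue.getSeconds() truncates, a sub-second value behaves like zero and the controller proceeds straight to the cluster-state update, as this small illustration shows:

    // TimeValue.getSeconds() truncates toward zero.
    TimeValue halfSecond = TimeValue.timeValueMillis(500);
    assert halfSecond.getSeconds() == 0;   // guard is false: skip the graceful wait
    TimeValue tenSeconds = TimeValue.timeValueSeconds(10L);
    assert tenSeconds.getSeconds() == 10;  // guard is true: schedule a re-check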
@@ -333,7 +327,28 @@ public void onFailure(Exception e) {
         waitForGracefulDecommission(decommissionedNodes, nodesStatsResponseActionListener);
     }
 
-    private void scheduleDecommissionNodesRequestCheck(Set<DiscoveryNode> decommissionedNodes, ActionListener<NodesStatsResponse> listener) {
+    private boolean hasActiveConnections(NodesStatsResponse nodesStatsResponse, boolean logConnectionStatus) {
+        boolean hasActiveConnections = false;
+        Map<String, Long> nodeActiveConnectionMap = new HashMap<>();
+        List<NodeStats> responseNodes = nodesStatsResponse.getNodes();
+        for (int i=0; i < responseNodes.size(); i++) {
+            HttpStats httpStats = responseNodes.get(i).getHttp();
+            DiscoveryNode node = responseNodes.get(i).getNode();
+            if (httpStats != null && httpStats.getServerOpen() != 0) {
+                hasActiveConnections = true;
+            }
+            nodeActiveConnectionMap.put(node.getId(), httpStats.getServerOpen());
+        }
+        if (logConnectionStatus) {
+            logger.info("Decommissioning node with connections : " + nodeActiveConnectionMap);
+        }
+        return hasActiveConnections;
+    }
+
+    private void scheduleDecommissionNodesRequestCheck(
+        Set<DiscoveryNode> decommissionedNodes,
+        ActionListener<NodesStatsResponse> listener,
+        TimeValue timeoutForNodeDecommission) {
         transportService.getThreadPool().schedule(new Runnable() {
             @Override
             public void run() {
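One hazard in the helper as committed: httpStats is null-checked before setting the hasActiveConnections flag, but it is dereferenced unconditionally when populating the log map, so a node reporting no HTTP stats would throw a NullPointerException. A defensive variant of the loop (a sketch, not what this commit does):

    for (NodeStats nodeStats : responseNodes) {
        HttpStats httpStats = nodeStats.getHttp();
        // Treat missing HTTP stats as zero open connections instead of dereferencing null.
        long openConnections = (httpStats == null) ? 0L : httpStats.getServerOpen();
        if (openConnections != 0) {
            hasActiveConnections = true;
        }
        nodeActiveConnectionMap.put(nodeStats.getNode().getId(), openConnections);
    }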
@@ -345,7 +360,7 @@ public void run() {
             public String toString() {
                 return "";
             }
-        }, decommissionedNodeRequestCheckInterval, org.opensearch.threadpool.ThreadPool.Names.SAME);
+        }, timeoutForNodeDecommission, org.opensearch.threadpool.ThreadPool.Names.SAME);
     }
 
     private void waitForGracefulDecommission(Set<DiscoveryNode> decommissionedNodes, ActionListener<NodesStatsResponse> listener) {
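The schedule call runs the wrapped stats request once after the configured delay; with ThreadPool.Names.SAME the task executes directly on the scheduler thread rather than being handed off to another executor. A standalone sketch of the same scheduling pattern (delay value assumed):

    threadPool.schedule(
        () -> logger.info("re-checking HTTP stats on decommissioned nodes"),   // runs once after the delay
        TimeValue.timeValueSeconds(10L),                                       // stands in for timeoutForNodeDecommission
        org.opensearch.threadpool.ThreadPool.Names.SAME
    );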
Second changed file:
@@ -109,6 +109,7 @@ private void setForcedAwarenessAttributes(Settings forceSettings) {
 
     public void initiateAttributeDecommissioning(
         final DecommissionAttribute decommissionAttribute,
+        TimeValue timeOutForNodeDecommission,
         final ActionListener<ClusterStateUpdateResponse> listener,
         ClusterState state
     ) {
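A hypothetical caller sketch (the decommissionService handle, attribute value, and listener are assumed for illustration; the transport-layer wiring is not part of this diff):

    decommissionService.initiateAttributeDecommissioning(
        new DecommissionAttribute("zone", "zone-a"),   // attribute assumed for illustration
        TimeValue.timeValueSeconds(10L),               // new graceful-drain re-check delay
        listener,                                      // ActionListener<ClusterStateUpdateResponse>
        clusterService.state()
    );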
@@ -130,7 +131,7 @@ public void initiateAttributeDecommissioning(
         // be abdicated and soon will no longer be cluster manager.
         if (transportService.getLocalNode().isClusterManagerNode()
             && !nodeHasDecommissionedAttribute(transportService.getLocalNode(), decommissionAttribute)) {
-            registerDecommissionAttribute(decommissionAttribute, listener);
+            registerDecommissionAttribute(decommissionAttribute, listener, timeOutForNodeDecommission);
         } else {
             throw new NotClusterManagerException(
                 "node ["
@@ -200,7 +201,8 @@ public void onFailure(Exception e) {
      */
     private void registerDecommissionAttribute(
         final DecommissionAttribute decommissionAttribute,
-        final ActionListener<ClusterStateUpdateResponse> listener
+        final ActionListener<ClusterStateUpdateResponse> listener,
+        final TimeValue timeOutForNodeDecommission
     ) {
         clusterService.submitStateUpdateTask(
             "put_decommission [" + decommissionAttribute + "]",
@@ -257,13 +259,13 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                 assert decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute());
                 assert DecommissionStatus.DECOMMISSION_INIT.equals(decommissionAttributeMetadata.status());
                 listener.onResponse(new ClusterStateUpdateResponse(true));
-                initiateGracefulDecommission();
+                initiateGracefulDecommission(timeOutForNodeDecommission);
                 }
             }
         );
     }
 
-    private void initiateGracefulDecommission() {
+    private void initiateGracefulDecommission(final TimeValue timeOutForNodeDecommission) {
 
         decommissionController.updateMetadataWithDecommissionStatus(
             DecommissionStatus.DECOMMISSION_IN_PROGRESS,
@@ -275,7 +277,7 @@ public void onResponse(Void unused) {
                     DecommissionStatus.DECOMMISSION_IN_PROGRESS
                 );
                 // TODO - should trigger weigh away here and on successful weigh away -> fail the decommissioned nodes
-                failDecommissionedNodes(clusterService.getClusterApplierService().state());
+                failDecommissionedNodes(clusterService.getClusterApplierService().state(), timeOutForNodeDecommission);
                 }
 
                 @Override
@@ -292,7 +294,7 @@ public void onFailure(Exception e) {
         );
     }
 
-    private void failDecommissionedNodes(ClusterState state) {
+    private void failDecommissionedNodes(ClusterState state, TimeValue timeOutForNodeDecommission) {
         DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().custom(DecommissionAttributeMetadata.TYPE);
         assert decommissionAttributeMetadata.status().equals(DecommissionStatus.DECOMMISSION_IN_PROGRESS)
             : "unexpected status encountered while decommissioning nodes";
@@ -307,6 +309,7 @@ private void failDecommissionedNodes(ClusterState state) {
             awarenessValues,
             "nodes-decommissioned",
             TimeValue.timeValueSeconds(30L), // TODO - read timeout from request while integrating with API
+            timeOutForNodeDecommission,
             new ActionListener<Void>() {
                 @Override
                 public void onResponse(Void unused) {
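Taken together, the second file threads the new timeout through the whole decommission flow; every hop below is visible in this diff:

    // initiateAttributeDecommissioning(attribute, timeOutForNodeDecommission, listener, state)
    //   -> registerDecommissionAttribute(attribute, listener, timeOutForNodeDecommission)
    //     -> initiateGracefulDecommission(timeOutForNodeDecommission)
    //       -> failDecommissionedNodes(state, timeOutForNodeDecommission)
    //         -> decommissionController.handleNodesDecommissionRequest(..., timeOutForNodeDecommission, ...)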