From 0a38021f86995e94d38f33f1277e34a3c0877000 Mon Sep 17 00:00:00 2001 From: pranikum <109206473+pranikum@users.noreply.github.com> Date: Thu, 8 Sep 2022 21:47:25 +0530 Subject: [PATCH] Check for connection count Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com> --- .../decommission/DecommissionController.java | 45 ++++++++++++------- .../decommission/DecommissionService.java | 15 ++++--- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 6b599aab60ec3..f468c07c58b1b 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -68,7 +68,6 @@ public class DecommissionController { private final ClusterService clusterService; private final TransportService transportService; private final ThreadPool threadPool; - private final TimeValue decommissionedNodeRequestCheckInterval = TimeValue.timeValueMillis(5000); DecommissionController( ClusterService clusterService, @@ -149,10 +148,11 @@ public void handleNodesDecommissionRequest( List zones, String reason, TimeValue timeout, + TimeValue timeoutForNodeDecommission, ActionListener nodesRemovedListener ) { setWeightForDecommissionedZone(zones); - checkHttpStatsForDecommissionedNodes(nodesToBeDecommissioned, reason, timeout, nodesRemovedListener); + checkHttpStatsForDecommissionedNodes(nodesToBeDecommissioned, reason, timeout, timeoutForNodeDecommission, nodesRemovedListener); } private void setWeightForDecommissionedZone(List zones) { @@ -304,22 +304,16 @@ public void checkHttpStatsForDecommissionedNodes( Set decommissionedNodes, String reason, TimeValue timeout, + TimeValue timeoutForNodeDecommission, ActionListener listener) { ActionListener nodesStatsResponseActionListener = new 
ActionListener() { @Override public void onResponse(NodesStatsResponse nodesStatsResponse) { - boolean hasActiveConnections = false; - List responseNodes = nodesStatsResponse.getNodes(); - for (int i=0; i < responseNodes.size(); i++) { - HttpStats httpStats = responseNodes.get(i).getHttp(); - if (httpStats != null && httpStats.getServerOpen() != 0) { - hasActiveConnections = true; - break; - } - } - if (hasActiveConnections) { + boolean hasActiveConnections = hasActiveConnections(nodesStatsResponse, false); + + if (hasActiveConnections && timeoutForNodeDecommission.getSeconds() > 0) { // Slow down the next call to get the Http stats from the decommissioned nodes. - scheduleDecommissionNodesRequestCheck(decommissionedNodes, this); + scheduleDecommissionNodesRequestCheck(decommissionedNodes, this, timeoutForNodeDecommission); } else { updateClusterStatusForDecommissioning(decommissionedNodes, reason, timeout, listener); } @@ -333,7 +327,28 @@ public void onFailure(Exception e) { waitForGracefulDecommission(decommissionedNodes, nodesStatsResponseActionListener); } - private void scheduleDecommissionNodesRequestCheck(Set decommissionedNodes, ActionListener listener) { + /* Returns true when any node in the stats response reports a non-zero open HTTP server connection count; when logConnectionStatus is set, also logs the per-node-id connection counts. */ private boolean hasActiveConnections(NodesStatsResponse nodesStatsResponse, boolean logConnectionStatus) { + boolean hasActiveConnections = false; + Map nodeActiveConnectionMap = new HashMap<>(); + List responseNodes = nodesStatsResponse.getNodes(); + for (int i=0; i < responseNodes.size(); i++) { + HttpStats httpStats = responseNodes.get(i).getHttp(); + DiscoveryNode node = responseNodes.get(i).getNode(); + if (httpStats != null && httpStats.getServerOpen() != 0) { + hasActiveConnections = true; + } + nodeActiveConnectionMap.put(node.getId(), httpStats == null ? 0L : httpStats.getServerOpen()); /* null-safe: the guard above allows httpStats to be null, so record 0 open connections instead of dereferencing it */ + } + if (logConnectionStatus) { + logger.info("Decommissioning node with connections : " + nodeActiveConnectionMap); + } + return hasActiveConnections; + } + + private void scheduleDecommissionNodesRequestCheck( + Set
decommissionedNodes, + ActionListener listener, + TimeValue timeoutForNodeDecommission) { transportService.getThreadPool().schedule(new Runnable() { @Override public void run() { @@ -345,7 +360,7 @@ public void run() { public String toString() { return ""; } - }, decommissionedNodeRequestCheckInterval, org.opensearch.threadpool.ThreadPool.Names.SAME); + }, timeoutForNodeDecommission, org.opensearch.threadpool.ThreadPool.Names.SAME); } private void waitForGracefulDecommission(Set decommissionedNodes, ActionListener listener) { diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index b3b8feeabe836..abc0221c45daa 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -109,6 +109,7 @@ private void setForcedAwarenessAttributes(Settings forceSettings) { public void initiateAttributeDecommissioning( final DecommissionAttribute decommissionAttribute, + TimeValue timeOutForNodeDecommission, final ActionListener listener, ClusterState state ) { @@ -130,7 +131,7 @@ public void initiateAttributeDecommissioning( // be abdicated and soon will no longer be cluster manager. 
if (transportService.getLocalNode().isClusterManagerNode() && !nodeHasDecommissionedAttribute(transportService.getLocalNode(), decommissionAttribute)) { - registerDecommissionAttribute(decommissionAttribute, listener); + registerDecommissionAttribute(decommissionAttribute, listener, timeOutForNodeDecommission); } else { throw new NotClusterManagerException( "node [" @@ -200,7 +201,8 @@ public void onFailure(Exception e) { */ private void registerDecommissionAttribute( final DecommissionAttribute decommissionAttribute, - final ActionListener listener + final ActionListener listener, + final TimeValue timeOutForNodeDecommission ) { clusterService.submitStateUpdateTask( "put_decommission [" + decommissionAttribute + "]", @@ -257,13 +259,13 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS assert decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute()); assert DecommissionStatus.DECOMMISSION_INIT.equals(decommissionAttributeMetadata.status()); listener.onResponse(new ClusterStateUpdateResponse(true)); - initiateGracefulDecommission(); + initiateGracefulDecommission(timeOutForNodeDecommission); } } ); } - private void initiateGracefulDecommission() { + private void initiateGracefulDecommission(final TimeValue timeOutForNodeDecommission) { decommissionController.updateMetadataWithDecommissionStatus( DecommissionStatus.DECOMMISSION_IN_PROGRESS, @@ -275,7 +277,7 @@ public void onResponse(Void unused) { DecommissionStatus.DECOMMISSION_IN_PROGRESS ); // TODO - should trigger weigh away here and on successful weigh away -> fail the decommissioned nodes - failDecommissionedNodes(clusterService.getClusterApplierService().state()); + failDecommissionedNodes(clusterService.getClusterApplierService().state(), timeOutForNodeDecommission); } @Override @@ -292,7 +294,7 @@ public void onFailure(Exception e) { ); } - private void failDecommissionedNodes(ClusterState state) { + private void failDecommissionedNodes(ClusterState 
state, TimeValue timeOutForNodeDecommission) { DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().custom(DecommissionAttributeMetadata.TYPE); assert decommissionAttributeMetadata.status().equals(DecommissionStatus.DECOMMISSION_IN_PROGRESS) : "unexpected status encountered while decommissioning nodes"; @@ -307,6 +309,7 @@ private void failDecommissionedNodes(ClusterState state) { awarenessValues, "nodes-decommissioned", TimeValue.timeValueSeconds(30L), // TODO - read timeout from request while integrating with API + timeOutForNodeDecommission, new ActionListener() { @Override public void onResponse(Void unused) {