Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <[email protected]>
  • Loading branch information
Bhumika Saini committed Apr 30, 2024
1 parent a8008e2 commit 2d3ac19
Showing 1 changed file with 72 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import org.opensearch.tasks.TaskManager;
import org.opensearch.test.disruption.ServiceDisruptionScheme;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;

Expand Down Expand Up @@ -1068,10 +1069,45 @@ void startNode() {
Settings closeForRestart(RestartCallback callback, int minClusterManagerNodes) throws Exception {
    assert callback != null;
    // Graceful path: run the regular close() first, then hand over to the
    // restart preparation shared with the ungraceful variant.
    close();
    final Settings restartSettings = doAfterNodeClosed(callback, minClusterManagerNodes);
    return restartSettings;
}

/**
 * Stops the node without going through {@code close()} and prepares it to be restarted.
 * <p>
 * The node's threads are terminated via {@code terminateAllNodeThreads()}, after which the
 * same post-close steps as a graceful restart are run through {@code doAfterNodeClosed}.
 *
 * @param callback               restart callback; must not be {@code null}
 * @param minClusterManagerNodes value forwarded unchanged to {@code doAfterNodeClosed}
 * @return the settings produced by {@code doAfterNodeClosed}
 * @throws Exception if terminating the threads or the post-close steps fail
 */
Settings closeForRestartUngracefully(RestartCallback callback, int minClusterManagerNodes) throws Exception {
    assert callback != null;
    // Ungraceful path: kill the node's threads instead of calling close().
    terminateAllNodeThreads();
    final Settings restartSettings = doAfterNodeClosed(callback, minClusterManagerNodes);
    return restartSettings;
}

/**
 * Forcibly stops this node's threads to simulate an ungraceful shutdown.
 * <p>
 * First shuts the node's {@link ThreadPool} down immediately, then repeatedly interrupts
 * every live thread whose name contains this node's name until none is left.
 * <p>
 * Compared to a bare spin loop this method:
 * <ul>
 *   <li>pauses briefly between passes so interrupted threads get a chance to exit and the
 *       log is not flooded with one line per thread per spin;</li>
 *   <li>never targets the calling thread, which could otherwise never observe an empty set;</li>
 *   <li>gives up after a fixed deadline instead of hanging the test forever when a thread
 *       ignores interruption.</li>
 * </ul>
 * NOTE(review): threads are matched by substring on the node name, so a node whose name is a
 * prefix of another node's name (e.g. "node_t1" vs "node_t10") may match that node's threads
 * too - confirm the naming scheme before relying on this with ten or more nodes.
 *
 * @throws InterruptedException  if the calling thread is interrupted while waiting between passes
 * @throws IllegalStateException if some node threads are still alive once the deadline expires
 */
private void terminateAllNodeThreads() throws InterruptedException {
    logger.info("Dismissing threadpool of node for ungraceful node restart {}", name);
    node.injector().getInstance(ThreadPool.class).shutdownNow();
    logger.info("Dismissing any lingering threads of node for ungraceful node restart {}", name);

    final Thread caller = Thread.currentThread();
    // Upper bound (30s) on how long we keep interrupting; chosen generously for test runs.
    final long deadlineNanos = System.nanoTime() + 30_000_000_000L;
    while (true) {
        // getAllStackTraces() only reports live threads, so an empty result means we are done.
        final Set<Thread> nodeThreads = Thread.getAllStackTraces()
            .keySet()
            .stream()
            .filter(thread -> thread != caller && thread.getName().contains(name))
            .collect(Collectors.toSet());
        if (nodeThreads.isEmpty()) {
            return;
        }
        if (System.nanoTime() - deadlineNanos > 0) {
            final String survivors = nodeThreads.stream().map(Thread::getName).sorted().collect(Collectors.joining(", "));
            throw new IllegalStateException(
                "Threads of node [" + name + "] did not terminate after being interrupted: " + survivors
            );
        }

        for (Thread thread : nodeThreads) {
            logger.info("Interrupting thread for ungraceful node restart {}", thread.getName());
            thread.interrupt();
        }
        // Give the interrupted threads a moment to unwind before taking the next snapshot.
        Thread.sleep(10);
    }
}

private Settings doAfterNodeClosed(RestartCallback callback, int minClusterManagerNodes) throws Exception {
removeNode(this);
Settings callbackSettings = callback.onNodeStopped(name);
assert callbackSettings != null;
Settings.Builder newSettings = Settings.builder();
Builder newSettings = Settings.builder();
newSettings.put(callbackSettings);
if (minClusterManagerNodes >= 0) {
if (INITIAL_CLUSTER_MANAGER_NODES_SETTING.exists(callbackSettings) == false) {
Expand Down Expand Up @@ -2020,20 +2056,47 @@ public synchronized void rollingRestart(RestartCallback callback) throws Excepti
}

/**
 * Gracefully restarts a single node: runs the pre-close steps, closes the node through
 * {@code closeForRestart}, then runs the post-close steps with the resulting settings.
 *
 * @param nodeAndClient the node to restart
 * @param callback      restart callback forwarded to the close and post-close steps
 * @throws Exception if closing or recreating the node fails
 */
private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception {
    final Set<String> votingExclusions = doBeforeNodeClose(nodeAndClient, false);

    // -1 is passed when the cluster-manager node count is not managed automatically.
    final int minClusterManagerNodes;
    if (autoManageClusterManagerNodes) {
        minClusterManagerNodes = getMinClusterManagerNodes(getClusterManagerNodesCount());
    } else {
        minClusterManagerNodes = -1;
    }

    final Settings restartSettings = nodeAndClient.closeForRestart(callback, minClusterManagerNodes);
    doAfterNodeClose(nodeAndClient, callback, votingExclusions, restartSettings);
}

/**
 * Ungracefully restarts a single node: runs the pre-close steps, stops the node through
 * {@code closeForRestartUngracefully}, then runs the post-close steps with the resulting settings.
 *
 * @param nodeAndClient the node to restart
 * @param callback      restart callback forwarded to the close and post-close steps
 * @throws Exception if stopping or recreating the node fails
 */
private void restartNodeUngracefully(NodeAndClient nodeAndClient, RestartCallback callback) throws Exception {
    final Set<String> votingExclusions = doBeforeNodeClose(nodeAndClient, true);

    // -1 is passed when the cluster-manager node count is not managed automatically.
    final int minClusterManagerNodes;
    if (autoManageClusterManagerNodes) {
        minClusterManagerNodes = getMinClusterManagerNodes(getClusterManagerNodesCount());
    } else {
        minClusterManagerNodes = -1;
    }

    final Settings restartSettings = nodeAndClient.closeForRestartUngracefully(callback, minClusterManagerNodes);
    doAfterNodeClose(nodeAndClient, callback, votingExclusions, restartSettings);
}

/**
 * Steps shared by graceful and ungraceful restarts that must run before the node is stopped.
 * <p>
 * Logs the restart exactly once (with a message matching the restart mode), detaches the
 * active disruption scheme from the node if one is set, and excludes the node via
 * {@code excludeClusterManagers}.
 * <p>
 * Must be called while holding this cluster's monitor.
 *
 * @param nodeAndClient       the node about to be restarted
 * @param isUngracefulRestart {@code true} when the node will be stopped ungracefully; only
 *                            affects the log message
 * @return the value returned by {@code excludeClusterManagers} for this node
 */
private Set<String> doBeforeNodeClose(NodeAndClient nodeAndClient, boolean isUngracefulRestart) {
    assert Thread.holdsLock(this);
    // Log once per restart; an unconditional log here in addition to the mode-specific one
    // would report every restart twice and mislabel ungraceful restarts.
    if (isUngracefulRestart) {
        logger.info("Restarting node ungracefully [{}] ", nodeAndClient.name);
    } else {
        logger.info("Restarting node [{}] ", nodeAndClient.name);
    }

    if (activeDisruptionScheme != null) {
        activeDisruptionScheme.removeFromNode(nodeAndClient.name, this);
    }

    return excludeClusterManagers(Collections.singleton(nodeAndClient));
}

final Settings newSettings = nodeAndClient.closeForRestart(
callback,
autoManageClusterManagerNodes ? getMinClusterManagerNodes(getClusterManagerNodesCount()) : -1
);

private void doAfterNodeClose(
NodeAndClient nodeAndClient,
RestartCallback callback,
Set<String> excludedNodeIds,
Settings newSettings
) {
removeExclusions(excludedNodeIds);

nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(Collections.singletonList(nodeAndClient)));
Expand Down Expand Up @@ -2078,7 +2141,8 @@ private Set<String> excludeClusterManagers(Collection<NodeAndClient> nodeAndClie

logger.info("adding voting config exclusions {} prior to restart/shutdown", excludedNodeNames);
try {
client().execute(
Client client = getRandomNodeAndClient(node -> excludedNodeNames.contains(node.name) == false).client();
client.execute(
AddVotingConfigExclusionsAction.INSTANCE,
new AddVotingConfigExclusionsRequest(excludedNodeNames.toArray(Strings.EMPTY_ARRAY))
).get();
Expand Down

0 comments on commit 2d3ac19

Please sign in to comment.