Skip to content

Commit

Permalink
Add retry logic to wait for active threads to finish
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Dec 4, 2024
1 parent 95fe862 commit 9c24aca
Showing 1 changed file with 38 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,20 +227,48 @@ public void stop() throws IOException {
Client client = clientNode().getInternalNodeClient();
AdminClient adminClient = client.admin();

final NodesStatsResponse nodesStatsResponse = adminClient.cluster()
.nodesStats(new NodesStatsRequest().addMetric(NodesStatsRequest.Metric.THREAD_POOL.metricName()))
.actionGet();
// TODO Try the following code 3 times to see if there are active threads

int maxRetries = 3;
int retryCount = 0;
boolean threadsActive = true;

for (NodeStats stats : nodesStatsResponse.getNodes()) {
for (ThreadPoolStats.Stats stat : requireNonNull(stats.getThreadPool())) {
if ("management".equals(stat.getName())) {
continue;
while (retryCount < maxRetries && threadsActive) {
try {
threadsActive = false;
final NodesStatsResponse nodesStatsResponse = adminClient.cluster()
.nodesStats(new NodesStatsRequest().addMetric(NodesStatsRequest.Metric.THREAD_POOL.metricName()))
.actionGet();

for (NodeStats stats : nodesStatsResponse.getNodes()) {
for (ThreadPoolStats.Stats stat : requireNonNull(stats.getThreadPool())) {
if ("management".equals(stat.getName())) {
continue;
}
if (stat.getActive() > 0) {
log.warn("Thread pool {} has {} active threads", stat.getName(), stat.getActive());
threadsActive = true;
break;
}
}
if (threadsActive) {
break;
}
}
if (stat.getActive() > 0) {
log.warn("Thread pool {} has {} active threads", stat.getName(), stat.getActive());
throw new IOException("Thread pool " + stat.getName() + " has " + stat.getActive() + " active threads");

if (threadsActive && retryCount < maxRetries - 1) {
// Add a small delay between retries
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting between retries", e);
}
retryCount++;
}

if (threadsActive) {
throw new IOException("Thread pools still have active threads after " + maxRetries + " attempts");
}

List<CompletableFuture<Boolean>> stopFutures = new ArrayList<>();
Expand Down

0 comments on commit 9c24aca

Please sign in to comment.