Skip to content

Commit

Permalink
Handle OpenSearchRejectedExecutionException while retrying refresh (#…
Browse files Browse the repository at this point in the history
…13301)

Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale authored Apr 23, 2024
1 parent 41a3055 commit ae49fd2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -129,6 +130,12 @@ private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boole
);
scheduled = true;
getLogger().info("Scheduled retry with didRefresh={}", didRefresh);
} catch (OpenSearchRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
getLogger().info("Scheduling retry with didRefresh={} failed due to executor shut down", didRefresh);
} else {
throw e;
}
} finally {
if (scheduled == false) {
retryScheduled.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,32 @@ protected Logger getLogger() {
public void testScheduleRetryAfterClose() throws Exception {
// This tests that once the listener has been closed, even the retries would not be scheduled.
final AtomicLong runCount = new AtomicLong();
ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) {
ReleasableRetryableRefreshListener testRefreshListener = getRetryableRefreshListener(runCount);
Thread thread1 = new Thread(() -> {
try {
testRefreshListener.afterRefresh(true);
} catch (IOException e) {
throw new AssertionError(e);
}
});
Thread thread2 = new Thread(() -> {
try {
Thread.sleep(500);
testRefreshListener.drainRefreshes();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
assertBusy(() -> assertEquals(1, runCount.get()));
assertRefreshListenerClosed(testRefreshListener);
}

private ReleasableRetryableRefreshListener getRetryableRefreshListener(AtomicLong runCount) {
return new ReleasableRetryableRefreshListener(threadPool) {
@Override
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
try {
Expand All @@ -341,6 +366,11 @@ protected String getRetryThreadPoolName() {
return ThreadPool.Names.REMOTE_REFRESH_RETRY;
}

@Override
protected boolean isRetryEnabled() {
return true;
}

@Override
protected TimeValue getNextRetryInterval() {
try {
Expand All @@ -351,6 +381,12 @@ protected TimeValue getNextRetryInterval() {
return TimeValue.timeValueMillis(100);
}
};
}

public void testScheduleRetryAfterThreadpoolShutdown() throws Exception {
// This tests that once the thread-pool is shut down, the exception is handled.
final AtomicLong runCount = new AtomicLong();
ReleasableRetryableRefreshListener testRefreshListener = getRetryableRefreshListener(runCount);
Thread thread1 = new Thread(() -> {
try {
testRefreshListener.afterRefresh(true);
Expand All @@ -361,7 +397,7 @@ protected TimeValue getNextRetryInterval() {
Thread thread2 = new Thread(() -> {
try {
Thread.sleep(500);
testRefreshListener.drainRefreshes();
threadPool.shutdown();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
Expand All @@ -371,7 +407,7 @@ protected TimeValue getNextRetryInterval() {
thread1.join();
thread2.join();
assertBusy(() -> assertEquals(1, runCount.get()));
assertRefreshListenerClosed(testRefreshListener);
assertFalse(testRefreshListener.getRetryScheduledStatus());
}

public void testConcurrentScheduleRetry() throws Exception {
Expand Down

0 comments on commit ae49fd2

Please sign in to comment.