diff --git a/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java index 757275932c5f1..80daefc4482fc 100644 --- a/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java @@ -13,6 +13,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -129,6 +130,12 @@ private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boole ); scheduled = true; getLogger().info("Scheduled retry with didRefresh={}", didRefresh); + } catch (OpenSearchRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + getLogger().info("Scheduling retry with didRefresh={} failed due to executor shut down", didRefresh); + } else { + throw e; + } } finally { if (scheduled == false) { retryScheduled.set(false); diff --git a/server/src/test/java/org/opensearch/index/shard/ReleasableRetryableRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/ReleasableRetryableRefreshListenerTests.java index a0641c365a2a1..e0ad09efac367 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReleasableRetryableRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReleasableRetryableRefreshListenerTests.java @@ -316,7 +316,32 @@ protected Logger getLogger() { public void testScheduleRetryAfterClose() throws Exception { // This tests that once the listener has been closed, even the retries would not be scheduled. final AtomicLong runCount = new AtomicLong(); - ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { + ReleasableRetryableRefreshListener testRefreshListener = getRetryableRefreshListener(runCount); + Thread thread1 = new Thread(() -> { + try { + testRefreshListener.afterRefresh(true); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + Thread thread2 = new Thread(() -> { + try { + Thread.sleep(500); + testRefreshListener.drainRefreshes(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + }); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + assertBusy(() -> assertEquals(1, runCount.get())); + assertRefreshListenerClosed(testRefreshListener); + } + + private ReleasableRetryableRefreshListener getRetryableRefreshListener(AtomicLong runCount) { + return new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { try { @@ -341,6 +366,11 @@ protected String getRetryThreadPoolName() { return ThreadPool.Names.REMOTE_REFRESH_RETRY; } + @Override + protected boolean isRetryEnabled() { + return true; + } + @Override protected TimeValue getNextRetryInterval() { try { @@ -351,6 +381,12 @@ protected TimeValue getNextRetryInterval() { return TimeValue.timeValueMillis(100); } }; + } + + public void testScheduleRetryAfterThreadpoolShutdown() throws Exception { + // This tests that once the thread-pool is shut down, the exception is handled. + final AtomicLong runCount = new AtomicLong(); + ReleasableRetryableRefreshListener testRefreshListener = getRetryableRefreshListener(runCount); Thread thread1 = new Thread(() -> { try { testRefreshListener.afterRefresh(true); @@ -361,7 +397,7 @@ protected TimeValue getNextRetryInterval() { Thread thread2 = new Thread(() -> { try { Thread.sleep(500); - testRefreshListener.drainRefreshes(); + threadPool.shutdown(); } catch (InterruptedException e) { throw new AssertionError(e); } @@ -371,7 +407,7 @@ protected TimeValue getNextRetryInterval() { thread1.join(); thread2.join(); assertBusy(() -> assertEquals(1, runCount.get())); - assertRefreshListenerClosed(testRefreshListener); + assertFalse(testRefreshListener.getRetryScheduledStatus()); } public void testConcurrentScheduleRetry() throws Exception {