diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 20afd7b2f3568..8cde5e9cf24a3 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -179,6 +179,13 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) { // Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call. || isRefreshAfterCommitSafe() || isRemoteSegmentStoreInSync() == false; + // Ignore syncing segments if the underlying shard is closed + // This also makes sure that retries are not scheduled for shards + // with failed syncSegments invocation after they are closed + if (shardClosed()) { + logger.info("Shard is already closed, will stop scheduling retries"); + return false; + } if (shouldSync || skipPrimaryTermCheck) { return shouldSync; } @@ -607,6 +614,15 @@ public void onFailure(String file) { }; } + /** + * Checks if the underlying IndexShard instance is closed + * + * @return true if it is closed, false otherwise + */ + private boolean shardClosed() { + return indexShard.state() == IndexShardState.CLOSED; + } + @Override protected Logger getLogger() { return logger; diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index bb0776e0ced25..67787e8583930 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; @@ -470,6 +471,25 @@ public void testRefreshPersistentFailure() throws Exception { assertFalse("remote store should not in sync", tuple.v1().isRemoteSegmentStoreInSync()); } + public void testRefreshPersistentFailureAndIndexShardClosed() throws Exception { + int succeedOnAttempt = 3; + int closeShardOnAttempt = 1; + CountDownLatch refreshCountLatch = new CountDownLatch(1); + CountDownLatch successLatch = new CountDownLatch(10); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch, + true, + closeShardOnAttempt + ); + // Giving 10ms for some iterations of remote refresh upload + Thread.sleep(TimeUnit.SECONDS.toMillis(2)); + RemoteStoreRefreshListener listener = tuple.v1(); + assertFalse("remote store should not in sync", listener.isRemoteSegmentStoreInSync()); + assertFalse(listener.getRetryScheduledStatus()); + } + private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segmentTracker, long totalUploadsFailed) throws Exception { assertBusy(() -> { assertEquals(0, segmentTracker.getBytesLag()); @@ -548,6 +568,49 @@ private Tuple mockIn return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch, 1, noOpLatch); } + private Tuple mockIndexShardWithRetryAndScheduleRefresh( + int totalAttempt, + CountDownLatch refreshCountLatch, + CountDownLatch successLatch, + int checkpointPublishSucceedOnAttempt, + CountDownLatch reachedCheckpointPublishLatch, + boolean mockPrimaryTerm, + boolean testUploadTimeout + ) throws IOException { + return mockIndexShardWithRetryAndScheduleRefresh( + totalAttempt, + refreshCountLatch, + successLatch, + checkpointPublishSucceedOnAttempt, + reachedCheckpointPublishLatch, + mockPrimaryTerm, + testUploadTimeout, + false, + 0 + ); + } + + private Tuple mockIndexShardWithRetryAndScheduleRefresh( + int succeedOnAttempt, + CountDownLatch refreshCountLatch, + CountDownLatch successLatch, + boolean closedShard, + int closeShardAfterAttempt + ) throws IOException { + CountDownLatch noOpLatch = new CountDownLatch(0); + return mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch, + 1, + noOpLatch, + true, + false, + closedShard, + closeShardAfterAttempt + ); + } + private Tuple mockIndexShardWithRetryAndScheduleRefresh( int succeedOnAttempt, CountDownLatch refreshCountLatch, @@ -562,7 +625,9 @@ private Tuple mockIn succeedCheckpointPublishOnAttempt, reachedCheckpointPublishLatch, true, - false + false, + false, + 0 ); } @@ -573,7 +638,9 @@ private Tuple mockIn int succeedCheckpointPublishOnAttempt, CountDownLatch reachedCheckpointPublishLatch, boolean mockPrimaryTerm, - boolean testUploadTimeout + boolean testUploadTimeout, + boolean closeShard, + int closeShardAfterAttempt ) throws IOException { // Create index shard that we will be using to mock different methods in IndexShard for the unit test indexShard = newStartedShard( @@ -601,7 +668,6 @@ private Tuple mockIn IndexShard shard = mock(IndexShard.class); Store store = mock(Store.class); when(shard.store()).thenReturn(store); - when(shard.state()).thenReturn(IndexShardState.STARTED); when(store.directory()).thenReturn(indexShard.store().directory()); // Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) @@ -663,6 +729,14 @@ private Tuple mockIn return indexShard.getLatestReplicationCheckpoint(); })).when(shard).computeReplicationCheckpoint(any()); + doAnswer((invocationOnMock -> { + if (closeShard && counter.get() == closeShardAfterAttempt) { + logger.info("Closing shard..."); + return IndexShardState.CLOSED; + } + return IndexShardState.STARTED; + })).when(shard).state(); + doAnswer(invocation -> { if (Objects.nonNull(successLatch)) { successLatch.countDown();