Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add checks to skip remote uploads after shard is closed #13998

Merged
merged 3 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
|| isRefreshAfterCommitSafe()
|| isRemoteSegmentStoreInSync() == false;
// Ignore syncing segments if the underlying shard is closed
// This also makes sure that retries are not scheduled for shards
shourya035 marked this conversation as resolved.
Show resolved Hide resolved
// with failed syncSegments invocation after they are closed
if (shardClosed()) {
logger.info("Shard is already closed, will stop scheduling retries");
return false;
}
shourya035 marked this conversation as resolved.
Show resolved Hide resolved
if (shouldSync || skipPrimaryTermCheck) {
return shouldSync;
}
Expand Down Expand Up @@ -607,6 +614,15 @@ public void onFailure(String file) {
};
}

/**
* Checks if the underlying IndexShard instance is closed
*
* @return true if it is closed, false otherwise
*/
private boolean shardClosed() {
return indexShard.state() == IndexShardState.CLOSED;
}

@Override
protected Logger getLogger() {
return logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
Expand Down Expand Up @@ -470,6 +471,25 @@ public void testRefreshPersistentFailure() throws Exception {
assertFalse("remote store should not in sync", tuple.v1().isRemoteSegmentStoreInSync());
}

public void testRefreshPersistentFailureAndIndexShardClosed() throws Exception {
int succeedOnAttempt = 3;
shourya035 marked this conversation as resolved.
Show resolved Hide resolved
int closeShardOnAttempt = 1;
CountDownLatch refreshCountLatch = new CountDownLatch(1);
CountDownLatch successLatch = new CountDownLatch(10);
Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
successLatch,
true,
closeShardOnAttempt
);
// Giving 10ms for some iterations of remote refresh upload
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
RemoteStoreRefreshListener listener = tuple.v1();
assertFalse("remote store should not in sync", listener.isRemoteSegmentStoreInSync());
assertFalse(listener.getRetryScheduledStatus());
}

private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segmentTracker, long totalUploadsFailed) throws Exception {
assertBusy(() -> {
assertEquals(0, segmentTracker.getBytesLag());
Expand Down Expand Up @@ -548,6 +568,49 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch, 1, noOpLatch);
}

private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
int totalAttempt,
CountDownLatch refreshCountLatch,
CountDownLatch successLatch,
int checkpointPublishSucceedOnAttempt,
CountDownLatch reachedCheckpointPublishLatch,
boolean mockPrimaryTerm,
boolean testUploadTimeout
) throws IOException {
return mockIndexShardWithRetryAndScheduleRefresh(
totalAttempt,
refreshCountLatch,
successLatch,
checkpointPublishSucceedOnAttempt,
reachedCheckpointPublishLatch,
mockPrimaryTerm,
testUploadTimeout,
false,
0
);
}

private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
int succeedOnAttempt,
CountDownLatch refreshCountLatch,
CountDownLatch successLatch,
boolean closedShard,
int closeShardAfterAttempt
) throws IOException {
CountDownLatch noOpLatch = new CountDownLatch(0);
return mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
successLatch,
1,
noOpLatch,
true,
false,
closedShard,
closeShardAfterAttempt
);
}

private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
int succeedOnAttempt,
CountDownLatch refreshCountLatch,
Expand All @@ -562,7 +625,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
succeedCheckpointPublishOnAttempt,
reachedCheckpointPublishLatch,
true,
false
false,
false,
0
);
}

Expand All @@ -573,7 +638,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
int succeedCheckpointPublishOnAttempt,
CountDownLatch reachedCheckpointPublishLatch,
boolean mockPrimaryTerm,
boolean testUploadTimeout
boolean testUploadTimeout,
boolean closeShard,
int closeShardAfterAttempt
) throws IOException {
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
indexShard = newStartedShard(
Expand Down Expand Up @@ -601,7 +668,6 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
IndexShard shard = mock(IndexShard.class);
Store store = mock(Store.class);
when(shard.store()).thenReturn(store);
when(shard.state()).thenReturn(IndexShardState.STARTED);
when(store.directory()).thenReturn(indexShard.store().directory());

// Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
Expand Down Expand Up @@ -663,6 +729,14 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
return indexShard.getLatestReplicationCheckpoint();
})).when(shard).computeReplicationCheckpoint(any());

doAnswer((invocationOnMock -> {
if (closeShard && counter.get() == closeShardAfterAttempt) {
logger.info("Closing shard...");
return IndexShardState.CLOSED;
}
return IndexShardState.STARTED;
})).when(shard).state();

doAnswer(invocation -> {
if (Objects.nonNull(successLatch)) {
successLatch.countDown();
Expand Down
Loading